在windows上单机测试storm的词频统计,zookeeper版本是apache-zookeeper-3.6.1-bin,storm版本是apache-storm-2.4.0
测试代码一直没有打印结果,而且控制台信息一直在输出,求看看是哪里出了问题

storm的配置如下:
storm.zookeeper.servers:
- "localhost"
ui.port: 9090
ui.host: "0.0.0.0"
nimbus.seeds: ["localhost"]
storm.local.dir: "D:/apache-storm-2.4.0/data"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
storm.zookeeper.port: 2181
stormUI界面如下,提交项目拓扑的时候未能显示

文件树如下

nimbus.log
2025-06-07 13:13:06.999 o.a.s.s.b.BlacklistScheduler timer [INFO] Supervisors [] are blacklisted.
2025-06-07 13:13:17.016 o.a.s.s.b.BlacklistScheduler timer [INFO] Supervisors [] are blacklisted.
2025-06-07 13:13:27.031 o.a.s.s.b.BlacklistScheduler timer [INFO] Supervisors [] are blacklisted.
2025-06-07 13:13:37.045 o.a.s.s.b.BlacklistScheduler timer [INFO] Supervisors [] are blacklisted.
2025-06-07 13:13:47.063 o.a.s.s.b.BlacklistScheduler timer [INFO] Supervisors [] are blacklisted.
2025-06-07 13:13:57.077 o.a.s.s.b.BlacklistScheduler timer [INFO] Supervisors [] are blacklisted.
2025-06-07 13:14:07.091 o.a.s.s.b.BlacklistScheduler timer [INFO] Supervisors [] are blacklisted.
2025-06-07 13:14:17.117 o.a.s.s.b.BlacklistScheduler timer [INFO] Supervisors [] are blacklisted.
2025-06-07 13:14:27.131 o.a.s.s.b.BlacklistScheduler timer [INFO] Supervisors [] are blacklisted.
supervisor.log
2025-06-07 13:14:04.953 o.a.s.l.AsyncLocalizer AsyncLocalizer Task Executor - 0 [INFO] Starting cleanup
2025-06-07 13:14:04.962 o.a.s.l.AsyncLocalizer AsyncLocalizer Task Executor - 0 [INFO] Finish cleanup
2025-06-07 13:14:04.973 o.a.s.d.s.t.SupervisorHealthCheck EventTimer [INFO] Running supervisor healthchecks...
2025-06-07 13:14:04.973 o.a.s.h.HealthChecker EventTimer [INFO] The supervisor healthchecks succeeded.
2025-06-07 13:14:34.941 o.a.s.l.AsyncLocalizer AsyncLocalizer Task Executor - 1 [INFO] Starting cleanup
2025-06-07 13:14:34.950 o.a.s.l.AsyncLocalizer AsyncLocalizer Task Executor - 1 [INFO] Finish cleanup
2025-06-07 13:14:34.974 o.a.s.d.s.t.SupervisorHealthCheck EventTimer [INFO] Running supervisor healthchecks...
2025-06-07 13:14:34.974 o.a.s.h.HealthChecker EventTimer [INFO] The supervisor healthchecks succeeded.
2025-06-07 13:15:04.939 o.a.s.l.AsyncLocalizer AsyncLocalizer Task Executor - 2 [INFO] Starting cleanup
2025-06-07 13:15:04.947 o.a.s.l.AsyncLocalizer AsyncLocalizer Task Executor - 2 [INFO] Finish cleanup
2025-06-07 13:15:04.974 o.a.s.d.s.t.SupervisorHealthCheck EventTimer [INFO] Running supervisor healthchecks...
2025-06-07 13:15:04.974 o.a.s.h.HealthChecker EventTimer [INFO] The supervisor healthchecks succeeded.
RandomSentenceSpout.Java
public class RandomSentenceSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private Random random;
private String[] sentences = {
"the cow jumped over the moon",
"an apple a day keeps the doctor away",
"four score and seven years ago",
"storm is awesome"
};
private int maxSentences = 3; // 限制总发送条数
private int sentCount = 0;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
this.random = new Random();
}
@Override
public void nextTuple() {
if (sentCount < maxSentences) {
String sentence = sentences[random.nextInt(sentences.length)];
collector.emit(new Values(sentence));
sentCount++;
Utils.sleep(100); // 控制发送速度(100ms/条)
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
}
SplitSentenceBolt.Java
public class SplitSentenceBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for (String word : sentence.split(" ")) {
collector.emit(new Values(word));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
WordCountBolt.java
public class WordCountBolt extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
counts.put(word, counts.getOrDefault(word, 0) + 1);
System.out.println(word + ":************************************************************** " + counts.get(word));
collector.emit(new Values(word, counts.get(word)));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
WordCountTopology.java
public class WordCountTopology {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 1);
builder.setBolt("split", new SplitSentenceBolt(), 1).shuffleGrouping("spout");
builder.setBolt("count", new WordCountBolt(), 1).fieldsGrouping("split", new Fields("word"));
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
// 集群模式
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
// 本地模式
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
Thread.sleep(30000);
cluster.shutdown();
}
}
}