2301_79388189 2025-06-07 13:23 采纳率: 0%
浏览 11

windows单机storm词频统计任务失败

在windows上单机测试storm的词频统计,zookeeper版本是apache-zookeeper-3.6.1-bin,storm版本是apache-storm-2.4.0

测试代码一直没有打印结果,而且控制台信息一直在输出,求看看是哪里出了问题

img

storm的配置如下:

# ZooKeeper host list used by Storm (a single local instance here).
storm.zookeeper.servers:
    - "localhost"
# Storm UI port and bind address.
ui.port: 9090               
ui.host: "0.0.0.0"         
# Nimbus seed host list (local machine only).
nimbus.seeds: ["localhost"]
# Local directory setting for Storm; it points inside the Storm installation folder.
storm.local.dir: "D:/apache-storm-2.4.0/data"
# Ports assigned to supervisor slots (four ports are listed).
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
# ZooKeeper port.
storm.zookeeper.port: 2181

Storm UI 界面如下,提交拓扑之后,界面上没有显示该拓扑

img

文件树如下

img

nimbus.log

2025-06-07 13:13:06.999 o.a.s.s.b.BlacklistScheduler timer [INFO] Supervisors [] are blacklisted.
2025-06-07 13:13:17.016 o.a.s.s.b.BlacklistScheduler timer [INFO] Supervisors [] are blacklisted.
2025-06-07 13:13:27.031 o.a.s.s.b.BlacklistScheduler timer [INFO] Supervisors [] are blacklisted.
2025-06-07 13:13:37.045 o.a.s.s.b.BlacklistScheduler timer [INFO] Supervisors [] are blacklisted.
2025-06-07 13:13:47.063 o.a.s.s.b.BlacklistScheduler timer [INFO] Supervisors [] are blacklisted.
2025-06-07 13:13:57.077 o.a.s.s.b.BlacklistScheduler timer [INFO] Supervisors [] are blacklisted.
2025-06-07 13:14:07.091 o.a.s.s.b.BlacklistScheduler timer [INFO] Supervisors [] are blacklisted.
2025-06-07 13:14:17.117 o.a.s.s.b.BlacklistScheduler timer [INFO] Supervisors [] are blacklisted.
2025-06-07 13:14:27.131 o.a.s.s.b.BlacklistScheduler timer [INFO] Supervisors [] are blacklisted.

supervisor.log

2025-06-07 13:14:04.953 o.a.s.l.AsyncLocalizer AsyncLocalizer Task Executor - 0 [INFO] Starting cleanup
2025-06-07 13:14:04.962 o.a.s.l.AsyncLocalizer AsyncLocalizer Task Executor - 0 [INFO] Finish cleanup
2025-06-07 13:14:04.973 o.a.s.d.s.t.SupervisorHealthCheck EventTimer [INFO] Running supervisor healthchecks...
2025-06-07 13:14:04.973 o.a.s.h.HealthChecker EventTimer [INFO] The supervisor healthchecks succeeded.
2025-06-07 13:14:34.941 o.a.s.l.AsyncLocalizer AsyncLocalizer Task Executor - 1 [INFO] Starting cleanup
2025-06-07 13:14:34.950 o.a.s.l.AsyncLocalizer AsyncLocalizer Task Executor - 1 [INFO] Finish cleanup
2025-06-07 13:14:34.974 o.a.s.d.s.t.SupervisorHealthCheck EventTimer [INFO] Running supervisor healthchecks...
2025-06-07 13:14:34.974 o.a.s.h.HealthChecker EventTimer [INFO] The supervisor healthchecks succeeded.
2025-06-07 13:15:04.939 o.a.s.l.AsyncLocalizer AsyncLocalizer Task Executor - 2 [INFO] Starting cleanup
2025-06-07 13:15:04.947 o.a.s.l.AsyncLocalizer AsyncLocalizer Task Executor - 2 [INFO] Finish cleanup
2025-06-07 13:15:04.974 o.a.s.d.s.t.SupervisorHealthCheck EventTimer [INFO] Running supervisor healthchecks...
2025-06-07 13:15:04.974 o.a.s.h.HealthChecker EventTimer [INFO] The supervisor healthchecks succeeded.

RandomSentenceSpout.java

/**
 * Spout that emits a small, fixed number of sentences chosen at random from a
 * built-in list.
 *
 * <p>Each call to {@link #nextTuple()} emits at most one single-field tuple
 * ({@code "sentence"}) and then pauses briefly. Once {@link #MAX_SENTENCES}
 * tuples have been emitted, further calls do nothing.
 */
public class RandomSentenceSpout extends BaseRichSpout {
    /** Candidate sentences; one is picked at random for every emitted tuple. */
    private static final String[] SENTENCES = {
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago",
            "storm is awesome"
    };

    /** Upper bound on the total number of sentences this spout instance emits. */
    private static final int MAX_SENTENCES = 3;

    /** Pause after each emit, in milliseconds (throttles to roughly one sentence per 100 ms). */
    private static final long EMIT_INTERVAL_MS = 100L;

    private SpoutOutputCollector collector;
    private Random random;
    private int sentCount = 0;

    /**
     * Stores the collector and creates the random source used to pick sentences.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used to emit sentence tuples
     */
    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        // The parameterized Map matches the Storm 2.x ISpout signature and avoids a raw-type override.
        this.collector = collector;
        this.random = new Random();
    }

    /**
     * Emits one randomly chosen sentence and sleeps, unless the emit limit has
     * already been reached.
     */
    @Override
    public void nextTuple() {
        if (sentCount >= MAX_SENTENCES) {
            return;
        }
        String sentence = SENTENCES[random.nextInt(SENTENCES.length)];
        collector.emit(new Values(sentence));
        sentCount++;
        Utils.sleep(EMIT_INTERVAL_MS);
    }

    /** Declares the single output field {@code "sentence"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}

SplitSentenceBolt.java

/**
 * Bolt that splits each incoming sentence into words and emits one
 * single-field tuple ({@code "word"}) per word.
 */
public class SplitSentenceBolt extends BaseBasicBolt {
    /**
     * Reads the sentence from field 0 of the tuple and emits every
     * whitespace-separated word it contains.
     *
     * <p>A {@code null} or blank sentence produces no output, and runs of
     * whitespace never yield empty words.
     *
     * @param tuple     input tuple whose first field is the sentence
     * @param collector collector used to emit the individual words
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        if (sentence == null) {
            return;
        }
        String trimmed = sentence.trim();
        if (trimmed.isEmpty()) {
            // "".split(...) would return a single empty string, which is not a word.
            return;
        }
        // Splitting on whitespace runs means consecutive spaces or tabs do not create empty tokens.
        for (String word : trimmed.split("\\s+")) {
            collector.emit(new Values(word));
        }
    }

    /** Declares the single output field {@code "word"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

WordCountBolt.java

/**
 * Bolt that keeps an in-memory running count per word, prints the updated
 * count to standard output, and emits it as a {@code ("word", "count")} tuple.
 */
public class WordCountBolt extends BaseBasicBolt {
    /** Number of occurrences seen so far, keyed by word. */
    Map<String, Integer> counts = new HashMap<>();

    /** Declares the two output fields {@code "word"} and {@code "count"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    /**
     * Increments the count for the word in field 0 of the tuple, then prints
     * and emits the new count.
     *
     * @param tuple     input tuple whose first field is the word
     * @param collector collector used to emit the updated count
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        final String word = tuple.getString(0);
        // merge() inserts 1 for a new word, otherwise adds 1, and returns the stored value.
        final Integer updated = counts.merge(word, 1, Integer::sum);
        System.out.println(word + ":************************************************************** " + updated);
        collector.emit(new Values(word, updated));
    }
}

WordCountTopology.java

/**
 * Entry point that wires the word-count topology (sentence spout, split bolt,
 * count bolt) and runs it either on a cluster or in an in-process local cluster.
 */
public class WordCountTopology {
    /** Topology name used when running in local mode. */
    private static final String LOCAL_TOPOLOGY_NAME = "word-count";

    /** How long the local cluster is kept alive before it is shut down, in milliseconds. */
    private static final long LOCAL_RUN_MILLIS = 30000L;

    /**
     * Builds and submits the topology.
     *
     * @param args if non-empty, {@code args[0]} is the topology name and the
     *             topology is submitted through {@code StormSubmitter}; otherwise
     *             it runs in a local cluster for {@link #LOCAL_RUN_MILLIS} ms
     * @throws Exception if submission fails or the local run is interrupted
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new RandomSentenceSpout(), 1);
        builder.setBolt("split", new SplitSentenceBolt(), 1).shuffleGrouping("spout");
        // Group by word so every occurrence of a given word reaches the same counter task.
        builder.setBolt("count", new WordCountBolt(), 1).fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        // NOTE: debug mode makes Storm log every emitted tuple, so console output is very
        // verbose; switch it off if the counter's own output is hard to spot.
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            // Cluster mode.
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // Local mode: try-with-resources shuts the cluster down even when
            // submission or the sleep throws, so no threads are leaked.
            try (LocalCluster cluster = new LocalCluster()) {
                cluster.submitTopology(LOCAL_TOPOLOGY_NAME, conf, builder.createTopology());
                Thread.sleep(LOCAL_RUN_MILLIS);
            }
        }
    }
}
  • 写回答

6条回答 默认 最新

  • 阿里嘎多学长 2025-06-07 13:30
    关注

    阿里嘎多学长整理AIGC生成,因移动端显示问题导致当前答案未能完全显示,请使用PC端查看更加详细的解答过程

    问题解答

    你在 Windows 上单机测试 Storm 的词频统计任务失败了,ZooKeeper 版本是 3.6.1,Storm 版本是 2.4.0,测试代码一直没有打印出统计结果。

    可能的解决方案:

    1. 检查 ZooKeeper 和 Storm 的安装是否正确,确保它们可以正常运行。
    2. 检查 Storm 的配置文件 storm.yaml,确保配置正确,例如 ZooKeeper 的连接信息(storm.zookeeper.servers、storm.zookeeper.port)、nimbus.seeds、supervisor.slots.ports 等。
    3. 检查代码中的错误,例如 topology 的定义是否正确,是否有语法错误等。
    4. 查看 Storm 的日志(例如 nimbus.log、supervisor.log 以及 worker 的日志),根据其中的报错信息定位失败原因。

    以下是一个简单的 Storm 词频统计任务的示例代码:

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.TopologyContext;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    public class WordCountTopology {
        public static class WordCountSpout extends BaseRichSpout {
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word"));
            }
    
            @Override
            public void nextTuple() {
                // 生成词频数据
                String word = "hello";
                emit(new Values(word));
            }
        }
    
        public static class WordCountBolt extends BaseRichBolt {
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word", "count"));
            }
    
            @Override
            public void execute(Tuple input) {
                String word = input.getString(0);
                int count = 1;
                // 统计词频
                System.out.println(word + ": " + count);
                collector.emit(new Values(word, count));
            }
        }
    
        public static void main(String[] args) throws Exception {
            Config config = new Config();
            config.setNumWorkers(2);
    
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("word-count-spout", new WordCountSpout());
            builder.setBolt("word-count-bolt", new WordCountBolt()).shuffleGrouping("word-count-spout");
    
            StormSubmitter.submitTopology("word-count", config, builder.createTopology());
        }
    }
    

    这个示例代码定义了一个词频统计拓扑:WordCountSpout 负责产生单词数据,WordCountBolt 负责对收到的单词进行词频统计。

    评论

报告相同问题?

问题事件

  • 创建了问题 6月7日