ucyuujin 2018-11-30 03:07 采纳率: 0%
浏览 339

storm的executor、task、spout、bolt的关系

  public static class WordCountBolt extends BaseRichBolt{
        OutputCollector _collector;
        Map<String, Integer> counts = new HashMap<>();

        @Override
        public void prepare(Map conf,TopologyContext context,OutputCollector collector){
            _collector=collector;
        }

        @Override
        public void execute(Tuple input){
            // 接收一个单词
            String word = input.getString(0);
            // 获取该单词对应的计数
            Integer count = counts.get(word);

            // 计数增加
            if(count == null) {
                count = 0;
            } else {
                count++;
            }

            // 将单词和对应的计数加入map中
            counts.put(word, count);
            System.out.println(word + ":" + count);

            // 发送单词和计数(分别对应字段word和count)
            _collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer){
            // 定义一个字段
            declarer.declare(new Fields("word","count"));
        }
    }

    /**
     * 主函数,创建topology
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception{
        //创建一个Topology
        TopologyBuilder builder=new TopologyBuilder();
        // 设置Spout,这个Spout的名字叫做"Spout",设置并行度为5
        builder.setSpout("spout", new WordCountSpout(), 5);
        // 设置分词Bolt,并行度为8,它的数据来源是spout的
        builder.setBolt("split", new WordSplitBolt(), 8).shuffleGrouping("spout");
        // 设置计数Bolt,你并行度为12,它的数据来源是split的word字段
        builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(false);

        conf.setNumWorkers(3);
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }

spout和bolt的executor设置为5,task为10,那实际运行的时候,它是会产生多少个实例?
如果是多例的话,在WordCountBolt中的成员变量counts不是会发生并发问题吗?

  • 写回答

1条回答 默认 最新

  • lshen01 2023-03-15 18:36
    关注

    参考GPT和自己的思路:

    根据给出的代码和设置,spout和bolt的executor数量都是5,而task数量是10,每个executor会运行2个task,因此实际运行时会有10个实例。每个实例都是在独立的worker进程中运行,它们之间是并行的。

    在WordCountBolt中,counts确实会发生并发问题。因为在多个task同时执行execute方法时,它们会同时访问counts变量,可能出现同时对同一个单词做计数的情况。为了解决这个问题,可以考虑将counts变量放在一个支持并发访问的数据结构中,例如ConcurrentHashMap。这样就可以避免并发问题。

    评论

报告相同问题?

悬赏问题

  • ¥15 高德地图点聚合中Marker的位置无法实时更新
  • ¥15 DIFY API Endpoint 问题。
  • ¥20 sub地址DHCP问题
  • ¥15 delta降尺度计算的一些细节,有偿
  • ¥15 Arduino红外遥控代码有问题
  • ¥15 数值计算离散正交多项式
  • ¥30 数值计算均差系数编程
  • ¥15 redis-full-check比较 两个集群的数据出错
  • ¥15 Matlab编程问题
  • ¥15 训练的多模态特征融合模型准确度很低怎么办