ucyuujin
2018-11-30 03:07
浏览 337

storm的executor、task、spout、bolt的关系

  public static class WordCountBolt extends BaseRichBolt{
        OutputCollector _collector;
        Map<String, Integer> counts = new HashMap<>();

        @Override
        public void prepare(Map conf,TopologyContext context,OutputCollector collector){
            _collector=collector;
        }

        @Override
        public void execute(Tuple input){
            // 接收一个单词
            String word = input.getString(0);
            // 获取该单词对应的计数
            Integer count = counts.get(word);

            // 计数增加
            if(count == null) {
                count = 0;
            } else {
                count++;
            }

            // 将单词和对应的计数加入map中
            counts.put(word, count);
            System.out.println(word + ":" + count);

            // 发送单词和计数(分别对应字段word和count)
            _collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer){
            // 定义一个字段
            declarer.declare(new Fields("word","count"));
        }
    }

    /**
     * 主函数,创建topology
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception{
        //创建一个Topology
        TopologyBuilder builder=new TopologyBuilder();
        // 设置Spout,这个Spout的名字叫做"Spout",设置并行度为5
        builder.setSpout("spout", new WordCountSpout(), 5);
        // 设置分词Bolt,并行度为8,它的数据来源是spout的
        builder.setBolt("split", new WordSplitBolt(), 8).shuffleGrouping("spout");
        // 设置计数Bolt,你并行度为12,它的数据来源是split的word字段
        builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(false);

        conf.setNumWorkers(3);
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }

spout和bolt的executor设置为5,task为10,那实际运行的时候,它是会产生多少个实例?
如果是多例的话,在WordCountBolt中的成员变量counts不是会发生并发问题吗?

相关推荐 更多相似问题