public static class WordCountBolt extends BaseRichBolt{
OutputCollector _collector;
Map<String, Integer> counts = new HashMap<>();
@Override
public void prepare(Map conf,TopologyContext context,OutputCollector collector){
_collector=collector;
}
@Override
public void execute(Tuple input){
// 接收一个单词
String word = input.getString(0);
// 获取该单词对应的计数
Integer count = counts.get(word);
// 计数增加
if(count == null) {
count = 0;
} else {
count++;
}
// 将单词和对应的计数加入map中
counts.put(word, count);
System.out.println(word + ":" + count);
// 发送单词和计数(分别对应字段word和count)
_collector.emit(new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer){
// 定义一个字段
declarer.declare(new Fields("word","count"));
}
}
/**
* 主函数,创建topology
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception{
//创建一个Topology
TopologyBuilder builder=new TopologyBuilder();
// 设置Spout,这个Spout的名字叫做"Spout",设置并行度为5
builder.setSpout("spout", new WordCountSpout(), 5);
// 设置分词Bolt,并行度为8,它的数据来源是spout的
builder.setBolt("split", new WordSplitBolt(), 8).shuffleGrouping("spout");
// 设置计数Bolt,你并行度为12,它的数据来源是split的word字段
builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word"));
Config conf = new Config();
conf.setDebug(false);
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
}
spout和bolt的executor设置为5,task为10,那实际运行的时候,它是会产生多少个实例?
如果是多例的话,在WordCountBolt中的成员变量counts不是会发生并发问题吗?