哪位大佬帮我看看这个程序可以吗?
运用 Storm 流式计算对年龄信息进行分区统计。要求:针对海量年龄数据,分别统计 0~20 岁、21~60 岁、61~100 岁三个年龄段的人数,并显示每个人的年龄。
/**
 * Spout that emits one random age per second on the default stream.
 *
 * <p>Every tuple carries a single field named {@code "age"} whose value is an
 * integer in the inclusive range 0..100.
 */
public class AgeSpout extends BaseRichSpout {
    /** Exclusive bound for {@link Random#nextInt(int)}; 101 so that age 100 can be produced. */
    private static final int AGE_BOUND_EXCLUSIVE = 101;

    /** Pause between two emitted tuples, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 1000L;

    /** Collector received in {@link #open}; used to emit tuples. */
    private SpoutOutputCollector collector;

    /** Random source, created once in {@link #open} rather than once per tuple. */
    private Random random;

    /**
     * Stores the collector and creates the random source.
     *
     * @param conf      topology configuration (unused)
     * @param arg1      topology context (unused)
     * @param collector collector used by {@link #nextTuple()} to emit ages
     */
    @Override
    public void open(Map conf, TopologyContext arg1, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    /**
     * Declares the single output field {@code "age"} on the default stream.
     *
     * @param declarer declarer supplied by the framework
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("age"));
    }

    /**
     * Emits one random age in 0..100 and then sleeps for one second.
     */
    @Override
    public void nextTuple() {
        int age = random.nextInt(AGE_BOUND_EXCLUSIVE);
        collector.emit(new Values(age));
        try {
            Thread.sleep(EMIT_INTERVAL_MS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the code running this thread can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Bolt that keeps a running count of ages in the 0~20 bracket and prints each
 * matching age together with the current total.
 *
 * <p>Ages outside the bracket are ignored. The bolt emits nothing downstream.
 */
public class Bolt20 extends BaseRichBolt {
    /** Collector received in {@link #prepare}; used to ack processed tuples. */
    OutputCollector collector;

    /**
     * Number of ages within 0..20 seen so far. Kept as a field so the total
     * accumulates across calls to {@link #execute} instead of restarting at zero.
     */
    private int count;

    /**
     * Stores the collector supplied by the framework.
     *
     * @param arg0      topology configuration (unused)
     * @param arg1      topology context (unused)
     * @param collector collector used to ack tuples
     */
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Reads the {@code "age"} field; when it lies in 0..20, increments the total
     * and prints the total and the age on one line. The tuple is acked either way.
     *
     * @param input tuple carrying an integer {@code "age"} field
     */
    @Override
    public void execute(Tuple input) {
        int age = input.getIntegerByField("age");
        if (age >= 0 && age <= 20) {
            count++;
            System.out.println("total:" + count + "0~20age" + age);
        }
        // BaseRichBolt does not ack automatically, so acknowledge every tuple here.
        collector.ack(input);
    }

    /**
     * Declares no output fields because this bolt emits nothing.
     *
     * @param arg0 declarer supplied by the framework (unused)
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // No output.
    }
}
/**
 * Bolt that keeps a running count of ages in the 21~60 bracket and prints each
 * matching age together with the current total.
 *
 * <p>Ages outside the bracket are ignored. The bolt emits nothing downstream.
 */
public class Bolt60 extends BaseRichBolt {
    /** Collector received in {@link #prepare}; used to ack processed tuples. */
    OutputCollector collector;

    /**
     * Number of ages within 21..60 seen so far. Kept as a field so the total
     * accumulates across calls to {@link #execute} instead of restarting at zero.
     */
    private int count;

    /**
     * Stores the collector supplied by the framework.
     *
     * @param arg0      topology configuration (unused)
     * @param arg1      topology context (unused)
     * @param collector collector used to ack tuples
     */
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Reads the {@code "age"} field; when it lies in 21..60, increments the total
     * and prints the total and the age on one line. The tuple is acked either way.
     *
     * @param input tuple carrying an integer {@code "age"} field
     */
    @Override
    public void execute(Tuple input) {
        int age = input.getIntegerByField("age");
        if (age >= 21 && age <= 60) {
            count++;
            System.out.println("total:" + count + "21~60age" + age);
        }
        // BaseRichBolt does not ack automatically, so acknowledge every tuple here.
        collector.ack(input);
    }

    /**
     * Declares no output fields because this bolt emits nothing.
     *
     * @param arg0 declarer supplied by the framework (unused)
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // No output.
    }
}
/**
 * Bolt that keeps a running count of ages in the 61~100 bracket and prints each
 * matching age together with the current total.
 *
 * <p>Ages outside the bracket are ignored. The bolt emits nothing downstream.
 */
public class Bolt100 extends BaseRichBolt {
    /** Collector received in {@link #prepare}; used to ack processed tuples. */
    OutputCollector collector;

    /**
     * Number of ages within 61..100 seen so far. Kept as a field so the total
     * accumulates across calls to {@link #execute} instead of restarting at zero.
     */
    private int count;

    /**
     * Stores the collector supplied by the framework.
     *
     * @param arg0      topology configuration (unused)
     * @param arg1      topology context (unused)
     * @param collector collector used to ack tuples
     */
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Reads the {@code "age"} field; when it lies in 61..100, increments the total
     * and prints the total and the age on one line. The tuple is acked either way.
     *
     * @param input tuple carrying an integer {@code "age"} field
     */
    @Override
    public void execute(Tuple input) {
        int age = input.getIntegerByField("age");
        if (age >= 61 && age <= 100) {
            count++;
            System.out.println("total:" + count + "61~100age" + age);
        }
        // BaseRichBolt does not ack automatically, so acknowledge every tuple here.
        collector.ack(input);
    }

    /**
     * Declares no output fields because this bolt emits nothing.
     *
     * @param arg0 declarer supplied by the framework (unused)
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // No output.
    }
}
public class AgeTopology {
public static void main(String[] args) {
Config conf = new Config();
AgeSpout aspout = new AgeSpout();
Bolt20 bolt1 = new Bolt20();
Bolt60 bolt2 = new Bolt60();
Bolt100 bolt3 = new Bolt100();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("aspout", aspout);
//builder.setBolt("stbolt", stbolt).globalGrouping("aspout");
builder.setBolt("bolt1", bolt1).globalGrouping("aspout","020");60");
builder.setBolt("bolt2", bolt2).globalGrouping("aspout","21
builder.setBolt("bolt3", bolt3).globalGrouping("aspout","61~100");
LocalCluster cluster = new LocalCluster();
StormTopology topology = builder.createTopology();
cluster.submitTopology("agetopology", conf, topology);
}
}