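1. The fields are sbn, me, NodeB and cellID, all of type int; time is of type long.
2. There are 100 counters, C1-C100: C1-C50 are aggregated with sum, C51-C80 with avg, C81-C90 with max, and C91-C100 with min.
3. Use sendEvent() to send 10,000 events.
4. Aggregate with time_batch(1 min).
5. Output the results through an updateListener.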
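The aggregation rule in item 2 can be written as a small helper that maps a counter index to its function and builds the select clause for all 100 counters instead of typing it by hand. This is only a sketch in plain Java: the class name CounterEplBuilder and the event type name "CounterEvent" are hypothetical, and the resulting string is assumed to be what gets passed to createEPL.

```java
import java.util.StringJoiner;

public class CounterEplBuilder {

    // Item 2 of the list: C1-C50 sum, C51-C80 avg, C81-C90 max, C91-C100 min.
    public static String aggregateFor(int index) {
        if (index < 1 || index > 100) {
            throw new IllegalArgumentException("counter index out of range: " + index);
        }
        if (index <= 50) return "sum";
        if (index <= 80) return "avg";
        if (index <= 90) return "max";
        return "min";
    }

    // Builds "sum(C1) as C1, ..., min(C100) as C100".
    public static String selectClause() {
        StringJoiner joiner = new StringJoiner(", ");
        for (int i = 1; i <= 100; i++) {
            joiner.add(aggregateFor(i) + "(C" + i + ") as C" + i);
        }
        return joiner.toString();
    }

    // eventTypeName is a placeholder for whatever event type is registered with the engine.
    public static String buildEpl(String eventTypeName) {
        return "select sbn, me, nodeB, cellID, " + selectClause()
                + " from " + eventTypeName + ".win:time_batch(1 min)"
                + " group by sbn, me, nodeB, cellID";
    }

    public static void main(String[] args) {
        System.out.println(buildEpl("CounterEvent"));
    }
}
```

Keeping the index-to-function rule in one method means that a change to the ranges only has to be made in one place.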
Entity class:
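import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// @Data already generates getters and setters, so @Getter/@Setter are not needed.
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CounterInfo {
    private int sbn;
    private int me;
    private int nodeB;
    private int cellId;
    private long time;
}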
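The entity class has no C1 to C100 properties, while main() sends java.util.Map events, so one possible direction is to describe the event as a Map type instead. The sketch below only builds the name-to-type map in plain Java. CounterTypeMap is a hypothetical name, and treating every counter as an Integer is an assumption. In the Esper releases that expose EPServiceProvider, a map like this is normally registered through the configuration's addEventType(name, map), and events are then sent with sendEvent(map, name); please verify this against the version in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CounterTypeMap {

    // Property name -> Java type, mirroring items 1 and 2 of the requirement list.
    public static Map<String, Object> build() {
        Map<String, Object> types = new LinkedHashMap<>();
        types.put("sbn", Integer.class);
        types.put("me", Integer.class);
        types.put("nodeB", Integer.class);
        types.put("cellID", Integer.class);
        types.put("time", Long.class);
        // Assumption: all 100 counters hold int values.
        for (int i = 1; i <= 100; i++) {
            types.put("C" + i, Integer.class);
        }
        return types;
    }

    public static void main(String[] args) {
        Map<String, Object> types = build();
        System.out.println(types.size() + " properties defined");
    }
}
```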
updateListener implementation:
public void update(EventBean[] newEvents, EventBean[] oldEvents) {
    //EsCounter esCounter = new EsCounter();
    if (newEvents != null) {
        // Read the aggregated values from the first row of the batch.
        int sumC1 = (Integer) newEvents[0].get("C1");
        Double avgC2 = (Double) newEvents[0].get("C2");
        int maxC3 = (Integer) newEvents[0].get("C3");
        int minC4 = (Integer) newEvents[0].get("C4");
        System.out.println("C1 Sum is:" + sumC1);
        System.out.println("C2 Avg is:" + avgC2);
        System.out.println("C3 Max is:" + maxC3);
        System.out.println("C4 Min is:" + minC4);
    }
}
Test class with the main method:
public static void main(String[] args) {
    EPServiceProvider provider = EPServiceProviderManager.getDefaultProvider();
    EPAdministrator administrator = provider.getEPAdministrator();
    EPRuntime runtime = provider.getEPRuntime();
    String esCounter = EsCounter.class.getName();
    String counterInfo = CounterInfo.class.getName();
    Map<String, Object> counterMap = null;
    // Aggregate per (sbn, me, nodeB, cellID) over one-minute batches.
    String epl = "select sbn,me,nodeB,cellID,sum(C1) as C1,avg(C2) as C2,max(C3) as C3,min(C4) as C4 from " + counterInfo + ".win:time_batch(1 min) group by sbn,me,nodeB,cellID";
    /*String epl ="select sum(C1) as C1,avg(C2) as C2,max(C3) as C3,min(C4) as C4 "
            + "from "+counterInfo+".win:time_batch(1 min)";*/
    EPStatement statement = administrator.createEPL(epl);
    statement.addListener(new CounterListener());
    // One shared Random instead of a new instance per call.
    Random random = new Random();
    // Send 10,000 events, each carrying one randomly chosen counter C1..C100 with a value 1..6.
    for (int i = 1; i <= 10000; i++) {
        counterMap = new HashMap<String, Object>();
        counterMap.put("C" + (random.nextInt(100) + 1), random.nextInt(6) + 1);
        System.out.println("send event" + i + ":" + counterMap);
        runtime.sendEvent(counterMap);
    }
}
The loop is meant to distribute the 10,000 events across the 100 counters.
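To cross-check whatever the listener prints, the same kind of loop can be run against a plain-Java tally. This sketch is independent of Esper: it generates single-counter events with the same random ranges as the loop in main() and computes sum, avg, max and min per counter with IntSummaryStatistics. The class name CounterTally and the fixed seed are hypothetical choices made only so that runs are repeatable.

```java
import java.util.HashMap;
import java.util.IntSummaryStatistics;
import java.util.Map;
import java.util.Random;

public class CounterTally {

    // Generates `count` events; each carries one random counter C1..C100 with a value 1..6.
    public static Map<String, IntSummaryStatistics> tally(int count, long seed) {
        Random random = new Random(seed);
        Map<String, IntSummaryStatistics> stats = new HashMap<>();
        for (int i = 0; i < count; i++) {
            String counter = "C" + (random.nextInt(100) + 1);
            int value = random.nextInt(6) + 1;
            stats.computeIfAbsent(counter, k -> new IntSummaryStatistics()).accept(value);
        }
        return stats;
    }

    public static void main(String[] args) {
        Map<String, IntSummaryStatistics> stats = tally(10_000, 42L);
        IntSummaryStatistics c1 = stats.get("C1");
        if (c1 != null) {
            // Each statistics object exposes all four aggregates at once.
            System.out.println("C1 sum=" + c1.getSum() + " avg=" + c1.getAverage()
                    + " max=" + c1.getMax() + " min=" + c1.getMin());
        }
    }
}
```

Note that this tally ignores the one-minute batching and the group-by keys, so it only matches the engine output when all events fall into a single batch and a single group.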
If anyone experienced with this could reply, I would really appreciate it. Thank you very much.