DataStream<ItemViewCount> windowAggStream = dataStream
        .filter(data -> "pv".equals(data.getBehavior())) // keep only "pv" (page view) events
        .keyBy("itemId") // group by item ID
        .timeWindow(Time.hours(1), Time.minutes(5)) // sliding window: 1 hour long, sliding every 5 minutes
        .aggregate(new ItemCountAgg(), new WindowItemCountResult());
// Incremental aggregation: counts the events seen for each key and window.
public static class ItemCountAgg implements AggregateFunction<UserBehavior, Long, Long> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(UserBehavior value, Long accumulator) {
        return accumulator + 1;
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b;
    }
}
// Wraps the pre-aggregated count together with the item ID (the key) and the window end time.
public static class WindowItemCountResult implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
        Long itemId = tuple.getField(0);
        Long windowEnd = window.getEnd();
        Long count = input.iterator().next();
        out.collect(new ItemViewCount(itemId, windowEnd, count));
    }
}
The code above is an excerpt. Building it fails with the following error:
Error:(62, 17) java: no suitable method found for aggregate(com.zqs.flink.project.hotitemanalysis.HotItems.ItemCountAgg,com.zqs.flink.project.hotitemanalysis.HotItems.WindowItemCountResult)
method org.apache.flink.streaming.api.datastream.WindowedStream.<ACC,R>aggregate(org.apache.flink.api.common.functions.AggregateFunction<com.zqs.flink.project.hotitemanalysis.beans.UserBehavior,ACC,R>) is not applicable
(cannot infer type-variable(s) ACC,R
(actual and formal argument lists differ in length))
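The excerpt does not include its import statements, so the cause cannot be confirmed from what is posted. One possibility worth checking (an assumption, not a diagnosis): `keyBy("itemId")` produces a stream keyed by `org.apache.flink.api.java.tuple.Tuple`, and the two-argument `aggregate` only accepts a `WindowFunction` whose key and window type parameters match the stream exactly. If `Tuple` or `WindowFunction` was imported from a different package with the same simple name, no overload fits. The sketch below reproduces that mechanism in plain Java; every type in it (`FlinkTuple`, `OtherTuple`, `GoodResult`, `BadResult`, and the stand-in interfaces) is hypothetical and only mirrors the generic shape of the Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class AggregateOverloadSketch {

    // Hypothetical stand-ins: they mirror the generic shape of the Flink types, nothing more.
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
    }

    interface WindowFunction<IN, OUT, KEY, W> {
        void apply(KEY key, W window, Iterable<IN> input, List<OUT> out);
    }

    static class FlinkTuple {}  // plays the role of org.apache.flink.api.java.tuple.Tuple
    static class OtherTuple {}  // plays the role of a same-named Tuple from another package
    static class TimeWindow {}

    // A window whose key type K was fixed upstream, as keyBy does for a keyed stream.
    static class WindowedStream<T, K, W> {
        private final K key;
        private final W window;
        private final List<T> elements;

        WindowedStream(K key, W window, List<T> elements) {
            this.key = key;
            this.window = window;
            this.elements = elements;
        }

        // The window function's KEY and W must be exactly this stream's K and W.
        <ACC, V, R> List<R> aggregate(AggregateFunction<T, ACC, V> agg,
                                      WindowFunction<V, R, K, W> fn) {
            ACC acc = agg.createAccumulator();
            for (T element : elements) {
                acc = agg.add(element, acc);
            }
            List<R> out = new ArrayList<>();
            fn.apply(key, window, Collections.singletonList(agg.getResult(acc)), out);
            return out;
        }
    }

    static class CountAgg implements AggregateFunction<String, Long, Long> {
        public Long createAccumulator() { return 0L; }
        public Long add(String value, Long accumulator) { return accumulator + 1; }
        public Long getResult(Long accumulator) { return accumulator; }
    }

    // Key type matches the stream: accepted by aggregate.
    static class GoodResult implements WindowFunction<Long, Long, FlinkTuple, TimeWindow> {
        public void apply(FlinkTuple key, TimeWindow window, Iterable<Long> input, List<Long> out) {
            out.add(input.iterator().next());
        }
    }

    // Compiles on its own, but its key type differs from the stream's.
    static class BadResult implements WindowFunction<Long, Long, OtherTuple, TimeWindow> {
        public void apply(OtherTuple key, TimeWindow window, Iterable<Long> input, List<Long> out) {
            out.add(input.iterator().next());
        }
    }

    static long countViews() {
        WindowedStream<String, FlinkTuple, TimeWindow> stream = new WindowedStream<>(
                new FlinkTuple(), new TimeWindow(), Arrays.asList("pv", "pv", "pv"));
        // stream.aggregate(new CountAgg(), new BadResult()); // rejected by javac: KEY is OtherTuple
        return stream.aggregate(new CountAgg(), new GoodResult()).get(0);
    }

    public static void main(String[] args) {
        System.out.println(countViews());
    }
}
```

If this is what is happening here, the first thing to check would be that `Tuple` resolves to `org.apache.flink.api.java.tuple.Tuple` and `WindowFunction` to `org.apache.flink.streaming.api.functions.windowing.WindowFunction` in the file that defines `WindowItemCountResult`.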