我有一个任务,每个5分钟定时存储数据库。最后一个时间窗体不够5分钟,不会触发窗体结束事件,用什么方法存库?
结果是只存了前面够的,最后末尾的没有存库。
我有一个任务,每个5分钟定时存储数据库。最后一个时间窗体不够5分钟,不会触发窗体结束事件,用什么方法存库?
结果是只存了前面够的,最后末尾的没有存库。
可以使用windowAll算子,它能够等待时间窗口结束或者数据数量累计到一定程度之后触发窗口结束事件。如果你希望在时间窗口结束之后执行存储操作,可以将其设置为以时间窗口结束触发的。
你可以使用如下代码来实现:
stream.windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new ProcessAllWindowFunction<Tuple2<String, Long>, List<Tuple2<String, Long>>, TimeWindow>() {
@Override
public void process(Context context, Iterable<Tuple2<String, Long>> iterable, Collector<List<Tuple2<String, Long>>> collector) throws Exception {
// 存储数据库操作
}
});
当时间窗口结束时,上面的process函数就会被调用,这个时候你就可以对窗口内的数据进行存储操作。
另外,如果你希望在窗口结束之后能获取到最后一个时间窗口数据,可以使用allowedLateness()方法设置延迟时间。
stream.windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.seconds(30)) // 设置延迟时间
.process(new ProcessAllWindowFunction<Tuple2<String, Long>, List<Tuple2<String, Long>>, TimeWindow>() {
@Override
public void process(Context context, Iterable<Tuple2<String, Long>> iterable, Collector<List<Tuple2<String, Long>>> collector) throws Exception {……