yassee 2022-03-17 15:26 采纳率: 0%
浏览 11

请教windowAll时间窗口问题

我有一个任务,每个5分钟定时存储数据库。最后一个时间窗体不够5分钟,不会触发窗体结束事件,用什么方法存库?

结果是只存了前面够的,最后末尾的没有存库。

 

  • 写回答

1条回答 默认 最新

  • m0_54204465 2023-01-16 10:22
    关注

    可以使用windowAll算子,它能够等待时间窗口结束或者数据数量累计到一定程度之后触发窗口结束事件。如果你希望在时间窗口结束之后执行存储操作,可以将其设置为以时间窗口结束触发的。
    你可以使用如下代码来实现:

    stream.windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
        .process(new ProcessAllWindowFunction<Tuple2<String, Long>, List<Tuple2<String, Long>>, TimeWindow>() {
            @Override
            public void process(Context context, Iterable<Tuple2<String, Long>> iterable, Collector<List<Tuple2<String, Long>>> collector) throws Exception {
                // 存储数据库操作
            }
        });
    
    

    当时间窗口结束时,上面的process函数就会被调用,这个时候你就可以对窗口内的数据进行存储操作。

    另外,如果你希望在窗口结束之后能获取到最后一个时间窗口数据,可以使用allowedLateness()方法设置延迟时间。

    stream.windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
        .allowedLateness(Time.seconds(30)) // 设置延迟时间
        .process(new ProcessAllWindowFunction<Tuple2<String, Long>, List<Tuple2<String, Long>>, TimeWindow>() {
            @Override
            public void process(Context context, Iterable<Tuple2<String, Long>> iterable, Collector<List<Tuple2<String, Long>>> collector) throws Exception {……
    
    
    评论

报告相同问题?

问题事件

  • 创建了问题 3月17日

悬赏问题

  • ¥15 为什么安装Anaconda时报系统找不到指定文件?
  • ¥15 如何将这个项目的ssh-TCP,改成ssh3-UDP协议
  • ¥20 ic卡dump文件校检码解密
  • ¥15 关于:接收到的数据不是有效的JSON格式
  • ¥15 apdl语言如何增加受力分析
  • ¥15 算法对比:学校优化算法与蚁群算法对比
  • ¥15 机电一体化系统设计说明书
  • ¥20 QChart画大量横向叠加柱状图时,颜色变为白色
  • ¥20 sgy数据提取地震波速,有人能回答吗小馋
  • ¥20 c#实现打开word的功能,并且需要安装成windows服务,word打不开怎么办