jupiterwangq 2020-10-09 15:46 采纳率: 0%
浏览 81

Kafka Streams的WindowStore可以保存多久?

我用类似下面的代码做开窗聚合,统计每分钟事件出现次数。

t.selectKey { _, rawEvent -> "${rawEvent.appid}${rawEvent.eid}" }
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
                .aggregate(
                        { CountWindow() },
                        { _, v, agg ->
                            // 窗口期的事件总数+1
                            agg.total += 1
                            agg
                        },
                        Materialized.`as`<String, CountWindow, WindowStore<Bytes, ByteArray>>("test_window_retentation1")
                                .withValueSerde(GenericGsonSerde<CountWindow> { gson, bytes ->
                                    gson.fromJson(String(bytes))
                                })
                )

然后通过ReadOnlyWindowStore#fetch()查询给定时间段内事件出现的次数,一开始的时候结果是正常的,但是一段时间后当我重启应用,fetch就获取不到窗口数据了,看上去像是历史窗口被丢弃了。

请问这种开窗操作的窗口是不是有个保留时间呢,如果有的话是否可以设置?

  • 写回答

1条回答 默认 最新

  • CSDN-Ada助手 CSDN-AI 官方账号 2022-09-21 13:59
    关注
    不知道你这个问题是否已经解决, 如果还没有解决的话:

    如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^
    评论

报告相同问题?

悬赏问题

  • ¥20 BAPI_PR_CHANGE how to add account assignment information for service line
  • ¥500 火焰左右视图、视差(基于双目相机)
  • ¥100 set_link_state
  • ¥15 虚幻5 UE美术毛发渲染
  • ¥15 CVRP 图论 物流运输优化
  • ¥15 Tableau online 嵌入ppt失败
  • ¥100 支付宝网页转账系统不识别账号
  • ¥15 基于单片机的靶位控制系统
  • ¥15 真我手机蓝牙传输进度消息被关闭了,怎么打开?(关键词-消息通知)
  • ¥15 装 pytorch 的时候出了好多问题,遇到这种情况怎么处理?