锦瑟年华人无恙 2021-01-22 23:36 采纳率: 0%
浏览 107

服务器集群消费了kafka的同一条数据

问题来源:目前项目需求下:作为kafka消息consumer方,如果因为某些原因造成服务不可用,期间产生的历史数据,服务器重启之后希望从最新的offset开始消废。比如服务宕机前消废到了offset值是100,服务宕机一段时间,产生了100条数据,当服务重启之后希望从offset 201开始消废,从101到200之间的消息丢弃。

思路:目前有两种解决方案

1.每次重启生成新的groupId消废改topic,可以达到此目的

2.通过消息监听器,获取每个分区上最后的offset,设置从该offset开始消废

 

经过考虑采用第二种解决方案,通过配置ConcurrentMessageListenerContainer类,获取指定topic下的分区,得到当前的offset,然后设置消费者从该位置消废。此时发布测试产生了问题

 

测试环境有4台服务器a  b  c  d ,主题为test-topic , 并且都在同一个group中,groupid为test-group。当来了一条消息,abcd四台服务器都进行了消废。理论上来说应该只会有一台服务器消废该消息。目前该问题尚未解决

希望有大佬能帮我答疑,有长

  • 写回答

1条回答 默认 最新

  • CSDN-Ada助手 CSDN-AI 官方账号 2022-09-07 18:05
    关注
    不知道你这个问题是否已经解决, 如果还没有解决的话:

    如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 以帮助更多的人 ^-^
    评论

报告相同问题?

悬赏问题

  • ¥50 易语言把MYSQL数据库中的数据添加至组合框
  • ¥20 求数据集和代码#有偿答复
  • ¥15 关于下拉菜单选项关联的问题
  • ¥20 java-OJ-健康体检
  • ¥15 rs485的上拉下拉,不会对a-b<-200mv有影响吗,就是接受时,对判断逻辑0有影响吗
  • ¥15 使用phpstudy在云服务器上搭建个人网站
  • ¥15 应该如何判断含间隙的曲柄摇杆机构,轴与轴承是否发生了碰撞?
  • ¥15 vue3+express部署到nginx
  • ¥20 搭建pt1000三线制高精度测温电路
  • ¥15 使用Jdk8自带的算法,和Jdk11自带的加密结果会一样吗,不一样的话有什么解决方案,Jdk不能升级的情况