qq_34596620 2025-01-28 17:21 采纳率: 0%
浏览 18

Spring-Kafka在同时使用ErrorHandler和手动提交的情况下,当需要把消息发送到死信队列时,offset该怎么提交?

在spring-kafka中,我们可以使用ErrorHandler来处理错误。通常是重试规定次数后,如果任然报错那么发送到死信队列。现在我希望手动提交offset,手动提交的基本逻辑应该是:如果不报错或者消息被发送至死信队列时要提交,否则还要重试的话就不提交。
可是在@KafkaListener标注的方法中,我没有办法知道这次报错的话是重试还是发送到死信队列,所以我不明白如果要把消息发送到死信队列的话,该怎么手动提交这个消息。就算是在ConsumerRecordRecoverer中定义的恢复方法中,也没有办法进行手动提交呀

  • 写回答

2条回答 默认 最新

  • 专家-赤兔[在线] 优质创作者: 编程框架技术领域 2025-01-28 17:21
    关注

    以下回复引自免费微信小程序:皆我百晓生。 如问题解决,还请采纳。如仍有疑问,请回复留言或私信博主,以便获得更精准的帮助。

    在SPRING-KAFKA中,我们可以通过添加自定义的HANDLER来实现手动提交。以下是一个简单的例子:

    IMPORT ORG.APACHE.KAFKA.CLIENTS.CONSUMER.CONSUMERRECORD;
    IMPORT ORG.SPRINGFRAMEWORK.STEREOTYPE.COMPONENT;
    
    @COMPONENT
    PUBLIC CLASS MYCONSUMER IMPLEMENTS CONSUMERRECORDS<STRING, STRING> {
    
        @OVERRIDE
        PUBLIC VOID CONSUME(CONSUMERRECORDS<STRING, STRING> CONSUMERRECORDS) {
            FOR (CONSUMERRECORD<STRING, STRING> RECORD : CONSUMERRECORDS) {
                IF (!RECORD.VALUE().CONTAINS("ERROR")) { // 假设ERROR为FALSE
                    SYSTEM.OUT.PRINTLN(RECORD.KEY());
                    SYSTEM.OUT.PRINTLN(RECORD.VALUE());
                    // 你可以在这里执行任何你需要的操作,比如保存、日志等
                    BREAK; // 如果没有报错,则跳出循环并返回
                }
            }
        }
    
        @OVERRIDE
        PUBLIC VOID HANDLEEXCEPTION(EXCEPTION E) {
            // 这里可以添加你的异常处理逻辑
        }
    }
    

    在这个例子中,每当消费者记录中的VALUE不包含"ERROR"时,都会打印出对应的KEY和VALUE。这将触发我们的手动提交逻辑。

    如果你想要实现自动提交,你可能需要考虑以下几个步骤:

    1. 创建一个定时任务或CRON表达式来定期执行这个方法。
    2. 在这个定时任务或CRON表达式的回调函数中,检查是否已经成功地提交了OFFSET(即调用HANDLEEXCEPTION方法)。
    3. 如果已经成功提交了OFFSET,那么就可以继续从消费器中读取数据,无需手动提交。
    4. 如果还没有成功提交OFFSET,那么你应该在CONSUMERRECORDS中添加一个新的CONSUMERRECORD,并且在HANDLEEXCEPTION方法中进行相应的操作,以确保OFFSET被正确提交。

    以上就是解决这个问题的一般思路。具体实现细节会根据你的需求有所不同。

    评论

报告相同问题?

问题事件

  • 创建了问题 1月28日