在spring-kafka中,我们可以使用ErrorHandler来处理错误。通常是重试规定次数后,如果任然报错那么发送到死信队列。现在我希望手动提交offset,手动提交的基本逻辑应该是:如果不报错或者消息被发送至死信队列时要提交,否则还要重试的话就不提交。
可是在@KafkaListener标注的方法中,我没有办法知道这次报错的话是重试还是发送到死信队列,所以我不明白如果要把消息发送到死信队列的话,该怎么手动提交这个消息。就算是在ConsumerRecordRecoverer中定义的恢复方法中,也没有办法进行手动提交呀
Spring-Kafka在同时使用ErrorHandler和手动提交的情况下,当需要把消息发送到死信队列时,offset该怎么提交?
- 写回答
- 好问题 0 提建议
- 关注问题
- 邀请回答
-
2条回答 默认 最新
关注以下回复引自免费微信小程序:皆我百晓生。 如问题解决,还请采纳。如仍有疑问,请回复留言或私信博主,以便获得更精准的帮助。
在SPRING-KAFKA中,我们可以通过添加自定义的HANDLER来实现手动提交。以下是一个简单的例子:
IMPORT ORG.APACHE.KAFKA.CLIENTS.CONSUMER.CONSUMERRECORD; IMPORT ORG.SPRINGFRAMEWORK.STEREOTYPE.COMPONENT; @COMPONENT PUBLIC CLASS MYCONSUMER IMPLEMENTS CONSUMERRECORDS<STRING, STRING> { @OVERRIDE PUBLIC VOID CONSUME(CONSUMERRECORDS<STRING, STRING> CONSUMERRECORDS) { FOR (CONSUMERRECORD<STRING, STRING> RECORD : CONSUMERRECORDS) { IF (!RECORD.VALUE().CONTAINS("ERROR")) { // 假设ERROR为FALSE SYSTEM.OUT.PRINTLN(RECORD.KEY()); SYSTEM.OUT.PRINTLN(RECORD.VALUE()); // 你可以在这里执行任何你需要的操作,比如保存、日志等 BREAK; // 如果没有报错,则跳出循环并返回 } } } @OVERRIDE PUBLIC VOID HANDLEEXCEPTION(EXCEPTION E) { // 这里可以添加你的异常处理逻辑 } }在这个例子中,每当消费者记录中的VALUE不包含"ERROR"时,都会打印出对应的KEY和VALUE。这将触发我们的手动提交逻辑。
如果你想要实现自动提交,你可能需要考虑以下几个步骤:
- 创建一个定时任务或CRON表达式来定期执行这个方法。
- 在这个定时任务或CRON表达式的回调函数中,检查是否已经成功地提交了OFFSET(即调用HANDLEEXCEPTION方法)。
- 如果已经成功提交了OFFSET,那么就可以继续从消费器中读取数据,无需手动提交。
- 如果还没有成功提交OFFSET,那么你应该在CONSUMERRECORDS中添加一个新的CONSUMERRECORD,并且在HANDLEEXCEPTION方法中进行相应的操作,以确保OFFSET被正确提交。
以上就是解决这个问题的一般思路。具体实现细节会根据你的需求有所不同。
解决 无用评论 打赏 举报