在使用kafka时,遇到了业务需求。需要在消费一条消息后,暂停消费。等回调接口触发时,恢复消费。
我使用的是Spring-kafka注解@KafkaListener。
在网上搜索一大堆后,得到了一种方式:
这种方式暂停或停止监听消费
是等消费完所有消息才会执行。并不会立刻暂停或停止。
我想要的是立即暂停消费行为。如何实现?
在使用kafka时,遇到了业务需求。需要在消费一条消息后,暂停消费。等回调接口触发时,恢复消费。
我使用的是Spring-kafka注解@KafkaListener。
在网上搜索一大堆后,得到了一种方式:
我找到方法了。
private int start = 0;
指定一个变量。
在监听过程中,如果状态为0则执行提交操作
if (start == 0){
//手动提交消费
ack.acknowledge();
start = 1;
}
在消费完一条数据后,继续消费但不提交。
关闭监听,现在消息队列中其实只消费了一条。
System.out.println("关闭监听器..." + DateUtil.date());
registry.getListenerContainer("add_1").stop();
在一个事件触发的地方,加上监听的启动
// 启动kafka监听
System.out.println("启动监听器..." + DateUtil.date());
// "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
if (!registry.getListenerContainer("add_1").isRunning()) {
registry.getListenerContainer("add_1").start();
}
当条件达到以后,重新启动监听器,将从头开始。重复上面的操作。
最终达成,消费一条数据后关闭,条件达成后继续消费的效果。