Right now a server calls my API; I then manually start a Kafka listener and return the consumed data through that API. To protect server performance, I only want to consume about 10 messages per call, but even after setting the max-poll-records property the listener still receives every message on the topic.
// CopyOnWriteArrayList instead of ArrayList: the listener thread writes
// while the API thread reads, so the list must be thread-safe.
private final List<String> dataList = new CopyOnWriteArrayList<>();

@Autowired
private KafkaListenerEndpointRegistry registry;

@Autowired
private ConsumerFactory<String, String> consumerFactory;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> delayContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> container =
            new ConcurrentKafkaListenerContainerFactory<>();
    container.setConsumerFactory(consumerFactory);
    // Do not start the listener container automatically
    container.setAutoStartup(false);
    return container;
}

@KafkaListener(topics = "topic.message1", containerFactory = "delayContainerFactory", id = "KAFKA_ID")
public void durableListener(String data) {
    // Persist the received data here
    log.info("topic.message receive : " + data);
    JSONObject obj = new JSONObject(data);
    String subject = (String) obj.get("subject");
    log.info(subject);
    dataList.add(subject);
}

public void startListener() {
    log.info("Starting listener");
    // Start the listener container if it is not already running
    if (!registry.getListenerContainer("KAFKA_ID").isRunning()) {
        registry.getListenerContainer("KAFKA_ID").start();
    }
    registry.getListenerContainer("KAFKA_ID").resume();
}

public void shutDownListener() {
    log.info("Pausing listener");
    registry.getListenerContainer("KAFKA_ID").pause();
}

public List<String> receivedData() {
    return dataList;
}
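One thing worth noting: the listener container calls poll() in a loop, so even a small per-poll cap does not bound total consumption; the container just polls again. A way to cap it is to pause the container once enough records are buffered, by calling registry.getListenerContainer("KAFKA_ID").pause() from inside the listener. The gating logic can be isolated in a small helper; this is a minimal sketch, and the class name and cap value are my own invention, not part of the original code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Collects records up to a fixed cap and reports when the
// listener container should be paused.
public class BoundedCollector {
    private final int cap;
    private final List<String> buffer = new ArrayList<>();

    public BoundedCollector(int cap) {
        this.cap = cap;
    }

    // Adds a record; returns true once the cap is reached, signalling
    // that the container should be paused until the buffer is drained.
    public synchronized boolean add(String record) {
        buffer.add(record);
        return buffer.size() >= cap;
    }

    // Hands the buffered records to the caller and clears the buffer,
    // after which the container can be resumed.
    public synchronized List<String> drain() {
        List<String> out = new ArrayList<>(buffer);
        buffer.clear();
        return Collections.unmodifiableList(out);
    }
}
```

In durableListener, a true return from add(subject) would trigger pause(); receivedData() would call drain() and then resume(). Note that pause() takes effect only after the in-flight poll completes, so a few extra records beyond the cap may still be delivered.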
server:
  port: 8080
spring:
  kafka:
    bootstrap-servers: 132.232.8.25:32091
    consumer:
      enable-auto-commit: true
      auto-commit-interval: 100ms
      auto-offset-reset: latest
      group-id: uap-message
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 1
I set the maximum to one record, but when messages come in I still receive several of them. What is the reason?
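For what it's worth, max-poll-records only caps how many records a single Consumer.poll() call returns; the listener container polls in a loop, so the whole backlog is still delivered, just one record per poll with the setting above. If the goal is to see the per-poll batch explicitly, a batch listener makes the cap visible; a hedged sketch (the value 10 is illustrative, and the listener method signature would change to accept a List<String>):

```yaml
spring:
  kafka:
    listener:
      type: batch            # listener receives the whole poll() batch as a List
    consumer:
      max-poll-records: 10   # at most 10 records per poll(), NOT per listener session
```

This does not by itself stop consumption after 10 records; stopping still requires pausing or stopping the container once the batch is handled.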