m0_37818588 2021-09-02 10:41 采纳率: 0%
浏览 337

Spring kafka设置最大拉取数量

现在是服务器调用我的接口,然后我手动打开kafka监听,返回数据给接口,但是考虑到服务器 的性能,一次只希望消费比如10条左右,但是我设置了max-poll-records这个参数以后,还是会监听到所有的消息


List<String> dataList=new ArrayList<>();

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private ConsumerFactory consumerFactory;

    @Bean
    public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);
        //禁止自动启动
        container.setAutoStartup(false);
        return container;
    }

    @KafkaListener( topics = "topic.message1",containerFactory = "delayContainerFactory",id="KAFKA_ID")
    public void durableListener(String data) {
        //这里做数据持久化的操作
        log.info("topic.message receive : " + data);
        JSONObject obj = new JSONObject(data);
        String subject = (String) (obj.get("subject"));
        log.info(subject);

        dataList.add(subject);
    }


    public void startListener() {
        log.info("开启监听");
        //判断监听容器是否启动,未启动则将其启动
        if (!registry.getListenerContainer("KAFKA_ID").isRunning()) {
            registry.getListenerContainer("KAFKA_ID").start();
        }
        registry.getListenerContainer("KAFKA_ID").resume();
    }

    public void shutDownListener() {
        log.info("关闭监听");
        registry.getListenerContainer("KAFKA_ID").pause();
    }

    public List<String> receivedData(){
        return dataList;
    }

server:
    port: 8080
Spring:
    kafka:
        bootstrap-servers: 132.232.8.25:32091
        consumer:
            enable-auto-commit: true
            auto-commit-interval: 100ms
            auto-offset-reset: latest
            group-id: uap-message
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            max-poll-records: 1


我设置的最大是一条,但是消息传过来我还是能收到好几条,是什么原因

  • 写回答

1条回答 默认 最新

  • 凵忆 2021-09-02 11:13
    关注

    一次调用poll()操作时返回的最大记录数,默认值为500
    把max-poll-records值改为2 试试

    评论

报告相同问题?

问题事件

  • 创建了问题 9月2日

悬赏问题

  • ¥15 delta降尺度计算的一些细节,有偿
  • ¥15 Arduino红外遥控代码有问题
  • ¥15 数值计算离散正交多项式
  • ¥30 数值计算均差系数编程
  • ¥15 redis-full-check比较 两个集群的数据出错
  • ¥15 Matlab编程问题
  • ¥15 训练的多模态特征融合模型准确度很低怎么办
  • ¥15 kylin启动报错log4j类冲突
  • ¥15 超声波模块测距控制点灯,灯的闪烁很不稳定,经过调试发现测的距离偏大
  • ¥15 import arcpy出现importing _arcgisscripting 找不到相关程序