2301_76592816 2025-03-29 12:13 采纳率: 0%
浏览 7

kafka问题请求各位劳们!

我用spring boot写了两个kafka消费组,采用kafka Listner可以发现消息通道是没有问题的,但是我这边springboot消费消息比较不稳定,接收不到的概率比较大,另外感觉采用debug模式运行了话接收到的概率会大很多,我的consumer大概类似如下:

    @KafkaListener(id = "IRPAGroup", topicPattern = "IRPA.*",containerFactory = "kafkaListenerFactory")
    public void resultConsumer(ConsumerRecord<?, ?> consumerRecord, Acknowledgment acknowledgment) {
        Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());

        try {
            if (kafkaMessage.isPresent()) {
                // 解析消息内容为 JSON
                String messageValue = kafkaMessage.get().toString();
                JsonNode jsonNode = objectMapper.readTree(messageValue);

                // 提取 hashId 和 result
                String hashId = jsonNode.get("hashId").asText();
                String result = jsonNode.get("result").asText();

                // 打印提取的字段(调试用)
                log.info("[{}] 提取字段 - hashId: {}, result: {}", getCurrentFormattedTime(), hashId, result);

                // 将 result 存入数据库
                logIndividualRiskMapper.updateOutputByHashId(hashId, result);
                acknowledgment.acknowledge(); // 提交偏移量
                // 打印日志(原逻辑保持不变)
                String topic = consumerRecord.topic();
                int partition = consumerRecord.partition();
                long offset = consumerRecord.offset();
                String logMessage = String.format("\n[%s] 收到个人风险进度消息:"
                                + "\n\t主题: %s"
                                + "\n\t分区: %d"
                                + "\n\t偏移量: %d"
                                + "\n\thashId: %s"
                                + "\n\tresult: %s",
                        getCurrentFormattedTime(),
                        topic,
                        partition,
                        offset,
                        hashId,
                        result);
                System.out.println(logMessage);
                log.info(logMessage);
            }
        } catch (IOException e) {
            log.error("[{}] JSON 解析失败: {}", getCurrentFormattedTime(), e.getMessage());
        } catch (Exception e) {
            log.error("[{}] 处理消息失败: {}", getCurrentFormattedTime(), e.getMessage(), e);
        }
    }

kafka相关的配置如下:

#kafka
spring.kafka.bootstrap-servers=172.20.138.172:9092
spring.kafka.producer.retries=3  
spring.kafka.producer.retry-backoff-ms=1000  
spring.kafka.producer.acks=1
spring.kafka.producer.batch-size=16384
spring.kafka.producer.properties.linger.ms=100
spring.kafka.producer.buffer-memory = 33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.properties.session.timeout.ms=90000
spring.kafka.consumer.properties.request.timeout.ms=100000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.missing-topics-fatal=false

想请问各位劳这个问题可能会出现在哪里?能不能帮忙解决一下,不胜感激,可有偿

  • 写回答

3条回答 默认 最新

  • 阿里嘎多学长 2025-03-29 12:13
    关注

    阿里嘎多学长整理AIGC生成,因移动端显示问题导致当前答案未能完全显示,请使用PC端查看更加详细的解答过程

    Kafka消费组不稳定问题

    你使用 Spring Boot 写了两个 Kafka 消费组,使用 Kafka Listener 发现消息通道没有问题,但是 Spring Boot 消费消息比较不稳定,接收不到的概率比较大。

    可能的原因:

    1. 消费组配置不正确:检查你的消费组配置,确保每个消费组的 offset 配置正确。
    2. 消息重复消费:检查你的消费组是否重复消费消息,导致消息丢失。
    3. 网络连接问题:检查你的网络连接是否稳定,是否有网络延迟或 packet loss。
    4. Kafka Broker 问题:检查你的 Kafka Broker 是否稳定,是否有 BrokerDown 或 NetworkPartition 问题。

    解决方案:

    1. 检查消费组配置:使用 kafka-consumer-groups 命令检查消费组配置,确保每个消费组的 offset 配置正确。
    2. 使用 idempotent 消费:使用 idempotent 消费机制,避免重复消费消息。
    3. 使用 retry 机制:使用 retry 机制,重试失败的消息消费。
    4. 监控 Kafka Broker:监控 Kafka Broker 的状态,确保 BrokerDown 或 NetworkPartition 问题不影响消费。

    代码示例:

    @Configuration
    public class KafkaConfig {
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
            return new DefaultKafkaConsumerFactory<>(props);
        }
    }
    
    @Service
    public class KafkaConsumer {
        @Autowired
        private ConsumerFactory<String, String> consumerFactory;
    
        public void consume() {
            Consumer<String, String> consumer = consumerFactory.createConsumer();
            consumer.subscribe(Arrays.asList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // 处理消息
                        System.out.println(record.value());
                    } catch (Exception e) {
                        // 处理错误
                        System.out.println("Error: " + e.getMessage());
                    }
                }
                consumer.commitSync();
            }
        }
    }
    

    注意:上面的代码只是一个示例,需要根据你的实际情况进行修改。

    评论

报告相同问题?

问题事件

  • 创建了问题 3月29日