I wrote two Kafka consumer groups with Spring Boot. Using @KafkaListener I can confirm that the message channel itself is fine, but consumption on the Spring Boot side is quite unstable: there is a high chance that messages are never received. Oddly, when I run the application in debug mode the messages come through far more reliably. My consumer looks roughly like this:
@KafkaListener(id = "IRPAGroup", topicPattern = "IRPA.*", containerFactory = "kafkaListenerFactory")
public void resultConsumer(ConsumerRecord<?, ?> consumerRecord, Acknowledgment acknowledgment) {
    Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
    try {
        if (kafkaMessage.isPresent()) {
            // Parse the message payload as JSON
            String messageValue = kafkaMessage.get().toString();
            JsonNode jsonNode = objectMapper.readTree(messageValue);
            // Extract hashId and result
            String hashId = jsonNode.get("hashId").asText();
            String result = jsonNode.get("result").asText();
            // Log the extracted fields (for debugging)
            log.info("[{}] Extracted fields - hashId: {}, result: {}", getCurrentFormattedTime(), hashId, result);
            // Persist result to the database
            logIndividualRiskMapper.updateOutputByHashId(hashId, result);
            acknowledgment.acknowledge(); // commit the offset
            // Logging (original logic unchanged)
            String topic = consumerRecord.topic();
            int partition = consumerRecord.partition();
            long offset = consumerRecord.offset();
            String logMessage = String.format("\n[%s] Received individual-risk progress message:"
                            + "\n\ttopic: %s"
                            + "\n\tpartition: %d"
                            + "\n\toffset: %d"
                            + "\n\thashId: %s"
                            + "\n\tresult: %s",
                    getCurrentFormattedTime(),
                    topic,
                    partition,
                    offset,
                    hashId,
                    result);
            System.out.println(logMessage);
            log.info(logMessage);
        }
    } catch (IOException e) {
        log.error("[{}] JSON parsing failed: {}", getCurrentFormattedTime(), e.getMessage());
    } catch (Exception e) {
        log.error("[{}] Failed to process message: {}", getCurrentFormattedTime(), e.getMessage(), e);
    }
}
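
For context, the kafkaListenerFactory referenced in the annotation is essentially a standard factory with manual acknowledgment enabled. A minimal sketch of what it looks like (simplified; in my project the ConsumerFactory comes from Spring Boot's auto-configuration):

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // MANUAL_IMMEDIATE so the Acknowledgment parameter in the listener gets injected
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}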
The Kafka-related configuration is as follows:
#kafka
spring.kafka.bootstrap-servers=172.20.138.172:9092
spring.kafka.producer.retries=3
spring.kafka.producer.properties.retry.backoff.ms=1000
spring.kafka.producer.acks=1
spring.kafka.producer.batch-size=16384
spring.kafka.producer.properties.linger.ms=100
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.properties.session.timeout.ms=90000
spring.kafka.consumer.properties.request.timeout.ms=100000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.missing-topics-fatal=false
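
For reference, the messages on these topics are plain JSON strings containing the two fields the consumer extracts; a made-up example (values are illustrative):

{"hashId": "abc123", "result": "success"}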
Could anyone tell me where this problem is likely to come from, and help me fix it? Much appreciated; I'm willing to pay for the help.