小小新手 2023-08-18 11:27 采纳率: 0%
浏览 2

spring boot kafka

spring boot kafka
实现发布订阅 代码如下
问题为 KafkaConsumer 客户端类 的process 方法会实时收到消息处理?

发布生产类
@RestController
@RequestMapping("/kafka/producer")
public class KafkaProducerController {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerController.class);

private static final String TOPIC = "topic-test";

@Resource
private KafkaTemplate kafkaTemplate;

public KafkaProducerController(KafkaTemplate kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
}


@PostMapping("/push")
public ResponseEntity<String> pushMessage(@RequestBody Message message) {
    Date time = new Date();
    message.setSendTime(time);
    kafkaTemplate.send(TOPIC, JSON.toJSONString(message)).addCallback(success
            -> LOGGER.info("{}-生产者发送消息成功:{},时间:{}", TOPIC, success, time), failure
            -> LOGGER.error("{}-生产者发送消息失败:{}", failure.getMessage()));
    return new ResponseEntity<>("success", HttpStatus.OK);
}

}

消费类
订阅类
@Component
public class KafkaConsumer {

@Autowired
private static KafkaTemplate kafkaTemplate;

/**
 * 单个topic订阅
 */
@KafkaListener(topics = "topic-test")
public void onDeviceSubStatusMessageDevice(List<ConsumerRecord<String, ?>> listRecord) {
    process(listRecord);
}

private void process(List<ConsumerRecord<String, ?>> listRecord) {
    System.out.println("订阅者接收到消息");
    listRecord.forEach(record -> {
        System.out.println(record.key()+record.value());

    });
}


//发送数据到kafka
private void sendKafka(String abcd, String efg) {
    Map<String, Object> body = new HashMap<>(8);
    body.put("time", System.currentTimeMillis());
    body.put("abcd", abcd);
    body.put("efg", efg);
    kafkaTemplate.send("test-topic", JSON.toJSONString(body));
}

}

  • 写回答

1条回答 默认 最新

  • CSDN-Ada助手 CSDN-AI 官方账号 2023-08-18 15:14
    关注

    【以下回答由 GPT 生成】

    在KafkaConsumer类中,你可以使用@EnableKafka注解启动Kafka监听器的自动配置。在配置类上添加@EnableKafka注解后,可以通过设置KafkaListenerContainerFactory来配置监听器的一些属性,例如并发数量,消费组ID等。

    首先,你需要在配置类上添加@EnableKafka注解:

    @Configuration
    @EnableKafka
    public class KafkaConfig {
    
        // 配置其他相关的配置
    
    }
    

    然后,你可以创建一个KafkaListenerContainerFactory的bean,并指定一些属性,如下所示:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3); // 设置并发消费者数量
        factory.getContainerProperties().setPollTimeout(3000); // 设置等待时间
        return factory;
    }
    

    最后,在KafkaConsumer类中的方法上添加@KafkaListener注解,指定要订阅的topic即可:

    @Component
    public class KafkaConsumer {
    
        @KafkaListener(topics = "topic-test")
        public void onMessage(String message) {
            process(message);
        }
    
        private void process(String message) {
            System.out.println("收到消息: " + message);
        }
    }
    

    通过以上步骤,你就可以实现实时收到消息的功能了。



    【相关推荐】



    如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^
    评论

报告相同问题?

问题事件

  • 创建了问题 8月18日

悬赏问题

  • ¥15 GDI处理通道视频时总是带有白色锯齿
  • ¥20 用雷电模拟器安装百达屋apk一直闪退
  • ¥15 算能科技20240506咨询(拒绝大模型回答)
  • ¥15 自适应 AR 模型 参数估计Matlab程序
  • ¥100 角动量包络面如何用MATLAB绘制
  • ¥15 merge函数占用内存过大
  • ¥15 Revit2020下载问题
  • ¥15 使用EMD去噪处理RML2016数据集时候的原理
  • ¥15 神经网络预测均方误差很小 但是图像上看着差别太大
  • ¥15 单片机无法进入HAL_TIM_PWM_PulseFinishedCallback回调函数