1、kafka结合springboot开发。
2、生产消息的系统有多个,可能会动态的改变消费系统的个数。举例:A、B、C这3个系统生产消息后分别想让不同的系统使用广播模式消费消息。A想要系统1、2、3消费同一个消息,B想要系统2、3、4、5消费同一个消息,C想要系统1、2、5、6消费同一个消息。这样消费组的个数相当于在动态改变着,可能后面新增或减少,实际环境中生产消息的系统有几十个,消费消息的系统也有几十个,这种网上完全找不到代码案例,都是简单的案例或写死的。求指导代码该怎么写!太复杂了完全没思路!
springboot kafka 多个接收系统订阅多个发送系统,多个消费组多个消费者该如何写代码?
- 写回答
- 好问题 0 提建议
- 追加酬金
- 关注问题
- 邀请回答
-
1条回答 默认 最新
关注 在Spring Boot中使用Kafka时,你可以通过编程方式动态地创建和管理多个消费者组,以适应动态变化的生产者和消费者。每个消费者组可以订阅一个或多个主题,并且可以根据需要动态地添加或删除消费者。
以下是一个高级的概念示例,它展示了如何在Spring Boot应用程序中设置多个动态消费者组来处理来自不同生产者的消息。
首先,你需要定义一些配置类来设置Kafka的基础配置,包括连接信息、消费者配置等。@Configuration @EnableKafka public class KafkaConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id-prefix}") private String groupIdPrefix; @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdPrefix); // 其他消费者配置... return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
然后,你可以创建一个服务来处理消费者组的动态创建和管理。这个服务将使用Kafka的AdminClient来创建、删除消费者组,并使用Spring Kafka的ConcurrentKafkaListenerContainerFactory来创建消费者实例。
@Service public class KafkaConsumerService { @Autowired private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory; @Autowired private AdminClient adminClient; @Value("${spring.kafka.consumer.group-id-prefix}") private String groupIdPrefix; public void createConsumerGroup(String groupName, Set<String> topics) { // 创建消费者组 // 注意:消费者组ID必须是唯一的,你可以根据业务逻辑来生成它 String groupId = groupIdPrefix + "-" + groupName; // 创建消费者组 adminClient.createTopics(Collections.singleton(new NewTopic(groupId, 1, (short) 1))); // 创建消费者容器 ConcurrentMessageListenerContainer<String, String> container = kafkaListenerContainerFactory.createContainer(groupId, topics); // 启动消费者容器 container.start(); } public void removeConsumerGroup(String groupName) { // 删除消费者组 String groupId = groupIdPrefix + "-" + groupName; // 删除消费者组 adminClient.deleteTopics(Collections.singleton(groupId)); } // 其他逻辑,如动态更新消费者组的订阅主题等... }
最后,在你的应用程序中,你可以根据业务逻辑来调用KafkaConsumerService的createConsumerGroup和removeConsumerGroup方法来动态管理消费者组。
@RestController public class KafkaController { @Autowired private KafkaConsumerService kafkaConsumerService; @PostMapping("/create-consumer-group") public ResponseEntity<?> createConsumerGroup(@RequestBody CreateConsumerGroupRequest request) { String groupName = request.getGroupName(); Set<String> topics = request.getTopics(); kafkaConsumerService.createConsumerGroup(groupName, topics); return ResponseEntity.ok().build(); } @PostMapping("/remove-consumer-group") public ResponseEntity<?> removeConsumerGroup(@RequestBody RemoveConsumerGroupRequest request) { String groupName = request.getGroupName(); kafkaConsumerService.removeConsumerGroup(groupName); return ResponseEntity.ok().build(); } // DTOs for request/response public static class CreateConsumerGroupRequest { private String groupName; private Set<String> topics; // getters and setters... } public static class RemoveConsumerGroupRequest { private String groupName; // getters and setters... } }
请注意,以上代码是一个概念示例,它展示了如何在Spring Boot应用程序中设置动态消费者组。在实际应用中,你需要根据具体的问题具体分析和作相应的变化。
解决 无用评论 打赏 举报
悬赏问题
- ¥15 如何在scanpy上做差异基因和通路富集?
- ¥20 关于#硬件工程#的问题,请各位专家解答!
- ¥15 关于#matlab#的问题:期望的系统闭环传递函数为G(s)=wn^2/s^2+2¢wn+wn^2阻尼系数¢=0.707,使系统具有较小的超调量
- ¥15 FLUENT如何实现在堆积颗粒的上表面加载高斯热源
- ¥30 截图中的mathematics程序转换成matlab
- ¥15 动力学代码报错,维度不匹配
- ¥15 Power query添加列问题
- ¥50 Kubernetes&Fission&Eleasticsearch
- ¥15 報錯:Person is not mapped,如何解決?
- ¥15 c++头文件不能识别CDialog