1、kafka结合springboot开发。
2、生产消息的系统有多个,可能会动态的改变消费系统的个数。举例:A、B、C这3个系统生产消息后分别想让不同的系统使用广播模式消费消息。A想要系统1、2、3消费同一个消息,B想要系统2、3、4、5消费同一个消息,C想要系统1、2、5、6消费同一个消息。这样消费组的个数相当于在动态改变着,可能后面新增或减少,实际环境中生产消息的系统有几十个,消费消息的系统也有几十个,这种网上完全找不到代码案例,都是简单的案例或写死的。求指导代码该怎么写!太复杂了完全没思路!
springboot kafka 多个接收系统订阅多个发送系统,多个消费组多个消费者该如何写代码?
- 写回答
- 好问题 0 提建议
- 追加酬金
- 关注问题
- 邀请回答
-
1条回答 默认 最新
- danci_btq 2024-02-14 10:07关注
在Spring Boot中使用Kafka时,你可以通过编程方式动态地创建和管理多个消费者组,以适应动态变化的生产者和消费者。每个消费者组可以订阅一个或多个主题,并且可以根据需要动态地添加或删除消费者。
以下是一个高级的概念示例,它展示了如何在Spring Boot应用程序中设置多个动态消费者组来处理来自不同生产者的消息。
首先,你需要定义一些配置类来设置Kafka的基础配置,包括连接信息、消费者配置等。@Configuration @EnableKafka public class KafkaConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id-prefix}") private String groupIdPrefix; @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdPrefix); // 其他消费者配置... return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
然后,你可以创建一个服务来处理消费者组的动态创建和管理。这个服务将使用Kafka的AdminClient来创建、删除消费者组,并使用Spring Kafka的ConcurrentKafkaListenerContainerFactory来创建消费者实例。
@Service public class KafkaConsumerService { @Autowired private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory; @Autowired private AdminClient adminClient; @Value("${spring.kafka.consumer.group-id-prefix}") private String groupIdPrefix; public void createConsumerGroup(String groupName, Set<String> topics) { // 创建消费者组 // 注意:消费者组ID必须是唯一的,你可以根据业务逻辑来生成它 String groupId = groupIdPrefix + "-" + groupName; // 创建消费者组 adminClient.createTopics(Collections.singleton(new NewTopic(groupId, 1, (short) 1))); // 创建消费者容器 ConcurrentMessageListenerContainer<String, String> container = kafkaListenerContainerFactory.createContainer(groupId, topics); // 启动消费者容器 container.start(); } public void removeConsumerGroup(String groupName) { // 删除消费者组 String groupId = groupIdPrefix + "-" + groupName; // 删除消费者组 adminClient.deleteTopics(Collections.singleton(groupId)); } // 其他逻辑,如动态更新消费者组的订阅主题等... }
最后,在你的应用程序中,你可以根据业务逻辑来调用KafkaConsumerService的createConsumerGroup和removeConsumerGroup方法来动态管理消费者组。
@RestController public class KafkaController { @Autowired private KafkaConsumerService kafkaConsumerService; @PostMapping("/create-consumer-group") public ResponseEntity<?> createConsumerGroup(@RequestBody CreateConsumerGroupRequest request) { String groupName = request.getGroupName(); Set<String> topics = request.getTopics(); kafkaConsumerService.createConsumerGroup(groupName, topics); return ResponseEntity.ok().build(); } @PostMapping("/remove-consumer-group") public ResponseEntity<?> removeConsumerGroup(@RequestBody RemoveConsumerGroupRequest request) { String groupName = request.getGroupName(); kafkaConsumerService.removeConsumerGroup(groupName); return ResponseEntity.ok().build(); } // DTOs for request/response public static class CreateConsumerGroupRequest { private String groupName; private Set<String> topics; // getters and setters... } public static class RemoveConsumerGroupRequest { private String groupName; // getters and setters... } }
请注意,以上代码是一个概念示例,它展示了如何在Spring Boot应用程序中设置动态消费者组。在实际应用中,你需要根据具体的问题具体分析和作相应的变化。
解决 无用评论 打赏 举报
悬赏问题
- ¥15 【提问】基于Invest的水源涵养
- ¥20 微信网友居然可以通过vx号找到我绑的手机号
- ¥15 spring后端vue前端
- ¥15 寻一个支付宝扫码远程授权登录的软件助手app
- ¥15 解riccati方程组
- ¥15 display:none;样式在嵌套结构中的已设置了display样式的元素上不起作用?
- ¥15 使用rabbitMQ 消息队列作为url源进行多线程爬取时,总有几个url没有处理的问题。
- ¥15 Ubuntu在安装序列比对软件STAR时出现报错如何解决
- ¥50 树莓派安卓APK系统签名
- ¥65 汇编语言除法溢出问题