hujobs 2019-12-13 14:12 采纳率: 0%
浏览 1258

springboot kafka 多个接收系统订阅多个发送系统,多个消费组多个消费者该如何写代码?

1、kafka结合springboot开发。
2、生产消息的系统有多个,可能会动态的改变消费系统的个数。举例:A、B、C这3个系统生产消息后分别想让不同的系统使用广播模式消费消息。A想要系统1、2、3消费同一个消息,B想要系统2、3、4、5消费同一个消息,C想要系统1、2、5、6消费同一个消息。这样消费组的个数相当于在动态改变着,可能后面新增或减少,实际环境中生产消息的系统有几十个,消费消息的系统也有几十个,这种网上完全找不到代码案例,都是简单的案例或写死的。求指导代码该怎么写!太复杂了完全没思路!

  • 写回答

1条回答 默认 最新

  • danci_btq 2024-02-14 10:07
    关注

    在Spring Boot中使用Kafka时,你可以通过编程方式动态地创建和管理多个消费者组,以适应动态变化的生产者和消费者。每个消费者组可以订阅一个或多个主题,并且可以根据需要动态地添加或删除消费者。

    以下是一个高级的概念示例,它展示了如何在Spring Boot应用程序中设置多个动态消费者组来处理来自不同生产者的消息。
    首先,你需要定义一些配置类来设置Kafka的基础配置,包括连接信息、消费者配置等。

    @Configuration  
    @EnableKafka  
    public class KafkaConfig {  
      
        @Value("${spring.kafka.bootstrap-servers}")  
        private String bootstrapServers;  
      
        @Value("${spring.kafka.consumer.group-id-prefix}")  
        private String groupIdPrefix;  
      
        @Bean  
        public ConsumerFactory<String, String> consumerFactory() {  
            Map<String, Object> props = new HashMap<>();  
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);  
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);  
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);  
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdPrefix);  
            // 其他消费者配置...  
            return new DefaultKafkaConsumerFactory<>(props);  
        }  
      
        @Bean  
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {  
            ConcurrentKafkaListenerContainerFactory<String, String> factory =  
                    new ConcurrentKafkaListenerContainerFactory<>();  
            factory.setConsumerFactory(consumerFactory());  
            return factory;  
        }  
    }
    
    

    然后,你可以创建一个服务来处理消费者组的动态创建和管理。这个服务将使用Kafka的AdminClient来创建、删除消费者组,并使用Spring Kafka的ConcurrentKafkaListenerContainerFactory来创建消费者实例。

    @Service  
    public class KafkaConsumerService {  
      
        @Autowired  
        private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory;  
      
        @Autowired  
        private AdminClient adminClient;  
      
        @Value("${spring.kafka.consumer.group-id-prefix}")  
        private String groupIdPrefix;  
      
        public void createConsumerGroup(String groupName, Set<String> topics) {  
            // 创建消费者组  
            // 注意:消费者组ID必须是唯一的,你可以根据业务逻辑来生成它  
            String groupId = groupIdPrefix + "-" + groupName;  
              
            // 创建消费者组  
            adminClient.createTopics(Collections.singleton(new NewTopic(groupId, 1, (short) 1)));  
              
            // 创建消费者容器  
            ConcurrentMessageListenerContainer<String, String> container =  
                    kafkaListenerContainerFactory.createContainer(groupId, topics);  
              
            // 启动消费者容器  
            container.start();  
        }  
      
        public void removeConsumerGroup(String groupName) {  
            // 删除消费者组  
            String groupId = groupIdPrefix + "-" + groupName;  
              
            // 删除消费者组  
            adminClient.deleteTopics(Collections.singleton(groupId));  
        }  
      
        // 其他逻辑,如动态更新消费者组的订阅主题等...  
    }
    
    

    最后,在你的应用程序中,你可以根据业务逻辑来调用KafkaConsumerService的createConsumerGroup和removeConsumerGroup方法来动态管理消费者组。

    @RestController  
    public class KafkaController {  
      
        @Autowired  
        private KafkaConsumerService kafkaConsumerService;  
      
        @PostMapping("/create-consumer-group")  
        public ResponseEntity<?> createConsumerGroup(@RequestBody CreateConsumerGroupRequest request) {  
            String groupName = request.getGroupName();  
            Set<String> topics = request.getTopics();  
              
            kafkaConsumerService.createConsumerGroup(groupName, topics);  
              
            return ResponseEntity.ok().build();  
        }  
      
        @PostMapping("/remove-consumer-group")  
        public ResponseEntity<?> removeConsumerGroup(@RequestBody RemoveConsumerGroupRequest request) {  
            String groupName = request.getGroupName();  
              
            kafkaConsumerService.removeConsumerGroup(groupName);  
              
            return ResponseEntity.ok().build();  
        }  
      
        // DTOs for request/response  
        public static class CreateConsumerGroupRequest {  
            private String groupName;  
            private Set<String> topics;  
            // getters and setters...  
        }  
      
        public static class RemoveConsumerGroupRequest {  
            private String groupName;  
            // getters and setters...  
        }  
    }
    
    

    请注意,以上代码是一个概念示例,它展示了如何在Spring Boot应用程序中设置动态消费者组。在实际应用中,你需要根据具体的问题具体分析和作相应的变化。

    评论

报告相同问题?

悬赏问题

  • ¥15 【提问】基于Invest的水源涵养
  • ¥20 微信网友居然可以通过vx号找到我绑的手机号
  • ¥15 spring后端vue前端
  • ¥15 寻一个支付宝扫码远程授权登录的软件助手app
  • ¥15 解riccati方程组
  • ¥15 display:none;样式在嵌套结构中的已设置了display样式的元素上不起作用?
  • ¥15 使用rabbitMQ 消息队列作为url源进行多线程爬取时,总有几个url没有处理的问题。
  • ¥15 Ubuntu在安装序列比对软件STAR时出现报错如何解决
  • ¥50 树莓派安卓APK系统签名
  • ¥65 汇编语言除法溢出问题