动物园里的码农 2022-09-07 15:07 采纳率: 80%
浏览 13

kafka注解过时处理

spring cloud stream整合kafka对应的注解过时怎么解决?

  • 写回答

2条回答 默认 最新

  • 念舒_C.ying 2022-09-07 15:36
    关注

    1.pom文件导入依赖

    <!-- kafka -->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    

    2.application.yml文件配置

    spring: 
     cloud:
      stream:
       kafka:
        binder:
         brokers: xxx.xxx.xxx.xx:xxxx // Kafka的消息中间件服务器地址
       bindings:
        xxx_output: // 通道名称 
         destination: xxx // 消息发往的目的地,对应topic 在发送消息的配置里面,group是不用配置的 
         // 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json)
        xxx_input:
         destination: xxx // 消息发往的目的地,对应topic
         group: xxx // 对应kafka的group
    

    3.创建消息发送者

    @EnableBinding(Source.class) // @EnableBinding 是绑定通道的,Soure.class是spring 提供的,表示这是一个可绑定的发布通道
    @Service
    public class MqService {
    
      @Resource(name = KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
      private MessageChannel oesWorkbenchChannel;
    
      /**
       * 发送一条kafka消息
       */
      public boolean sendLifeData(Object object) {
        return MqUtils.send(oesWorkbenchChannel, object, KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT);
      }
    }
    
    // 发布通道
    public interface Source {
      @Output(KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
      MessageChannel oesWorkbenchLifeDataOutput(); // 发布通道用MessageChannel 
    }
    

    4.创建消息监听者

    @Slf4j
    @EnableBinding(Sink.class)
    public class WorkbenchStreamListener {
    
      @Resource
      private FileService fileService;
    
      @StreamListener(KafkaConstants.xxx_input) // 监听接受通道
      public void receiveData(MoveMessage moveMessage) {
      }
    }
    
    // 接受通道
    public interface Sink {
      @Input(KafkaConstants.OES_WORKBENCH_MOVE_INPUT)
      SubscribableChannel oesWorkbenchMoveInput(); // 接受通道用SubscribableChannel 
    }
    

    接下来就可以愉快的发送监听消息了

    评论

报告相同问题?

问题事件

  • 创建了问题 9月7日