m0_71078915 2023-07-13 14:24 采纳率: 0%
浏览 19
已结题

rabbitmq消息生成id为空

rabbitmq获取id序号时:
message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"); 消息序号id取值为null

 //发送消息
        rabbitTemplate.convertAndSend()
...
自己Debug..
然后走到RabbitTemplate类中发现
long nextPublishSeqNo = channel.getNextPublishSeqNo();  值为0
但是不知道为啥,
求解答..
  • 写回答

1条回答 默认 最新

  • CSDN-Ada助手 CSDN-AI 官方账号 2023-07-13 17:32
    关注
    • 这有个类似的问题, 你可以参考下: https://ask.csdn.net/questions/7504757
    • 这篇博客也不错, 你可以看下RabbitMQ学习系列:一、RabbitMQ 的安装
    • 除此之外, 这篇博客: rabbitMq顺序消费方案中的 发送的时,如何获取组内的上一个消息id 部分也许能够解决你的问题, 你可以仔细阅读以下内容或跳转源博客中阅读:
    • 在《rabbitMq事务消息方案》中增加了发送任务表rabbit_producer_record,如果发送成功后,不清理发送记录,是可以用该表来实现“获取组内的上一个消息id”的目的。但是出于数据量的考虑,发送成功后,还是删除更好。
      额外再增加一张表producer_sequence_record来保存顺序消息记录:

      CREATE TABLE `producer_sequence_record` (
        `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '消息id',
        `message_id` VARCHAR(50) NOT NULL COMMENT '消息id',
        `application` VARCHAR(50) NOT NULL COMMENT '应用名称',
        `group_name` VARCHAR(50) NOT NULL COMMENT '消息分组,同分组内,消息序号按发送顺序递增',
        `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
        `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
        PRIMARY KEY (`id`),
        UNIQUE KEY `idx_message_app` (`message_id`, `application`),
        KEY `idx_app_group` (`application`, `group_name`),
        KEY `idx_create_time` (`create_time`)
      ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COMMENT = '顺序mq的记录表';
      

      当查询“组内的上一个消息id”时,可以用这样的语句select max(id) where application=? and group_name=? and id < ?
      发送消息的时候,会判断消息类型,如果消息类型是顺序消息,就会执行上述sql,得到id,再根据id查producer_sequence_record的整行记录,得到message_id。
      具体代码可以看com.zidongxiangxi.reliablemq.producer.service.DefaultRabbitProducerServiceImpl

      public class DefaultRabbitProducerServiceImpl implements RabbitProducerService {
      	......
      	
      	@Async
          @Override
          public void send(RabbitProducer producer) {
              try {
                  Message message;
                  if (Objects.nonNull(sequenceRecordManager)
                      && Objects.equals(producer.getType(), MessageTypeEnum.SEQUENCE.getValue())) {
                      // 从sequenceRecordManager获取上个消息id
                      String previousMessageId = sequenceRecordManager.getPreviousMessageId(producer.getMessageId(),
                          producer.getApplication());
                      message = RabbitUtils.generateMessage(producer, previousMessageId);
                  } else {
                      message = RabbitUtils.generateMessage(producer);
                  }
                  CorrelationData correlationData = RabbitUtils.generateCorrelationData(producer);
                  rabbitTemplate.send(producer.getExchange(), producer.getRoutingKey(), message, correlationData);
              } catch (Throwable throwable) {
                  log.error("send message failed producer={}", producer.toString(), throwable);
                  if (Objects.nonNull(alarm)) {
                      alarm.failWhenProduce(JSON.toJSONString(producer), throwable);
                  }
              }
          }
      }
      

      传了previousMessageId 参数,RabbitUtils.generateMessage方法就会在构造Message对象的时候,往消息头设置previousMessageId。

    • 您还可以看一下 Array老师老师的RabbitMQ3.8.x单机+集群搭建教程课程中的 RabbitMQ的三种模式:单机,默认,镜像小节, 巩固相关知识点
    评论

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 11月23日
  • 创建了问题 7月13日