风起了,嗯 2024-03-08 15:17 采纳率: 0%
浏览 24

RocketMQ使用广播模式,每个结点消费完毕后,消息堆积

RocketMQ使用广播模式,每个结点消费完毕后,rocketmq-console控制台依旧显示消息没被消费掉(消息堆积);
更换为集群模式后,rocektmq-console正常,没有消息堆积。
有什么解决方案吗?

  • 写回答

3条回答 默认 最新

  • 专家-赤兔[在线] 全栈领域优质创作者 2024-03-08 15:18
    关注

    引自免费微信小程序:皆我百晓生

    在RocketMQ中,广播模式(Broadcasting)的特点是消息会被集群中的每一个消费者实例接收到并进行消费。对于您提到的问题,在广播模式下,即使每个节点消费完毕后,rocketmq-console控制台仍然显示消息堆积,这可能是因为 RocketMQ 的设计机制导致的——在广播模式下,由于消息被所有消费者接收且只消费一次,但控制台统计的是消息投递次数而非消费完成次数,所以从控制台的角度看,除了最后一个成功消费的消费者外,其他消费者的“未确认”状态可能会造成消息堆积的假象。

    解决这个问题的关键在于理解广播模式的特性,并非所有的消息都需要被确认消费。如果您希望看到控制台上消息无堆积的情况,请确保:

    1. 消费逻辑处理完毕:检查每个消费者消费消息后的业务处理是否及时且正确完成。
    2. 了解监控指标含义:明白在广播模式下,控制台的消息堆积可能只是表明消息已被广播到多个消费者,而不是真的未被消费。

    然而,如果您的需求确实需要每个消息被成功消费后不再显示堆积,那么集群模式(Clustering)可能更适合您,因为集群模式下的消息会在消费组内按照负载均衡策略分配给消费者,并且只有当消息被所有消费者都成功消费后,才会从队列中移除,从而避免了控制台显示消息堆积的现象。

    至于代码层面的调整,主要是在创建消费者时指定消费模式为集群模式:

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
    consumer.setConsumeMode(MessageModel.CLUSTERING); // 设置为集群模式
    consumer.subscribe("TopicName", "*"); // 订阅主题
    
    // 添加消息监听器
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            // 实现消息消费逻辑
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    
    consumer.start();
    

    总结一下,对于广播模式下消息堆积问题的解决方案,重点在于理解和适应其工作原理,同时根据实际业务需求选择合适的消费模式。如需确保消息不堆积展示,采用集群模式并保证消费逻辑的正常执行即可。

    评论 编辑记录

报告相同问题?

问题事件

  • 创建了问题 3月8日

悬赏问题

  • ¥15 如何在maya程序中利用python编写领子和褶裥的模型的方法
  • ¥15 Linux权限管理相关操作(求解答)
  • ¥15 Bug traq 数据包 大概什么价
  • ¥15 在anaconda上pytorch和paddle paddle下载报错
  • ¥25 自动填写QQ腾讯文档收集表
  • ¥15 DbVisualizer Pro 12.0.7 sql commander光标错位 显示位置与实际不符
  • ¥15 求一份STM32F10X的I2S外设库
  • ¥15 android 打包报错
  • ¥15 关于stm32的问题
  • ¥15 ncode振动疲劳分析中,noisefloor如何影响PSD函数?