月夜归醉 2020-08-05 16:29 采纳率: 0%
浏览 816

rocketmq 广播模式消费,消费者消费消息后,rocketmq-console依旧显示消息堆积

我的rocketmq配置是双主双从,异步模式的。我发现一个问题,就是我使用广播模式消费消息,消息有被消费者消费,但是rocketmq-console控制台依旧显示消息没被消费掉(消息堆积)。

我的生产者代码如下:

public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("x.x.x.x:9876;x.x.x.x:9876");
        //3.启动producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("springboot-mq", "Tag1", ("Hello World" + i).getBytes());
            //5.发送消息
            SendResult result = producer.send(msg);
            //发送状态
            SendStatus status = result.getSendStatus();

            System.out.println("发送结果:" + result);

            //线程睡1秒
            TimeUnit.SECONDS.sleep(1);
        }

        //6.关闭生产者producer
       // producer.shutdown();
    }

消费者代码如下:

public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("x.x.x.x:9876;x.x.x.x:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("springboot-mq", "*");

        //设定消费模式:负载均衡|广播模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            //接受消息内容cc
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
    }

生产者把消息发送到rocketmq,消费者都可以消费到消息。但是消费者消费完消息后,rocketmq-console控制台依旧没有显示消息被消费掉(消息堆积)。如果把消费者采用集群模式消费,消息被消费后,rocektmq-console正常,没有消息堆积。

下面是我的rocketmq-console相关截图:
图片说明

对应的源码:https://gitee.com/brozer/rocketmq-demo

  • 写回答

2条回答 默认 最新

  • 男人至死是少年丶 2021-05-21 16:06
    关注

    请问这个问题有解决方案吗?我目前也遇到了和您一样的问题

    评论

报告相同问题?

悬赏问题

  • ¥15 基于PLC的三轴机械手程序
  • ¥15 多址通信方式的抗噪声性能和系统容量对比
  • ¥15 winform的chart曲线生成时有凸起
  • ¥15 msix packaging tool打包问题
  • ¥15 finalshell节点的搭建代码和那个端口代码教程
  • ¥15 Centos / PETSc / PETGEM
  • ¥15 centos7.9 IPv6端口telnet和端口监控问题
  • ¥20 完全没有学习过GAN,看了CSDN的一篇文章,里面有代码但是完全不知道如何操作
  • ¥15 使用ue5插件narrative时如何切换关卡也保存叙事任务记录
  • ¥20 海浪数据 南海地区海况数据,波浪数据