Darren i 2021-05-20 00:26 采纳率: 0%
浏览 102
已结题

关于RocketMQ顺序消费的问题

问题如下图所示,RocketMQ小白,希望能得到解答

 

  • 写回答

1条回答 默认 最新

  • 小P聊技术 2021-05-20 10:29
    关注

    关键就是Broker中还有多个MessageQueue,同一个MessageQueue中的消息才能顺序消费。

    官网中,发送消息时,使用了一个MessageQueueSelector

    发送端:

    SendResult sendResult = defaultMQProducer.send(message, new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            return mqs.get(0);
        }
    },i);

    消费端: 

    @Bean(name = "pushConsumerOrderly", initMethod = "start",destroyMethod = "shutdown")
    public DefaultMQPushConsumer pushConsumerOrderly() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pushConsumerOrderly");
        consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
        consumer.subscribe("broker-a-topic","*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            Random random = new Random();
            try {
                Thread.sleep(random.nextInt(5) * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for (MessageExt msg : msgs) {
                System.out.println(new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        return consumer;
    }
    评论

报告相同问题?