weixin_46095148 2024-01-26 18:13 采纳率: 91.2%
浏览 0
已结题

广播消息,报错,请求修改调试成功后粘贴正确代码


package com.example.rocketmq.comh;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;

import java.util.List;

//广播消息
public class BroadcastConsumer {
    public static void main(String[] args) throws MQClientException {
        //创建消息组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Synccomsumer");
        //连接
        consumer.setNamesrvAddr("192.168.88.130:9876");
        //拿到Simple,再过滤
        consumer.subscribe("Simple","*");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (int i=1;i<list.size();i++){
                    System.out.println("消息消费成功_"+new String(list.get(i).getBody()));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("消费者启动成功");
    }
}
package com.example.rocketmq.comh;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;

import java.util.List;

//广播消息
public class BroadcastConsumer2 {
    public static void main(String[] args) throws MQClientException {
        //创建消息组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Synccomsumer");
        //连接
        consumer.setNamesrvAddr("192.168.88.130:9876");
        //拿到Simple,再过滤
        consumer.subscribe("Simple","*");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (int i=1;i<list.size();i++){
                    System.out.println("消息消费成功_"+new String(list.get(i).getBody()));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("消费者启动成功");
    }
}
package com.example.rocketmq.comh;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

//广播消息
public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        //创建生产组
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducwe");
        //连接
        producer.setNamesrvAddr("192.168.88.130:9876");
        //开始
        producer.start();
        for (int i=0;i<2;i++){
            //把消息放入Simple中
            Message message = new Message("Simple","Tags","SyncProducwe".getBytes(StandardCharsets.UTF_8));
            SendResult sendResult = producer.send(message);
            System.out.println("消息发送成功"+sendResult);
        }

        producer.shutdown();
    }
}

  • 写回答

3条回答 默认 最新

查看更多回答(2条)

报告相同问题?

问题事件

  • 系统已结题 2月6日
  • 已采纳回答 1月29日
  • 创建了问题 1月26日

悬赏问题

  • ¥15 35114 SVAC视频验签的问题
  • ¥15 impedancepy
  • ¥15 在虚拟机环境下完成以下,要求截图!
  • ¥15 求往届大挑得奖作品(ppt…)
  • ¥15 如何在vue.config.js中读取到public文件夹下window.APP_CONFIG.API_BASE_URL的值
  • ¥50 浦育平台scratch图形化编程
  • ¥20 求这个的原理图 只要原理图
  • ¥15 vue2项目中,如何配置环境,可以在打完包之后修改请求的服务器地址
  • ¥20 微信的店铺小程序如何修改背景图
  • ¥15 UE5.1局部变量对蓝图不可见