package com.example.rocketmq.comk;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
//过滤信息
public class Comsumer {
public static void main(String[] args) throws MQClientException {
//创建消息组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Synccomsumer");
//连接
consumer.setNamesrvAddr("192.168.88.130:9876");
//拿到Simple,再过滤
consumer.subscribe("Simple","TagA");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (int i=1;i<list.size();i++){
System.out.println("消息消费成功_"+new String(list.get(i).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消费者启动成功");
}
}
package com.example.rocketmq.comk;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
//过滤信息
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//创建生产组
DefaultMQProducer producer = new DefaultMQProducer("Producer");
//连接
producer.setNamesrvAddr("192.168.88.130:9876");
//开始
producer.start();
String[] tags= new String[]{"TagA","TagB","TagC"};
for (int i=0;i<10;i++){
Message message = new Message("Sinple", "tags[i%tags.length]", (i + "Producer").getBytes(StandardCharsets.UTF_8));
SendResult sendResult = producer.send(message);
System.out.println(tags[i%tags.length]+"发送信息成功,"+sendResult);
}
producer.shutdown();
}
}
过滤信息,报错,请求修改调试成功后粘贴出正确答案
- 写回答
- 好问题 0 提建议
- 追加酬金
- 关注问题
- 邀请回答
-
15条回答 默认 最新
- CSDN专家-sinJack 2024-01-29 17:58关注
发送消息时,容易超时,增加超时时间设置。
SendResult sendResult = producer.send(messages); 改为: SendResult sendResult = producer.send(messages,30*1000);
本回答被题主选为最佳回答 , 对您是否有帮助呢?解决 无用评论 打赏 举报
悬赏问题
- ¥15 35114 SVAC视频验签的问题
- ¥15 impedancepy
- ¥15 在虚拟机环境下完成以下,要求截图!
- ¥15 求往届大挑得奖作品(ppt…)
- ¥15 如何在vue.config.js中读取到public文件夹下window.APP_CONFIG.API_BASE_URL的值
- ¥50 浦育平台scratch图形化编程
- ¥20 求这个的原理图 只要原理图
- ¥15 vue2项目中,如何配置环境,可以在打完包之后修改请求的服务器地址
- ¥20 微信的店铺小程序如何修改背景图
- ¥15 UE5.1局部变量对蓝图不可见