package com.example.rocketmq.comf;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
//拉模式之指定获取一个队列信息
public class AsyncProducer3 {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//创建生产组
DefaultMQProducer producer = new DefaultMQProducer("SyncProducwe");
//连接
producer.setNamesrvAddr("192.168.88.130:9876");
//开始
producer.start();
for (int i=0;i<100;i++){
final int index=i;
Message message = new Message("Simple","TagA",(i+"HH").getBytes(StandardCharsets.UTF_8));
producer.send(message, new SendCallback() {
@Override//发送成功调用该方法
public void onSuccess(SendResult sendResult) {
System.out.println(index+"消息发送成功_"+sendResult);
}
@Override//发送失败调用该方法
public void onException(Throwable throwable) {
System.out.println(index+"消息发送失败_"+throwable.getStackTrace());
}
});
}
producer.shutdown();
}
}
package com.example.rocketmq.comf;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
//拉模式之指定获取一个队列信息
public class LitePullConsumerAssign {
public static void main(String[] args) throws MQClientException {
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("LitePullConsumer");
consumer.setNamesrvAddr("192.168.88.130:9876");
//启动消费者
consumer.start();
//从这个topic拿到信息
Collection<MessageQueue> messageQueues = consumer.fetchMessageQueues("Simple");
ArrayList<MessageQueue> messageQueues1 = new ArrayList<>(messageQueues);
consumer.assign(messageQueues);
consumer.seek(messageQueues1.get(0),10);
System.out.println("ok");
while (true){
List<MessageExt> messageExtList = consumer.poll();
System.out.println("信息拉取成功");
messageExtList.forEach(n->{
System.out.println("信息消费成功_"+n);
});
}
}
}
拉模式之指定获取一个队列信息,报错,请求修改调试成功后再粘贴出正确答案
- 写回答
- 好问题 0 提建议
- 追加酬金
- 关注问题
- 邀请回答
-
2条回答 默认 最新
- CSDN专家-sinJack 2024-01-26 18:00关注
1、producer.shutdown();注释掉
2、增加第三个参数,超时时间设置public class AsyncProducer3 { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { //创建生产组 DefaultMQProducer producer = new DefaultMQProducer("SyncProducwe"); //连接 producer.setNamesrvAddr("192.168.88.130:9876"); //开始 producer.start(); for (int i=0;i<100;i++){ final int index=i; Message message = new Message("Simple","TagA",(i+"HH").getBytes(StandardCharsets.UTF_8)); producer.send(message, new SendCallback() { @Override//发送成功调用该方法 public void onSuccess(SendResult sendResult) { System.out.println(index+"消息发送成功_"+sendResult); } @Override//发送失败调用该方法 public void onException(Throwable throwable) { System.out.println(index+"消息发送失败_"+throwable.getStackTrace()); } },30*1000); } //producer.shutdown(); } }
本回答被题主选为最佳回答 , 对您是否有帮助呢?解决 无用评论 打赏 举报 编辑记录
悬赏问题
- ¥15 35114 SVAC视频验签的问题
- ¥15 impedancepy
- ¥15 在虚拟机环境下完成以下,要求截图!
- ¥15 求往届大挑得奖作品(ppt…)
- ¥15 如何在vue.config.js中读取到public文件夹下window.APP_CONFIG.API_BASE_URL的值
- ¥50 浦育平台scratch图形化编程
- ¥20 求这个的原理图 只要原理图
- ¥15 vue2项目中,如何配置环境,可以在打完包之后修改请求的服务器地址
- ¥20 微信的店铺小程序如何修改背景图
- ¥15 UE5.1局部变量对蓝图不可见