package com.example.rocketmq.coml;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
//事务消息
/**
 * Push consumer for the transactional-message demo.
 *
 * <p>Subscribes to every tag of the {@code Transaction} topic and prints the body of each
 * delivered message to standard output.
 */
public class Comsumer {
    /**
     * Configures the push consumer, registers the message listener and starts it.
     *
     * @param args unused
     * @throws MQClientException if subscribing to the topic or starting the consumer fails
     */
    public static void main(String[] args) throws MQClientException {
        // Create the consumer with its consumer-group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Synccomsumer");
        // Name server to connect to.
        consumer.setNamesrvAddr("192.168.88.130:9876");
        // Subscribe to the topic; "*" accepts every tag.
        consumer.subscribe("Transaction", "*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // Visit every message in the batch. The previous index loop started at 1, so the
                // first message was always skipped and a batch of one message printed nothing.
                for (MessageExt message : list) {
                    // Decode explicitly as UTF-8 (the producer encodes the body as UTF-8) instead
                    // of relying on the platform default charset.
                    String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("消息消费成功_" + body);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("消费者启动成功");
    }
}
package com.example.rocketmq.coml;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
//事务消息
/**
 * Producer for the transactional-message demo.
 *
 * <p>Sends ten half messages to the {@code Transaction} topic, cycling through the tags
 * {@code TagA}..{@code TagE}, then stays alive for a while before shutting down.
 */
public class Producer {
    /**
     * Configures and starts the transactional producer, sends the demo messages and shuts down.
     *
     * @param args unused
     * @throws MQClientException if starting the producer or sending a message fails
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionMQProducer producer = new TransactionMQProducer("Producer");
        producer.setNamesrvAddr("192.168.88.130:9876");
        // Thread pool handed to the producer via setExecutorService
        // (NOTE(review): presumably used by RocketMQ for transaction check-back callbacks).
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("ExecutorService");
                return thread;
            }
        });
        producer.setExecutorService(threadPoolExecutor);
        producer.setTransactionListener(new setTransactionListener());
        producer.start();
        try {
            String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {
                // Use the actual tag value. The original passed the string literal
                // "tags[i%tags.length]" as the tag, so no message ever carried TagA..TagE.
                String tag = tags[i % tags.length];
                // Topic must be exactly "Transaction": the original " Transaction" had a leading
                // space and did not match the topic the consumer subscribes to.
                Message message = new Message("Transaction", tag, (tag + "TransactionMQProducer").getBytes(StandardCharsets.UTF_8));
                TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
                System.out.println("消息发送成功" + transactionSendResult);
                Thread.sleep(10);
            }
            // Keep the process alive before shutting down
            // (NOTE(review): presumably so the broker can call back checkLocalTransaction).
            Thread.sleep(100000);
        } finally {
            // Release client resources even when sending fails or the sleep is interrupted.
            producer.shutdown();
            // shutdown() is safe to call on an already shut-down pool; this guarantees the
            // non-daemon worker threads cannot keep the JVM alive.
            threadPoolExecutor.shutdown();
        }
    }
}
package com.example.rocketmq.coml;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.core.metrics.StartupStep;
//事务消息
/**
 * Transaction listener for the demo: decides the local transaction outcome purely from the
 * message tag.
 *
 * <ul>
 *   <li>On execution: {@code TagA} commits, {@code TagB} rolls back, anything else is unknown.</li>
 *   <li>On check-back: {@code TagC} commits, {@code TagD} rolls back, anything else is unknown.</li>
 * </ul>
 */
public class setTransactionListener implements org.apache.rocketmq.client.producer.TransactionListener {
    /**
     * Executes the (simulated) local transaction for a half message.
     *
     * @param message the half message being sent
     * @param o the argument passed to {@code sendMessageInTransaction}; unused here
     * @return commit for {@code TagA}, rollback for {@code TagB}, otherwise {@code UNKNOW}
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        String tags = message.getTags();
        // StringUtils.contains(seq, searchSeq): the tag is the sequence being searched.
        // The original had the arguments reversed, which asked whether the literal contains
        // the tag (e.g. an empty tag matched everything).
        if (StringUtils.contains(tags, "TagA")) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        if (StringUtils.contains(tags, "TagB")) {
            // Roll back.
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // No decision yet; a null tag also ends up here because contains(null, ...) is false.
        return LocalTransactionState.UNKNOW;
    }

    /**
     * Answers a check-back request for a message whose transaction state is still unknown.
     *
     * @param messageExt the message being checked
     * @return commit for {@code TagC}, rollback for {@code TagD}, otherwise {@code UNKNOW}
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        String tags = messageExt.getTags();
        if (StringUtils.contains(tags, "TagC")) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        if (StringUtils.contains(tags, "TagD")) {
            // Roll back.
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // Still undecided.
        return LocalTransactionState.UNKNOW;
    }
}
以上是 RocketMQ 事务消息的示例代码,运行时报错。请帮忙修改,并在调试成功后贴出正确的代码。
- 写回答
- 好问题 0 提建议
- 关注问题
- 邀请回答
-
12条回答 默认 最新
关注参考通义千问和郭老师的小迷弟雅思莫了-编写提供,您可以参考如下,如果回答的不正确,及时评论区回复我,我会根据你错误描述追加回复。如果解决了您的问题,麻烦帮我点击个采纳!谢谢您的肯定!
根据您提供的代码,我发现了以下问题:
- com.example.rocketmq.coml包名可能存在错误或重复。
- 在Producer类中,setTransactionListener类名与接口名称不匹配,应该是实现类的名字,我将其改为TransactionListenerImpl。
已修正的代码如下:
// Consumer.java
package com.example.rocketmq.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;
import java.util.List;

/** Push consumer that prints every message received on the {@code Transaction} topic. */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // Create the consumer with its consumer-group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SyncConsumer");
        // Name server to connect to.
        consumer.setNamesrvAddr("192.168.88.130:9876");
        // Subscribe to the topic; "*" accepts every tag.
        consumer.subscribe("Transaction", "*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt message : list) {
                    // Decode as UTF-8 to match the encoding used by the producer.
                    System.out.println("消息消费成功_" + new String(message.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("消费者启动成功");
    }
}

// Producer.java
package com.example.rocketmq.producer;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Sends ten transactional messages to the {@code Transaction} topic, cycling through five tags. */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
        producer.setNamesrvAddr("192.168.88.130:9876");
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("ExecutorService");
                return thread;
            }
        });
        producer.setExecutorService(threadPoolExecutor);
        producer.setTransactionListener(new TransactionListenerImpl());
        producer.start();
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            String tag = tags[i % tags.length];
            Message message = new Message("Transaction", tag, (tag + "TransactionMQProducer").getBytes(StandardCharsets.UTF_8));
            TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
            System.out.println("消息发送成功:" + transactionSendResult);
            Thread.sleep(10);
        }
        Thread.sleep(100000);
        producer.shutdown();
    }
}

// TransactionListenerImpl.java
package com.example.rocketmq.producer;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

/** Decides the local transaction outcome purely from the message tag (demo only). */
public class TransactionListenerImpl implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
        String tags = message.getTags();
        if (StringUtils.contains(tags, "TagA")) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        if (StringUtils.contains(tags, "TagB")) {
            // Roll back.
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // Undecided. The enum constant is UNKNOW; LocalTransactionState has no UNCOMMITTED
        // constant, so the previous version of this snippet did not compile.
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        String tags = messageExt.getTags();
        if (StringUtils.contains(tags, "TagC")) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        if (StringUtils.contains(tags, "TagD")) {
            // Roll back.
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // Still undecided.
        return LocalTransactionState.UNKNOW;
    }
}
请注意,在实际场景中,您需要根据业务逻辑来编写
executeLocalTransaction和checkLocalTransaction方法。上述示例仅作为演示用途。另外,请确保在您的 RocketMQ 集群中存在名为 "Transaction" 的主题。本回答被题主选为最佳回答 , 对您是否有帮助呢?解决 无用评论 打赏 举报