weixin_46095148 2024-01-26 18:57 采纳率: 91.1%
浏览 10
已结题

事务消息,报错,请求修改调试成功后再粘贴出正确答案


package com.example.rocketmq.coml;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

//事务消息
/**
 * Push consumer for the transactional-message demo.
 *
 * <p>Subscribes to every tag of the {@code Transaction} topic and prints the body of each
 * message handed to the listener.
 */
public class Comsumer {
    /**
     * Configures the push consumer, registers the concurrent listener and starts it.
     *
     * @param args unused
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws MQClientException {
        // Consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Synccomsumer");
        // Name server to connect to.
        consumer.setNamesrvAddr("192.168.88.130:9876");
        // Subscribe to the topic; "*" accepts every tag.
        consumer.subscribe("Transaction", "*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // Visit every message of the batch. Starting the index at 1 dropped the first
                // message, so a batch holding a single message printed nothing at all.
                for (MessageExt messageExt : list) {
                    // Decode with the charset the producer used instead of the platform default.
                    String body = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("消息消费成功_" + body);
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("消费者启动成功");
    }
}
package com.example.rocketmq.coml;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
//事务消息
/**
 * Producer for the transactional-message demo.
 *
 * <p>Sends ten transactional messages to the {@code Transaction} topic, cycling through the
 * tags {@code TagA}..{@code TagE}; the registered {@code setTransactionListener} decides the
 * outcome of each one.
 */
public class Producer {
    /**
     * Starts the transactional producer, sends the demo messages and shuts the producer down.
     *
     * @param args unused
     * @throws MQClientException if the producer cannot start or a send fails
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionMQProducer producer = new TransactionMQProducer("Producer");
        producer.setNamesrvAddr("192.168.88.130:9876");
        // Thread pool handed to the producer via setExecutorService; every thread gets the same name.
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("ExecutorService");

                return thread;
            }
        });
        producer.setExecutorService(threadPoolExecutor);
        producer.setTransactionListener(new setTransactionListener());
        producer.start();
        try {
            String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {
                // Use the tag VALUE. The original passed the literal text "tags[i%tags.length]",
                // so the listener never saw TagA..TagE.
                String tag = tags[i % tags.length];
                // Topic must be exactly "Transaction" (no leading space) to match the consumer's
                // subscription and to be a legal topic name.
                Message message = new Message("Transaction", tag, (tag + "TransactionMQProducer").getBytes(StandardCharsets.UTF_8));
                TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
                System.out.println("消息发送成功" + transactionSendResult);
                Thread.sleep(10);
            }
            // Keep the process alive for a while before shutting down
            // (presumably so pending transaction checks can reach the listener - confirm).
            Thread.sleep(100000);
        } finally {
            // Release the producer even when a send fails or the thread is interrupted.
            producer.shutdown();
        }
    }
}
package com.example.rocketmq.coml;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.core.metrics.StartupStep;
//事务消息
/**
 * Transaction listener for the demo producer; the outcome is derived purely from the message tag.
 *
 * <p>The class name is kept as-is because {@code Producer} instantiates it by this name.
 */
public class setTransactionListener implements org.apache.rocketmq.client.producer.TransactionListener {
    /**
     * Executes the "local transaction" for a message that was just sent.
     *
     * @param message the message being sent
     * @param o the argument passed to {@code sendMessageInTransaction}; not used here
     * @return {@code COMMIT_MESSAGE} when the tag contains {@code TagA}, {@code ROLLBACK_MESSAGE}
     *         when it contains {@code TagB}, otherwise {@code UNKNOW}
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        String tags = message.getTags();
        // StringUtils.contains(seq, searchSeq): the tag is the sequence being searched.
        // The original had the arguments swapped, so e.g. an empty tag matched "TagA".
        if (StringUtils.contains(tags, "TagA")) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        if (StringUtils.contains(tags, "TagB")) {
            // Roll back.
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // Undetermined state.
        return LocalTransactionState.UNKNOW;
    }

    /**
     * Answers a transaction status check for a message.
     *
     * @param messageExt the message whose transaction state is being checked
     * @return {@code COMMIT_MESSAGE} when the tag contains {@code TagC}, {@code ROLLBACK_MESSAGE}
     *         when it contains {@code TagD}, otherwise {@code UNKNOW}
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        String tags = messageExt.getTags();
        if (StringUtils.contains(tags, "TagC")) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        if (StringUtils.contains(tags, "TagD")) {
            // Roll back.
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // Undetermined state.
        return LocalTransactionState.UNKNOW;
    }
}

  • 写回答

12条回答 默认 最新

  • 叫兽-郭老师 新星创作者: Java技术领域 2024-01-26 19:00
    关注

    参考通义千问和郭老师的小迷弟雅思莫了-编写提供,您可以参考如下,如果回答的不正确,及时评论区回复我,我会根据你错误描述追加回复。如果解决了您的问题,麻烦帮我点击个采纳!谢谢您的肯定!

    根据您提供的代码,我发现了以下问题:

    1. com.example.rocketmq.coml 包名可能存在错误或重复。
    2. Producer 类中,setTransactionListener 类名与接口名称不匹配,应该是实现类的名字,我将其改为 TransactionListenerImpl

    已修正的代码如下:

    // Consumer.java
    package com.example.rocketmq.consumer;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    import java.util.List;
    
    /**
     * Push consumer for the transactional-message demo: subscribes to every tag of the
     * {@code Transaction} topic and prints the body of each message handed to the listener.
     */
    public class Consumer {
        /**
         * Configures the push consumer, registers the concurrent listener and starts it.
         *
         * @param args unused
         * @throws MQClientException if subscribing or starting the consumer fails
         */
        public static void main(String[] args) throws MQClientException {
            // Consumer group name.
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SyncConsumer");
            // Name server to connect to.
            consumer.setNamesrvAddr("192.168.88.130:9876");
            // Subscribe to the topic; "*" accepts every tag.
            consumer.subscribe("Transaction", "*");
            consumer.setMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    for (MessageExt messageExt : list) {
                        // The producer encodes the body as UTF-8, so decode it as UTF-8 rather
                        // than with the platform default charset.
                        String body = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                        System.out.println("消息消费成功_" + body);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("消费者启动成功");
        }
    }
    
    // Producer.java
    package com.example.rocketmq.producer;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.client.producer.TransactionSendResult;
    import org.apache.rocketmq.common.message.Message;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Producer for the transactional-message demo: sends ten transactional messages to the
     * {@code Transaction} topic, cycling through five tags, then shuts down after a pause.
     */
    public class Producer {
        /**
         * Starts the transactional producer, sends the demo messages and shuts the producer down.
         *
         * @param args unused
         * @throws MQClientException if the producer cannot start or a send fails
         * @throws InterruptedException if the thread is interrupted while sleeping
         */
        public static void main(String[] args) throws MQClientException, InterruptedException {
            TransactionMQProducer txProducer = new TransactionMQProducer("ProducerGroup");
            txProducer.setNamesrvAddr("192.168.88.130:9876");

            // Factory that gives every pool thread the same fixed name.
            ThreadFactory namedThreads = new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread worker = new Thread(r);
                    worker.setName("ExecutorService");
                    return worker;
                }
            };
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), namedThreads);

            txProducer.setExecutorService(pool);
            txProducer.setTransactionListener(new TransactionListenerImpl());
            txProducer.start();

            String[] tagCycle = {"TagA", "TagB", "TagC", "TagD", "TagE"};
            int sent = 0;
            while (sent < 10) {
                // Walk through the tags round-robin.
                String currentTag = tagCycle[sent % tagCycle.length];
                byte[] payload = (currentTag + "TransactionMQProducer").getBytes(StandardCharsets.UTF_8);
                Message outgoing = new Message("Transaction", currentTag, payload);
                TransactionSendResult result = txProducer.sendMessageInTransaction(outgoing, null);
                System.out.println("消息发送成功:" + result);
                Thread.sleep(10);
                sent++;
            }

            // Pause before shutting the producer down.
            Thread.sleep(100000);
            txProducer.shutdown();
        }
    }
    
    // TransactionListenerImpl.java
    package com.example.rocketmq.producer;
    
    import org.apache.commons.lang3.StringUtils;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    
    /**
     * Transaction listener for the demo producer; the outcome is derived purely from the
     * message tag.
     */
    public class TransactionListenerImpl implements TransactionListener {
        /**
         * Executes the "local transaction" for a message that was just sent.
         *
         * @param message the message being sent
         * @param arg the argument passed to {@code sendMessageInTransaction}; not used here
         * @return {@code COMMIT_MESSAGE} when the tag contains {@code TagA},
         *         {@code ROLLBACK_MESSAGE} when it contains {@code TagB}, otherwise {@code UNKNOW}
         */
        @Override
        public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
            String tags = message.getTags();
            if (StringUtils.contains(tags, "TagA")) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
            if (StringUtils.contains(tags, "TagB")) {
                // Roll back.
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            // Undetermined state. LocalTransactionState has no UNCOMMITTED constant;
            // the enum value is spelled UNKNOW.
            return LocalTransactionState.UNKNOW;
        }

        /**
         * Answers a transaction status check for a message.
         *
         * @param messageExt the message whose transaction state is being checked
         * @return {@code COMMIT_MESSAGE} when the tag contains {@code TagC},
         *         {@code ROLLBACK_MESSAGE} when it contains {@code TagD}, otherwise {@code UNKNOW}
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
            String tags = messageExt.getTags();
            if (StringUtils.contains(tags, "TagC")) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
            if (StringUtils.contains(tags, "TagD")) {
                // Roll back.
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            // Undetermined state (the enum value is spelled UNKNOW).
            return LocalTransactionState.UNKNOW;
        }
    }
    

    请注意,在实际场景中,您需要根据业务逻辑来编写 executeLocalTransaction 和 checkLocalTransaction 方法。上述示例仅作为演示用途。另外,请确保在您的 RocketMQ 集群中存在名为 "Transaction" 的主题。

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(11条)

报告相同问题?

问题事件

  • 系统已结题 2月4日
  • 已采纳回答 1月27日
  • 创建了问题 1月26日