tjw1218 2016-09-28 08:58 采纳率: 80%
浏览 3928
已采纳

amq消费者接受消息如何保证有序,可靠

activemq点对点:
消费者:
public class AMQSend{
public static void send(String filePath, String newFilename, String contentType, String queueName) throws Exception{
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();

        //发送消息的线程
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(queueName);

        //设置数据持久化
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        //将文件内容复制到StringBuffer里面
        StringBuffer sb = new StringBuffer();
        Reader reader = null;
        BufferedReader br = null;
        try {
            reader = new FileReader(filePath);
            br = new BufferedReader(reader);
            String data = null;
            while ((data = br.readLine()) != null) {
                sb.append(data);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
                if(reader != null) {
                    try {
                        reader.close();
                    }catch (Exception e){
                        e.printStackTrace();
                }
                if(br != null) {
                    try {
                        br.close();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }
        }
     TextMessage message;
        message = session.createTextMessage(sb.toString());
        producer.send(message);

        producer.close();
        session.close();
        connection.close();

        }catch(Exception e){
            System.out.println("上传异常:\r\n" + e.getMessage());
        }
    }

}
消费者:
public class AMQReci{
public static void Reci(String queueName) throws Exception {
try{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();

                // 目的地是客户用来指定他生产消息的目标还有他消费消息的来源的对象,两种消息传递方式:点对点
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createQueue(queueName);
                MessageConsumer consumer = session.createConsumer(destination);

                consumer.setMessageListener(new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        try{
                            if(message instanceof TextMessage) {
                                TextMessage tm = (TextMessage) message;
                                String text = tm.getText();
                                System.out.println(text);
                            }
                        }catch(JMSException e){
                            e.printStackTrace();
                        }
                    }
                });

                consumer.close();
                session.close();
                connection.close();

            }catch(Exception e){
                e.printStackTrace();
            }
        }

}
测试代码:
import java.io.File;

public class AMQTest {
@SuppressWarnings("static-access")
public static void main(String[] args) throws Exception {
String fileName = "a";
String contentType = "txt";
String queueName = "queue.test1";
AMQSend amqSend = new AMQSend();
AMQReci amqReci = new AMQReci();
File file = new File("E:\test");
File[] tempList = file.listFiles();
System.out.println("该目录下对象个数." + tempList.length);
try{
for (int i = 0; i < tempList.length; i++) {
if (tempList[i].isFile()) {
amqSend.send(tempList[i].getAbsolutePath(), fileName, contentType, queueName);
amqReci.Reci(queueName);
}
}
}catch(Exception e){
e.printStackTrace();
}
}
}
测试结果:(文件夹下有两个txt文件)
该目录下对象个数.2
This is t2
This is t1
This is t2
This is t1
This is t2
This is t1
每次运行的消费者有可能接收不到消息,然后下次运行会把上菜在消息队列的内容
取出来,但是有可能会乱序。
我要怎么样才能保证每次调用都能够发送和稳定可靠接收?
PS:可能问题太长了,希望有大神能帮忙解决,任务很急。

  • 写回答

2条回答 默认 最新

  • Evankaka 博客专家认证 2016-09-29 03:49
    关注

    发消息都加一个时间戳,然后取消息时先把消息取下来,放到一个延时队列,规定在过一定的时间才能取出来。可以搜一搜delayQueue

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

悬赏问题

  • ¥15 深度学习根据CNN网络模型,搭建BP模型并训练MNIST数据集
  • ¥15 lammps拉伸应力应变曲线分析
  • ¥15 C++ 头文件/宏冲突问题解决
  • ¥15 用comsol模拟大气湍流通过底部加热(温度不同)的腔体
  • ¥50 安卓adb backup备份子用户应用数据失败
  • ¥20 有人能用聚类分析帮我分析一下文本内容嘛
  • ¥15 请问Lammps做复合材料拉伸模拟,应力应变曲线问题
  • ¥30 python代码,帮调试,帮帮忙吧
  • ¥15 #MATLAB仿真#车辆换道路径规划
  • ¥15 java 操作 elasticsearch 8.1 实现 索引的重建