jiazhenhu2016 2019-06-20 21:00 采纳率: 0%
浏览 287

ActiveMQ发布订阅问题?

1、使用ActiveMQ开发,如果发布者每1秒发布一条数据,有些订阅者每4秒读取一条,怎么才能让订阅者跳过缓存,读取到最新数据。

//发布者测试代码
public class MessageProducer {
    //定义ActivMQ的连接地址
    private static final String ACTIVEMQ_URL = "tcp://localhost:61616";

    public static void main(String[] args) throws JMSException {
        //创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory( ACTIVEMQ_URL);
        //创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打开连接
        connection.start();
        //创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建队列目标
        //Destination destination = session.createQueue(QUEUE_NAME);

        Topic test = session.createTopic("572890");
        //创建一个生产者
        javax.jms.MessageProducer producer = session.createProducer(test);
        //创建模拟100个消息

        int num = 0;
        long sTime = System.currentTimeMillis();
        TextMessage message = null;
        while (true) {
            num++;
            message = session.createTextMessage(num+"");
            //发送消息
            producer.send(message);
            try {
                System.out.println(num);
                Thread.sleep(1000);
            } catch (Exception e) {

            }
            if (num > 100) {
                break;
            }
        }
        long eTime = System.currentTimeMillis();
        System.out.println("总计用时:" + (eTime - sTime));
        //关闭连接
        connection.close();
    }
}

//订阅者测试代码

public class MessageConsumer {
    //定义ActivMQ的连接地址
    private static final String ACTIVEMQ_URL = "tcp://localhost:61616";
    //定义发送消息的队列名称
    private static final String QUEUE_NAME = "PP_ERROR";

    static int num = 0;
    public static void main(String[] args) throws JMSException {
        //创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

        //创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打开连接
        connection.start();
        //创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建队列目标
        //Destination destination = session.createQueue(QUEUE_NAME);
        Topic test = session.createTopic("572890");

        //创建消费者
        javax.jms.MessageConsumer consumer = session.createConsumer(test);
        //创建消费的监听
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                num ++;
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println(textMessage.getText());
                    Thread.sleep(4000);
                } catch (Exception e) {
                    e.printStackTrace();
                }

            }
        });
    }
}
  • 写回答

1条回答 默认 最新

  • CSDN-Ada助手 CSDN-AI 官方账号 2022-09-21 01:53
    关注
    不知道你这个问题是否已经解决, 如果还没有解决的话:

    如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^
    评论

报告相同问题?

悬赏问题

  • ¥15 关于#python#的问题:求帮写python代码
  • ¥20 MATLAB画图图形出现上下震荡的线条
  • ¥15 LiBeAs的带隙等于0.997eV,计算阴离子的N和P
  • ¥15 关于#windows#的问题:怎么用WIN 11系统的电脑 克隆WIN NT3.51-4.0系统的硬盘
  • ¥15 来真人,不要ai!matlab有关常微分方程的问题求解决,
  • ¥15 perl MISA分析p3_in脚本出错
  • ¥15 k8s部署jupyterlab,jupyterlab保存不了文件
  • ¥15 ubuntu虚拟机打包apk错误
  • ¥199 rust编程架构设计的方案 有偿
  • ¥15 回答4f系统的像差计算