at89c51 2019-06-20 21:17 采纳率: 0%
浏览 376
已结题

ActiveMQ发布订阅问题?

使用ActiveMQ开发,如果发布者每1秒发布一条数据,有些订阅者每4秒读取一条,怎么才能让订阅者跳过缓存,读取到最新数据。

 //发布者测试代码
    public class MessageProducer {
        //定义ActivMQ的连接地址
        private static final String ACTIVEMQ_URL = "tcp://localhost:61616";

        public static void main(String[] args) throws JMSException {
            //创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory( ACTIVEMQ_URL);
            //创建连接
            Connection connection = activeMQConnectionFactory.createConnection();
            //打开连接
            connection.start();
            //创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建队列目标
            //Destination destination = session.createQueue(QUEUE_NAME);

            Topic test = session.createTopic("572890");
            //创建一个生产者
            javax.jms.MessageProducer producer = session.createProducer(test);
            //创建模拟100个消息

            int num = 0;
            long sTime = System.currentTimeMillis();
            TextMessage message = null;
            while (true) {
                num++;
                message = session.createTextMessage(num+"");
                //发送消息
                producer.send(message);
                try {
                    System.out.println(num);
                    Thread.sleep(1000);
                } catch (Exception e) {

                }
                if (num > 100) {
                    break;
                }
            }
            long eTime = System.currentTimeMillis();
            System.out.println("总计用时:" + (eTime - sTime));
            //关闭连接
            connection.close();
        }
    }

    //订阅者测试代码

    public class MessageConsumer {
        //定义ActivMQ的连接地址
        private static final String ACTIVEMQ_URL = "tcp://localhost:61616";
        //定义发送消息的队列名称
        private static final String QUEUE_NAME = "PP_ERROR";

        static int num = 0;
        public static void main(String[] args) throws JMSException {
            //创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

            //创建连接
            Connection connection = activeMQConnectionFactory.createConnection();
            //打开连接
            connection.start();
            //创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建队列目标
            //Destination destination = session.createQueue(QUEUE_NAME);
            Topic test = session.createTopic("572890");

            //创建消费者
            javax.jms.MessageConsumer consumer = session.createConsumer(test);
            //创建消费的监听
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    num ++;
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println(textMessage.getText());
                        Thread.sleep(4000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                }
            });
        }
    }


  • 写回答

2条回答 默认 最新

  • at89c51 2019-06-20 21:27
    关注

    生产者设置过期时间 producer.setTimeToLive(200);

    评论

报告相同问题?

悬赏问题

  • ¥15 HFSS 中的 H 场图与 MATLAB 中绘制的 B1 场 部分对应不上
  • ¥15 如何在scanpy上做差异基因和通路富集?
  • ¥20 关于#硬件工程#的问题,请各位专家解答!
  • ¥15 关于#matlab#的问题:期望的系统闭环传递函数为G(s)=wn^2/s^2+2¢wn+wn^2阻尼系数¢=0.707,使系统具有较小的超调量
  • ¥15 FLUENT如何实现在堆积颗粒的上表面加载高斯热源
  • ¥30 截图中的mathematics程序转换成matlab
  • ¥15 动力学代码报错,维度不匹配
  • ¥15 Power query添加列问题
  • ¥50 Kubernetes&Fission&Eleasticsearch
  • ¥15 報錯:Person is not mapped,如何解決?