at89c51 2019-06-20 21:17 采纳率: 0%
浏览 376
已结题

ActiveMQ发布订阅问题?

使用ActiveMQ开发,如果发布者每1秒发布一条数据,有些订阅者每4秒读取一条,怎么才能让订阅者跳过缓存,读取到最新数据。

 //发布者测试代码
    public class MessageProducer {
        //定义ActivMQ的连接地址
        private static final String ACTIVEMQ_URL = "tcp://localhost:61616";

        public static void main(String[] args) throws JMSException {
            //创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory( ACTIVEMQ_URL);
            //创建连接
            Connection connection = activeMQConnectionFactory.createConnection();
            //打开连接
            connection.start();
            //创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建队列目标
            //Destination destination = session.createQueue(QUEUE_NAME);

            Topic test = session.createTopic("572890");
            //创建一个生产者
            javax.jms.MessageProducer producer = session.createProducer(test);
            //创建模拟100个消息

            int num = 0;
            long sTime = System.currentTimeMillis();
            TextMessage message = null;
            while (true) {
                num++;
                message = session.createTextMessage(num+"");
                //发送消息
                producer.send(message);
                try {
                    System.out.println(num);
                    Thread.sleep(1000);
                } catch (Exception e) {

                }
                if (num > 100) {
                    break;
                }
            }
            long eTime = System.currentTimeMillis();
            System.out.println("总计用时:" + (eTime - sTime));
            //关闭连接
            connection.close();
        }
    }

    //订阅者测试代码

    public class MessageConsumer {
        //定义ActivMQ的连接地址
        private static final String ACTIVEMQ_URL = "tcp://localhost:61616";
        //定义发送消息的队列名称
        private static final String QUEUE_NAME = "PP_ERROR";

        static int num = 0;
        public static void main(String[] args) throws JMSException {
            //创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

            //创建连接
            Connection connection = activeMQConnectionFactory.createConnection();
            //打开连接
            connection.start();
            //创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建队列目标
            //Destination destination = session.createQueue(QUEUE_NAME);
            Topic test = session.createTopic("572890");

            //创建消费者
            javax.jms.MessageConsumer consumer = session.createConsumer(test);
            //创建消费的监听
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    num ++;
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println(textMessage.getText());
                        Thread.sleep(4000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                }
            });
        }
    }


  • 写回答

2条回答 默认 最新

  • at89c51 2019-06-20 21:27
    关注

    生产者设置过期时间 producer.setTimeToLive(200);

    评论

报告相同问题?

悬赏问题

  • ¥20 simulink中怎么使用solve函数?
  • ¥30 dspbuilder中使用signalcompiler时报错Error during compilation: Fitter failed,求解决办法
  • ¥15 gwas 分析-数据质控之过滤稀有突变中出现的问题
  • ¥15 没有注册类 (异常来自 HRESULT: 0x80040154 (REGDB_E_CLASSNOTREG))
  • ¥15 知识蒸馏实战博客问题
  • ¥15 用PLC设计纸袋糊底机送料系统
  • ¥15 simulink仿真中dtc控制永磁同步电机如何控制开关频率
  • ¥15 用C语言输入方程怎么
  • ¥15 网站显示不安全连接问题
  • ¥15 51单片机显示器问题