使用ActiveMQ开发,如果发布者每1秒发布一条数据,有些订阅者每4秒读取一条,怎么才能让订阅者跳过缓存,读取到最新数据。
//发布者测试代码
public class MessageProducer {
//定义ActivMQ的连接地址
private static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static void main(String[] args) throws JMSException {
//创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory( ACTIVEMQ_URL);
//创建连接
Connection connection = activeMQConnectionFactory.createConnection();
//打开连接
connection.start();
//创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建队列目标
//Destination destination = session.createQueue(QUEUE_NAME);
Topic test = session.createTopic("572890");
//创建一个生产者
javax.jms.MessageProducer producer = session.createProducer(test);
//创建模拟100个消息
int num = 0;
long sTime = System.currentTimeMillis();
TextMessage message = null;
while (true) {
num++;
message = session.createTextMessage(num+"");
//发送消息
producer.send(message);
try {
System.out.println(num);
Thread.sleep(1000);
} catch (Exception e) {
}
if (num > 100) {
break;
}
}
long eTime = System.currentTimeMillis();
System.out.println("总计用时:" + (eTime - sTime));
//关闭连接
connection.close();
}
}
//订阅者测试代码
public class MessageConsumer {
//定义ActivMQ的连接地址
private static final String ACTIVEMQ_URL = "tcp://localhost:61616";
//定义发送消息的队列名称
private static final String QUEUE_NAME = "PP_ERROR";
static int num = 0;
public static void main(String[] args) throws JMSException {
//创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//创建连接
Connection connection = activeMQConnectionFactory.createConnection();
//打开连接
connection.start();
//创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建队列目标
//Destination destination = session.createQueue(QUEUE_NAME);
Topic test = session.createTopic("572890");
//创建消费者
javax.jms.MessageConsumer consumer = session.createConsumer(test);
//创建消费的监听
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
num ++;
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
Thread.sleep(4000);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}