生产者
/**
 * Publishes ten persistent text messages to the {@code testTopic} topic on the
 * ActiveMQ broker at {@code tcp://localhost:61616}.
 *
 * <p>A topic is a publish/subscribe destination: a message is delivered only to
 * subscriptions that already exist on the broker when it is published. A
 * subscriber that connects afterwards does not see earlier messages unless it
 * holds a durable subscription that was registered before this program ran.
 */
public class Producer {

    /**
     * Connects to the broker, sends the messages and releases the connection.
     *
     * @param args unused
     * @throws JMSException if connecting, sending or closing fails
     */
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            ActiveMQTopic topic = new ActiveMQTopic("testTopic");
            MessageProducer producer = session.createProducer(topic);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int i = 0; i < 10; i++) {
                TextMessage message = session.createTextMessage();
                message.setText("message_" + System.currentTimeMillis());
                producer.send(message);
                System.out.println("Sent message: " + message.getText());
            }
        } finally {
            // Closing the connection also closes the session and producer created
            // from it, so a failed send no longer leaks the broker connection.
            connection.close();
        }
    }
}
发布消息的结果
Sent message: message_1341915173083
Sent message: message_1341915173085
Sent message: message_1341915173085
Sent message: message_1341915173086
Sent message: message_1341915173086
Sent message: message_1341915173086
Sent message: message_1341915173087
Sent message: message_1341915173087
Sent message: message_1341915173088
Sent message: message_1341915173088
消费者
public class Consumer {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQTopic topic= new ActiveMQTopic("testTopic");
// javax.jms.Topic topic = session.createTopic("myTopic.messages");
MessageConsumer consumer = session.createConsumer( topic);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("Received message: " + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
MessageConsumer comsumer2 = session.createConsumer(topic);
comsumer2.setMessageListener(new MessageListener(){
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("Received message: " + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// session.close();
// connection.stop();
// connection.close();
}
}
消费者程序运行后获取不到生产者发布的消息。初识 ActiveMQ,不太熟悉,求解答。