package com.activmq.tms.PubSub;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.activmq.tms.util.SystemContent;
/**
- 消息的生产者(发送者)
-
@author chen
*
*/
public class JMSPub {
//private static final String USERNAME = SystemContent.getUSERNAME();
//private static final String PASSWORD = SystemContent.getPASSWORD();
//private static final String BROKEURL = SystemContent.getURL();
//private static final int SENDNUM = 10;public static void main(String[] args) {
//连接工厂
ConnectionFactory connectionFactory = null;
//链接
Connection connection = null;
//会话 接受或者发送消息的线程
Session session = null;
//消息的目的地
Destination destiation = null;
//消息生成者
MessageProducer messageProducer = null;
try {
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(SystemContent.getUSERNAME(),SystemContent.getPASSWORD(),SystemContent.getURL());
//通过工厂获得连接
connection = connectionFactory.createConnection();
//启动连接
connection.start();
//创建session
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
destiation = session.createTopic("userSyncTopic");
//创建消息生产者
messageProducer = session.createProducer(destiation);
//设置持久化方式/非持久化 如果非持久化,那么意味着MQ的重启会导致消息丢失
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
//发送消息
sendMessage(session, messageProducer);
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}finally{
if(connection != null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}/**
- 发送消息
- @param session
- @param messageProducer
- @throws JMSException */ public static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException{ try{ for(int i = 0;i < 10;i++){ TextMessage message = session.createTextMessage("ActiveMQ 发送消息 "+i); message.setStringProperty("property", "消息Property"); messageProducer.send(message); System.out.println("ActiveMQ 已发送消息:"+i); } }catch(Exception e){ e.printStackTrace(); }
}
}
package com.activmq.tms.PubSub;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.activmq.tms.util.SystemContent;
/**
 * Durable topic subscriber demo.
 *
 * <p>Connects to the broker described by {@link SystemContent}, registers a durable
 * subscription on the {@code userSyncTopic} topic and prints the text of every
 * {@link TextMessage} it receives.
 */
public class JMSSub {

    /** Name of the topic to subscribe to. */
    private static final String TOPIC_NAME = "userSyncTopic";

    /** Client identifier set on the connection; required for a durable subscription. */
    private static final String CLIENT_ID = "client_ids";

    /** Name of the durable subscription. */
    private static final String SUBSCRIPTION_NAME = "client_ids";

    /**
     * Sets up the durable subscriber and starts message delivery.
     *
     * <p>On success the connection is intentionally left open so that the listener
     * keeps receiving messages after this method returns. If setup fails, the
     * exception is printed and the connection is closed.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Connection connection = null;
        try {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    SystemContent.getUSERNAME(), SystemContent.getPASSWORD(), SystemContent.getURL());
            connection = connectionFactory.createConnection();
            // The client ID has to be set before the connection is used for anything else.
            connection.setClientID(CLIENT_ID);

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(TOPIC_NAME);
            MessageConsumer consumer = session.createDurableSubscriber(topic, SUBSCRIPTION_NAME);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    // Only text messages are handled; other message types are ignored.
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println("收到的消息:" + textMessage.getText());
                            // No explicit acknowledge() here: the session uses AUTO_ACKNOWLEDGE,
                            // for which the JMS API documents Message.acknowledge() as ignored.
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });

            // Start delivery only after the consumer and its listener are in place.
            connection.start();
        } catch (JMSException e) {
            e.printStackTrace();
            // Setup failed: release the connection instead of leaking it.
            // NOTE(review): an open connection presumably also keeps client threads
            // (and therefore the JVM) alive — confirm against the ActiveMQ client.
            closeQuietly(connection);
        }
    }

    /** Closes the connection if it was created, printing any failure to close. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
消费者即使下线,重新上线后仍能收到消息。可是为什么队列里面的消息数不会减少呢?使用的是 MySQL 的持久化方式。