ActiveMQ持久化订阅后,为什么Messages Dequeued一直都是0

package com.activmq.tms.PubSub;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.activmq.tms.util.SystemContent;

/**

  • 消息的生产者(发送者)
  • @author chen
    *
    */
    public class JMSPub {
    //private static final String USERNAME = SystemContent.getUSERNAME();
    //private static final String PASSWORD = SystemContent.getPASSWORD();
    //private static final String BROKEURL = SystemContent.getURL();
    //private static final int SENDNUM = 10;

    public static void main(String[] args) {
    //连接工厂
    ConnectionFactory connectionFactory = null;
    //链接
    Connection connection = null;
    //会话 接受或者发送消息的线程
    Session session = null;
    //消息的目的地
    Destination destiation = null;
    //消息生成者
    MessageProducer messageProducer = null;
    try {
    //实例化连接工厂
    connectionFactory = new ActiveMQConnectionFactory(SystemContent.getUSERNAME(),SystemContent.getPASSWORD(),SystemContent.getURL());
    //通过工厂获得连接
    connection = connectionFactory.createConnection();
    //启动连接
    connection.start();
    //创建session
    session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    destiation = session.createTopic("userSyncTopic");
    //创建消息生产者
    messageProducer = session.createProducer(destiation);
    //设置持久化方式/非持久化 如果非持久化,那么意味着MQ的重启会导致消息丢失
    messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
    //发送消息
    sendMessage(session, messageProducer);
    session.commit();
    } catch (JMSException e) {
    e.printStackTrace();
    }finally{
    if(connection != null){
    try {
    connection.close();
    } catch (JMSException e) {
    e.printStackTrace();
    }
    }
    }
    }

    /**

    • 发送消息
    • @param session
    • @param messageProducer
    • @throws JMSException */ public static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException{ try{ for(int i = 0;i < 10;i++){ TextMessage message = session.createTextMessage("ActiveMQ 发送消息 "+i); message.setStringProperty("property", "消息Property"); messageProducer.send(message); System.out.println("ActiveMQ 已发送消息:"+i); } }catch(Exception e){ e.printStackTrace(); }

    }
    }

package com.activmq.tms.PubSub;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.activmq.tms.util.SystemContent;

public class JMSSub{

public static void main(String[] args) {
    //连接工厂
    ConnectionFactory connectionFactory = null;
    //连接
    Connection connection = null;
    //会话
    Session session = null;
    //消息的目的地
    Topic topic = null;
    //消息的消费者(接收方)
    MessageConsumer  consumer = null;
    try {
        connectionFactory = new ActiveMQConnectionFactory(SystemContent.getUSERNAME(),SystemContent.getPASSWORD(),SystemContent.getURL());
        connection = connectionFactory.createConnection();
        connection.setClientID("client_ids");
        connection.start();
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        topic = session.createTopic("userSyncTopic");
        consumer = session.createDurableSubscriber(topic, "client_ids");
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(message instanceof TextMessage){
                    TextMessage mes = (TextMessage) message;
                    try {
                        System.out.println("收到的消息:" + mes.getText());
                        mes.acknowledge();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    } catch (JMSException e) {
        e.printStackTrace();
    }finally{

    }
}

}

消费者即使下线后仍能收到消息,可是为什么队列里面的消息数不会减少呀,用的是mysql的持久化方式

1个回答

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
立即提问