ActiveMQ持久化订阅后,为什么Messages Dequeued一直都是0

package com.activmq.tms.PubSub;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.activmq.tms.util.SystemContent;

/**

  • 消息的生产者(发送者)
  • @author chen
    *
    */
    public class JMSPub {
    //private static final String USERNAME = SystemContent.getUSERNAME();
    //private static final String PASSWORD = SystemContent.getPASSWORD();
    //private static final String BROKEURL = SystemContent.getURL();
    //private static final int SENDNUM = 10;

    public static void main(String[] args) {
    //连接工厂
    ConnectionFactory connectionFactory = null;
    //链接
    Connection connection = null;
    //会话 接受或者发送消息的线程
    Session session = null;
    //消息的目的地
    Destination destiation = null;
    //消息生成者
    MessageProducer messageProducer = null;
    try {
    //实例化连接工厂
    connectionFactory = new ActiveMQConnectionFactory(SystemContent.getUSERNAME(),SystemContent.getPASSWORD(),SystemContent.getURL());
    //通过工厂获得连接
    connection = connectionFactory.createConnection();
    //启动连接
    connection.start();
    //创建session
    session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    destiation = session.createTopic("userSyncTopic");
    //创建消息生产者
    messageProducer = session.createProducer(destiation);
    //设置持久化方式/非持久化 如果非持久化,那么意味着MQ的重启会导致消息丢失
    messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
    //发送消息
    sendMessage(session, messageProducer);
    session.commit();
    } catch (JMSException e) {
    e.printStackTrace();
    }finally{
    if(connection != null){
    try {
    connection.close();
    } catch (JMSException e) {
    e.printStackTrace();
    }
    }
    }
    }

    /**

    • 发送消息
    • @param session
    • @param messageProducer
    • @throws JMSException */ public static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException{ try{ for(int i = 0;i < 10;i++){ TextMessage message = session.createTextMessage("ActiveMQ 发送消息 "+i); message.setStringProperty("property", "消息Property"); messageProducer.send(message); System.out.println("ActiveMQ 已发送消息:"+i); } }catch(Exception e){ e.printStackTrace(); }

    }
    }

package com.activmq.tms.PubSub;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.activmq.tms.util.SystemContent;

public class JMSSub{

public static void main(String[] args) {
    //连接工厂
    ConnectionFactory connectionFactory = null;
    //连接
    Connection connection = null;
    //会话
    Session session = null;
    //消息的目的地
    Topic topic = null;
    //消息的消费者(接收方)
    MessageConsumer  consumer = null;
    try {
        connectionFactory = new ActiveMQConnectionFactory(SystemContent.getUSERNAME(),SystemContent.getPASSWORD(),SystemContent.getURL());
        connection = connectionFactory.createConnection();
        connection.setClientID("client_ids");
        connection.start();
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        topic = session.createTopic("userSyncTopic");
        consumer = session.createDurableSubscriber(topic, "client_ids");
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(message instanceof TextMessage){
                    TextMessage mes = (TextMessage) message;
                    try {
                        System.out.println("收到的消息:" + mes.getText());
                        mes.acknowledge();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    } catch (JMSException e) {
        e.printStackTrace();
    }finally{

    }
}

}

消费者即使下线后仍能收到消息,可是为什么队列里面的消息数不会减少呀,用的是mysql的持久化方式

1个回答

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
activemq中持久化订阅
       在activemq中,当我们第一次通过session.createDurableSubscriber(topic,name)创建一个特定名称的持久化订阅时,broker就会为这个持久化订阅者维护一个主题消息列表,该主题消息列表存放了所有未被持久化订阅者消费的消息,持久化订阅者一旦消费了这条消息,这条消息就会从主题消息列表中移除,这样一来,持久化订阅者所在的机器如果突然宕机,那么当...
ActiveMQ的持久化订阅
默认情况下,ActiveMQ的发布订阅模式是不做持久化处理的,也就是说,采用发布订阅模式,发布者发布消息时,如果消费者不在线,该消息就丢失了,即便消费者再上线,也不会收到离线时的消息。我们可以做一些设置,使得消费者上线后,也能收到离线时的消息。代码如下。 pom文件<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http
关于ActiveMq的持久化订阅
1. ActiveMq 客户端 2 修改 clientId 与 durableSubscriptionName的值,每次启动 都会在ActiveMq实列注册持久定阅者,通过控制台可以看到多个持久订阅者,如图 数据库中表记录如下select * from ...
springboot activemq 2 持久化消息 与 持久化订阅
接着上一节http://blog.csdn.net/cons_step_by_step/article/details/78300427。 改动1.减少springboot重复创建session的问题 jmsTemplate的地方加入了CachingConnectionFactory,这样配置可以 @Bean(name = "myJmsTemplate") public JmsTem
ActiveMQ订阅模式持久化实现
我的诉求是,建一个订阅通道,然后多个客户端监听,当某个客户端掉线后,再上线的时候可以收到它没有接收到的消息。 本文主要参考了《使用Spring配置ActiveMQ的发布订阅模式》(http://nettm.iteye.com/blog/1828268),将他们复制粘贴过来,基本上就ok了。 在找到这篇文章前,《如何实现ActiveMq的Topic的持久订阅》(http://www.my
Spring配置ActiveMQ持久化发布、订阅Topic
通过Spring对ActiveMQ进行配置开发,发布订阅模式,支持消息的持久化。 需要Spring2.5版本以上,如果有多个订阅者,每个订阅者需要指定不同的 clientId 。   发布者的配置:   Xml代码   xml version="1.0" encoding="UTF-8"?>      beans xmlns="http://ww
ActiveMQ 持久化
ActiveMQ中,持久化是指对消息数据的持久化。在ActiveMQ中,默认的消息是保存 在内存中的。 当内存容量不足的时候,或ActiveMQ正常关闭的时候,会将内存中的未处理的消息持久化到磁盘中。 具体的持久化策略由配置文件中的具体配置决定。 ActiveMQ的默认存储策略是kahadb。如果使用JDBC作为持久化策略,则会将所有的 需要持久化...
jms持久化topic订阅
哪位有spring结合jms 的持久化topic订阅的配置,小弟急需,万分感谢!
JMS 发布/持久化订阅
– Start 发送消息 package shangbo.activeMQ.example7; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Mess...
spring+activemq topic持久化订阅
spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:QueueProducer 、消息消费者1:SimpleJMSReceiver 消息消费者2:SimpleJMSReceiver2
activeMQ消息详解(续) 订阅(主题)消息(消息持久化)
activeMQ 持久化消息 订阅消息
ActiveMQ入门(四)-消息的持久化订阅、应答模式(可靠性)
一、持久化订阅 前几篇博客中提到,消息的两种模式,点对点(queue)和发布订阅(topic),queue模式下,消息是会被持久化到磁盘,而topic模式下,消息会随着服务的停止而消失,但是某些场景下,我们想将topic消息也进行持久化,只需要进行如下改动。 生产者端:只需要将目的地创建成Topic模式即可,其他不用变 session.createTopic("xxxx"); imp...
AMQ-Topic-主题-发布订阅模式持久化订阅与非持久化订阅
    目前在开发的项目使用了AMQ(ActiveMQ)中间件的Topic模式(主题模式、发布订阅模式)实现消息收发的功能。    为什么使用发布订阅模式呢,因为项目是C/S架构(民航业,没办法)。服务端对应多个客户端,客户端自己就可以实现订阅消息,接收消息,发送消息,甚至根据不同业务模块分发消息。因此,在MQ的使用上也不同于一般的B/S项目(B/S项目一般是使用队列模式-queue,在避免消息漏...
ActiveMq持久化数据的方式
ActiveMQ的另一个问题就是只要是软件就有可能挂掉,怕的是挂掉之后把信息给丢了,怎么办,可以进行消息的持久化,ActiveMQ提供了几种持久化方式。 一、AMQ(基于文件存储的方式) 它具有写入速度快和容易恢复的特点。消息存储在一个个文件中,文件的默认大小为32M,如果一条消息的大小超过了32M,那么这个值必须设置大一点。当一个存储文件中的消息已经全部被消费,那么这个文件将...
ActiveMQ的消息存储持久化(五)
                        ActiveMQ的消息存储持久化(五) Memory Message Store 内存消息存储主要是存储所有的持久化的消息在内存中。这里没有动态的缓存存在,所以你必须注意设置你的broker所在的JVM和内存限制。 Memory Message Store配置示例 &amp;lt;beans&amp;gt; &amp;lt;broker brokerNam...
ActiveMQ的消息持久化到Mysql数据库
1、将连接Mysql数据库的jar文件,放到ActiveMQ的lib目录下        2、修改ActiveMQ的conf目录下的active.xml文件,修改数据持久化的方式        2.1  修改原来的kshadb的持久化数据的方式 -->        2.2  连接Mysql的配置(注意配置文件放置的位置)
ActiveMQ持久化到mysql数据库
一:修改activeMq.xml 配置数据库连接信息: MQ本身不带mysql驱动需要把mysql驱动jar,复制到lib目录下,数据库连接池用的是MQ中自带的dbcp2 启动的时候如果报错: Cannot load JDBC driver class 'com.mysql.jdbc.Driver 有可能是mysql驱动包的版本问题,楼主的是这样换了一个版本就可以正常启动了。 ...
ActiveMQ 消息持久化到Mysql数据库
修改配置文件持久化到Mysql数据库 文件路径:\conf\activemq.xml &amp;lt;persistenceAdapter&amp;gt; &amp;lt;!--&amp;lt;kahaDB directory=&quot;${activemq.data}/kahadb&quot;/&amp;gt;--&amp;gt; &amp;lt;jdbcPersistenceAdapter dataSource=&quot;#mysql-ds&quot;/&amp;gt;...
ActiveMQ 消息存储持久化
有效的消息存储ActiveMQ提供了一个插件式的消息存储,类似于消息的多点传播,主要实现了如下几种:1:AMQ消息存储-基于文件的存储方式,是以前的默认消息存储2:KahaDB消息存储-提供了容量的提升和恢复能力,是现在的默认存储方式3:JDBC消息存储-消息基于JDBC存储的4:Memory 消息存储-基于内存的消息存储KahaDB Message Store概述KahaDB是目前默认的存储方式...
ActiveMQ的持久化与集群
ActiveMQ存储消息可以采用多种持久化方案,每种方案都有自己特有的集群方案。
ActiveMQ 持久化配置
修改配置文件activemq.xml修改persistenceAdapter将下面这段配置,注释掉<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter> 然后更新成下面的配置:<persistenceAdapter> <jdbcPersistenceAdapter
ActiveMQ Oracle数据库持久化配置
ActiveMQ提供了可扩展的持久化方案,下面是我测试可行的oracle数据库持久化配置方案。注意: 需要在ActiveMQ中加入jdbc依赖的jar包,实测用到的jar包有: commons-pool-1.5.4.jar commons-dbcp-1.4.jar ojdbc14-10.2.0.4.0.jar 如果没有上述jar包启动ActiveMQ服务器会报错。ActiveMQ、SpringF
JMS之——ActiveMQ消息持久化
之前的几篇博文中,我们实现的ActiveMQ消息未实现消息的持久化,一旦ActiveMQ服务重启则未发送的消息全部丢失,所以实现ActiveMQ消息的持久化也是很重要的。那么,这篇文章,我就带着大家一起来实现ActiveMQ消息的持久化。本博文主要是将ActiveMQ消息持久化到MySQL数据库。实现ActiveMQ消息的持久化主要是修改ActiveMQ conf目录下的activemq.xml文
ActiveMQ持久化MySql配置
[b]apache-activemq-5.11.1-bin.zip[/b] activemq.xml文件配置: [code=&quot;java&quot;] file:${activemq.conf}/credentials.properties ...
ActiveMQ 持久化Mysql配置及实例
1. 基本配置其基本配置需要更改的有两块内容。其意就是配置activemq.xml文件,再者就是配置mysql数据库。首先,进入activemq安装目录,在conf文件夹下打开activemq.xml文件,修改一下配置:(1) 默认使用的是kahadb,如下:        &amp;lt;persistenceAdapter&amp;gt;                          &amp;lt;kahaDB...
ActiveMQ消息持久化到数据库
以持久化到mysql为例安装ActiveMQ默认是持久化到文件中,具体配置如下:&amp;lt;persistenceAdapter&amp;gt; &amp;lt;kahaDB directory=&quot;${activemq.base}/data/kahadb&quot;/&amp;gt; &amp;lt;/persistenceAdapter&amp;gt;1. 首先需要把MySql的驱动放到ActiveMQ的Lib目录下2. 修改配置文件&amp;lt;per...
ActiveMQ的消息存储持久化(一)
                                 ActiveMQ的消息存储持久化(一) 概述 ActiveMQ不仅支持persistent【持久的】和non-persistent【非持久的】两种方式,还支持消息的recovery【恢复】方式。 PTP Queue的存储是很简单的,就是一个FIFO的Queue。 PUB/SUB 对于持久化订阅主题,每一个消费者将获得...
ActiveMQ入门教程(四)之持久化
ActiveMQ中,持久化是指对消息数据的持久化。在ActiveMQ中,默认的消息是保存在内存中。当内存容量不足的时候,或ActiveMQ正常关闭的时候,会将内存中的未处理的消息持久化到磁盘中。具体的持久化策略由配置文件中的具体配置决定。 所有的持久化配置在conf/activemq.xml中配置,配置信息都在broker标签内部定义,ActiveMQ提供了插件式的消息存储,主要由如下几种: ...
ActiveMQ的消息存储持久化(二)
                               ActiveMQ的消息存储持久化(二) AMQ Message Store概述 AMQ Message Store是ActiveMQ5.0缺省的持久化存储,它是一个基于文件、事务存储设计为快速消息存储的一个结构,该结构是以流的形式来进行消息交互的。 这种方式中,Messages被保存到data logs中,同时被reference...
ActiveMQ的持久化方式
ActiveMQ持久化方式:AMQ、KahaDB、JDBC、LevelDB。1、AMQAMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件中,文件的默认大小为32M,如果一条消息的大小超过了32M,那么这个值必须设置大一点。当一个存储文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本。
ActiveMQ 数据库持久化配置
jdbc数据库持久化方式。  首先需要把MySql的驱动放到ActiveMQ的Lib目录下,我用的文件名字是:mysql-connector-java-5.1.30-bin.jar activemq将数据持久化到数据库中,不指定具体的数据库,可以使用任意的数据库中,本环节使用mysql数据库。  下述配置都在activemq.xml中配置, 首先定义一个mysql-ds的mysql数据源,该...
ActiveMq之持久化到Mysql数据库-yellowcong
持久化ActiveMq到Mysql中,默认的配置是持久化到kahadb里面,我们可以修改存储为 Mysql里面。kahadb是一个基于内存的数据库,所以效率是相对于mysql高的。
activemq 持久化的问题
1、用的是amq,未被消费的消息已经存文件里了,重启activemq后消息未被重新发送rn2、后上线的消费者,也未收到先发送的消息
ActiveMQ消息的发布与订阅
消息的发布与订阅 Publish类 package com.cb01; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; i
ActiveMQ -- 消息持久化
消息持久化,就是将消息进行一个存储。如图所示,主题中的消息都是持久化后的,订阅同一topic的用户,都有自己的一个指针记录,消息处理的进展。KahaDB存储 基于文件形式存储的。 基于文件存储,不需要第三方存储数据库。 使用KahaDB存储需要配置activemq.xml中<persistenceAdapter> <persistenceAdapter> <ka
ActiveMQ持久化配置
ActiveMQ持久化配置 1  概述 ActiveMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。ActiveMQ使用Apache提供的授权,任何人都可以对其实现代码进行修改。 ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。ActiveMQ实现了JMS
activemq发布订阅消息持久化
一:消息生产者MessageProducer调用setDeliverMode设置发布消息持久化方式,不设置默认也是持久化 import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class TopicProducer { public static void main(Str...
ActiveMQ持久化的方式
activeMq的持久化方式 通过activeMq的持久化, 即使发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心重新启动后仍然可以将消息发送出去,如果把这种持久化和ReliableMessaging结合起来应该是很好的保证了消息的可靠传送。 AMQ:是一种文件存储形式,它具有写入速度快和容易恢复的特点,消息存储在一个个文件中,每个文件不能超过32m;(适合5.3之前的...
四、activemq 持久化
这里用mysql进行持久化,其他的都差不多第一步:将MySQL的数据库驱动复制到activeMQ的lib目录下 (需要jar包找我)第二步:在${activemq.base}/conf/activemq.xml文件中配置持久化适配器createTablesOnStartup=&quot;false&quot; 创建表 useDatabaseLock=&quot;false&quot; 上锁(问题2)&amp;lt;persisten...
activemq消息的持久化
/** * Created by brady on 17/4/8. */ public class TestProducer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFac...
相关热词 c# stream 复制 android c# c#监测窗口句柄 c# md5 引用 c# 判断tabtip 自己写个浏览器程序c# c# 字符串变成整数数组 c#语言编程写出一个方法 c# 转盘抽奖 c#选中treeview