ActiveMQ持久化订阅后,为什么Messages Dequeued一直都是0

package com.activmq.tms.PubSub;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.activmq.tms.util.SystemContent;

/**

  • 消息的生产者(发送者)
  • @author chen
    *
    */
    public class JMSPub {
    //private static final String USERNAME = SystemContent.getUSERNAME();
    //private static final String PASSWORD = SystemContent.getPASSWORD();
    //private static final String BROKEURL = SystemContent.getURL();
    //private static final int SENDNUM = 10;

    public static void main(String[] args) {
    //连接工厂
    ConnectionFactory connectionFactory = null;
    //链接
    Connection connection = null;
    //会话 接受或者发送消息的线程
    Session session = null;
    //消息的目的地
    Destination destiation = null;
    //消息生成者
    MessageProducer messageProducer = null;
    try {
    //实例化连接工厂
    connectionFactory = new ActiveMQConnectionFactory(SystemContent.getUSERNAME(),SystemContent.getPASSWORD(),SystemContent.getURL());
    //通过工厂获得连接
    connection = connectionFactory.createConnection();
    //启动连接
    connection.start();
    //创建session
    session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    destiation = session.createTopic("userSyncTopic");
    //创建消息生产者
    messageProducer = session.createProducer(destiation);
    //设置持久化方式/非持久化 如果非持久化,那么意味着MQ的重启会导致消息丢失
    messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
    //发送消息
    sendMessage(session, messageProducer);
    session.commit();
    } catch (JMSException e) {
    e.printStackTrace();
    }finally{
    if(connection != null){
    try {
    connection.close();
    } catch (JMSException e) {
    e.printStackTrace();
    }
    }
    }
    }

    /**

    • 发送消息
    • @param session
    • @param messageProducer
    • @throws JMSException */ public static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException{ try{ for(int i = 0;i < 10;i++){ TextMessage message = session.createTextMessage("ActiveMQ 发送消息 "+i); message.setStringProperty("property", "消息Property"); messageProducer.send(message); System.out.println("ActiveMQ 已发送消息:"+i); } }catch(Exception e){ e.printStackTrace(); }

    }
    }

package com.activmq.tms.PubSub;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.activmq.tms.util.SystemContent;

public class JMSSub{

public static void main(String[] args) {
    //连接工厂
    ConnectionFactory connectionFactory = null;
    //连接
    Connection connection = null;
    //会话
    Session session = null;
    //消息的目的地
    Topic topic = null;
    //消息的消费者(接收方)
    MessageConsumer  consumer = null;
    try {
        connectionFactory = new ActiveMQConnectionFactory(SystemContent.getUSERNAME(),SystemContent.getPASSWORD(),SystemContent.getURL());
        connection = connectionFactory.createConnection();
        connection.setClientID("client_ids");
        connection.start();
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        topic = session.createTopic("userSyncTopic");
        consumer = session.createDurableSubscriber(topic, "client_ids");
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(message instanceof TextMessage){
                    TextMessage mes = (TextMessage) message;
                    try {
                        System.out.println("收到的消息:" + mes.getText());
                        mes.acknowledge();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    } catch (JMSException e) {
        e.printStackTrace();
    }finally{

    }
}

}

消费者即使下线后仍能收到消息,可是为什么队列里面的消息数不会减少呀,用的是mysql的持久化方式

0

1个回答

-2
Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
ActiveMQ持久订阅设置
在JMS中,Topic实现publish和subscribe语义。一条消息被publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的subscriber能够获得消息的一个拷贝。     JMS Queue执行load balancer语义。一条消息仅能被一个consumer收到。如果在message发送的时候
ActiveMQ(九)--持久的Topic消息示例
生产者 import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class PersistenceSender { public static void main(String[] args) throws JMSException, InterruptedException ...
spring+activemq topic持久化订阅
spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:QueueProducer 、消息消费者1:SimpleJMSReceiver 消息消费者2:SimpleJMSReceiver2
activemq中持久订阅者和非持久订阅者区别
1、消息的传输类型: 上文已经讲过,activemq支持两只传输的模式:持久化和非持久化。可以通过MessageProducer 类的 setDeliveryMode方法设置传输模式: MessageProducer producer = ...; producer.setDeliveryMode(DeliveryMode.PERSISTENT); 持久传输和非持久传输最大的区别是:采用持久
Spring+ActiveMQ消息持久化,Topic持久化订阅
消息持久化就是将消息保存到磁盘,这样的好处就是如果服务挂了,则消息还保存在磁盘不会丢失,服务起来后还能找到消息并在此发送,消息的持久化和消息的发送模型是没有关系的。 消息持久化的配置很方便的,所以其他的那些就不写出来了,可以看看上一篇文章中的同步异步实现方式。这里只把持久化配置的列出来。 [html] view plain copy  print?
ActiveMQ中的消息的持久化和非持久化 以及 持久订阅者 和 非持久订阅者之间的区别与联系
①DeliveryMode 这是传输模式。ActiveMQ支持两种传输模式:持久传输和非持久传输(persistent and non-persistent delivery),默认情况下使用的是持久传输。 可以通过MessageProducer 类的 setDeliveryMode方法设置传输模式: MessageProducer producer = ...; produce
ActiveMQ订阅模式持久化实现
我的诉求是,建一个订阅通道,然后多个客户端监听,当某个客户端掉线后,再上线的时候可以收到它没有接收到的消息。 本文主要参考了《使用Spring配置ActiveMQ的发布订阅模式》(http://nettm.iteye.com/blog/1828268),将他们复制粘贴过来,基本上就ok了。 在找到这篇文章前,《如何实现ActiveMq的Topic的持久订阅》(http://www.my
关于ActiveMq的持久化订阅
1. ActiveMq 客户端 2 修改 clientId 与 durableSubscriptionName的值,每次启动 都会在ActiveMq实列注册持久定阅者,通过控制台可以看到多个持久订阅者,如图 数据库中表记录如下select * from ...
springboot activemq 2 持久化消息 与 持久化订阅
接着上一节http://blog.csdn.net/cons_step_by_step/article/details/78300427。 改动1.减少springboot重复创建session的问题 jmsTemplate的地方加入了CachingConnectionFactory,这样配置可以 @Bean(name = "myJmsTemplate") public JmsTem
activeMQ消息详解(续) 订阅(主题)消息(消息持久化)
activeMQ 持久化消息 订阅消息
JMS学习十一(Spring+ActiveMQ消息持久化,Topic持久化订阅)
消息持久化就是将消息保存到磁盘,这样的好处就是如果服务挂了,则消息还保存在磁盘不会丢失,服务起来后还能找到消息并在此发送,消息的持久化和消息的发送模型是没有关系的。 消息持久化的配置很方便的,所以其他的那些就不写出来了,可以看看上一篇文章中的同步异步实现方式。这里只把持久化配置的列出来。 <bean class="org.springframework.j
五、ActiveMQ添加了mysql的持久化后,发了消息,但是MSGS表中没有记录.
1.持久化以后 activemq数据库 会创建3张表&amp;lt;bean id=&quot;derby-ds&quot; class=&quot;org.apache.commons.dbcp2.BasicDataSource&quot; destroy-method=&quot;close&quot;&amp;gt;&amp;lt;property name=&quot;driverClassName&quot; value=&quot;com.mysql.jdbc.Driver&quot;/&am
C#使用NMS与ActiveMQ通讯问题总结:如何持久化发布
/** * msg:发布的内容。 * Apache.NMS.MsgDeliveryMode.Persistent: 持久化 * Apache.NMS.MsgPriority.Normal:内容优先级(重要性)
eclipse paho包对于ActiveMQ持久化订阅者的设置
在实现基于ActiveMQ的电影推送系统的过程中,因为是Android端的应用程序,而在查阅网上的各种资料发现,Android端直接用原生的MQTT来做推送的比较少,而eclipse paho这个封装好的API似乎比较好用在Android端的推送上,于是就采用这个包来做。推送的大致流程可以查看这个网页:基于paho包的Android demo 将逻辑写在Service可以使程序在后台执行
如何实现ActiveMQ的Topic的持久订阅
消息持久化就是将消息保存到磁盘,这样的好处就是如果服务挂了,则消息还保存在磁盘不会丢失,服务起来后还能找到消息并在此发送,消息的持久化和消息的发送模型是没有关系的。 消息持久化的配置很方便的,所以其他的那些就不写出来了,可以看看上一篇文章中的同步异步实现方式。这里只把持久化配置的列出来。   [html] view plain copy    &amp;lt;!-- spring 使用jmsTe...
activemq持久订阅工作原理
对activemq消息订阅模式来说有两种:持久订阅/非持久订阅。 非持久订阅consumer只能消费在该consumer激活状态时传送给对应topic的消息才能被该consumer消费,一旦该consumer 挂掉到下次启动期间发布到该topic的消息不能被该consumer重新恢复时使用!!! 持久订阅:订阅之后,无论消息是否是在该consumer激活或者down掉期间发送的,最终都会被该c...
(转)消息处理利器 ActiveMQ 的介绍 & Stomp 协议的使用 PHP demo
原文链接:http://blog.csdn.net/shagoo/article/details/6077686   随 着互联网企业业务量的不断扩大,企业信息网络系统的愈加复杂,性能问题也就越来越凸显出来,串行的业务处理方式显然已经成为主要的瓶颈,我们需要更多 异 步的并行处理来提高企业信息系统的业务处理能力,因此独立的消息处理系统也就应运而生,ActiveMQ  就是诸多开源消息系统...
ActiveMQ之Topic的持久订阅
非持久化订阅持续到它们订阅对象的生命周期。这意味着,客户端只能在订阅者活动时看到相关主题发布的消息。如果订阅者不活动,它会错过相关主题的消息。如果花费较大的开销,订阅者可以被定义为durable(持久化的)。持久化的订阅者注册一个带有JMS保持的唯一标识的持久化订阅(subscription)。带有相同标识的后续订阅者会再续前一个订阅者的订阅状态。如果持久化订阅没有活动的订阅者,JMS会保持订阅消息,直到消息被订阅接收或者过期。
activemq发布订阅消息持久化
一:消息生产者MessageProducer调用setDeliverMode设置发布消息持久化方式,不设置默认也是持久化 import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class TopicProducer { public static void main(Str...
ActiveMQ消息持久化与消息有效期
在生产环境中,手工签收的方式比较合适,因为某个消息在消费端没有成功处理的情况下,可以不给ActiveMQ消息中间件发送针对这个消息的确认签收。同时,记录相关信息到日志文件或数据库中,以便后续做相应处理。在默认情况下,消息在ActiveMQ消息中间件中是不会过期的,可以根据实际的项目需要去设置消息的过期时间,单位毫秒。 消息优先级总共十个,即0-9。其中,0-4是普通消息,5-9是加急消息
activeMQ将消息持久化到数据库
本demo是将msg持久化到oracle数据库 需要在ActiveMQ中加入jdbc依赖的jar包,实测用到的jar包有: commons-pool-1.5.4.jarcommons-dbcp-1.4.jarojdbc14-10.2.0.4.0.jar 将这三个jar包下载后复制到activeMQ的conf包lib下 修改activeMQ.xml文件 替换已有的p
ActiveMQ整合spring实现持久化消息接收
       在我们生产过程中往往存在两个项目接口调用场景,但是这中场景下我们很难保证百分百的网络问题和服务问题,所在就会导致我们在调用接口的时候连接超时或者访问不到的情况发生,以致我们的数据丢失。         出现以上问题不用担心,本文就是重点介绍如何通过ActiveMQ的持久化操作解决请求丢失数据。 首先我们需要了解一下ActiveMQ的持久化方式,多的不说我们这里介绍两种持久化方式,...
关于ActiveMQ中Topic持久化配置问题
说明:记录一下在ActiveMQ中踩过的坑。 关键字:jms:listener-container,topic订阅持久化。 Demo的目录结构(说明中有各个文件的作用) 不懂ActiveMQ的可以,在网上搜搜,有很多很好的博客,在这里我只简单的介绍怎么用,标注一些坑。哪里不足,欢迎提出。前提:activeMQ(解压之后的文件夹)–conf –activemq.xml applicat
JMS学习七(ActiveMQ消息持久化)
上一篇文章中消息的持久化订阅有个前提条件就是消息的持久化,也就是将发送的消息保存磁盘,之后能再次获取,ActiveMQ提供了好几种消息持久化方案比如kahaDB 、JDBC等,下面来一个个看。
MQTT下ActiveMQ的消息持久化
当 MQTT客户端订阅者同时满足如下条件时,会变成持久订阅者,此时可以为MQTT订阅者持久化消息到数据库或文件存储: 1. cleanSession为false 2. clientId不为空
ActiveMQ的持久化与集群
ActiveMQ存储消息可以采用多种持久化方案,每种方案都有自己特有的集群方案。
ActiveMQ(三)———spring消息持久化配置
一、Topic与Queue比较1、Topic Publish Subscribe messaging 发布订阅消息。 topic数据默认不落地,是无状态的。 并不保证publisher发布的每条数据,Subscriber都能接受到 一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。 一对多的消息发布
activemq消息持久化所需Jar包
activemq消息持久化所需Jar包,详情请参见博文:http://blog.csdn.net/l1028386804/article/details/68997105
activemq 持久化topic处理过程及其消息游标轮转问题的解决方案
    如果消息是持久化的,activemq收到消息后会存储在持久性cursor中。对于非持久化消息,会存储在File Cursor中。从名称上File Cursor是持久性cursor,实际上activemq把FilePendingMessageCursor作为非持久性cursor。File Cursor首先在内存中保存消息的引用,如果内存使用量达到上限,那么会把消息引用保存到临时文件中,这...
ActiveMQ Spring 整合持久化到数据库的实现
http://topmanopensource.iteye.com/blog/1070096
SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅
SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅,可以参考我的博客文章进行学习https://blog.csdn.net/sujin_/article/details/82956386
消息中间件-activemq实战之消息持久化(六)
对于activemq消息的持久化我们在第二节的时候就简单介绍过,今天我们详细的来分析一下activemq的持久化过程以及持久化插件。在生产环境中为确保消息的可靠性,我们肯定的面临持久化消息的问题,今天就一起来攻克他吧。1. 持久化方式介绍前面我们也简单提到了activemq提供的插件式的消息存储,在这里再提一下,主要有以下几种方式: AMQ消息存储-基于文件的存储方式,是activemq开始的版本默
activeMQ持久化策略介绍与配置方式
activeMQ中对于投递模式设置为持久化的消息,broker接收到到消息之后,会先把消息存储到存储介质,然后再转发到消息的监听者,activeMQ提供以下几种消息持久化策略。 KahaDB存储 KahaDB是默认的持久化策略,所有消息顺序添加到一个日志文件中,同时另外有一个索引文件记录指向这些日志的存储地址,还有一个事务日志用于消息回复操作。是一个专门针对消息持久化的解决方案,它对典型的消...
ActiveMQ之定期清理离线的持久订阅者
概述 通常,我们不希望系统中存在长时间离线的持久订阅者,因为Broker需要为它们保留它们订阅的topic的所有消息。而且随着时间的推移,将会导致达到存储限制,从而导致系统变慢。 当然,你可以通过JConsole或Web Console等管理工具来手动取消不活跃的持久订阅者。但显然可以采取更多措施来帮助管理。 过期消息 一些应用程序发送的消息有一定的过期时间。如果这些消息存储在Broker上供离线...
ActiveMQ 持久化配置
修改配置文件activemq.xml修改persistenceAdapter将下面这段配置,注释掉<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter> 然后更新成下面的配置:<persistenceAdapter> <jdbcPersistenceAdapter
Sping集成activeMQ演示queue和topic模式,持久化
  本文源码: https://download.csdn.net/download/qq_23009109/10605210 包含 activemq服务器, 已经配置好了数据库,按说明改下自己的 mysql数据库地址,用户名和密码,包含queue,topic持久化 第一步:  一 、下载activemq 官网:http://activemq.apache.org/download.htm...
mq topic持久化订阅者(topic、queue的producer.setDeliveryMode(DeliveryMode. PERSISTENT)是指的mq服务),queue的消费者不在也会给
mq topic持久化订阅者(topic、queue的producer.setDeliveryMode(DeliveryMode. PERSISTENT)是指的mq服务),queue的消费者不在也会给他保留,topic只有持久化订阅者会保留   (1)使用queue,即队列时,每个消息只有一个消费者,所以,持久化很简单,只要保存到数据库即可 。然后,随便一个消费者取走处理即可。某个消费者
ActiveMQ Oracle数据库持久化配置
ActiveMQ提供了可扩展的持久化方案,下面是我测试可行的oracle数据库持久化配置方案。注意: 需要在ActiveMQ中加入jdbc依赖的jar包,实测用到的jar包有: commons-pool-1.5.4.jar commons-dbcp-1.4.jar ojdbc14-10.2.0.4.0.jar 如果没有上述jar包启动ActiveMQ服务器会报错。ActiveMQ、SpringF
ActiveMQ的消息持久化到Mysql数据库
1、将连接Mysql数据库的jar文件,放到ActiveMQ的lib目录下        2、修改ActiveMQ的conf目录下的active.xml文件,修改数据持久化的方式        2.1  修改原来的kshadb的持久化数据的方式 -->        2.2  连接Mysql的配置(注意配置文件放置的位置)
Activemq同时支持多个Topic类型通信,并且配置添加到服务里面方便管理
并且配置添加到服务里面方便管理,并且配置添加到服务里面方便管理 详细配置请访问:https://blog.csdn.net/Joe192/article/details/81215188
文章热词 机器学习教程 Objective-C培训 交互设计视频教程 颜色模型 设计制作学习
相关热词 mysql关联查询两次本表 native底部 react extjs glyph 图标 人人都是产品经理培训 人人都是产品经理视频