ACTIVEMQ Topic消息 生产者 发布消息后 消费者收不到消息

 

生产者

public class Producer {
     public static void main(String[] args) throws JMSException {  
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
            Connection connection = factory.createConnection();  
            connection.start();  
              
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
            ActiveMQTopic topic= new ActiveMQTopic("testTopic");  
      
            MessageProducer producer = session.createProducer(topic);  
           producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
      
    
            for(int i=0; i<10; i++){
                TextMessage message = session.createTextMessage();  
               message.setText("message_" + System.currentTimeMillis());  
               producer.send(message);  
               System.out.println("Sent message: " + message.getText());  
            }
              
           
      
//        session.close();  
//        connection.stop();  
//        connection.close();  
        }  
}

发布消息的结果
Sent message: message_1341915173083
Sent message: message_1341915173085
Sent message: message_1341915173085
Sent message: message_1341915173086
Sent message: message_1341915173086
Sent message: message_1341915173086
Sent message: message_1341915173087
Sent message: message_1341915173087
Sent message: message_1341915173088
Sent message: message_1341915173088






消费者
public class Consumer {
    public static void main(String[] args) throws JMSException {  
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
        Connection connection = factory.createConnection();  
        connection.start();  
          
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
        ActiveMQTopic topic= new ActiveMQTopic("testTopic");
       // javax.jms.Topic topic =  session.createTopic("myTopic.messages");  
  
        MessageConsumer consumer = session.createConsumer( topic);  
        consumer.setMessageListener(new MessageListener() {  
            public void onMessage(Message message) {  
                TextMessage tm = (TextMessage) message;  
                try {  
                    System.out.println("Received message: " + tm.getText());  
                } catch (JMSException e) {  
                    e.printStackTrace();  
                }  
            }  
        });  
        
        
        
        MessageConsumer comsumer2 = session.createConsumer(topic);
        comsumer2.setMessageListener(new MessageListener(){
             public void onMessage(Message message) {  
                 TextMessage tm = (TextMessage) message;  
                 try {  
                     System.out.println("Received message: " + tm.getText());  
                 } catch (JMSException e) {  
                     e.printStackTrace();  
                 }  
             }  
        });
//      session.close();  
//      connection.stop();  
//      connection.close();  
    }  
}
消费者运行程序后后获取不到生产者发布的消息,初识 ActiveMQ不太熟悉,  求解答

 

 

0

1个回答

亲测 搞了个小例子 我的例子代码来自:
http://www.open-open.com/lib/view/open1328079945062.html
然后跑了一下 消费者能接到消息

ActiveMQTopic topic= new ActiveMQTopic("testTopic"); 怀疑你这个代码原因
觉得应该用这种方式
Topic topic = session.createTopic("testTopic");

另外我测试过程我写了个小博客 我的博客地址:[url]http://babydeed.iteye.com/blog/1584561[/url] 里面含有我的代码

0
works001
works001 说道这里 你应该明白了
大约 7 年之前 回复
works001
works001 ActiveMQTopic topic= new ActiveMQTopic("testTopic"); 这种方式 消费者和生产者未必是同一topic
大约 7 年之前 回复
Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
spring JMS、activemq中消费者收不到生产者发送的消息的原因解析
我们使用jms一般是使用spring-jms和activemq相结合,通过spring的JmsTemplate发送消息到指定的Destination。rn rn    首先定义一个activemq的连接池:rn rnrnrnnXml代码  nrnrnn&amp;lt;bean id=&quot;connectionFactory&quot; class=&quot;org.apache.activemq.pool.PooledConn...
activemq发送消息接收不到问题排查
昨天,在公司开发环境上尝试数据同步,涉及到两个服务,服务A发布主题,服务B监听消息,然后同步A表中的数据到B表;nn由服务A通过activemq发布消息到一个topic:VirtualTopic.topic,发现服务A日志上显示发送消息成功,VirtualTopic.topic对应的Messages Enqueued数目增加了2,表明topic上也产生了相应数目消息队列;但是服务B日志上迟迟没有打...
activemq中多个consumer引起接收不到信息
使用activemq时,若一个mq有两个comsumer时,会有接收不到信息的情况
ActiveMQ服务重启 收不到消息
ActiveMQ做大并发的缓冲,消息的生产和消费都是用的spring封装的消息监听容器。rn在系统测试过程中偶然发现,消费无法发送,发送端直接报异常。显示连接拒绝。直接重启ActiveMQ,问题依然存在。rn只能重启消费发送的服务,消息可以发送了。查看ActiveMQ控制台,有消息发送成功了,但是没有消费者。重启消费者服务,可以了。rn可以以为是spring的消息监听容器的问题,找了半天,没有找到
ActiveMQ消息传递
ActiveMQnnnnActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。nn简单来说ActiveMQ是JMS规范的实现者之一,因为它免费,所以用的人就多nn要想使用Activ...
淘淘商城系列——ActiveMQ发送topic消息和接收topic消息
我相信大家通过前面的学习,已然知道了如何发送队列消息及消费队列消息。本文我们将一起学习如何发送topic消息和接收topic消息。 n我们依然在TestActiceMQ单元测试类中添加一个测试方法,用来测试发送topic消息,如下图所示,其实这个方法与发送队列消息几乎一样,只是创建Destination对象的时候不一样而已。 n n为了方便大家复制,现将testTopicProducer测试方法的代
ActiveMQ Topic消息失败重发
       消息失败重发指的是当消息的接收方没有成功的消费消息,我们需要重发消息,让消息的接收方成功消费这个消息,保证事务的完整性和消息的一致性。nn一、JMS消息确认机制nn    在session接口中定义的几个常量:nn   AUTO_ACKNOWLEDGE =  1    自动确认nn  CLIENT_ACKNOWEDGE =  2    客户端手动确认nn  DUPS_OK_ACKNO...
ActiveMQ(二)———使用Topic来发送消息
摘要:每个消息可以有多个消费者,发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。引入三个jar包,这些jar包在activeMQ的安装包中都有: nge
突然不发消息给ActiveMQ但能接收
n        今天项目在联调过程中,ActiveMQ突然不好使了。在此之前一月内,项目组的人都没有去修改、重启过MQ服务。rn        虽然现在知道是由于权限的问题导致只能收不能发(站在ActiveMQ角度是收不到,但可以发)。只是到现在还不知道原来没变过的代码,怎么以前可以用,现在却不行了???rn       通过查询示例代码,发现有connectionFactory.setUser...
ActiveMQ中消费者是如何接收消息的(一)
rn       事先说明,本博客关于ActiveMQ的文章都是基于ActiveMQ5.10版本。        初步用过ActiveMQ但又没去研究过源码的朋友肯定有些好奇ActiveMQ中消费者是如何接收消息的呢?本文我就和大家一起从源码角度来初步探讨消费者接收消息的过程。        我们知道,消息传送有两种模型:点对点(P2P)和发布订阅(PUB/SUB),队列模式中,消息生产者叫做发送...
ActiveMQ支持多个topic订阅消息接收设置
描述:最近做一个预警的功能,使用到ActiveMQ消息订阅功能,但是问题来了,页面上有四个Topic订阅消息就接收不到消息,接收一个topic消息就没问题。排查问题后,程序是没问题的,问题应该出现在ActiveMQ配置文件设置那块。直接找到activemq.xml,如图:nnnn源代码:nn&amp;lt;broker xmlns=&quot;http://activemq.apache.org/schema/co...
Kafka学习笔记:消息生产者、消费者以及消息发布的不同模型
消息生产者、消费者以及消息发布的不同模型
ACTIVEMQ Topic消息 生产者 发布消息后 消费者收不到消息
 rnrn rn  
ActiveMQ:点对点队列消费者接收不到消息
一.环境说明nWindows 1709n阿里云ECS nCentOS 7.4nActiveMQ 5.15.2nJDK 1.8nIDEA 2017.3nMaven 3.5.0 n二.问题说明远程消息服务器使用的是阿里云ECS,在windows上编写测试类测试消息队列的点对点的通信,却发现,无法消费生产者生产的消息,即接收不到消息.三.代码testProducer @Testn public v
ActiveMQ暂停接收消息与恢复接收消息
目录nn nn●问题起因nn●解决方案nn●测试结果nn●问题起因nn使用ActiveMQ的过程中,有这么一个很常见的需求:如果我的消息消费者压力过大,接收消息来不及处理,我想暂停接收消息,先处理完已接收到的消息后,再恢复继续接收新的消息。nn比较诡异的是,这样一个看似简单的功能,百度上居然没有一个适用的方案,大家不信的话可以搜索试试:nnnn第一篇文章看似OK,但是点进去你会发现他的实现方式依旧...
activemq订阅模式以及消息时长和确认机制
代码如下:rnrnrnrnrnrn[java] viewrn plain copyrnrnrn rnrnrnrnpackage com.activemq;  rnimport org.apache.activemq.ActiveMQConnectionFactory;  rnimport javax.jms.*;   rn  rnpublic class TopicPub {  rn     pu
Spring-boot中简单操作activemq(生产者消费者代码)
pom.xml添加依赖:nnn&amp;lt;!-- activemq --&amp;gt;n&amp;lt;dependency&amp;gt;n &amp;lt;groupId&amp;gt;org.springframework.boot&amp;lt;/groupId&amp;gt;n &amp;lt;artifactId&amp;gt;spring-boot-starter-activemq&amp;lt;/artifactId&amp;gt;n&amp;lt;/dependency&amp;gt
关于ActiveMQ中Topic持久化配置问题
说明:记录一下在ActiveMQ中踩过的坑。 n关键字:jms:listener-container,topic订阅持久化。nDemo的目录结构(说明中有各个文件的作用) n n不懂ActiveMQ的可以,在网上搜搜,有很多很好的博客,在这里我只简单的介绍怎么用,标注一些坑。哪里不足,欢迎提出。前提:activeMQ(解压之后的文件夹)–conf –activemq.xml n applicat
ActiveMQ的queue以及topic两种消息处理机制分析
rn        上一期介绍了我们项目要用到activeMQ来作为jms总线,并且给大家介绍了activeMQ的集群和高可用部署方案,本期给大家再介绍下,如何根据自己的项目需求,更好地使用activeMQ的两种消息处理模式。rn       rn1    queue与topic的技术特点对比rn rnrnnnrn     对比项rnrnrnTopicrnrnrnQueuernrnnnrn概要rn...
ActiveMQ和spring整合,订阅主题和消息消费
本文章适用初学ActiveMQ的统同学。n本演示为windows下进行n首先下载ActiveMQn1.ActiveMQ地址 n   ActiveMQ官方网站:http://activemq.apache.org/ nn2.下载完成或解压到本地F:/目录下nnnnn3.启动本地ActiveMQ服务。n  进入到安装目录双击activemq.bat文件启动,如果启动时窗口一闪而
MQ的概念及用ActiveMQ实现一个生产者多个消费者共享消息队列
MQ的优点及使用场景:nn消息总线(Message Queue),后文称MQ,是一种跨进程的通信机制,用于上下游传递消息。nn n在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。nn使用了MQ之后,消息发送上游只需要依赖MQ,逻辑上和物理上都不用依赖其他服务。nn什么时候不使用消息总线? n既然MQ是互联网分层架构中的解耦利器,那所有通讯都使用MQ岂不是很好?这是一...
ActiveMQ多个消费者消费不均匀问题
先上图rnrnrn如果客户端处理很慢的话,Broker会在之前发送消息的反馈之前,继续发送新的消息到客户端。如果客户端依旧很慢的话,没有得到确认反馈的消息会持续增长。在这种情况下,Broker有可能会停止发送消息给消费者。当未被反馈的消息达到了prefetchrn limit设置的数字时,Broker将会停止给消费者发送新的消息。除非消费者开始给与反馈,否则得不到任何消息。rnrnrnrnDefa
spring整合ActiveMQ订阅模式(对象消息发送)
归纳一下spring整合activemq的基础配置以及使用,监听到消息后的更多逻辑操作需要自己完成,需要源代码可联系:993610778
【消息队列】ActiveMQ的简单实例 - 生产者消费者模式
安装首先,需要去官网下载windows版本(如果使用的是Linux,就下载对应Linux的)的ActiveMQ并安装,下载地址下载完后解压缩,进入bin目录,打开一个控制台,输入:activemq.bat start就可以启动ActiveMQ了。Github上项目地址:https://github.com/zgljl2012/activemq-learn实现在Eclipse中创建一个Maven项目(
activemq消息的产生与消费 Java
activemq:进入bin目录启动,localhost:8161访问activemq客户端 n1.发送消息给activemq客户端:nnnn String url = &quot;tcp://ip地址:61616&quot;;n String user = ActiveMQConnection.DEFAULT_USER; //默认用户名adminn String password = Activ...
基于Kafka的生产者消费者消息处理本地调试
(尊重劳动成果,转载请注明出处:http://blog.csdn.net/qq_25827845/article/details/68174111冷血之心的博客)nnKafka下载地址:http://download.csdn.net/download/qq_25827845/9798176nn安装解压即可nn配置修改zookeeper.properties 与server.propert...
spring+activemq topic持久化订阅
spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:QueueProducer 、消息消费者1:SimpleJMSReceiver 消息消费者2:SimpleJMSReceiver2
kafka(java客户端)消费者取不到消息,生产者消息也没发送成功
kafka(java客户端)消费者取不到消息,生产者消息也没发送成功先说下我使用的各种版本: nkafka版本:kafka_2.12-1.0.0 nzookeeper版本:3.4.11 npom中使用的客户端版本:0.11.0.0在Ubuntu虚拟机上安装了kafka,zookeepe之后,没有修改过其中的任何配置文件,kafka默认端口是9029,zookeeper默认端口是2181, n分别运行
ActiveMq实践之排他性消费者
[code=&quot;java&quot;]activemq支持的功能:排他性消费者rn背景:rn这个功能主要是为了解决jms集群的问题而提出的rn当你使用product 产生一个消息并且放置到一个queue中的时候rn如果说你使用了集群,并且每一个集群里面消费的队列都是刚才的那个队列rn那么这个时候就会出现并发的时候 这么多集群rn同时都消费了这个队列里面产生的消息,出现了重复的消费rn一般的情况下碰到这个请求时...
SpringBoot+ActiveMq实现订阅模式(Topic)消息队列
上文已经详细介绍了点对点模式(Queue)下的消息队列,今天就来再介绍一下消息队列的另一种模式:订阅模式。nn一、订阅模式的流程nn生产者产生一条消息message放入一个topic中,该topic已经三个消费者订阅了,那么被放入topic中的这条消息,就会同时被这三个消费者取走(当然他们必须都处于在线状态),并进行“消费”。其实就类似现实生活中的手机接收推送。nnnn二、订阅模式的应用场景nnn...
ActiveMQ利用selector消费在消息堆积量大的时候不起作用的解决
ActiveMQ利用selector消费在消息堆积量大的时候不起作用的解决rn现象:rn​ 我们在使用activemq queue的时候,发送方会把两个城市的消息都发到一个queue里,但是会在消息扩展属性里加上代表不同城市的参数,然后两个城市的应用都会启带不同selector的消费者到queue里消费各自的消息。前几天A市的服务挂了,开始B市的服务还能正常的消费消息,当随着queue里消息堆积的越来...
active mq 多个消费者实战(发布订阅模式)
注意:所有的配置文件都在src文件下nn           声明:  在这里不讲activemq 是什么,本人只是根据平时用到的东西整理一下希望对大家有所帮助。n        首先是生产者的配置文件n<beans xmlns="http://www.springframework.org/schema/beans"n xmlns:context="http://www.springfra
spring boot整合activeMQ,实现queue和topic两者消息模式
如何下载安装MQ我就不说了,百度一大把,老规矩先上一下项目目录结构: n先看一下配置文件,主要是中间件的配置: nps:1.主要注意的是activeMQ默认提供ptp模式,若要使用topic模式需要假如最后一个配置为truespring.activemq.broker-url=tcp://localhost:61616nspring.activemq.in-memory=true nspring.
如何实现ActiveMq的Topic的持久订阅
rn原文地址:http://www.mytju.com/classcode/news_readNews.asp?newsID=486rn rn(1)使用queue,即队列时,每个消息只有一个消费者,所以,持久化很简单,只要保存到数据库即可。然后,随便一个消费者取走处理即可。某个消费者关掉一阵子,也无所谓。(2)使用topic,即订阅时,每个消息可以有多个消费者,就麻烦一些。首先,假设消费者都是普通...
ActiveMQ消息的发布与订阅
消息的发布与订阅nPublish类npackage com.cb01;nnimport javax.jms.Connection;nimport javax.jms.ConnectionFactory;nimport javax.jms.Destination;nimport javax.jms.JMSException;nimport javax.jms.MessageProducer;ni
activemq broker topic消息收发处理过程
首先看一下activemq的网络模型(获取自网络)。 n n可以看到,TcpTransportServer是一个监听网络的类,如果有socket连接,便会放入阻塞队列,然后创建一个TransportConnection类,该类接收socket的消息,进行逻辑处理;nn先看看TransportConnector在干啥 n n第一步:设置server处理类 n第二步:接收到连接socket加入线程池 ...
淘淘商城42-使用java向ActiveMQ服务器发送Queue消息与Topic消息
目录nn1.导入依赖nn2.发收Queue消息nn2.1消息生产者nn2.2消息消费者nn3.发收Topic消息nn3.1消息生成者nn3.2消息消费者nn4.总结nn1.导入依赖nn首先导入maven依赖,也可以直接导入jar包nnn&amp;lt;dependency&amp;gt;n &amp;lt;groupId&amp;gt;org.apache.activemq&amp;lt;/groupId&amp;gt;n &amp;lt;artifa...
spring+activemq配置多个生产者,多个消费者并发处理消息
先贴配置
kafka踩坑之消费者收不到消息
生产者发送消息,客户端始终消费不到nn原因:nn客户端版本与服务端不一致nn解决:nn我这里服务端使用的是:kafka_2.10-0.8.2.1.tgz,客户端原来使用的是0.8.1,需要改为:nn&amp;lt;dependency&amp;gt;n &amp;lt;groupId&amp;gt;org.apache.kafka&amp;lt;/groupId&amp;gt;n &amp;lt;artifactId&amp;gt;...
ActiveMQ中消费者是如何接收消息的(二)
上篇文章大致讲述了同步消费者和异步消费者接收消息的异同(详见《ActiveMQ中消费者是如何接收消息的(一)》http://manzhizhen.iteye.com/blog/2094130 ),但我们还未讲到消息是在什么时候放入消费者ActiveMQMessageConsumer类的“消息容器”unconsumedMessages中的,这很关键,因为为了解耦,消费者类不需要知道你ActiveMQ
文章热词 机器学习教程 Objective-C培训 交互设计视频教程 颜色模型 设计制作学习
相关热词 mysql关联查询两次本表 native底部 react extjs glyph 图标 微信消息接口开发视频 微信末班消息java