it_is_me 2017-09-08 06:47
浏览 3737

求解决:ActiveMQ在发布订阅模式下会递增式的重复接收消息

求解决:ActiveMQ在发布订阅模式下有多个consumer,consumer会递增式的重复接收消息

问题:springboot集成ActiveMQ实现了消费队列和发布订阅两种消息类型,发布订阅模式下有TopicConsumer1和TopicConsumer2,监听的destination都是alarm.topic,但这两个consumer会递增式的重复接收消息。我的实现代码如下:

test入口类
@RunWith(SpringRunner.class)
@SpringBootTest(classes = MessageCenterApplication.class)
public class ActivemqTest {
@Autowired
private Producer producer;
@Test
public void testSendTopicMessage(){
for (int i = 0; i < 5; i++) {
producer.sendTopicMessage(Destinations.ALARM_TOPIC, "here is a topic message, the number is " + i);
}
}
/*输出结果如下,这个输出结果不是我所期望的
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 0
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 0
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 1
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 1
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 1
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 1
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 2
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 2
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 2
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 2
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 2
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 2
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 3
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 3
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 3
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 3
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 3
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 3
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 3
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 3
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 4
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 4
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 4
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 4
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 4
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 4
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 4
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 4
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 4
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 4*/

}

Producer实现类
@Component
public class Producer {
@Autowired
@Qualifier("jmsQueueMessagingTemplate")
private JmsMessagingTemplate jmsQueueMessagingTemplate;
@Autowired
@Qualifier("jmsTopicMessagingTemplate")
private JmsMessagingTemplate jmsTopicMessagingTemplate;
/**
* 使用queue消息类型(点对点)
/
public void sendQueueMessage(String destination, final String message){
Queue queue = new ActiveMQQueue(destination);
jmsQueueMessagingTemplate.convertAndSend(queue, message);
}
/
*
* 使用topic消息类型
*/
public void sendTopicMessage(String destination, final String message){
Topic topic = new ActiveMQTopic(destination);
jmsTopicMessagingTemplate.convertAndSend(topic, message);
}

}

TopicConsumer1和TopicConsumer2的实现代码
@Component
public class TopicConsumer1 {

@JmsListener(destination = Destinations.ALARM_TOPIC, containerFactory = "jmsListenerContainerFactory4Topic")
public void receive(String message){
    System.out.println("TopicConsumer1接收到的消息内容为:" + message);
}

}
@Component
public class TopicConsumer2 {

@JmsListener(destination = Destinations.ALARM_TOPIC, containerFactory = "jmsListenerContainerFactory4Topic")
public void receive(String message){
    System.out.println("TopicConsumer2接收到的消息内容为:" + message);
}

}

jmsTopicMessagingTemplate对应的bean的配置
@Configuration
public class JmsMessagingConfig {
@Autowired
@Qualifier(value = "jmsQueueTemplate")
private JmsTemplate jmsQueueTemplate;
@Autowired
@Qualifier(value = "jmsTopicTemplate")
private JmsTemplate jmsTopicTemplate;
@Bean(name = "jmsQueueMessagingTemplate")
public JmsMessagingTemplate jmsQueueMessagingTemplate(){
return new JmsMessagingTemplate(jmsQueueTemplate);
}
@Bean(name = "jmsTopicMessagingTemplate")
public JmsMessagingTemplate jmsTopicMessagingTemplate(){
return new JmsMessagingTemplate(jmsTopicTemplate);
}

}

jmsTopicTemplate对应的bean的配置
@Configuration
public class JmsConfig {

private final JmsProperties jmsProperties;
private final ObjectProvider<DestinationResolver> destinationResolver;
private final ObjectProvider<MessageConverter> messageConverter;

public JmsConfig(JmsProperties jmsProperties, ObjectProvider<DestinationResolver> destinationResolver,
        ObjectProvider<MessageConverter> messageConverter) {
    this.jmsProperties = jmsProperties;
    this.destinationResolver = destinationResolver;
    this.messageConverter = messageConverter;
}

@ConfigurationProperties(prefix = "spring.activemq")
@Bean(name = "jmsQueueTemplate")
@Primary
public JmsTemplate jmsQueueTemplate(PooledConnectionFactory pooledConnectionFactory) {
    JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
    // 设置为使用queue消息类型
    template.setPubSubDomain(false);

    DestinationResolver destinationResolver = (DestinationResolver) this.destinationResolver.getIfUnique();
    if (destinationResolver != null) {
        template.setDestinationResolver(destinationResolver);
    }
    MessageConverter messageConverter = (MessageConverter) this.messageConverter.getIfUnique();
    if (messageConverter != null) {
        template.setMessageConverter(messageConverter);
    }

    // deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false
    template.setExplicitQosEnabled(true);
    // DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久
    template.setDeliveryMode(DeliveryMode.PERSISTENT);
    // 默认不开启事务
    // 如果不启用事务,则会导致XA事务失效;
    // 作为生产者如果需要支持事务,则需要配置SessionTransacted为true
    template.setSessionTransacted(true);
    return template;
}

@ConfigurationProperties(prefix = "spring.activemq")
@Bean(name = "jmsTopicTemplate")
public JmsTemplate jmsTopicTemplate(PooledConnectionFactory pooledConnectionFactory) {
    JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
    // 设置为使用topic消息类型
    template.setPubSubDomain(true);

    DestinationResolver destinationResolver = (DestinationResolver) this.destinationResolver.getIfUnique();
    if (destinationResolver != null) {
        template.setDestinationResolver(destinationResolver);
    }
    MessageConverter messageConverter = (MessageConverter) this.messageConverter.getIfUnique();
    if (messageConverter != null) {
        template.setMessageConverter(messageConverter);
    }

    // deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false
    template.setExplicitQosEnabled(true);
    // DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久
    template.setDeliveryMode(DeliveryMode.PERSISTENT);
    // 默认不开启事务
    // 如果不启用事务,则会导致XA事务失效;
    // 作为生产者如果需要支持事务,则需要配置SessionTransacted为true
    template.setSessionTransacted(true);
    return template;
}

@ConfigurationProperties(prefix = "spring.activemq")
@Bean(name = "jmsListenerContainerFactory4Topic")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory4Topic(PooledConnectionFactory pooledConnectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(pooledConnectionFactory);
    // 设置为使用topic消息类型
    factory.setPubSubDomain(true);
    //如果单独配置了分布式事务,则启用
    // if (this.transactionManager != null) {
    // factory.setTransactionManager(transactionManager);
    // } else {
    factory.setSessionTransacted(Boolean.valueOf(true));
    // }
    JmsProperties.Listener listener = jmsProperties.getListener();
    factory.setAutoStartup(listener.isAutoStartup());
    if (listener.getAcknowledgeMode() != null) {
        factory.setSessionAcknowledgeMode(Integer.valueOf(listener.getAcknowledgeMode().getMode()));
    }
    String concurrency = listener.formatConcurrency();
    if (concurrency != null){
        factory.setConcurrency(concurrency);
    }
    return factory;
}

@ConfigurationProperties(prefix = "spring.activemq")
@Bean(name = "jmsListenerContainerFactory4Queue")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory4Queue(PooledConnectionFactory pooledConnectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(pooledConnectionFactory);
    // 设置为使用queue消息类型
    factory.setPubSubDomain(false);
    //如果单独配置了分布式事务,则启用
    // if (this.transactionManager != null) {
    // factory.setTransactionManager(transactionManager);
    // } else {
    factory.setSessionTransacted(Boolean.valueOf(true));
    // }
    JmsProperties.Listener listener = jmsProperties.getListener();
    factory.setAutoStartup(listener.isAutoStartup());
    if (listener.getAcknowledgeMode() != null) {
        factory.setSessionAcknowledgeMode(Integer.valueOf(listener.getAcknowledgeMode().getMode()));
    }
    String concurrency = listener.formatConcurrency();
    if (concurrency != null){
        factory.setConcurrency(concurrency);
    }
    return factory;
}

}

yml配置文件的配置
spring:
#jms配置,配合activemq一起配置
jms:
listener:
concurrency: 1
max-concurrency: 50
#activemq配置
activemq:
#默认使用spring-boot-starter-activemq内嵌的activemq,除非明确指定了broker-url
in-memory: false
user: system
password: manager
broker-url: auto+nio://192.168.137.5:61616
pool:
enabled: true
max-connections: 500

  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥15 eclipse运行项目时遇到的问题
    • ¥15 关于#c##的问题:最近需要用CAT工具Trados进行一些开发
    • ¥15 南大pa1 小游戏没有界面,并且报了如下错误,尝试过换显卡驱动,但是好像不行
    • ¥15 没有证书,nginx怎么反向代理到只能接受https的公网网站
    • ¥50 成都蓉城足球俱乐部小程序抢票
    • ¥15 yolov7训练自己的数据集
    • ¥15 esp8266与51单片机连接问题(标签-单片机|关键词-串口)(相关搜索:51单片机|单片机|测试代码)
    • ¥15 电力市场出清matlab yalmip kkt 双层优化问题
    • ¥30 ros小车路径规划实现不了,如何解决?(操作系统-ubuntu)
    • ¥20 matlab yalmip kkt 双层优化问题