it_is_me 2017-09-08 06:47
浏览 3736

求解决:ActiveMQ在发布订阅模式下会递增式的重复接收消息

求解决:ActiveMQ在发布订阅模式下有多个consumer,consumer会递增式的重复接收消息

问题:springboot集成ActiveMQ实现了消费队列和发布订阅两种消息类型,发布订阅模式下有TopicConsumer1和TopicConsumer2,监听的destination都是alarm.topic,但这两个consumer会递增式的重复接收消息。我的实现代码如下:

test入口类
@RunWith(SpringRunner.class)
@SpringBootTest(classes = MessageCenterApplication.class)
public class ActivemqTest {
@Autowired
private Producer producer;
@Test
public void testSendTopicMessage(){
for (int i = 0; i < 5; i++) {
producer.sendTopicMessage(Destinations.ALARM_TOPIC, "here is a topic message, the number is " + i);
}
}
/*输出结果如下,这个输出结果不是我所期望的
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 0
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 0
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 1
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 1
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 1
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 1
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 2
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 2
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 2
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 2
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 2
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 2
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 3
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 3
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 3
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 3
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 3
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 3
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 3
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 3
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 4
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 4
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 4
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 4
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 4
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 4
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 4
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 4
TopicConsumer2接收到的消息内容为:here is a topic message, the number is 4
TopicConsumer1接收到的消息内容为:here is a topic message, the number is 4*/

}

Producer实现类
@Component
public class Producer {
@Autowired
@Qualifier("jmsQueueMessagingTemplate")
private JmsMessagingTemplate jmsQueueMessagingTemplate;
@Autowired
@Qualifier("jmsTopicMessagingTemplate")
private JmsMessagingTemplate jmsTopicMessagingTemplate;
/**
* 使用queue消息类型(点对点)
/
public void sendQueueMessage(String destination, final String message){
Queue queue = new ActiveMQQueue(destination);
jmsQueueMessagingTemplate.convertAndSend(queue, message);
}
/
*
* 使用topic消息类型
*/
public void sendTopicMessage(String destination, final String message){
Topic topic = new ActiveMQTopic(destination);
jmsTopicMessagingTemplate.convertAndSend(topic, message);
}

}

TopicConsumer1和TopicConsumer2的实现代码
@Component
public class TopicConsumer1 {

@JmsListener(destination = Destinations.ALARM_TOPIC, containerFactory = "jmsListenerContainerFactory4Topic")
public void receive(String message){
    System.out.println("TopicConsumer1接收到的消息内容为:" + message);
}

}
@Component
public class TopicConsumer2 {

@JmsListener(destination = Destinations.ALARM_TOPIC, containerFactory = "jmsListenerContainerFactory4Topic")
public void receive(String message){
    System.out.println("TopicConsumer2接收到的消息内容为:" + message);
}

}

jmsTopicMessagingTemplate对应的bean的配置
@Configuration
public class JmsMessagingConfig {
@Autowired
@Qualifier(value = "jmsQueueTemplate")
private JmsTemplate jmsQueueTemplate;
@Autowired
@Qualifier(value = "jmsTopicTemplate")
private JmsTemplate jmsTopicTemplate;
@Bean(name = "jmsQueueMessagingTemplate")
public JmsMessagingTemplate jmsQueueMessagingTemplate(){
return new JmsMessagingTemplate(jmsQueueTemplate);
}
@Bean(name = "jmsTopicMessagingTemplate")
public JmsMessagingTemplate jmsTopicMessagingTemplate(){
return new JmsMessagingTemplate(jmsTopicTemplate);
}

}

jmsTopicTemplate对应的bean的配置
@Configuration
public class JmsConfig {

private final JmsProperties jmsProperties;
private final ObjectProvider<DestinationResolver> destinationResolver;
private final ObjectProvider<MessageConverter> messageConverter;

public JmsConfig(JmsProperties jmsProperties, ObjectProvider<DestinationResolver> destinationResolver,
        ObjectProvider<MessageConverter> messageConverter) {
    this.jmsProperties = jmsProperties;
    this.destinationResolver = destinationResolver;
    this.messageConverter = messageConverter;
}

@ConfigurationProperties(prefix = "spring.activemq")
@Bean(name = "jmsQueueTemplate")
@Primary
public JmsTemplate jmsQueueTemplate(PooledConnectionFactory pooledConnectionFactory) {
    JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
    // 设置为使用queue消息类型
    template.setPubSubDomain(false);

    DestinationResolver destinationResolver = (DestinationResolver) this.destinationResolver.getIfUnique();
    if (destinationResolver != null) {
        template.setDestinationResolver(destinationResolver);
    }
    MessageConverter messageConverter = (MessageConverter) this.messageConverter.getIfUnique();
    if (messageConverter != null) {
        template.setMessageConverter(messageConverter);
    }

    // deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false
    template.setExplicitQosEnabled(true);
    // DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久
    template.setDeliveryMode(DeliveryMode.PERSISTENT);
    // 默认不开启事务
    // 如果不启用事务,则会导致XA事务失效;
    // 作为生产者如果需要支持事务,则需要配置SessionTransacted为true
    template.setSessionTransacted(true);
    return template;
}

@ConfigurationProperties(prefix = "spring.activemq")
@Bean(name = "jmsTopicTemplate")
public JmsTemplate jmsTopicTemplate(PooledConnectionFactory pooledConnectionFactory) {
    JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
    // 设置为使用topic消息类型
    template.setPubSubDomain(true);

    DestinationResolver destinationResolver = (DestinationResolver) this.destinationResolver.getIfUnique();
    if (destinationResolver != null) {
        template.setDestinationResolver(destinationResolver);
    }
    MessageConverter messageConverter = (MessageConverter) this.messageConverter.getIfUnique();
    if (messageConverter != null) {
        template.setMessageConverter(messageConverter);
    }

    // deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false
    template.setExplicitQosEnabled(true);
    // DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久
    template.setDeliveryMode(DeliveryMode.PERSISTENT);
    // 默认不开启事务
    // 如果不启用事务,则会导致XA事务失效;
    // 作为生产者如果需要支持事务,则需要配置SessionTransacted为true
    template.setSessionTransacted(true);
    return template;
}

@ConfigurationProperties(prefix = "spring.activemq")
@Bean(name = "jmsListenerContainerFactory4Topic")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory4Topic(PooledConnectionFactory pooledConnectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(pooledConnectionFactory);
    // 设置为使用topic消息类型
    factory.setPubSubDomain(true);
    //如果单独配置了分布式事务,则启用
    // if (this.transactionManager != null) {
    // factory.setTransactionManager(transactionManager);
    // } else {
    factory.setSessionTransacted(Boolean.valueOf(true));
    // }
    JmsProperties.Listener listener = jmsProperties.getListener();
    factory.setAutoStartup(listener.isAutoStartup());
    if (listener.getAcknowledgeMode() != null) {
        factory.setSessionAcknowledgeMode(Integer.valueOf(listener.getAcknowledgeMode().getMode()));
    }
    String concurrency = listener.formatConcurrency();
    if (concurrency != null){
        factory.setConcurrency(concurrency);
    }
    return factory;
}

@ConfigurationProperties(prefix = "spring.activemq")
@Bean(name = "jmsListenerContainerFactory4Queue")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory4Queue(PooledConnectionFactory pooledConnectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(pooledConnectionFactory);
    // 设置为使用queue消息类型
    factory.setPubSubDomain(false);
    //如果单独配置了分布式事务,则启用
    // if (this.transactionManager != null) {
    // factory.setTransactionManager(transactionManager);
    // } else {
    factory.setSessionTransacted(Boolean.valueOf(true));
    // }
    JmsProperties.Listener listener = jmsProperties.getListener();
    factory.setAutoStartup(listener.isAutoStartup());
    if (listener.getAcknowledgeMode() != null) {
        factory.setSessionAcknowledgeMode(Integer.valueOf(listener.getAcknowledgeMode().getMode()));
    }
    String concurrency = listener.formatConcurrency();
    if (concurrency != null){
        factory.setConcurrency(concurrency);
    }
    return factory;
}

}

yml配置文件的配置
spring:
#jms配置,配合activemq一起配置
jms:
listener:
concurrency: 1
max-concurrency: 50
#activemq配置
activemq:
#默认使用spring-boot-starter-activemq内嵌的activemq,除非明确指定了broker-url
in-memory: false
user: system
password: manager
broker-url: auto+nio://192.168.137.5:61616
pool:
enabled: true
max-connections: 500

  • 写回答

0条回答

    报告相同问题?

    悬赏问题

    • ¥15 矩阵加法的规则是两个矩阵中对应位置的数的绝对值进行加和
    • ¥15 活动选择题。最多可以参加几个项目?
    • ¥15 飞机曲面部件如机翼,壁板等具体的孔位模型
    • ¥15 vs2019中数据导出问题
    • ¥20 云服务Linux系统TCP-MSS值修改?
    • ¥20 关于#单片机#的问题:项目:使用模拟iic与ov2640通讯环境:F407问题:读取的ID号总是0xff,自己调了调发现在读从机数据时,SDA线上并未有信号变化(语言-c语言)
    • ¥20 怎么在stm32门禁成品上增加查询记录功能
    • ¥15 Source insight编写代码后使用CCS5.2版本import之后,代码跳到注释行里面
    • ¥50 NT4.0系统 STOP:0X0000007B
    • ¥15 想问一下stata17中这段代码哪里有问题呀