踏破九重天阙 2019-09-29 14:48 采纳率: 50%
浏览 1930
已结题

spring boot整合rabbitmq ack出现异常

先贴一下我的代码,配置类

@Configuration
public class RabbitConfig {

    @Autowired
    RedisService redisService;

        @Bean
    public Queue queue() {
        return new Queue(MqConstant.QUEUE_NAME);
    }


    @Bean
    public Queue BQueue() {
        return new Queue(MqConstant.BQUEUE_NAME);
    }

    /**
     * 消费者数量,默认10
     */
    private static final int DEFAULT_CONCURRENT = 10;
    /**
     * 每个消费者获取最大投递数量,默认50
     */
    private static final int DEFAULT_PREFETCH_COUNT = 5;


    /**
     * 初始化环境参数
     */
    private void initEnv() {
        SwitchTool.initEnv(redisService);
    }


    @Bean
    public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
        return new Jackson2JsonMessageConverter(objectMapper);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        // 初始化环境参数
        initEnv();
        // 初始化环境参数-end
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setPrefetchCount(DEFAULT_PREFETCH_COUNT);
        factory.setConcurrentConsumers(DEFAULT_CONCURRENT);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

}

然后是消费者类,消费者1:

 @RabbitListener(queues = MqConstant.QUEUE_NAME, containerFactory = "simpleRabbitListenerContainerFactory")
    @RabbitHandler
    public void process(@Payload MessageBody messageBody,
                               @Headers Map<String, Object> headers,
                               Channel channel) throws Exception {.....}

消费者2:

@RabbitListener(queues = MqConstant.BQUEUE_NAME,containerFactory = "simpleRabbitListenerContainerFactory")
    @RabbitHandler
    public void process(@Payload MessageBody messageBody,
                        @Headers Map<String, Object> headers,
                        Channel channel) throws Exception {
        log.info("get enter msg:"+messageBody);
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag,false);
    }

如果我只写消费者1,不写消费者2的话是完全没有问题的,当我写两个消费者的时候,启动就报错了,如下:

2019-09-29 14:35:48.307  WARN 4380 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.chinamobile.cmic.rcsoa.wishsms.component.EnterClient.process(com.chinamobile.cmic.rcsoa.wishsms.dto.MessageBody,java.util.Map<java.lang.String, java.lang.Object>,com.rabbitmq.client.Channel) throws java.lang.Exception]
Bean [com.chinamobile.cmic.rcsoa.wishsms.component.EnterClient@6fbfd28b]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:129)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:106)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:856)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:779)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:105)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:208)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1349)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:760)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1292)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1262)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1800(SimpleMessageListenerContainer.java:105)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1518)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message
    ... 13 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.chinamobile.cmic.rcsoa.wishsms.dto.MessageBody] for GenericMessage [payload=byte[298], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=legalize.enter.queue, amqp_deliveryTag=2, amqp_consumerQueue=legalize.enter.queue, amqp_redelivered=true, id=f8b6a47d-baa1-eded-60f9-0b83554c56c0, amqp_consumerTag=amq.ctag-QgKSp5nfJeTaRNyWogyBbw, contentType=application/x-java-serialized-object, timestamp=1569738948306}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:142)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:135)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107)
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:49)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:126)
    ... 12 common frames omitted

希望大佬们可以帮我看看这怎么解决。QAQ

  • 写回答

1条回答 默认 最新

  • Json-Huang 2019-09-29 21:12
    关注

    看报错信息:Cannot convert from [[B] to [com.chinamobile.cmic.rcsoa.wishsms.dto.MessageBody],
    类型可能不对,消费者2中的queues = MqConstant.BQUEUE_NAME改成跟消费者1一样,即queues = MqConstant.QUEUE_NAME

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 9月2日

悬赏问题

  • ¥20 sub地址DHCP问题
  • ¥15 delta降尺度计算的一些细节,有偿
  • ¥15 Arduino红外遥控代码有问题
  • ¥15 数值计算离散正交多项式
  • ¥30 数值计算均差系数编程
  • ¥15 redis-full-check比较 两个集群的数据出错
  • ¥15 Matlab编程问题
  • ¥15 训练的多模态特征融合模型准确度很低怎么办
  • ¥15 kylin启动报错log4j类冲突
  • ¥15 超声波模块测距控制点灯,灯的闪烁很不稳定,经过调试发现测的距离偏大