weixin_43857493
踏破九重天阙
2019-09-29 14:48
Accepted

Spring Boot + RabbitMQ integration: exception with manual ack

Here is my code first. The configuration class:

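import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration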
public class RabbitConfig {

    @Autowired
    RedisService redisService;

    @Bean
    public Queue queue() {
        return new Queue(MqConstant.QUEUE_NAME);
    }


    @Bean
    public Queue BQueue() {
        return new Queue(MqConstant.BQUEUE_NAME);
    }

    /**
     * Number of concurrent consumers, default 10
     */
    private static final int DEFAULT_CONCURRENT = 10;
    /**
     * Prefetch count: maximum number of unacknowledged messages delivered to each consumer
     */
    private static final int DEFAULT_PREFETCH_COUNT = 5;


    /**
     * Initialize environment parameters
     */
    private void initEnv() {
        SwitchTool.initEnv(redisService);
    }


    @Bean
    public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
        return new Jackson2JsonMessageConverter(objectMapper);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        // Initialize environment parameters
        initEnv();
        // Initialize environment parameters - end
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setPrefetchCount(DEFAULT_PREFETCH_COUNT);
        factory.setConcurrentConsumers(DEFAULT_CONCURRENT);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

}

Then the consumer classes. Consumer 1:

    @RabbitListener(queues = MqConstant.QUEUE_NAME, containerFactory = "simpleRabbitListenerContainerFactory")
    @RabbitHandler
    public void process(@Payload MessageBody messageBody,
                        @Headers Map<String, Object> headers,
                        Channel channel) throws Exception {.....}

Consumer 2:

    @RabbitListener(queues = MqConstant.BQUEUE_NAME, containerFactory = "simpleRabbitListenerContainerFactory")
    @RabbitHandler
    public void process(@Payload MessageBody messageBody,
                        @Headers Map<String, Object> headers,
                        Channel channel) throws Exception {
        log.info("get enter msg:" + messageBody);
        // Acknowledge this single delivery manually (multiple = false)
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
    }

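If I write only consumer 1 and leave out consumer 2, everything works. As soon as I add the second consumer, the application reports the following error on startup: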

2019-09-29 14:35:48.307  WARN 4380 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.chinamobile.cmic.rcsoa.wishsms.component.EnterClient.process(com.chinamobile.cmic.rcsoa.wishsms.dto.MessageBody,java.util.Map<java.lang.String, java.lang.Object>,com.rabbitmq.client.Channel) throws java.lang.Exception]
Bean [com.chinamobile.cmic.rcsoa.wishsms.component.EnterClient@6fbfd28b]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:129)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:106)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:856)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:779)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:105)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:208)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1349)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:760)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1292)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1262)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1800(SimpleMessageListenerContainer.java:105)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1518)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message
    ... 13 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.chinamobile.cmic.rcsoa.wishsms.dto.MessageBody] for GenericMessage [payload=byte[298], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=legalize.enter.queue, amqp_deliveryTag=2, amqp_consumerQueue=legalize.enter.queue, amqp_redelivered=true, id=f8b6a47d-baa1-eded-60f9-0b83554c56c0, amqp_consumerTag=amq.ctag-QgKSp5nfJeTaRNyWogyBbw, contentType=application/x-java-serialized-object, timestamp=1569738948306}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:142)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:135)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107)
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:49)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:126)
    ... 12 common frames omitted

I hope someone more experienced can take a look and tell me how to fix this. QAQ


1 answer

  • hjs218
    Json-Huang 2019-09-29 21:12
    Accepted

    Look at the error message: Cannot convert from [[B] to [com.chinamobile.cmic.rcsoa.wishsms.dto.MessageBody].
    The type is probably wrong. Change queues = MqConstant.BQUEUE_NAME in consumer 2 to match consumer 1, i.e. queues = MqConstant.QUEUE_NAME.
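
The `contentType=application/x-java-serialized-object` header in the stack trace may explain where the `[B` (a raw `byte[]`) comes from. To my knowledge, a JSON converter such as `Jackson2JsonMessageConverter` only decodes a body whose content type mentions `json`, and otherwise passes the bytes through untouched. The sketch below is a plain-Java model of that decision, not Spring AMQP code; `ContentTypeCheck` and `decode` are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;

// Plain-Java model (NOT the Spring AMQP implementation) of how a JSON
// message converter might decide whether it can decode an incoming body.
final class ContentTypeCheck {

    // Hypothetical helper: returns the decoded text when the content type
    // mentions "json"; otherwise hands back the untouched byte[].
    static Object decode(byte[] body, String contentType) {
        if (contentType != null && contentType.contains("json")) {
            return new String(body, StandardCharsets.UTF_8);
        }
        // The listener would then receive a byte[] instead of a MessageBody.
        return body;
    }

    public static void main(String[] args) {
        byte[] body = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);

        Object json = decode(body, "application/json");
        Object raw = decode(body, "application/x-java-serialized-object");

        System.out.println(json.getClass().getName()); // java.lang.String
        System.out.println(raw.getClass().getName());  // [B
    }
}
```

If that is what happened here, the messages already waiting in `legalize.enter.queue` (note `amqp_redelivered=true`) were published without the JSON converter, so it may be worth checking how the producer for that queue serializes its messages.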

