Here is my code first. The configuration class:
@Configuration
public class RabbitConfig {

    @Autowired
    RedisService redisService;

    @Bean
    public Queue queue() {
        return new Queue(MqConstant.QUEUE_NAME);
    }

    @Bean
    public Queue BQueue() {
        return new Queue(MqConstant.BQUEUE_NAME);
    }

    /**
     * Number of concurrent consumers, default 10
     */
    private static final int DEFAULT_CONCURRENT = 10;

    /**
     * Maximum number of deliveries each consumer prefetches, default 5
     */
    private static final int DEFAULT_PREFETCH_COUNT = 5;

    /**
     * Initialize environment parameters
     */
    private void initEnv() {
        SwitchTool.initEnv(redisService);
    }

    @Bean
    public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
        return new Jackson2JsonMessageConverter(objectMapper);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        // Initialize environment parameters
        initEnv();
        // End of environment parameter initialization
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setPrefetchCount(DEFAULT_PREFETCH_COUNT);
        factory.setConcurrentConsumers(DEFAULT_CONCURRENT);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        configurer.configure(factory, connectionFactory);
        return factory;
    }
}
Then the consumer classes. Consumer 1:
@RabbitListener(queues = MqConstant.QUEUE_NAME, containerFactory = "simpleRabbitListenerContainerFactory")
@RabbitHandler
public void process(@Payload MessageBody messageBody,
                    @Headers Map<String, Object> headers,
                    Channel channel) throws Exception {.....}
Consumer 2:
@RabbitListener(queues = MqConstant.BQUEUE_NAME, containerFactory = "simpleRabbitListenerContainerFactory")
@RabbitHandler
public void process(@Payload MessageBody messageBody,
                    @Headers Map<String, Object> headers,
                    Channel channel) throws Exception {
    log.info("get enter msg:" + messageBody);
    Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    channel.basicAck(deliveryTag, false);
}
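If I write only consumer 1 and leave out consumer 2, everything works fine. As soon as I write both consumers, an error is reported at startup, as follows: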
2019-09-29 14:35:48.307 WARN 4380 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.chinamobile.cmic.rcsoa.wishsms.component.EnterClient.process(com.chinamobile.cmic.rcsoa.wishsms.dto.MessageBody,java.util.Map<java.lang.String, java.lang.Object>,com.rabbitmq.client.Channel) throws java.lang.Exception]
Bean [com.chinamobile.cmic.rcsoa.wishsms.component.EnterClient@6fbfd28b]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:129)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:106)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:856)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:779)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:105)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:208)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1349)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:760)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1292)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1262)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1800(SimpleMessageListenerContainer.java:105)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1518)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message
... 13 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.chinamobile.cmic.rcsoa.wishsms.dto.MessageBody] for GenericMessage [payload=byte[298], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=legalize.enter.queue, amqp_deliveryTag=2, amqp_consumerQueue=legalize.enter.queue, amqp_redelivered=true, id=f8b6a47d-baa1-eded-60f9-0b83554c56c0, amqp_consumerTag=amq.ctag-QgKSp5nfJeTaRNyWogyBbw, contentType=application/x-java-serialized-object, timestamp=1569738948306}]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:142)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:135)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:49)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:126)
... 12 common frames omitted
I hope someone more experienced can take a look and tell me how to fix this. QAQ