报的错:
2022-04-08 23:15:14.728 ERROR 91996 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method ' throws java.lang.InterruptedException,java.io.IOException' threw exception
springboot配置文件:
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtualHost=/test
spring.rabbitmq.connection-timeout=1000ms
#如果mq接收到了消息,那么会向生产者发送一个回调,通知生产者,消息已经收到了
spring.rabbitmq.publisher-confirm-type=correlated
#如果没有对应的队列来放消息的话,mq会将消息原封不动的返回给生产者
spring.rabbitmq.publisher-returns=true
#如果消息无法正常送达,则直接原封不动返回,未false则抛弃
spring.rabbitmq.template.mandatory=true
#配置·消费者
#manual手动的意思,就是要手动提交消费者ack
spring.rabbitmq.listener.direct.acknowledge-mode=manual
#最小的消费者数量
spring.rabbitmq.listener.simple.concurrency=1
#最大的消费者数量
spring.rabbitmq.listener.simple.max-concurrency=5
spring.rabbitmq.listener.simple.prefetch: 2
#不配置spring.rabbitmq.listener.simple.prefetch的时候甚至出现了100个数据被接受几十遍的情况
消费者代码:
@Component
public class RabbitmqConsumerUtil {
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = ExchangeAndQueueConfig.EMAIL_QUEUE_USER_REGISTER,durable = "true"),
exchange =@Exchange(value = ExchangeAndQueueConfig.EMAIL_EXCNANGE,durable = "true",type = "direct"),
key = ExchangeAndQueueConfig.REGISTER_USER_EMAIL_BIND_ROUTING
)
)
@RabbitHandler
public void consumerEmail(@Payload Email people,
Channel channel,
@Headers Map<String,Object> headers) throws InterruptedException, IOException {
// 所有的消息处理后必须进行消息的ack,channel.basicAck()
Thread.sleep(10000);
Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除 )
System.out.println(people.toString());
channel.basicAck(tag,false);
}
}