top啦它 2022-04-08 23:48 采纳率: 70%
浏览 196
已结题

RabbitMq消费者多次消费同一条数据

报的错:

2022-04-08 23:15:14.728 ERROR 91996 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)


org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method ' throws java.lang.InterruptedException,java.io.IOException' threw exception


springboot配置文件:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtualHost=/test
spring.rabbitmq.connection-timeout=1000ms
#如果mq接收到了消息,那么会向生产者发送一个回调,通知生产者,消息已经收到了
spring.rabbitmq.publisher-confirm-type=correlated
#如果没有对应的队列来放消息的话,mq会将消息原封不动的返回给生产者
spring.rabbitmq.publisher-returns=true
#如果消息无法正常送达,则直接原封不动返回,未false则抛弃
spring.rabbitmq.template.mandatory=true


#配置·消费者
#manual手动的意思,就是要手动提交消费者ack
spring.rabbitmq.listener.direct.acknowledge-mode=manual
#最小的消费者数量
spring.rabbitmq.listener.simple.concurrency=1
#最大的消费者数量
spring.rabbitmq.listener.simple.max-concurrency=5
spring.rabbitmq.listener.simple.prefetch: 2
#不配置spring.rabbitmq.listener.simple.prefetch的时候甚至出现了100个数据被接受几十遍的情况

消费者代码:

@Component
public class RabbitmqConsumerUtil {

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = ExchangeAndQueueConfig.EMAIL_QUEUE_USER_REGISTER,durable = "true"),
                    exchange =@Exchange(value = ExchangeAndQueueConfig.EMAIL_EXCNANGE,durable = "true",type = "direct"),
                    key = ExchangeAndQueueConfig.REGISTER_USER_EMAIL_BIND_ROUTING
            )
    )
    @RabbitHandler
    public void consumerEmail(@Payload Email people,
                              Channel channel,
                              @Headers Map<String,Object> headers) throws InterruptedException, IOException {
//        所有的消息处理后必须进行消息的ack,channel.basicAck()
        Thread.sleep(10000);
        Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除 )
        System.out.println(people.toString());
        channel.basicAck(tag,false);
    }
}

  • 写回答

0条回答 默认 最新

    报告相同问题?

    问题事件

    • 系统已结题 4月16日
    • 创建了问题 4月8日

    悬赏问题

    • ¥15 基于卷积神经网络的声纹识别
    • ¥15 Python中的request,如何使用ssr节点,通过代理requests网页。本人在泰国,需要用大陆ip才能玩网页游戏,合法合规。
    • ¥100 为什么这个恒流源电路不能恒流?
    • ¥15 有偿求跨组件数据流路径图
    • ¥15 写一个方法checkPerson,入参实体类Person,出参布尔值
    • ¥15 我想咨询一下路面纹理三维点云数据处理的一些问题,上传的坐标文件里是怎么对无序点进行编号的,以及xy坐标在处理的时候是进行整体模型分片处理的吗
    • ¥15 CSAPPattacklab
    • ¥15 一直显示正在等待HID—ISP
    • ¥15 Python turtle 画图
    • ¥15 stm32开发clion时遇到的编译问题