top啦它 2022-04-08 23:48 采纳率: 70%
浏览 198
已结题

RabbitMq消费者多次消费同一条数据

报的错:

2022-04-08 23:15:14.728 ERROR 91996 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)


org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method ' throws java.lang.InterruptedException,java.io.IOException' threw exception


springboot配置文件:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtualHost=/test
spring.rabbitmq.connection-timeout=1000ms
#如果mq接收到了消息,那么会向生产者发送一个回调,通知生产者,消息已经收到了
spring.rabbitmq.publisher-confirm-type=correlated
#如果没有对应的队列来放消息的话,mq会将消息原封不动的返回给生产者
spring.rabbitmq.publisher-returns=true
#如果消息无法正常送达,则直接原封不动返回,未false则抛弃
spring.rabbitmq.template.mandatory=true


#配置·消费者
#manual手动的意思,就是要手动提交消费者ack
spring.rabbitmq.listener.direct.acknowledge-mode=manual
#最小的消费者数量
spring.rabbitmq.listener.simple.concurrency=1
#最大的消费者数量
spring.rabbitmq.listener.simple.max-concurrency=5
spring.rabbitmq.listener.simple.prefetch: 2
#不配置spring.rabbitmq.listener.simple.prefetch的时候甚至出现了100个数据被接受几十遍的情况

消费者代码:

@Component
public class RabbitmqConsumerUtil {

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = ExchangeAndQueueConfig.EMAIL_QUEUE_USER_REGISTER,durable = "true"),
                    exchange =@Exchange(value = ExchangeAndQueueConfig.EMAIL_EXCNANGE,durable = "true",type = "direct"),
                    key = ExchangeAndQueueConfig.REGISTER_USER_EMAIL_BIND_ROUTING
            )
    )
    @RabbitHandler
    public void consumerEmail(@Payload Email people,
                              Channel channel,
                              @Headers Map<String,Object> headers) throws InterruptedException, IOException {
//        所有的消息处理后必须进行消息的ack,channel.basicAck()
        Thread.sleep(10000);
        Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除 )
        System.out.println(people.toString());
        channel.basicAck(tag,false);
    }
}

  • 写回答

0条回答 默认 最新

    报告相同问题?

    问题事件

    • 系统已结题 4月16日
    • 创建了问题 4月8日

    悬赏问题

    • ¥15 网络科学导论,网络控制
    • ¥15 metadata提取的PDF元数据,如何转换为一个Excel
    • ¥15 关于arduino编程toCharArray()函数的使用
    • ¥100 vc++混合CEF采用CLR方式编译报错
    • ¥15 coze 的插件输入飞书多维表格 app_token 后一直显示错误,如何解决?
    • ¥15 vite+vue3+plyr播放本地public文件夹下视频无法加载
    • ¥15 c#逐行读取txt文本,但是每一行里面数据之间空格数量不同
    • ¥50 如何openEuler 22.03上安装配置drbd
    • ¥20 ING91680C BLE5.3 芯片怎么实现串口收发数据
    • ¥15 无线连接树莓派,无法执行update,如何解决?(相关搜索:软件下载)