top啦它 2022-04-08 23:48 采纳率: 70%
浏览 191
已结题

RabbitMq消费者多次消费同一条数据

报的错:

2022-04-08 23:15:14.728 ERROR 91996 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)


org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method ' throws java.lang.InterruptedException,java.io.IOException' threw exception


springboot配置文件:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtualHost=/test
spring.rabbitmq.connection-timeout=1000ms
#如果mq接收到了消息,那么会向生产者发送一个回调,通知生产者,消息已经收到了
spring.rabbitmq.publisher-confirm-type=correlated
#如果没有对应的队列来放消息的话,mq会将消息原封不动的返回给生产者
spring.rabbitmq.publisher-returns=true
#如果消息无法正常送达,则直接原封不动返回,未false则抛弃
spring.rabbitmq.template.mandatory=true


#配置·消费者
#manual手动的意思,就是要手动提交消费者ack
spring.rabbitmq.listener.direct.acknowledge-mode=manual
#最小的消费者数量
spring.rabbitmq.listener.simple.concurrency=1
#最大的消费者数量
spring.rabbitmq.listener.simple.max-concurrency=5
spring.rabbitmq.listener.simple.prefetch: 2
#不配置spring.rabbitmq.listener.simple.prefetch的时候甚至出现了100个数据被接受几十遍的情况

消费者代码:

@Component
public class RabbitmqConsumerUtil {

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = ExchangeAndQueueConfig.EMAIL_QUEUE_USER_REGISTER,durable = "true"),
                    exchange =@Exchange(value = ExchangeAndQueueConfig.EMAIL_EXCNANGE,durable = "true",type = "direct"),
                    key = ExchangeAndQueueConfig.REGISTER_USER_EMAIL_BIND_ROUTING
            )
    )
    @RabbitHandler
    public void consumerEmail(@Payload Email people,
                              Channel channel,
                              @Headers Map<String,Object> headers) throws InterruptedException, IOException {
//        所有的消息处理后必须进行消息的ack,channel.basicAck()
        Thread.sleep(10000);
        Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除 )
        System.out.println(people.toString());
        channel.basicAck(tag,false);
    }
}

  • 写回答

0条回答 默认 最新

    报告相同问题?

    问题事件

    • 系统已结题 4月16日
    • 创建了问题 4月8日

    悬赏问题

    • ¥15 有没有人知道这个问题怎么解决
    • ¥15 comsol电力电缆载流量仿真
    • ¥15 webSocket可以接TCP socket接口吗
    • ¥60 mpi并行出错,CFD++计算
    • ¥15 c#:vsto,powerpoint的外接程序中换主题颜色
    • ¥15 状态机/汽车转向灯/Sateflow
    • ¥15 这个有点复杂 有没有人看看
    • ¥15 用python如何确定子孙元素在父元素中的位置
    • ¥15 obj文件滤除异常高程
    • ¥15 用mathematicas或者matlab计算三重积分