lin185 2020-10-15 16:35 采纳率: 0%
浏览 109

用springboot+rabbit在做队列测试时,为什么配置调成手动ack时,消息会被ack两次?

看不懂源码,但是打断点发现好像是ack方法执行了两次,不知道为什么会执行两次ack方法
图片说明

这是配置代码

#rabbit配置
spring:
  rabbitmq:
    # rabbit地址
    host: 127.0.0.1
    # rabbit端口号
    port: 5672
    # 用户账号和密码
    username: guest
    password: guest
    #rabbit项目名,每个virtualHost的队列是隔离的,相当于数据库
    virtual-host: /rabbit
    #开启Publisher Confirms 模式,消息发送到交换器后触发回调。
    #publisher-confirms: true
    #开启PublisherReturn 模式,交换机将消息发送到对应队列失败时触发
    #publisher-returns: true
    listener:
      direct:
        #设置监听为手动答应模式
        acknowledge-mode: manual

这是绑定交换机代码

@Configuration
public class RabbitConfig {

    /**
     * 将队列绑定到交换机中,并绑定路由
     * @return
     */
    @Bean
    public Binding bindingExchange() {
        return BindingBuilder.bind(queue()).to(directExchange()).with(RabbitEnum.RABBIT.getRoutingKey());
    }

    /**
     * 创建队列
     * @return
     */
    @Bean
    public Queue queue() {
        return new Queue(RabbitEnum.RABBIT.getQueue(),true);
    }

    /**
     * 创建交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(RabbitEnum.RABBIT.getDirectExchange(), true, false);
    }
}

这是生产者发布消息和消费者消费消息的代码

@Service
public class PushService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 推送方法
     */
    public void push(){
        System.out.println("开始推送");
        //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
        rabbitTemplate.convertAndSend(RabbitEnum.RABBIT.getDirectExchange(), RabbitEnum.RABBIT.getRoutingKey(),"helloWord", new CorrelationData("1"));
        System.out.println("结束推送");
    }

    /**
     * 推送队列的监听
     * 注解中为监听的队列
     * @param
     * @param channel
     * @param tag
     * @throws IOException
     */
    @RabbitListener(queues = "rabbit.direct.queue.push")
    public void processDirect(String str, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        System.out.println("收到消息");
        System.out.println(str);
        channel.basicAck(tag,true);
    }
}

这是消费完成后报的异常

ERROR 12656 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
  • 写回答

1条回答 默认 最新

  • Java毕设王 2023-06-19 10:08
    关注

    在使用 Spring Boot 和 RabbitMQ 进行队列测试时,当将消息确认模式配置为手动 ACK(acknowledgment)时,消息被 ACK 两次的可能原因如下:

    消息重复消费:当消息处理过程中发生异常或处理时间过长时,消费者可能会重新消费相同的消息。这可能导致消息在处理完成后再次确认,从而导致消息被 ACK 两次。在确认之前,应确保消息处理的幂等性,以避免重复处理相同的消息。

    消费者重启:如果消费者应用程序在消息处理期间意外终止并重新启动,尚未确认的消息可能会被重新分发给新的消费者实例。新的消费者实例会对之前未确认的消息进行确认,导致消息被 ACK 两次。

    多个消费者实例:如果存在多个消费者实例监听同一个队列,并且配置了手动 ACK,那么当消息被投递到队列时,每个消费者实例都有机会接收并处理该消息。因此,如果多个消费者实例都对消息进行了确认,那么消息就会被 ACK 多次。

    解决此问题的方法包括:

    确保消息处理的幂等性:在消费者应用程序中实现幂等性,即无论收到相同的消息多少次,最终的结果应该是一致的。这样即使消息被重复处理或重新消费,也不会产生错误的影响。

    避免消费者重启时的重复处理:可以在消费者应用程序中记录已经处理的消息的唯一标识,例如消息的 ID 或业务相关的唯一标识。在重新启动时,检查已经处理过的消息,并跳过重复的消息。

    确保只有一个消费者实例消费消息:在 RabbitMQ 配置中,可以使用队列的 Exclusive 模式确保只有一个消费者实例监听队列。这样可以避免多个消费者实例同时处理同一消息。

    评论

报告相同问题?

悬赏问题

  • ¥15 素材场景中光线烘焙后灯光失效
  • ¥15 请教一下各位,为什么我这个没有实现模拟点击
  • ¥15 执行 virtuoso 命令后,界面没有,cadence 启动不起来
  • ¥50 comfyui下连接animatediff节点生成视频质量非常差的原因
  • ¥20 有关区间dp的问题求解
  • ¥15 多电路系统共用电源的串扰问题
  • ¥15 slam rangenet++配置
  • ¥15 有没有研究水声通信方面的帮我改俩matlab代码
  • ¥15 ubuntu子系统密码忘记
  • ¥15 保护模式-系统加载-段寄存器