姚高鹏 2022-02-18 10:32 采纳率: 25%
浏览 35

rabbitmq使用报错

先启动消费者不会报错,在启动生产者就会报错;或者先启动生产者不会报错,在启动消费者就会报错;报错相同:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange 'direct_logs1' in vhost '/': received 'false' but current is 'true', class-id=40, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:522)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:719)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:646)
    at java.base/java.lang.Thread.run(Thread.java:834)


生产者:

public class EmitLogDirect1 {
    private static final String EXCHANGE_NAME = "direct_logs1";

    public static void main(String[] argv) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            //开启发布确认
            channel.confirmSelect();
            /**
             * 线程安全有序的一个哈希表,适用于高并发的情况
             * 1.轻松的将序号与消息进行关联
             * 2.轻松批量删除条目 只要给到序列号
             * 3.支持并发访问
             */
            ConcurrentSkipListMap<Long, String> outstandingConfirms = new
                    ConcurrentSkipListMap<>();
            /**
             * 确认收到消息的一个回调
             * 1.消息序列号
             * 2.true 可以确认小于等于当前序列号的消息
             * false 确认当前序列号消息
             */
            ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
                if (multiple) {
                    //返回的是小于等于当前序列号的未确认消息 是一个 map
                    ConcurrentNavigableMap<Long, String> confirmed =
                            outstandingConfirms.headMap(sequenceNumber, true);
                    //清除该部分未确认消息
                    confirmed.clear();
                } else {
                    //只清除当前序列号的消息
                    outstandingConfirms.remove(sequenceNumber);
                }
            };
            //未确认的消息
            ConfirmCallback nackCallback = (sequenceNumber, multiple) ->
            {
                String message = outstandingConfirms.get(sequenceNumber);
                System.out.println("发布的消息" + message + "未被确认,序列号" + sequenceNumber);
            };
            /**
             * 添加一个异步确认的监听器
             * 1.确认收到消息的回调
             * 2.未收到消息的回调
             */
            channel.addConfirmListener(ackCallback, nackCallback);
            /**
             * 生成一个队列
             * 0.交换器的名称
             * 1.交换器的类型, 常见类型有fanout, direct, topic, headers
             * 2.设置是否持久化, true表示持久化, 反之是非持久化, 持久化可以将交换器存盘, 在服务器重启的时候不会丢失相关信息
             * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费;设置是否自动删除,
             *      true表示自动删除, 自动删除的前提是至少有一个队列或者交换器与这个交换器绑定, 之后所有与这个
             *      交换器绑定的队列或交换器都于此解绑, 注意不能错误的把这个参数理解为"当与此交换器连接的客户
             *      端都断开时, RabbitMQ会自动删除本交换器"
             * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
             * 5.其他参数
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true,false,false,null);
            //创建多个 bindingKey
            Map<String, String> bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("info", "普通 info 信息");
            bindingKeyMap.put("warning", "警告 warning 信息");
            bindingKeyMap.put("error", "错误 error 信息");
            //debug 没有消费这接收这个消息 所有就丢失了
            bindingKeyMap.put("debug", "调试 debug 信息");
            for (Map.Entry<String, String> bindingKeyEntry :
                    bindingKeyMap.entrySet()) {
                String bindingKey =
                        bindingKeyEntry.getKey();
                String message = bindingKeyEntry.getValue();
                /**
                 * channel.getNextPublishSeqNo()获取下一个消息的序列号
                 * 通过序列号与消息体进行一个关联
                 * 全部都是未确认的消息体
                 */
                outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
                /**
                 * 发送一个消息
                 * 1.发送到那个交换机
                 * 2.路由的 key 是哪个
                 * 3.其他的参数信息MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性让消息实现持久化需要在消息生产者修改代码
                 * 4.发送消息的消息体
                 */
                channel.basicPublish(EXCHANGE_NAME, bindingKey, MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes("UTF-8"));
                System.out.println("生产者发出消息:" + message);
            }
        }
    }
}


消费者:

public class ReceiveLogsDirect022 {
    private static final String EXCHANGE_NAME = "direct_logs1";
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = "console1";
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        channel.queueBind(queueName, EXCHANGE_NAME, "warning");
        System.out.println("等待接收消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) ->
        {String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" 接收绑定键 :"+delivery.getEnvelope().getRoutingKey()+", 消 息:"+message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

  • 写回答

2条回答 默认 最新

  • 「已注销」 2022-02-18 14:36
    关注

    又一个出现这种错误的

    评论

报告相同问题?

问题事件

  • 创建了问题 2月18日

悬赏问题

  • ¥15 #MATLAB仿真#车辆换道路径规划
  • ¥15 java 操作 elasticsearch 8.1 实现 索引的重建
  • ¥15 数据可视化Python
  • ¥15 要给毕业设计添加扫码登录的功能!!有偿
  • ¥15 kafka 分区副本增加会导致消息丢失或者不可用吗?
  • ¥15 微信公众号自制会员卡没有收款渠道啊
  • ¥100 Jenkins自动化部署—悬赏100元
  • ¥15 关于#python#的问题:求帮写python代码
  • ¥20 MATLAB画图图形出现上下震荡的线条
  • ¥15 关于#windows#的问题:怎么用WIN 11系统的电脑 克隆WIN NT3.51-4.0系统的硬盘