先启动消费者不会报错,在启动生产者就会报错;或者先启动生产者不会报错,在启动消费者就会报错;报错相同:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange 'direct_logs1' in vhost '/': received 'false' but current is 'true', class-id=40, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:522)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:719)
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:646)
at java.base/java.lang.Thread.run(Thread.java:834)
生产者:
public class EmitLogDirect1 {
private static final String EXCHANGE_NAME = "direct_logs1";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
//开启发布确认
channel.confirmSelect();
/**
* 线程安全有序的一个哈希表,适用于高并发的情况
* 1.轻松的将序号与消息进行关联
* 2.轻松批量删除条目 只要给到序列号
* 3.支持并发访问
*/
ConcurrentSkipListMap<Long, String> outstandingConfirms = new
ConcurrentSkipListMap<>();
/**
* 确认收到消息的一个回调
* 1.消息序列号
* 2.true 可以确认小于等于当前序列号的消息
* false 确认当前序列号消息
*/
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//返回的是小于等于当前序列号的未确认消息 是一个 map
ConcurrentNavigableMap<Long, String> confirmed =
outstandingConfirms.headMap(sequenceNumber, true);
//清除该部分未确认消息
confirmed.clear();
} else {
//只清除当前序列号的消息
outstandingConfirms.remove(sequenceNumber);
}
};
//未确认的消息
ConfirmCallback nackCallback = (sequenceNumber, multiple) ->
{
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("发布的消息" + message + "未被确认,序列号" + sequenceNumber);
};
/**
* 添加一个异步确认的监听器
* 1.确认收到消息的回调
* 2.未收到消息的回调
*/
channel.addConfirmListener(ackCallback, nackCallback);
/**
* 生成一个队列
* 0.交换器的名称
* 1.交换器的类型, 常见类型有fanout, direct, topic, headers
* 2.设置是否持久化, true表示持久化, 反之是非持久化, 持久化可以将交换器存盘, 在服务器重启的时候不会丢失相关信息
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费;设置是否自动删除,
* true表示自动删除, 自动删除的前提是至少有一个队列或者交换器与这个交换器绑定, 之后所有与这个
* 交换器绑定的队列或交换器都于此解绑, 注意不能错误的把这个参数理解为"当与此交换器连接的客户
* 端都断开时, RabbitMQ会自动删除本交换器"
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
* 5.其他参数
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true,false,false,null);
//创建多个 bindingKey
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("info", "普通 info 信息");
bindingKeyMap.put("warning", "警告 warning 信息");
bindingKeyMap.put("error", "错误 error 信息");
//debug 没有消费这接收这个消息 所有就丢失了
bindingKeyMap.put("debug", "调试 debug 信息");
for (Map.Entry<String, String> bindingKeyEntry :
bindingKeyMap.entrySet()) {
String bindingKey =
bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
/**
* channel.getNextPublishSeqNo()获取下一个消息的序列号
* 通过序列号与消息体进行一个关联
* 全部都是未确认的消息体
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
/**
* 发送一个消息
* 1.发送到那个交换机
* 2.路由的 key 是哪个
* 3.其他的参数信息MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性让消息实现持久化需要在消息生产者修改代码
* 4.发送消息的消息体
*/
channel.basicPublish(EXCHANGE_NAME, bindingKey, MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println("生产者发出消息:" + message);
}
}
}
}
消费者:
public class ReceiveLogsDirect022 {
private static final String EXCHANGE_NAME = "direct_logs1";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = "console1";
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "info");
channel.queueBind(queueName, EXCHANGE_NAME, "warning");
System.out.println("等待接收消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) ->
{String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 接收绑定键 :"+delivery.getEnvelope().getRoutingKey()+", 消 息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}