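The official site recommends using a BlockingQueue to block until the reply arrives. I followed the official example, but my code throws an exception at runtime.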
final BlockingQueue<String> response = new ArrayBlockingQueue(1);
String uuid = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,
        true, false, false, null);
channel.basicPublish("", "",
        new AMQP.BasicProperties().builder()
                .replyTo(REPLY_QUEUE_NAME)
                .clusterId(uuid)
                .build(),
        message.getBytes());
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body)
            throws IOException {
        System.err.println(Thread.currentThread().getId()+":"+Thread.currentThread().getName());
        if (properties.getClusterId().equals(uuid)) {
            response.offer(new String(body));
            channel.basicAck(envelope.getDeliveryTag(), false);
        } else {
            channel.basicReject(envelope.getDeliveryTag(), true);
        }
    }
};
channel.basicConsume(REPLY_QUEUE_NAME, false, consumer); // the exception is thrown here
System.err.println(Thread.currentThread().getId()+":"+Thread.currentThread().getName());
return response.take();
Exception stack trace:
java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1378)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:540)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:494)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:472)
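A note on reading this trace: AMQChannel.wrap in the Java client wraps a channel shutdown in an IOException that has no message of its own, so the actual reason normally sits in the "Caused by" part, which is not included above. One thing that may be worth checking (an assumption, since the cause is not shown): the snippet declares QUEUE_NAME but never declares REPLY_QUEUE_NAME, and consuming from a queue that does not exist makes the broker close the channel. Below is a minimal sketch for getting at the innermost cause of a wrapped exception. `CauseChain` is a hypothetical helper name, not part of the RabbitMQ client.

```java
// Hypothetical helper (not a RabbitMQ API): finds the innermost cause of a
// wrapped exception, e.g. the shutdown signal hidden inside a bare IOException.
final class CauseChain {
    private CauseChain() {}

    /** Returns the last Throwable in the cause chain (or t itself if it has no cause). */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        // Throwable.getCause() returns null at the end of the chain.
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }
}
```

Usage would be along the lines of catching the IOException around basicConsume and printing `CauseChain.rootCause(e)`, which should surface the broker's reply text if the channel was closed by the server.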
Question:
How should the RPC pattern be implemented with RabbitMQ 3.7?
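For reference, the blocking part of the pattern can be sketched without a broker. This is only an in-memory illustration under stated assumptions, not the official tutorial code: `RpcSketch`, `onReply` and `call` are hypothetical names, the "server" is a background thread that echoes the request, and `onReply` stands in for the consumer's handleDelivery callback. The caller registers a single-slot BlockingQueue under a fresh correlation id, sends the request, and blocks until a reply with the same id is offered from another thread.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for the request/reply round trip (no RabbitMQ involved).
final class RpcSketch {
    // One single-slot queue per outstanding call, keyed by correlation id.
    private final Map<String, BlockingQueue<String>> pending = new ConcurrentHashMap<>();

    // Fake "server" thread; daemon so it does not keep the JVM alive.
    private final ExecutorService server = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "fake-rpc-server");
        t.setDaemon(true);
        return t;
    });

    /** Stand-in for the consumer callback: runs on a different thread than call(). */
    void onReply(String correlationId, String body) {
        BlockingQueue<String> slot = pending.get(correlationId);
        if (slot != null) {
            slot.offer(body); // wakes up the caller blocked in call()
        }
        // Replies with an unknown correlation id are simply ignored.
    }

    /** Sends a request and blocks for the reply; returns null on timeout or interrupt. */
    String call(String message, long timeoutMillis) {
        final String correlationId = UUID.randomUUID().toString();
        BlockingQueue<String> slot = new ArrayBlockingQueue<>(1);
        pending.put(correlationId, slot); // register before "publishing"
        try {
            // Stands in for basicPublish plus the server's work and reply.
            server.execute(() -> onReply(correlationId, "echo:" + message));
            return slot.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        } finally {
            pending.remove(correlationId);
        }
    }
}
```

Mapped back to the real client, the correlation id would normally travel in the `correlationId` message property rather than `clusterId`, the reply queue has to exist before basicConsume is called on it (the official example uses a server-named queue from `channel.queueDeclare().getQueue()`), and the request has to be published with the request queue's name as the routing key rather than an empty string. A timed `poll` is used here instead of `take()` so that a lost reply does not block the caller forever; that is a design choice of this sketch.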