wddbq 2019-04-24 23:03
浏览 2518

rabbitmq rpc模式下 client如何实现(QueueingConsumer已废弃)

官网推荐使用BlockingQueue实现阻塞,仿照官网例子,运行有异常。

final BlockingQueue<String> response = new ArrayBlockingQueue(1);

        String uuid = UUID.randomUUID().toString();
        channel.queueDeclare(QUEUE_NAME,
                true, false, false, null);
        channel.basicPublish("", "",
                new AMQP.BasicProperties().builder()
                        .replyTo(REPLY_QUEUE_NAME)
                        .clusterId(uuid)
                        .build(),
                message.getBytes());
        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                System.err.println(Thread.currentThread().getId()+":"+Thread.currentThread().getName());
                if (properties.getClusterId().equals(uuid)) {
                    response.offer(new String(body));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } else {
                    channel.basicReject(envelope.getDeliveryTag(), true);
                }
            }
        };

        channel.basicConsume(REPLY_QUEUE_NAME, false, consumer); //异常

        System.err.println(Thread.currentThread().getId()+":"+Thread.currentThread().getName());
        return response.take();

异常堆栈:

java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122)
    at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1378)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:540)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:494)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:472)

求教:
rabbitmq3.7版本的rpc模式如何实现

  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥20 sub地址DHCP问题
    • ¥15 delta降尺度计算的一些细节,有偿
    • ¥15 Arduino红外遥控代码有问题
    • ¥15 数值计算离散正交多项式
    • ¥30 数值计算均差系数编程
    • ¥15 redis-full-check比较 两个集群的数据出错
    • ¥15 Matlab编程问题
    • ¥15 训练的多模态特征融合模型准确度很低怎么办
    • ¥15 kylin启动报错log4j类冲突
    • ¥15 超声波模块测距控制点灯,灯的闪烁很不稳定,经过调试发现测的距离偏大