donte1234567 2019-03-22 10:40
Views: 101

Storing messages in a RabbitMQ exchange

I'm trying to figure out whether it is possible to store messages in a RabbitMQ exchange even when there's no consumer running.

I understood (probably incorrectly) that to achieve this, both the exchange and the queue need to be "durable", and the message needs to be published with the "persistent" flag:

'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT

My main goal is to store all the messages in the exchange so that, if for whatever reason no consumer is running, all the messages in the exchange get routed to the bound queue as soon as I launch one. I'm declaring my exchanges and queue as follows:

//Sender.php
public function sendToQueue(ActionMessage $message)
{
    // Message properties. php-amqplib names this property content_type
    // (with an underscore), not content-type.
    $headers = [
        'content_type' => 'application/json',
        'timestamp' => $message->getCreatedAt()->getTimestamp(),
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
    ];
    $channel = $this->connection->getChannel();
    // Arguments: passive = false, durable = true, auto_delete = false
    $channel->exchange_declare($this->exchangeName, 'direct', false, true, false);
    $qMessage = new AMQPMessage(json_encode($message->toArray()), $headers);
    $channel->basic_publish($qMessage, $this->exchangeName, $message->getTopic());
    return true;
}
//Receiver.php
public function consume($callbackFunction)
{
    $channel = $this->messenger->getChannel();
    $channel->exchange_declare($this->exchange, 'direct', false, true, false);
    list($queueName, ,) = $channel->queue_declare('', false, true, true, false);
    $channel->queue_bind($queueName, $this->exchangeName, $this->topicAction);

    $channel->basic_consume($queueName, '', false, true, false, false, $callbackFunction);

    while (count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $this->messenger->close();
}

I'd appreciate any help (even if it is just to discard the idea and put some storage in between). Thanks.


1 answer

  • duanpo2037 2019-03-26 13:43

    An exchange does not store messages; that is the job of a queue. The problem you're having is not that no consumers are running, but that no queues exist, because you have left the consumers to declare their own queues. A message published to an exchange with no matching queue bound is simply discarded.

    If you want the messages to be persistent until a consumer picks them up, you should declare:

    • An exchange that the "Sender" will publish to
    • A named queue attached to that exchange for each type of message that will be consumed separately (one for each routing key, if using a direct exchange)

    These can both be declared in the Sender script, but in most cases it makes more sense to declare them once when the application is deployed, treating them like you would a database schema.
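As a rough illustration of declaring the topology up front, here is a minimal sketch of a one-off setup script using php-amqplib directly. The connection details, the exchange name `actions`, the routing keys, and the `actions.<routing key>` queue naming scheme are all assumptions made for the example, not values from the question.

```php
<?php
// setup.php (hypothetical name): run once at deploy time, like a schema migration.
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

// Placeholder connection details for a local broker with default credentials.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$exchangeName = 'actions';                          // assumed exchange name
$routingKeys  = ['user.created', 'user.deleted'];   // assumed routing keys

// Durable direct exchange: passive = false, durable = true, auto_delete = false.
$channel->exchange_declare($exchangeName, 'direct', false, true, false);

foreach ($routingKeys as $routingKey) {
    $queueName = 'actions.' . $routingKey;          // assumed naming scheme

    // Named, durable queue: passive = false, durable = true,
    // exclusive = false, auto_delete = false. It exists independently
    // of any consumer, so messages accumulate here until one connects.
    $channel->queue_declare($queueName, false, true, false, false);

    // Route messages published with this routing key into the queue.
    $channel->queue_bind($queueName, $exchangeName, $routingKey);
}

$channel->close();
$connection->close();
```

Declarations are idempotent as long as the properties match, so re-running the script against an unchanged topology should be harmless.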

    Instead of creating an anonymous queue in the Receiver script, you can then just attach to the named queue, and start receiving the messages waiting there.
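A Receiver along these lines might look like the sketch below. It assumes a named durable queue called `actions.user.created` already exists (a hypothetical name), and a reasonably recent php-amqplib release that provides `AMQPMessage::ack()` and `is_consuming()`; older releases acknowledge through `basic_ack` on the channel instead.

```php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Placeholder connection details for a local broker with default credentials.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'actions.user.created';   // assumed name of the pre-declared queue

// Passive declare: only checks that the queue exists and never creates it.
// The broker closes the channel with an error if the queue is missing.
$channel->queue_declare($queueName, true);

$callback = function (AMQPMessage $msg) {
    $payload = json_decode($msg->getBody(), true);
    // ... process $payload here ...

    // Acknowledge only after successful processing, so the broker
    // keeps the message if this consumer dies part-way through.
    $msg->ack();
};

// no_ack = false (fourth argument): manual acknowledgements are required.
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();
```

Unlike the original Receiver, this uses manual acknowledgements rather than `no_ack = true`, which is a design choice of the sketch: with automatic acknowledgement a message is considered handled as soon as it is delivered.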

    The main difference this will make is how multiple consumers for the same routing key will interact:

    • Multiple queues attached to a single exchange, as in your existing code, create multiple copies of each message. This is useful if you have different consumers doing different things with the same messages.
    • Multiple consumers attached to a single queue, as I've suggested above, will share out the messages: each message is delivered to only one consumer, with RabbitMQ dispatching round-robin among them by default. This is useful if you have multiple identical consumers to deal with a large number of messages.

    You might find this RabbitMQ simulator useful to visualise the difference.

    You might find you actually want a mixture:

    • Pre-declare a queue for each consumer which must see every message, to ensure a copy of each message is stored until that particular consumer is ready to read it.
    • Declare additional temporary queues in additional consumers to pick up an extra copy of messages as they come in.
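The temporary-queue half of that mixture could be sketched as follows. The exchange name `actions` and routing key `user.created` are assumptions for the example; the queue itself is server-named, exclusive and auto-deleted, so it only receives copies of messages published while this script is running.

```php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Placeholder connection details for a local broker with default credentials.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$exchangeName = 'actions';        // assumed, must already exist
$routingKey   = 'user.created';   // assumed routing key

// Empty name: the broker generates one. durable = false, exclusive = true,
// auto_delete = true, so the queue disappears with this connection.
list($tempQueue, ,) = $channel->queue_declare('', false, false, true, true);

// A second binding for the same routing key: this queue gets its own copy
// of each matching message, alongside any permanent queue bound to the key.
$channel->queue_bind($tempQueue, $exchangeName, $routingKey);

$callback = function (AMQPMessage $msg) {
    // Example side task: just log what passes through.
    error_log('Observed message: ' . $msg->getBody());
};

// no_ack = true: losing a message here is acceptable for a temporary tap.
$channel->basic_consume($tempQueue, '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}
```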

    As a final note, there are two mechanisms in RabbitMQ for falling back to different processing for messages that can't be processed:

    • An Alternate Exchange captures messages that would be discarded from an exchange (because there is no appropriate queue bound).
    • A Dead Letter Exchange captures messages that would be discarded from a queue (e.g. because they were rejected by a consumer, or reached a configured timeout).

    An AE might be useful in your example if you don't actually want to process the missed messages normally but just want to detect them, e.g. by listing them in an error log.
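Both mechanisms are configured through declaration arguments. The sketch below shows one way to set them with php-amqplib; every exchange and queue name in it is hypothetical. Note that RabbitMQ refuses to redeclare an existing exchange or queue with different arguments, so on a broker where these objects already exist they would need to be deleted first or configured through a policy instead.

```php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

// Placeholder connection details for a local broker with default credentials.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// --- Alternate Exchange: catches messages no queue was bound for ---

// Fanout exchange plus a durable queue that collects unroutable messages.
$channel->exchange_declare('actions.unrouted', 'fanout', false, true, false);
$channel->queue_declare('actions.unrouted.log', false, true, false, false);
$channel->queue_bind('actions.unrouted.log', 'actions.unrouted');

// Main exchange, pointing at the alternate exchange via its arguments.
// Positional arguments: passive, durable, auto_delete, internal, nowait, arguments.
$channel->exchange_declare(
    'actions.main', 'direct', false, true, false, false, false,
    new AMQPTable(['alternate-exchange' => 'actions.unrouted'])
);

// --- Dead Letter Exchange: catches messages dropped from a queue ---

$channel->exchange_declare('actions.dead', 'fanout', false, true, false);
$channel->queue_declare('actions.dead.log', false, true, false, false);
$channel->queue_bind('actions.dead.log', 'actions.dead');

// Work queue whose rejected or expired messages are republished to the DLX.
// Positional arguments: passive, durable, exclusive, auto_delete, nowait, arguments.
$channel->queue_declare(
    'actions.work', false, true, false, false, false,
    new AMQPTable(['x-dead-letter-exchange' => 'actions.dead'])
);
$channel->queue_bind('actions.work', 'actions.main', 'user.created');

$channel->close();
$connection->close();
```

A consumer attached to `actions.unrouted.log` could then write each message it receives to an error log, which matches the detection-only use case described above.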
