dongsu4345 2013-07-15 12:39

Consumer does not acknowledge messages from RabbitMQ

I have created a simple publisher and a consumer that subscribes to the queue using basic.consume.

My consumer acknowledges a message when the job runs without an exception. Whenever I run into an exception, I don't ack the message and return early. Only the acknowledged messages disappear from the queue, so that's working correctly.
Now I want the consumer to pick up the failed messages again, but the only way I have found to reconsume those messages is to restart the consumer.

How should I approach this use case?

Setup code

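// $connection is assumed to be an already connected AMQPConnection
$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);

$exchange->setName('my-exchange');
$exchange->setType('fanout');
$exchange->declareExchange(); // named declare() in older php-amqp releases

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue(); // named declare() in older php-amqp releases
$queue->bind('my-exchange');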

Consumer code

$queue->consume(array($this, 'callback'));

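// php-amqp passes the consuming queue as the second callback argument
public function callback(AMQPEnvelope $msg, AMQPQueue $queue)
{
    try {
        // Do some business logic
    } catch (Exception $ex) {
        // Log exception
        return; // the message stays unacknowledged
    }
    return $queue->ack($msg->getDeliveryTag());
}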

Producer code

$exchange->publish('message');

  • dqdes60666 2013-07-15 14:57

    If a message was not acknowledged and the application fails (that is, the consumer's channel or connection closes), it is redelivered automatically and the redelivered property on the envelope is set to true (unless you consume with the no-ack = true flag).

    Update:

    You have to nack the message with the requeue flag in your catch block:

        try {
            //Do some business logic
        } catch (Exception $ex) {
            //Log exception
            return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
        }
    

    Beware of messages that get nacked and requeued endlessly: AMQP 0-9-1 only exposes a boolean redelivered flag, and RabbitMQ does not keep a redelivery count for classic queues.

    If you don't want to deal with such messages and simply want to add some delay, you could call sleep() or usleep() before the nack, but that is not a good idea because it blocks the consumer.

    There are multiple techniques for dealing with the redelivery loop problem:

    1. Rely on Dead Letter Exchanges

    • pros: reliable, standard, clear
    • cons: requires additional logic
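
For technique 1, here is a minimal sketch of what the dead-letter topology might look like with php-amqp. It assumes a broker on localhost with the default guest account; the names `my-dead-letter-exchange` and `my-failed-queue` are made up for illustration and are not from the question. Note that if `my-queue` already exists without the `x-dead-letter-exchange` argument, redeclaring it with that argument fails, so the queue would have to be deleted first or the argument set through a policy.

```php
<?php
// Assumption: broker on localhost:5672 with the default guest/guest account.
$connection = new AMQPConnection();
$connection->connect();
$channel = new AMQPChannel($connection);

// Hypothetical exchange that receives rejected (dead-lettered) messages.
$dlx = new AMQPExchange($channel);
$dlx->setName('my-dead-letter-exchange');
$dlx->setType(AMQP_EX_TYPE_FANOUT);
$dlx->declareExchange();

// Hypothetical queue where failed messages are parked for inspection or retry.
$failed = new AMQPQueue($channel);
$failed->setName('my-failed-queue');
$failed->declareQueue();
$failed->bind('my-dead-letter-exchange');

// The work queue: messages rejected without requeue go to the exchange above.
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->setArgument('x-dead-letter-exchange', 'my-dead-letter-exchange');
$queue->declareQueue();
$queue->bind('my-exchange');

$queue->consume(function (AMQPEnvelope $msg, AMQPQueue $queue) {
    try {
        // Do some business logic
    } catch (Exception $ex) {
        // No AMQP_REQUEUE flag: the broker dead-letters the message
        // instead of putting it back on my-queue.
        $queue->nack($msg->getDeliveryTag());
        return;
    }
    $queue->ack($msg->getDeliveryTag());
});
```

With this layout a failing message leaves `my-queue` after the first rejection, so it cannot loop; what happens to the messages in `my-failed-queue` (manual inspection, a delayed retry, alerting) is the "additional logic" mentioned in the cons.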

    2. Use per-message or per-queue TTL

    • pros: easy to implement, also standard, clear
    • cons: with long queues some messages may expire and be dropped before they are ever processed

    Examples (both TTLs are in milliseconds; note that the queue TTL must be passed as a number, while the message TTL can be anything that is a numeric string):

    2.1. Per-message TTL:

    $queue = new AMQPQueue($channel);
    $queue->setName('my-queue');
    $queue->declareQueue();
    $queue->bind('my-exchange');
    
    $exchange->publish(
        'message at ' . microtime(true),
        null,
        AMQP_NOPARAM,
        array(
            'expiration' => '1000'
        )
    );
    

    2.2. Per-queue TTL:

    $queue = new AMQPQueue($channel);
    $queue->setName('my-queue');
    $queue->setArgument('x-message-ttl', 1000);
    $queue->declareQueue();
    $queue->bind('my-exchange');
    
    $exchange->publish('message at ' . microtime(true));
    

    3. Keep a redelivery count, or the number of redeliveries left (like the hop limit or TTL in the IP stack), in the message body or headers

    • pros: gives you extra control over message lifetime at the application level
    • cons: significant overhead, because you have to modify the message and publish it again; application specific; not clear

    Code:

    $queue = new AMQPQueue($channel);
    $queue->setName('my-queue');
    $queue->declareQueue();
    $queue->bind('my-exchange');
    
    $exchange->publish(
        'message at ' . microtime(true),
        null,
        AMQP_NOPARAM,
        array(
            'headers' => array(
                'ttl' => 100
            )
        )
    );
    
    $queue->consume(
        function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
            $headers = $msg->getHeaders();
            echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
            echo $msg->getDeliveryTag(), ' ';
            echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' ';
            echo $msg->getBody(), PHP_EOL;
    
            try {
                //Do some business logic
                throw new Exception('business logic failed');
            } catch (Exception $ex) {
                //Log exception
                if (isset($headers['ttl'])) {
                    // with ttl logic
    
                    if ($headers['ttl'] > 0) {
                        $headers['ttl']--;
    
                        $exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers));
                    }
    
                    return $queue->ack($msg->getDeliveryTag());
                } else {
                    // without ttl logic
                    return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue
                }
    
            }
    
            return $queue->ack($msg->getDeliveryTag());
        }
    );
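
The retry decision buried in the catch block above can be pulled out into a small pure function, which makes it easy to check without a broker. This is only a sketch: `nextRetryHeaders` is a hypothetical helper, not part of php-amqp, and it assumes the same `ttl` header convention as the code above.

```php
<?php
/**
 * Hypothetical helper (not part of php-amqp).
 * Returns the headers to republish with, with the retry budget decremented,
 * or null when the budget is missing or used up and the message
 * should simply be acked and dropped.
 */
function nextRetryHeaders(array $headers, string $key = 'ttl'): ?array
{
    if (!isset($headers[$key]) || (int) $headers[$key] <= 0) {
        return null; // no budget left: do not republish
    }

    $headers[$key] = (int) $headers[$key] - 1;

    return $headers;
}
```

Inside the consumer callback, the catch block could then call `nextRetryHeaders($msg->getHeaders())`, republish with the returned headers when the result is not null, and ack the original delivery in either case. Unlike the code above, this helper treats a missing header as "no retries"; whether that or a requeue is the right default depends on the application.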
    

    There may be other ways to better control the message redelivery flow.

    Conclusion: there is no silver bullet. You have to decide which solution fits your needs best, or come up with something else, but don't forget to share it here ;)
