dongmubi4444 2014-01-07 13:21

Error when implementing a message queue with Redis: BLPOP fails

I am trying to build a message queue using Redis. Whenever a client sends new data, it is added to a list.

Here is the code for it:

$client->lPush("my_queue", $data);

A separate script, slave.php, pops the newly arrived data and processes it. The code for slave.php:

while (true) {
    list($queue, $message) = $client->brPop(["my_queue"], 0);

    /*
    Logic to process the data
    */
}

I have modified the Apache startup script so that slave.php starts and stops with Apache. That works well. But after waiting for a few minutes, brPop stops listening, with an error message like this:

Uncaught exception 'Predis\Connection\ConnectionException' with message 'Error while reading line from the server [tcp://127.0.0.1:6379]' in /var/www/api/lib/predis-0.8/lib/Predis/Connection/AbstractConnection.php:139
Stack trace:
#0 /var/www/api/lib/predis-0.8/lib/Predis/Connection/StreamConnection.php(205): Predis\Connection\AbstractConnection->onConnectionError('Error while rea...')
#1 /var/www/api/lib/predis-0.8/lib/Predis/Connection/AbstractConnection.php(128): Predis\Connection\StreamConnection->read()
#2 /var/www/api/lib/predis-0.8/lib/Predis/Connection/AbstractConnection.php(120): Predis\Connection\AbstractConnection->readResponse(Object(Predis\Command\ListPopLastBlocking))
#3 /var/www/api/lib/predis-0.8/lib/Predis/Client.php(227): Predis\Connection\AbstractConnection->executeCommand(Object(Predis\Command\ListPopLastBlocking))
#4 /var/www/api/lib/slave.php(7): Predis\Client->__call('brPop', Array)
#5 /var/www/api/lib/slave.php(7): Predis\Client->brPop(Array, 0)
#6 {main}
 thrown in /var/www/api/lib/predis-0.8/lib/Predis/Connection/AbstractConnection.php on line 139

According to the documentation, if the list is empty, BLPOP/BRPOP blocks the connection until another client performs an LPUSH or RPUSH operation against one of the keys. But this is not happening in my case: once brPop blocks the connection, it does not listen again, even when new data arrives in the list.

What changes should I make to get this working?


2 answers

  • doufu6423 2014-01-08 06:20

    It's working for me now, but I am not sure whether this is the right way to do it. I now catch the error and call the function recursively when the connection fails. My new slave.php looks like this:

    function process_data()
    {
        try {
            $client = new \Predis\Client();
    
            require_once("logger.php");
    
            while (true) {
                list($queue, $message) = $client->brPop(["bookmark_queue"], 0);
                // logic
            }
        } catch (Exception $ex) {
            $error = $ex->getMessage();
            log_error($error, "slave.php");
            process_data(); // call the function recursively if connection fails
        }
    }
    process_data(); // call the function
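
A possible variation on the code above, offered as a sketch rather than a confirmed fix: the recursive retry adds a stack frame on every failure, so a long-running worker may be better served by an outer loop that reconnects. The exception after a few idle minutes is also consistent with a client-side socket read timeout expiring while BRPOP is blocked. Predis accepts a `read_write_timeout` connection parameter, and its pub/sub examples set it to 0 for blocking consumers, but whether that is the cause here is an assumption worth checking against your Predis version. `run_worker`, `$connect`, `$handle`, `$maxFailures`, and `$sleepSeconds` are hypothetical names introduced for illustration.

```php
<?php
// Sketch only: a loop-based replacement for the recursive retry.
// $connect is a hypothetical factory returning a client that exposes brPop().
// With Predis it could be (assumption, verify for your version):
//   function () { return new \Predis\Client(['read_write_timeout' => 0]); }
// $handle is a hypothetical callback that processes one message.

function run_worker(callable $connect, callable $handle, $maxFailures = 5, $sleepSeconds = 1)
{
    $failures = 0; // consecutive failures since the last processed message

    while ($failures < $maxFailures) {
        try {
            // Build a fresh connection on the first pass and after every failure.
            $client = $connect();

            while (true) {
                // Timeout 0 asks the server to block until an item arrives.
                $reply = $client->brPop(["bookmark_queue"], 0);
                if ($reply === null) {
                    continue; // nothing was popped, wait again
                }

                list($queue, $message) = $reply;
                $handle($message);
                $failures = 0; // a processed message resets the counter
            }
        } catch (Exception $ex) {
            // Handler exceptions land here too and count as failures.
            $failures++;
            error_log("slave.php: " . $ex->getMessage());
            sleep($sleepSeconds); // brief pause before reconnecting
        }
    }

    // Give up after too many consecutive failures so a supervisor can restart us.
    return $failures;
}
```

Returning after `$maxFailures` consecutive errors, instead of retrying forever, is a design choice: it keeps a permanently unreachable server from turning the worker into a tight error loop. Note that a message whose handler throws is not re-queued in this sketch.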
    
    This answer was accepted by the asker.