duancongjue9202 2018-01-31 13:43

Receiving RabbitMQ messages in PHP Ratchet

I am trying to build a mechanism in which PHP pushes a message to RabbitMQ (I don't want RabbitMQ to be directly exposed to users), RabbitMQ passes it to Ratchet, and Ratchet broadcasts it to users over websocket connections.

The issue I have is actually making the Ratchet server listen for queue messages and forward them at the same time. The Ratchet documentation assumes ZeroMQ, and after a long search through outdated documentation and libraries that no longer have the relevant methods (e.g. React\Stomp), I need fresh eyes from someone who has experience with these tools.

What I have is pusher.php (standard example from RabbitMQ docs):

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";

$channel->close();
$connection->close();

To make the scenario easier to reproduce, I am also including the Chat class:

use Ratchet\ConnectionInterface;
use Ratchet\MessageComponentInterface;

class Chat implements MessageComponentInterface
{
    protected $clients;

    public function __construct()
    {
        $this->clients = new \SplObjectStorage;
    }

    public function onOpen(ConnectionInterface $connection)
    {
        // Store the new connection to send messages to later
        $this->clients->attach($connection);

        echo "New connection! ({$connection->resourceId})\n";
    }

    public function onMessage(ConnectionInterface $from, $msg)
    {
        $numRecv = count($this->clients) - 1;
        echo sprintf('Connection %d sending message "%s" to %d other connection%s' . "\n",
            $from->resourceId, $msg, $numRecv, $numRecv == 1 ? '' : 's');

        foreach($this->clients as $client)
        {
            /** @var ConnectionInterface $client */
            if($from !== $client)
            {
                // The sender is not the receiver, send to each client connected
                $client->send($msg);
            }
        }
    }

    public function onClose(ConnectionInterface $conn)
    {
        // The connection is closed, remove it, as we can no longer send it messages
        $this->clients->detach($conn);

        echo "Connection {$conn->resourceId} has disconnected\n";
    }

    public function onError(ConnectionInterface $conn, \Exception $e)
    {
        echo "An error has occurred: {$e->getMessage()}\n";

        $conn->close();
    }
}

And Ratchet server.php (standard Ratchet example and RabbitMQ receiver example):

use PhpAmqpLib\Connection\AMQPStreamConnection;
use Src\Chat;

// RABBIT_RECEIVER
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function ($msg)
{
    echo " [x] Received ", $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);
while(count($channel->callbacks))
{
    $channel->wait();
}

$channel->close();
$connection->close();
// RABBIT_RECEIVER END

$server = new \Ratchet\App('sockets.dev');
$server->route('/', new Chat());

$server->run();

The current version is basically two separate mechanisms listening for messages. Each works fine on its own (so no issue there), but they block each other and do not pass messages between them.

The question is how to make server.php receive a message from RabbitMQ and hand it to the running Ratchet server.

1 answer

  • duanqinjiao5244 2018-02-01 10:08

    I figured it out, so I am adding an answer for posterity. The solution is not fully real-time, but it is very close, seems to perform well, and does not block the Ratchet websocket server. The key is to give Ratchet a custom event loop and use that loop's addPeriodicTimer method.

    So the code for server.php should look like this:

    use Src\Chat;

    // Create the event loop first so it can be shared with the Chat component.
    $loop = React\EventLoop\Factory::create();
    $chat = new Chat($loop);
    
    $server = new \Ratchet\App('sockets.dev', 8080, '127.0.0.1', $loop);
    $server->route('/', $chat);
    
    $server->run();
    

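    And the Chat class in Chat.php (only __construct is shown because the rest is the same; the file also needs use statements for React\EventLoop\LoopInterface and PhpAmqpLib\Connection\AMQPStreamConnection):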

    public function __construct(LoopInterface $loop)
    {
        $this->loop = $loop;
        $this->clients = new \SplObjectStorage();
    
        $this->loop->addPeriodicTimer(0, function ()
        {
            $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    
            $channel = $connection->channel();
            $channel->queue_declare('hello', false, false, false, false);
            echo ' [*] Checking for messages from RabbitMQ', "\n";
    
            $max_number_messages_to_fetch_per_batch = 10000;
            do
            {
                $message = $channel->basic_get('hello', true);
                if($message)
                {
                    foreach($this->clients as $client)
                    {
                        $client->send($message->body);
                    }
    
                    $max_number_messages_to_fetch_per_batch--;
                }
            }
            while($message && $max_number_messages_to_fetch_per_batch > 0);
    
            $channel->close();
            $connection->close();
        });
    
    }
    

    Starting the Ratchet server attaches a periodic timer (in this example with an interval of 0 seconds) that checks RabbitMQ for new messages and processes them.

    To keep performance under control and give the websocket server room to breathe, the number of RabbitMQ messages processed in one batch is limited to 10,000. Processed messages are removed from the queue, and the next batch is handled on the next timer iteration.
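
The batch cap can be illustrated without a broker. The following is a minimal sketch, not the answer's actual code: drainBatch(), $fetch and $deliver are hypothetical names, and a plain PHP array stands in for the RabbitMQ queue. It shows how 25 pending messages with a cap of 10 are spread over three timer ticks.

```php
<?php
// Hypothetical helper: drainBatch() is not part of Ratchet or php-amqplib.
// $fetch returns the next message body, or null when the queue is empty.
// $deliver forwards one body (in the real server: send to every client).
function drainBatch(callable $fetch, callable $deliver, int $limit): int
{
    $handled = 0;
    // Check the cap first so no message is fetched and then left undelivered.
    while ($handled < $limit && ($body = $fetch()) !== null) {
        $deliver($body);
        $handled++;
    }
    return $handled;
}

// In-memory stand-in for the RabbitMQ queue: 25 pending messages.
$queue = range(1, 25);
$fetch = function () use (&$queue) {
    return array_shift($queue); // null once the array is empty
};
$sent = [];
$deliver = function ($body) use (&$sent) {
    $sent[] = $body;
};

// Three simulated timer ticks with a cap of 10 messages per tick.
$perTick = [];
for ($tick = 0; $tick < 3; $tick++) {
    $perTick[] = drainBatch($fetch, $deliver, 10);
}

echo implode(', ', $perTick), "\n"; // prints "10, 10, 5"
```

Between ticks the event loop is free to serve websocket traffic, which is the point of the cap.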

    You can also fine-tune the update frequency with the interval parameter of addPeriodicTimer. 0 seconds is the closest to real time you will get, but your use case may not need that, and the interval can be raised.
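
The constructor shown earlier opens a new AMQP connection on every timer tick. One possible variant, offered as an assumption rather than as part of the tested solution, opens the channel once and lets the timer only poll it, with a non-zero interval such as 0.1 seconds. QueuePoller, its constructor parameters, and the $broadcast closure in the wiring comment are hypothetical names; basic_get is the same php-amqplib call used in the constructor.

```php
<?php
// Sketch only: QueuePoller is a hypothetical class, not part of any library.
final class QueuePoller
{
    private $channel;    // expected: an open php-amqplib channel, created once
    private $queue;      // queue name, e.g. 'hello'
    private $deliver;    // callable that receives each message body
    private $batchLimit; // maximum number of messages handled per tick

    public function __construct($channel, string $queue, callable $deliver, int $batchLimit = 10000)
    {
        $this->channel = $channel;
        $this->queue = $queue;
        $this->deliver = $deliver;
        $this->batchLimit = $batchLimit;
    }

    // The event loop calls this on every tick and passes the timer object.
    public function __invoke($timer = null): void
    {
        for ($i = 0; $i < $this->batchLimit; $i++) {
            // Second argument true = no_ack, as in the constructor above.
            $message = $this->channel->basic_get($this->queue, true);
            if (!$message) {
                return; // queue is empty, wait for the next tick
            }
            ($this->deliver)($message->body);
        }
    }
}

// Possible wiring inside Chat::__construct (assumes php-amqplib and ReactPHP
// are installed; not executed here):
//
// $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// $channel = $connection->channel();
// $channel->queue_declare('hello', false, false, false, false);
// $broadcast = function ($body) {
//     foreach ($this->clients as $client) { $client->send($body); }
// };
// $this->loop->addPeriodicTimer(0.1, new QueuePoller($channel, 'hello', $broadcast));
```

With a 0.1 second interval a message waits at most about a tenth of a second before it is broadcast. A long-lived connection can drop, so real code would likely need reconnect handling, which this sketch omits.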

    This answer was selected by the asker as the best answer.