weixin_41427352 2023-01-31 08:39 采纳率: 75%
浏览 36

php如何并发消费mq中大量请求?

有个php项目要增加高并发的能力
看了网上的教学,说是用队列,目前想用队列,把请求放到队列里
理论上能明白,但是操作上有个问题,比如说同时有100万个订单同时产生,如何能同时处理10万个订单?目前用的thinkPHP-queue,也只能加几个进程去处理而已。
  • 写回答

1条回答 默认 最新

  • m0_54204465 2023-01-31 09:32
    关注

    增加消费者数量:消费者越多,能够同时处理的请求数量就越多。创建了10个消费者线程,同时从RabbitMQ队列中读取请求:

    <?php
    //连接到RabbitMQ服务器
    $connection = new AMQPConnection(array('host' => 'localhost', 'port' => 5672, 'vhost' => '/', 'username' => 'guest', 'password' => 'guest'));
    $connection->connect();
    
    //创建通道
    $channel = new AMQPChannel($connection);
    
    //创建队列
    $queue = new AMQPQueue($channel);
    $queue->setName('requests');
    
    //创建10个消费者线程
    for ($i = 0; $i < 10; $i++) {
        $pid = pcntl_fork();
        if ($pid == -1) {
            die('could not fork');
        } else if ($pid) {
            // parent
        } else {
            // child
            $queue->consume(function ($envelope, $queue) {
                $request = $envelope->getBody();
                //处理请求
                //...
            });
            exit();
        }
    }
    
    //等待所有消费者线程结束
    for ($i = 0; $i < 10; $i++) {
        $status = 0;
        $pid = pcntl_wait($status);
    }
    
    //关闭连接
    $connection->disconnect();
    
    

    使用异步消息处理:可以使用异步消息处理,在处理消息时不阻塞,而是立即返回。使用 RabbitMQ 的消息队列:

    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    // 建立 RabbitMQ 连接
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    
    // 创建队列
    $channel->queue_declare('async_queue', false, false, false, false);
    
    // 发送消息
    $message = new AMQPMessage('Hello World!');
    $channel->basic_publish($message, '', 'async_queue');
    
    // 异步消费消息
    $callback = function($message) {
        echo " [x] Received ", $message->body, "\n";
    };
    
    $channel->basic_consume('async_queue', '', false, true, false, false, $callback);
    
    while(count($channel->callbacks)) {
        $channel->wait();
    }
    
    // 关闭连接
    $channel->close();
    $connection->close();
    
    

    建立了 RabbitMQ 连接,然后创建了一个队列并发送了消息,然后消费者异步地处理消息,不会阻塞,而是立即返回,最后关闭连接。

    分割请求:将大量请求拆分为小请求,以提高处理效率。

    // 分割请求的代码示例
    $requests = getRequests(); // 获取所有请求
    $chunkSize = 1000; // 将请求拆分为1000个一组
    $chunks = array_chunk($requests, $chunkSize);
    foreach ($chunks as $chunk) {
        processRequests($chunk); // 并发处理每一组请求
    }
    
    

    缓存队列中的请求:通过缓存队列中的请求,以提高处理效率。使用memcached或redis作为缓存:

    // 使用memcached存储请求队列
    $mem = new Memcached();
    $mem->addServer("localhost", 11211);
    $queue_key = 'request_queue';
    
    // 向队列中添加请求
    $mem->append($queue_key, $request);
    
    // 从队列中获取请求
    $requests = $mem->get($queue_key);
    
    // 处理请求
    foreach ($requests as $request) {
        // process request
    }
    
    // 清空队列
    $mem->delete($queue_key);
    
    
    // 使用redis存储请求队列
    $redis = new Redis();
    $redis->connect('127.0.0.1', 6379);
    $queue_key = 'request_queue';
    
    // 向队列中添加请求
    $redis->lpush($queue_key, $request);
    
    // 从队列中获取请求
    $requests = $redis->lrange($queue_key, 0, -1);
    
    // 处理请求
    foreach ($requests as $request) {
        // process request
    }
    
    // 清空队列
    $redis->del($queue_key);
    
    
    评论

报告相同问题?

问题事件

  • 创建了问题 1月31日

悬赏问题

  • ¥15 python怎么在已有视频文件后添加新帧
  • ¥20 虚幻UE引擎如何让多个同一个蓝图的NPC执行一样的动画,
  • ¥15 fluent里模拟降膜反应的UDF编写
  • ¥15 MYSQL 多表拼接link
  • ¥15 关于某款2.13寸墨水屏的问题
  • ¥15 obsidian的中文层级自动编号
  • ¥15 同一个网口一个电脑连接有网,另一个电脑连接没网
  • ¥15 神经网络模型一直不能上GPU
  • ¥15 pyqt怎么把滑块和输入框相互绑定,求解决!
  • ¥20 wpf datagrid单元闪烁效果失灵