dongxin991209 2017-09-06 08:41

PHP popen process limit?

I'm trying to move a time-consuming task into a separate process. Unfortunately, multi-threading does not really seem to be an option in PHP, but you can create new PHP processes using popen.

The use case is this: there is a cronjob that runs every minute, which checks if there are any email campaigns that need to be sent. There could be multiple campaigns that need to be sent at the exact same time, but as of now it just picks up one campaign every minute. I would like to extract sending of the campaigns to a separate process, so that I can send multiple campaigns at the same time.

The code looks something like this (note that this is just a proof of concept):

crontab

* * * * * root /usr/local/bin/php /var/www/maintask.php 2>&1

maintask.php

for ($i = 0; $i < 4; $i++) {
    $processName = "Process_{$i}";
    echo "Spawn process {$processName}" . PHP_EOL;

    $process = popen("php subtask.php?process_name={$processName} 2>&1", "r");
    stream_set_blocking($process, false);
}

subtask.php

$process = $_GET['process_name'];

echo "Started sleeping process {$process}" . PHP_EOL;
sleep(rand(10, 40));
echo "Stopped sleeping process  {$process}" . PHP_EOL;

Now, the problem I'm having is that popen will only spawn 2 processes at any time, while I'm trying to spawn 4. I cannot figure out why. There doesn't appear to be any documented limit. Perhaps this is limited by the number of cores I have available?

1 answer

  • doulun9534 2017-09-07 09:39

    I modified subtask.php so that each task prints when it starts, when it ends, and how long it intends to sleep. Because the output now shows when every process starts and stops, you can reduce the sleep times, and you don't need ps -aux to see which processes are running.

    subtask.php

    <?php
    $process = $argv[1];
    
    $sleepTime = rand(1, 10);
    echo date('Y-m-d H:i:s') . " - Started sleeping process {$process} ({$sleepTime})" . PHP_EOL;
    sleep($sleepTime);
    echo date('Y-m-d H:i:s') . " - Stopped sleeping process {$process}" . PHP_EOL;
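
The switch from `$_GET` to `$argv` matters because the PHP CLI treats `subtask.php?process_name=...` as a literal file name and does not fill `$_GET` from it. Below is a small sketch of passing the name as a command-line argument and reading it back. Both function names are hypothetical and exist only for this illustration.

```php
<?php
// Hypothetical helper: builds the shell command for one subtask.
function buildSubtaskCommand(string $script, string $processName): string
{
    // escapeshellarg() quotes each value so it reaches the child as a single argument
    return 'php ' . escapeshellarg($script) . ' ' . escapeshellarg($processName);
}

// Hypothetical helper: reads the process name the way subtask.php does.
function processNameFromArgv(array $argv, string $default = 'unnamed'): string
{
    // $argv[0] is the script path; the first real argument is $argv[1]
    return isset($argv[1]) ? $argv[1] : $default;
}

echo buildSubtaskCommand('subtask.php', 'Process_0') . PHP_EOL;
echo processNameFromArgv(['subtask.php', 'Process_0']) . PHP_EOL;
```

Quoting the argument is optional for names like `Process_0`, but it keeps the command safe if a name ever contains spaces or shell metacharacters.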
    

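    I've added the class to the maintask.php code so you can test it. The fun starts when you queue() more entries than maxProcesses allows (try 32): the extra commands wait in the command queue until a running process finishes.
    NOTE: the results come back in the order the processes complete, not the order they were queued.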
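
To illustrate the completion-order note in isolation, here is a minimal sketch that starts two children at once and records which one exits first. It assumes a Unix-like system (it uses `/dev/null`), and `runInCompletionOrder` plus the inline `php -r` children are made up for this example rather than taken from the answer's class.

```php
<?php
// Hypothetical helper: starts every command at once and returns the labels
// in the order in which the child processes exit.
function runInCompletionOrder(array $commands): array
{
    $spec = [
        0 => ['file', '/dev/null', 'r'], // the child reads nothing
        1 => ['file', '/dev/null', 'w'], // output is discarded to keep the sketch short
        2 => ['file', '/dev/null', 'w'],
    ];

    $running = [];
    foreach ($commands as $label => $command) {
        $pipes = [];
        $proc = proc_open($command, $spec, $pipes);
        if ($proc === false) {
            throw new RuntimeException("proc_open() failed for {$label}");
        }
        $running[$label] = $proc; // keep the handle so the child keeps running in the background
    }

    $finished = [];
    while ($running) {
        foreach ($running as $label => $proc) {
            $status = proc_get_status($proc);
            if (!$status['running']) {
                proc_close($proc);
                unset($running[$label]);
                $finished[] = $label; // recorded in completion order
            }
        }
        usleep(20000); // poll every 20 ms instead of spinning
    }
    return $finished;
}

$php = escapeshellarg(PHP_BINARY);
$order = runInCompletionOrder([
    'slow' => "{$php} -r " . escapeshellarg('usleep(600000);'),
    'fast' => "{$php} -r " . escapeshellarg('usleep(100000);'),
]);
echo implode(', ', $order) . PHP_EOL;
```

Although `slow` is started first, `fast` is expected to appear first in the result because it exits sooner.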

    maintask.php

    <?php
    class ParallelProcess
    {
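        private $maxProcesses = 16; // maximum number of processes running at once
        private $arrProcessQueue = []; // running processes: [command, process handle, stdout stream]
        private $arrCommandQueue = []; // commands waiting for a free slot
        private $returnValue = null; // result of the most recent proc_close() call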
    
        private function __construct()
        {
        }
    
        private function __clone()
        {
        }
    
        /**
         *
         * @return \static
         */
        public static function create()
        {
            $result = new static();
            return $result;
        }
    
        /**
         *
         * @param int $maxProcesses
         * @return \static
         */
        public static function load($maxProcesses = 16)
        {
            $result = self::create();
            $result->setMaxProcesses($maxProcesses);
            return $result;
        }
    
        /**
         * get maximum processes
         *
         * @return int
         */
        public function getMaxProcesses()
        {
            return $this->maxProcesses;
        }
    
        /**
         * set maximum processes
         *
         * @param int $maxProcesses
         * @return $this
         */
        public function setMaxProcesses($maxProcesses)
        {
            $this->maxProcesses = $maxProcesses;
            return $this;
        }
    
        /**
         * number of entries in the process queue
         *
         * @return int
         */
        public function processQueueLength()
        {
            $result = count($this->arrProcessQueue);
            return $result;
        }
    
        /**
         * number of entries in the command queue
         *
         * @return int
         */
        public function commandQueueLength()
        {
            $result = count($this->arrCommandQueue);
            return $result;
        }
    
    
        /**
         * process open
         *
         * @staticvar array $arrDescriptorSpec
         * @param string $strCommand
         * @return $this
         * @throws \Exception
         */
        private function p_open($strCommand)
        {
            static $arrDescriptorSpec = array(
                0 => array('file', '/dev/null', 'r'), // stdin is a file that the child will read from
                1 => array('pipe', 'w'), // stdout is a pipe that the child will write to
                2 => array('file', '/dev/null', 'w') // stderr is discarded by writing it to /dev/null
            );
    
            $arrPipes = array();
            if (($resProcess = proc_open($strCommand, $arrDescriptorSpec, $arrPipes)) === false) {
                throw new \Exception("error: proc_open() failed!");
            }
    
            $resStream = &$arrPipes[1];
    
            if (($blnSetBlockingResult = stream_set_blocking($resStream, true)) === false) {
                throw new \Exception("error: stream_set_blocking() failed!");
            }
    
            $this->arrProcessQueue[] = array(&$strCommand, &$resProcess, &$resStream);
            return $this;
        }
    
        /**
         * execute any queued commands
         *
         * @return $this
         */
        private function executeCommand()
        {
            while ($this->processQueueLength() < $this->maxProcesses and $this->commandQueueLength() > 0) {
                $strCommand = array_shift($this->arrCommandQueue);
                $this->p_open($strCommand);
            }
            return $this;
        }
    
        /**
         * process close
         *
         * @param array $arrQueueEntry
         * @return $this
         */
        private function p_close(array $arrQueueEntry)
        {
            $resProcess = $arrQueueEntry[1];
            $resStream = $arrQueueEntry[2];
    
            fclose($resStream);
    
            $this->returnValue = proc_close($resProcess);
    
            $this->executeCommand();
            return $this;
        }
    
        /**
         * queue command
         *
         * @param string $strCommand
         * @return $this
         */
        public function queue($strCommand) {
            // put the command on the $arrCommandQueue
            $this->arrCommandQueue[] = $strCommand;
            $this->executeCommand();
            return $this;
        }
    
        /**
         * read from stream
         *
         * @param resource $resStream
         * @return string
         */
        private static function readStream($resStream)
        {
            $result = '';
            while (($line = fgets($resStream)) !== false) {
                $result .= $line;
            }
            return $result;
        }
    
        /**
         * read a result from the process queue
         *
         * @return string|false
         */
        private function readProcessQueue()
        {
            $result = false;
            // foreach replaces each(), which was removed in PHP 8.0
            foreach ($this->arrProcessQueue as $key => $arrQueueEntry) {
                $arrStatus = proc_get_status($arrQueueEntry[1]);
                if ($arrStatus['running'] === false) {
                    // remove the finished entry, read its output, then free the slot
                    array_splice($this->arrProcessQueue, $key, 1);
                    $result = self::readStream($arrQueueEntry[2]);
                    $this->p_close($arrQueueEntry);
                    break; // the queue was modified, so stop iterating
                }
            }
            return $result;
        }
    
        /**
         * get result from process queue
         *
         * @return string|false
         */
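        public function readNext()
        {
            $result = false;
            // keep polling until a process finishes or nothing is left running
            while ($result === false && $this->processQueueLength() > 0) {
                $result = $this->readProcessQueue();
                if ($result === false) {
                    usleep(10000); // nothing finished yet: wait 10 ms instead of busy-looping
                }
            }
            return $result;
        }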
    }
    
    set_time_limit(0); // don't timeout
    
    $objParallelProcess = ParallelProcess::load(8); // allow up to 8 parallel processes
    
    for ($i = 0; $i < 4; $i++) {
        $processName = "Process_{$i}";
        echo date('Y-m-d H:i:s') . " - Queue process {$processName}" . PHP_EOL;
        $objParallelProcess->queue("php subtask.php {$processName}"); // queue process
    }
    
    // loop through process queue
    while (($strResponse = $objParallelProcess->readNext()) !== false) { // read next result and run next command if one is queued
        // process response
        echo $strResponse;
    }
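
As for why the loop in the question never ran more than two children, one likely cause is that `$process` is overwritten on every iteration. This is an assumption and is not established anywhere in this thread. When the last reference to a popen() handle disappears, PHP closes the handle, and closing waits for that child to exit, so each new popen() call would block until the previous child is done. Keeping every handle in an array avoids that. Below is a minimal popen-only sketch; `spawnAll` and `collectAll` are hypothetical names, and the inline `php -r` children stand in for subtask.php.

```php
<?php
// Hypothetical helper: starts every command and keeps each handle alive.
function spawnAll(array $commands): array
{
    $handles = [];
    foreach ($commands as $name => $command) {
        $handle = popen($command, 'r');
        if ($handle === false) {
            throw new RuntimeException("popen() failed for {$name}");
        }
        // Storing the handle keeps a reference, so PHP does not close it
        // (and wait for the child) when the loop variable is reused.
        $handles[$name] = $handle;
    }
    return $handles;
}

// Hypothetical helper: reads each child's output, then closes its handle.
function collectAll(array $handles): array
{
    $output = [];
    foreach ($handles as $name => $handle) {
        $output[$name] = stream_get_contents($handle); // blocks until this child closes stdout
        pclose($handle);                                // waits for the child to exit
    }
    return $output;
}

$php = escapeshellarg(PHP_BINARY);
$commands = [];
for ($i = 0; $i < 4; $i++) {
    $code = 'sleep(1); echo "Process_' . $i . ' done";';
    $commands["Process_{$i}"] = "{$php} -r " . escapeshellarg($code);
}

$start = microtime(true);
$results = collectAll(spawnAll($commands));
$elapsed = microtime(true) - $start;

print_r($results);
printf("elapsed: %.1f s" . PHP_EOL, $elapsed);
```

If the four children really run side by side, the elapsed time should be close to one second, not four. Unlike the class above, this sketch has no upper bound on the number of processes, so it only suits a small, fixed number of tasks.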
    
    This answer was selected by the asker as the best answer.
