douju1968 2014-04-28 12:32
Accepted

PHP pthreads - How do you perform garbage collection?

Given the following code, how can you ensure that the completed MyWorker objects are destroyed/their memory freed?

Because of what my script does, I need ~50 threads constantly fetching data with cURL and processing it.

I've tried both having the threads never leave run() and, as shown in this sample code, letting them leave run() and having the collect function spawn a new copy of them.

But no matter what, I hit the memory limit after a minute or so. Could you tell me what I'm doing wrong?

class MyWorker extends Threaded
{
    public $complete;
    public function __construct() {$this->complete = false;}
    public function run() {$this->complete = true;}
}

$pool = new Pool(50);
for($i=0; $i<50; $i++)
    $pool->submit(new MyWorker());
$pool->collect(function($worker)
{
    global $pool;
    if($worker->complete == true)
        $pool->submit(new MyWorker());
    return $worker->complete;
});
$pool->shutdown();

1 answer

  • douxin0251 2014-04-30 09:08

    Why

    Why should I collect anyway ?

    The Worker threads provided by pthreads require that the programmer retain the correct references to Threaded objects that are being executed. This is difficult for the programmer to achieve in userland reliably, so pthreads provides the Pool abstraction of Workers which maintains references for you.

    In order to maintain those references, pthreads needs to know when an object is garbage; it provides the Pool::collect interface for this purpose. Pool::collect takes a Closure which should accept a Threaded object and return boolean true if the passed object has finished executing.
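
    The collect contract described above can be modelled without threads. The following is only a sketch: FakePool and FakeTask are hypothetical single-threaded stand-ins invented for illustration, not pthreads classes, and the loop in FakePool::collect is an assumption about how the retained references are pruned. It shows that a reference is dropped only when the closure returns true.

```php
<?php
/* Hypothetical stand-in for Pool: it only models which references are retained. */
class FakePool {
    /* references kept for every submitted task */
    private $work = [];

    public function submit($task) {
        $this->work[] = $task;
        /* a real Pool would hand the task to a worker thread; here it runs inline */
        $task->run();
    }

    /* drop every task for which the collector returns true, report what is left */
    public function collect(callable $collector) {
        foreach ($this->work as $index => $task) {
            if ($collector($task)) {
                unset($this->work[$index]);
            }
        }
        return count($this->work);
    }
}

/* Hypothetical task: $finishes controls whether run() marks it as done. */
class FakeTask {
    private $done = false;
    private $finishes;

    public function __construct($finishes) { $this->finishes = $finishes; }
    public function run()                  { $this->done = $this->finishes; }
    public function isDone()               { return $this->done; }
}

$pool = new FakePool();
$pool->submit(new FakeTask(true));
$pool->submit(new FakeTask(true));
$pool->submit(new FakeTask(false));

/* the collector returns true only for finished tasks */
$remaining = $pool->collect(function ($task) {
    return $task->isDone();
});

echo "retained after collect: {$remaining}\n"; /* prints: retained after collect: 1 */
```

    The unfinished task stays referenced, which is why the collector should return true only once a task really is done.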

    How

    The task at hand ...

    In order to keep submitting tasks for execution and not exhaust resources, you must create a queue of completed tasks for resubmission to the Pool.

    The following code demonstrates a sane way of doing this:

    <?php
    
    define("LOG", Mutex::create());
    /* thread safe log to stdout */
    function slog($message, $args = []) {
        $args = func_get_args();
        if (($message = array_shift($args))) {
            Mutex::lock(LOG);
            echo vsprintf(
                "{$message}\n", $args);
            Mutex::unlock(LOG);
        }
    }
    
    class Request extends Threaded {
        public function __construct($url) {
            $this->url = $url;
        }
    
        public function run() {
            $response = @file_get_contents($this->url);
    
            slog("%s returned %d bytes",
                $this->url, strlen($response));
    
            $this->reQueue();
        }
    
        public function getURL()        { return $this->url; }
    
        public function isQueued()      { return $this->queued; }
        public function reQueue()       { $this->queued = true; }
    
        protected $url;
        protected $queued = false;
    }
    
    /* create a pool of 50 threads */
    $pool = new Pool(50);
    
    /* submit 50 requests for execution */
    for ($i = 1; $i <= 50; $i++) {
        $pool->submit(new Request(sprintf(
            "http://google.com/?q=%s", md5($i))));
    }
    
    do {
        $queue = array();
    
        $pool->collect(function($request) use (&$queue) {
            /* check for items to requeue */
            if ($request->isQueued()) {
                /* get the url for the request, insert into queue */
                $queue[] = $request->getURL();
                /* allow this job to be collected */
                return true;
            }
            /* not finished yet: keep the reference */
            return false;
        });
    
        /* resubmit completed tasks to pool */
        if (count($queue)) {
            foreach ($queue as $queued)
                $pool->submit(new Request($queued));
        }
    
        /* sleep for a couple of seconds here ... because, be nice ! */
        usleep(2.5 * 1000000);
    } while (true);
    ?>
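
    To see why collecting before resubmitting keeps resource use flat, here is a rough single-threaded model. It is a sketch under assumptions: RetainingPool and simulate() are hypothetical names made up for this illustration, every task is treated as already finished, and only the number of retained references is tracked.

```php
<?php
/* Hypothetical stand-in that only counts retained references; not the pthreads Pool. */
class RetainingPool {
    private $work = [];

    public function submit($item) { $this->work[] = $item; }

    public function collect(callable $collector) {
        foreach ($this->work as $index => $item) {
            if ($collector($item)) {
                unset($this->work[$index]);
            }
        }
    }

    public function retained() { return count($this->work); }
}

/* Run $rounds resubmission rounds over 5 tasks, with or without collecting first. */
function simulate($rounds, $collectFirst) {
    $pool = new RetainingPool();
    for ($i = 0; $i < 5; $i++) {
        $pool->submit("task-{$i}");
    }

    for ($round = 0; $round < $rounds; $round++) {
        $queue = [];
        if ($collectFirst) {
            /* queue each finished task, then let the pool drop its reference */
            $pool->collect(function ($item) use (&$queue) {
                $queue[] = $item;
                return true;
            });
        } else {
            /* resubmit the same work without ever releasing the old references */
            for ($i = 0; $i < 5; $i++) {
                $queue[] = "task-{$i}";
            }
        }
        foreach ($queue as $item) {
            $pool->submit($item);
        }
    }
    return $pool->retained();
}

$withCollect    = simulate(10, true);
$withoutCollect = simulate(10, false);

echo "with collect: {$withCollect}, without collect: {$withoutCollect}\n";
/* prints: with collect: 5, without collect: 55 */
```

    In this model the retained count stays at the original 5 when each round collects first, and grows by 5 per round otherwise.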
    
    This answer was selected by the asker as the best answer.