drcmg28484 2012-09-07 10:50
浏览 65
已采纳

Rabbitmq队列与PHP中的进程分叉

I have simple queue worker based on standard AMQP Class from PHP. It works with RabbitMQ as a server. I have Queue Class for initialize AMQP connection wirh RabbitMQ. Everything works fine with code below:

$queue = new Queue('myQueue');

 while($envelope = $queue->getEnvelope()) {
   $command = unserialize($envelope->getBody());

   if ($command instanceof QueueCommand) {
     try {
       if ($command->execute()) {
         $queue->ack($envelope->getDeliveryTag());
       }
     } catch (Exception $exc) {
       // an error occurred so do some processing to deal with it
     }
   }
 }

However I wanted to fork queue command execution, but in this case queue goes endless with the first command over and over again. I can't acknowledge RabbitMQ that message was recieved with $queue->ack(); My forked version (simplified with only one child for testing sake) looks like this :

$queue = new Queue('myQueue');

while($envelope = $queue->getEnvelope()) {
  $command = unserialize($envelope->getBody());

  if ($command instanceof QueueCommand) {
    $pid = pcntl_fork();

    if ($pid) {
      //parent proces
      //wait for child
      pcntl_waitpid($pid, $status, WUNTRACED);

      if($status > 0) {
        // an error occurred so do some processing to deal with it
      } else {
        //remove Command from queue
        $queue->ack($envelope->getDeliveryTag());
      }
    } else {
      //child process
      try {
        if ($command->execute()) {
          exit(0);
        }
      } catch (Exception $exc) {
        exit(1);
      }
    }
  }
}

any help will be appreciated...

  • 写回答

1条回答 默认 最新

  • du9698 2012-09-07 11:56
    关注

    I finally solved the problem! I had to run ack command from child process, it works this way! This is correct code:

    $queue = new Queue('myQueue');
    
    while($envelope = $queue->getEnvelope()) {
      $command = unserialize($envelope->getBody());
    
      if ($command instanceof QueueCommand) {
        $pid = pcntl_fork();
    
        if ($pid) {
          //parent proces
          //wit for child
          pcntl_waitpid($pid, $status, WUNTRACED);
    
          if($status > 0) {
            // an error occurred so do some processing to deal with it
          } else {
            // sucess
          }
        } else {
          //child process
          try {
            if ($command->execute()) {
              $queue->ack($envelope->getDeliveryTag());
              exit(0);
            }
          } catch (Exception $exc) {
            exit(1);
          }
        }
      }
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 echarts绘制图表
  • ¥15 根据企业名称 对照两个文件 样本筛选/匹配
  • ¥15 Linux环境下CA证书更新问题
  • ¥15 sqlserver语句提取结果以外数据
  • ¥60 微信小程序如何上传QQ聊天文件
  • ¥300 开发的系统遭到无良商家的破解,请问如何防止再次发生,并追回损失
  • ¥15 java代码写在记事本上后在cmd上运行时无报错但又没生成文件
  • ¥15 关于#python#的问题:在跑ldsc数据整理的时候一直抱这种错误,要么--out识别不了参数,要么--merge-alleles识别不了参数(操作系统-linux)
  • ¥15 PPOCRLabel
  • ¥15 混合键合键合机对准标识