追梦12138 2019-04-28 13:49 采纳率: 50%
浏览 1098
已采纳

php处理从rabbitmq中得到的数据

php处理从rabbitmq中得到的数据php处理从rabbitmq中得到的数据

  • 写回答

1条回答

  • 追梦12138 2019-05-07 15:25
    关注

    以下是以direct为例

    public function start(){
    // 创建一个连接服务器的connection
    if ($this->connection == null){
    return false;
    }
    // 创建一个通道
    $channel=$this->connection->channel();
    /**
    * exchange_declare(
    * name:交换机名
    * EXCHANGER_TYPE_DIRECT:交换机类型
    * passive: false
    * durable: true 交换机将在服务器重启后生存
    * auto_delete: false 通道关闭的时候,交换机不会被删除
    * )
    */
    $channel->exchange_declare(self::exchange,self::EXCHANGER_TYPE_DIRECT,false,true,false);

            /**
             * queue_declare(
             * name:队列名
             * passive: false
             * durable: true 队列是否持久化
             * exclusive: false 当前连接不在时,队列是否自动删除
             * auto_delete: false 没有consumer时,队列是否自动删除
             * )
             */
            $channel->queue_declare(self::queue, false, true, false, false);
    

    // list($queue_name, ,) = $channel->queue_declare(self::queue, false, false, true, false);

            /**
             * queue_bind(  绑定队列到一个交换机
             * string $queue:队列名
             * string $exchange:交换机名
             * string $routing_key:路由key
             * )
             */
            $channel->queue_bind(self::queue, self::exchange, self::routing_key);
    
            /**
             * basic_consume(  指的是channel在某个队列上注册消费者,那在这个队列有消息来了之后,就会把消息转发到给此channel处理,如果 这个队列有多个消费者,则会采用轮转的方式将消息分发给消息者
             * string $queue:队列名
             * string $consumer_tag:消费者
             * bool $no_local:
             * bool $no_ack:
             * bool $exclusive:
             * bool $nowait:
             * callback|null $callback:
             * )
             */
            $channel->basic_consume(self::queue,'',false,false,false,false,array($this,'process_message'));
    
            /**
             * 脚本退出前执行shutdown来关闭通道与连接
             * shutdown:方法名
             * string $channel:通道
             * connection:连接
             */
            register_shutdown_function(array($this,'shutdown'),$channel,$this->connection);
            echo 'true';
    

    // 判断是否存在回调函数
    while(count($channel->callbacks)){
    // 执行回调函数
    $channel->wait();
    }
    }

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥50 有数据,怎么建立模型求影响全要素生产率的因素
  • ¥50 有数据,怎么用matlab求全要素生产率
  • ¥15 TI的insta-spin例程
  • ¥15 完成下列问题完成下列问题
  • ¥15 C#算法问题, 不知道怎么处理这个数据的转换
  • ¥15 YoloV5 第三方库的版本对照问题
  • ¥15 请完成下列相关问题!
  • ¥15 drone 推送镜像时候 purge: true 推送完毕后没有删除对应的镜像,手动拷贝到服务器执行结果正确在样才能让指令自动执行成功删除对应镜像,如何解决?
  • ¥15 求daily translation(DT)偏差订正方法的代码
  • ¥15 js调用html页面需要隐藏某个按钮