hyperf 队列报错
<?php
declare(strict_types=1);
namespace App\Service;
use App\Job\QueueJob;
use Hyperf\AsyncQueue\Driver\DriverFactory;
use Hyperf\AsyncQueue\Driver\DriverInterface;
class QueueService
{
protected DriverInterface $driver;
public function __construct(DriverFactory $driverFactory)
{
$this->driver = $driverFactory->get('default');
}
/**
* 生产消息.
* @param $params
* @param int $delay 延时时间 单位秒
* @return bool
*/
public function push($params, int $delay = 0): bool
{
// 这里的 `ExampleJob` 会被序列化存到 Redis 中,所以内部变量最好只传入普通数据
// 同理,如果内部使用了注解 @Value 会把对应对象一起序列化,导致消息体变大。
// 所以这里也不推荐使用 `make` 方法来创建 `Job` 对象。
return $this->driver->push(new QueueJob($params), $delay);
}
}
以下是job代码
<?php
declare(strict_types=1);
namespace App\Job;
use Hyperf\AsyncQueue\Job;
class QueueJob extends Job
{
public $params;
/**
* 任务执行失败后的重试次数,即最大执行次数为 $maxAttempts+1 次
*/
protected $maxAttempts = 2;
public function __construct($params)
{
// 这里最好是普通数据,不要使用携带 IO 的对象,比如 PDO 对象
$this->params = $params;
}
public function handle()
{
// 根据参数处理具体逻辑
// 通过具体参数获取模型等
// 这里的逻辑会在 ConsumerProcess 进程中执行
var_dump($this->params);
}
}
$job = make(QueueJob::class);
以下是投递消息代码:
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace App\Controller;
use App\Service\QueueService;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\AutoController;
#[AutoController]
class IndexController extends AbstractController
{
#[Inject]
protected QueueService $service;
/**
* 传统模式投递消息
*/
public function index(): string
{
$this->service->push([
'group@hyperf.io',
'https://doc.hyperf.io',
'https://www.hyperf.io',
]);
return 'success';
}
}
以下是配置代码async_queue.php,其中该配置我执行composer require hyperf/async-queue
,没有产生,是我手动添加的
<?php
return [
'default' => [
'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
'redis' => [
'pool' => 'default'
],
'channel' => 'queue',
'timeout' => 2,
'retry_seconds' => 5,
'handle_timeout' => 10,
'processes' => 1,
'concurrent' => [
'limit' => 5,
],
],
];
麻烦告知一下,问题,谢谢