剖析Laravel队列系统--推送作业到队列

译文 GitHub https://github.com/yuansir/diving-laravel-zh

原文链接https://divinglaravel.com/queue-system/pushing-jobs-to-queue

有几种方法可以将作业推送到队列中:

Queue::push(new InvoiceEmail($order));

Bus::dispatch(new InvoiceEmail($order));

dispatch(new InvoiceEmail($order));

(new InvoiceEmail($order))->dispatch();

调用Queue facade 是对应用程序使用的队列驱动的调用,如果你使用数据库队列驱动,调用push方法是调用Queue\DatabaseQueue类的push方法。

有几种有用的方法可以使用:

// 将作业推送到特定的队列
Queue::pushOn('emails', new InvoiceEmail($order));

// 在给定的秒数之后推送作业
Queue::later(60, new InvoiceEmail($order));

// 延迟后将作业推送到特定的队列
Queue::laterOn('emails', 60, new InvoiceEmail($order));

// 推送多个作业
Queue::bulk([
    new InvoiceEmail($order),
    new ThankYouEmail($order)
]);

// 推送特定队列上的多个作业
Queue::bulk([
    new InvoiceEmail($order),
    new ThankYouEmail($order)
], null, 'emails');

调用这些方法之后,所选择的队列驱动会将给定的信息存储在存储空间中,供 workers 按需获取。

使用命令总线

使用命令总线调度作业进行排队可以给你额外控制权; 您可以从作业类中设置选定的connection, queue, and delay 来决定命令是否应该排队或立即运行,在运行之前通过管道发送作业,实际上你甚至可以从你的作业类中处理整个队列过程。

Bug facade 代理到Contracts\Bus\Dispatcher 容器别名,此别名解析为Bus\Dispatcher内的Bus\BusServiceProvider的一个实例:

$this->app->singleton(Dispatcher::class, function ($app) {
    return new Dispatcher($app, function ($connection = null) use ($app) {
        return $app[QueueFactoryContract::class]->connection($connection);
    });
});

所以Bus::dispatch() 调用的 dispatch() 方法是 Bus\Dispatcher 类的:

public function dispatch($command)
{
    if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
        return $this->dispatchToQueue($command);
    } else {
        return $this->dispatchNow($command);
    }
}

该方法决定是否将给定的作业分派到队列或立即运行,commandShouldBeQueued() 方法检查作业类是否是 Contracts\Queue\ShouldQueue, 的实例,因此使用此方法,您的作业只有继承了ShouldQueue接口才会被放到队列中。

我们不会在这篇中深入dispatchNow,我们将在深入 worker 如何执行作业中详细讨论。 现在我们来看看总线如何调度你的工作队列:

public function dispatchToQueue($command)
{
    $connection = isset($command->connection) ? $command->connection : null;

    $queue = call_user_func($this->queueResolver, $connection);

    if (! $queue instanceof Queue) {
        throw new RuntimeException('Queue resolver did not return a Queue implementation.');
    }

    if (method_exists($command, 'queue')) {
        return $command->queue($queue, $command);
    } else {
        return $this->pushCommandToQueue($queue, $command);
    }
}

首先 Laravel 会检查您的作业类中是否定义了connection 属性,使用这个属性可以设置 Laravel 应该将排队作业发送到哪个连接,如果未定义任何属性,将使用 null 属性,在这种情况下 Laravel 将使用默认连接。

通过设置的连接,Laravel 使用一个 queueResolver 闭包来构建应该使用哪个队列驱动的实例,当注册调度器实例的时候这个闭包在Bus\BusServiceProvider 中被设置:

function ($connection = null) use ($app) {
    return $app[Contracts\Queue\Factory::class]->connection($connection);
}

Contracts\Queue\FactoryQueue\QueueManager的别名,换句话说,该闭包返回一个QueueManager实例,并为manager设置所使用的队列驱动需要的连接。

最后,dispatchToQueue方法检查作业类是否具有queue方法,如果调度器调用此方法,可以完全控制作业排队的方式,您可以选择队列,分配延迟,设置最大重试次数, 超时等

如果没有找到 queue 方法,对 pushCommandToQueue() 的调用将调用所选队列驱动上的push方法:

protected function pushCommandToQueue($queue, $command)
{
    if (isset($command->queue, $command->delay)) {
        return $queue->laterOn($command->queue, $command->delay, $command);
    }

    if (isset($command->queue)) {
        return $queue->pushOn($command->queue, $command);
    }

    if (isset($command->delay)) {
        return $queue->later($command->delay, $command);
    }

    return $queue->push($command);
}

调度器检查 Job 类中的 queuedelay ,并根据此选择适当的队列方法。

所以我可以设置工作类中的队列,延迟和连接?

是的,您还可以设置一个triestimeout 属性,队列驱动也将使用这些值,以下工作类示例:

class SendInvoiceEmail{
    public $connection = 'default';

    public $queue = 'emails';

    public $delay = 60;

    public $tries = 3;

    public $timeout = 20;
}

Setting job configuration on the fly

即时设置作业配置

使用 dispatch() 全局帮助方法,您可以执行以下操作:

dispatch(new InvoiceEmail($order))
        ->onConnection('default')
        ->onQueue('emails')
        ->delay(60);

这只有在您在作业类中使用 Bus\Queueable trait 时才有效,此 trait 包含几种方法,您可以在分发作业类之前在作业类上设置一些属性,例如:

public function onQueue($queue)
{
    $this->queue = $queue;

    return $this;
}

但是在你的例子中,我们调用 dispatch()的返回方法!

这是诀窍:

function dispatch($job)
{
    return new PendingDispatch($job);
}

这是在Foundation/helpers.php中的dispatch()帮助方法的定义,它返回一个Bus\PendingDispatch 的实例,并且在这个类中,我们有这样的方法:

public function onQueue($queue)
{
    $this->job->onQueue($queue);

    return $this;
}

所以当我们执行 dispatch(new JobClass())->onQueue('default'), 时,PendingDispatchonQueue 方法将调用 job 类上的 onQueue 方法,如前所述,作业类需要使用所有这些的 Queueable trait 来工作。

那么调用 Dispatcher::dispatch 方法的那部分是哪里?

一旦你执行了 dispatch(new JobClass())->onQueue('default'),你将让作业实例准备好进行调度,实际的工作发生在 PendingDispatch::__destruct()中:

public function __destruct()
{
    app(Dispatcher::class)->dispatch($this->job);
}

调用此方法时,将从容器中解析 Dispatcher 的一个实例,然后调用它的dispatch()方法。 destruct()是一种 PHP 魔术方法,当对对象的所有引用不再存在或脚本终止时,都会调用,因为我们不会立即在 __destruct方法中存储对PendingDispatch 实例的引用,

// Here the destructor will be called rightaway
dispatch(new JobClass())->onQueue('default');

// 如果我们调用unset($temporaryVariable),那么析构函数将被调用
// 或脚本完成执行时。
$temporaryVariable = dispatch(new JobClass())->onQueue('default');

使用可调度的特征

您可以使用工作类上的 Bus\Dispatchable trait 来调度您的工作,如下所示:

(new InvoiceEmail($order))->dispatch();

调度方法 Dispatchable看起来像这样:

public static function dispatch()
{
    return new PendingDispatch(new static(...func_get_args()));
}

正如你可以看到它使用一个 PendingDispatch的实例,这意味着我们可以做一些像这样的事:

(new InvoiceEmail($order))->dispatch()->onQueue('emails');

转载请注明:  转载自Ryan 是菜鸟 | LNMP 技术栈笔记

如果觉得本篇文章对您十分有益,何不 打赏一下

谢谢打赏

本文链接地址: 剖析 Laravel 队列系统–推送作业到队列

知识共享许可协议 本作品采用知识共享署名-非商业性使用 4.0 国际许可协议进行许可


Original url: Access
Created at: 2018-12-19 13:50:59
Category: default
Tags: none

请先后发表评论
  • 最新评论
  • 总共0条评论