开放的编程资料库

当前位置:我爱分享网 > Python教程 > 正文

通过 Openswoole 计时器运行 cronjobs

我建立的网站经常利用cronjobs定期从其他来源获取数据。例如,我可能想每天轮询一次API,或者每月一次从另一个网站抓取内容。Cronjobs非常适合这种情况。

但是,cron有一些问题:

  • 如果作业是将信息写入您的网络应用程序的文件树,您需要确保权限正确,无论是在文件系统级别还是在编写cronjob时(例如,以同一用户身份运行它,或在完成时更改权限)。
  • 如果您正在运行与PHP应用程序关联的控制台工具,您可能需要担心运行作业时特定环境变量是否在范围内。
  • 在容器化环境中,强烈建议不要使用cron,因为这意味着运行另一个守护进程。您可以使用s6-overlay等工具来解决这个问题,但它是问题的另一个载体。

由于我构建的大多数网站都在使用mezzio-swoole,我开始怀疑我是否能够以其他方式处理这些工作。

任务工作者

我们在mezzio-swoole的版本2中引入了与Swoole的任务工作者的集成。任务工作者作为与Web工作者分开的池运行,并允许Web工作者在当前请求不需要结果时卸载繁重的处理。他们作为作为每服务器消息队列的一种形式,非常适合执行诸如发送电子邮件、处理webhook有效负载等操作。

mezzio-swoole中的集成允许您在mezzio-swooleMezzio\Swoole\Task\DeferredListenerDeferredServiceListener实例中装饰PSR-14EventDispatcher监听器;当发生这种情况时,装饰器会使用Swoole服务器创建一个任务,为其提供实际的侦听器和事件。当调度处理任务时,它会使用事件调用侦听器。

结果是,要创建一个任务,您只需从您的代码中分派一个事件。您的代码因此不知道它是被异步处理的。

但是,因为任务在单独的池中工作,这意味着它们收到的事件实例在技术上是副本,而不是引用;因此,您的应用程序代码不能期望侦听器将事件状态传回给您。如果您选择使用此功能,请仅将其用于即发即弃事件。

我现在提出所有这些,因为我稍后会回过头来。

调度作业

Swoole对调度作业的回答是它的计时器。有了计时器,您可以勾选:每次经过一段时间后调用功能。计时器在事件循环中运行,这意味着Swoole公开的每个服务器类型有一个tick()方法,包括HTTP服务器。

那么,显而易见的答案是注册一个报价单:

// Intervals are measured in milliseconds.
// The following means "every 3 hours".
$server->tick(1000 * 60 * 60 * 3, $callback);

现在我遇到了问题:

  • 我如何访问服务器实例?
  • 我可以指定什么作为回调,我如何获得它?

使用mezzio-swoole,注册时间是在HTTP服务器启动的时候。由于Swoole每个事件只允许一个监听器,mezzio-swoole组成一个PSR-14事件分发器,注册到每个SwooleHTTP服务器事件。然后监听器通过PSR-14事件调度器触发事件​​,在内部使用自定义事件类型,提供对最初传递给Swoole服务器事件的数据的访问。这种方法允许应用程序开发人员将监听器附加到事件并修改应用程序的工作方式。

如果需要,为了允许这些“工作流”事件与应用程序分离,我们注册了一个Mezzio\Swoole\Event\EventDispatcherInterface服务,该服务返回一个离散的PSR-14事件调度程序实现。我通常将此别名为PSR-14接口,因此我可以为应用程序事件使用相同的实例。

我使用自己的phly/phly-event-dispatcher实现,它提供了许多不同的侦听器提供程序。最简单的是Phly\EventDispatcher\AttachableListenerProvider,它定义单个listen()方法,用于将侦听器附加到给定的事件类。

最重要的是,Mezzio和Laminas有委托工厂的概念。这些允许您“装饰”服务的创建。一个用例是装饰AttachableListenerProvider服务,并调用它的listen()方法附加监听器。

这是对接下来发生的事情的长篇大论的解释:AttachableListenerProvider上的委托工厂在Mezzio\Swoole\Event\ServerStartEvent上注册一个监听器,后者又注册运行从容器中提取的作业的滴答声:

namespace Mwop;

use Mezzio\Swoole\Event\ServerStartEvent;
use Phly\EventDispatcher\AttachableListenerProvider;
use Psr\Container\ContainerInterface;

class RunPeriodicJobDelegatorFactory
{
    public function __invoke(
        ContainerInterface $container,
        string $serviceName,
        callable $factory,
    ): AttachableListenerProvider {
        /** @var AttachableListenerProvider $provider */
        $provider = $factory();

        $provider->listen(
            ServerStartEvent::class,
            function (ServerStartEvent $e) use ($container): void {
                $e->getServer()->tick(
                    1000 * 60 * 60 * 3,
                    $container->get(SomeJobRunner::class),
                );
            },
        );

        return $provider;
    }
}

然后我会通过配置将其附加到AttachableListenerProvider

use Mwop\RunPeriodicJobDelegatorFactory;
use Phly\EventDispatcher\AttachableListenerProvider;

return [
    'dependencies' => [
        'delegators' => [
            AttachableListenerProvider::class => [
                RunPeriodicJobDelegatorFactory::class,
            ],
        ],
    ],
];

这…很好。但是,我几乎立即遇到了这种方法在应用程序中导致段错误并导致服务器宕机的情况。

这就是任务重新发挥作用的地方。

我修改了上面的例子,改为发送一个事件。

namespace Mwop;

use Mezzio\Swoole\Event\ServerStartEvent;
use Phly\EventDispatcher\AttachableListenerProvider;
use Psr\Container\ContainerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;

class RunPeriodicJobDelegatorFactory
{
    public function __invoke(
        ContainerInterface $container,
        string $serviceName,
        callable $factory,
    ): AttachableListenerProvider {
        /** @var AttachableListenerProvider $provider */
        $provider = $factory();

        $provider->listen(
            ServerStartEvent::class,
            function (ServerStartEvent $e) use ($container): void {
                // This is done in the listener to prevent a race condition!
                $dispatcher = $container->get(EventDispatcherInterface::class),

                $e->getServer()->tick(
                    1000 * 60 * 60 * 3,
                    function () use ($dispatcher): void {
                        $dispatcher->dispatch(new SomeJob());
                    }
                );
            },
        );

        return $provider;
    }
}

这种方法需要更多的工作。我现在还需要为SomeJob事件注册一个侦听器,并且我需要将侦听器配置为可延期

首先,让我们创建一个委托器来附加该监听器;它看起来很像前面的示例:

namespace Mwop;

use Phly\EventDispatcher\AttachableListenerProvider;
use Psr\Container\ContainerInterface;

class SomeJobRunnerDelegatorFactory
{
    public function __invoke(
        ContainerInterface $container,
        string $serviceName,
        callable $factory,
    ): AttachableListenerProvider {
        /** @var AttachableListenerProvider $provider */
        $provider = $factory();

        $provider->listen(
            SomeJob::class,
            // Since listeners are invokables, we can likely use the same class as previously
            $container->get(SomeJobRunner::class)
        );

        return $provider;
    }
}

现在进行布线。我们将使用AttachableListenerProvider注册两个委托工厂,但我们还将为我们的SomeJobRunner注册一个委托工厂>班级:

return [
use Mezzio\Swoole\Task\DeferredServiceListenerDelegator;
use Mwop\RunPeriodicJobDelegatorFactory;
use Phly\EventDispatcher\AttachableListenerProvider;

return [
    'dependencies' => [
        'delegators' => [
            AttachableListenerProvider::class => [
                RunPeriodicJobDelegatorFactory::class,
                SomeJobRunnerDelegatorFactory::class,
            ],
            SomeJobRunner::class => [
                DeferredServiceListenerDelegator::class,
            ],
        ],
    ],
];

这概述了为什么委托器工厂配置映射到数组而不是类名:因此您可以为每个服务运行多个。当我们请求我们的AttachableListenerProvider服务,它的工厂将传递给第一个委托者,该委托者的返回值传递给下一个,依此类推。这里的结果是我们最终向它注册了我们的两个侦听器。

第二个注册比较有趣。DeferredServiceListenerDelegator注册了一个Mezzio\Swoole\Task\ServiceBasedTask,其中包含了服务名称和容器。调用时,它通过任务实例提供给它的事件实例。当任务被调用时,它从容器中拉取监听器,然后用事件调用它。

最终结果是,通过在我们的tick处理程序中调度一个事件,我们有效地将执行推入了我们的任务工作者,确保我们不会浪费宝贵的网络工作者来处理周期性事件。

调度作业

我看到这种方法的问题是每次我想创建一个新的定期作业时都需要添加一个滴答声。最重要的是,我无法控制何时它会执行,只有频率。说出你对cron的看法,但它确实了解如何安排特定时间。

所以,我抓住了ChrisTankersley的cron-expression包。这个优秀的包允许您将cron计划字符串传递给它,然后它会让您知道:

  • 如果它是一个有效的时间表。
  • 如果它应该在给定时间运行(默认为“现在”)。

有了这个,我可以创建一个通用的刻度线。

我决定我的配置采用以下格式:

[
    'jobs' => [
        'job name' => [
            'schedule' => 'crontab expression',
            'event'    => 'event class name',
        ],
    ],
]

从那里,我创建了一个Cronjob类,它具有时间表和事件类的属性:

namespace Mwop;

class Cronjob
{
    public function __construct(
        public readonly string $schedule,
        public readonly string $eventClass,
    ) {
    }
}

还有一个代表整个crontab:

namespace Mwop;

use ArrayIterator;
use Countable;
use IteratorAggregate;
use Traversable;

use function count;

class Crontab implements Countable, IteratorAggregate
{
    /** @var Cronjob[] */
    private array $jobs = [];

    public function count(): int
    {
        return count($this->jobs);
    }

    public function getIterator(): Traversable
    {
        return new ArrayIterator($this->jobs);
    }

    public function append(Cronjob $job): void
    {
        $this->jobs[] = $job;
    }
}

cron事件接口将允许我实例化要监听的事件,并允许我在需要时访问时间戳:

namespace Mwop;

use DateTimeInterface;

interface CronEventInterface
{
    public static function forTimestamp(DateTimeInterface $timestamp): self;

    public function timestamp(): DateTimeInterface;
}

配置解析器将验证各种条目,记录并忽略任何无效的条目。我没有显示该代码,因为它相当冗长,并且很容易自行创建。

通过这些更改,我现在可以将我的委托人更新为更通用:

namespace Mwop;

use Cron\CronExpression;
use DateTimeImmutable;
use Mezzio\Swoole\Event\ServerStartEvent;
use Phly\EventDispatcher\AttachableListenerProvider;
use Psr\Container\ContainerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
use Psr\Log\LoggerInterface;

class RunPeriodicJobDelegatorFactory
{
    public function __invoke(
        ContainerInterface $container,
        string $serviceName,
        callable $factory,
    ): AttachableListenerProvider {
        /** @var AttachableListenerProvider $provider */
        $provider = $factory();

        $config = $container->get('config')['cron']['jobs'] ?? [];

        /** @var Crontab $crontab */
        $crontab = (new ConfigParser())($config, $container->get(LoggerInterface::class));

        // Do not register if there are no jobs!
        if (0 === $crontab->count()) {
            return $provider;
        }

        $provider->listen(
            ServerStartEvent::class,
            function (ServerStartEvent $e) use ($container, $crontab): void {
                // This is done in the listener to prevent a race condition!
                $dispatcher = $container->get(EventDispatcherInterface::class),

                $e->getServer()->tick(
                    1000 * 60, // every minute
                    function () use ($dispatcher, $crontab): void {
                        $now = new DateTimeImmutable('now')
                        foreach ($crontab as $job) {
                            $cron = new CronExpression($job->schedule);
                            if (! $cron->isDue($now)) {
                                continue;
                            }
                            $dispatcher->dispatch(($job->eventClass)::forTimestamp($now));
                        }
                    }
                );
            },
        );

        return $provider;
    }
}

从那里,我可以配置作业:

namespace Mwop;

return [
    'cron' => [
        'jobs' => [
            'some-job' => [
                'schedule' => '*/15 * * * *',
                'event'    => SomeJob::class,
            ],
        ],
    ],
];

在最终版本中,我提取了一个可调用类来注册tick,但仍然仅在充当ServerStartEvent侦听器的匿名函数中从容器中提取该服务,以便防止竞争条件试图拉取事件调度程序服务,然后需要侦听器提供程序……这反过来又需要调度程序。你可以看到它的去向。

在最终版本中,我提取了一个可调用类来注册tick,但仍然仅在充当ServerStartEvent侦听器的匿名函数中从容器中提取该服务,以便防止竞争条件试图拉取事件调度程序服务,然后需要侦听器提供程序……这反过来又需要调度程序。你可以看到它的去向。

这种方法非常有效!

通过每分钟运行一次tick,我可以评估是否有任何应运行的cronjobs,如果有,则分派它们。由于我将侦听器配置为作为任务运行,它们被卸载到任务工作队列中,所以我的网络工作者不要阻止他们。因为这是在同一个进程组中运行,我不必担心权限,而且环境与网络工作者完全相同。在很多方面,它最终是比使用cron更强大的解决方案。

要点

这些年来,我见过许多从PHP应用程序运行cronjobs的解决方案。框架和PHP应用程序包含在将缓冲区刷新到网络服务器后定期运行cronjobs的功能并不少见。它们的主要优点是它们与网络服务器共享相同的环境和权限——这通常对应用程序相关的工作很有用——并且它们不需要在网络服务器上存在单独的守护进程。但是,我倾向于避开这些,因为他们依赖于您的网站获得正常流量的想法,并且他们占用网络工作进程(无论是mod_php还是php-fpm)。

能够将这些卸载到一个单独的工作池中完全消除了我的反对意见。如果所有任务工作人员都很忙,一旦他们通过队列工作,任务就会被处理。并且没有传入的请求会被这个队列阻塞或cronjob本身,一旦它处理。

增加了应用程序的复杂性。但是,通过抽象cronrunner,添加新的cronjobs变成:

  • 创建自定义事件类型。
  • 为完成工作的事件创建侦听器。
  • 向侦听器提供程序注册侦听器。
  • 配置监听器,使其延迟。
  • 添加详细说明日程安排和事件的配置。

我不必担心我是否以正确的用户身份运行作业,用户是否有登录shell(webworker用户通常没有,这增加了设置cronjob的复杂性),cronjob是否在与应用程序相同的环境下运行,等等。最后三项是简单的依赖项和配置连接,只要它们被记录在案即可。

我仍在测试该功能,但计划将其提交给mezzio-swoole,或为其创建一个包。由于mezzio-swoole是容器化应用程序的理想目标,因此拥有此功能将是一个很好的功能那些想在他们的应用程序中提供预定工作的人。

未经允许不得转载:我爱分享网 » 通过 Openswoole 计时器运行 cronjobs

感觉很棒!可以赞赏支持我哟~

赞(0) 打赏