开放的编程资料库

当前位置:我爱分享网 > PHP教程 > 正文

用 PHP 编写 Gearman Worker

几年来我一直在听说和阅读有关Gearman的信息,但是,由于我的工作性质,我从来没有真正需要调查它;当您编写后端代码时,可扩展性是您留给最终用户的事情,对吧?

错了!但也许需要一个解释。

背景

我们正在考虑迁移到Git以在框架上进行我们的主要开发。我们需要适应很多用例:

  • 我们希望支持原子变更集(即包含与单个问题相关的所有变更的变更集:通常是单元测试、代码和文档)。
  • 同时,开发人员希望能够将库代码作为git子模块或手册的单一语言等引入。
  • 用户需要一个只读的Subversion存储库,以便他们可以继续使用svn:externals。只是因为我们正在迁移并不意味着我们的用户需要迁移。
  • 当然,很多人喜欢通过RSS提要保持对提交的关注
  • 还有像我这样的受虐狂就像提交电子邮件一样。(我从来没有将收件箱归零,这是一个奇迹吗?)

事实证明,前两项很难同时完成。如果你让你想要的每一个不同的子树都可以离散地克隆,然后构建一个由一堆git子模块组成的存储库,你就失去了原子性。(你最终对你接触的每个子模块都有一个提交,再加上一个masterrepository。Eww。)

不过,我找到了一种使用子树合并的方法。但是,由于这篇文章是关于编写Gearmanworker的,所以我将把它留到另一天。然而,重要的是我发现了其他有趣的东西。

Git允许您定义“钩子”,即在git提交生命周期的不同点运行的脚本。一种可以在服务器上运行的挂钩称为“post-receive”。我发现即使post-receive在提交被存储库接受后运行,如果你在钩子仍在运行时对gitrepository执行操作,你会得到一些奇怪的行为。在我的例子中,我让脚本在工作树中触发gitpull。当工作树收到变更集时,它无法干净地应用它们,因为服务器实际上还没有最终确定其状态。我能得到干净拉动的唯一方法是在挂钩完成后拉动。这挫败了我对自动化的尝试。

现在我们来看看Gearman。我意识到我可以让我的post-receive脚本队列成为Gearman的后台任务。由于这几乎是瞬时操作,这意味着我的钩子在Gearman启动worker之前就已完成;如果没有,我总是可以在我的worker中执行sleep()以确保它已完成。

编写Gearman任务

所以,现在我可以完成我的任务了,我开始考虑我还能做些什么,突然间,Gearman看起来像是该架构的绝佳解决方案。基本上,它可以防止提交更改的最终用户因挂钩脚本而出现任何延迟,同时允许我执行我们需要的任务自动化。

所以我写了两个任务作为概念验证,使用了直接的PHP和ZendFramework的混合物;这些用于我之前提到的子树合并(实际上,实际工作是在bash脚本中完成的),还有一个用于RSS提要。

Gearman客户端:接收后挂钩

首先,让我们看看我的钩子脚本,它使用了Gearman客户端。我正在使用来自PECL的ext/gearman。我的post-receive挂钩脚本如下所示:

#!/usr/bin/env php
<?php
$workload = implode(':', $argv);
$client = new GearmanClient();
$client->addServer();
$client->doBackground('post-receive', $workload);
$client->doBackground('rss', $workload);

上面的内容应该很简单:我创建了一个GearmanClient,告诉它使用默认服务器(localhost),并触发两个Gearman函数,post-receiverss,使用我的脚本收到的参数作为有效载荷。我使用doBackground()方法使任务可以异步执行;钩子脚本不需要在执行任何给定任务时被阻塞,并且可以愉快地继续执行。

任务

我编写了两个类,一个用于我想创建的每个Gearman作业。我本可以将它们作为lambda、普通的旧函数等来完成。我选择了对象,以便我可以测试它们,如果我愿意,也可以从其他脚本中使用它们。这些类实现了一个Command接口,该接口简单地定义了一个execute()方法,该方法接受一个GearmanJob实例。

首先是触发我的子树合并的作业:

<?php

namespace ZF\Git;

class MergeSubtree implements Command
{
    protected $_logger;
    protected $_wd = '/var/spool/gearman';

    public function setWorkingDir($path)
    {
        if (!is_dir($path)) {
            throw new \Exception('Invalid path provided for working directory');
        }
        $this->_wd = $path;
    }

    public function getLogger()
    {
        if (null === $this->_logger) {
            $this->setLogger(new \Zend_Log(new \Zend_Log_Writer_Stream($this->_wd . '/merge_subtree_error.log')));
        }
        return $this->_logger;
    }

    public function setLogger(\Zend_Log $logger)
    {
        $this->_logger = $logger;
    }

    public function executeMerge()
    {
        chdir($_ENV['HOME'] . '/working/zf-master');
        $return = shell_exec($this->_wd . '/update-master.sh');
        return $return;
    }

    public function execute(\GearmanJob $job)
    {
        $this->getLogger()->info('Received merge request');
        $return = $this->executeMerge();
        if (strstr($return, 'Failed')) {
            $this->getLogger()->err('Failed pull: ' . $return);
            $job->sendFail();
            return;
        }
        $this->getLogger()->info('Merge complete');
    }
}

(注意ZF类名前面的反斜杠;因为我使用的是命名空间,所以我需要完全限定我的类。)

上面的类可能有点矫枉过正。但它有一些不错的特性,特别是对于Gearman环境:它会在mymerge脚本中发现任何失败时记录下来。通过这种方式,我可以随时查看我的日志,我开始发现我的存储库之间存在差异。

我的下一堂课有点复杂,但对许多人来说可能更有用。它获取最近的15个gitlog条目,并创建一个RSS提要:

<?php
namespace ZF\Git;

class Log2RSS implements Command
{
    protected $_repo;
    protected $_feedDir  = '/var/spool/gearman/feeds';
    protected $_feedName = 'rss';
    protected $_baseLink = 'http://some.viewgit.repo/?a=commit&p=zf&h=';

    public function setRepo($repo)
    {
        if (!is_dir($repo) || !is_dir($repo . '/.git')) {
            throw new \Exception('Invalid repository specified; not a Git repository');
        }
        $this->_repo = $repo;
        return $this;
    }

    public function getRepo()
    {
        if (null === $this->_repo) {
            throw new \Exception('No repository directory specified');
        }
        return $this->_repo;
    }

    public function setBaseLink($url)
    {
        $this->_baseLink = $url;
        return $this;
    }

    public function getBaseLink()
    {
        return $this->_baseLink;
    }

    public function setFeedDir($path)
    {
        if (!is_dir($path) || !is_writable($path)) {
            throw new \Exception('Invalid feed directory specified, or not writeable');
        }
        $this->_feedDir = $path;
        return $this;
    }

    public function getFeedDir()
    {
        return $this->_feedDir;
    }

    public function setFeedName($feedName)
    {
        $this->_feedName = (string) $feedName;
        return $this;
    }

    public function getFeedName()
    {
        return $this->_feedName;
    }

    public function generateFeed()
    {
        $feed = new \Zend_Feed_Writer_Feed;
        $feed->setTitle('git commits');
        $feed->setLink('http://some.viewgit.repo/');
        $feed->setFeedLink('http://some.viewgit.repo/feeds/' . $this->getFeedName() . '.xml', 'rss');
        $feed->addAuthor(array(
            'name'  => 'Name of this feed',
            'email' => 'git@somedomain',
            'uri'   => 'http://some.viewgit.repo/',
        ));
        $feed->setDateModified(time());
        $feed->setDescription('git commits');

        $logs = $this->_parseLogs();

        foreach ($logs as $log) {
            $date  = strtotime($log['date']);
            $entry = $feed->createEntry();
            $entry->setTitle($log['commit']);
            $entry->setLink($this->getBaseLink() . $log['commit']);
            $entry->setDateModified($date);
            $entry->setDateCreated($date);
            $entry->setDescription($log['subject']);
            $entry->setContent($log['subject'] . "\n\n" . $log['notes']);
            $feed->addEntry($entry);
        }

        $output = $feed->export('rss');
        file_put_contents($this->getFeedDir() . '/' . $this->getFeedName() . '.xml', $output);
    }

    public function execute(\GearmanJob $job)
    {
        $this->generateFeed();
    }

    protected function _parseLogs()
    {
        $repoPath = $this->getRepo();

        $command = 'git --git-dir=' . $repoPath . '/.git --work-tree=' . $repoPath . ' log --max-count=15 --format=\'Commit: %H%nAuthor: %an%nDate: %cD%nSubject: %s%nNotes: %N%n\' -p';
        $log     = shell_exec($command);
        $lines   = preg_split("/[\n?|\r]/", $log);
        $logs    = array();
        $index   = 0;
        $current = false;
        foreach ($lines as $line) {
            if (preg_match('/^(Commit|Author|Date|Subject|Notes): (.*)$/', $line, $matches)) {
                $current = strtolower($matches[1]);
                $value   = $matches[2];
                if ('commit' == $current) {
                    $index++;
                    $logs[$index] = array();
                }
                $logs[$index][$current] = $value;
            } elseif (false !== $current) {
                $logs[$index][$current] .= "\n" . $line;
            }
        }
        return $logs;
    }
}

上面的对象可以使用更多的自定义向量——注入RSS提要名称、URL等的方法,以及描述限制的一些阈值,因此它不能截断超过一定数量的行。然而,它完成了工作——它创建了一个RSS提要,其中包含基于每次提交的条目。

齿轮工人

现在,对于工人。由于我正在使用一些ZendFramework类,并且依赖于自动加载,因此我需要设置一些自动加载。我还需要实例化这些类、配置实例并将它们附加到Gearmanworker。

#!/usr/bin/env php
<?php
ini_set('memory_limit', -1);

$autoloader = function($class) {
    $file = str_replace(array('\', '_'), DIRECTORY_SEPARATOR, $class) . '.php';
    return include_once $file;
};
spl_autoload_register($autoloader);

$mergeSubtree = new ZF\Git\MergeSubtree();
$mergeSubtree->setWorkingDir(__DIR__);

$log2rss      = new ZF\Git\Log2RSS();
$log2rss->setRepo('/home/gitolite/working/zf-master')
        ->setFeedName('zf');

$worker = new GearmanWorker();
$worker->addServer();
$worker->addFunction('post-receive', array($mergeSubtree, 'execute'));
$worker->addFunction('rss', array($log2rss, 'execute'));
while ($worker->work()) {
    if (GEARMAN_SUCCESS != $worker->returnCode()) {
        echo "Worker failed: " . $worker->error() . "\n";
    }
}

为了将它们结合在一起,我正在使用supervisord来管理这个脚本,这样我就不必担心它会死在我身上;当Gearman需要它时,它将始终可用。我不会在这里进入设置;它非常简单。(非常感谢SeanCoates在他2009PHPAdvent上发表的关于在PHP中使用supervisord的文章,并感谢MikeNaberezny在多年前向我介绍了supervisord。)

结论

Gearman是并行化任务以及创建异步流程的绝佳工具。结合supervisord和您选择的脚本语言,您可以毫不费力地取得一些令人难以置信的结果。

这也是挑选ZF组件用于简单任务的一个很好的例子—我正在使用Zend_Log来报告作业的状态,并使用Zend_Feed_Writer生成RSS提要。这两个组件可以独立运行,非常适合长时间运行的环境,您无需担心任务需要多长时间。

我强烈建议您研究使用异步处理工具——有多种消息系统、队列等可供您利用,它们可以帮助您从主应用程序中卸载资源密集型任务。

对于那些对我正在开发的子树合并工作流感到好奇的人,我将在本月就该主题撰写更多博文。

未经允许不得转载:我爱分享网 » 用 PHP 编写 Gearman Worker

感觉很棒!可以赞赏支持我哟~

赞(0) 打赏