几年来我一直在听说和阅读有关Gearman的信息,但是,由于我的工作性质,我从来没有真正需要调查它;当您编写后端代码时,可扩展性是您留给最终用户的事情,对吧?
错了!但也许需要一个解释。
背景
我们正在考虑迁移到Git以在框架上进行我们的主要开发。我们需要适应很多用例:
- 我们希望支持原子变更集(即包含与单个问题相关的所有变更的变更集:通常是单元测试、代码和文档)。
- 同时,开发人员希望能够将库代码作为git子模块或手册的单一语言等引入。
- 用户需要一个只读的Subversion存储库,以便他们可以继续使用svn:externals。只是因为我们正在迁移并不意味着我们的用户需要迁移。
- 当然,很多人喜欢通过RSS提要保持对提交的关注
- 还有像我这样的受虐狂就像提交电子邮件一样。(我从来没有将收件箱归零,这是一个奇迹吗?)
事实证明,前两项很难同时完成。如果你让你想要的每一个不同的子树都可以离散地克隆,然后构建一个由一堆git子模块组成的存储库,你就失去了原子性。(你最终对你接触的每个子模块都有一个提交,再加上一个masterrepository。Eww。)
不过,我找到了一种使用子树合并的方法。但是,由于这篇文章是关于编写Gearmanworker的,所以我将把它留到另一天。然而,重要的是我发现了其他有趣的东西。
Git允许您定义“钩子”,即在git提交生命周期的不同点运行的脚本。一种可以在服务器上运行的挂钩称为“post-receive”。我发现即使post-receive
在提交被存储库接受后运行,如果你在钩子仍在运行时对gitrepository执行操作,你会得到一些奇怪的行为。在我的例子中,我让脚本在工作树中触发gitpull
。当工作树收到变更集时,它无法干净地应用它们,因为服务器实际上还没有最终确定其状态。我能得到干净拉动的唯一方法是在挂钩完成后拉动。这挫败了我对自动化的尝试。
现在我们来看看Gearman。我意识到我可以让我的post-receive
脚本队列成为Gearman的后台任务。由于这几乎是瞬时操作,这意味着我的钩子在Gearman启动worker之前就已完成;如果没有,我总是可以在我的worker中执行sleep()
以确保它已完成。
编写Gearman任务
所以,现在我可以完成我的任务了,我开始考虑我还能做些什么,突然间,Gearman看起来像是该架构的绝佳解决方案。基本上,它可以防止提交更改的最终用户因挂钩脚本而出现任何延迟,同时允许我执行我们需要的任务自动化。
所以我写了两个任务作为概念验证,使用了直接的PHP和ZendFramework的混合物;这些用于我之前提到的子树合并(实际上,实际工作是在bash脚本中完成的),还有一个用于RSS提要。
Gearman客户端:接收后挂钩
首先,让我们看看我的钩子脚本,它使用了Gearman客户端。我正在使用来自PECL的ext/gearman。我的post-receive
挂钩脚本如下所示:
#!/usr/bin/env php <?php $workload = implode(':', $argv); $client = new GearmanClient(); $client->addServer(); $client->doBackground('post-receive', $workload); $client->doBackground('rss', $workload);
上面的内容应该很简单:我创建了一个GearmanClient
,告诉它使用默认服务器(localhost),并触发两个Gearman函数,post-receive
和rss
,使用我的脚本收到的参数作为有效载荷。我使用doBackground()
方法使任务可以异步执行;钩子脚本不需要在执行任何给定任务时被阻塞,并且可以愉快地继续执行。
任务
我编写了两个类,一个用于我想创建的每个Gearman作业。我本可以将它们作为lambda、普通的旧函数等来完成。我选择了对象,以便我可以测试它们,如果我愿意,也可以从其他脚本中使用它们。这些类实现了一个Command
接口,该接口简单地定义了一个execute()
方法,该方法接受一个GearmanJob
实例。
首先是触发我的子树合并的作业:
<?php namespace ZF\Git; class MergeSubtree implements Command { protected $_logger; protected $_wd = '/var/spool/gearman'; public function setWorkingDir($path) { if (!is_dir($path)) { throw new \Exception('Invalid path provided for working directory'); } $this->_wd = $path; } public function getLogger() { if (null === $this->_logger) { $this->setLogger(new \Zend_Log(new \Zend_Log_Writer_Stream($this->_wd . '/merge_subtree_error.log'))); } return $this->_logger; } public function setLogger(\Zend_Log $logger) { $this->_logger = $logger; } public function executeMerge() { chdir($_ENV['HOME'] . '/working/zf-master'); $return = shell_exec($this->_wd . '/update-master.sh'); return $return; } public function execute(\GearmanJob $job) { $this->getLogger()->info('Received merge request'); $return = $this->executeMerge(); if (strstr($return, 'Failed')) { $this->getLogger()->err('Failed pull: ' . $return); $job->sendFail(); return; } $this->getLogger()->info('Merge complete'); } }
(注意ZF类名前面的反斜杠;因为我使用的是命名空间,所以我需要完全限定我的类。)
上面的类可能有点矫枉过正。但它有一些不错的特性,特别是对于Gearman环境:它会在mymerge脚本中发现任何失败时记录下来。通过这种方式,我可以随时查看我的日志,我开始发现我的存储库之间存在差异。
我的下一堂课有点复杂,但对许多人来说可能更有用。它获取最近的15个gitlog
条目,并创建一个RSS提要:
<?php namespace ZF\Git; class Log2RSS implements Command { protected $_repo; protected $_feedDir = '/var/spool/gearman/feeds'; protected $_feedName = 'rss'; protected $_baseLink = 'http://some.viewgit.repo/?a=commit&p=zf&h='; public function setRepo($repo) { if (!is_dir($repo) || !is_dir($repo . '/.git')) { throw new \Exception('Invalid repository specified; not a Git repository'); } $this->_repo = $repo; return $this; } public function getRepo() { if (null === $this->_repo) { throw new \Exception('No repository directory specified'); } return $this->_repo; } public function setBaseLink($url) { $this->_baseLink = $url; return $this; } public function getBaseLink() { return $this->_baseLink; } public function setFeedDir($path) { if (!is_dir($path) || !is_writable($path)) { throw new \Exception('Invalid feed directory specified, or not writeable'); } $this->_feedDir = $path; return $this; } public function getFeedDir() { return $this->_feedDir; } public function setFeedName($feedName) { $this->_feedName = (string) $feedName; return $this; } public function getFeedName() { return $this->_feedName; } public function generateFeed() { $feed = new \Zend_Feed_Writer_Feed; $feed->setTitle('git commits'); $feed->setLink('http://some.viewgit.repo/'); $feed->setFeedLink('http://some.viewgit.repo/feeds/' . $this->getFeedName() . '.xml', 'rss'); $feed->addAuthor(array( 'name' => 'Name of this feed', 'email' => 'git@somedomain', 'uri' => 'http://some.viewgit.repo/', )); $feed->setDateModified(time()); $feed->setDescription('git commits'); $logs = $this->_parseLogs(); foreach ($logs as $log) { $date = strtotime($log['date']); $entry = $feed->createEntry(); $entry->setTitle($log['commit']); $entry->setLink($this->getBaseLink() . $log['commit']); $entry->setDateModified($date); $entry->setDateCreated($date); $entry->setDescription($log['subject']); $entry->setContent($log['subject'] . "\n\n" . $log['notes']); $feed->addEntry($entry); } $output = $feed->export('rss'); file_put_contents($this->getFeedDir() . '/' . $this->getFeedName() . '.xml', $output); } public function execute(\GearmanJob $job) { $this->generateFeed(); } protected function _parseLogs() { $repoPath = $this->getRepo(); $command = 'git --git-dir=' . $repoPath . '/.git --work-tree=' . $repoPath . ' log --max-count=15 --format=\'Commit: %H%nAuthor: %an%nDate: %cD%nSubject: %s%nNotes: %N%n\' -p'; $log = shell_exec($command); $lines = preg_split("/[\n?|\r]/", $log); $logs = array(); $index = 0; $current = false; foreach ($lines as $line) { if (preg_match('/^(Commit|Author|Date|Subject|Notes): (.*)$/', $line, $matches)) { $current = strtolower($matches[1]); $value = $matches[2]; if ('commit' == $current) { $index++; $logs[$index] = array(); } $logs[$index][$current] = $value; } elseif (false !== $current) { $logs[$index][$current] .= "\n" . $line; } } return $logs; } }
上面的对象可以使用更多的自定义向量——注入RSS提要名称、URL等的方法,以及描述限制的一些阈值,因此它不能截断超过一定数量的行。然而,它完成了工作——它创建了一个RSS提要,其中包含基于每次提交的条目。
齿轮工人
现在,对于工人。由于我正在使用一些ZendFramework类,并且依赖于自动加载,因此我需要设置一些自动加载。我还需要实例化这些类、配置实例并将它们附加到Gearmanworker。
#!/usr/bin/env php <?php ini_set('memory_limit', -1); $autoloader = function($class) { $file = str_replace(array('\', '_'), DIRECTORY_SEPARATOR, $class) . '.php'; return include_once $file; }; spl_autoload_register($autoloader); $mergeSubtree = new ZF\Git\MergeSubtree(); $mergeSubtree->setWorkingDir(__DIR__); $log2rss = new ZF\Git\Log2RSS(); $log2rss->setRepo('/home/gitolite/working/zf-master') ->setFeedName('zf'); $worker = new GearmanWorker(); $worker->addServer(); $worker->addFunction('post-receive', array($mergeSubtree, 'execute')); $worker->addFunction('rss', array($log2rss, 'execute')); while ($worker->work()) { if (GEARMAN_SUCCESS != $worker->returnCode()) { echo "Worker failed: " . $worker->error() . "\n"; } }
为了将它们结合在一起,我正在使用supervisord来管理这个脚本,这样我就不必担心它会死在我身上;当Gearman需要它时,它将始终可用。我不会在这里进入设置;它非常简单。(非常感谢SeanCoates在他2009PHPAdvent上发表的关于在PHP中使用supervisord的文章,并感谢MikeNaberezny在多年前向我介绍了supervisord。)
结论
Gearman是并行化任务以及创建异步流程的绝佳工具。结合supervisord和您选择的脚本语言,您可以毫不费力地取得一些令人难以置信的结果。
这也是挑选ZF组件用于简单任务的一个很好的例子—我正在使用Zend_Log
来报告作业的状态,并使用Zend_Feed_Writer
生成RSS提要。这两个组件可以独立运行,非常适合长时间运行的环境,您无需担心任务需要多长时间。
我强烈建议您研究使用异步处理工具——有多种消息系统、队列等可供您利用,它们可以帮助您从主应用程序中卸载资源密集型任务。
对于那些对我正在开发的子树合并工作流感到好奇的人,我将在本月就该主题撰写更多博文。