几年来我一直在听说和阅读有关Gearman的信息,但是,由于我的工作性质,我从来没有真正需要调查它;当您编写后端代码时,可扩展性是您留给最终用户的事情,对吧?
错了!但也许需要一个解释。
背景
我们正在考虑迁移到Git以在框架上进行我们的主要开发。我们需要适应很多用例:
- 我们希望支持原子变更集(即包含与单个问题相关的所有变更的变更集:通常是单元测试、代码和文档)。
- 同时,开发人员希望能够将库代码作为git子模块或手册的单一语言等引入。
- 用户需要一个只读的Subversion存储库,以便他们可以继续使用svn:externals。只是因为我们正在迁移并不意味着我们的用户需要迁移。
- 当然,很多人喜欢通过RSS提要保持对提交的关注
- 还有像我这样的受虐狂就像提交电子邮件一样。(我从来没有将收件箱归零,这是一个奇迹吗?)
事实证明,前两项很难同时完成。如果你让你想要的每一个不同的子树都可以离散地克隆,然后构建一个由一堆git子模块组成的存储库,你就失去了原子性。(你最终对你接触的每个子模块都有一个提交,再加上一个masterrepository。Eww。)
不过,我找到了一种使用子树合并的方法。但是,由于这篇文章是关于编写Gearmanworker的,所以我将把它留到另一天。然而,重要的是我发现了其他有趣的东西。
Git允许您定义“钩子”,即在git提交生命周期的不同点运行的脚本。一种可以在服务器上运行的挂钩称为“post-receive”。我发现即使post-receive在提交被存储库接受后运行,如果你在钩子仍在运行时对gitrepository执行操作,你会得到一些奇怪的行为。在我的例子中,我让脚本在工作树中触发gitpull。当工作树收到变更集时,它无法干净地应用它们,因为服务器实际上还没有最终确定其状态。我能得到干净拉动的唯一方法是在挂钩完成后拉动。这挫败了我对自动化的尝试。
现在我们来看看Gearman。我意识到我可以让我的post-receive脚本队列成为Gearman的后台任务。由于这几乎是瞬时操作,这意味着我的钩子在Gearman启动worker之前就已完成;如果没有,我总是可以在我的worker中执行sleep()以确保它已完成。
编写Gearman任务
所以,现在我可以完成我的任务了,我开始考虑我还能做些什么,突然间,Gearman看起来像是该架构的绝佳解决方案。基本上,它可以防止提交更改的最终用户因挂钩脚本而出现任何延迟,同时允许我执行我们需要的任务自动化。
所以我写了两个任务作为概念验证,使用了直接的PHP和ZendFramework的混合物;这些用于我之前提到的子树合并(实际上,实际工作是在bash脚本中完成的),还有一个用于RSS提要。
Gearman客户端:接收后挂钩
首先,让我们看看我的钩子脚本,它使用了Gearman客户端。我正在使用来自PECL的ext/gearman。我的post-receive挂钩脚本如下所示:
#!/usr/bin/env php
<?php
$workload = implode(':', $argv);
$client = new GearmanClient();
$client->addServer();
$client->doBackground('post-receive', $workload);
$client->doBackground('rss', $workload);
上面的内容应该很简单:我创建了一个GearmanClient,告诉它使用默认服务器(localhost),并触发两个Gearman函数,post-receive和rss,使用我的脚本收到的参数作为有效载荷。我使用doBackground()方法使任务可以异步执行;钩子脚本不需要在执行任何给定任务时被阻塞,并且可以愉快地继续执行。
任务
我编写了两个类,一个用于我想创建的每个Gearman作业。我本可以将它们作为lambda、普通的旧函数等来完成。我选择了对象,以便我可以测试它们,如果我愿意,也可以从其他脚本中使用它们。这些类实现了一个Command接口,该接口简单地定义了一个execute()方法,该方法接受一个GearmanJob实例。
首先是触发我的子树合并的作业:
<?php
namespace ZF\Git;
class MergeSubtree implements Command
{
protected $_logger;
protected $_wd = '/var/spool/gearman';
public function setWorkingDir($path)
{
if (!is_dir($path)) {
throw new \Exception('Invalid path provided for working directory');
}
$this->_wd = $path;
}
public function getLogger()
{
if (null === $this->_logger) {
$this->setLogger(new \Zend_Log(new \Zend_Log_Writer_Stream($this->_wd . '/merge_subtree_error.log')));
}
return $this->_logger;
}
public function setLogger(\Zend_Log $logger)
{
$this->_logger = $logger;
}
public function executeMerge()
{
chdir($_ENV['HOME'] . '/working/zf-master');
$return = shell_exec($this->_wd . '/update-master.sh');
return $return;
}
public function execute(\GearmanJob $job)
{
$this->getLogger()->info('Received merge request');
$return = $this->executeMerge();
if (strstr($return, 'Failed')) {
$this->getLogger()->err('Failed pull: ' . $return);
$job->sendFail();
return;
}
$this->getLogger()->info('Merge complete');
}
}
(注意ZF类名前面的反斜杠;因为我使用的是命名空间,所以我需要完全限定我的类。)
上面的类可能有点矫枉过正。但它有一些不错的特性,特别是对于Gearman环境:它会在mymerge脚本中发现任何失败时记录下来。通过这种方式,我可以随时查看我的日志,我开始发现我的存储库之间存在差异。
我的下一堂课有点复杂,但对许多人来说可能更有用。它获取最近的15个gitlog条目,并创建一个RSS提要:
<?php
namespace ZF\Git;
class Log2RSS implements Command
{
protected $_repo;
protected $_feedDir = '/var/spool/gearman/feeds';
protected $_feedName = 'rss';
protected $_baseLink = 'http://some.viewgit.repo/?a=commit&p=zf&h=';
public function setRepo($repo)
{
if (!is_dir($repo) || !is_dir($repo . '/.git')) {
throw new \Exception('Invalid repository specified; not a Git repository');
}
$this->_repo = $repo;
return $this;
}
public function getRepo()
{
if (null === $this->_repo) {
throw new \Exception('No repository directory specified');
}
return $this->_repo;
}
public function setBaseLink($url)
{
$this->_baseLink = $url;
return $this;
}
public function getBaseLink()
{
return $this->_baseLink;
}
public function setFeedDir($path)
{
if (!is_dir($path) || !is_writable($path)) {
throw new \Exception('Invalid feed directory specified, or not writeable');
}
$this->_feedDir = $path;
return $this;
}
public function getFeedDir()
{
return $this->_feedDir;
}
public function setFeedName($feedName)
{
$this->_feedName = (string) $feedName;
return $this;
}
public function getFeedName()
{
return $this->_feedName;
}
public function generateFeed()
{
$feed = new \Zend_Feed_Writer_Feed;
$feed->setTitle('git commits');
$feed->setLink('http://some.viewgit.repo/');
$feed->setFeedLink('http://some.viewgit.repo/feeds/' . $this->getFeedName() . '.xml', 'rss');
$feed->addAuthor(array(
'name' => 'Name of this feed',
'email' => 'git@somedomain',
'uri' => 'http://some.viewgit.repo/',
));
$feed->setDateModified(time());
$feed->setDescription('git commits');
$logs = $this->_parseLogs();
foreach ($logs as $log) {
$date = strtotime($log['date']);
$entry = $feed->createEntry();
$entry->setTitle($log['commit']);
$entry->setLink($this->getBaseLink() . $log['commit']);
$entry->setDateModified($date);
$entry->setDateCreated($date);
$entry->setDescription($log['subject']);
$entry->setContent($log['subject'] . "\n\n" . $log['notes']);
$feed->addEntry($entry);
}
$output = $feed->export('rss');
file_put_contents($this->getFeedDir() . '/' . $this->getFeedName() . '.xml', $output);
}
public function execute(\GearmanJob $job)
{
$this->generateFeed();
}
protected function _parseLogs()
{
$repoPath = $this->getRepo();
$command = 'git --git-dir=' . $repoPath . '/.git --work-tree=' . $repoPath . ' log --max-count=15 --format=\'Commit: %H%nAuthor: %an%nDate: %cD%nSubject: %s%nNotes: %N%n\' -p';
$log = shell_exec($command);
$lines = preg_split("/[\n?|\r]/", $log);
$logs = array();
$index = 0;
$current = false;
foreach ($lines as $line) {
if (preg_match('/^(Commit|Author|Date|Subject|Notes): (.*)$/', $line, $matches)) {
$current = strtolower($matches[1]);
$value = $matches[2];
if ('commit' == $current) {
$index++;
$logs[$index] = array();
}
$logs[$index][$current] = $value;
} elseif (false !== $current) {
$logs[$index][$current] .= "\n" . $line;
}
}
return $logs;
}
}
上面的对象可以使用更多的自定义向量——注入RSS提要名称、URL等的方法,以及描述限制的一些阈值,因此它不能截断超过一定数量的行。然而,它完成了工作——它创建了一个RSS提要,其中包含基于每次提交的条目。
齿轮工人
现在,对于工人。由于我正在使用一些ZendFramework类,并且依赖于自动加载,因此我需要设置一些自动加载。我还需要实例化这些类、配置实例并将它们附加到Gearmanworker。
#!/usr/bin/env php
<?php
ini_set('memory_limit', -1);
$autoloader = function($class) {
$file = str_replace(array('\', '_'), DIRECTORY_SEPARATOR, $class) . '.php';
return include_once $file;
};
spl_autoload_register($autoloader);
$mergeSubtree = new ZF\Git\MergeSubtree();
$mergeSubtree->setWorkingDir(__DIR__);
$log2rss = new ZF\Git\Log2RSS();
$log2rss->setRepo('/home/gitolite/working/zf-master')
->setFeedName('zf');
$worker = new GearmanWorker();
$worker->addServer();
$worker->addFunction('post-receive', array($mergeSubtree, 'execute'));
$worker->addFunction('rss', array($log2rss, 'execute'));
while ($worker->work()) {
if (GEARMAN_SUCCESS != $worker->returnCode()) {
echo "Worker failed: " . $worker->error() . "\n";
}
}
为了将它们结合在一起,我正在使用supervisord来管理这个脚本,这样我就不必担心它会死在我身上;当Gearman需要它时,它将始终可用。我不会在这里进入设置;它非常简单。(非常感谢SeanCoates在他2009PHPAdvent上发表的关于在PHP中使用supervisord的文章,并感谢MikeNaberezny在多年前向我介绍了supervisord。)
结论
Gearman是并行化任务以及创建异步流程的绝佳工具。结合supervisord和您选择的脚本语言,您可以毫不费力地取得一些令人难以置信的结果。
这也是挑选ZF组件用于简单任务的一个很好的例子—我正在使用Zend_Log来报告作业的状态,并使用Zend_Feed_Writer生成RSS提要。这两个组件可以独立运行,非常适合长时间运行的环境,您无需担心任务需要多长时间。
我强烈建议您研究使用异步处理工具——有多种消息系统、队列等可供您利用,它们可以帮助您从主应用程序中卸载资源密集型任务。
对于那些对我正在开发的子树合并工作流感到好奇的人,我将在本月就该主题撰写更多博文。
