mirror of
https://we.phorge.it/source/phorge.git
synced 2024-11-10 08:52:39 +01:00
Allow tasks to yield to other tasks
Summary: For Harbormaster tasks which want to poll or wait, this lets them say "try again a little later" without having to sleep and hold a queue slot. This is basically the same as failing, except that we don't increment the failure counter. Instead, we just set the current lease to the correct length and then exit. The task will be retried after the lease expires. Test Plan: Using both `bin/harbormaster` and `phd debug taskmaster`, ran a lot of waiting tasks through the queue, faking them to either yield or not yield in a controlled manner. The queue responded as expected, yielding tasks appropraitely and retrying them later. Reviewers: btrahan Reviewed By: btrahan Subscribers: epriestley Differential Revision: https://secure.phabricator.com/D8792
This commit is contained in:
parent
afd04731ab
commit
4a6d2e9c97
8 changed files with 73 additions and 21 deletions
|
@ -2239,6 +2239,7 @@ phutil_register_library_map(array(
|
|||
'PhabricatorWorkerTaskDetailController' => 'applications/daemon/controller/PhabricatorWorkerTaskDetailController.php',
|
||||
'PhabricatorWorkerTaskUpdateController' => 'applications/daemon/controller/PhabricatorWorkerTaskUpdateController.php',
|
||||
'PhabricatorWorkerTestCase' => 'infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php',
|
||||
'PhabricatorWorkerYieldException' => 'infrastructure/daemon/workers/exception/PhabricatorWorkerYieldException.php',
|
||||
'PhabricatorWorkingCopyDiscoveryTestCase' => 'applications/repository/engine/__tests__/PhabricatorWorkingCopyDiscoveryTestCase.php',
|
||||
'PhabricatorWorkingCopyPullTestCase' => 'applications/repository/engine/__tests__/PhabricatorWorkingCopyPullTestCase.php',
|
||||
'PhabricatorWorkingCopyTestCase' => 'applications/repository/engine/__tests__/PhabricatorWorkingCopyTestCase.php',
|
||||
|
@ -5126,6 +5127,7 @@ phutil_register_library_map(array(
|
|||
'PhabricatorWorkerTaskDetailController' => 'PhabricatorDaemonController',
|
||||
'PhabricatorWorkerTaskUpdateController' => 'PhabricatorDaemonController',
|
||||
'PhabricatorWorkerTestCase' => 'PhabricatorTestCase',
|
||||
'PhabricatorWorkerYieldException' => 'Exception',
|
||||
'PhabricatorWorkingCopyDiscoveryTestCase' => 'PhabricatorWorkingCopyTestCase',
|
||||
'PhabricatorWorkingCopyPullTestCase' => 'PhabricatorWorkingCopyTestCase',
|
||||
'PhabricatorWorkingCopyTestCase' => 'PhabricatorTestCase',
|
||||
|
|
|
@ -132,35 +132,35 @@ final class HarbormasterBuildQuery
|
|||
private function buildWhereClause(AphrontDatabaseConnection $conn_r) {
|
||||
$where = array();
|
||||
|
||||
if ($this->ids) {
|
||||
if ($this->ids !== null) {
|
||||
$where[] = qsprintf(
|
||||
$conn_r,
|
||||
'id IN (%Ld)',
|
||||
$this->ids);
|
||||
}
|
||||
|
||||
if ($this->phids) {
|
||||
if ($this->phids !== null) {
|
||||
$where[] = qsprintf(
|
||||
$conn_r,
|
||||
'phid in (%Ls)',
|
||||
$this->phids);
|
||||
}
|
||||
|
||||
if ($this->buildStatuses) {
|
||||
if ($this->buildStatuses !== null) {
|
||||
$where[] = qsprintf(
|
||||
$conn_r,
|
||||
'buildStatus in (%Ls)',
|
||||
$this->buildStatuses);
|
||||
}
|
||||
|
||||
if ($this->buildablePHIDs) {
|
||||
if ($this->buildablePHIDs !== null) {
|
||||
$where[] = qsprintf(
|
||||
$conn_r,
|
||||
'buildablePHID IN (%Ls)',
|
||||
$this->buildablePHIDs);
|
||||
}
|
||||
|
||||
if ($this->buildPlanPHIDs) {
|
||||
if ($this->buildPlanPHIDs !== null) {
|
||||
$where[] = qsprintf(
|
||||
$conn_r,
|
||||
'buildPlanPHID IN (%Ls)',
|
||||
|
|
|
@ -28,24 +28,17 @@ final class HarbormasterWaitForPreviousBuildStepImplementation
|
|||
// finished.
|
||||
$plan = $build->getBuildPlan();
|
||||
|
||||
$log = null;
|
||||
$log_start = null;
|
||||
$log = $build->createLog($build_target, "waiting", "blockers");
|
||||
$log_start = $log->start();
|
||||
|
||||
$blockers = $this->getBlockers($object, $plan, $build);
|
||||
while (count($blockers) > 0) {
|
||||
if ($log === null) {
|
||||
$log = $build->createLog($build_target, "waiting", "blockers");
|
||||
$log_start = $log->start();
|
||||
}
|
||||
|
||||
if ($blockers) {
|
||||
$log->append("Blocked by: ".implode(",", $blockers)."\n");
|
||||
|
||||
// TODO: This should fail temporarily instead after setting the target to
|
||||
// waiting, and thereby push the build into a waiting status.
|
||||
sleep(1);
|
||||
$blockers = $this->getBlockers($object, $plan, $build);
|
||||
}
|
||||
if ($log !== null) {
|
||||
$log->finalize($log_start);
|
||||
$log->finalize($log_start);
|
||||
|
||||
if ($blockers) {
|
||||
throw new PhabricatorWorkerYieldException(15);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -87,6 +80,10 @@ final class HarbormasterWaitForPreviousBuildStepImplementation
|
|||
->execute();
|
||||
$buildable_phids = mpull($buildables, 'getPHID');
|
||||
|
||||
if (!$buildable_phids) {
|
||||
return array();
|
||||
}
|
||||
|
||||
$builds = id(new HarbormasterBuildQuery())
|
||||
->setViewer(PhabricatorUser::getOmnipotentUser())
|
||||
->withBuildablePHIDs($buildable_phids)
|
||||
|
@ -101,4 +98,5 @@ final class HarbormasterWaitForPreviousBuildStepImplementation
|
|||
|
||||
return $blockers;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -52,6 +52,10 @@ final class HarbormasterTargetWorker extends HarbormasterWorker {
|
|||
|
||||
$target->setTargetStatus($next_status);
|
||||
$target->save();
|
||||
} catch (PhabricatorWorkerYieldException $ex) {
|
||||
// If the target wants to yield, let that escape without further
|
||||
// processing. We'll resume after the task retries.
|
||||
throw $ex;
|
||||
} catch (Exception $ex) {
|
||||
phlog($ex);
|
||||
|
||||
|
|
|
@ -21,6 +21,8 @@ final class PhabricatorTaskmasterDaemon extends PhabricatorDaemon {
|
|||
if ($ex) {
|
||||
if ($ex instanceof PhabricatorWorkerPermanentFailureException) {
|
||||
$this->log("Task {$id} failed permanently.");
|
||||
} else if ($ex instanceof PhabricatorWorkerYieldException) {
|
||||
$this->log(pht('Task %s yielded.', $id));
|
||||
} else {
|
||||
$this->log("Task {$id} failed!");
|
||||
throw new PhutilProxyException(
|
||||
|
|
|
@ -93,7 +93,21 @@ abstract class PhabricatorWorker {
|
|||
if (self::$runAllTasksInProcess) {
|
||||
// Do the work in-process.
|
||||
$worker = newv($task_class, array($data));
|
||||
$worker->doWork();
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
$worker->doWork();
|
||||
break;
|
||||
} catch (PhabricatorWorkerYieldException $ex) {
|
||||
phlog(
|
||||
pht(
|
||||
'In-process task "%s" yielded for %s seconds, sleeping...',
|
||||
$task_class,
|
||||
$ex->getDuration()));
|
||||
|
||||
sleep($ex->getDuration());
|
||||
}
|
||||
}
|
||||
|
||||
// Now, save a task row and immediately archive it so we can return an
|
||||
// object with a valid ID.
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
<?php
|
||||
|
||||
/**
|
||||
* Allows tasks to yield to other tasks.
|
||||
*
|
||||
* If a worker throws this exception while processing a task, the task will be
|
||||
* pushed toward the back of the queue and tried again later.
|
||||
*/
|
||||
final class PhabricatorWorkerYieldException extends Exception {
|
||||
|
||||
private $duration;
|
||||
|
||||
public function __construct($duration) {
|
||||
$this->duration = $duration;
|
||||
parent::__construct();
|
||||
}
|
||||
|
||||
public function getDuration() {
|
||||
return $this->duration;
|
||||
}
|
||||
|
||||
}
|
|
@ -133,6 +133,16 @@ final class PhabricatorWorkerActiveTask extends PhabricatorWorkerTask {
|
|||
PhabricatorWorkerArchiveTask::RESULT_FAILURE,
|
||||
0);
|
||||
$result->setExecutionException($ex);
|
||||
} catch (PhabricatorWorkerYieldException $ex) {
|
||||
$this->setExecutionException($ex);
|
||||
|
||||
$retry = $ex->getDuration();
|
||||
$retry = max($retry, 5);
|
||||
|
||||
// NOTE: As a side effect, this saves the object.
|
||||
$this->setLeaseDuration($retry);
|
||||
|
||||
$result = $this;
|
||||
} catch (Exception $ex) {
|
||||
$this->setExecutionException($ex);
|
||||
$this->setFailureCount($this->getFailureCount() + 1);
|
||||
|
|
Loading…
Reference in a new issue