mirror of
https://we.phorge.it/source/phorge.git
synced 2024-11-10 08:52:39 +01:00
Make task queue more robust against long-running tasks
Summary: See discussion in D8773. Three small adjustments which should help prevent this kind of issue: - When queueing followup tasks, hold them on the worker until we finish the task, then queue them only if the work was successful. - Increase the default lease time from 60 seconds to 2 hours. Although most tasks finish in far fewer than 60 seconds, the daemons are generally stable nowadays and these short leases don't serve much of a purpose. I think they also date from an era where lease expiry and failure were less clearly distinguished. - Increase the default wait-after-failure from 60 seconds to 5 minutes. This largely dates from the MetaMTA era, where Facebook ran services with high failure rates and it was appropriate to repeatedly hammer them until things went through. In modern infrastructure, such failures are rare. Test Plan: - Verified that tasks queued properly after the main task was updated. - Verified that leases default to 7200 seconds. - Intentionally failed a task and verified default 300 second wait before retry. - Removed all default leases shorter than 7200 seconds (there was only one). - Checked all the wait before retry implementations for anything much shorter than 5 minutes (they all seem reasonable). Reviewers: btrahan, sowedance Reviewed By: sowedance Subscribers: epriestley Differential Revision: https://secure.phabricator.com/D8774
This commit is contained in:
parent
6a4f126000
commit
cb545856a9
11 changed files with 59 additions and 21 deletions
|
@ -7,7 +7,7 @@ final class FeedPublisherWorker extends FeedPushWorker {
|
||||||
|
|
||||||
$uris = PhabricatorEnv::getEnvConfig('feed.http-hooks');
|
$uris = PhabricatorEnv::getEnvConfig('feed.http-hooks');
|
||||||
foreach ($uris as $uri) {
|
foreach ($uris as $uri) {
|
||||||
PhabricatorWorker::scheduleTask(
|
$this->queueTask(
|
||||||
'FeedPublisherHTTPWorker',
|
'FeedPublisherHTTPWorker',
|
||||||
array(
|
array(
|
||||||
'key' => $story->getChronologicalKey(),
|
'key' => $story->getChronologicalKey(),
|
||||||
|
@ -27,7 +27,7 @@ final class FeedPublisherWorker extends FeedPushWorker {
|
||||||
if (!$worker->isEnabled()) {
|
if (!$worker->isEnabled()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
PhabricatorWorker::scheduleTask(
|
$this->queueTask(
|
||||||
get_class($worker),
|
get_class($worker),
|
||||||
array(
|
array(
|
||||||
'key' => $story->getChronologicalKey(),
|
'key' => $story->getChronologicalKey(),
|
||||||
|
|
|
@ -8,7 +8,7 @@ final class HarbormasterTargetWorker extends HarbormasterWorker {
|
||||||
public function getRequiredLeaseTime() {
|
public function getRequiredLeaseTime() {
|
||||||
// This worker performs actual build work, which may involve a long wait
|
// This worker performs actual build work, which may involve a long wait
|
||||||
// on external systems.
|
// on external systems.
|
||||||
return 60 * 60 * 24;
|
return phutil_units('24 hours in seconds');
|
||||||
}
|
}
|
||||||
|
|
||||||
private function loadBuildTarget() {
|
private function loadBuildTarget() {
|
||||||
|
|
|
@ -5,7 +5,7 @@ final class PhabricatorRepositoryCommitHeraldWorker
|
||||||
|
|
||||||
public function getRequiredLeaseTime() {
|
public function getRequiredLeaseTime() {
|
||||||
// Herald rules may take a long time to process.
|
// Herald rules may take a long time to process.
|
||||||
return 4 * 60 * 60;
|
return phutil_units('4 hours in seconds');
|
||||||
}
|
}
|
||||||
|
|
||||||
public function parseCommit(
|
public function parseCommit(
|
||||||
|
|
|
@ -3,11 +3,6 @@
|
||||||
final class PhabricatorRepositoryCommitOwnersWorker
|
final class PhabricatorRepositoryCommitOwnersWorker
|
||||||
extends PhabricatorRepositoryCommitParserWorker {
|
extends PhabricatorRepositoryCommitParserWorker {
|
||||||
|
|
||||||
public function getRequiredLeaseTime() {
|
|
||||||
// Commit owner parser may take a while to process for huge commits.
|
|
||||||
return 60 * 60;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected function parseCommit(
|
protected function parseCommit(
|
||||||
PhabricatorRepository $repository,
|
PhabricatorRepository $repository,
|
||||||
PhabricatorRepositoryCommit $commit) {
|
PhabricatorRepositoryCommit $commit) {
|
||||||
|
@ -18,7 +13,7 @@ final class PhabricatorRepositoryCommitOwnersWorker
|
||||||
PhabricatorRepositoryCommit::IMPORTED_OWNERS);
|
PhabricatorRepositoryCommit::IMPORTED_OWNERS);
|
||||||
|
|
||||||
if ($this->shouldQueueFollowupTasks()) {
|
if ($this->shouldQueueFollowupTasks()) {
|
||||||
PhabricatorWorker::scheduleTask(
|
$this->queueTask(
|
||||||
'PhabricatorRepositoryCommitHeraldWorker',
|
'PhabricatorRepositoryCommitHeraldWorker',
|
||||||
array(
|
array(
|
||||||
'commitID' => $commit->getID(),
|
'commitID' => $commit->getID(),
|
||||||
|
|
|
@ -6,7 +6,7 @@ abstract class PhabricatorRepositoryCommitChangeParserWorker
|
||||||
public function getRequiredLeaseTime() {
|
public function getRequiredLeaseTime() {
|
||||||
// It can take a very long time to parse commits; some commits in the
|
// It can take a very long time to parse commits; some commits in the
|
||||||
// Facebook repository affect many millions of paths. Acquire 24h leases.
|
// Facebook repository affect many millions of paths. Acquire 24h leases.
|
||||||
return 60 * 60 * 24;
|
return phutil_units('24 hours in seconds');
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract protected function parseCommitChanges(
|
abstract protected function parseCommitChanges(
|
||||||
|
@ -98,7 +98,7 @@ abstract class PhabricatorRepositoryCommitChangeParserWorker
|
||||||
|
|
||||||
PhabricatorOwnersPackagePathValidator::updateOwnersPackagePaths($commit);
|
PhabricatorOwnersPackagePathValidator::updateOwnersPackagePaths($commit);
|
||||||
if ($this->shouldQueueFollowupTasks()) {
|
if ($this->shouldQueueFollowupTasks()) {
|
||||||
PhabricatorWorker::scheduleTask(
|
$this->queueTask(
|
||||||
'PhabricatorRepositoryCommitOwnersWorker',
|
'PhabricatorRepositoryCommitOwnersWorker',
|
||||||
array(
|
array(
|
||||||
'commitID' => $commit->getID(),
|
'commitID' => $commit->getID(),
|
||||||
|
|
|
@ -15,7 +15,7 @@ final class PhabricatorRepositoryGitCommitMessageParserWorker
|
||||||
$this->updateCommitData($ref);
|
$this->updateCommitData($ref);
|
||||||
|
|
||||||
if ($this->shouldQueueFollowupTasks()) {
|
if ($this->shouldQueueFollowupTasks()) {
|
||||||
PhabricatorWorker::scheduleTask(
|
$this->queueTask(
|
||||||
'PhabricatorRepositoryGitCommitChangeParserWorker',
|
'PhabricatorRepositoryGitCommitChangeParserWorker',
|
||||||
array(
|
array(
|
||||||
'commitID' => $commit->getID(),
|
'commitID' => $commit->getID(),
|
||||||
|
|
|
@ -15,7 +15,7 @@ final class PhabricatorRepositoryMercurialCommitMessageParserWorker
|
||||||
$this->updateCommitData($ref);
|
$this->updateCommitData($ref);
|
||||||
|
|
||||||
if ($this->shouldQueueFollowupTasks()) {
|
if ($this->shouldQueueFollowupTasks()) {
|
||||||
PhabricatorWorker::scheduleTask(
|
$this->queueTask(
|
||||||
'PhabricatorRepositoryMercurialCommitChangeParserWorker',
|
'PhabricatorRepositoryMercurialCommitChangeParserWorker',
|
||||||
array(
|
array(
|
||||||
'commitID' => $commit->getID(),
|
'commitID' => $commit->getID(),
|
||||||
|
|
|
@ -15,7 +15,7 @@ final class PhabricatorRepositorySvnCommitMessageParserWorker
|
||||||
$this->updateCommitData($ref);
|
$this->updateCommitData($ref);
|
||||||
|
|
||||||
if ($this->shouldQueueFollowupTasks()) {
|
if ($this->shouldQueueFollowupTasks()) {
|
||||||
PhabricatorWorker::scheduleTask(
|
$this->queueTask(
|
||||||
'PhabricatorRepositorySvnCommitChangeParserWorker',
|
'PhabricatorRepositorySvnCommitChangeParserWorker',
|
||||||
array(
|
array(
|
||||||
'commitID' => $commit->getID(),
|
'commitID' => $commit->getID(),
|
||||||
|
|
|
@ -9,6 +9,7 @@ abstract class PhabricatorWorker {
|
||||||
|
|
||||||
private $data;
|
private $data;
|
||||||
private static $runAllTasksInProcess = false;
|
private static $runAllTasksInProcess = false;
|
||||||
|
private $queuedTasks = array();
|
||||||
|
|
||||||
|
|
||||||
/* -( Configuring Retries and Failures )----------------------------------- */
|
/* -( Configuring Retries and Failures )----------------------------------- */
|
||||||
|
@ -17,13 +18,13 @@ abstract class PhabricatorWorker {
|
||||||
/**
|
/**
|
||||||
* Return the number of seconds this worker needs hold a lease on the task for
|
* Return the number of seconds this worker needs hold a lease on the task for
|
||||||
* while it performs work. For most tasks you can leave this at `null`, which
|
* while it performs work. For most tasks you can leave this at `null`, which
|
||||||
* will give you a short default lease (currently 60 seconds).
|
* will give you a default lease (currently 2 hours).
|
||||||
*
|
*
|
||||||
* For tasks which may take a very long time to complete, you should return
|
* For tasks which may take a very long time to complete, you should return
|
||||||
* an upper bound on the amount of time the task may require.
|
* an upper bound on the amount of time the task may require.
|
||||||
*
|
*
|
||||||
* @return int|null Number of seconds this task needs to remain leased for,
|
* @return int|null Number of seconds this task needs to remain leased for,
|
||||||
* or null for a default (currently 60 second) lease.
|
* or null for a default lease.
|
||||||
*
|
*
|
||||||
* @task config
|
* @task config
|
||||||
*/
|
*/
|
||||||
|
@ -194,4 +195,29 @@ abstract class PhabricatorWorker {
|
||||||
return $this;
|
return $this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Queue a task to be executed after this one suceeds.
|
||||||
|
*
|
||||||
|
* The followup task will be queued only if this task completes cleanly.
|
||||||
|
*
|
||||||
|
* @param string Task class to queue.
|
||||||
|
* @param array Data for the followup task.
|
||||||
|
* @return this
|
||||||
|
*/
|
||||||
|
protected function queueTask($class, array $data) {
|
||||||
|
$this->queuedTasks[] = array($class, $data);
|
||||||
|
return $this;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get tasks queued as followups by @{method:queueTask}.
|
||||||
|
*
|
||||||
|
* @return list<pair<string, wild>> Queued task specifications.
|
||||||
|
*/
|
||||||
|
public function getQueuedTasks() {
|
||||||
|
return $this->queuedTasks;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,11 +10,17 @@ final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {
|
||||||
const PHASE_UNLEASED = 'unleased';
|
const PHASE_UNLEASED = 'unleased';
|
||||||
const PHASE_EXPIRED = 'expired';
|
const PHASE_EXPIRED = 'expired';
|
||||||
|
|
||||||
const DEFAULT_LEASE_DURATION = 60; // Seconds
|
|
||||||
|
|
||||||
private $ids;
|
private $ids;
|
||||||
private $limit;
|
private $limit;
|
||||||
|
|
||||||
|
public static function getDefaultWaitBeforeRetry() {
|
||||||
|
return phutil_units('5 minutes in seconds');
|
||||||
|
}
|
||||||
|
|
||||||
|
public static function getDefaultLeaseDuration() {
|
||||||
|
return phutil_units('2 hours in seconds');
|
||||||
|
}
|
||||||
|
|
||||||
public function withIDs(array $ids) {
|
public function withIDs(array $ids) {
|
||||||
$this->ids = $ids;
|
$this->ids = $ids;
|
||||||
return $this;
|
return $this;
|
||||||
|
@ -78,7 +84,7 @@ final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {
|
||||||
%Q',
|
%Q',
|
||||||
$task_table->getTableName(),
|
$task_table->getTableName(),
|
||||||
$lease_ownership_name,
|
$lease_ownership_name,
|
||||||
self::DEFAULT_LEASE_DURATION,
|
self::getDefaultLeaseDuration(),
|
||||||
$this->buildUpdateWhereClause($conn_w, $phase, $rows));
|
$this->buildUpdateWhereClause($conn_w, $phase, $rows));
|
||||||
|
|
||||||
$leased += $conn_w->getAffectedRows();
|
$leased += $conn_w->getAffectedRows();
|
||||||
|
|
|
@ -100,6 +100,7 @@ final class PhabricatorWorkerActiveTask extends PhabricatorWorkerTask {
|
||||||
// to release the lease otherwise.
|
// to release the lease otherwise.
|
||||||
$this->checkLease();
|
$this->checkLease();
|
||||||
|
|
||||||
|
$did_succeed = false;
|
||||||
try {
|
try {
|
||||||
$worker = $this->getWorkerInstance();
|
$worker = $this->getWorkerInstance();
|
||||||
|
|
||||||
|
@ -126,6 +127,7 @@ final class PhabricatorWorkerActiveTask extends PhabricatorWorkerTask {
|
||||||
$result = $this->archiveTask(
|
$result = $this->archiveTask(
|
||||||
PhabricatorWorkerArchiveTask::RESULT_SUCCESS,
|
PhabricatorWorkerArchiveTask::RESULT_SUCCESS,
|
||||||
$duration);
|
$duration);
|
||||||
|
$did_succeed = true;
|
||||||
} catch (PhabricatorWorkerPermanentFailureException $ex) {
|
} catch (PhabricatorWorkerPermanentFailureException $ex) {
|
||||||
$result = $this->archiveTask(
|
$result = $this->archiveTask(
|
||||||
PhabricatorWorkerArchiveTask::RESULT_FAILURE,
|
PhabricatorWorkerArchiveTask::RESULT_FAILURE,
|
||||||
|
@ -139,7 +141,7 @@ final class PhabricatorWorkerActiveTask extends PhabricatorWorkerTask {
|
||||||
$retry = $worker->getWaitBeforeRetry($this);
|
$retry = $worker->getWaitBeforeRetry($this);
|
||||||
$retry = coalesce(
|
$retry = coalesce(
|
||||||
$retry,
|
$retry,
|
||||||
PhabricatorWorkerLeaseQuery::DEFAULT_LEASE_DURATION);
|
PhabricatorWorkerLeaseQuery::getDefaultWaitBeforeRetry());
|
||||||
|
|
||||||
// NOTE: As a side effect, this saves the object.
|
// NOTE: As a side effect, this saves the object.
|
||||||
$this->setLeaseDuration($retry);
|
$this->setLeaseDuration($retry);
|
||||||
|
@ -147,6 +149,15 @@ final class PhabricatorWorkerActiveTask extends PhabricatorWorkerTask {
|
||||||
$result = $this;
|
$result = $this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NOTE: If this throws, we don't want it to cause the task to fail again,
|
||||||
|
// so execute it out here and just let the exception escape.
|
||||||
|
if ($did_succeed) {
|
||||||
|
foreach ($worker->getQueuedTasks() as $task) {
|
||||||
|
list($class, $data) = $task;
|
||||||
|
PhabricatorWorker::scheduleTask($class, $data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return $result;
|
return $result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue