1
0
Fork 0
mirror of https://we.phorge.it/source/phorge.git synced 2024-11-22 06:42:42 +01:00

Allow worker tasks to have priorities

Summary: Fixes T5336. Currently, `PhabricatorWorkerLeaseQuery` is basically FIFO. It makes more sense for the queue to be a priority-queue, and to assign higher priorities to alerts (email and SMS).

Test Plan: Created dummy tasks in the queue (with different priorities). Verified that the priority field was set correctly in the DB and that the priority was shown on the `/daemon/` page. Started a `PhabricatorTaskmasterDaemon` and verified that the higher priority tasks were executed before lower priority tasks.

Reviewers: epriestley, #blessed_reviewers

Reviewed By: epriestley, #blessed_reviewers

Subscribers: epriestley, Korvin

Maniphest Tasks: T5336

Differential Revision: https://secure.phabricator.com/D9871
This commit is contained in:
Joshua Spence 2014-07-12 03:02:06 +10:00
parent 66a3abe058
commit 9a679bf374
15 changed files with 131 additions and 91 deletions

View file

@ -0,0 +1,11 @@
ALTER TABLE {$NAMESPACE}_worker.worker_activetask
ADD COLUMN priority int unsigned NOT NULL;
ALTER TABLE {$NAMESPACE}_worker.worker_activetask
ADD KEY (leaseOwner, priority, id);
ALTER TABLE {$NAMESPACE}_worker.worker_archivetask
ADD COLUMN priority int unsigned NOT NULL;
ALTER TABLE {$NAMESPACE}_worker.worker_archivetask
ADD KEY (leaseOwner, priority, id);

View file

@ -267,7 +267,10 @@ foreach ($commits as $commit) {
if ($all_from_repo && !$force_local) {
foreach ($classes as $class) {
PhabricatorWorker::scheduleTask($class, $spec);
PhabricatorWorker::scheduleTask(
$class,
$spec,
PhabricatorWorker::PRIORITY_IMPORT);
$commit_name = 'r'.$callsign.$commit->getCommitIdentifier();
echo " Queued '{$class}' for commit '{$commit_name}'.\n";

View file

@ -136,6 +136,7 @@ final class PhabricatorDaemonConsoleController
$task->getTaskClass(),
$task->getLeaseOwner(),
$task->getLeaseExpires() - time(),
$task->getPriority(),
$task->getFailureCount(),
phutil_tag(
'a',
@ -158,6 +159,7 @@ final class PhabricatorDaemonConsoleController
pht('Class'),
pht('Owner'),
pht('Expires'),
pht('Priority'),
pht('Failures'),
'',
));
@ -168,6 +170,7 @@ final class PhabricatorDaemonConsoleController
'',
'',
'n',
'n',
'action',
));
$leased_table->setNoDataString(pht('No tasks are leased by workers.'));

View file

@ -187,7 +187,8 @@ final class DiffusionCommitHookEngine extends Phobject {
'eventPHID' => $event->getPHID(),
'emailPHIDs' => array_values($this->emailPHIDs),
'info' => $this->loadCommitInfoForWorker($all_updates),
));
),
PhabricatorWorker::PRIORITY_ALERTS);
}
return 0;

View file

@ -49,7 +49,8 @@ final class PhabricatorMailManagementResendWorkflow
$mailer_task = PhabricatorWorker::scheduleTask(
'PhabricatorMetaMTAWorker',
$message->getID());
$message->getID(),
PhabricatorWorker::PRIORITY_ALERTS);
$console->writeOut(
"Queued message #%d for resend.\n",

View file

@ -300,7 +300,8 @@ final class PhabricatorMetaMTAMail extends PhabricatorMetaMTADAO {
// Queue a task to send this mail.
$mailer_task = PhabricatorWorker::scheduleTask(
'PhabricatorMetaMTAWorker',
$this->getID());
$this->getID(),
PhabricatorWorker::PRIORITY_ALERTS);
$this->saveTransaction();

View file

@ -7,7 +7,8 @@ final class PhabricatorSearchIndexer {
'PhabricatorSearchWorker',
array(
'documentPHID' => $phid,
));
),
PhabricatorWorker::PRIORITY_IMPORT);
}
public function indexDocumentByPHID($phid) {

View file

@ -9,6 +9,11 @@ abstract class PhabricatorWorker {
private static $runAllTasksInProcess = false;
private $queuedTasks = array();
const PRIORITY_ALERTS = 4000;
const PRIORITY_DEFAULT = 3000;
const PRIORITY_BULK = 2000;
const PRIORITY_IMPORT = 1000;
/* -( Configuring Retries and Failures )----------------------------------- */
@ -32,8 +37,8 @@ abstract class PhabricatorWorker {
/**
* Return the maximum number of times this task may be retried before it
* is considered permanently failed. By default, tasks retry indefinitely. You
* Return the maximum number of times this task may be retried before it is
* considered permanently failed. By default, tasks retry indefinitely. You
* can throw a @{class:PhabricatorWorkerPermanentFailureException} to cause an
* immediate permanent failure.
*
@ -53,11 +58,11 @@ abstract class PhabricatorWorker {
* a short default retry period (currently 60 seconds).
*
* @param PhabricatorWorkerTask The task itself. This object is probably
* useful mostly to examine the failure
* count if you want to implement staggered
* retries, or to examine the execution
* exception if you want to react to
* different failures in different ways.
* useful mostly to examine the failure count
* if you want to implement staggered retries,
* or to examine the execution exception if
* you want to react to different failures in
* different ways.
* @return int|null Number of seconds to wait between retries,
* or null for a default retry period
* (currently 60 seconds).
@ -70,7 +75,6 @@ abstract class PhabricatorWorker {
abstract protected function doWork();
final public function __construct($data) {
$this->data = $data;
}
@ -83,10 +87,19 @@ abstract class PhabricatorWorker {
$this->doWork();
}
final public static function scheduleTask($task_class, $data) {
final public static function scheduleTask(
$task_class,
$data,
$priority = null) {
if ($priority === null) {
$priority = self::PRIORITY_DEFAULT;
}
$task = id(new PhabricatorWorkerActiveTask())
->setTaskClass($task_class)
->setData($data);
->setData($data)
->setPriority($priority);
if (self::$runAllTasksInProcess) {
// Do the work in-process.
@ -96,8 +109,8 @@ abstract class PhabricatorWorker {
try {
$worker->doWork();
foreach ($worker->getQueuedTasks() as $queued_task) {
list($queued_class, $queued_data) = $queued_task;
self::scheduleTask($queued_class, $queued_data);
list($queued_class, $queued_data, $queued_priority) = $queued_task;
self::scheduleTask($queued_class, $queued_data, $queued_priority);
}
break;
} catch (PhabricatorWorkerYieldException $ex) {
@ -106,7 +119,6 @@ abstract class PhabricatorWorker {
'In-process task "%s" yielded for %s seconds, sleeping...',
$task_class,
$ex->getDuration()));
sleep($ex->getDuration());
}
}
@ -184,8 +196,7 @@ abstract class PhabricatorWorker {
foreach ($tasks as $task) {
if ($task->getResult() != PhabricatorWorkerArchiveTask::RESULT_SUCCESS) {
throw new Exception(
pht('Task %d failed!', $task->getID()));
throw new Exception(pht('Task %d failed!', $task->getID()));
}
}
}
@ -204,7 +215,7 @@ abstract class PhabricatorWorker {
self::$runAllTasksInProcess = $all;
}
protected function log($pattern /* $args */) {
final protected function log($pattern /* , ... */) {
$console = PhutilConsole::getConsole();
$argv = func_get_args();
call_user_func_array(array($console, 'writeLog'), $argv);
@ -219,10 +230,11 @@ abstract class PhabricatorWorker {
*
* @param string Task class to queue.
* @param array Data for the followup task.
* @param int|null Priority for the followup task.
* @return this
*/
protected function queueTask($class, array $data) {
$this->queuedTasks[] = array($class, $data);
final protected function queueTask($class, array $data, $priority = null) {
$this->queuedTasks[] = array($class, $data, $priority);
return $this;
}
@ -230,9 +242,9 @@ abstract class PhabricatorWorker {
/**
* Get tasks queued as followups by @{method:queueTask}.
*
* @return list<pair<string, wild>> Queued task specifications.
* @return list<tuple<string, wild, int|null>> Queued task specifications.
*/
public function getQueuedTasks() {
final public function getQueuedTasks() {
return $this->queuedTasks;
}

View file

@ -26,8 +26,7 @@ final class PhabricatorTestWorker extends PhabricatorWorker {
protected function doWork() {
switch (idx($this->getTaskData(), 'doWork')) {
case 'fail-temporary':
throw new Exception(
'Temporary failure!');
throw new Exception('Temporary failure!');
case 'fail-permanent':
throw new PhabricatorWorkerPermanentFailureException(
'Permanent failure!');

View file

@ -9,35 +9,30 @@ final class PhabricatorWorkerTestCase extends PhabricatorTestCase {
}
public function testLeaseTask() {
// Leasing should work.
$task = $this->scheduleTask();
$this->expectNextLease($task);
$this->expectNextLease($task, 'Leasing should work.');
}
public function testMultipleLease() {
// We should not be able to lease a task multiple times.
$task = $this->scheduleTask();
$this->expectNextLease($task);
$this->expectNextLease(null);
$this->expectNextLease(
null,
'We should not be able to lease a task multiple times.');
}
public function testOldestFirst() {
// Older tasks should lease first, all else being equal.
$task1 = $this->scheduleTask();
$task2 = $this->scheduleTask();
$this->expectNextLease($task1);
$this->expectNextLease(
$task1,
'Older tasks should lease first, all else being equal.');
$this->expectNextLease($task2);
}
public function testNewBeforeLeased() {
// Tasks not previously leased should lease before previously leased tasks.
$task1 = $this->scheduleTask();
$task2 = $this->scheduleTask();
@ -45,7 +40,10 @@ final class PhabricatorWorkerTestCase extends PhabricatorTestCase {
$task1->setLeaseExpires(time() - 100000);
$task1->forceSaveWithoutLease();
$this->expectNextLease($task2);
$this->expectNextLease(
$task2,
'Tasks not previously leased should lease before previously '.
'leased tasks.');
$this->expectNextLease($task1);
}
@ -145,8 +143,6 @@ final class PhabricatorWorkerTestCase extends PhabricatorTestCase {
}
public function testLeasedIsOldestFirst() {
// Tasks which expired earlier should lease first, all else being equal.
$task1 = $this->scheduleTask();
$task2 = $this->scheduleTask();
@ -158,36 +154,59 @@ final class PhabricatorWorkerTestCase extends PhabricatorTestCase {
$task2->setLeaseExpires(time() - 200000);
$task2->forceSaveWithoutLease();
$this->expectNextLease($task2);
$this->expectNextLease(
$task2,
'Tasks which expired earlier should lease first, all else being equal.');
$this->expectNextLease($task1);
}
private function expectNextLease($task) {
public function testLeasedIsHighestPriority() {
$task1 = $this->scheduleTask(array(), 2);
$task2 = $this->scheduleTask(array(), 1);
$task3 = $this->scheduleTask(array(), 1);
$this->expectNextLease(
$task1,
'Tasks with a higher priority should be scheduled first.');
$this->expectNextLease(
$task2,
'Tasks with the same priority should be FIFO.');
$this->expectNextLease($task3);
}
private function expectNextLease($task, $message = null) {
$leased = id(new PhabricatorWorkerLeaseQuery())
->setLimit(1)
->execute();
if ($task === null) {
$this->assertEqual(0, count($leased));
$this->assertEqual(0, count($leased), $message);
return null;
} else {
$this->assertEqual(1, count($leased));
$this->assertEqual(1, count($leased), $message);
$this->assertEqual(
(int)head($leased)->getID(),
(int)$task->getID());
(int)$task->getID(),
$message);
return head($leased);
}
}
private function scheduleAndExecuteTask(array $data = array()) {
$task = $this->scheduleTask($data);
private function scheduleAndExecuteTask(
array $data = array(),
$priority = null) {
$task = $this->scheduleTask($data, $priority);
$task = $this->expectNextLease($task);
$task = $task->executeTask();
return $task;
}
private function scheduleTask(array $data = array()) {
return PhabricatorWorker::scheduleTask('PhabricatorTestWorker', $data);
private function scheduleTask(array $data = array(), $priority = null) {
return PhabricatorWorker::scheduleTask(
'PhabricatorTestWorker',
$data,
$priority);
}
}

View file

@ -52,7 +52,6 @@ final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {
$leased = 0;
foreach ($phases as $phase) {
// NOTE: If we issue `UPDATE ... WHERE ... ORDER BY id ASC`, the query
// goes very, very slowly. The `ORDER BY` triggers this, although we get
// the same apparent results without it. Without the ORDER BY, binary
@ -126,7 +125,6 @@ final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {
}
private function buildWhereClause(AphrontDatabaseConnection $conn_w, $phase) {
$where = array();
switch ($phase) {
@ -141,10 +139,7 @@ final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {
}
if ($this->ids) {
$where[] = qsprintf(
$conn_w,
'id IN (%Ld)',
$this->ids);
$where[] = qsprintf($conn_w, 'id IN (%Ld)', $this->ids);
}
return $this->formatWhereClause($where);
@ -162,13 +157,8 @@ final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {
switch ($phase) {
case self::PHASE_UNLEASED:
$where[] = qsprintf(
$conn_w,
'leaseOwner IS NULL');
$where[] = qsprintf(
$conn_w,
'id IN (%Ld)',
ipull($rows, 'id'));
$where[] = qsprintf($conn_w, 'leaseOwner IS NULL');
$where[] = qsprintf($conn_w, 'id IN (%Ld)', ipull($rows, 'id'));
break;
case self::PHASE_EXPIRED:
$in = array();
@ -179,24 +169,20 @@ final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {
$row['id'],
$row['leaseOwner']);
}
$where[] = qsprintf(
$conn_w,
'(%Q)',
implode(' OR ', $in));
$where[] = qsprintf($conn_w, '(%Q)', implode(' OR ', $in));
break;
default:
throw new Exception("Unknown phase '{$phase}'!");
}
return $this->formatWhereClause($where);
}
private function buildOrderClause(AphrontDatabaseConnection $conn_w, $phase) {
switch ($phase) {
case self::PHASE_UNLEASED:
// When selecting new tasks, we want to consume them in roughly
// FIFO order, so we order by the task ID.
// When selecting new tasks, we want to consume them in order of
// decreasing priority (and then FIFO).
return qsprintf($conn_w, 'ORDER BY id ASC');
case self::PHASE_EXPIRED:
// When selecting failed tasks, we want to consume them in roughly

View file

@ -86,6 +86,7 @@ final class PhabricatorWorkerActiveTask extends PhabricatorWorkerTask {
->setLeaseExpires($this->getLeaseExpires())
->setFailureCount($this->getFailureCount())
->setDataID($this->getDataID())
->setPriority($this->getPriority())
->setResult($result)
->setDuration($duration);
@ -164,12 +165,11 @@ final class PhabricatorWorkerActiveTask extends PhabricatorWorkerTask {
if ($did_succeed) {
foreach ($worker->getQueuedTasks() as $task) {
list($class, $data) = $task;
PhabricatorWorker::scheduleTask($class, $data);
PhabricatorWorker::scheduleTask($class, $data, $this->getPriority());
}
}
return $result;
}
}

View file

@ -11,8 +11,7 @@ final class PhabricatorWorkerArchiveTask extends PhabricatorWorkerTask {
public function save() {
if ($this->getID() === null) {
throw new Exception(
'Trying to archive a task with no ID.');
throw new Exception('Trying to archive a task with no ID.');
}
$other = new PhabricatorWorkerActiveTask();
@ -57,6 +56,7 @@ final class PhabricatorWorkerArchiveTask extends PhabricatorWorkerTask {
->setLeaseExpires(0)
->setFailureCount(0)
->setDataID($this->getDataID())
->setPriority($this->getPriority())
->insert();
$this->setDataID(null);

View file

@ -9,33 +9,34 @@ abstract class PhabricatorWorkerTask extends PhabricatorWorkerDAO {
protected $leaseExpires;
protected $failureCount;
protected $dataID;
protected $priority;
private $data;
private $executionException;
public function setExecutionException(Exception $execution_exception) {
final public function setExecutionException(Exception $execution_exception) {
$this->executionException = $execution_exception;
return $this;
}
public function getExecutionException() {
final public function getExecutionException() {
return $this->executionException;
}
public function setData($data) {
final public function setData($data) {
$this->data = $data;
return $this;
}
public function getData() {
final public function getData() {
return $this->data;
}
public function isArchived() {
final public function isArchived() {
return ($this instanceof PhabricatorWorkerArchiveTask);
}
public function getWorkerInstance() {
final public function getWorkerInstance() {
$id = $this->getID();
$class = $this->getTaskClass();

View file

@ -79,6 +79,8 @@ abstract class PhabricatorSMSImplementationAdapter {
'PhabricatorSMSDemultiplexWorker',
array(
'toNumbers' => $to_numbers,
'body' => $body));
'body' => $body,
),
PhabricatorWorker::PRIORITY_ALERTS);
}
}