mirror of
https://we.phorge.it/source/phorge.git
synced 2024-12-19 12:00:55 +01:00
Fix daemon task queue to respect task priority
Summary: Fixes an issue with T5336 / D9871. We did 99% of the work here but didn't actually turn on the priority sorting. The unit test passed by default, which didn't catch this. - Fix the unit test (it failed). - Fix the query (test now passes). - Add a "Next in Queue" element to the UI to make this kind of thing easier to spot/understand. Test Plan: Ran unit test. Viewed "Next in Queue". Queued some tasks, flushed the queue. Web UI tracked the state sensibly. Reviewers: joshuaspence, btrahan Reviewed By: btrahan Subscribers: cburroughs, epriestley Differential Revision: https://secure.phabricator.com/D10766
This commit is contained in:
parent
915d9a52f0
commit
914b8bb32c
3 changed files with 121 additions and 66 deletions
|
@ -126,58 +126,21 @@ final class PhabricatorDaemonConsoleController
|
|||
$daemon_table->setUser($user);
|
||||
$daemon_table->setDaemonLogs($logs);
|
||||
|
||||
$tasks = id(new PhabricatorWorkerActiveTask())->loadAllWhere(
|
||||
'leaseOwner IS NOT NULL');
|
||||
|
||||
$rows = array();
|
||||
foreach ($tasks as $task) {
|
||||
$rows[] = array(
|
||||
$task->getID(),
|
||||
$task->getTaskClass(),
|
||||
$task->getLeaseOwner(),
|
||||
$task->getLeaseExpires() - time(),
|
||||
$task->getPriority(),
|
||||
$task->getFailureCount(),
|
||||
phutil_tag(
|
||||
'a',
|
||||
array(
|
||||
'href' => '/daemon/task/'.$task->getID().'/',
|
||||
'class' => 'button small grey',
|
||||
),
|
||||
pht('View Task')),
|
||||
);
|
||||
}
|
||||
|
||||
$daemon_panel = new PHUIObjectBoxView();
|
||||
$daemon_panel->setHeaderText(pht('Active Daemons'));
|
||||
$daemon_panel->appendChild($daemon_table);
|
||||
|
||||
$leased_table = new AphrontTableView($rows);
|
||||
$leased_table->setHeaders(
|
||||
array(
|
||||
pht('ID'),
|
||||
pht('Class'),
|
||||
pht('Owner'),
|
||||
pht('Expires'),
|
||||
pht('Priority'),
|
||||
pht('Failures'),
|
||||
'',
|
||||
));
|
||||
$leased_table->setColumnClasses(
|
||||
array(
|
||||
'n',
|
||||
'wide',
|
||||
'',
|
||||
'',
|
||||
'n',
|
||||
'n',
|
||||
'action',
|
||||
));
|
||||
$leased_table->setNoDataString(pht('No tasks are leased by workers.'));
|
||||
|
||||
$leased_panel = new PHUIObjectBoxView();
|
||||
$leased_panel->setHeaderText(pht('Leased Tasks'));
|
||||
$leased_panel->appendChild($leased_table);
|
||||
$tasks = id(new PhabricatorWorkerActiveTask())->loadAllWhere(
|
||||
'leaseOwner IS NOT NULL');
|
||||
|
||||
$tasks_table = $this->renderTasksTable(
|
||||
$tasks,
|
||||
pht('No tasks are leased by workers.'));
|
||||
|
||||
$leased_panel = id(new PHUIObjectBoxView())
|
||||
->setHeaderText(pht('Leased Tasks'))
|
||||
->appendChild($tasks_table);
|
||||
|
||||
$task_table = new PhabricatorWorkerActiveTask();
|
||||
$queued = queryfx_all(
|
||||
|
@ -211,6 +174,16 @@ final class PhabricatorDaemonConsoleController
|
|||
$queued_panel->setHeaderText(pht('Queued Tasks'));
|
||||
$queued_panel->appendChild($queued_table);
|
||||
|
||||
$upcoming = id(new PhabricatorWorkerLeaseQuery())
|
||||
->setLimit(10)
|
||||
->setSkipLease(true)
|
||||
->execute();
|
||||
|
||||
$upcoming_panel = id(new PHUIObjectBoxView())
|
||||
->setHeaderText(pht('Next In Queue'))
|
||||
->appendChild(
|
||||
$this->renderTasksTable($upcoming, pht('Task queue is empty.')));
|
||||
|
||||
$crumbs = $this->buildApplicationCrumbs();
|
||||
$crumbs->addTextCrumb(pht('Console'));
|
||||
|
||||
|
@ -223,6 +196,7 @@ final class PhabricatorDaemonConsoleController
|
|||
$daemon_panel,
|
||||
$queued_panel,
|
||||
$leased_panel,
|
||||
$upcoming_panel,
|
||||
));
|
||||
|
||||
return $this->buildApplicationPage(
|
||||
|
@ -233,4 +207,52 @@ final class PhabricatorDaemonConsoleController
|
|||
));
|
||||
}
|
||||
|
||||
private function renderTasksTable(array $tasks, $nodata) {
|
||||
$rows = array();
|
||||
foreach ($tasks as $task) {
|
||||
$rows[] = array(
|
||||
$task->getID(),
|
||||
$task->getTaskClass(),
|
||||
$task->getLeaseOwner(),
|
||||
$task->getLeaseExpires()
|
||||
? phutil_format_relative_time($task->getLeaseExpires() - time())
|
||||
: '-',
|
||||
$task->getPriority(),
|
||||
$task->getFailureCount(),
|
||||
phutil_tag(
|
||||
'a',
|
||||
array(
|
||||
'href' => '/daemon/task/'.$task->getID().'/',
|
||||
'class' => 'button small grey',
|
||||
),
|
||||
pht('View Task')),
|
||||
);
|
||||
}
|
||||
|
||||
$table = new AphrontTableView($rows);
|
||||
$table->setHeaders(
|
||||
array(
|
||||
pht('ID'),
|
||||
pht('Class'),
|
||||
pht('Owner'),
|
||||
pht('Expires'),
|
||||
pht('Priority'),
|
||||
pht('Failures'),
|
||||
'',
|
||||
));
|
||||
$table->setColumnClasses(
|
||||
array(
|
||||
'n',
|
||||
'wide',
|
||||
'',
|
||||
'',
|
||||
'n',
|
||||
'n',
|
||||
'action',
|
||||
));
|
||||
$table->setNoDataString($nodata);
|
||||
|
||||
return $table;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -161,17 +161,17 @@ final class PhabricatorWorkerTestCase extends PhabricatorTestCase {
|
|||
}
|
||||
|
||||
public function testLeasedIsHighestPriority() {
|
||||
$task1 = $this->scheduleTask(array(), 2);
|
||||
$task1 = $this->scheduleTask(array(), 1);
|
||||
$task2 = $this->scheduleTask(array(), 1);
|
||||
$task3 = $this->scheduleTask(array(), 1);
|
||||
$task3 = $this->scheduleTask(array(), 2);
|
||||
|
||||
$this->expectNextLease(
|
||||
$task1,
|
||||
$task3,
|
||||
'Tasks with a higher priority should be scheduled first.');
|
||||
$this->expectNextLease(
|
||||
$task2,
|
||||
$task1,
|
||||
'Tasks with the same priority should be FIFO.');
|
||||
$this->expectNextLease($task3);
|
||||
$this->expectNextLease($task2);
|
||||
}
|
||||
|
||||
private function expectNextLease($task, $message = null) {
|
||||
|
|
|
@ -10,6 +10,7 @@ final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {
|
|||
|
||||
private $ids;
|
||||
private $limit;
|
||||
private $skipLease;
|
||||
|
||||
public static function getDefaultWaitBeforeRetry() {
|
||||
return phutil_units('5 minutes in seconds');
|
||||
|
@ -19,6 +20,20 @@ final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {
|
|||
return phutil_units('2 hours in seconds');
|
||||
}
|
||||
|
||||
/**
|
||||
* Set this flag to select tasks from the top of the queue without leasing
|
||||
* them.
|
||||
*
|
||||
* This can be used to show which tasks are coming up next without altering
|
||||
* the queue's behavior.
|
||||
*
|
||||
* @param bool True to skip the lease acquisition step.
|
||||
*/
|
||||
public function setSkipLease($skip) {
|
||||
$this->skipLease = $skip;
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function withIDs(array $ids) {
|
||||
$this->ids = $ids;
|
||||
return $this;
|
||||
|
@ -51,6 +66,7 @@ final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {
|
|||
$limit = $this->limit;
|
||||
|
||||
$leased = 0;
|
||||
$task_ids = array();
|
||||
foreach ($phases as $phase) {
|
||||
// NOTE: If we issue `UPDATE ... WHERE ... ORDER BY id ASC`, the query
|
||||
// goes very, very slowly. The `ORDER BY` triggers this, although we get
|
||||
|
@ -74,6 +90,10 @@ final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {
|
|||
// total runtime, so keep it simple for the moment.
|
||||
|
||||
if ($rows) {
|
||||
if ($this->skipLease) {
|
||||
$leased += count($rows);
|
||||
$task_ids += array_fuse(ipull($rows, 'id'));
|
||||
} else {
|
||||
queryfx(
|
||||
$conn_w,
|
||||
'UPDATE %T task
|
||||
|
@ -85,6 +105,8 @@ final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {
|
|||
$this->buildUpdateWhereClause($conn_w, $phase, $rows));
|
||||
|
||||
$leased += $conn_w->getAffectedRows();
|
||||
}
|
||||
|
||||
if ($leased == $limit) {
|
||||
break;
|
||||
}
|
||||
|
@ -95,16 +117,27 @@ final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {
|
|||
return array();
|
||||
}
|
||||
|
||||
if ($this->skipLease) {
|
||||
$selection_condition = qsprintf(
|
||||
$conn_w,
|
||||
'task.id IN (%Ld)',
|
||||
$task_ids);
|
||||
} else {
|
||||
$selection_condition = qsprintf(
|
||||
$conn_w,
|
||||
'task.leaseOwner = %s AND leaseExpires > UNIX_TIMESTAMP()',
|
||||
$lease_ownership_name);
|
||||
}
|
||||
|
||||
$data = queryfx_all(
|
||||
$conn_w,
|
||||
'SELECT task.*, taskdata.data _taskData, UNIX_TIMESTAMP() _serverTime
|
||||
FROM %T task LEFT JOIN %T taskdata
|
||||
ON taskdata.id = task.dataID
|
||||
WHERE leaseOwner = %s AND leaseExpires > UNIX_TIMESTAMP()
|
||||
%Q %Q',
|
||||
WHERE %Q %Q %Q',
|
||||
$task_table->getTableName(),
|
||||
$taskdata_table->getTableName(),
|
||||
$lease_ownership_name,
|
||||
$selection_condition,
|
||||
$this->buildOrderClause($conn_w, $phase),
|
||||
$this->buildLimitClause($conn_w, $limit));
|
||||
|
||||
|
@ -183,7 +216,7 @@ final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {
|
|||
case self::PHASE_UNLEASED:
|
||||
// When selecting new tasks, we want to consume them in order of
|
||||
// decreasing priority (and then FIFO).
|
||||
return qsprintf($conn_w, 'ORDER BY id ASC');
|
||||
return qsprintf($conn_w, 'ORDER BY priority DESC, id ASC');
|
||||
case self::PHASE_EXPIRED:
|
||||
// When selecting failed tasks, we want to consume them in roughly
|
||||
// FIFO order of their failures, which is not necessarily their original
|
||||
|
|
Loading…
Reference in a new issue