From 88fad90c1c0a9bc15a5131c60fa811d0f6dfb4c1 Mon Sep 17 00:00:00 2001 From: epriestley Date: Thu, 1 Nov 2012 11:30:16 -0700 Subject: [PATCH] Move task leasing to a dedicated query Summary: This simplifies the fairly thorny logic of leasing tasks a bit. I'm planning to introduce another callsite shortly for Drydock. Test Plan: Ran `bin/phd debug taskmaster`, observed sensible queries and correct operation. Reviewers: btrahan Reviewed By: btrahan CC: aran Maniphest Tasks: T2015 Differential Revision: https://secure.phabricator.com/D3855 --- src/__phutil_library_map__.php | 2 + .../workers/PhabricatorTaskmasterDaemon.php | 64 +------- .../query/PhabricatorWorkerLeaseQuery.php | 154 ++++++++++++++++++ 3 files changed, 161 insertions(+), 59 deletions(-) create mode 100644 src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php diff --git a/src/__phutil_library_map__.php b/src/__phutil_library_map__.php index 18852344dd..2443dda263 100644 --- a/src/__phutil_library_map__.php +++ b/src/__phutil_library_map__.php @@ -1143,6 +1143,7 @@ phutil_register_library_map(array( 'PhabricatorWorkerActiveTask' => 'infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php', 'PhabricatorWorkerArchiveTask' => 'infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php', 'PhabricatorWorkerDAO' => 'infrastructure/daemon/workers/storage/PhabricatorWorkerDAO.php', + 'PhabricatorWorkerLeaseQuery' => 'infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php', 'PhabricatorWorkerTask' => 'infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php', 'PhabricatorWorkerTaskData' => 'infrastructure/daemon/workers/storage/PhabricatorWorkerTaskData.php', 'PhabricatorWorkerTaskDetailController' => 'applications/daemon/controller/PhabricatorWorkerTaskDetailController.php', @@ -2305,6 +2306,7 @@ phutil_register_library_map(array( 'PhabricatorWorkerActiveTask' => 'PhabricatorWorkerTask', 'PhabricatorWorkerArchiveTask' => 'PhabricatorWorkerTask', 'PhabricatorWorkerDAO' => 'PhabricatorLiskDAO', + 'PhabricatorWorkerLeaseQuery' => 'PhabricatorQuery', 'PhabricatorWorkerTask' => 'PhabricatorWorkerDAO', 'PhabricatorWorkerTaskData' => 'PhabricatorWorkerDAO', 'PhabricatorWorkerTaskDetailController' => 'PhabricatorDaemonController', diff --git a/src/infrastructure/daemon/workers/PhabricatorTaskmasterDaemon.php b/src/infrastructure/daemon/workers/PhabricatorTaskmasterDaemon.php index 973e2c0dca..3ceae21f6c 100644 --- a/src/infrastructure/daemon/workers/PhabricatorTaskmasterDaemon.php +++ b/src/infrastructure/daemon/workers/PhabricatorTaskmasterDaemon.php @@ -19,59 +19,13 @@ final class PhabricatorTaskmasterDaemon extends PhabricatorDaemon { public function run() { - $lease_ownership_name = $this->getLeaseOwnershipName(); - - $task_table = new PhabricatorWorkerActiveTask(); - $taskdata_table = new PhabricatorWorkerTaskData(); - $sleep = 0; do { - $this->log('Dequeuing a task...'); - - $conn_w = $task_table->establishConnection('w'); - queryfx( - $conn_w, - 'UPDATE %T SET leaseOwner = %s, leaseExpires = UNIX_TIMESTAMP() + 15 - WHERE leaseOwner IS NULL LIMIT 1', - $task_table->getTableName(), - $lease_ownership_name); - $rows = $conn_w->getAffectedRows(); - - if (!$rows) { - $this->log('No unleased tasks. Dequeuing an expired lease...'); - queryfx( - $conn_w, - 'UPDATE %T SET leaseOwner = %s, leaseExpires = UNIX_TIMESTAMP() + 15 - WHERE leaseExpires < UNIX_TIMESTAMP() LIMIT 1', - $task_table->getTableName(), - $lease_ownership_name); - $rows = $conn_w->getAffectedRows(); - } - - if ($rows) { - $data = queryfx_all( - $conn_w, - 'SELECT task.*, taskdata.data _taskData, UNIX_TIMESTAMP() _serverTime - FROM %T task LEFT JOIN %T taskdata - ON taskdata.id = task.dataID - WHERE leaseOwner = %s AND leaseExpires > UNIX_TIMESTAMP() - LIMIT 1', - $task_table->getTableName(), - $taskdata_table->getTableName(), - $lease_ownership_name); - $tasks = $task_table->loadAllFromArray($data); - $tasks = mpull($tasks, null, 'getID'); - - $task_data = array(); - foreach ($data as $row) { - $tasks[$row['id']]->setServerTime($row['_serverTime']); - if ($row['_taskData']) { - $task_data[$row['id']] = json_decode($row['_taskData'], true); - } else { - $task_data[$row['id']] = null; - } - } + $tasks = id(new PhabricatorWorkerLeaseQuery()) + ->setLimit(1) + ->execute(); + if ($tasks) { foreach ($tasks as $task) { $id = $task->getID(); $class = $task->getTaskClass(); @@ -84,7 +38,7 @@ final class PhabricatorTaskmasterDaemon extends PhabricatorDaemon { // TODO: We should detect if we acquired a task with an excessive // failure count and fail it permanently. - $data = idx($task_data, $task->getID()); + $data = $task->getData(); try { if (!class_exists($class) || !is_subclass_of($class, 'PhabricatorWorker')) { @@ -124,12 +78,4 @@ final class PhabricatorTaskmasterDaemon extends PhabricatorDaemon { } while (true); } - private function getLeaseOwnershipName() { - static $name = null; - if ($name === null) { - $name = getmypid().':'.time().':'.php_uname('n'); - } - return $name; - } - } diff --git a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php new file mode 100644 index 0000000000..a1c060225a --- /dev/null +++ b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php @@ -0,0 +1,154 @@ +ids = $ids; + return $this; + } + + public function setLimit($limit) { + $this->limit = $limit; + return $this; + } + + public function execute() { + if (!$this->limit) { + throw new Exception("You must setLimit() when leasing tasks."); + } + + $task_table = new PhabricatorWorkerActiveTask(); + $taskdata_table = new PhabricatorWorkerTaskData(); + $lease_ownership_name = $this->getLeaseOwnershipName(); + + $conn_w = $task_table->establishConnection('w'); + + // Try to satisfy the request from new, unleased tasks first. If we don't + // find enough tasks, try tasks with expired leases (i.e., tasks which have + // previously failed). + + $phases = array( + self::PHASE_UNLEASED, + self::PHASE_EXPIRED, + ); + $limit = $this->limit; + + $leased = 0; + foreach ($phases as $phase) { + queryfx( + $conn_w, + 'UPDATE %T SET leaseOwner = %s, leaseExpires = UNIX_TIMESTAMP() + 15 + %Q %Q %Q', + $task_table->getTableName(), + $lease_ownership_name, + $this->buildWhereClause($conn_w, $phase), + $this->buildOrderClause($conn_w), + $this->buildLimitClause($conn_w, $limit - $leased)); + + $leased += $conn_w->getAffectedRows(); + if ($leased == $limit) { + break; + } + } + + if (!$leased) { + return array(); + } + + $data = queryfx_all( + $conn_w, + 'SELECT task.*, taskdata.data _taskData, UNIX_TIMESTAMP() _serverTime + FROM %T task LEFT JOIN %T taskdata + ON taskdata.id = task.dataID + WHERE leaseOwner = %s AND leaseExpires > UNIX_TIMESTAMP() + %Q %Q', + $task_table->getTableName(), + $taskdata_table->getTableName(), + $lease_ownership_name, + $this->buildOrderClause($conn_w), + $this->buildLimitClause($conn_w, $limit)); + + $tasks = $task_table->loadAllFromArray($data); + $tasks = mpull($tasks, null, 'getID'); + + foreach ($data as $row) { + $tasks[$row['id']]->setServerTime($row['_serverTime']); + if ($row['_taskData']) { + $task_data = json_decode($row['_taskData'], true); + } else { + $task_data = null; + } + $tasks[$row['id']]->setData($task_data); + } + + return $tasks; + } + + private function buildWhereClause(AphrontDatabaseConnection $conn_w, $phase) { + $where = array(); + + switch ($phase) { + case self::PHASE_UNLEASED: + $where[] = 'leaseOwner IS NULL'; + break; + case self::PHASE_Expired: + $where[] = 'leaseExpires < UNIX_TIMESTAMP()'; + break; + default: + throw new Exception("Unknown phase '{$phase}'!"); + } + + if ($this->ids) { + $where[] = qsprintf( + $conn_w, + 'id IN (%Ld)', + $this->ids); + } + + return $this->formatWhereClause($where); + } + + private function buildOrderClause(AphrontDatabaseConnection $conn_w) { + return qsprintf($conn_w, 'ORDER BY id ASC'); + } + + private function buildLimitClause(AphrontDatabaseConnection $conn_w, $limit) { + return qsprintf($conn_w, 'LIMIT %d', $limit); + } + + private function getLeaseOwnershipName() { + static $name = null; + if ($name === null) { + $name = getmypid().':'.time().':'.php_uname('n'); + } + return $name; + } + +}