diff --git a/resources/sql/patches/005.workers.sql b/resources/sql/patches/005.workers.sql new file mode 100644 index 0000000000..f3ddb00731 --- /dev/null +++ b/resources/sql/patches/005.workers.sql @@ -0,0 +1,20 @@ +create database phabricator_worker; + +create table phabricator_worker.worker_task ( + id int unsigned not null auto_increment primary key, + taskClass varchar(255) not null, + leaseOwner varchar(255), + leaseExpires int unsigned, + priority bigint unsigned not null, + failureCount int unsigned not null, + key(taskClass), + key(leaseOwner), + key(leaseExpires) +); + +create table phabricator_worker.worker_taskdata ( + id int unsigned not null auto_increment primary key, + taskID int unsigned not null, + data longblob not null, + unique key (taskID) +); diff --git a/src/__phutil_library_map__.php b/src/__phutil_library_map__.php index 1d9cba1c31..65be1069ec 100644 --- a/src/__phutil_library_map__.php +++ b/src/__phutil_library_map__.php @@ -190,6 +190,8 @@ phutil_register_library_map(array( 'PhabricatorConduitMethodCallLog' => 'applications/conduit/storage/methodcalllog', 'PhabricatorController' => 'applications/base/controller/base', 'PhabricatorDaemon' => 'infrastructure/daemon/base', + 'PhabricatorDaemonConsoleController' => 'applications/daemon/controller/console', + 'PhabricatorDaemonController' => 'applications/daemon/controller/base', 'PhabricatorDirectoryCategory' => 'applications/directory/storage/category', 'PhabricatorDirectoryCategoryDeleteController' => 'applications/directory/controller/categorydelete', 'PhabricatorDirectoryCategoryEditController' => 'applications/directory/controller/categoryedit', @@ -214,6 +216,7 @@ phutil_register_library_map(array( 'PhabricatorFileURI' => 'applications/files/uri', 'PhabricatorFileUploadController' => 'applications/files/controller/upload', 'PhabricatorFileViewController' => 'applications/files/controller/view', + 'PhabricatorGoodForNothingWorker' => 'infrastructure/daemon/workers/worker/goodfornothing', 'PhabricatorHandleObjectSelectorDataView' => 'applications/phid/handle/view/selector', 'PhabricatorLiskDAO' => 'applications/base/storage/lisk', 'PhabricatorLoginController' => 'applications/auth/controller/login', @@ -300,11 +303,12 @@ phutil_register_library_map(array( 'PhabricatorSearchQuery' => 'applications/search/storage/query', 'PhabricatorSearchRelationship' => 'applications/search/constants/relationship', 'PhabricatorStandardPageView' => 'view/page/standard', - 'PhabricatorTimelineCursor' => 'applications/timeline/storage/cursor', - 'PhabricatorTimelineDAO' => 'applications/timeline/storage/base', - 'PhabricatorTimelineEvent' => 'applications/timeline/storage/event', - 'PhabricatorTimelineEventData' => 'applications/timeline/storage/eventdata', - 'PhabricatorTimelineIterator' => 'applications/timeline/cursor/iterator', + 'PhabricatorTaskmasterDaemon' => 'infrastructure/daemon/workers/taskmaster', + 'PhabricatorTimelineCursor' => 'infrastructure/daemon/timeline/storage/cursor', + 'PhabricatorTimelineDAO' => 'infrastructure/daemon/timeline/storage/base', + 'PhabricatorTimelineEvent' => 'infrastructure/daemon/timeline/storage/event', + 'PhabricatorTimelineEventData' => 'infrastructure/daemon/timeline/storage/eventdata', + 'PhabricatorTimelineIterator' => 'infrastructure/daemon/timeline/cursor/iterator', 'PhabricatorTypeaheadCommonDatasourceController' => 'applications/typeahead/controller/common', 'PhabricatorTypeaheadDatasourceController' => 'applications/typeahead/controller/base', 'PhabricatorUser' => 'applications/people/storage/user', @@ -312,6 +316,10 @@ phutil_register_library_map(array( 'PhabricatorUserOAuthInfo' => 'applications/people/storage/useroauthinfo', 'PhabricatorUserProfile' => 'applications/people/storage/profile', 'PhabricatorUserSettingsController' => 'applications/people/controller/settings', + 'PhabricatorWorker' => 'infrastructure/daemon/workers/worker', + 'PhabricatorWorkerDAO' => 'infrastructure/daemon/workers/storage/base', + 'PhabricatorWorkerTask' => 'infrastructure/daemon/workers/storage/task', + 'PhabricatorWorkerTaskData' => 'infrastructure/daemon/workers/storage/taskdata', 'PhabricatorXHProfController' => 'applications/xhprof/controller/base', 'PhabricatorXHProfProfileController' => 'applications/xhprof/controller/profile', 'PhabricatorXHProfProfileSymbolView' => 'applications/xhprof/view/symbol', @@ -473,6 +481,8 @@ phutil_register_library_map(array( 'PhabricatorConduitMethodCallLog' => 'PhabricatorConduitDAO', 'PhabricatorController' => 'AphrontController', 'PhabricatorDaemon' => 'PhutilDaemon', + 'PhabricatorDaemonConsoleController' => 'PhabricatorDaemonController', + 'PhabricatorDaemonController' => 'PhabricatorController', 'PhabricatorDirectoryCategory' => 'PhabricatorDirectoryDAO', 'PhabricatorDirectoryCategoryDeleteController' => 'PhabricatorDirectoryController', 'PhabricatorDirectoryCategoryEditController' => 'PhabricatorDirectoryController', @@ -495,6 +505,7 @@ phutil_register_library_map(array( 'PhabricatorFileStorageBlob' => 'PhabricatorFileDAO', 'PhabricatorFileUploadController' => 'PhabricatorFileController', 'PhabricatorFileViewController' => 'PhabricatorFileController', + 'PhabricatorGoodForNothingWorker' => 'PhabricatorWorker', 'PhabricatorLiskDAO' => 'LiskDAO', 'PhabricatorLoginController' => 'PhabricatorAuthController', 'PhabricatorLogoutController' => 'PhabricatorAuthController', @@ -567,6 +578,7 @@ phutil_register_library_map(array( 'PhabricatorSearchMySQLExecutor' => 'PhabricatorSearchExecutor', 'PhabricatorSearchQuery' => 'PhabricatorSearchDAO', 'PhabricatorStandardPageView' => 'AphrontPageView', + 'PhabricatorTaskmasterDaemon' => 'PhabricatorDaemon', 'PhabricatorTimelineCursor' => 'PhabricatorTimelineDAO', 'PhabricatorTimelineDAO' => 'PhabricatorLiskDAO', 'PhabricatorTimelineEvent' => 'PhabricatorTimelineDAO', @@ -578,6 +590,9 @@ phutil_register_library_map(array( 'PhabricatorUserOAuthInfo' => 'PhabricatorUserDAO', 'PhabricatorUserProfile' => 'PhabricatorUserDAO', 'PhabricatorUserSettingsController' => 'PhabricatorPeopleController', + 'PhabricatorWorkerDAO' => 'PhabricatorLiskDAO', + 'PhabricatorWorkerTask' => 'PhabricatorWorkerDAO', + 'PhabricatorWorkerTaskData' => 'PhabricatorWorkerDAO', 'PhabricatorXHProfController' => 'PhabricatorController', 'PhabricatorXHProfProfileController' => 'PhabricatorXHProfController', 'PhabricatorXHProfProfileSymbolView' => 'AphrontView', diff --git a/src/aphront/default/configuration/AphrontDefaultApplicationConfiguration.php b/src/aphront/default/configuration/AphrontDefaultApplicationConfiguration.php index 9b6e8ef523..43c13010e4 100644 --- a/src/aphront/default/configuration/AphrontDefaultApplicationConfiguration.php +++ b/src/aphront/default/configuration/AphrontDefaultApplicationConfiguration.php @@ -200,6 +200,10 @@ class AphrontDefaultApplicationConfiguration ), ), + '/daemon/' => array( + '$' => 'PhabricatorDaemonConsoleController', + ), + ); } diff --git a/src/applications/daemon/controller/base/PhabricatorDaemonController.php b/src/applications/daemon/controller/base/PhabricatorDaemonController.php new file mode 100644 index 0000000000..9892426fd0 --- /dev/null +++ b/src/applications/daemon/controller/base/PhabricatorDaemonController.php @@ -0,0 +1,42 @@ +buildStandardPageView(); + + $page->setApplicationName('Daemon Console'); + $page->setBaseURI('/'); + $page->setTitle(idx($data, 'title')); + $page->setTabs( + array( + 'console' => array( + 'href' => '/daemon/', + 'name' => 'Console', + ), + ), + idx($data, 'tab')); + $page->setGlyph("\xE2\x98\xAF"); + $page->appendChild($view); + + $response = new AphrontWebpageResponse(); + return $response->setContent($page->render()); + } + +} diff --git a/src/applications/daemon/controller/base/__init__.php b/src/applications/daemon/controller/base/__init__.php new file mode 100644 index 0000000000..63dff99b04 --- /dev/null +++ b/src/applications/daemon/controller/base/__init__.php @@ -0,0 +1,15 @@ +getRequest(); + + if ($request->getStr('new')) { + $task = new PhabricatorWorkerTask(); + $task->setTaskClass('PhabricatorGoodForNothingWorker'); + $task->setPriority(4); + $task->setFailureCount(0); + $task->save(); + } + + $tasks = id(new PhabricatorWorkerTask()) + ->loadAll(); + + $rows = array(); + foreach ($tasks as $task) { + $rows[] = array( + $task->getID(), + $task->getTaskClass(), + $task->getLeaseOwner(), + $task->getLeaseExpires(), + $task->getPriority(), + $task->getFailureCount(), + ); + } + + $table = new AphrontTableView($rows); + $table->setHeaders( + array( + 'ID', + 'Class', + 'Owner', + 'Expries', + 'Priority', + 'Count', + )); + + $panel = new AphrontPanelView(); + $panel->setHeader('Tasks'); + $panel->appendChild($table); + + return $this->buildStandardPageResponse( + $panel, + array( + 'title' => 'Console', + 'tab' => 'console', + )); + } + +} diff --git a/src/applications/daemon/controller/console/__init__.php b/src/applications/daemon/controller/console/__init__.php new file mode 100644 index 0000000000..28cdb0bf7a --- /dev/null +++ b/src/applications/daemon/controller/console/__init__.php @@ -0,0 +1,17 @@ + false, + ) + parent::getConfiguration(); + } + + public function setServerTime($server_time) { + $this->serverTime = $server_time; + $this->localTime = time(); + return $this; + } + + public function setLeaseDuration($lease_duration) { + $server_lease_expires = $this->serverTime + $lease_duration; + $this->setLeaseExpires($server_lease_expires); + return $this->save(); + } + + public function save() { + if ($this->leaseOwner) { + $current_server_time = $this->serverTime + (time() - $this->localTime); + if ($current_server_time >= $this->leaseExpires) { + throw new Exception("Trying to update task after lease expiration!"); + } + } + + return parent::save(); + } + + +} diff --git a/src/infrastructure/daemon/workers/storage/task/__init__.php b/src/infrastructure/daemon/workers/storage/task/__init__.php new file mode 100644 index 0000000000..933817be3c --- /dev/null +++ b/src/infrastructure/daemon/workers/storage/task/__init__.php @@ -0,0 +1,12 @@ + false, + self::CONFIG_SERIALIZATION => array( + 'data' => self::SERIALIZATION_JSON, + ), + ) + parent::getConfiguration(); + } + +} diff --git a/src/infrastructure/daemon/workers/storage/taskdata/__init__.php b/src/infrastructure/daemon/workers/storage/taskdata/__init__.php new file mode 100644 index 0000000000..2255a502a4 --- /dev/null +++ b/src/infrastructure/daemon/workers/storage/taskdata/__init__.php @@ -0,0 +1,12 @@ +getLeaseOwnershipName(); + + $task_table = new PhabricatorWorkerTask(); + $taskdata_table = new PhabricatorWorkerTaskData(); + + $sleep = 0; + do { + $conn_w = $task_table->establishConnection('w'); + queryfx( + $conn_w, + 'UPDATE %T SET leaseOwner = %s, leaseExpires = UNIX_TIMESTAMP() + 15 + WHERE leaseOwner IS NULL + OR leaseExpires < UNIX_TIMESTAMP() + ORDER BY leaseOwner IS NULL, failureCount, priority + LIMIT 1', + $task_table->getTableName(), + $lease_ownership_name); + $rows = $conn_w->getAffectedRows(); + + if ($rows) { + $data = queryfx_all( + $conn_w, + 'SELECT task.*, taskdata.data _taskData, UNIX_TIMESTAMP() _serverTime + FROM %T task LEFT JOIN %T taskdata + ON taskdata.taskID = task.id + WHERE leaseOwner = %s AND leaseExpires > UNIX_TIMESTAMP() + LIMIT 1', + $task_table->getTableName(), + $taskdata_table->getTableName(), + $lease_ownership_name); + $tasks = $task_table->loadAllFromArray($data); + $tasks = mpull($tasks, null, 'getID'); + + $task_data = array(); + foreach ($data as $row) { + $tasks[$row['id']]->setServerTime($row['_serverTime']); + if ($row['_taskData']) { + $task_data[$row['id']] = json_decode($row['_taskData'], true); + } else { + $task_data[$row['id']] = null; + } + } + + foreach ($tasks as $task) { + // TODO: We should detect if we acquired a task with an expired lease + // and log about it / bump up failure count. + + // TODO: We should detect if we acquired a task with an excessive + // failure count and fail it permanently. + + $data = idx($task_data, $task->getID()); + $class = $task->getTaskClass(); + try { + PhutilSymbolLoader::loadClass($class); + if (!is_subclass_of($class, 'PhabricatorWorker')) { + throw new Exception( + "Task class '{$class}' does not extend PhabricatorWorker."); + } + $worker = newv($class, array($data)); + + $lease = $worker->getRequiredLeaseTime(); + if ($lease !== null) { + $task->setLeaseDuration($lease); + } + + $worker->executeTask(); + + $task->delete(); + if ($data !== null) { + queryfx( + $conn_w, + 'DELETE FROM %T WHERE taskID = %d', + $taskdata_table, + $task->getID()); + } + } catch (Exception $ex) { + $task->setFailureCount($task->getFailureCount() + 1); + $task->save(); + throw $ex; + } + } + + $sleep = 0; + } else { + $sleep = min($sleep + 1, 30); + } + + $this->sleep($sleep); + } while (true); + } + + private function getLeaseOwnershipName() { + static $name = null; + if ($name === null) { + $name = getmypid().':'.time().':'.php_uname('n'); + } + return $name; + } + +} diff --git a/src/infrastructure/daemon/workers/taskmaster/__init__.php b/src/infrastructure/daemon/workers/taskmaster/__init__.php new file mode 100644 index 0000000000..25931fb2e8 --- /dev/null +++ b/src/infrastructure/daemon/workers/taskmaster/__init__.php @@ -0,0 +1,18 @@ +data = $data; + } + + final protected function getTaskData() { + return $this->data; + } + + final public function executeTask() { + $this->doWork(); + } + + public function getRequiredLeaseTime() { + return null; + } + + abstract protected function doWork(); +} diff --git a/src/infrastructure/daemon/workers/worker/__init__.php b/src/infrastructure/daemon/workers/worker/__init__.php new file mode 100644 index 0000000000..c2d72e9a10 --- /dev/null +++ b/src/infrastructure/daemon/workers/worker/__init__.php @@ -0,0 +1,10 @@ + $v) { $map[$k] = $v;