From dc75b4bd0602afcd9c75484add4515eb63e0676a Mon Sep 17 00:00:00 2001 From: epriestley Date: Sun, 24 Apr 2016 09:04:27 -0700 Subject: [PATCH] Move all cluster locking logic to a separate class Summary: Ref T10860. This doesn't change anything, it just separates all this stuff out of `PhabricatorRepository` since I'm planning to add a bit more state to it and it's already pretty big and fairly separable. Test Plan: Pulled, pushed, browsed Diffusion. Reviewers: chad Reviewed By: chad Maniphest Tasks: T10860 Differential Revision: https://secure.phabricator.com/D15790 --- src/__phutil_library_map__.php | 2 + .../DiffusionQueryCommitsConduitAPIMethod.php | 10 +- .../DiffusionQueryConduitAPIMethod.php | 8 +- .../controller/DiffusionServeController.php | 10 +- .../DiffusionRepositoryClusterEngine.php | 435 ++++++++++++++++++ .../DiffusionGitReceivePackSSHWorkflow.php | 11 +- .../ssh/DiffusionGitUploadPackSSHWorkflow.php | 6 +- .../editor/PhabricatorRepositoryEditor.php | 5 +- .../PhabricatorRepositoryPullEngine.php | 6 +- .../storage/PhabricatorRepository.php | 379 +-------------- 10 files changed, 479 insertions(+), 393 deletions(-) create mode 100644 src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php diff --git a/src/__phutil_library_map__.php b/src/__phutil_library_map__.php index 65eb7f6c20..d20a7b4e14 100644 --- a/src/__phutil_library_map__.php +++ b/src/__phutil_library_map__.php @@ -745,6 +745,7 @@ phutil_register_library_map(array( 'DiffusionRenameHistoryQuery' => 'applications/diffusion/query/DiffusionRenameHistoryQuery.php', 'DiffusionRepositoryBasicsManagementPanel' => 'applications/diffusion/management/DiffusionRepositoryBasicsManagementPanel.php', 'DiffusionRepositoryByIDRemarkupRule' => 'applications/diffusion/remarkup/DiffusionRepositoryByIDRemarkupRule.php', + 'DiffusionRepositoryClusterEngine' => 'applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php', 'DiffusionRepositoryClusterManagementPanel' => 'applications/diffusion/management/DiffusionRepositoryClusterManagementPanel.php', 'DiffusionRepositoryController' => 'applications/diffusion/controller/DiffusionRepositoryController.php', 'DiffusionRepositoryCreateController' => 'applications/diffusion/controller/DiffusionRepositoryCreateController.php', @@ -4953,6 +4954,7 @@ phutil_register_library_map(array( 'DiffusionRenameHistoryQuery' => 'Phobject', 'DiffusionRepositoryBasicsManagementPanel' => 'DiffusionRepositoryManagementPanel', 'DiffusionRepositoryByIDRemarkupRule' => 'PhabricatorObjectRemarkupRule', + 'DiffusionRepositoryClusterEngine' => 'Phobject', 'DiffusionRepositoryClusterManagementPanel' => 'DiffusionRepositoryManagementPanel', 'DiffusionRepositoryController' => 'DiffusionController', 'DiffusionRepositoryCreateController' => 'DiffusionRepositoryEditController', diff --git a/src/applications/diffusion/conduit/DiffusionQueryCommitsConduitAPIMethod.php b/src/applications/diffusion/conduit/DiffusionQueryCommitsConduitAPIMethod.php index bb205aad32..72a03f9579 100644 --- a/src/applications/diffusion/conduit/DiffusionQueryCommitsConduitAPIMethod.php +++ b/src/applications/diffusion/conduit/DiffusionQueryCommitsConduitAPIMethod.php @@ -29,21 +29,25 @@ final class DiffusionQueryCommitsConduitAPIMethod protected function execute(ConduitAPIRequest $request) { $need_messages = $request->getValue('needMessages'); $bypass_cache = $request->getValue('bypassCache'); + $viewer = $request->getUser(); $query = id(new DiffusionCommitQuery()) - ->setViewer($request->getUser()) + ->setViewer($viewer) ->needCommitData(true); $repository_phid = $request->getValue('repositoryPHID'); if ($repository_phid) { $repository = id(new PhabricatorRepositoryQuery()) - ->setViewer($request->getUser()) + ->setViewer($viewer) ->withPHIDs(array($repository_phid)) ->executeOne(); if ($repository) { $query->withRepository($repository); if ($bypass_cache) { - $repository->synchronizeWorkingCopyBeforeRead(); + id(new DiffusionRepositoryClusterEngine()) + ->setViewer($viewer) + ->setRepository($repository) + ->synchronizeWorkingCopyBeforeRead(); } } } diff --git a/src/applications/diffusion/conduit/DiffusionQueryConduitAPIMethod.php b/src/applications/diffusion/conduit/DiffusionQueryConduitAPIMethod.php index 7a7d81f4e0..716824f9f8 100644 --- a/src/applications/diffusion/conduit/DiffusionQueryConduitAPIMethod.php +++ b/src/applications/diffusion/conduit/DiffusionQueryConduitAPIMethod.php @@ -124,10 +124,11 @@ abstract class DiffusionQueryConduitAPIMethod // to prevent infinite recursion. $is_cluster_request = $request->getIsClusterRequest(); + $viewer = $request->getUser(); $repository = $drequest->getRepository(); $client = $repository->newConduitClient( - $request->getUser(), + $viewer, $is_cluster_request); if ($client) { // We're proxying, so just make an intracluster call. @@ -149,7 +150,10 @@ abstract class DiffusionQueryConduitAPIMethod // fetching the most up-to-date data? Synchronization can be slow, and a // lot of web reads are probably fine if they're a few seconds out of // date. - $repository->synchronizeWorkingCopyBeforeRead(); + id(new DiffusionRepositoryClusterEngine()) + ->setViewer($viewer) + ->setRepository($repository) + ->synchronizeWorkingCopyBeforeRead(); return $this->getResult($request); } diff --git a/src/applications/diffusion/controller/DiffusionServeController.php b/src/applications/diffusion/controller/DiffusionServeController.php index 32c4936484..f1703655fa 100644 --- a/src/applications/diffusion/controller/DiffusionServeController.php +++ b/src/applications/diffusion/controller/DiffusionServeController.php @@ -540,12 +540,16 @@ final class DiffusionServeController extends DiffusionController { $unguarded = AphrontWriteGuard::beginScopedUnguardedWrites(); + $cluster_engine = id(new DiffusionRepositoryClusterEngine()) + ->setViewer($viewer) + ->setRepository($repository); + $did_write_lock = false; if ($this->isReadOnlyRequest($repository)) { - $repository->synchronizeWorkingCopyBeforeRead(); + $cluster_engine->synchronizeWorkingCopyBeforeRead(); } else { $did_write_lock = true; - $repository->synchronizeWorkingCopyBeforeWrite($viewer); + $cluster_engine->synchronizeWorkingCopyBeforeWrite(); } $caught = null; @@ -559,7 +563,7 @@ final class DiffusionServeController extends DiffusionController { } if ($did_write_lock) { - $repository->synchronizeWorkingCopyAfterWrite(); + $cluster_engine->synchronizeWorkingCopyAfterWrite(); } unset($unguarded); diff --git a/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php b/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php new file mode 100644 index 0000000000..2ed0a15f18 --- /dev/null +++ b/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php @@ -0,0 +1,435 @@ +repository = $repository; + return $this; + } + + public function getRepository() { + return $this->repository; + } + + public function setViewer(PhabricatorUser $viewer) { + $this->viewer = $viewer; + return $this; + } + + public function getViewer() { + return $this->viewer; + } + + +/* -( Cluster Synchronization )-------------------------------------------- */ + + + /** + * Synchronize repository version information after creating a repository. + * + * This initializes working copy versions for all currently bound devices to + * 0, so that we don't get stuck making an ambiguous choice about which + * devices are leaders when we later synchronize before a read. + * + * @task sync + */ + public function synchronizeWorkingCopyAfterCreation() { + if (!$this->shouldEnableSynchronization()) { + return; + } + + $repository = $this->getRepository(); + $repository_phid = $repository->getPHID(); + + $service = $repository->loadAlmanacService(); + if (!$service) { + throw new Exception(pht('Failed to load repository cluster service.')); + } + + $bindings = $service->getActiveBindings(); + foreach ($bindings as $binding) { + PhabricatorRepositoryWorkingCopyVersion::updateVersion( + $repository_phid, + $binding->getDevicePHID(), + 0); + } + + return $this; + } + + + /** + * @task sync + */ + public function synchronizeWorkingCopyBeforeRead() { + if (!$this->shouldEnableSynchronization()) { + return; + } + + $repository = $this->getRepository(); + $repository_phid = $repository->getPHID(); + + $device = AlmanacKeys::getLiveDevice(); + $device_phid = $device->getPHID(); + + $read_lock = PhabricatorRepositoryWorkingCopyVersion::getReadLock( + $repository_phid, + $device_phid); + + // TODO: Raise a more useful exception if we fail to grab this lock. + $read_lock->lock(phutil_units('2 minutes in seconds')); + + $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( + $repository_phid); + $versions = mpull($versions, null, 'getDevicePHID'); + + $this_version = idx($versions, $device_phid); + if ($this_version) { + $this_version = (int)$this_version->getRepositoryVersion(); + } else { + $this_version = -1; + } + + if ($versions) { + // This is the normal case, where we have some version information and + // can identify which nodes are leaders. If the current node is not a + // leader, we want to fetch from a leader and then update our version. + + $max_version = (int)max(mpull($versions, 'getRepositoryVersion')); + if ($max_version > $this_version) { + $fetchable = array(); + foreach ($versions as $version) { + if ($version->getRepositoryVersion() == $max_version) { + $fetchable[] = $version->getDevicePHID(); + } + } + + $this->synchronizeWorkingCopyFromDevices($fetchable); + + PhabricatorRepositoryWorkingCopyVersion::updateVersion( + $repository_phid, + $device_phid, + $max_version); + } + + $result_version = $max_version; + } else { + // If no version records exist yet, we need to be careful, because we + // can not tell which nodes are leaders. + + // There might be several nodes with arbitrary existing data, and we have + // no way to tell which one has the "right" data. If we pick wrong, we + // might erase some or all of the data in the repository. + + // Since this is dangeorus, we refuse to guess unless there is only one + // device. If we're the only device in the group, we obviously must be + // a leader. + + $service = $repository->loadAlmanacService(); + if (!$service) { + throw new Exception(pht('Failed to load repository cluster service.')); + } + + $bindings = $service->getActiveBindings(); + $device_map = array(); + foreach ($bindings as $binding) { + $device_map[$binding->getDevicePHID()] = true; + } + + if (count($device_map) > 1) { + throw new Exception( + pht( + 'Repository "%s" exists on more than one device, but no device '. + 'has any repository version information. Phabricator can not '. + 'guess which copy of the existing data is authoritative. Remove '. + 'all but one device from service to mark the remaining device '. + 'as the authority.', + $repository->getDisplayName())); + } + + if (empty($device_map[$device->getPHID()])) { + throw new Exception( + pht( + 'Repository "%s" is being synchronized on device "%s", but '. + 'this device is not bound to the corresponding cluster '. + 'service ("%s").', + $repository->getDisplayName(), + $device->getName(), + $service->getName())); + } + + // The current device is the only device in service, so it must be a + // leader. We can safely have any future nodes which come online read + // from it. + PhabricatorRepositoryWorkingCopyVersion::updateVersion( + $repository_phid, + $device_phid, + 0); + + $result_version = 0; + } + + $read_lock->unlock(); + + return $result_version; + } + + + /** + * @task sync + */ + public function synchronizeWorkingCopyBeforeWrite() { + if (!$this->shouldEnableSynchronization()) { + return; + } + + $repository = $this->getRepository(); + $viewer = $this->getViewer(); + + $repository_phid = $repository->getPHID(); + + $device = AlmanacKeys::getLiveDevice(); + $device_phid = $device->getPHID(); + + $write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock( + $repository_phid); + + // TODO: Raise a more useful exception if we fail to grab this lock. + $write_lock->lock(phutil_units('2 minutes in seconds')); + + $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( + $repository_phid); + foreach ($versions as $version) { + if (!$version->getIsWriting()) { + continue; + } + + throw new Exception( + pht( + 'An previous write to this repository was interrupted; refusing '. + 'new writes. This issue resolves operator intervention to resolve, '. + 'see "Write Interruptions" in the "Cluster: Repositories" in the '. + 'documentation for instructions.')); + } + + try { + $max_version = $this->synchronizeWorkingCopyBeforeRead(); + } catch (Exception $ex) { + $write_lock->unlock(); + throw $ex; + } + + PhabricatorRepositoryWorkingCopyVersion::willWrite( + $repository_phid, + $device_phid, + array( + 'userPHID' => $viewer->getPHID(), + 'epoch' => PhabricatorTime::getNow(), + 'devicePHID' => $device_phid, + )); + + $this->clusterWriteVersion = $max_version; + $this->clusterWriteLock = $write_lock; + } + + + /** + * @task sync + */ + public function synchronizeWorkingCopyAfterWrite() { + if (!$this->shouldEnableSynchronization()) { + return; + } + + if (!$this->clusterWriteLock) { + throw new Exception( + pht( + 'Trying to synchronize after write, but not holding a write '. + 'lock!')); + } + + $repository = $this->getRepository(); + $repository_phid = $repository->getPHID(); + + $device = AlmanacKeys::getLiveDevice(); + $device_phid = $device->getPHID(); + + // NOTE: This means we're still bumping the version when pushes fail. We + // could select only un-rejected events instead to bump a little less + // often. + + $new_log = id(new PhabricatorRepositoryPushEventQuery()) + ->setViewer(PhabricatorUser::getOmnipotentUser()) + ->withRepositoryPHIDs(array($repository_phid)) + ->setLimit(1) + ->executeOne(); + + $old_version = $this->clusterWriteVersion; + if ($new_log) { + $new_version = $new_log->getID(); + } else { + $new_version = $old_version; + } + + PhabricatorRepositoryWorkingCopyVersion::didWrite( + $repository_phid, + $device_phid, + $this->clusterWriteVersion, + $new_log->getID()); + + $this->clusterWriteLock->unlock(); + $this->clusterWriteLock = null; + } + + +/* -( Internals )---------------------------------------------------------- */ + + + /** + * @task internal + */ + private function shouldEnableSynchronization() { + $repository = $this->getRepository(); + + $service_phid = $repository->getAlmanacServicePHID(); + if (!$service_phid) { + return false; + } + + // TODO: For now, this is only supported for Git. + if (!$repository->isGit()) { + return false; + } + + // TODO: It may eventually make sense to try to version and synchronize + // observed repositories (so that daemons don't do reads against out-of + // date hosts), but don't bother for now. + if (!$repository->isHosted()) { + return false; + } + + $device = AlmanacKeys::getLiveDevice(); + if (!$device) { + return false; + } + + return true; + } + + + /** + * @task internal + */ + private function synchronizeWorkingCopyFromDevices(array $device_phids) { + $repository = $this->getRepository(); + + $service = $repository->loadAlmanacService(); + if (!$service) { + throw new Exception(pht('Failed to load repository cluster service.')); + } + + $device_map = array_fuse($device_phids); + $bindings = $service->getActiveBindings(); + + $fetchable = array(); + foreach ($bindings as $binding) { + // We can't fetch from nodes which don't have the newest version. + $device_phid = $binding->getDevicePHID(); + if (empty($device_map[$device_phid])) { + continue; + } + + // TODO: For now, only fetch over SSH. We could support fetching over + // HTTP eventually. + if ($binding->getAlmanacPropertyValue('protocol') != 'ssh') { + continue; + } + + $fetchable[] = $binding; + } + + if (!$fetchable) { + throw new Exception( + pht( + 'Leader lost: no up-to-date nodes in repository cluster are '. + 'fetchable.')); + } + + $caught = null; + foreach ($fetchable as $binding) { + try { + $this->synchronizeWorkingCopyFromBinding($binding); + $caught = null; + break; + } catch (Exception $ex) { + $caught = $ex; + } + } + + if ($caught) { + throw $caught; + } + } + + + /** + * @task internal + */ + private function synchronizeWorkingCopyFromBinding($binding) { + $repository = $this->getRepository(); + + $fetch_uri = $repository->getClusterRepositoryURIFromBinding($binding); + $local_path = $repository->getLocalPath(); + + if ($repository->isGit()) { + if (!Filesystem::pathExists($local_path)) { + $device = AlmanacKeys::getLiveDevice(); + throw new Exception( + pht( + 'Repository "%s" does not have a working copy on this device '. + 'yet, so it can not be synchronized. Wait for the daemons to '. + 'construct one or run `bin/repository update %s` on this host '. + '("%s") to build it explicitly.', + $repository->getDisplayName(), + $repository->getMonogram(), + $device->getName())); + } + + $argv = array( + 'fetch --prune -- %s %s', + $fetch_uri, + '+refs/*:refs/*', + ); + } else { + throw new Exception(pht('Binding sync only supported for git!')); + } + + $future = DiffusionCommandEngine::newCommandEngine($repository) + ->setArgv($argv) + ->setConnectAsDevice(true) + ->setSudoAsDaemon(true) + ->setProtocol($fetch_uri->getProtocol()) + ->newFuture(); + + $future->setCWD($local_path); + + $future->resolvex(); + } + +} diff --git a/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php index f5e314f462..ad38480828 100644 --- a/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php +++ b/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php @@ -15,19 +15,22 @@ final class DiffusionGitReceivePackSSHWorkflow extends DiffusionGitSSHWorkflow { protected function executeRepositoryOperations() { $repository = $this->getRepository(); + $viewer = $this->getViewer(); // This is a write, and must have write access. $this->requireWriteAccess(); + $cluster_engine = id(new DiffusionRepositoryClusterEngine()) + ->setViewer($viewer) + ->setRepository($repository); + if ($this->shouldProxy()) { $command = $this->getProxyCommand(); $did_synchronize = false; } else { $command = csprintf('git-receive-pack %s', $repository->getLocalPath()); - $did_synchronize = true; - $viewer = $this->getUser(); - $repository->synchronizeWorkingCopyBeforeWrite($viewer); + $cluster_engine->synchronizeWorkingCopyBeforeWrite(); } $caught = null; @@ -40,7 +43,7 @@ final class DiffusionGitReceivePackSSHWorkflow extends DiffusionGitSSHWorkflow { // We've committed the write (or rejected it), so we can release the lock // without waiting for the client to receive the acknowledgement. if ($did_synchronize) { - $repository->synchronizeWorkingCopyAfterWrite(); + $cluster_engine->synchronizeWorkingCopyAfterWrite(); } if ($caught) { diff --git a/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php index 1a83a1f30b..eb6a99e004 100644 --- a/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php +++ b/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php @@ -15,6 +15,7 @@ final class DiffusionGitUploadPackSSHWorkflow extends DiffusionGitSSHWorkflow { protected function executeRepositoryOperations() { $repository = $this->getRepository(); + $viewer = $this->getUser(); $skip_sync = $this->shouldSkipReadSynchronization(); @@ -23,7 +24,10 @@ final class DiffusionGitUploadPackSSHWorkflow extends DiffusionGitSSHWorkflow { } else { $command = csprintf('git-upload-pack -- %s', $repository->getLocalPath()); if (!$skip_sync) { - $repository->synchronizeWorkingCopyBeforeRead(); + $cluster_engine = id(new DiffusionRepositoryClusterEngine()) + ->setViewer($viewer) + ->setRepository($repository) + ->synchronizeWorkingCopyBeforeRead(); } } $command = PhabricatorDaemon::sudoCommandAsDaemonUser($command); diff --git a/src/applications/repository/editor/PhabricatorRepositoryEditor.php b/src/applications/repository/editor/PhabricatorRepositoryEditor.php index 70d904b112..9ef775fe2d 100644 --- a/src/applications/repository/editor/PhabricatorRepositoryEditor.php +++ b/src/applications/repository/editor/PhabricatorRepositoryEditor.php @@ -684,7 +684,10 @@ final class PhabricatorRepositoryEditor } if ($this->getIsNewObject()) { - $object->synchronizeWorkingCopyAfterCreation(); + id(new DiffusionRepositoryClusterEngine()) + ->setViewer($this->getActor()) + ->setRepository($object) + ->synchronizeWorkingCopyAfterCreation(); } return $xactions; diff --git a/src/applications/repository/engine/PhabricatorRepositoryPullEngine.php b/src/applications/repository/engine/PhabricatorRepositoryPullEngine.php index f589f61b9d..621c718577 100644 --- a/src/applications/repository/engine/PhabricatorRepositoryPullEngine.php +++ b/src/applications/repository/engine/PhabricatorRepositoryPullEngine.php @@ -23,6 +23,7 @@ final class PhabricatorRepositoryPullEngine public function pullRepository() { $repository = $this->getRepository(); + $viewer = PhabricatorUser::getOmnipotentUser(); $is_hg = false; $is_git = false; @@ -96,7 +97,10 @@ final class PhabricatorRepositoryPullEngine } if ($repository->isHosted()) { - $repository->synchronizeWorkingCopyBeforeRead(); + id(new DiffusionRepositoryClusterEngine()) + ->setViewer($viewer) + ->setRepository($repository) + ->synchronizeWorkingCopyBeforeRead(); if ($is_git) { $this->installGitHook(); diff --git a/src/applications/repository/storage/PhabricatorRepository.php b/src/applications/repository/storage/PhabricatorRepository.php index 4a217d7e80..1d251cb34b 100644 --- a/src/applications/repository/storage/PhabricatorRepository.php +++ b/src/applications/repository/storage/PhabricatorRepository.php @@ -68,9 +68,6 @@ final class PhabricatorRepository extends PhabricatorRepositoryDAO private $projectPHIDs = self::ATTACHABLE; private $uris = self::ATTACHABLE; - private $clusterWriteLock; - private $clusterWriteVersion; - public static function initializeNewRepository(PhabricatorUser $actor) { $app = id(new PhabricatorApplicationQuery()) @@ -2193,379 +2190,7 @@ final class PhabricatorRepository extends PhabricatorRepositoryDAO } -/* -( Cluster Synchronization )-------------------------------------------- */ - - - private function shouldEnableSynchronization() { - $service_phid = $this->getAlmanacServicePHID(); - if (!$service_phid) { - return false; - } - - // TODO: For now, this is only supported for Git. - if (!$this->isGit()) { - return false; - } - - // TODO: It may eventually make sense to try to version and synchronize - // observed repositories (so that daemons don't do reads against out-of - // date hosts), but don't bother for now. - if (!$this->isHosted()) { - return false; - } - - $device = AlmanacKeys::getLiveDevice(); - if (!$device) { - return false; - } - - return true; - } - - - /** - * Synchronize repository version information after creating a repository. - * - * This initializes working copy versions for all currently bound devices to - * 0, so that we don't get stuck making an ambiguous choice about which - * devices are leaders when we later synchronize before a read. - * - * @task sync - */ - public function synchronizeWorkingCopyAfterCreation() { - if (!$this->shouldEnableSynchronization()) { - return; - } - - $repository_phid = $this->getPHID(); - - $service = $this->loadAlmanacService(); - if (!$service) { - throw new Exception(pht('Failed to load repository cluster service.')); - } - - $bindings = $service->getActiveBindings(); - foreach ($bindings as $binding) { - PhabricatorRepositoryWorkingCopyVersion::updateVersion( - $repository_phid, - $binding->getDevicePHID(), - 0); - } - } - - - /** - * @task sync - */ - public function synchronizeWorkingCopyBeforeRead() { - if (!$this->shouldEnableSynchronization()) { - return; - } - - $repository_phid = $this->getPHID(); - - $device = AlmanacKeys::getLiveDevice(); - $device_phid = $device->getPHID(); - - $read_lock = PhabricatorRepositoryWorkingCopyVersion::getReadLock( - $repository_phid, - $device_phid); - - // TODO: Raise a more useful exception if we fail to grab this lock. - $read_lock->lock(phutil_units('2 minutes in seconds')); - - $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( - $repository_phid); - $versions = mpull($versions, null, 'getDevicePHID'); - - $this_version = idx($versions, $device_phid); - if ($this_version) { - $this_version = (int)$this_version->getRepositoryVersion(); - } else { - $this_version = -1; - } - - if ($versions) { - // This is the normal case, where we have some version information and - // can identify which nodes are leaders. If the current node is not a - // leader, we want to fetch from a leader and then update our version. - - $max_version = (int)max(mpull($versions, 'getRepositoryVersion')); - if ($max_version > $this_version) { - $fetchable = array(); - foreach ($versions as $version) { - if ($version->getRepositoryVersion() == $max_version) { - $fetchable[] = $version->getDevicePHID(); - } - } - - $this->synchronizeWorkingCopyFromDevices($fetchable); - - PhabricatorRepositoryWorkingCopyVersion::updateVersion( - $repository_phid, - $device_phid, - $max_version); - } - - $result_version = $max_version; - } else { - // If no version records exist yet, we need to be careful, because we - // can not tell which nodes are leaders. - - // There might be several nodes with arbitrary existing data, and we have - // no way to tell which one has the "right" data. If we pick wrong, we - // might erase some or all of the data in the repository. - - // Since this is dangeorus, we refuse to guess unless there is only one - // device. If we're the only device in the group, we obviously must be - // a leader. - - $service = $this->loadAlmanacService(); - if (!$service) { - throw new Exception(pht('Failed to load repository cluster service.')); - } - - $bindings = $service->getActiveBindings(); - $device_map = array(); - foreach ($bindings as $binding) { - $device_map[$binding->getDevicePHID()] = true; - } - - if (count($device_map) > 1) { - throw new Exception( - pht( - 'Repository "%s" exists on more than one device, but no device '. - 'has any repository version information. Phabricator can not '. - 'guess which copy of the existing data is authoritative. Remove '. - 'all but one device from service to mark the remaining device '. - 'as the authority.', - $this->getDisplayName())); - } - - if (empty($device_map[$device->getPHID()])) { - throw new Exception( - pht( - 'Repository "%s" is being synchronized on device "%s", but '. - 'this device is not bound to the corresponding cluster '. - 'service ("%s").', - $this->getDisplayName(), - $device->getName(), - $service->getName())); - } - - // The current device is the only device in service, so it must be a - // leader. We can safely have any future nodes which come online read - // from it. - PhabricatorRepositoryWorkingCopyVersion::updateVersion( - $repository_phid, - $device_phid, - 0); - - $result_version = 0; - } - - $read_lock->unlock(); - - return $result_version; - } - - - /** - * @task sync - */ - public function synchronizeWorkingCopyBeforeWrite( - PhabricatorUser $actor) { - if (!$this->shouldEnableSynchronization()) { - return; - } - - $repository_phid = $this->getPHID(); - - $device = AlmanacKeys::getLiveDevice(); - $device_phid = $device->getPHID(); - - $write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock( - $repository_phid); - - // TODO: Raise a more useful exception if we fail to grab this lock. - $write_lock->lock(phutil_units('2 minutes in seconds')); - - $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( - $repository_phid); - foreach ($versions as $version) { - if (!$version->getIsWriting()) { - continue; - } - - throw new Exception( - pht( - 'An previous write to this repository was interrupted; refusing '. - 'new writes. This issue resolves operator intervention to resolve, '. - 'see "Write Interruptions" in the "Cluster: Repositories" in the '. - 'documentation for instructions.')); - } - - try { - $max_version = $this->synchronizeWorkingCopyBeforeRead(); - } catch (Exception $ex) { - $write_lock->unlock(); - throw $ex; - } - - PhabricatorRepositoryWorkingCopyVersion::willWrite( - $repository_phid, - $device_phid, - array( - 'userPHID' => $actor->getPHID(), - 'epoch' => PhabricatorTime::getNow(), - 'devicePHID' => $device_phid, - )); - - $this->clusterWriteVersion = $max_version; - $this->clusterWriteLock = $write_lock; - } - - - /** - * @task sync - */ - public function synchronizeWorkingCopyAfterWrite() { - if (!$this->shouldEnableSynchronization()) { - return; - } - - if (!$this->clusterWriteLock) { - throw new Exception( - pht( - 'Trying to synchronize after write, but not holding a write '. - 'lock!')); - } - - $repository_phid = $this->getPHID(); - - $device = AlmanacKeys::getLiveDevice(); - $device_phid = $device->getPHID(); - - // NOTE: This means we're still bumping the version when pushes fail. We - // could select only un-rejected events instead to bump a little less - // often. - - $new_log = id(new PhabricatorRepositoryPushEventQuery()) - ->setViewer(PhabricatorUser::getOmnipotentUser()) - ->withRepositoryPHIDs(array($repository_phid)) - ->setLimit(1) - ->executeOne(); - - $old_version = $this->clusterWriteVersion; - if ($new_log) { - $new_version = $new_log->getID(); - } else { - $new_version = $old_version; - } - - PhabricatorRepositoryWorkingCopyVersion::didWrite( - $repository_phid, - $device_phid, - $this->clusterWriteVersion, - $new_log->getID()); - - $this->clusterWriteLock->unlock(); - $this->clusterWriteLock = null; - } - - - /** - * @task sync - */ - private function synchronizeWorkingCopyFromDevices(array $device_phids) { - $service = $this->loadAlmanacService(); - if (!$service) { - throw new Exception(pht('Failed to load repository cluster service.')); - } - - $device_map = array_fuse($device_phids); - $bindings = $service->getActiveBindings(); - - $fetchable = array(); - foreach ($bindings as $binding) { - // We can't fetch from nodes which don't have the newest version. - $device_phid = $binding->getDevicePHID(); - if (empty($device_map[$device_phid])) { - continue; - } - - // TODO: For now, only fetch over SSH. We could support fetching over - // HTTP eventually. - if ($binding->getAlmanacPropertyValue('protocol') != 'ssh') { - continue; - } - - $fetchable[] = $binding; - } - - if (!$fetchable) { - throw new Exception( - pht( - 'Leader lost: no up-to-date nodes in repository cluster are '. - 'fetchable.')); - } - - $caught = null; - foreach ($fetchable as $binding) { - try { - $this->synchronizeWorkingCopyFromBinding($binding); - $caught = null; - break; - } catch (Exception $ex) { - $caught = $ex; - } - } - - if ($caught) { - throw $caught; - } - } - - private function synchronizeWorkingCopyFromBinding($binding) { - $fetch_uri = $this->getClusterRepositoryURIFromBinding($binding); - $local_path = $this->getLocalPath(); - - if ($this->isGit()) { - if (!Filesystem::pathExists($local_path)) { - $device = AlmanacKeys::getLiveDevice(); - throw new Exception( - pht( - 'Repository "%s" does not have a working copy on this device '. - 'yet, so it can not be synchronized. Wait for the daemons to '. - 'construct one or run `bin/repository update %s` on this host '. - '("%s") to build it explicitly.', - $this->getDisplayName(), - $this->getMonogram(), - $device->getName())); - } - - $argv = array( - 'fetch --prune -- %s %s', - $fetch_uri, - '+refs/*:refs/*', - ); - } else { - throw new Exception(pht('Binding sync only supported for git!')); - } - - $future = DiffusionCommandEngine::newCommandEngine($this) - ->setArgv($argv) - ->setConnectAsDevice(true) - ->setSudoAsDaemon(true) - ->setProtocol($fetch_uri->getProtocol()) - ->newFuture(); - - $future->setCWD($local_path); - - $future->resolvex(); - } - - private function getClusterRepositoryURIFromBinding( + public function getClusterRepositoryURIFromBinding( AlmanacBinding $binding) { $protocol = $binding->getAlmanacPropertyValue('protocol'); if ($protocol === null) { @@ -2613,8 +2238,6 @@ final class PhabricatorRepository extends PhabricatorRepositoryDAO } - - /* -( Symbols )-------------------------------------------------------------*/ public function getSymbolSources() {