1
0
Fork 0
mirror of https://we.phorge.it/source/phorge.git synced 2024-12-23 22:10:55 +01:00

Move all cluster locking logic to a separate class

Summary: Ref T10860. This doesn't change anything, it just separates all this stuff out of `PhabricatorRepository` since I'm planning to add a bit more state to it and it's already pretty big and fairly separable.

Test Plan: Pulled, pushed, browsed Diffusion.

Reviewers: chad

Reviewed By: chad

Maniphest Tasks: T10860

Differential Revision: https://secure.phabricator.com/D15790
This commit is contained in:
epriestley 2016-04-24 09:04:27 -07:00
parent 1c0980a26a
commit dc75b4bd06
10 changed files with 479 additions and 393 deletions

View file

@ -745,6 +745,7 @@ phutil_register_library_map(array(
'DiffusionRenameHistoryQuery' => 'applications/diffusion/query/DiffusionRenameHistoryQuery.php', 'DiffusionRenameHistoryQuery' => 'applications/diffusion/query/DiffusionRenameHistoryQuery.php',
'DiffusionRepositoryBasicsManagementPanel' => 'applications/diffusion/management/DiffusionRepositoryBasicsManagementPanel.php', 'DiffusionRepositoryBasicsManagementPanel' => 'applications/diffusion/management/DiffusionRepositoryBasicsManagementPanel.php',
'DiffusionRepositoryByIDRemarkupRule' => 'applications/diffusion/remarkup/DiffusionRepositoryByIDRemarkupRule.php', 'DiffusionRepositoryByIDRemarkupRule' => 'applications/diffusion/remarkup/DiffusionRepositoryByIDRemarkupRule.php',
'DiffusionRepositoryClusterEngine' => 'applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php',
'DiffusionRepositoryClusterManagementPanel' => 'applications/diffusion/management/DiffusionRepositoryClusterManagementPanel.php', 'DiffusionRepositoryClusterManagementPanel' => 'applications/diffusion/management/DiffusionRepositoryClusterManagementPanel.php',
'DiffusionRepositoryController' => 'applications/diffusion/controller/DiffusionRepositoryController.php', 'DiffusionRepositoryController' => 'applications/diffusion/controller/DiffusionRepositoryController.php',
'DiffusionRepositoryCreateController' => 'applications/diffusion/controller/DiffusionRepositoryCreateController.php', 'DiffusionRepositoryCreateController' => 'applications/diffusion/controller/DiffusionRepositoryCreateController.php',
@ -4953,6 +4954,7 @@ phutil_register_library_map(array(
'DiffusionRenameHistoryQuery' => 'Phobject', 'DiffusionRenameHistoryQuery' => 'Phobject',
'DiffusionRepositoryBasicsManagementPanel' => 'DiffusionRepositoryManagementPanel', 'DiffusionRepositoryBasicsManagementPanel' => 'DiffusionRepositoryManagementPanel',
'DiffusionRepositoryByIDRemarkupRule' => 'PhabricatorObjectRemarkupRule', 'DiffusionRepositoryByIDRemarkupRule' => 'PhabricatorObjectRemarkupRule',
'DiffusionRepositoryClusterEngine' => 'Phobject',
'DiffusionRepositoryClusterManagementPanel' => 'DiffusionRepositoryManagementPanel', 'DiffusionRepositoryClusterManagementPanel' => 'DiffusionRepositoryManagementPanel',
'DiffusionRepositoryController' => 'DiffusionController', 'DiffusionRepositoryController' => 'DiffusionController',
'DiffusionRepositoryCreateController' => 'DiffusionRepositoryEditController', 'DiffusionRepositoryCreateController' => 'DiffusionRepositoryEditController',

View file

@ -29,21 +29,25 @@ final class DiffusionQueryCommitsConduitAPIMethod
protected function execute(ConduitAPIRequest $request) { protected function execute(ConduitAPIRequest $request) {
$need_messages = $request->getValue('needMessages'); $need_messages = $request->getValue('needMessages');
$bypass_cache = $request->getValue('bypassCache'); $bypass_cache = $request->getValue('bypassCache');
$viewer = $request->getUser();
$query = id(new DiffusionCommitQuery()) $query = id(new DiffusionCommitQuery())
->setViewer($request->getUser()) ->setViewer($viewer)
->needCommitData(true); ->needCommitData(true);
$repository_phid = $request->getValue('repositoryPHID'); $repository_phid = $request->getValue('repositoryPHID');
if ($repository_phid) { if ($repository_phid) {
$repository = id(new PhabricatorRepositoryQuery()) $repository = id(new PhabricatorRepositoryQuery())
->setViewer($request->getUser()) ->setViewer($viewer)
->withPHIDs(array($repository_phid)) ->withPHIDs(array($repository_phid))
->executeOne(); ->executeOne();
if ($repository) { if ($repository) {
$query->withRepository($repository); $query->withRepository($repository);
if ($bypass_cache) { if ($bypass_cache) {
$repository->synchronizeWorkingCopyBeforeRead(); id(new DiffusionRepositoryClusterEngine())
->setViewer($viewer)
->setRepository($repository)
->synchronizeWorkingCopyBeforeRead();
} }
} }
} }

View file

@ -124,10 +124,11 @@ abstract class DiffusionQueryConduitAPIMethod
// to prevent infinite recursion. // to prevent infinite recursion.
$is_cluster_request = $request->getIsClusterRequest(); $is_cluster_request = $request->getIsClusterRequest();
$viewer = $request->getUser();
$repository = $drequest->getRepository(); $repository = $drequest->getRepository();
$client = $repository->newConduitClient( $client = $repository->newConduitClient(
$request->getUser(), $viewer,
$is_cluster_request); $is_cluster_request);
if ($client) { if ($client) {
// We're proxying, so just make an intracluster call. // We're proxying, so just make an intracluster call.
@ -149,7 +150,10 @@ abstract class DiffusionQueryConduitAPIMethod
// fetching the most up-to-date data? Synchronization can be slow, and a // fetching the most up-to-date data? Synchronization can be slow, and a
// lot of web reads are probably fine if they're a few seconds out of // lot of web reads are probably fine if they're a few seconds out of
// date. // date.
$repository->synchronizeWorkingCopyBeforeRead(); id(new DiffusionRepositoryClusterEngine())
->setViewer($viewer)
->setRepository($repository)
->synchronizeWorkingCopyBeforeRead();
return $this->getResult($request); return $this->getResult($request);
} }

View file

@ -540,12 +540,16 @@ final class DiffusionServeController extends DiffusionController {
$unguarded = AphrontWriteGuard::beginScopedUnguardedWrites(); $unguarded = AphrontWriteGuard::beginScopedUnguardedWrites();
$cluster_engine = id(new DiffusionRepositoryClusterEngine())
->setViewer($viewer)
->setRepository($repository);
$did_write_lock = false; $did_write_lock = false;
if ($this->isReadOnlyRequest($repository)) { if ($this->isReadOnlyRequest($repository)) {
$repository->synchronizeWorkingCopyBeforeRead(); $cluster_engine->synchronizeWorkingCopyBeforeRead();
} else { } else {
$did_write_lock = true; $did_write_lock = true;
$repository->synchronizeWorkingCopyBeforeWrite($viewer); $cluster_engine->synchronizeWorkingCopyBeforeWrite();
} }
$caught = null; $caught = null;
@ -559,7 +563,7 @@ final class DiffusionServeController extends DiffusionController {
} }
if ($did_write_lock) { if ($did_write_lock) {
$repository->synchronizeWorkingCopyAfterWrite(); $cluster_engine->synchronizeWorkingCopyAfterWrite();
} }
unset($unguarded); unset($unguarded);

View file

@ -0,0 +1,435 @@
<?php
/**
* Manages repository synchronization for cluster repositories.
*
* @task config Configuring Synchronization
* @task sync Cluster Synchronization
* @task internal Internals
*/
final class DiffusionRepositoryClusterEngine extends Phobject {
private $repository;
private $viewer;
private $clusterWriteLock;
private $clusterWriteVersion;
/* -( Configuring Synchronization )---------------------------------------- */
public function setRepository(PhabricatorRepository $repository) {
$this->repository = $repository;
return $this;
}
public function getRepository() {
return $this->repository;
}
public function setViewer(PhabricatorUser $viewer) {
$this->viewer = $viewer;
return $this;
}
public function getViewer() {
return $this->viewer;
}
/* -( Cluster Synchronization )-------------------------------------------- */
/**
* Synchronize repository version information after creating a repository.
*
* This initializes working copy versions for all currently bound devices to
* 0, so that we don't get stuck making an ambiguous choice about which
* devices are leaders when we later synchronize before a read.
*
* @task sync
*/
public function synchronizeWorkingCopyAfterCreation() {
if (!$this->shouldEnableSynchronization()) {
return;
}
$repository = $this->getRepository();
$repository_phid = $repository->getPHID();
$service = $repository->loadAlmanacService();
if (!$service) {
throw new Exception(pht('Failed to load repository cluster service.'));
}
$bindings = $service->getActiveBindings();
foreach ($bindings as $binding) {
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$binding->getDevicePHID(),
0);
}
return $this;
}
/**
* @task sync
*/
public function synchronizeWorkingCopyBeforeRead() {
if (!$this->shouldEnableSynchronization()) {
return;
}
$repository = $this->getRepository();
$repository_phid = $repository->getPHID();
$device = AlmanacKeys::getLiveDevice();
$device_phid = $device->getPHID();
$read_lock = PhabricatorRepositoryWorkingCopyVersion::getReadLock(
$repository_phid,
$device_phid);
// TODO: Raise a more useful exception if we fail to grab this lock.
$read_lock->lock(phutil_units('2 minutes in seconds'));
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
$repository_phid);
$versions = mpull($versions, null, 'getDevicePHID');
$this_version = idx($versions, $device_phid);
if ($this_version) {
$this_version = (int)$this_version->getRepositoryVersion();
} else {
$this_version = -1;
}
if ($versions) {
// This is the normal case, where we have some version information and
// can identify which nodes are leaders. If the current node is not a
// leader, we want to fetch from a leader and then update our version.
$max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
if ($max_version > $this_version) {
$fetchable = array();
foreach ($versions as $version) {
if ($version->getRepositoryVersion() == $max_version) {
$fetchable[] = $version->getDevicePHID();
}
}
$this->synchronizeWorkingCopyFromDevices($fetchable);
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$device_phid,
$max_version);
}
$result_version = $max_version;
} else {
// If no version records exist yet, we need to be careful, because we
// can not tell which nodes are leaders.
// There might be several nodes with arbitrary existing data, and we have
// no way to tell which one has the "right" data. If we pick wrong, we
// might erase some or all of the data in the repository.
// Since this is dangeorus, we refuse to guess unless there is only one
// device. If we're the only device in the group, we obviously must be
// a leader.
$service = $repository->loadAlmanacService();
if (!$service) {
throw new Exception(pht('Failed to load repository cluster service.'));
}
$bindings = $service->getActiveBindings();
$device_map = array();
foreach ($bindings as $binding) {
$device_map[$binding->getDevicePHID()] = true;
}
if (count($device_map) > 1) {
throw new Exception(
pht(
'Repository "%s" exists on more than one device, but no device '.
'has any repository version information. Phabricator can not '.
'guess which copy of the existing data is authoritative. Remove '.
'all but one device from service to mark the remaining device '.
'as the authority.',
$repository->getDisplayName()));
}
if (empty($device_map[$device->getPHID()])) {
throw new Exception(
pht(
'Repository "%s" is being synchronized on device "%s", but '.
'this device is not bound to the corresponding cluster '.
'service ("%s").',
$repository->getDisplayName(),
$device->getName(),
$service->getName()));
}
// The current device is the only device in service, so it must be a
// leader. We can safely have any future nodes which come online read
// from it.
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$device_phid,
0);
$result_version = 0;
}
$read_lock->unlock();
return $result_version;
}
/**
* @task sync
*/
public function synchronizeWorkingCopyBeforeWrite() {
if (!$this->shouldEnableSynchronization()) {
return;
}
$repository = $this->getRepository();
$viewer = $this->getViewer();
$repository_phid = $repository->getPHID();
$device = AlmanacKeys::getLiveDevice();
$device_phid = $device->getPHID();
$write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock(
$repository_phid);
// TODO: Raise a more useful exception if we fail to grab this lock.
$write_lock->lock(phutil_units('2 minutes in seconds'));
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
$repository_phid);
foreach ($versions as $version) {
if (!$version->getIsWriting()) {
continue;
}
throw new Exception(
pht(
'An previous write to this repository was interrupted; refusing '.
'new writes. This issue resolves operator intervention to resolve, '.
'see "Write Interruptions" in the "Cluster: Repositories" in the '.
'documentation for instructions.'));
}
try {
$max_version = $this->synchronizeWorkingCopyBeforeRead();
} catch (Exception $ex) {
$write_lock->unlock();
throw $ex;
}
PhabricatorRepositoryWorkingCopyVersion::willWrite(
$repository_phid,
$device_phid,
array(
'userPHID' => $viewer->getPHID(),
'epoch' => PhabricatorTime::getNow(),
'devicePHID' => $device_phid,
));
$this->clusterWriteVersion = $max_version;
$this->clusterWriteLock = $write_lock;
}
/**
* @task sync
*/
public function synchronizeWorkingCopyAfterWrite() {
if (!$this->shouldEnableSynchronization()) {
return;
}
if (!$this->clusterWriteLock) {
throw new Exception(
pht(
'Trying to synchronize after write, but not holding a write '.
'lock!'));
}
$repository = $this->getRepository();
$repository_phid = $repository->getPHID();
$device = AlmanacKeys::getLiveDevice();
$device_phid = $device->getPHID();
// NOTE: This means we're still bumping the version when pushes fail. We
// could select only un-rejected events instead to bump a little less
// often.
$new_log = id(new PhabricatorRepositoryPushEventQuery())
->setViewer(PhabricatorUser::getOmnipotentUser())
->withRepositoryPHIDs(array($repository_phid))
->setLimit(1)
->executeOne();
$old_version = $this->clusterWriteVersion;
if ($new_log) {
$new_version = $new_log->getID();
} else {
$new_version = $old_version;
}
PhabricatorRepositoryWorkingCopyVersion::didWrite(
$repository_phid,
$device_phid,
$this->clusterWriteVersion,
$new_log->getID());
$this->clusterWriteLock->unlock();
$this->clusterWriteLock = null;
}
/* -( Internals )---------------------------------------------------------- */
/**
* @task internal
*/
private function shouldEnableSynchronization() {
$repository = $this->getRepository();
$service_phid = $repository->getAlmanacServicePHID();
if (!$service_phid) {
return false;
}
// TODO: For now, this is only supported for Git.
if (!$repository->isGit()) {
return false;
}
// TODO: It may eventually make sense to try to version and synchronize
// observed repositories (so that daemons don't do reads against out-of
// date hosts), but don't bother for now.
if (!$repository->isHosted()) {
return false;
}
$device = AlmanacKeys::getLiveDevice();
if (!$device) {
return false;
}
return true;
}
/**
* @task internal
*/
private function synchronizeWorkingCopyFromDevices(array $device_phids) {
$repository = $this->getRepository();
$service = $repository->loadAlmanacService();
if (!$service) {
throw new Exception(pht('Failed to load repository cluster service.'));
}
$device_map = array_fuse($device_phids);
$bindings = $service->getActiveBindings();
$fetchable = array();
foreach ($bindings as $binding) {
// We can't fetch from nodes which don't have the newest version.
$device_phid = $binding->getDevicePHID();
if (empty($device_map[$device_phid])) {
continue;
}
// TODO: For now, only fetch over SSH. We could support fetching over
// HTTP eventually.
if ($binding->getAlmanacPropertyValue('protocol') != 'ssh') {
continue;
}
$fetchable[] = $binding;
}
if (!$fetchable) {
throw new Exception(
pht(
'Leader lost: no up-to-date nodes in repository cluster are '.
'fetchable.'));
}
$caught = null;
foreach ($fetchable as $binding) {
try {
$this->synchronizeWorkingCopyFromBinding($binding);
$caught = null;
break;
} catch (Exception $ex) {
$caught = $ex;
}
}
if ($caught) {
throw $caught;
}
}
/**
* @task internal
*/
private function synchronizeWorkingCopyFromBinding($binding) {
$repository = $this->getRepository();
$fetch_uri = $repository->getClusterRepositoryURIFromBinding($binding);
$local_path = $repository->getLocalPath();
if ($repository->isGit()) {
if (!Filesystem::pathExists($local_path)) {
$device = AlmanacKeys::getLiveDevice();
throw new Exception(
pht(
'Repository "%s" does not have a working copy on this device '.
'yet, so it can not be synchronized. Wait for the daemons to '.
'construct one or run `bin/repository update %s` on this host '.
'("%s") to build it explicitly.',
$repository->getDisplayName(),
$repository->getMonogram(),
$device->getName()));
}
$argv = array(
'fetch --prune -- %s %s',
$fetch_uri,
'+refs/*:refs/*',
);
} else {
throw new Exception(pht('Binding sync only supported for git!'));
}
$future = DiffusionCommandEngine::newCommandEngine($repository)
->setArgv($argv)
->setConnectAsDevice(true)
->setSudoAsDaemon(true)
->setProtocol($fetch_uri->getProtocol())
->newFuture();
$future->setCWD($local_path);
$future->resolvex();
}
}

View file

@ -15,19 +15,22 @@ final class DiffusionGitReceivePackSSHWorkflow extends DiffusionGitSSHWorkflow {
protected function executeRepositoryOperations() { protected function executeRepositoryOperations() {
$repository = $this->getRepository(); $repository = $this->getRepository();
$viewer = $this->getViewer();
// This is a write, and must have write access. // This is a write, and must have write access.
$this->requireWriteAccess(); $this->requireWriteAccess();
$cluster_engine = id(new DiffusionRepositoryClusterEngine())
->setViewer($viewer)
->setRepository($repository);
if ($this->shouldProxy()) { if ($this->shouldProxy()) {
$command = $this->getProxyCommand(); $command = $this->getProxyCommand();
$did_synchronize = false; $did_synchronize = false;
} else { } else {
$command = csprintf('git-receive-pack %s', $repository->getLocalPath()); $command = csprintf('git-receive-pack %s', $repository->getLocalPath());
$did_synchronize = true; $did_synchronize = true;
$viewer = $this->getUser(); $cluster_engine->synchronizeWorkingCopyBeforeWrite();
$repository->synchronizeWorkingCopyBeforeWrite($viewer);
} }
$caught = null; $caught = null;
@ -40,7 +43,7 @@ final class DiffusionGitReceivePackSSHWorkflow extends DiffusionGitSSHWorkflow {
// We've committed the write (or rejected it), so we can release the lock // We've committed the write (or rejected it), so we can release the lock
// without waiting for the client to receive the acknowledgement. // without waiting for the client to receive the acknowledgement.
if ($did_synchronize) { if ($did_synchronize) {
$repository->synchronizeWorkingCopyAfterWrite(); $cluster_engine->synchronizeWorkingCopyAfterWrite();
} }
if ($caught) { if ($caught) {

View file

@ -15,6 +15,7 @@ final class DiffusionGitUploadPackSSHWorkflow extends DiffusionGitSSHWorkflow {
protected function executeRepositoryOperations() { protected function executeRepositoryOperations() {
$repository = $this->getRepository(); $repository = $this->getRepository();
$viewer = $this->getUser();
$skip_sync = $this->shouldSkipReadSynchronization(); $skip_sync = $this->shouldSkipReadSynchronization();
@ -23,7 +24,10 @@ final class DiffusionGitUploadPackSSHWorkflow extends DiffusionGitSSHWorkflow {
} else { } else {
$command = csprintf('git-upload-pack -- %s', $repository->getLocalPath()); $command = csprintf('git-upload-pack -- %s', $repository->getLocalPath());
if (!$skip_sync) { if (!$skip_sync) {
$repository->synchronizeWorkingCopyBeforeRead(); $cluster_engine = id(new DiffusionRepositoryClusterEngine())
->setViewer($viewer)
->setRepository($repository)
->synchronizeWorkingCopyBeforeRead();
} }
} }
$command = PhabricatorDaemon::sudoCommandAsDaemonUser($command); $command = PhabricatorDaemon::sudoCommandAsDaemonUser($command);

View file

@ -684,7 +684,10 @@ final class PhabricatorRepositoryEditor
} }
if ($this->getIsNewObject()) { if ($this->getIsNewObject()) {
$object->synchronizeWorkingCopyAfterCreation(); id(new DiffusionRepositoryClusterEngine())
->setViewer($this->getActor())
->setRepository($object)
->synchronizeWorkingCopyAfterCreation();
} }
return $xactions; return $xactions;

View file

@ -23,6 +23,7 @@ final class PhabricatorRepositoryPullEngine
public function pullRepository() { public function pullRepository() {
$repository = $this->getRepository(); $repository = $this->getRepository();
$viewer = PhabricatorUser::getOmnipotentUser();
$is_hg = false; $is_hg = false;
$is_git = false; $is_git = false;
@ -96,7 +97,10 @@ final class PhabricatorRepositoryPullEngine
} }
if ($repository->isHosted()) { if ($repository->isHosted()) {
$repository->synchronizeWorkingCopyBeforeRead(); id(new DiffusionRepositoryClusterEngine())
->setViewer($viewer)
->setRepository($repository)
->synchronizeWorkingCopyBeforeRead();
if ($is_git) { if ($is_git) {
$this->installGitHook(); $this->installGitHook();

View file

@ -68,9 +68,6 @@ final class PhabricatorRepository extends PhabricatorRepositoryDAO
private $projectPHIDs = self::ATTACHABLE; private $projectPHIDs = self::ATTACHABLE;
private $uris = self::ATTACHABLE; private $uris = self::ATTACHABLE;
private $clusterWriteLock;
private $clusterWriteVersion;
public static function initializeNewRepository(PhabricatorUser $actor) { public static function initializeNewRepository(PhabricatorUser $actor) {
$app = id(new PhabricatorApplicationQuery()) $app = id(new PhabricatorApplicationQuery())
@ -2193,379 +2190,7 @@ final class PhabricatorRepository extends PhabricatorRepositoryDAO
} }
/* -( Cluster Synchronization )-------------------------------------------- */ public function getClusterRepositoryURIFromBinding(
private function shouldEnableSynchronization() {
$service_phid = $this->getAlmanacServicePHID();
if (!$service_phid) {
return false;
}
// TODO: For now, this is only supported for Git.
if (!$this->isGit()) {
return false;
}
// TODO: It may eventually make sense to try to version and synchronize
// observed repositories (so that daemons don't do reads against out-of
// date hosts), but don't bother for now.
if (!$this->isHosted()) {
return false;
}
$device = AlmanacKeys::getLiveDevice();
if (!$device) {
return false;
}
return true;
}
/**
* Synchronize repository version information after creating a repository.
*
* This initializes working copy versions for all currently bound devices to
* 0, so that we don't get stuck making an ambiguous choice about which
* devices are leaders when we later synchronize before a read.
*
* @task sync
*/
public function synchronizeWorkingCopyAfterCreation() {
if (!$this->shouldEnableSynchronization()) {
return;
}
$repository_phid = $this->getPHID();
$service = $this->loadAlmanacService();
if (!$service) {
throw new Exception(pht('Failed to load repository cluster service.'));
}
$bindings = $service->getActiveBindings();
foreach ($bindings as $binding) {
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$binding->getDevicePHID(),
0);
}
}
/**
* @task sync
*/
public function synchronizeWorkingCopyBeforeRead() {
if (!$this->shouldEnableSynchronization()) {
return;
}
$repository_phid = $this->getPHID();
$device = AlmanacKeys::getLiveDevice();
$device_phid = $device->getPHID();
$read_lock = PhabricatorRepositoryWorkingCopyVersion::getReadLock(
$repository_phid,
$device_phid);
// TODO: Raise a more useful exception if we fail to grab this lock.
$read_lock->lock(phutil_units('2 minutes in seconds'));
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
$repository_phid);
$versions = mpull($versions, null, 'getDevicePHID');
$this_version = idx($versions, $device_phid);
if ($this_version) {
$this_version = (int)$this_version->getRepositoryVersion();
} else {
$this_version = -1;
}
if ($versions) {
// This is the normal case, where we have some version information and
// can identify which nodes are leaders. If the current node is not a
// leader, we want to fetch from a leader and then update our version.
$max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
if ($max_version > $this_version) {
$fetchable = array();
foreach ($versions as $version) {
if ($version->getRepositoryVersion() == $max_version) {
$fetchable[] = $version->getDevicePHID();
}
}
$this->synchronizeWorkingCopyFromDevices($fetchable);
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$device_phid,
$max_version);
}
$result_version = $max_version;
} else {
// If no version records exist yet, we need to be careful, because we
// can not tell which nodes are leaders.
// There might be several nodes with arbitrary existing data, and we have
// no way to tell which one has the "right" data. If we pick wrong, we
// might erase some or all of the data in the repository.
// Since this is dangeorus, we refuse to guess unless there is only one
// device. If we're the only device in the group, we obviously must be
// a leader.
$service = $this->loadAlmanacService();
if (!$service) {
throw new Exception(pht('Failed to load repository cluster service.'));
}
$bindings = $service->getActiveBindings();
$device_map = array();
foreach ($bindings as $binding) {
$device_map[$binding->getDevicePHID()] = true;
}
if (count($device_map) > 1) {
throw new Exception(
pht(
'Repository "%s" exists on more than one device, but no device '.
'has any repository version information. Phabricator can not '.
'guess which copy of the existing data is authoritative. Remove '.
'all but one device from service to mark the remaining device '.
'as the authority.',
$this->getDisplayName()));
}
if (empty($device_map[$device->getPHID()])) {
throw new Exception(
pht(
'Repository "%s" is being synchronized on device "%s", but '.
'this device is not bound to the corresponding cluster '.
'service ("%s").',
$this->getDisplayName(),
$device->getName(),
$service->getName()));
}
// The current device is the only device in service, so it must be a
// leader. We can safely have any future nodes which come online read
// from it.
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$device_phid,
0);
$result_version = 0;
}
$read_lock->unlock();
return $result_version;
}
/**
* @task sync
*/
public function synchronizeWorkingCopyBeforeWrite(
PhabricatorUser $actor) {
if (!$this->shouldEnableSynchronization()) {
return;
}
$repository_phid = $this->getPHID();
$device = AlmanacKeys::getLiveDevice();
$device_phid = $device->getPHID();
$write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock(
$repository_phid);
// TODO: Raise a more useful exception if we fail to grab this lock.
$write_lock->lock(phutil_units('2 minutes in seconds'));
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
$repository_phid);
foreach ($versions as $version) {
if (!$version->getIsWriting()) {
continue;
}
throw new Exception(
pht(
'An previous write to this repository was interrupted; refusing '.
'new writes. This issue resolves operator intervention to resolve, '.
'see "Write Interruptions" in the "Cluster: Repositories" in the '.
'documentation for instructions.'));
}
try {
$max_version = $this->synchronizeWorkingCopyBeforeRead();
} catch (Exception $ex) {
$write_lock->unlock();
throw $ex;
}
PhabricatorRepositoryWorkingCopyVersion::willWrite(
$repository_phid,
$device_phid,
array(
'userPHID' => $actor->getPHID(),
'epoch' => PhabricatorTime::getNow(),
'devicePHID' => $device_phid,
));
$this->clusterWriteVersion = $max_version;
$this->clusterWriteLock = $write_lock;
}
/**
* @task sync
*/
public function synchronizeWorkingCopyAfterWrite() {
if (!$this->shouldEnableSynchronization()) {
return;
}
if (!$this->clusterWriteLock) {
throw new Exception(
pht(
'Trying to synchronize after write, but not holding a write '.
'lock!'));
}
$repository_phid = $this->getPHID();
$device = AlmanacKeys::getLiveDevice();
$device_phid = $device->getPHID();
// NOTE: This means we're still bumping the version when pushes fail. We
// could select only un-rejected events instead to bump a little less
// often.
$new_log = id(new PhabricatorRepositoryPushEventQuery())
->setViewer(PhabricatorUser::getOmnipotentUser())
->withRepositoryPHIDs(array($repository_phid))
->setLimit(1)
->executeOne();
$old_version = $this->clusterWriteVersion;
if ($new_log) {
$new_version = $new_log->getID();
} else {
$new_version = $old_version;
}
PhabricatorRepositoryWorkingCopyVersion::didWrite(
$repository_phid,
$device_phid,
$this->clusterWriteVersion,
$new_log->getID());
$this->clusterWriteLock->unlock();
$this->clusterWriteLock = null;
}
/**
* @task sync
*/
private function synchronizeWorkingCopyFromDevices(array $device_phids) {
$service = $this->loadAlmanacService();
if (!$service) {
throw new Exception(pht('Failed to load repository cluster service.'));
}
$device_map = array_fuse($device_phids);
$bindings = $service->getActiveBindings();
$fetchable = array();
foreach ($bindings as $binding) {
// We can't fetch from nodes which don't have the newest version.
$device_phid = $binding->getDevicePHID();
if (empty($device_map[$device_phid])) {
continue;
}
// TODO: For now, only fetch over SSH. We could support fetching over
// HTTP eventually.
if ($binding->getAlmanacPropertyValue('protocol') != 'ssh') {
continue;
}
$fetchable[] = $binding;
}
if (!$fetchable) {
throw new Exception(
pht(
'Leader lost: no up-to-date nodes in repository cluster are '.
'fetchable.'));
}
$caught = null;
foreach ($fetchable as $binding) {
try {
$this->synchronizeWorkingCopyFromBinding($binding);
$caught = null;
break;
} catch (Exception $ex) {
$caught = $ex;
}
}
if ($caught) {
throw $caught;
}
}
private function synchronizeWorkingCopyFromBinding($binding) {
$fetch_uri = $this->getClusterRepositoryURIFromBinding($binding);
$local_path = $this->getLocalPath();
if ($this->isGit()) {
if (!Filesystem::pathExists($local_path)) {
$device = AlmanacKeys::getLiveDevice();
throw new Exception(
pht(
'Repository "%s" does not have a working copy on this device '.
'yet, so it can not be synchronized. Wait for the daemons to '.
'construct one or run `bin/repository update %s` on this host '.
'("%s") to build it explicitly.',
$this->getDisplayName(),
$this->getMonogram(),
$device->getName()));
}
$argv = array(
'fetch --prune -- %s %s',
$fetch_uri,
'+refs/*:refs/*',
);
} else {
throw new Exception(pht('Binding sync only supported for git!'));
}
$future = DiffusionCommandEngine::newCommandEngine($this)
->setArgv($argv)
->setConnectAsDevice(true)
->setSudoAsDaemon(true)
->setProtocol($fetch_uri->getProtocol())
->newFuture();
$future->setCWD($local_path);
$future->resolvex();
}
private function getClusterRepositoryURIFromBinding(
AlmanacBinding $binding) { AlmanacBinding $binding) {
$protocol = $binding->getAlmanacPropertyValue('protocol'); $protocol = $binding->getAlmanacPropertyValue('protocol');
if ($protocol === null) { if ($protocol === null) {
@ -2613,8 +2238,6 @@ final class PhabricatorRepository extends PhabricatorRepositoryDAO
} }
/* -( Symbols )-------------------------------------------------------------*/ /* -( Symbols )-------------------------------------------------------------*/
public function getSymbolSources() { public function getSymbolSources() {