1
0
Fork 0
mirror of https://we.phorge.it/source/phorge.git synced 2024-09-20 01:08:50 +02:00

Version clustered, observed repositories in a reasonable way (by largest discovered HEAD)

Summary:
Ref T4292. For hosted, clustered repositories we have a good way to increment the internal version of the repository: every time a user pushes something, we increment the version by 1.

We don't have a great way to do this for observed/remote repositories because when we `git fetch` we might get nothing, or we might get some changes, and we can't easily tell //what// changes we got.

For example, if we see that another node is at "version 97", and we do a fetch and see some changes, we don't know if we're in sync with them (i.e., also at "version 97") or ahead of them (at "version 98").

This implements a simple way to version an observed repository:

  - Take the head of every branch/tag.
  - Look them up.
  - Pick the biggest internal ID number.

This will work //except// when branches are deleted, which could cause the version to go backward if the "biggest commit" is the one that was deleted. This should be OK, since it's rare and the effects are minor and the repository will "self-heal" on the next actual push.

Test Plan:
  - Created an observed repository.
  - Ran `bin/repository update` and observed a sensible version number appear in the version table.
  - Pushed to the remote, did another update, saw a sensible update.
  - Did an update with no push, saw no effect on version number.
  - Toggled repository to hosted, saw the version reset.
  - Simulated read traffic to out-of-sync node, saw it do a remote fetch.

Reviewers: chad

Reviewed By: chad

Maniphest Tasks: T4292

Differential Revision: https://secure.phabricator.com/D15986
This commit is contained in:
epriestley 2016-05-27 06:21:19 -07:00
parent e81637a6c6
commit f5f784f4c1
10 changed files with 316 additions and 70 deletions

View file

@ -56,7 +56,10 @@ final class DiffusionBranchQueryConduitAPIMethod
} else {
$refs = id(new DiffusionLowLevelGitRefQuery())
->setRepository($repository)
->withIsOriginBranch(true)
->withRefTypes(
array(
PhabricatorRepositoryRefCursor::TYPE_BRANCH,
))
->execute();
}

View file

@ -72,7 +72,10 @@ final class DiffusionTagsQueryConduitAPIMethod
$refs = id(new DiffusionLowLevelGitRefQuery())
->setRepository($repository)
->withIsTag(true)
->withRefTypes(
array(
PhabricatorRepositoryRefCursor::TYPE_TAG,
))
->execute();
$tags = array();

View file

@ -463,6 +463,8 @@ final class DiffusionURIEditor
break;
}
$was_hosted = $repository->isHosted();
if ($observe_uri) {
$repository
->setHosted(false)
@ -477,6 +479,17 @@ final class DiffusionURIEditor
$repository->save();
$is_hosted = $repository->isHosted();
// If we've swapped the repository from hosted to observed or vice versa,
// reset all the cluster version clocks.
if ($was_hosted != $is_hosted) {
$cluster_engine = id(new DiffusionRepositoryClusterEngine())
->setViewer($this->getActor())
->setRepository($repository)
->synchronizeWorkingCopyAfterHostingChange();
}
return $xactions;
}

View file

@ -137,12 +137,28 @@ final class DiffusionRepositoryStorageManagementPanel
$version = idx($versions, $device->getPHID());
if ($version) {
$version_number = $version->getRepositoryVersion();
$version_number = phutil_tag(
'a',
array(
'href' => "/diffusion/pushlog/view/{$version_number}/",
),
$version_number);
$href = null;
if ($repository->isHosted()) {
$href = "/diffusion/pushlog/view/{$version_number}/";
} else {
$commit = id(new DiffusionCommitQuery())
->setViewer($viewer)
->withIDs(array($version_number))
->executeOne();
if ($commit) {
$href = $commit->getURI();
}
}
if ($href) {
$version_number = phutil_tag(
'a',
array(
'href' => $href,
),
$version_number);
}
} else {
$version_number = '-';
}

View file

@ -82,6 +82,53 @@ final class DiffusionRepositoryClusterEngine extends Phobject {
}
/**
* @task sync
*/
public function synchronizeWorkingCopyAfterHostingChange() {
if (!$this->shouldEnableSynchronization()) {
return;
}
$repository = $this->getRepository();
$repository_phid = $repository->getPHID();
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
$repository_phid);
$versions = mpull($versions, null, 'getDevicePHID');
// After converting a hosted repository to observed, or vice versa, we
// need to reset version numbers because the clocks for observed and hosted
// repositories run on different units.
// We identify all the cluster leaders and reset their version to 0.
// We identify all the cluster followers and demote them.
// This allows the cluter to start over again at version 0 but keep the
// same leaders.
if ($versions) {
$max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
foreach ($versions as $version) {
$device_phid = $version->getDevicePHID();
if ($version->getRepositoryVersion() == $max_version) {
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$device_phid,
0);
} else {
PhabricatorRepositoryWorkingCopyVersion::demoteDevice(
$repository_phid,
$device_phid);
}
}
}
return $this;
}
/**
* @task sync
*/
@ -149,14 +196,18 @@ final class DiffusionRepositoryClusterEngine extends Phobject {
$max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
if ($max_version > $this_version) {
$fetchable = array();
foreach ($versions as $version) {
if ($version->getRepositoryVersion() == $max_version) {
$fetchable[] = $version->getDevicePHID();
if ($repository->isHosted()) {
$fetchable = array();
foreach ($versions as $version) {
if ($version->getRepositoryVersion() == $max_version) {
$fetchable[] = $version->getDevicePHID();
}
}
}
$this->synchronizeWorkingCopyFromDevices($fetchable);
$this->synchronizeWorkingCopyFromDevices($fetchable);
} else {
$this->synchornizeWorkingCopyFromRemote();
}
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
@ -329,6 +380,47 @@ final class DiffusionRepositoryClusterEngine extends Phobject {
}
public function synchronizeWorkingCopyAfterDiscovery($new_version) {
if (!$this->shouldEnableSynchronization()) {
return;
}
$repository = $this->getRepository();
$repository_phid = $repository->getPHID();
if ($repository->isHosted()) {
return;
}
$viewer = $this->getViewer();
$device = AlmanacKeys::getLiveDevice();
$device_phid = $device->getPHID();
// NOTE: We are not holding a lock here because this method is only called
// from PhabricatorRepositoryDiscoveryEngine, which already holds a device
// lock. Even if we do race here and record an older version, the
// consequences are mild: we only do extra work to correct it later.
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
$repository_phid);
$versions = mpull($versions, null, 'getDevicePHID');
$this_version = idx($versions, $device_phid);
if ($this_version) {
$this_version = (int)$this_version->getRepositoryVersion();
} else {
$this_version = -1;
}
if ($new_version > $this_version) {
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$device_phid,
$new_version);
}
}
/**
* @task sync
*/
@ -471,13 +563,6 @@ final class DiffusionRepositoryClusterEngine extends Phobject {
return false;
}
// TODO: It may eventually make sense to try to version and synchronize
// observed repositories (so that daemons don't do reads against out-of
// date hosts), but don't bother for now.
if (!$repository->isHosted()) {
return false;
}
$device = AlmanacKeys::getLiveDevice();
if (!$device) {
return false;
@ -487,6 +572,50 @@ final class DiffusionRepositoryClusterEngine extends Phobject {
}
/**
* @task internal
*/
private function synchornizeWorkingCopyFromRemote() {
$repository = $this->getRepository();
$device = AlmanacKeys::getLiveDevice();
$local_path = $repository->getLocalPath();
$fetch_uri = $repository->getRemoteURIEnvelope();
if ($repository->isGit()) {
$this->requireWorkingCopy();
$argv = array(
'fetch --prune -- %P %s',
$fetch_uri,
'+refs/*:refs/*',
);
} else {
throw new Exception(pht('Remote sync only supported for git!'));
}
$future = DiffusionCommandEngine::newCommandEngine($repository)
->setArgv($argv)
->setSudoAsDaemon(true)
->setCredentialPHID($repository->getCredentialPHID())
->setProtocol($repository->getRemoteProtocol())
->newFuture();
$future->setCWD($local_path);
try {
$future->resolvex();
} catch (Exception $ex) {
$this->logLine(
pht(
'Synchronization of "%s" from remote failed: %s',
$device->getName(),
$ex->getMessage()));
throw $ex;
}
}
/**
* @task internal
*/
@ -560,17 +689,7 @@ final class DiffusionRepositoryClusterEngine extends Phobject {
$local_path = $repository->getLocalPath();
if ($repository->isGit()) {
if (!Filesystem::pathExists($local_path)) {
throw new Exception(
pht(
'Repository "%s" does not have a working copy on this device '.
'yet, so it can not be synchronized. Wait for the daemons to '.
'construct one or run `bin/repository update %s` on this host '.
'("%s") to build it explicitly.',
$repository->getDisplayName(),
$repository->getMonogram(),
$device->getName()));
}
$this->requireWorkingCopy();
$argv = array(
'fetch --prune -- %s %s',
@ -622,4 +741,24 @@ final class DiffusionRepositoryClusterEngine extends Phobject {
}
return $this;
}
private function requireWorkingCopy() {
$repository = $this->getRepository();
$local_path = $repository->getLocalPath();
if (!Filesystem::pathExists($local_path)) {
$device = AlmanacKeys::getLiveDevice();
throw new Exception(
pht(
'Repository "%s" does not have a working copy on this device '.
'yet, so it can not be synchronized. Wait for the daemons to '.
'construct one or run `bin/repository update %s` on this host '.
'("%s") to build it explicitly.',
$repository->getDisplayName(),
$repository->getMonogram(),
$device->getName()));
}
}
}

View file

@ -6,30 +6,33 @@
*/
final class DiffusionLowLevelGitRefQuery extends DiffusionLowLevelQuery {
private $isTag;
private $isOriginBranch;
private $refTypes;
public function withIsTag($is_tag) {
$this->isTag = $is_tag;
return $this;
}
public function withIsOriginBranch($is_origin_branch) {
$this->isOriginBranch = $is_origin_branch;
public function withRefTypes(array $ref_types) {
$this->refTypes = $ref_types;
return $this;
}
protected function executeQuery() {
$ref_types = $this->refTypes;
if ($ref_types) {
$type_branch = PhabricatorRepositoryRefCursor::TYPE_BRANCH;
$type_tag = PhabricatorRepositoryRefCursor::TYPE_TAG;
$ref_types = array_fuse($ref_types);
$with_branches = isset($ref_types[$type_branch]);
$with_tags = isset($ref_types[$type_tag]);
} else {
$with_branches = true;
$with_tags = true;
}
$repository = $this->getRepository();
$prefixes = array();
$any = ($this->isTag || $this->isOriginBranch);
if (!$any) {
throw new Exception(pht('Specify types of refs to query.'));
}
if ($this->isOriginBranch) {
if ($with_branches) {
if ($repository->isWorkingCopyBare()) {
$prefix = 'refs/heads/';
} else {
@ -39,7 +42,7 @@ final class DiffusionLowLevelGitRefQuery extends DiffusionLowLevelQuery {
$prefixes[] = $prefix;
}
if ($this->isTag) {
if ($with_tags) {
$prefixes[] = 'refs/tags/';
}

View file

@ -66,8 +66,11 @@ final class DiffusionLowLevelResolveRefsQuery
// First, resolve branches and tags.
$ref_map = id(new DiffusionLowLevelGitRefQuery())
->setRepository($repository)
->withIsTag(true)
->withIsOriginBranch(true)
->withRefTypes(
array(
PhabricatorRepositoryRefCursor::TYPE_BRANCH,
PhabricatorRepositoryRefCursor::TYPE_TAG,
))
->execute();
$ref_map = mgroup($ref_map, 'getShortName');

View file

@ -63,6 +63,7 @@ final class PhabricatorRepositoryDiscoveryEngine
private function discoverCommitsWithLock() {
$repository = $this->getRepository();
$viewer = $this->getViewer();
$vcs = $repository->getVersionControlSystem();
switch ($vcs) {
@ -104,6 +105,14 @@ final class PhabricatorRepositoryDiscoveryEngine
$this->commitCache[$ref->getIdentifier()] = true;
}
$version = $this->getObservedVersion($repository);
if ($version !== null) {
id(new DiffusionRepositoryClusterEngine())
->setViewer($viewer)
->setRepository($repository)
->synchronizeWorkingCopyAfterDiscovery($version);
}
return $refs;
}
@ -121,9 +130,15 @@ final class PhabricatorRepositoryDiscoveryEngine
$this->verifyGitOrigin($repository);
}
// TODO: This should also import tags, but some of the logic is still
// branch-specific today.
$branches = id(new DiffusionLowLevelGitRefQuery())
->setRepository($repository)
->withIsOriginBranch(true)
->withRefTypes(
array(
PhabricatorRepositoryRefCursor::TYPE_BRANCH,
))
->execute();
if (!$branches) {
@ -642,4 +657,49 @@ final class PhabricatorRepositoryDiscoveryEngine
return true;
}
private function getObservedVersion(PhabricatorRepository $repository) {
if ($repository->isHosted()) {
return null;
}
if ($repository->isGit()) {
return $this->getGitObservedVersion($repository);
}
return null;
}
private function getGitObservedVersion(PhabricatorRepository $repository) {
$refs = id(new DiffusionLowLevelGitRefQuery())
->setRepository($repository)
->execute();
if (!$refs) {
return null;
}
// In Git, the observed version is the most recently discovered commit
// at any repository HEAD. It's possible for this to regress temporarily
// if a branch is pushed and then deleted. This is acceptable because it
// doesn't do anything meaningfully bad and will fix itself on the next
// push.
$ref_identifiers = mpull($refs, 'getCommitIdentifier');
$ref_identifiers = array_fuse($ref_identifiers);
$version = queryfx_one(
$repository->establishConnection('w'),
'SELECT MAX(id) version FROM %T WHERE repositoryID = %d
AND commitIdentifier IN (%Ls)',
id(new PhabricatorRepositoryCommit())->getTableName(),
$repository->getID(),
$ref_identifiers);
if (!$version) {
return null;
}
return (int)$version['version'];
}
}

View file

@ -108,27 +108,27 @@ final class PhabricatorRepositoryPullEngine
} else {
$this->executeSubversionCreate();
}
} else {
if (!$repository->isHosted()) {
$this->logPull(
pht(
'Updating the working copy for repository "%s".',
$repository->getDisplayName()));
if ($is_git) {
$this->verifyGitOrigin($repository);
$this->executeGitUpdate();
} else if ($is_hg) {
$this->executeMercurialUpdate();
}
}
id(new DiffusionRepositoryClusterEngine())
->setViewer($viewer)
->setRepository($repository)
->synchronizeWorkingCopyBeforeRead();
if (!$repository->isHosted()) {
$this->logPull(
pht(
'Updating the working copy for repository "%s".',
$repository->getDisplayName()));
if ($is_git) {
$this->verifyGitOrigin($repository);
$this->executeGitUpdate();
} else if ($is_hg) {
$this->executeMercurialUpdate();
}
}
if ($repository->isHosted()) {
id(new DiffusionRepositoryClusterEngine())
->setViewer($viewer)
->setRepository($repository)
->synchronizeWorkingCopyBeforeRead();
if ($is_git) {
$this->installGitHook();
} else if ($is_svn) {

View file

@ -452,7 +452,10 @@ final class PhabricatorRepositoryRefEngine
private function loadGitBranchPositions(PhabricatorRepository $repository) {
return id(new DiffusionLowLevelGitRefQuery())
->setRepository($repository)
->withIsOriginBranch(true)
->withRefTypes(
array(
PhabricatorRepositoryRefCursor::TYPE_BRANCH,
))
->execute();
}
@ -463,7 +466,10 @@ final class PhabricatorRepositoryRefEngine
private function loadGitTagPositions(PhabricatorRepository $repository) {
return id(new DiffusionLowLevelGitRefQuery())
->setRepository($repository)
->withIsTag(true)
->withRefTypes(
array(
PhabricatorRepositoryRefCursor::TYPE_TAG,
))
->execute();
}