mirror of
https://we.phorge.it/source/phorge.git
synced 2025-01-22 20:51:10 +01:00
Synchronize (hosted, git, clustered, SSH) repositories prior to reads
Summary: Ref T4292. Before we write or read a hosted, clustered Git repository over SSH, check if another version of the repository exists on another node that is more up-to-date. If such a version does exist, fetch that version first. This allows reads and writes of any node to always act on the most up-to-date code. Test Plan: Faked my way through this and got a fetch via `bin/repository update`; this is difficult to test locally and needs more work before we can put it in production. Reviewers: chad Reviewed By: chad Maniphest Tasks: T4292 Differential Revision: https://secure.phabricator.com/D15757
This commit is contained in:
parent
c70f4815a9
commit
31bc023eff
5 changed files with 178 additions and 34 deletions
|
@ -132,7 +132,7 @@ final class AlmanacServiceViewController
|
|||
->setHideServiceColumn(true);
|
||||
|
||||
$header = id(new PHUIHeaderView())
|
||||
->setHeader(pht('SERVICE BINDINGS'))
|
||||
->setHeader(pht('Service Bindings'))
|
||||
->addActionLink(
|
||||
id(new PHUIButtonView())
|
||||
->setTag('a')
|
||||
|
|
|
@ -16,11 +16,15 @@ final class DiffusionGitUploadPackSSHWorkflow extends DiffusionGitSSHWorkflow {
|
|||
protected function executeRepositoryOperations() {
|
||||
$repository = $this->getRepository();
|
||||
|
||||
$skip_sync = $this->shouldSkipReadSynchronization();
|
||||
|
||||
if ($this->shouldProxy()) {
|
||||
$command = $this->getProxyCommand();
|
||||
} else {
|
||||
$command = csprintf('git-upload-pack -- %s', $repository->getLocalPath());
|
||||
$repository->synchronizeWorkingCopyBeforeRead();
|
||||
if (!$skip_sync) {
|
||||
$repository->synchronizeWorkingCopyBeforeRead();
|
||||
}
|
||||
}
|
||||
$command = PhabricatorDaemon::sudoCommandAsDaemonUser($command);
|
||||
|
||||
|
|
|
@ -201,6 +201,14 @@ abstract class DiffusionSSHWorkflow extends PhabricatorSSHWorkflow {
|
|||
$repository = $this->getRepository();
|
||||
$viewer = $this->getUser();
|
||||
|
||||
if ($viewer->isOmnipotent()) {
|
||||
throw new Exception(
|
||||
pht(
|
||||
'This request is authenticated as a cluster device, but is '.
|
||||
'performing a write. Writes must be performed with a real '.
|
||||
'user account.'));
|
||||
}
|
||||
|
||||
switch ($repository->getServeOverSSH()) {
|
||||
case PhabricatorRepository::SERVE_READONLY:
|
||||
if ($protocol_command !== null) {
|
||||
|
@ -236,4 +244,18 @@ abstract class DiffusionSSHWorkflow extends PhabricatorSSHWorkflow {
|
|||
return $this->hasWriteAccess;
|
||||
}
|
||||
|
||||
protected function shouldSkipReadSynchronization() {
|
||||
$viewer = $this->getUser();
|
||||
|
||||
// Currently, the only case where devices interact over SSH without
|
||||
// assuming user credentials is when synchronizing before a read. These
|
||||
// synchronizing reads do not themselves need to be synchronized.
|
||||
if ($viewer->isOmnipotent()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -96,6 +96,8 @@ final class PhabricatorRepositoryPullEngine
|
|||
}
|
||||
|
||||
if ($repository->isHosted()) {
|
||||
$repository->synchronizeWorkingCopyBeforeRead();
|
||||
|
||||
if ($is_git) {
|
||||
$this->installGitHook();
|
||||
} else if ($is_svn) {
|
||||
|
|
|
@ -1349,7 +1349,7 @@ final class PhabricatorRepository extends PhabricatorRepositoryDAO
|
|||
}
|
||||
|
||||
$ssh_user = AlmanacKeys::getClusterSSHUser();
|
||||
if ($ssh_user !== null) {
|
||||
if (strlen($ssh_user)) {
|
||||
$uri->setUser($ssh_user);
|
||||
}
|
||||
|
||||
|
@ -1927,31 +1927,9 @@ final class PhabricatorRepository extends PhabricatorRepositoryDAO
|
|||
$never_proxy,
|
||||
array $protocols) {
|
||||
|
||||
$service_phid = $this->getAlmanacServicePHID();
|
||||
if (!$service_phid) {
|
||||
// No service, so this is a local repository.
|
||||
return null;
|
||||
}
|
||||
|
||||
$service = id(new AlmanacServiceQuery())
|
||||
->setViewer(PhabricatorUser::getOmnipotentUser())
|
||||
->withPHIDs(array($service_phid))
|
||||
->needBindings(true)
|
||||
->needProperties(true)
|
||||
->executeOne();
|
||||
$service = $this->loadAlmanacService();
|
||||
if (!$service) {
|
||||
throw new Exception(
|
||||
pht(
|
||||
'The Almanac service for this repository is invalid or could not '.
|
||||
'be loaded.'));
|
||||
}
|
||||
|
||||
$service_type = $service->getServiceImplementation();
|
||||
if (!($service_type instanceof AlmanacClusterRepositoryServiceType)) {
|
||||
throw new Exception(
|
||||
pht(
|
||||
'The Almanac service for this repository does not have the correct '.
|
||||
'service type.'));
|
||||
return null;
|
||||
}
|
||||
|
||||
$bindings = $service->getBindings();
|
||||
|
@ -1990,16 +1968,14 @@ final class PhabricatorRepository extends PhabricatorRepositoryDAO
|
|||
}
|
||||
}
|
||||
|
||||
$protocol = $binding->getAlmanacPropertyValue('protocol');
|
||||
if ($protocol === null) {
|
||||
$protocol = 'https';
|
||||
}
|
||||
$uri = $this->getClusterRepositoryURIFromBinding($binding);
|
||||
|
||||
$protocol = $uri->getProtocol();
|
||||
if (empty($protocol_map[$protocol])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$uris[] = $protocol.'://'.$iface->renderDisplayAddress().'/';
|
||||
$uris[] = $uri;
|
||||
}
|
||||
|
||||
if (!$uris) {
|
||||
|
@ -2226,6 +2202,16 @@ final class PhabricatorRepository extends PhabricatorRepositoryDAO
|
|||
return false;
|
||||
}
|
||||
|
||||
$service_phid = $this->getAlmanacServicePHID();
|
||||
if (!$service_phid) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// TODO: For now, this is only supported for Git.
|
||||
if (!$this->isGit()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -2275,8 +2261,7 @@ final class PhabricatorRepository extends PhabricatorRepositoryDAO
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: Actualy fetch the newer version from one of the nodes which has
|
||||
// it.
|
||||
$this->synchronizeWorkingCopyFromDevices($fetchable);
|
||||
|
||||
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
|
||||
$repository_phid,
|
||||
|
@ -2393,6 +2378,137 @@ final class PhabricatorRepository extends PhabricatorRepositoryDAO
|
|||
}
|
||||
|
||||
|
||||
/**
|
||||
* @task sync
|
||||
*/
|
||||
private function synchronizeWorkingCopyFromDevices(array $device_phids) {
|
||||
$service = $this->loadAlmanacService();
|
||||
if (!$service) {
|
||||
throw new Exception(pht('Failed to load repository cluster service.'));
|
||||
}
|
||||
|
||||
$device_map = array_fuse($device_phids);
|
||||
$bindings = $service->getBindings();
|
||||
|
||||
$fetchable = array();
|
||||
foreach ($bindings as $binding) {
|
||||
// We can't fetch from disabled nodes.
|
||||
if ($binding->getIsDisabled()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// We can't fetch from nodes which don't have the newest version.
|
||||
$device_phid = $binding->getDevicePHID();
|
||||
if (empty($device_map[$device_phid])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO: For now, only fetch over SSH. We could support fetching over
|
||||
// HTTP eventually.
|
||||
if ($binding->getAlmanacPropertyValue('protocol') != 'ssh') {
|
||||
continue;
|
||||
}
|
||||
|
||||
$fetchable[] = $binding;
|
||||
}
|
||||
|
||||
if (!$fetchable) {
|
||||
throw new Exception(
|
||||
pht(
|
||||
'Leader lost: no up-to-date nodes in repository cluster are '.
|
||||
'fetchable.'));
|
||||
}
|
||||
|
||||
$caught = null;
|
||||
foreach ($fetchable as $binding) {
|
||||
try {
|
||||
$this->synchronizeWorkingCopyFromBinding($binding);
|
||||
$caught = null;
|
||||
break;
|
||||
} catch (Exception $ex) {
|
||||
$caught = $ex;
|
||||
}
|
||||
}
|
||||
|
||||
if ($caught) {
|
||||
throw $caught;
|
||||
}
|
||||
}
|
||||
|
||||
private function synchronizeWorkingCopyFromBinding($binding) {
|
||||
$fetch_uri = $this->getClusterRepositoryURIFromBinding($binding);
|
||||
|
||||
if ($this->isGit()) {
|
||||
$argv = array(
|
||||
'fetch --prune -- %s %s',
|
||||
$fetch_uri,
|
||||
'+refs/*:refs/*',
|
||||
);
|
||||
} else {
|
||||
throw new Exception(pht('Binding sync only supported for git!'));
|
||||
}
|
||||
|
||||
$future = DiffusionCommandEngine::newCommandEngine($this)
|
||||
->setArgv($argv)
|
||||
->setConnectAsDevice(true)
|
||||
->setProtocol($fetch_uri->getProtocol())
|
||||
->newFuture();
|
||||
|
||||
$future->setCWD($this->getLocalPath());
|
||||
|
||||
$future->resolvex();
|
||||
}
|
||||
|
||||
private function getClusterRepositoryURIFromBinding(
|
||||
AlmanacBinding $binding) {
|
||||
$protocol = $binding->getAlmanacPropertyValue('protocol');
|
||||
if ($protocol === null) {
|
||||
$protocol = 'https';
|
||||
}
|
||||
|
||||
$iface = $binding->getInterface();
|
||||
$address = $iface->renderDisplayAddress();
|
||||
|
||||
$path = $this->getURI();
|
||||
|
||||
return id(new PhutilURI("{$protocol}://{$address}"))
|
||||
->setPath($path);
|
||||
}
|
||||
|
||||
private function loadAlmanacService() {
|
||||
$service_phid = $this->getAlmanacServicePHID();
|
||||
if (!$service_phid) {
|
||||
// No service, so this is a local repository.
|
||||
return null;
|
||||
}
|
||||
|
||||
$service = id(new AlmanacServiceQuery())
|
||||
->setViewer(PhabricatorUser::getOmnipotentUser())
|
||||
->withPHIDs(array($service_phid))
|
||||
->needBindings(true)
|
||||
->needProperties(true)
|
||||
->executeOne();
|
||||
if (!$service) {
|
||||
throw new Exception(
|
||||
pht(
|
||||
'The Almanac service for this repository is invalid or could not '.
|
||||
'be loaded.'));
|
||||
}
|
||||
|
||||
$service_type = $service->getServiceImplementation();
|
||||
if (!($service_type instanceof AlmanacClusterRepositoryServiceType)) {
|
||||
throw new Exception(
|
||||
pht(
|
||||
'The Almanac service for this repository does not have the correct '.
|
||||
'service type.'));
|
||||
}
|
||||
|
||||
return $service;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/* -( Symbols )-------------------------------------------------------------*/
|
||||
|
||||
public function getSymbolSources() {
|
||||
|
|
Loading…
Reference in a new issue