mirror of
https://we.phorge.it/source/phorge.git
synced 2024-11-10 00:42:41 +01:00
Synchronize (hosted, clustered, Git) repositories over Conduit + HTTP
Summary: Ref T4292. We currently synchronize hosted, clustered, Git repositories when we receive an SSH pull or push. Additionally: - Synchronize before HTTP reads and writes. - Synchronize reads before Conduit requests. We could relax Conduit eventually and allow Diffusion to say "it's OK to give me stale data". We could also redirect some set of these actions to just go to the up-to-date host instead of connecting to a random host and synchronizing it. However, this potentially won't work as well at scale: if you have a larger number of servers, it sends all of the traffic to the leader immediately following a write. That can cause "thundering herd" issues, and isn't efficient if replicas are in different geographical regions and the write just went to the east coast but most clients are on the west coast. In large-scale cases, it's better to go to the local replica, wait for an update, then serve traffic from it -- particularly given that writes are relatively rare. But we can finesse this later once things are solid. Test Plan: - Pushed and pulled a Git repository over HTTP. - Browsed a Git repository from the web UI. Reviewers: chad Reviewed By: chad Maniphest Tasks: T4292 Differential Revision: https://secure.phabricator.com/D15758
This commit is contained in:
parent
31bc023eff
commit
d87c500002
4 changed files with 68 additions and 9 deletions
|
@ -145,6 +145,12 @@ abstract class DiffusionQueryConduitAPIMethod
|
|||
|
||||
$this->setDiffusionRequest($drequest);
|
||||
|
||||
// TODO: Allow web UI queries opt out of this if they don't care about
|
||||
// fetching the most up-to-date data? Synchronization can be slow, and a
|
||||
// lot of web reads are probably fine if they're a few seconds out of
|
||||
// date.
|
||||
$repository->synchronizeWorkingCopyBeforeRead();
|
||||
|
||||
return $this->getResult($request);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -538,10 +538,35 @@ final class DiffusionServeController extends DiffusionController {
|
|||
$command = csprintf('%s', $bin);
|
||||
$command = PhabricatorDaemon::sudoCommandAsDaemonUser($command);
|
||||
|
||||
list($err, $stdout, $stderr) = id(new ExecFuture('%C', $command))
|
||||
->setEnv($env, true)
|
||||
->write($input)
|
||||
->resolve();
|
||||
$unguarded = AphrontWriteGuard::beginScopedUnguardedWrites();
|
||||
|
||||
$did_write_lock = false;
|
||||
if ($this->isReadOnlyRequest($repository)) {
|
||||
$repository->synchronizeWorkingCopyBeforeRead();
|
||||
} else {
|
||||
$did_write_lock = true;
|
||||
$repository->synchronizeWorkingCopyBeforeWrite($viewer);
|
||||
}
|
||||
|
||||
$caught = null;
|
||||
try {
|
||||
list($err, $stdout, $stderr) = id(new ExecFuture('%C', $command))
|
||||
->setEnv($env, true)
|
||||
->write($input)
|
||||
->resolve();
|
||||
} catch (Exception $ex) {
|
||||
$caught = $ex;
|
||||
}
|
||||
|
||||
if ($did_write_lock) {
|
||||
$repository->synchronizeWorkingCopyAfterWrite();
|
||||
}
|
||||
|
||||
unset($unguarded);
|
||||
|
||||
if ($caught) {
|
||||
throw $caught;
|
||||
}
|
||||
|
||||
if ($err) {
|
||||
if ($this->isValidGitShallowCloneResponse($stdout, $stderr)) {
|
||||
|
|
|
@ -8,6 +8,7 @@ abstract class DiffusionCommandEngine extends Phobject {
|
|||
private $argv;
|
||||
private $passthru;
|
||||
private $connectAsDevice;
|
||||
private $sudoAsDaemon;
|
||||
|
||||
public static function newCommandEngine(PhabricatorRepository $repository) {
|
||||
$engines = self::newCommandEngines();
|
||||
|
@ -92,10 +93,25 @@ abstract class DiffusionCommandEngine extends Phobject {
|
|||
return $this->connectAsDevice;
|
||||
}
|
||||
|
||||
public function setSudoAsDaemon($sudo_as_daemon) {
|
||||
$this->sudoAsDaemon = $sudo_as_daemon;
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function getSudoAsDaemon() {
|
||||
return $this->sudoAsDaemon;
|
||||
}
|
||||
|
||||
public function newFuture() {
|
||||
$argv = $this->newCommandArgv();
|
||||
$env = $this->newCommandEnvironment();
|
||||
|
||||
if ($this->getSudoAsDaemon()) {
|
||||
$command = call_user_func_array('csprintf', $argv);
|
||||
$command = PhabricatorDaemon::sudoCommandAsDaemonUser($command);
|
||||
$argv = array('%C', $command);
|
||||
}
|
||||
|
||||
if ($this->getPassthru()) {
|
||||
$future = newv('PhutilExecPassthru', $argv);
|
||||
} else {
|
||||
|
|
|
@ -1954,6 +1954,10 @@ final class PhabricatorRepository extends PhabricatorRepositoryDAO
|
|||
|
||||
$uris = array();
|
||||
foreach ($bindings as $binding) {
|
||||
if ($binding->getIsDisabled()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$iface = $binding->getInterface();
|
||||
|
||||
// If we're never proxying this and it's locally satisfiable, return
|
||||
|
@ -2197,11 +2201,6 @@ final class PhabricatorRepository extends PhabricatorRepositoryDAO
|
|||
|
||||
|
||||
private function shouldEnableSynchronization() {
|
||||
$device = AlmanacKeys::getLiveDevice();
|
||||
if (!$device) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$service_phid = $this->getAlmanacServicePHID();
|
||||
if (!$service_phid) {
|
||||
return false;
|
||||
|
@ -2212,6 +2211,18 @@ final class PhabricatorRepository extends PhabricatorRepositoryDAO
|
|||
return false;
|
||||
}
|
||||
|
||||
// TODO: It may eventually make sense to try to version and synchronize
|
||||
// observed repositories (so that daemons don't do reads against out-of
|
||||
// date hosts), but don't bother for now.
|
||||
if (!$this->isHosted()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$device = AlmanacKeys::getLiveDevice();
|
||||
if (!$device) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -2451,6 +2462,7 @@ final class PhabricatorRepository extends PhabricatorRepositoryDAO
|
|||
$future = DiffusionCommandEngine::newCommandEngine($this)
|
||||
->setArgv($argv)
|
||||
->setConnectAsDevice(true)
|
||||
->setSudoAsDaemon(true)
|
||||
->setProtocol($fetch_uri->getProtocol())
|
||||
->newFuture();
|
||||
|
||||
|
|
Loading…
Reference in a new issue