mirror of
https://we.phorge.it/source/phorge.git
synced 2024-12-20 04:20:55 +01:00
Make cluster repositories more chatty
Summary: Ref T10860. At least in Git over SSH, we can freely echo a bunch of stuff to stderr and Git will print it to the console, so we can tell users what's going on. This should make debugging, etc., easier. We could tone this down a little bit once things are more stable if it's a little too chatty. Test Plan: ``` $ echo D >> record && git commit -am D && git push [master ca5efff] D 1 file changed, 1 insertion(+) # Push received by "local001.phacility.net", forwarding to cluster host. # Waiting up to 120 second(s) for a cluster write lock... # Acquired write lock immediately. # Waiting up to 120 second(s) for a cluster read lock on "local001.phacility.net"... # Acquired read lock immediately. # Device "local001.phacility.net" is already a cluster leader and does not need to be synchronized. # Ready to receive on cluster host "local001.phacility.net". Counting objects: 3, done. Delta compression using up to 8 threads. Compressing objects: 100% (2/2), done. Writing objects: 100% (3/3), 256 bytes | 0 bytes/s, done. Total 3 (delta 1), reused 0 (delta 0) To ssh://local@localvault.phacility.com/diffusion/26/locktopia.git 8616189..ca5efff master -> master ``` Reviewers: chad Reviewed By: chad Maniphest Tasks: T10860 Differential Revision: https://secure.phabricator.com/D15791
This commit is contained in:
parent
dc75b4bd06
commit
d0b5dac36b
6 changed files with 161 additions and 9 deletions
|
@ -746,6 +746,7 @@ phutil_register_library_map(array(
|
|||
'DiffusionRepositoryBasicsManagementPanel' => 'applications/diffusion/management/DiffusionRepositoryBasicsManagementPanel.php',
|
||||
'DiffusionRepositoryByIDRemarkupRule' => 'applications/diffusion/remarkup/DiffusionRepositoryByIDRemarkupRule.php',
|
||||
'DiffusionRepositoryClusterEngine' => 'applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php',
|
||||
'DiffusionRepositoryClusterEngineLogInterface' => 'applications/diffusion/protocol/DiffusionRepositoryClusterEngineLogInterface.php',
|
||||
'DiffusionRepositoryClusterManagementPanel' => 'applications/diffusion/management/DiffusionRepositoryClusterManagementPanel.php',
|
||||
'DiffusionRepositoryController' => 'applications/diffusion/controller/DiffusionRepositoryController.php',
|
||||
'DiffusionRepositoryCreateController' => 'applications/diffusion/controller/DiffusionRepositoryCreateController.php',
|
||||
|
@ -4854,7 +4855,10 @@ phutil_register_library_map(array(
|
|||
'DiffusionGitReceivePackSSHWorkflow' => 'DiffusionGitSSHWorkflow',
|
||||
'DiffusionGitRequest' => 'DiffusionRequest',
|
||||
'DiffusionGitResponse' => 'AphrontResponse',
|
||||
'DiffusionGitSSHWorkflow' => 'DiffusionSSHWorkflow',
|
||||
'DiffusionGitSSHWorkflow' => array(
|
||||
'DiffusionSSHWorkflow',
|
||||
'DiffusionRepositoryClusterEngineLogInterface',
|
||||
),
|
||||
'DiffusionGitUploadPackSSHWorkflow' => 'DiffusionGitSSHWorkflow',
|
||||
'DiffusionHistoryController' => 'DiffusionController',
|
||||
'DiffusionHistoryQueryConduitAPIMethod' => 'DiffusionQueryConduitAPIMethod',
|
||||
|
|
|
@ -13,6 +13,7 @@ final class DiffusionRepositoryClusterEngine extends Phobject {
|
|||
private $viewer;
|
||||
private $clusterWriteLock;
|
||||
private $clusterWriteVersion;
|
||||
private $logger;
|
||||
|
||||
|
||||
/* -( Configuring Synchronization )---------------------------------------- */
|
||||
|
@ -36,6 +37,11 @@ final class DiffusionRepositoryClusterEngine extends Phobject {
|
|||
return $this->viewer;
|
||||
}
|
||||
|
||||
public function setLog(DiffusionRepositoryClusterEngineLogInterface $log) {
|
||||
$this->logger = $log;
|
||||
return $this;
|
||||
}
|
||||
|
||||
|
||||
/* -( Cluster Synchronization )-------------------------------------------- */
|
||||
|
||||
|
@ -92,8 +98,36 @@ final class DiffusionRepositoryClusterEngine extends Phobject {
|
|||
$repository_phid,
|
||||
$device_phid);
|
||||
|
||||
// TODO: Raise a more useful exception if we fail to grab this lock.
|
||||
$read_lock->lock(phutil_units('2 minutes in seconds'));
|
||||
$lock_wait = phutil_units('2 minutes in seconds');
|
||||
|
||||
$this->logLine(
|
||||
pht(
|
||||
'Waiting up to %s second(s) for a cluster read lock on "%s"...',
|
||||
new PhutilNumber($lock_wait),
|
||||
$device->getName()));
|
||||
|
||||
try {
|
||||
$start = PhabricatorTime::getNow();
|
||||
$read_lock->lock($lock_wait);
|
||||
$waited = (PhabricatorTime::getNow() - $start);
|
||||
|
||||
if ($waited) {
|
||||
$this->logLine(
|
||||
pht(
|
||||
'Acquired read lock after %s second(s).',
|
||||
new PhutilNumber($waited)));
|
||||
} else {
|
||||
$this->logLine(
|
||||
pht(
|
||||
'Acquired read lock immediately.'));
|
||||
}
|
||||
} catch (Exception $ex) {
|
||||
throw new PhutilProxyException(
|
||||
pht(
|
||||
'Failed to acquire read lock after waiting %s second(s). You '.
|
||||
'may be able to retry later.',
|
||||
new PhutilNumber($lock_wait)));
|
||||
}
|
||||
|
||||
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
|
||||
$repository_phid);
|
||||
|
@ -126,6 +160,12 @@ final class DiffusionRepositoryClusterEngine extends Phobject {
|
|||
$repository_phid,
|
||||
$device_phid,
|
||||
$max_version);
|
||||
} else {
|
||||
$this->logLine(
|
||||
pht(
|
||||
'Device "%s" is already a cluster leader and does not need '.
|
||||
'to be synchronized.',
|
||||
$device->getName()));
|
||||
}
|
||||
|
||||
$result_version = $max_version;
|
||||
|
@ -210,8 +250,35 @@ final class DiffusionRepositoryClusterEngine extends Phobject {
|
|||
$write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock(
|
||||
$repository_phid);
|
||||
|
||||
// TODO: Raise a more useful exception if we fail to grab this lock.
|
||||
$write_lock->lock(phutil_units('2 minutes in seconds'));
|
||||
$lock_wait = phutil_units('2 minutes in seconds');
|
||||
|
||||
$this->logLine(
|
||||
pht(
|
||||
'Waiting up to %s second(s) for a cluster write lock...',
|
||||
new PhutilNumber($lock_wait)));
|
||||
|
||||
try {
|
||||
$start = PhabricatorTime::getNow();
|
||||
$write_lock->lock($lock_wait);
|
||||
$waited = (PhabricatorTime::getNow() - $start);
|
||||
|
||||
if ($waited) {
|
||||
$this->logLine(
|
||||
pht(
|
||||
'Acquired write lock after %s second(s).',
|
||||
new PhutilNumber($waited)));
|
||||
} else {
|
||||
$this->logLine(
|
||||
pht(
|
||||
'Acquired write lock immediately.'));
|
||||
}
|
||||
} catch (Exception $ex) {
|
||||
throw new PhutilProxyException(
|
||||
pht(
|
||||
'Failed to acquire write lock after waiting %s second(s). You '.
|
||||
'may be able to retry later.',
|
||||
new PhutilNumber($lock_wait)));
|
||||
}
|
||||
|
||||
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
|
||||
$repository_phid);
|
||||
|
@ -393,13 +460,20 @@ final class DiffusionRepositoryClusterEngine extends Phobject {
|
|||
*/
|
||||
private function synchronizeWorkingCopyFromBinding($binding) {
|
||||
$repository = $this->getRepository();
|
||||
$device = AlmanacKeys::getLiveDevice();
|
||||
|
||||
$this->logLine(
|
||||
pht(
|
||||
'Synchronizing this device ("%s") from cluster leader ("%s") before '.
|
||||
'read.',
|
||||
$device->getName(),
|
||||
$binding->getDevice()->getName()));
|
||||
|
||||
$fetch_uri = $repository->getClusterRepositoryURIFromBinding($binding);
|
||||
$local_path = $repository->getLocalPath();
|
||||
|
||||
if ($repository->isGit()) {
|
||||
if (!Filesystem::pathExists($local_path)) {
|
||||
$device = AlmanacKeys::getLiveDevice();
|
||||
throw new Exception(
|
||||
pht(
|
||||
'Repository "%s" does not have a working copy on this device '.
|
||||
|
@ -429,7 +503,36 @@ final class DiffusionRepositoryClusterEngine extends Phobject {
|
|||
|
||||
$future->setCWD($local_path);
|
||||
|
||||
$future->resolvex();
|
||||
try {
|
||||
$future->resolvex();
|
||||
} catch (Exception $ex) {
|
||||
$this->logLine(
|
||||
pht(
|
||||
'Synchronization of "%s" from leader "%s" failed: %s',
|
||||
$device->getName(),
|
||||
$binding->getDevice()->getName(),
|
||||
$ex->getMessage()));
|
||||
throw $ex;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @task internal
|
||||
*/
|
||||
private function logLine($message) {
|
||||
return $this->logText("# {$message}\n");
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @task internal
|
||||
*/
|
||||
private function logText($message) {
|
||||
$log = $this->logger;
|
||||
if ($log) {
|
||||
$log->writeClusterEngineLogMessage($message);
|
||||
}
|
||||
return $this;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
<?php
|
||||
|
||||
interface DiffusionRepositoryClusterEngineLogInterface {
|
||||
|
||||
public function writeClusterEngineLogMessage($message);
|
||||
|
||||
}
|
|
@ -16,21 +16,37 @@ final class DiffusionGitReceivePackSSHWorkflow extends DiffusionGitSSHWorkflow {
|
|||
protected function executeRepositoryOperations() {
|
||||
$repository = $this->getRepository();
|
||||
$viewer = $this->getViewer();
|
||||
$device = AlmanacKeys::getLiveDevice();
|
||||
|
||||
// This is a write, and must have write access.
|
||||
$this->requireWriteAccess();
|
||||
|
||||
$cluster_engine = id(new DiffusionRepositoryClusterEngine())
|
||||
->setViewer($viewer)
|
||||
->setRepository($repository);
|
||||
->setRepository($repository)
|
||||
->setLog($this);
|
||||
|
||||
if ($this->shouldProxy()) {
|
||||
$command = $this->getProxyCommand();
|
||||
$did_synchronize = false;
|
||||
|
||||
if ($device) {
|
||||
$this->writeClusterEngineLogMessage(
|
||||
pht(
|
||||
"# Push received by \"%s\", forwarding to cluster host.\n",
|
||||
$device->getName()));
|
||||
}
|
||||
} else {
|
||||
$command = csprintf('git-receive-pack %s', $repository->getLocalPath());
|
||||
$did_synchronize = true;
|
||||
$cluster_engine->synchronizeWorkingCopyBeforeWrite();
|
||||
|
||||
if ($device) {
|
||||
$this->writeClusterEngineLogMessage(
|
||||
pht(
|
||||
"# Ready to receive on cluster host \"%s\".\n",
|
||||
$device->getName()));
|
||||
}
|
||||
}
|
||||
|
||||
$caught = null;
|
||||
|
|
|
@ -1,12 +1,18 @@
|
|||
<?php
|
||||
|
||||
abstract class DiffusionGitSSHWorkflow extends DiffusionSSHWorkflow {
|
||||
abstract class DiffusionGitSSHWorkflow
|
||||
extends DiffusionSSHWorkflow
|
||||
implements DiffusionRepositoryClusterEngineLogInterface {
|
||||
|
||||
protected function writeError($message) {
|
||||
// Git assumes we'll add our own newlines.
|
||||
return parent::writeError($message."\n");
|
||||
}
|
||||
|
||||
public function writeClusterEngineLogMessage($message) {
|
||||
parent::writeError($message);
|
||||
}
|
||||
|
||||
protected function identifyRepository() {
|
||||
$args = $this->getArgs();
|
||||
$path = head($args->getArg('dir'));
|
||||
|
|
|
@ -16,18 +16,34 @@ final class DiffusionGitUploadPackSSHWorkflow extends DiffusionGitSSHWorkflow {
|
|||
protected function executeRepositoryOperations() {
|
||||
$repository = $this->getRepository();
|
||||
$viewer = $this->getUser();
|
||||
$device = AlmanacKeys::getLiveDevice();
|
||||
|
||||
$skip_sync = $this->shouldSkipReadSynchronization();
|
||||
|
||||
if ($this->shouldProxy()) {
|
||||
$command = $this->getProxyCommand();
|
||||
|
||||
if ($device) {
|
||||
$this->writeClusterEngineLogMessage(
|
||||
pht(
|
||||
"# Fetch received by \"%s\", forwarding to cluster host.\n",
|
||||
$device->getName()));
|
||||
}
|
||||
} else {
|
||||
$command = csprintf('git-upload-pack -- %s', $repository->getLocalPath());
|
||||
if (!$skip_sync) {
|
||||
$cluster_engine = id(new DiffusionRepositoryClusterEngine())
|
||||
->setViewer($viewer)
|
||||
->setRepository($repository)
|
||||
->setLog($this)
|
||||
->synchronizeWorkingCopyBeforeRead();
|
||||
|
||||
if ($device) {
|
||||
$this->writeClusterEngineLogMessage(
|
||||
pht(
|
||||
"# Cleared to fetch on cluster host \"%s\".\n",
|
||||
$device->getName()));
|
||||
}
|
||||
}
|
||||
}
|
||||
$command = PhabricatorDaemon::sudoCommandAsDaemonUser($command);
|
||||
|
|
Loading…
Reference in a new issue