diff --git a/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php index d9cc8063d5..d8d0116017 100644 --- a/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php +++ b/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php @@ -8,6 +8,8 @@ abstract class DiffusionGitSSHWorkflow private $protocolLog; private $wireProtocol; + private $ioBytesRead = 0; + private $ioBytesWritten = 0; protected function writeError($message) { // Git assumes we'll add our own newlines. @@ -98,6 +100,8 @@ abstract class DiffusionGitSSHWorkflow PhabricatorSSHPassthruCommand $command, $message) { + $this->ioBytesWritten += strlen($message); + $log = $this->getProtocolLog(); if ($log) { $log->didWriteBytes($message); @@ -125,7 +129,21 @@ abstract class DiffusionGitSSHWorkflow $message = $protocol->willReadBytes($message); } + // Note that bytes aren't counted until they're emittted by the protocol + // layer. This means the underlying command might emit bytes, but if they + // are buffered by the protocol layer they won't count as read bytes yet. + + $this->ioBytesRead += strlen($message); + return $message; } + final protected function getIOBytesRead() { + return $this->ioBytesRead; + } + + final protected function getIOBytesWritten() { + return $this->ioBytesWritten; + } + } diff --git a/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php index 7e1f4a4f33..3e8186190a 100644 --- a/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php +++ b/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php @@ -1,6 +1,10 @@ setName('git-upload-pack'); @@ -14,39 +18,33 @@ final class DiffusionGitUploadPackSSHWorkflow extends DiffusionGitSSHWorkflow { } protected function executeRepositoryOperations() { - $repository = $this->getRepository(); + $is_proxy = $this->shouldProxy(); + if ($is_proxy) { + return $this->executeRepositoryProxyOperations(); + } + $viewer = $this->getSSHUser(); + $repository = $this->getRepository(); $device = AlmanacKeys::getLiveDevice(); $skip_sync = $this->shouldSkipReadSynchronization(); - $is_proxy = $this->shouldProxy(); - if ($is_proxy) { - $command = $this->getProxyCommand(false); + $command = csprintf('git-upload-pack -- %s', $repository->getLocalPath()); + if (!$skip_sync) { + $cluster_engine = id(new DiffusionRepositoryClusterEngine()) + ->setViewer($viewer) + ->setRepository($repository) + ->setLog($this) + ->synchronizeWorkingCopyBeforeRead(); if ($device) { $this->writeClusterEngineLogMessage( pht( - "# Fetch received by \"%s\", forwarding to cluster host.\n", + "# Cleared to fetch on cluster host \"%s\".\n", $device->getName())); } - } else { - $command = csprintf('git-upload-pack -- %s', $repository->getLocalPath()); - if (!$skip_sync) { - $cluster_engine = id(new DiffusionRepositoryClusterEngine()) - ->setViewer($viewer) - ->setRepository($repository) - ->setLog($this) - ->synchronizeWorkingCopyBeforeRead(); - - if ($device) { - $this->writeClusterEngineLogMessage( - pht( - "# Cleared to fetch on cluster host \"%s\".\n", - $device->getName())); - } - } } + $command = PhabricatorDaemon::sudoCommandAsDaemonUser($command); $pull_event = $this->newPullEvent(); @@ -60,14 +58,12 @@ final class DiffusionGitUploadPackSSHWorkflow extends DiffusionGitSSHWorkflow { $log->didStartSession($command); } - if (!$is_proxy) { - if (PhabricatorEnv::getEnvConfig('phabricator.show-prototypes')) { - $protocol = new DiffusionGitUploadPackWireProtocol(); - if ($log) { - $protocol->setProtocolLog($log); - } - $this->setWireProtocol($protocol); + if (PhabricatorEnv::getEnvConfig('phabricator.show-prototypes')) { + $protocol = new DiffusionGitUploadPackWireProtocol(); + if ($log) { + $protocol->setProtocolLog($log); } + $this->setWireProtocol($protocol); } $err = $this->newPassthruCommand() @@ -89,15 +85,7 @@ final class DiffusionGitUploadPackSSHWorkflow extends DiffusionGitSSHWorkflow { ->setResultCode(0); } - // TODO: Currently, when proxying, we do not write a log on the proxy. - // Perhaps we should write a "proxy log". This is not very useful for - // statistics or auditing, but could be useful for diagnostics. Marking - // the proxy logs as proxied (and recording devicePHID on all logs) would - // make differentiating between these use cases easier. - - if (!$is_proxy) { - $pull_event->save(); - } + $pull_event->save(); if (!$err) { $this->waitForGitClient(); @@ -106,4 +94,123 @@ final class DiffusionGitUploadPackSSHWorkflow extends DiffusionGitSSHWorkflow { return $err; } + private function executeRepositoryProxyOperations() { + $device = AlmanacKeys::getLiveDevice(); + $for_write = false; + + $refs = $this->getAlmanacServiceRefs($for_write); + $err = 1; + + while (true) { + $ref = head($refs); + + $command = $this->getProxyCommandForServiceRef($ref); + + if ($device) { + $this->writeClusterEngineLogMessage( + pht( + "# Fetch received by \"%s\", forwarding to cluster host \"%s\".\n", + $device->getName(), + $ref->getDeviceName())); + } + + $command = PhabricatorDaemon::sudoCommandAsDaemonUser($command); + + $future = id(new ExecFuture('%C', $command)) + ->setEnv($this->getEnvironment()); + + $this->didBeginRequest(); + + $err = $this->newPassthruCommand() + ->setIOChannel($this->getIOChannel()) + ->setCommandChannelFromExecFuture($future) + ->execute(); + + $err = 1; + + // TODO: Currently, when proxying, we do not write an event log on the + // proxy. Perhaps we should write a "proxy log". This is not very useful + // for statistics or auditing, but could be useful for diagnostics. + // Marking the proxy logs as proxied (and recording devicePHID on all + // logs) would make differentiating between these use cases easier. + + if (!$err) { + $this->waitForGitClient(); + return $err; + } + + // Throw away this service: the request failed and we're treating the + // failure as persistent, so we don't want to retry another request to + // the same host. + array_shift($refs); + + // Check if we have more services we can try. If we do, we'll make an + // effort to fall back to them below. If not, we can't do anything to + // recover so just bail out. + if (!$refs) { + return $err; + } + + $should_retry = $this->shouldRetryRequest(); + if (!$should_retry) { + return $err; + } + + // If we haven't bailed out yet, we'll retry the request with the next + // service. + } + + throw new Exception(pht('Reached an unreachable place.')); + } + + private function didBeginRequest() { + $this->requestAttempts++; + return $this; + } + + private function shouldRetryRequest() { + $this->requestFailures++; + + if ($this->requestFailures > $this->requestAttempts) { + throw new Exception( + pht( + "Workflow has recorded more failures than attempts; there is a ". + "missing call to \"didBeginRequest()\".\n")); + } + + $max_failures = 3; + if ($this->requestFailures >= $max_failures) { + $this->writeClusterEngineLogMessage( + pht( + "# Reached maximum number of retry attempts, giving up.\n")); + return false; + } + + $read_len = $this->getIOBytesRead(); + if ($read_len) { + $this->writeClusterEngineLogMessage( + pht( + "# Client already read from service (%s bytes), unable to retry.\n", + new PhutilNumber($read_len))); + return false; + } + + $write_len = $this->getIOBytesWritten(); + if ($write_len) { + $this->writeClusterEngineLogMessage( + pht( + "# Client already wrote to service (%s bytes), unable to retry.\n", + new PhutilNumber($write_len))); + return false; + } + + $this->writeClusterEngineLogMessage( + pht( + "# Service request failed, retrying (making attempt %s of %s).\n", + new PhutilNumber($this->requestAttempts + 1), + new PhutilNumber($max_failures))); + + return true; + } + }