From 8ff3a133c4d7ffc02a47d6fc5fb4dae52b2cd0ab Mon Sep 17 00:00:00 2001 From: epriestley Date: Tue, 3 Sep 2019 11:26:20 -0700 Subject: [PATCH] Generalize repository proxy retry logic to writes Summary: Ref T13286. The current (very safe / conservative) rules for retrying git reads generalize to git writes, so we can use the same ruleset in both cases. Normally, writes converge rapidly to only having good nodes at the head of the list, so this has less impact than the similar change to reads, but it generally improves consistency and allows us to assert that writes which can be served will be served. Test Plan: - In a cluster with an up node and a down node, pushed changes. - Saw a push to the down node fail, retry, and succeed. - Did some pulls, saw appropriate retries and success. - Note that once one write goes through, the node which received the write always ends up at the head of the writable list, so nodes need to be explicitly thawed to reproduce the failure/retry behavior. Maniphest Tasks: T13286 Differential Revision: https://secure.phabricator.com/D20778 --- .../DiffusionGitReceivePackSSHWorkflow.php | 61 ++++------ .../diffusion/ssh/DiffusionGitSSHWorkflow.php | 112 +++++++++++++++++ .../ssh/DiffusionGitUploadPackSSHWorkflow.php | 115 +----------------- 3 files changed, 137 insertions(+), 151 deletions(-) diff --git a/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php index abf2a4323e..f59a9b58b4 100644 --- a/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php +++ b/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php @@ -14,42 +14,33 @@ final class DiffusionGitReceivePackSSHWorkflow extends DiffusionGitSSHWorkflow { } protected function executeRepositoryOperations() { + // This is a write, and must have write access. + $this->requireWriteAccess(); + + $is_proxy = $this->shouldProxy(); + if ($is_proxy) { + return $this->executeRepositoryProxyOperations($for_write = true); + } + $host_wait_start = microtime(true); $repository = $this->getRepository(); $viewer = $this->getSSHUser(); $device = AlmanacKeys::getLiveDevice(); - // This is a write, and must have write access. - $this->requireWriteAccess(); - $cluster_engine = id(new DiffusionRepositoryClusterEngine()) ->setViewer($viewer) ->setRepository($repository) ->setLog($this); - $is_proxy = $this->shouldProxy(); - if ($is_proxy) { - $command = $this->getProxyCommand(true); - $did_write = false; + $command = csprintf('git-receive-pack %s', $repository->getLocalPath()); + $cluster_engine->synchronizeWorkingCopyBeforeWrite(); - if ($device) { - $this->writeClusterEngineLogMessage( - pht( - "# Push received by \"%s\", forwarding to cluster host.\n", - $device->getName())); - } - } else { - $command = csprintf('git-receive-pack %s', $repository->getLocalPath()); - $did_write = true; - $cluster_engine->synchronizeWorkingCopyBeforeWrite(); - - if ($device) { - $this->writeClusterEngineLogMessage( - pht( - "# Ready to receive on cluster host \"%s\".\n", - $device->getName())); - } + if ($device) { + $this->writeClusterEngineLogMessage( + pht( + "# Ready to receive on cluster host \"%s\".\n", + $device->getName())); } $log = $this->newProtocolLog($is_proxy); @@ -71,9 +62,7 @@ final class DiffusionGitReceivePackSSHWorkflow extends DiffusionGitSSHWorkflow { // We've committed the write (or rejected it), so we can release the lock // without waiting for the client to receive the acknowledgement. - if ($did_write) { - $cluster_engine->synchronizeWorkingCopyAfterWrite(); - } + $cluster_engine->synchronizeWorkingCopyAfterWrite(); if ($caught) { throw $caught; @@ -85,18 +74,16 @@ final class DiffusionGitReceivePackSSHWorkflow extends DiffusionGitSSHWorkflow { // When a repository is clustered, we reach this cleanup code on both // the proxy and the actual final endpoint node. Don't do more cleanup // or logging than we need to. - if ($did_write) { - $repository->writeStatusMessage( - PhabricatorRepositoryStatusMessage::TYPE_NEEDS_UPDATE, - PhabricatorRepositoryStatusMessage::CODE_OKAY); + $repository->writeStatusMessage( + PhabricatorRepositoryStatusMessage::TYPE_NEEDS_UPDATE, + PhabricatorRepositoryStatusMessage::CODE_OKAY); - $host_wait_end = microtime(true); + $host_wait_end = microtime(true); - $this->updatePushLogWithTimingInformation( - $this->getClusterEngineLogProperty('writeWait'), - $this->getClusterEngineLogProperty('readWait'), - ($host_wait_end - $host_wait_start)); - } + $this->updatePushLogWithTimingInformation( + $this->getClusterEngineLogProperty('writeWait'), + $this->getClusterEngineLogProperty('readWait'), + ($host_wait_end - $host_wait_start)); } return $err; diff --git a/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php index d8d0116017..292741e34d 100644 --- a/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php +++ b/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php @@ -10,6 +10,8 @@ abstract class DiffusionGitSSHWorkflow private $wireProtocol; private $ioBytesRead = 0; private $ioBytesWritten = 0; + private $requestAttempts = 0; + private $requestFailures = 0; protected function writeError($message) { // Git assumes we'll add our own newlines. @@ -146,4 +148,114 @@ abstract class DiffusionGitSSHWorkflow return $this->ioBytesWritten; } + final protected function executeRepositoryProxyOperations($for_write) { + $device = AlmanacKeys::getLiveDevice(); + + $refs = $this->getAlmanacServiceRefs($for_write); + $err = 1; + + while (true) { + $ref = head($refs); + + $command = $this->getProxyCommandForServiceRef($ref); + + if ($device) { + $this->writeClusterEngineLogMessage( + pht( + "# Request received by \"%s\", forwarding to cluster ". + "host \"%s\".\n", + $device->getName(), + $ref->getDeviceName())); + } + + $command = PhabricatorDaemon::sudoCommandAsDaemonUser($command); + + $future = id(new ExecFuture('%C', $command)) + ->setEnv($this->getEnvironment()); + + $this->didBeginRequest(); + + $err = $this->newPassthruCommand() + ->setIOChannel($this->getIOChannel()) + ->setCommandChannelFromExecFuture($future) + ->execute(); + + // TODO: Currently, when proxying, we do not write an event log on the + // proxy. Perhaps we should write a "proxy log". This is not very useful + // for statistics or auditing, but could be useful for diagnostics. + // Marking the proxy logs as proxied (and recording devicePHID on all + // logs) would make differentiating between these use cases easier. + + if (!$err) { + $this->waitForGitClient(); + return $err; + } + + // Throw away this service: the request failed and we're treating the + // failure as persistent, so we don't want to retry another request to + // the same host. + array_shift($refs); + + $should_retry = $this->shouldRetryRequest($refs); + if (!$should_retry) { + return $err; + } + + // If we haven't bailed out yet, we'll retry the request with the next + // service. + } + + throw new Exception(pht('Reached an unreachable place.')); + } + + private function didBeginRequest() { + $this->requestAttempts++; + return $this; + } + + private function shouldRetryRequest(array $remaining_refs) { + $this->requestFailures++; + + if ($this->requestFailures > $this->requestAttempts) { + throw new Exception( + pht( + "Workflow has recorded more failures than attempts; there is a ". + "missing call to \"didBeginRequest()\".\n")); + } + + if (!$remaining_refs) { + $this->writeClusterEngineLogMessage( + pht( + "# All available services failed to serve the request, ". + "giving up.\n")); + return false; + } + + $read_len = $this->getIOBytesRead(); + if ($read_len) { + $this->writeClusterEngineLogMessage( + pht( + "# Client already read from service (%s bytes), unable to retry.\n", + new PhutilNumber($read_len))); + return false; + } + + $write_len = $this->getIOBytesWritten(); + if ($write_len) { + $this->writeClusterEngineLogMessage( + pht( + "# Client already wrote to service (%s bytes), unable to retry.\n", + new PhutilNumber($write_len))); + return false; + } + + $this->writeClusterEngineLogMessage( + pht( + "# Service request failed, retrying (making attempt %s of %s).\n", + new PhutilNumber($this->requestAttempts + 1), + new PhutilNumber($this->requestAttempts + count($remaining_refs)))); + + return true; + } + } diff --git a/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php index 5c0e2588b7..57c43b5a12 100644 --- a/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php +++ b/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php @@ -3,9 +3,6 @@ final class DiffusionGitUploadPackSSHWorkflow extends DiffusionGitSSHWorkflow { - private $requestAttempts = 0; - private $requestFailures = 0; - protected function didConstruct() { $this->setName('git-upload-pack'); $this->setArguments( @@ -20,7 +17,7 @@ final class DiffusionGitUploadPackSSHWorkflow protected function executeRepositoryOperations() { $is_proxy = $this->shouldProxy(); if ($is_proxy) { - return $this->executeRepositoryProxyOperations(); + return $this->executeRepositoryProxyOperations($for_write = false); } $viewer = $this->getSSHUser(); @@ -94,114 +91,4 @@ final class DiffusionGitUploadPackSSHWorkflow return $err; } - private function executeRepositoryProxyOperations() { - $device = AlmanacKeys::getLiveDevice(); - $for_write = false; - - $refs = $this->getAlmanacServiceRefs($for_write); - $err = 1; - - while (true) { - $ref = head($refs); - - $command = $this->getProxyCommandForServiceRef($ref); - - if ($device) { - $this->writeClusterEngineLogMessage( - pht( - "# Fetch received by \"%s\", forwarding to cluster host \"%s\".\n", - $device->getName(), - $ref->getDeviceName())); - } - - $command = PhabricatorDaemon::sudoCommandAsDaemonUser($command); - - $future = id(new ExecFuture('%C', $command)) - ->setEnv($this->getEnvironment()); - - $this->didBeginRequest(); - - $err = $this->newPassthruCommand() - ->setIOChannel($this->getIOChannel()) - ->setCommandChannelFromExecFuture($future) - ->execute(); - - // TODO: Currently, when proxying, we do not write an event log on the - // proxy. Perhaps we should write a "proxy log". This is not very useful - // for statistics or auditing, but could be useful for diagnostics. - // Marking the proxy logs as proxied (and recording devicePHID on all - // logs) would make differentiating between these use cases easier. - - if (!$err) { - $this->waitForGitClient(); - return $err; - } - - // Throw away this service: the request failed and we're treating the - // failure as persistent, so we don't want to retry another request to - // the same host. - array_shift($refs); - - $should_retry = $this->shouldRetryRequest($refs); - if (!$should_retry) { - return $err; - } - - // If we haven't bailed out yet, we'll retry the request with the next - // service. - } - - throw new Exception(pht('Reached an unreachable place.')); - } - - private function didBeginRequest() { - $this->requestAttempts++; - return $this; - } - - private function shouldRetryRequest(array $remaining_refs) { - $this->requestFailures++; - - if ($this->requestFailures > $this->requestAttempts) { - throw new Exception( - pht( - "Workflow has recorded more failures than attempts; there is a ". - "missing call to \"didBeginRequest()\".\n")); - } - - if (!$remaining_refs) { - $this->writeClusterEngineLogMessage( - pht( - "# All available services failed to serve the request, ". - "giving up.\n")); - return false; - } - - $read_len = $this->getIOBytesRead(); - if ($read_len) { - $this->writeClusterEngineLogMessage( - pht( - "# Client already read from service (%s bytes), unable to retry.\n", - new PhutilNumber($read_len))); - return false; - } - - $write_len = $this->getIOBytesWritten(); - if ($write_len) { - $this->writeClusterEngineLogMessage( - pht( - "# Client already wrote to service (%s bytes), unable to retry.\n", - new PhutilNumber($write_len))); - return false; - } - - $this->writeClusterEngineLogMessage( - pht( - "# Service request failed, retrying (making attempt %s of %s).\n", - new PhutilNumber($this->requestAttempts + 1), - new PhutilNumber($this->requestAttempts + count($remaining_refs)))); - - return true; - } - }