mirror of
https://we.phorge.it/source/phorge.git
synced 2025-01-11 07:11:04 +01:00
On Git cluster read failure, retry safe requests
Summary: Depends on D20775. Ref T13286. When a Git read request fails against a cluster and there are other nodes we could safely try, try more nodes. We DO NOT retry the request if: - the client read anything; - the client wrote anything; - or we've already retried several times. Although //some// requests where bytes went over the wire in either direction may be safe to retry, they're rare in practice under Git, and we'd need to puzzle out what state we can safely emit. Since most types of failure result in an outright connection failure and this catches all of them, it's likely to almost always be sufficient in practice. Test Plan: - Started a cluster with one up node and one down node, pulled it. - Half the time, hit the up node and got a clean pull. - Half the time, hit the down node and got a connection failure followed by a retry and a clean pull. - Forced `$err = 1` so even successful attempts would retry. - On hitting the up node, got a "failure" and a decline to retry (bytes already written). - On hitting the down node, got a failure and a real retry. - (Note that, in both cases, "git pull" exits "0" after the valid wire transaction takes place, even though the remote exited non-zero. If the server gave Git everything it asked for, it doesn't seem to care if the server then exited with an error code.) Maniphest Tasks: T13286 Differential Revision: https://secure.phabricator.com/D20776
This commit is contained in:
parent
b6420e0f0a
commit
95fb237ab3
2 changed files with 163 additions and 38 deletions
|
@ -8,6 +8,8 @@ abstract class DiffusionGitSSHWorkflow
|
|||
private $protocolLog;
|
||||
|
||||
private $wireProtocol;
|
||||
private $ioBytesRead = 0;
|
||||
private $ioBytesWritten = 0;
|
||||
|
||||
protected function writeError($message) {
|
||||
// Git assumes we'll add our own newlines.
|
||||
|
@ -98,6 +100,8 @@ abstract class DiffusionGitSSHWorkflow
|
|||
PhabricatorSSHPassthruCommand $command,
|
||||
$message) {
|
||||
|
||||
$this->ioBytesWritten += strlen($message);
|
||||
|
||||
$log = $this->getProtocolLog();
|
||||
if ($log) {
|
||||
$log->didWriteBytes($message);
|
||||
|
@ -125,7 +129,21 @@ abstract class DiffusionGitSSHWorkflow
|
|||
$message = $protocol->willReadBytes($message);
|
||||
}
|
||||
|
||||
// Note that bytes aren't counted until they're emittted by the protocol
|
||||
// layer. This means the underlying command might emit bytes, but if they
|
||||
// are buffered by the protocol layer they won't count as read bytes yet.
|
||||
|
||||
$this->ioBytesRead += strlen($message);
|
||||
|
||||
return $message;
|
||||
}
|
||||
|
||||
final protected function getIOBytesRead() {
|
||||
return $this->ioBytesRead;
|
||||
}
|
||||
|
||||
final protected function getIOBytesWritten() {
|
||||
return $this->ioBytesWritten;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,6 +1,10 @@
|
|||
<?php
|
||||
|
||||
final class DiffusionGitUploadPackSSHWorkflow extends DiffusionGitSSHWorkflow {
|
||||
final class DiffusionGitUploadPackSSHWorkflow
|
||||
extends DiffusionGitSSHWorkflow {
|
||||
|
||||
private $requestAttempts = 0;
|
||||
private $requestFailures = 0;
|
||||
|
||||
protected function didConstruct() {
|
||||
$this->setName('git-upload-pack');
|
||||
|
@ -14,39 +18,33 @@ final class DiffusionGitUploadPackSSHWorkflow extends DiffusionGitSSHWorkflow {
|
|||
}
|
||||
|
||||
protected function executeRepositoryOperations() {
|
||||
$repository = $this->getRepository();
|
||||
$is_proxy = $this->shouldProxy();
|
||||
if ($is_proxy) {
|
||||
return $this->executeRepositoryProxyOperations();
|
||||
}
|
||||
|
||||
$viewer = $this->getSSHUser();
|
||||
$repository = $this->getRepository();
|
||||
$device = AlmanacKeys::getLiveDevice();
|
||||
|
||||
$skip_sync = $this->shouldSkipReadSynchronization();
|
||||
$is_proxy = $this->shouldProxy();
|
||||
|
||||
if ($is_proxy) {
|
||||
$command = $this->getProxyCommand(false);
|
||||
$command = csprintf('git-upload-pack -- %s', $repository->getLocalPath());
|
||||
if (!$skip_sync) {
|
||||
$cluster_engine = id(new DiffusionRepositoryClusterEngine())
|
||||
->setViewer($viewer)
|
||||
->setRepository($repository)
|
||||
->setLog($this)
|
||||
->synchronizeWorkingCopyBeforeRead();
|
||||
|
||||
if ($device) {
|
||||
$this->writeClusterEngineLogMessage(
|
||||
pht(
|
||||
"# Fetch received by \"%s\", forwarding to cluster host.\n",
|
||||
"# Cleared to fetch on cluster host \"%s\".\n",
|
||||
$device->getName()));
|
||||
}
|
||||
} else {
|
||||
$command = csprintf('git-upload-pack -- %s', $repository->getLocalPath());
|
||||
if (!$skip_sync) {
|
||||
$cluster_engine = id(new DiffusionRepositoryClusterEngine())
|
||||
->setViewer($viewer)
|
||||
->setRepository($repository)
|
||||
->setLog($this)
|
||||
->synchronizeWorkingCopyBeforeRead();
|
||||
|
||||
if ($device) {
|
||||
$this->writeClusterEngineLogMessage(
|
||||
pht(
|
||||
"# Cleared to fetch on cluster host \"%s\".\n",
|
||||
$device->getName()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$command = PhabricatorDaemon::sudoCommandAsDaemonUser($command);
|
||||
|
||||
$pull_event = $this->newPullEvent();
|
||||
|
@ -60,14 +58,12 @@ final class DiffusionGitUploadPackSSHWorkflow extends DiffusionGitSSHWorkflow {
|
|||
$log->didStartSession($command);
|
||||
}
|
||||
|
||||
if (!$is_proxy) {
|
||||
if (PhabricatorEnv::getEnvConfig('phabricator.show-prototypes')) {
|
||||
$protocol = new DiffusionGitUploadPackWireProtocol();
|
||||
if ($log) {
|
||||
$protocol->setProtocolLog($log);
|
||||
}
|
||||
$this->setWireProtocol($protocol);
|
||||
if (PhabricatorEnv::getEnvConfig('phabricator.show-prototypes')) {
|
||||
$protocol = new DiffusionGitUploadPackWireProtocol();
|
||||
if ($log) {
|
||||
$protocol->setProtocolLog($log);
|
||||
}
|
||||
$this->setWireProtocol($protocol);
|
||||
}
|
||||
|
||||
$err = $this->newPassthruCommand()
|
||||
|
@ -89,15 +85,7 @@ final class DiffusionGitUploadPackSSHWorkflow extends DiffusionGitSSHWorkflow {
|
|||
->setResultCode(0);
|
||||
}
|
||||
|
||||
// TODO: Currently, when proxying, we do not write a log on the proxy.
|
||||
// Perhaps we should write a "proxy log". This is not very useful for
|
||||
// statistics or auditing, but could be useful for diagnostics. Marking
|
||||
// the proxy logs as proxied (and recording devicePHID on all logs) would
|
||||
// make differentiating between these use cases easier.
|
||||
|
||||
if (!$is_proxy) {
|
||||
$pull_event->save();
|
||||
}
|
||||
$pull_event->save();
|
||||
|
||||
if (!$err) {
|
||||
$this->waitForGitClient();
|
||||
|
@ -106,4 +94,123 @@ final class DiffusionGitUploadPackSSHWorkflow extends DiffusionGitSSHWorkflow {
|
|||
return $err;
|
||||
}
|
||||
|
||||
private function executeRepositoryProxyOperations() {
|
||||
$device = AlmanacKeys::getLiveDevice();
|
||||
$for_write = false;
|
||||
|
||||
$refs = $this->getAlmanacServiceRefs($for_write);
|
||||
$err = 1;
|
||||
|
||||
while (true) {
|
||||
$ref = head($refs);
|
||||
|
||||
$command = $this->getProxyCommandForServiceRef($ref);
|
||||
|
||||
if ($device) {
|
||||
$this->writeClusterEngineLogMessage(
|
||||
pht(
|
||||
"# Fetch received by \"%s\", forwarding to cluster host \"%s\".\n",
|
||||
$device->getName(),
|
||||
$ref->getDeviceName()));
|
||||
}
|
||||
|
||||
$command = PhabricatorDaemon::sudoCommandAsDaemonUser($command);
|
||||
|
||||
$future = id(new ExecFuture('%C', $command))
|
||||
->setEnv($this->getEnvironment());
|
||||
|
||||
$this->didBeginRequest();
|
||||
|
||||
$err = $this->newPassthruCommand()
|
||||
->setIOChannel($this->getIOChannel())
|
||||
->setCommandChannelFromExecFuture($future)
|
||||
->execute();
|
||||
|
||||
$err = 1;
|
||||
|
||||
// TODO: Currently, when proxying, we do not write an event log on the
|
||||
// proxy. Perhaps we should write a "proxy log". This is not very useful
|
||||
// for statistics or auditing, but could be useful for diagnostics.
|
||||
// Marking the proxy logs as proxied (and recording devicePHID on all
|
||||
// logs) would make differentiating between these use cases easier.
|
||||
|
||||
if (!$err) {
|
||||
$this->waitForGitClient();
|
||||
return $err;
|
||||
}
|
||||
|
||||
// Throw away this service: the request failed and we're treating the
|
||||
// failure as persistent, so we don't want to retry another request to
|
||||
// the same host.
|
||||
array_shift($refs);
|
||||
|
||||
// Check if we have more services we can try. If we do, we'll make an
|
||||
// effort to fall back to them below. If not, we can't do anything to
|
||||
// recover so just bail out.
|
||||
if (!$refs) {
|
||||
return $err;
|
||||
}
|
||||
|
||||
$should_retry = $this->shouldRetryRequest();
|
||||
if (!$should_retry) {
|
||||
return $err;
|
||||
}
|
||||
|
||||
// If we haven't bailed out yet, we'll retry the request with the next
|
||||
// service.
|
||||
}
|
||||
|
||||
throw new Exception(pht('Reached an unreachable place.'));
|
||||
}
|
||||
|
||||
private function didBeginRequest() {
|
||||
$this->requestAttempts++;
|
||||
return $this;
|
||||
}
|
||||
|
||||
private function shouldRetryRequest() {
|
||||
$this->requestFailures++;
|
||||
|
||||
if ($this->requestFailures > $this->requestAttempts) {
|
||||
throw new Exception(
|
||||
pht(
|
||||
"Workflow has recorded more failures than attempts; there is a ".
|
||||
"missing call to \"didBeginRequest()\".\n"));
|
||||
}
|
||||
|
||||
$max_failures = 3;
|
||||
if ($this->requestFailures >= $max_failures) {
|
||||
$this->writeClusterEngineLogMessage(
|
||||
pht(
|
||||
"# Reached maximum number of retry attempts, giving up.\n"));
|
||||
return false;
|
||||
}
|
||||
|
||||
$read_len = $this->getIOBytesRead();
|
||||
if ($read_len) {
|
||||
$this->writeClusterEngineLogMessage(
|
||||
pht(
|
||||
"# Client already read from service (%s bytes), unable to retry.\n",
|
||||
new PhutilNumber($read_len)));
|
||||
return false;
|
||||
}
|
||||
|
||||
$write_len = $this->getIOBytesWritten();
|
||||
if ($write_len) {
|
||||
$this->writeClusterEngineLogMessage(
|
||||
pht(
|
||||
"# Client already wrote to service (%s bytes), unable to retry.\n",
|
||||
new PhutilNumber($write_len)));
|
||||
return false;
|
||||
}
|
||||
|
||||
$this->writeClusterEngineLogMessage(
|
||||
pht(
|
||||
"# Service request failed, retrying (making attempt %s of %s).\n",
|
||||
new PhutilNumber($this->requestAttempts + 1),
|
||||
new PhutilNumber($max_failures)));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue