1
0
Fork 0
mirror of https://we.phorge.it/source/phorge.git synced 2024-11-29 10:12:41 +01:00

Generalize repository proxy retry logic to writes

Summary:
Ref T13286. The current (very safe / conservative) rules for retrying git reads generalize to git writes, so we can use the same ruleset in both cases.

Normally, writes converge rapidly to only having good nodes at the head of the list, so this has less impact than the similar change to reads, but it generally improves consistency and allows us to assert that writes which can be served will be served.

Test Plan:
  - In a cluster with an up node and a down node, pushed changes.
  - Saw a push to the down node fail, retry, and succeed.
  - Did some pulls, saw appropriate retries and success.
  - Note that once one write goes through, the node which received the write always ends up at the head of the writable list, so nodes need to be explicitly thawed to reproduce the failure/retry behavior.

Maniphest Tasks: T13286

Differential Revision: https://secure.phabricator.com/D20778
This commit is contained in:
epriestley 2019-09-03 11:26:20 -07:00
parent ff3d1769b4
commit 8ff3a133c4
3 changed files with 137 additions and 151 deletions

View file

@ -14,34 +14,26 @@ final class DiffusionGitReceivePackSSHWorkflow extends DiffusionGitSSHWorkflow {
} }
protected function executeRepositoryOperations() { protected function executeRepositoryOperations() {
// This is a write, and must have write access.
$this->requireWriteAccess();
$is_proxy = $this->shouldProxy();
if ($is_proxy) {
return $this->executeRepositoryProxyOperations($for_write = true);
}
$host_wait_start = microtime(true); $host_wait_start = microtime(true);
$repository = $this->getRepository(); $repository = $this->getRepository();
$viewer = $this->getSSHUser(); $viewer = $this->getSSHUser();
$device = AlmanacKeys::getLiveDevice(); $device = AlmanacKeys::getLiveDevice();
// This is a write, and must have write access.
$this->requireWriteAccess();
$cluster_engine = id(new DiffusionRepositoryClusterEngine()) $cluster_engine = id(new DiffusionRepositoryClusterEngine())
->setViewer($viewer) ->setViewer($viewer)
->setRepository($repository) ->setRepository($repository)
->setLog($this); ->setLog($this);
$is_proxy = $this->shouldProxy();
if ($is_proxy) {
$command = $this->getProxyCommand(true);
$did_write = false;
if ($device) {
$this->writeClusterEngineLogMessage(
pht(
"# Push received by \"%s\", forwarding to cluster host.\n",
$device->getName()));
}
} else {
$command = csprintf('git-receive-pack %s', $repository->getLocalPath()); $command = csprintf('git-receive-pack %s', $repository->getLocalPath());
$did_write = true;
$cluster_engine->synchronizeWorkingCopyBeforeWrite(); $cluster_engine->synchronizeWorkingCopyBeforeWrite();
if ($device) { if ($device) {
@ -50,7 +42,6 @@ final class DiffusionGitReceivePackSSHWorkflow extends DiffusionGitSSHWorkflow {
"# Ready to receive on cluster host \"%s\".\n", "# Ready to receive on cluster host \"%s\".\n",
$device->getName())); $device->getName()));
} }
}
$log = $this->newProtocolLog($is_proxy); $log = $this->newProtocolLog($is_proxy);
if ($log) { if ($log) {
@ -71,9 +62,7 @@ final class DiffusionGitReceivePackSSHWorkflow extends DiffusionGitSSHWorkflow {
// We've committed the write (or rejected it), so we can release the lock // We've committed the write (or rejected it), so we can release the lock
// without waiting for the client to receive the acknowledgement. // without waiting for the client to receive the acknowledgement.
if ($did_write) {
$cluster_engine->synchronizeWorkingCopyAfterWrite(); $cluster_engine->synchronizeWorkingCopyAfterWrite();
}
if ($caught) { if ($caught) {
throw $caught; throw $caught;
@ -85,7 +74,6 @@ final class DiffusionGitReceivePackSSHWorkflow extends DiffusionGitSSHWorkflow {
// When a repository is clustered, we reach this cleanup code on both // When a repository is clustered, we reach this cleanup code on both
// the proxy and the actual final endpoint node. Don't do more cleanup // the proxy and the actual final endpoint node. Don't do more cleanup
// or logging than we need to. // or logging than we need to.
if ($did_write) {
$repository->writeStatusMessage( $repository->writeStatusMessage(
PhabricatorRepositoryStatusMessage::TYPE_NEEDS_UPDATE, PhabricatorRepositoryStatusMessage::TYPE_NEEDS_UPDATE,
PhabricatorRepositoryStatusMessage::CODE_OKAY); PhabricatorRepositoryStatusMessage::CODE_OKAY);
@ -97,7 +85,6 @@ final class DiffusionGitReceivePackSSHWorkflow extends DiffusionGitSSHWorkflow {
$this->getClusterEngineLogProperty('readWait'), $this->getClusterEngineLogProperty('readWait'),
($host_wait_end - $host_wait_start)); ($host_wait_end - $host_wait_start));
} }
}
return $err; return $err;
} }

View file

@ -10,6 +10,8 @@ abstract class DiffusionGitSSHWorkflow
private $wireProtocol; private $wireProtocol;
private $ioBytesRead = 0; private $ioBytesRead = 0;
private $ioBytesWritten = 0; private $ioBytesWritten = 0;
private $requestAttempts = 0;
private $requestFailures = 0;
protected function writeError($message) { protected function writeError($message) {
// Git assumes we'll add our own newlines. // Git assumes we'll add our own newlines.
@ -146,4 +148,114 @@ abstract class DiffusionGitSSHWorkflow
return $this->ioBytesWritten; return $this->ioBytesWritten;
} }
/**
 * Forward this request to a cluster host, moving on to the next host when
 * one fails.
 *
 * Candidate service refs come from getAlmanacServiceRefs() and are tried in
 * list order. Each attempt runs the proxy command for the ref at the head
 * of the list as a passthru command attached to this workflow's IO channel.
 * A ref which fails is dropped from the list, and shouldRetryRequest()
 * decides whether the next ref is attempted.
 *
 * @param bool $for_write Passed through to getAlmanacServiceRefs(). The
 *   receive-pack workflow passes true and the upload-pack workflow passes
 *   false.
 * @return wild Result of the last passthru command; falsy on success.
 * @throws Exception If there are no service refs to proxy to.
 */
final protected function executeRepositoryProxyOperations($for_write) {
  $device = AlmanacKeys::getLiveDevice();

  $refs = $this->getAlmanacServiceRefs($for_write);

  // Taking the head of an empty list does not produce a service ref, so
  // fail with a readable message here rather than with an error on a
  // method call further down.
  if (!$refs) {
    throw new Exception(
      pht(
        'Unable to proxy this request: there are no cluster services '.
        'available to forward it to.'));
  }

  while (true) {
    $ref = head($refs);

    $command = $this->getProxyCommandForServiceRef($ref);

    if ($device) {
      $this->writeClusterEngineLogMessage(
        pht(
          "# Request received by \"%s\", forwarding to cluster ".
          "host \"%s\".\n",
          $device->getName(),
          $ref->getDeviceName()));
    }

    $command = PhabricatorDaemon::sudoCommandAsDaemonUser($command);

    $future = id(new ExecFuture('%C', $command))
      ->setEnv($this->getEnvironment());

    // Count the attempt before running it: shouldRetryRequest() throws if
    // it ever sees more failures than attempts.
    $this->didBeginRequest();

    $err = $this->newPassthruCommand()
      ->setIOChannel($this->getIOChannel())
      ->setCommandChannelFromExecFuture($future)
      ->execute();

    // TODO: Currently, when proxying, we do not write an event log on the
    // proxy. Perhaps we should write a "proxy log". This is not very useful
    // for statistics or auditing, but could be useful for diagnostics.
    // Marking the proxy logs as proxied (and recording devicePHID on all
    // logs) would make differentiating between these use cases easier.

    if (!$err) {
      $this->waitForGitClient();
      return $err;
    }

    // Throw away this service: the request failed and we're treating the
    // failure as persistent, so we don't want to retry another request to
    // the same host.
    array_shift($refs);

    $should_retry = $this->shouldRetryRequest($refs);
    if (!$should_retry) {
      return $err;
    }

    // If we haven't bailed out yet, we'll retry the request with the next
    // service.
  }

  throw new Exception(pht('Reached an unreachable place.'));
}
/**
 * Record that one more proxied request attempt is about to start.
 *
 * @return this
 */
private function didBeginRequest() {
  $this->requestAttempts += 1;

  return $this;
}
/**
 * Record a failed attempt and decide whether another service may be tried.
 *
 * A retry is refused when no services remain, or when this workflow has
 * already counted bytes read or bytes written. The reason is written to
 * the cluster engine log in each case.
 *
 * @param list $remaining_refs Services which have not been attempted yet.
 * @return bool True if the caller should try the next service.
 * @throws Exception If more failures than attempts have been recorded.
 */
private function shouldRetryRequest(array $remaining_refs) {
  $this->requestFailures += 1;

  $attempts = $this->requestAttempts;
  if ($this->requestFailures > $attempts) {
    throw new Exception(
      pht(
        "Workflow has recorded more failures than attempts; there is a ".
        "missing call to \"didBeginRequest()\".\n"));
  }

  // Work out why we have to give up, if we do. The checks run in a fixed
  // order and the first one that applies supplies the log message.
  $abort_message = null;
  if (count($remaining_refs) === 0) {
    $abort_message = pht(
      "# All available services failed to serve the request, ".
      "giving up.\n");
  } else {
    $bytes_read = $this->getIOBytesRead();
    if ($bytes_read) {
      $abort_message = pht(
        "# Client already read from service (%s bytes), unable to retry.\n",
        new PhutilNumber($bytes_read));
    } else {
      $bytes_written = $this->getIOBytesWritten();
      if ($bytes_written) {
        $abort_message = pht(
          "# Client already wrote to service (%s bytes), unable to retry.\n",
          new PhutilNumber($bytes_written));
      }
    }
  }

  if ($abort_message !== null) {
    $this->writeClusterEngineLogMessage($abort_message);
    return false;
  }

  $next_attempt = $attempts + 1;
  $total_attempts = $attempts + count($remaining_refs);

  $this->writeClusterEngineLogMessage(
    pht(
      "# Service request failed, retrying (making attempt %s of %s).\n",
      new PhutilNumber($next_attempt),
      new PhutilNumber($total_attempts)));

  return true;
}
} }

View file

@ -3,9 +3,6 @@
final class DiffusionGitUploadPackSSHWorkflow final class DiffusionGitUploadPackSSHWorkflow
extends DiffusionGitSSHWorkflow { extends DiffusionGitSSHWorkflow {
private $requestAttempts = 0;
private $requestFailures = 0;
protected function didConstruct() { protected function didConstruct() {
$this->setName('git-upload-pack'); $this->setName('git-upload-pack');
$this->setArguments( $this->setArguments(
@ -20,7 +17,7 @@ final class DiffusionGitUploadPackSSHWorkflow
protected function executeRepositoryOperations() { protected function executeRepositoryOperations() {
$is_proxy = $this->shouldProxy(); $is_proxy = $this->shouldProxy();
if ($is_proxy) { if ($is_proxy) {
return $this->executeRepositoryProxyOperations(); return $this->executeRepositoryProxyOperations($for_write = false);
} }
$viewer = $this->getSSHUser(); $viewer = $this->getSSHUser();
@ -94,114 +91,4 @@ final class DiffusionGitUploadPackSSHWorkflow
return $err; return $err;
} }
/**
 * Forward this fetch to a cluster host, moving on to the next host when
 * one fails.
 *
 * Service refs are requested with the write flag set to false and are
 * tried in list order. A ref which fails is removed from the list before
 * shouldRetryRequest() is asked whether to continue.
 *
 * @return wild Result of the last passthru command; falsy on success.
 */
private function executeRepositoryProxyOperations() {
  $live_device = AlmanacKeys::getLiveDevice();

  $candidates = $this->getAlmanacServiceRefs(false);
  $err = 1;

  for (;;) {
    $target = head($candidates);
    $proxy_command = $this->getProxyCommandForServiceRef($target);

    if ($live_device) {
      $this->writeClusterEngineLogMessage(
        pht(
          "# Fetch received by \"%s\", forwarding to cluster host \"%s\".\n",
          $live_device->getName(),
          $target->getDeviceName()));
    }

    $sudo_command = PhabricatorDaemon::sudoCommandAsDaemonUser(
      $proxy_command);

    $future = new ExecFuture('%C', $sudo_command);
    $future->setEnv($this->getEnvironment());

    $this->didBeginRequest();

    $passthru = $this->newPassthruCommand()
      ->setIOChannel($this->getIOChannel())
      ->setCommandChannelFromExecFuture($future);
    $err = $passthru->execute();

    // TODO: Currently, when proxying, we do not write an event log on the
    // proxy. Perhaps we should write a "proxy log". This is not very useful
    // for statistics or auditing, but could be useful for diagnostics.
    // Marking the proxy logs as proxied (and recording devicePHID on all
    // logs) would make differentiating between these use cases easier.

    if ($err) {
      // This service failed. The failure is treated as persistent, so
      // discard the service rather than sending it another request.
      array_shift($candidates);

      if (!$this->shouldRetryRequest($candidates)) {
        return $err;
      }

      // Still here: go around again with the next service.
      continue;
    }

    $this->waitForGitClient();
    return $err;
  }

  throw new Exception(pht('Reached an unreachable place.'));
}
/**
 * Count the start of one more request attempt.
 *
 * @return this
 */
private function didBeginRequest() {
  $this->requestAttempts = $this->requestAttempts + 1;
  return $this;
}
/**
 * Count a failed attempt, then report whether the next service may be
 * tried.
 *
 * Returns false, after logging the reason, when the list of remaining
 * services is empty or when this workflow has already counted bytes read
 * or bytes written.
 *
 * @param list $remaining_refs Services which have not been attempted yet.
 * @return bool True to retry with the next service, false to stop.
 * @throws Exception If failures outnumber recorded attempts.
 */
private function shouldRetryRequest(array $remaining_refs) {
  $this->requestFailures = $this->requestFailures + 1;

  $is_miscounted = ($this->requestFailures > $this->requestAttempts);
  if ($is_miscounted) {
    $message = pht(
      "Workflow has recorded more failures than attempts; there is a ".
      "missing call to \"didBeginRequest()\".\n");
    throw new Exception($message);
  }

  $remaining_count = count($remaining_refs);
  if ($remaining_count === 0) {
    $message = pht(
      "# All available services failed to serve the request, ".
      "giving up.\n");
    $this->writeClusterEngineLogMessage($message);
    return false;
  }

  $bytes_in = $this->getIOBytesRead();
  if ($bytes_in) {
    $message = pht(
      "# Client already read from service (%s bytes), unable to retry.\n",
      new PhutilNumber($bytes_in));
    $this->writeClusterEngineLogMessage($message);
    return false;
  }

  $bytes_out = $this->getIOBytesWritten();
  if ($bytes_out) {
    $message = pht(
      "# Client already wrote to service (%s bytes), unable to retry.\n",
      new PhutilNumber($bytes_out));
    $this->writeClusterEngineLogMessage($message);
    return false;
  }

  // Nothing prevents a retry: log which attempt comes next, out of how
  // many in total.
  $message = pht(
    "# Service request failed, retrying (making attempt %s of %s).\n",
    new PhutilNumber($this->requestAttempts + 1),
    new PhutilNumber($this->requestAttempts + $remaining_count));
  $this->writeClusterEngineLogMessage($message);

  return true;
}
} }