1
0
Fork 0
mirror of https://we.phorge.it/source/phorge.git synced 2024-12-21 21:10:56 +01:00

Make PullLocal daemon more flexible and transparent about scheduling

Summary:
Ref T4605. Fixes T3466. The major change here is that we now run up to four simultaneous updates. This should ease cases where, e.g., one very slow repository was blocking other repositories. It also tends to increase load; the next diff will introduce smart backoff for cold repositories to ease this.

The rest of this is just a ton of logging so I can IRC debug these things by having users run them in `phd debug pulllocal` mode.

For T3466:

  - You now have to hit four simultaneous hangs to completely block the update process.
  - Importing repository updates are killed after 4 hours.
  - Imported repository updates are killed after 15 minutes.

Test Plan:
  - Ran `phd debug pulllocal` and observed sensible logs and behavior.
  - Interrupted daemon from sleeps and processing with `diffusion.looksoon`.
  - Ran with various `--not`, `--no-discovery` flags.

Reviewers: btrahan

Reviewed By: btrahan

Subscribers: epriestley

Maniphest Tasks: T3466, T4605

Differential Revision: https://secure.phabricator.com/D8785
This commit is contained in:
epriestley 2014-04-16 13:00:49 -07:00
parent 417056932e
commit 5671c4b276

View file

@ -23,8 +23,6 @@
* repository).
*
* @task pull Pulling Repositories
* @task git Git Implementation
* @task hg Mercurial Implementation
*/
final class PhabricatorRepositoryPullLocalDaemon
extends PhabricatorDaemon {
@ -60,8 +58,8 @@ final class PhabricatorRepositoryPullLocalDaemon
));
$no_discovery = $args->getArg('no-discovery');
$repo_names = $args->getArg('repositories');
$exclude_names = $args->getArg('not');
$include = $args->getArg('repositories');
$exclude = $args->getArg('not');
// Each repository has an individual pull frequency; after we pull it,
// wait that long to pull it again. When we start up, try to pull everything
@ -69,24 +67,27 @@ final class PhabricatorRepositoryPullLocalDaemon
$retry_after = array();
$min_sleep = 15;
$max_futures = 4;
$futures = array();
$queue = array();
while (true) {
$repositories = $this->loadRepositories($repo_names);
if ($exclude_names) {
$exclude = $this->loadRepositories($exclude_names);
$repositories = array_diff_key($repositories, $exclude);
}
// Shuffle the repositories, then re-key the array since shuffle()
// discards keys. This is mostly for startup, we'll use soft priorities
// later.
shuffle($repositories);
$repositories = mpull($repositories, null, 'getID');
$pullable = $this->loadPullableRepositories($include, $exclude);
// If any repositories have the NEEDS_UPDATE flag set, pull them
// as soon as possible.
$need_update_messages = $this->loadRepositoryUpdateMessages();
foreach ($need_update_messages as $message) {
$repo = idx($pullable, $message->getRepositoryID());
if (!$repo) {
continue;
}
$this->log(
pht(
'Got an update message for repository "%s"!',
$repo->getMonogram()));
$retry_after[$message->getRepositoryID()] = time();
}
@ -95,102 +96,190 @@ final class PhabricatorRepositoryPullLocalDaemon
// causes us to sleep for the minimum amount of time.
$retry_after = array_select_keys(
$retry_after,
array_keys($repositories));
array_keys($pullable));
// Assign soft priorities to repositories based on how frequently they
// should pull again.
asort($retry_after);
$repositories = array_select_keys(
$repositories,
array_keys($retry_after)) + $repositories;
foreach ($repositories as $id => $repository) {
// Figure out which repositories we need to queue for an update.
foreach ($pullable as $id => $repository) {
$monogram = $repository->getMonogram();
if (isset($futures[$id])) {
$this->log(pht('Repository "%s" is currently updating.', $monogram));
continue;
}
if (isset($queue[$id])) {
$this->log(pht('Repository "%s" is already queued.', $monogram));
continue;
}
$after = idx($retry_after, $id, 0);
if ($after > time()) {
$this->log(
pht(
'Repository "%s" is not due for an update for %s second(s).',
$monogram,
new PhutilNumber($after - time())));
continue;
}
$tracked = $repository->isTracked();
if (!$tracked) {
continue;
if (!$after) {
$this->log(
pht(
'Scheduling repository "%s" for an initial update.',
$monogram));
} else {
$this->log(
pht(
'Scheduling repository "%s" for an update (%s seconds overdue).',
$monogram,
new PhutilNumber(time() - $after)));
}
$callsign = $repository->getCallsign();
try {
$this->log("Updating repository '{$callsign}'.");
$bin_dir = dirname(phutil_get_library_root('phabricator')).'/bin';
$flags = array();
if ($no_discovery) {
$flags[] = '--no-discovery';
}
list($stdout, $stderr) = execx(
'%s/repository update %Ls -- %s',
$bin_dir,
$flags,
$callsign);
if (strlen($stderr)) {
$stderr_msg = pht(
'Unexpected output while updating the %s repository: %s',
$callsign,
$stderr);
phlog($stderr_msg);
}
$sleep_for = $repository->getDetail('pull-frequency', $min_sleep);
$retry_after[$id] = time() + $sleep_for;
} catch (Exception $ex) {
$retry_after[$id] = time() + $min_sleep;
$proxy = new PhutilProxyException(
"Error while fetching changes to the '{$callsign}' repository.",
$ex);
phlog($proxy);
}
$this->stillWorking();
$queue[$id] = $after;
}
if ($retry_after) {
$sleep_until = max(min($retry_after), time() + $min_sleep);
} else {
$sleep_until = time() + $min_sleep;
}
// Process repositories in the order they became candidates for updates.
asort($queue);
// Dequeue repositories until we hit maximum parallelism.
while ($queue && (count($futures) < $max_futures)) {
foreach ($queue as $id => $time) {
$repository = idx($pullable, $id);
if (!$repository) {
$this->log(
pht('Repository %s is no longer pullable; skipping.', $id));
break;
}
$monogram = $repository->getMonogram();
$this->log(pht('Starting update for repository "%s".', $monogram));
unset($queue[$id]);
$futures[$id] = $this->buildUpdateFuture(
$repository,
$no_discovery);
while (($sleep_until - time()) > 0) {
$this->sleep(1);
if ($this->loadRepositoryUpdateMessages()) {
break;
}
}
if ($queue) {
$this->log(
pht(
'Not enough process slots to schedule the other %s '.
'repository(s) for updates yet.',
new PhutilNumber(count($queue))));
}
if ($futures) {
$iterator = id(new FutureIterator($futures))
->setUpdateInterval($min_sleep);
foreach ($iterator as $id => $future) {
$this->stillWorking();
if ($future === null) {
$this->log(pht('Waiting for updates to complete...'));
$this->stillWorking();
if ($this->loadRepositoryUpdateMessages()) {
$this->log(pht('Interrupted by pending updates!'));
break;
}
continue;
}
unset($futures[$id]);
$retry_after[$id] = $this->resolveUpdateFuture(
$pullable[$id],
$future,
$min_sleep);
// We have a free slot now, so go try to fill it.
break;
}
// Jump back into prioritization if we had any futures to deal with.
continue;
}
$this->waitForUpdates($min_sleep, $retry_after);
}
}
/**
* @task pull
*/
private function buildUpdateFuture(
PhabricatorRepository $repository,
$no_discovery) {
$bin = dirname(phutil_get_library_root('phabricator')).'/bin/repository';
$flags = array();
if ($no_discovery) {
$flags[] = '--no-discovery';
}
$callsign = $repository->getCallsign();
$future = new ExecFuture('%s update %Ls -- %s', $bin, $flags, $callsign);
// Sometimes, the underlying VCS commands will hang indefinitely. We've
// observed this occasionally with GitHub, and other users have observed
// it with other VCS servers.
// To limit the damage this can cause, kill the update out after a
// reasonable amount of time, under the assumption that it has hung.
// Since it's hard to know what a "reasonable" amount of time is given that
// users may be downloading a repository full of pirated movies over a
// potato, these limits are fairly generous. Repositories exceeding these
// limits can be manually pulled with `bin/repository update X`, which can
// just run for as long as it wants.
if ($repository->isImporting()) {
$timeout = phutil_units('4 hours in seconds');
} else {
$timeout = phutil_units('15 minutes in seconds');
}
$future->setTimeout($timeout);
return $future;
}
/**
* @task pull
*/
private function loadRepositoryUpdateMessages() {
$type_need_update = PhabricatorRepositoryStatusMessage::TYPE_NEEDS_UPDATE;
return id(new PhabricatorRepositoryStatusMessage())
->loadAllWhere('statusType = %s', $type_need_update);
}
/**
* @task pull
*/
protected function loadRepositories(array $names) {
private function loadPullableRepositories(array $include, array $exclude) {
$query = id(new PhabricatorRepositoryQuery())
->setViewer($this->getViewer());
if ($names) {
$query->withCallsigns($names);
if ($include) {
$query->withCallsigns($include);
}
$repos = $query->execute();
$repositories = $query->execute();
if ($names) {
$by_callsign = mpull($repos, null, 'getCallsign');
foreach ($names as $name) {
if ($include) {
$by_callsign = mpull($repositories, null, 'getCallsign');
foreach ($include as $name) {
if (empty($by_callsign[$name])) {
throw new Exception(
"No repository exists with callsign '{$name}'!");
@ -198,7 +287,103 @@ final class PhabricatorRepositoryPullLocalDaemon
}
}
return $repos;
if ($exclude) {
$exclude = array_fuse($exclude);
foreach ($repositories as $key => $repository) {
if (isset($exclude[$repository->getCallsign()])) {
unset($repositories[$key]);
}
}
}
foreach ($repositories as $key => $repository) {
if (!$repository->isTracked()) {
unset($repositories[$key]);
}
}
// Shuffle the repositories, then re-key the array since shuffle()
// discards keys. This is mostly for startup, we'll use soft priorities
// later.
shuffle($repositories);
$repositories = mpull($repositories, null, 'getID');
return $repositories;
}
/**
* @task pull
*/
private function resolveUpdateFuture(
PhabricatorRepository $repository,
ExecFuture $future,
$min_sleep) {
$monogram = $repository->getMonogram();
$this->log(pht('Resolving update for "%s".', $monogram));
try {
list($stdout, $stderr) = $future->resolvex();
} catch (Exception $ex) {
$proxy = new PhutilProxyException(
pht(
'Error while updating the "%s" repository.',
$repository->getMonogram()),
$ex);
phlog($proxy);
return time() + $min_sleep;
}
if (strlen($stderr)) {
$stderr_msg = pht(
'Unexpected output while updating repository "%s": %s',
$monogram,
$stderr);
phlog($stderr_msg);
}
$sleep_for = (int)$repository->getDetail('pull-frequency', $min_sleep);
if ($sleep_for < $min_sleep) {
$sleep_for = $min_sleep;
}
return time() + $sleep_for;
}
/**
* Sleep for a short period of time, waiting for update messages from the
*
*
* @task pull
*/
private function waitForUpdates($min_sleep, array $retry_after) {
$this->log(
pht('No repositories need updates right now, sleeping...'));
$sleep_until = time() + $min_sleep;
if ($retry_after) {
$sleep_until = min($sleep_until, min($retry_after));
}
while (($sleep_until - time()) > 0) {
$sleep_duration = ($sleep_until - time());
$this->log(
pht(
'Sleeping for %s more second(s)...',
new PhutilNumber($sleep_duration)));
$this->sleep(1);
if ($this->loadRepositoryUpdateMessages()) {
$this->log(pht('Awakened from sleep by pending updates!'));
break;
}
}
}
}