mirror of
https://we.phorge.it/source/phorge.git
synced 2024-11-29 10:12:41 +01:00
Update some Phabricator behaviors for changes to Futures
Summary: Depends on D21053. Ref T11968. Three things have changed: - Overseers can no longer use FutureIterator to continue execution of an arbitrary list of futures from any state. Use FuturePool instead. - Same with repository daemons. - Probably (?) fix an API change in the Harbormaster exec future. Test Plan: - Ran "bin/phd debug task" and "bin/phd debug pull", no longer saw Future-management related errors. - The Harbormaster future is easiest to test by just seeing if production works once this change is deployed there. Subscribers: PHID-OPKG-gm6ozazyms6q6i22gyam Maniphest Tasks: T11968 Differential Revision: https://secure.phabricator.com/D21054
This commit is contained in:
parent
067b04aaf1
commit
1a59cae743
3 changed files with 60 additions and 32 deletions
|
@ -25,9 +25,13 @@ final class HarbormasterExecFuture
|
||||||
}
|
}
|
||||||
|
|
||||||
public function isReady() {
|
public function isReady() {
|
||||||
|
if ($this->hasResult()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
$future = $this->getFuture();
|
$future = $this->getFuture();
|
||||||
|
|
||||||
$result = $future->isReady();
|
$is_ready = $future->isReady();
|
||||||
|
|
||||||
list($stdout, $stderr) = $future->read();
|
list($stdout, $stderr) = $future->read();
|
||||||
$future->discardBuffers();
|
$future->discardBuffers();
|
||||||
|
@ -40,11 +44,14 @@ final class HarbormasterExecFuture
|
||||||
$this->stderr->append($stderr);
|
$this->stderr->append($stderr);
|
||||||
}
|
}
|
||||||
|
|
||||||
return $result;
|
if ($future->hasResult()) {
|
||||||
|
$this->setResult($future->getResult());
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function getResult() {
|
// TODO: This should probably be implemented as a FutureProxy; it will
|
||||||
return $this->getFuture()->getResult();
|
// not currently propagate exceptions or sockets properly.
|
||||||
|
|
||||||
|
return $is_ready;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -73,6 +73,11 @@ final class PhabricatorRepositoryPullLocalDaemon
|
||||||
$futures = array();
|
$futures = array();
|
||||||
$queue = array();
|
$queue = array();
|
||||||
|
|
||||||
|
$future_pool = new FuturePool();
|
||||||
|
|
||||||
|
$future_pool->getIteratorTemplate()
|
||||||
|
->setUpdateInterval($min_sleep);
|
||||||
|
|
||||||
$sync_wait = phutil_units('2 minutes in seconds');
|
$sync_wait = phutil_units('2 minutes in seconds');
|
||||||
$last_sync = array();
|
$last_sync = array();
|
||||||
|
|
||||||
|
@ -214,10 +219,14 @@ final class PhabricatorRepositoryPullLocalDaemon
|
||||||
$display_name));
|
$display_name));
|
||||||
|
|
||||||
unset($queue[$id]);
|
unset($queue[$id]);
|
||||||
$futures[$id] = $this->buildUpdateFuture(
|
|
||||||
|
$future = $this->buildUpdateFuture(
|
||||||
$repository,
|
$repository,
|
||||||
$no_discovery);
|
$no_discovery);
|
||||||
|
|
||||||
|
$futures[$id] = $future->getFutureKey();
|
||||||
|
|
||||||
|
$future_pool->addFuture($future);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -230,16 +239,14 @@ final class PhabricatorRepositoryPullLocalDaemon
|
||||||
phutil_count($queue)));
|
phutil_count($queue)));
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($futures) {
|
if ($future_pool->hasFutures()) {
|
||||||
$iterator = id(new FutureIterator($futures))
|
while ($future_pool->hasFutures()) {
|
||||||
->setUpdateInterval($min_sleep);
|
$future = $future_pool->resolve();
|
||||||
|
|
||||||
foreach ($iterator as $id => $future) {
|
|
||||||
$this->stillWorking();
|
$this->stillWorking();
|
||||||
|
|
||||||
if ($future === null) {
|
if ($future === null) {
|
||||||
$this->log(pht('Waiting for updates to complete...'));
|
$this->log(pht('Waiting for updates to complete...'));
|
||||||
$this->stillWorking();
|
|
||||||
|
|
||||||
if ($this->loadRepositoryUpdateMessages()) {
|
if ($this->loadRepositoryUpdateMessages()) {
|
||||||
$this->log(pht('Interrupted by pending updates!'));
|
$this->log(pht('Interrupted by pending updates!'));
|
||||||
|
@ -249,9 +256,18 @@ final class PhabricatorRepositoryPullLocalDaemon
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$future_key = $future->getFutureKey();
|
||||||
|
$repository_id = null;
|
||||||
|
foreach ($futures as $id => $key) {
|
||||||
|
if ($key === $future_key) {
|
||||||
|
$repository_id = $id;
|
||||||
unset($futures[$id]);
|
unset($futures[$id]);
|
||||||
$retry_after[$id] = $this->resolveUpdateFuture(
|
break;
|
||||||
$pullable[$id],
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$retry_after[$repository_id] = $this->resolveUpdateFuture(
|
||||||
|
$pullable[$repository_id],
|
||||||
$future,
|
$future,
|
||||||
$min_sleep);
|
$min_sleep);
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,8 @@ final class PhutilDaemonOverseer extends Phobject {
|
||||||
private $inAbruptShutdown;
|
private $inAbruptShutdown;
|
||||||
private $inGracefulShutdown;
|
private $inGracefulShutdown;
|
||||||
|
|
||||||
|
private $futurePool;
|
||||||
|
|
||||||
public function __construct(array $argv) {
|
public function __construct(array $argv) {
|
||||||
PhutilServiceProfiler::getInstance()->enableDiscardMode();
|
PhutilServiceProfiler::getInstance()->enableDiscardMode();
|
||||||
|
|
||||||
|
@ -160,13 +162,13 @@ EOHELP
|
||||||
public function run() {
|
public function run() {
|
||||||
$this->createDaemonPools();
|
$this->createDaemonPools();
|
||||||
|
|
||||||
|
$future_pool = $this->getFuturePool();
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if ($this->shouldReloadDaemons()) {
|
if ($this->shouldReloadDaemons()) {
|
||||||
$this->didReceiveSignal(SIGHUP);
|
$this->didReceiveSignal(SIGHUP);
|
||||||
}
|
}
|
||||||
|
|
||||||
$futures = array();
|
|
||||||
|
|
||||||
$running_pools = false;
|
$running_pools = false;
|
||||||
foreach ($this->getDaemonPools() as $pool) {
|
foreach ($this->getDaemonPools() as $pool) {
|
||||||
$pool->updatePool();
|
$pool->updatePool();
|
||||||
|
@ -180,7 +182,7 @@ EOHELP
|
||||||
}
|
}
|
||||||
|
|
||||||
foreach ($pool->getFutures() as $future) {
|
foreach ($pool->getFutures() as $future) {
|
||||||
$futures[] = $future;
|
$future_pool->addFuture($future);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($pool->getDaemons()) {
|
if ($pool->getDaemons()) {
|
||||||
|
@ -190,9 +192,15 @@ EOHELP
|
||||||
|
|
||||||
$this->updateMemory();
|
$this->updateMemory();
|
||||||
|
|
||||||
$this->waitForDaemonFutures($futures);
|
if ($future_pool->hasFutures()) {
|
||||||
|
$future_pool->resolve();
|
||||||
|
} else {
|
||||||
|
if (!$this->shouldShutdown()) {
|
||||||
|
sleep(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (!$futures && !$running_pools) {
|
if (!$future_pool->hasFutures() && !$running_pools) {
|
||||||
if ($this->shouldShutdown()) {
|
if ($this->shouldShutdown()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -202,23 +210,20 @@ EOHELP
|
||||||
exit($this->err);
|
exit($this->err);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private function getFuturePool() {
|
||||||
|
if (!$this->futurePool) {
|
||||||
|
$pool = new FuturePool();
|
||||||
|
|
||||||
private function waitForDaemonFutures(array $futures) {
|
// TODO: This only wakes if any daemons actually exit, or 1 second
|
||||||
assert_instances_of($futures, 'ExecFuture');
|
// passes. It would be a bit cleaner to wait on any I/O, but Futures
|
||||||
|
// currently can't do that.
|
||||||
|
|
||||||
if ($futures) {
|
$pool->getIteratorTemplate()
|
||||||
// TODO: This only wakes if any daemons actually exit. It would be a bit
|
|
||||||
// cleaner to wait on any I/O with Channels.
|
|
||||||
$iter = id(new FutureIterator($futures))
|
|
||||||
->setUpdateInterval(1);
|
->setUpdateInterval(1);
|
||||||
foreach ($iter as $future) {
|
|
||||||
break;
|
$this->futurePool = $pool;
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (!$this->shouldShutdown()) {
|
|
||||||
sleep(1);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
return $this->futurePool;
|
||||||
}
|
}
|
||||||
|
|
||||||
private function createDaemonPools() {
|
private function createDaemonPools() {
|
||||||
|
|
Loading…
Reference in a new issue