diff --git a/src/applications/harbormaster/future/HarbormasterExecFuture.php b/src/applications/harbormaster/future/HarbormasterExecFuture.php index 9dc4e172fd..1e6d4a5c2c 100644 --- a/src/applications/harbormaster/future/HarbormasterExecFuture.php +++ b/src/applications/harbormaster/future/HarbormasterExecFuture.php @@ -25,9 +25,13 @@ final class HarbormasterExecFuture } public function isReady() { + if ($this->hasResult()) { + return true; + } + $future = $this->getFuture(); - $result = $future->isReady(); + $is_ready = $future->isReady(); list($stdout, $stderr) = $future->read(); $future->discardBuffers(); @@ -40,11 +44,14 @@ final class HarbormasterExecFuture $this->stderr->append($stderr); } - return $result; - } + if ($future->hasResult()) { + $this->setResult($future->getResult()); + } - protected function getResult() { - return $this->getFuture()->getResult(); + // TODO: This should probably be implemented as a FutureProxy; it will + // not currently propagate exceptions or sockets properly. + + return $is_ready; } } diff --git a/src/applications/repository/daemon/PhabricatorRepositoryPullLocalDaemon.php b/src/applications/repository/daemon/PhabricatorRepositoryPullLocalDaemon.php index 125ee833f8..48eabf1545 100644 --- a/src/applications/repository/daemon/PhabricatorRepositoryPullLocalDaemon.php +++ b/src/applications/repository/daemon/PhabricatorRepositoryPullLocalDaemon.php @@ -73,6 +73,11 @@ final class PhabricatorRepositoryPullLocalDaemon $futures = array(); $queue = array(); + $future_pool = new FuturePool(); + + $future_pool->getIteratorTemplate() + ->setUpdateInterval($min_sleep); + $sync_wait = phutil_units('2 minutes in seconds'); $last_sync = array(); @@ -214,10 +219,14 @@ final class PhabricatorRepositoryPullLocalDaemon $display_name)); unset($queue[$id]); - $futures[$id] = $this->buildUpdateFuture( + + $future = $this->buildUpdateFuture( $repository, $no_discovery); + $futures[$id] = $future->getFutureKey(); + + $future_pool->addFuture($future); break; } } @@ -230,16 +239,14 @@ final class PhabricatorRepositoryPullLocalDaemon phutil_count($queue))); } - if ($futures) { - $iterator = id(new FutureIterator($futures)) - ->setUpdateInterval($min_sleep); + if ($future_pool->hasFutures()) { + while ($future_pool->hasFutures()) { + $future = $future_pool->resolve(); - foreach ($iterator as $id => $future) { $this->stillWorking(); if ($future === null) { $this->log(pht('Waiting for updates to complete...')); - $this->stillWorking(); if ($this->loadRepositoryUpdateMessages()) { $this->log(pht('Interrupted by pending updates!')); @@ -249,9 +256,18 @@ final class PhabricatorRepositoryPullLocalDaemon continue; } - unset($futures[$id]); - $retry_after[$id] = $this->resolveUpdateFuture( - $pullable[$id], + $future_key = $future->getFutureKey(); + $repository_id = null; + foreach ($futures as $id => $key) { + if ($key === $future_key) { + $repository_id = $id; + unset($futures[$id]); + break; + } + } + + $retry_after[$repository_id] = $this->resolveUpdateFuture( + $pullable[$repository_id], $future, $min_sleep); diff --git a/src/infrastructure/daemon/PhutilDaemonOverseer.php b/src/infrastructure/daemon/PhutilDaemonOverseer.php index 44e0c41384..77f6f6a20a 100644 --- a/src/infrastructure/daemon/PhutilDaemonOverseer.php +++ b/src/infrastructure/daemon/PhutilDaemonOverseer.php @@ -32,6 +32,8 @@ final class PhutilDaemonOverseer extends Phobject { private $inAbruptShutdown; private $inGracefulShutdown; + private $futurePool; + public function __construct(array $argv) { PhutilServiceProfiler::getInstance()->enableDiscardMode(); @@ -160,13 +162,13 @@ EOHELP public function run() { $this->createDaemonPools(); + $future_pool = $this->getFuturePool(); + while (true) { if ($this->shouldReloadDaemons()) { $this->didReceiveSignal(SIGHUP); } - $futures = array(); - $running_pools = false; foreach ($this->getDaemonPools() as $pool) { $pool->updatePool(); @@ -180,7 +182,7 @@ EOHELP } foreach ($pool->getFutures() as $future) { - $futures[] = $future; + $future_pool->addFuture($future); } if ($pool->getDaemons()) { @@ -190,9 +192,15 @@ EOHELP $this->updateMemory(); - $this->waitForDaemonFutures($futures); + if ($future_pool->hasFutures()) { + $future_pool->resolve(); + } else { + if (!$this->shouldShutdown()) { + sleep(1); + } + } - if (!$futures && !$running_pools) { + if (!$future_pool->hasFutures() && !$running_pools) { if ($this->shouldShutdown()) { break; } @@ -202,23 +210,20 @@ EOHELP exit($this->err); } + private function getFuturePool() { + if (!$this->futurePool) { + $pool = new FuturePool(); - private function waitForDaemonFutures(array $futures) { - assert_instances_of($futures, 'ExecFuture'); + // TODO: This only wakes if any daemons actually exit, or 1 second + // passes. It would be a bit cleaner to wait on any I/O, but Futures + // currently can't do that. - if ($futures) { - // TODO: This only wakes if any daemons actually exit. It would be a bit - // cleaner to wait on any I/O with Channels. - $iter = id(new FutureIterator($futures)) + $pool->getIteratorTemplate() ->setUpdateInterval(1); - foreach ($iter as $future) { - break; - } - } else { - if (!$this->shouldShutdown()) { - sleep(1); - } + + $this->futurePool = $pool; } + return $this->futurePool; } private function createDaemonPools() {