From e20dce875cf2990b6bc3eebec4219304d037ba9b Mon Sep 17 00:00:00 2001 From: epriestley Date: Wed, 26 Feb 2020 11:31:54 -0800 Subject: [PATCH] Resolve all futures inside FutureIterator Summary: Depends on D21032. Ref T11968. Currently, "Future" and "FutureIterator" can both resolve futures. Treat "Future->resolve()" as sugar on resolving an iterator of size 1. Test Plan: Ran tests, created this revision. Maniphest Tasks: T11968 Differential Revision: https://secure.phabricator.com/D21033 --- src/future/Future.php | 82 +++-------------------------------- src/future/FutureIterator.php | 60 +++++++++++++++++++++++-- 2 files changed, 61 insertions(+), 81 deletions(-) diff --git a/src/future/Future.php b/src/future/Future.php index 34740a5b..e468d076 100644 --- a/src/future/Future.php +++ b/src/future/Future.php @@ -7,8 +7,6 @@ */ abstract class Future extends Phobject { - protected static $handlerInstalled = null; - protected $result; protected $exception; @@ -37,22 +35,13 @@ abstract class Future extends Phobject { 'timeout.')); } - $wait = $this->getDefaultWait(); - do { - $this->checkException(); - if ($this->isReady()) { - break; - } + $graph = new FutureIterator(array($this)); + $graph->resolveAll(); - $read = $this->getReadSockets(); - $write = $this->getWriteSockets(); + if ($this->exception) { + throw $this->exception; + } - if ($read || $write) { - self::waitForSockets($read, $write, $wait); - } - } while (true); - - $this->checkException(); return $this->getResult(); } @@ -65,17 +54,6 @@ abstract class Future extends Phobject { return $this->exception; } - - /** - * If an exception was set by setException(), throw it. - */ - private function checkException() { - if ($this->exception) { - throw $this->exception; - } - } - - /** * Retrieve a list of sockets which we can wait to become readable while * a future is resolving. If your future has sockets which can be @@ -101,56 +79,6 @@ abstract class Future extends Phobject { } - /** - * Wait for activity on one of several sockets. - * - * @param list List of sockets expected to become readable. - * @param list List of sockets expected to become writable. - * @param float Timeout, in seconds. - * @return void - */ - public static function waitForSockets( - array $read_list, - array $write_list, - $timeout = 1) { - if (!self::$handlerInstalled) { - // If we're spawning child processes, we need to install a signal handler - // here to catch cases like execing '(sleep 60 &) &' where the child - // exits but a socket is kept open. But we don't actually need to do - // anything because the SIGCHLD will interrupt the stream_select(), as - // long as we have a handler registered. - if (function_exists('pcntl_signal')) { - if (!pcntl_signal(SIGCHLD, array(__CLASS__, 'handleSIGCHLD'))) { - throw new Exception(pht('Failed to install signal handler!')); - } - } - self::$handlerInstalled = true; - } - - $timeout_sec = (int)$timeout; - $timeout_usec = (int)(1000000 * ($timeout - $timeout_sec)); - - $exceptfds = array(); - $ok = @stream_select( - $read_list, - $write_list, - $exceptfds, - $timeout_sec, - $timeout_usec); - - if ($ok === false) { - // Hopefully, means we received a SIGCHLD. In the worst case, we degrade - // to a busy wait. - } - } - - public static function handleSIGCHLD($signo) { - // This function is a dummy, we just need to have some handler registered - // so that PHP will get interrupted during stream_select(). If we don't - // register a handler, stream_select() won't fail. - } - - /** * Retrieve the final result of the future. This method will be called after * the future is ready (as per @{method:isReady}) but before results are diff --git a/src/future/FutureIterator.php b/src/future/FutureIterator.php index ac1eec97..933fe9d8 100644 --- a/src/future/FutureIterator.php +++ b/src/future/FutureIterator.php @@ -63,9 +63,7 @@ final class FutureIterator extends Phobject implements Iterator { * @task basics */ public function resolveAll() { - foreach ($this as $future) { - $future->resolve(); - } + iterator_to_array($this); } /** @@ -246,7 +244,7 @@ final class FutureIterator extends Phobject implements Iterator { } if ($can_use_sockets) { - Future::waitForSockets($read_sockets, $write_sockets, $wait_time); + $this->waitForSockets($read_sockets, $write_sockets, $wait_time); } else { usleep(1000); } @@ -324,4 +322,58 @@ final class FutureIterator extends Phobject implements Iterator { } } + + /** + * Wait for activity on one of several sockets. + * + * @param list List of sockets expected to become readable. + * @param list List of sockets expected to become writable. + * @param float Timeout, in seconds. + * @return void + */ + private function waitForSockets( + array $read_list, + array $write_list, + $timeout = 1.0) { + + static $handler_installed = false; + + if (!$handler_installed) { + // If we're spawning child processes, we need to install a signal handler + // here to catch cases like execing '(sleep 60 &) &' where the child + // exits but a socket is kept open. But we don't actually need to do + // anything because the SIGCHLD will interrupt the stream_select(), as + // long as we have a handler registered. + if (function_exists('pcntl_signal')) { + if (!pcntl_signal(SIGCHLD, array(__CLASS__, 'handleSIGCHLD'))) { + throw new Exception(pht('Failed to install signal handler!')); + } + } + $handler_installed = true; + } + + $timeout_sec = (int)$timeout; + $timeout_usec = (int)(1000000 * ($timeout - $timeout_sec)); + + $exceptfds = array(); + $ok = @stream_select( + $read_list, + $write_list, + $exceptfds, + $timeout_sec, + $timeout_usec); + + if ($ok === false) { + // Hopefully, means we received a SIGCHLD. In the worst case, we degrade + // to a busy wait. + } + } + + public static function handleSIGCHLD($signo) { + // This function is a dummy, we just need to have some handler registered + // so that PHP will get interrupted during "stream_select()". If we don't + // register a handler, "stream_select()" won't fail. + } + + }