mirror of
https://we.phorge.it/source/arcanist.git
synced 2024-11-22 06:42:41 +01:00
Resolve all futures inside FutureIterator
Summary: Depends on D21032. Ref T11968. Currently, "Future" and "FutureIterator" can both resolve futures. Treat "Future->resolve()" as sugar on resolving an iterator of size 1. Test Plan: Ran tests, created this revision. Maniphest Tasks: T11968 Differential Revision: https://secure.phabricator.com/D21033
This commit is contained in:
parent
3df48c9257
commit
e20dce875c
2 changed files with 61 additions and 81 deletions
|
@ -7,8 +7,6 @@
|
||||||
*/
|
*/
|
||||||
abstract class Future extends Phobject {
|
abstract class Future extends Phobject {
|
||||||
|
|
||||||
protected static $handlerInstalled = null;
|
|
||||||
|
|
||||||
protected $result;
|
protected $result;
|
||||||
protected $exception;
|
protected $exception;
|
||||||
|
|
||||||
|
@ -37,22 +35,13 @@ abstract class Future extends Phobject {
|
||||||
'timeout.'));
|
'timeout.'));
|
||||||
}
|
}
|
||||||
|
|
||||||
$wait = $this->getDefaultWait();
|
$graph = new FutureIterator(array($this));
|
||||||
do {
|
$graph->resolveAll();
|
||||||
$this->checkException();
|
|
||||||
if ($this->isReady()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
$read = $this->getReadSockets();
|
if ($this->exception) {
|
||||||
$write = $this->getWriteSockets();
|
throw $this->exception;
|
||||||
|
}
|
||||||
|
|
||||||
if ($read || $write) {
|
|
||||||
self::waitForSockets($read, $write, $wait);
|
|
||||||
}
|
|
||||||
} while (true);
|
|
||||||
|
|
||||||
$this->checkException();
|
|
||||||
return $this->getResult();
|
return $this->getResult();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -65,17 +54,6 @@ abstract class Future extends Phobject {
|
||||||
return $this->exception;
|
return $this->exception;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If an exception was set by setException(), throw it.
|
|
||||||
*/
|
|
||||||
private function checkException() {
|
|
||||||
if ($this->exception) {
|
|
||||||
throw $this->exception;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieve a list of sockets which we can wait to become readable while
|
* Retrieve a list of sockets which we can wait to become readable while
|
||||||
* a future is resolving. If your future has sockets which can be
|
* a future is resolving. If your future has sockets which can be
|
||||||
|
@ -101,56 +79,6 @@ abstract class Future extends Phobject {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wait for activity on one of several sockets.
|
|
||||||
*
|
|
||||||
* @param list List of sockets expected to become readable.
|
|
||||||
* @param list List of sockets expected to become writable.
|
|
||||||
* @param float Timeout, in seconds.
|
|
||||||
* @return void
|
|
||||||
*/
|
|
||||||
public static function waitForSockets(
|
|
||||||
array $read_list,
|
|
||||||
array $write_list,
|
|
||||||
$timeout = 1) {
|
|
||||||
if (!self::$handlerInstalled) {
|
|
||||||
// If we're spawning child processes, we need to install a signal handler
|
|
||||||
// here to catch cases like execing '(sleep 60 &) &' where the child
|
|
||||||
// exits but a socket is kept open. But we don't actually need to do
|
|
||||||
// anything because the SIGCHLD will interrupt the stream_select(), as
|
|
||||||
// long as we have a handler registered.
|
|
||||||
if (function_exists('pcntl_signal')) {
|
|
||||||
if (!pcntl_signal(SIGCHLD, array(__CLASS__, 'handleSIGCHLD'))) {
|
|
||||||
throw new Exception(pht('Failed to install signal handler!'));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
self::$handlerInstalled = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
$timeout_sec = (int)$timeout;
|
|
||||||
$timeout_usec = (int)(1000000 * ($timeout - $timeout_sec));
|
|
||||||
|
|
||||||
$exceptfds = array();
|
|
||||||
$ok = @stream_select(
|
|
||||||
$read_list,
|
|
||||||
$write_list,
|
|
||||||
$exceptfds,
|
|
||||||
$timeout_sec,
|
|
||||||
$timeout_usec);
|
|
||||||
|
|
||||||
if ($ok === false) {
|
|
||||||
// Hopefully, means we received a SIGCHLD. In the worst case, we degrade
|
|
||||||
// to a busy wait.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static function handleSIGCHLD($signo) {
|
|
||||||
// This function is a dummy, we just need to have some handler registered
|
|
||||||
// so that PHP will get interrupted during stream_select(). If we don't
|
|
||||||
// register a handler, stream_select() won't fail.
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieve the final result of the future. This method will be called after
|
* Retrieve the final result of the future. This method will be called after
|
||||||
* the future is ready (as per @{method:isReady}) but before results are
|
* the future is ready (as per @{method:isReady}) but before results are
|
||||||
|
|
|
@ -63,9 +63,7 @@ final class FutureIterator extends Phobject implements Iterator {
|
||||||
* @task basics
|
* @task basics
|
||||||
*/
|
*/
|
||||||
public function resolveAll() {
|
public function resolveAll() {
|
||||||
foreach ($this as $future) {
|
iterator_to_array($this);
|
||||||
$future->resolve();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -246,7 +244,7 @@ final class FutureIterator extends Phobject implements Iterator {
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($can_use_sockets) {
|
if ($can_use_sockets) {
|
||||||
Future::waitForSockets($read_sockets, $write_sockets, $wait_time);
|
$this->waitForSockets($read_sockets, $write_sockets, $wait_time);
|
||||||
} else {
|
} else {
|
||||||
usleep(1000);
|
usleep(1000);
|
||||||
}
|
}
|
||||||
|
@ -324,4 +322,58 @@ final class FutureIterator extends Phobject implements Iterator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for activity on one of several sockets.
|
||||||
|
*
|
||||||
|
* @param list List of sockets expected to become readable.
|
||||||
|
* @param list List of sockets expected to become writable.
|
||||||
|
* @param float Timeout, in seconds.
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
private function waitForSockets(
|
||||||
|
array $read_list,
|
||||||
|
array $write_list,
|
||||||
|
$timeout = 1.0) {
|
||||||
|
|
||||||
|
static $handler_installed = false;
|
||||||
|
|
||||||
|
if (!$handler_installed) {
|
||||||
|
// If we're spawning child processes, we need to install a signal handler
|
||||||
|
// here to catch cases like execing '(sleep 60 &) &' where the child
|
||||||
|
// exits but a socket is kept open. But we don't actually need to do
|
||||||
|
// anything because the SIGCHLD will interrupt the stream_select(), as
|
||||||
|
// long as we have a handler registered.
|
||||||
|
if (function_exists('pcntl_signal')) {
|
||||||
|
if (!pcntl_signal(SIGCHLD, array(__CLASS__, 'handleSIGCHLD'))) {
|
||||||
|
throw new Exception(pht('Failed to install signal handler!'));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
$handler_installed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
$timeout_sec = (int)$timeout;
|
||||||
|
$timeout_usec = (int)(1000000 * ($timeout - $timeout_sec));
|
||||||
|
|
||||||
|
$exceptfds = array();
|
||||||
|
$ok = @stream_select(
|
||||||
|
$read_list,
|
||||||
|
$write_list,
|
||||||
|
$exceptfds,
|
||||||
|
$timeout_sec,
|
||||||
|
$timeout_usec);
|
||||||
|
|
||||||
|
if ($ok === false) {
|
||||||
|
// Hopefully, means we received a SIGCHLD. In the worst case, we degrade
|
||||||
|
// to a busy wait.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static function handleSIGCHLD($signo) {
|
||||||
|
// This function is a dummy, we just need to have some handler registered
|
||||||
|
// so that PHP will get interrupted during "stream_select()". If we don't
|
||||||
|
// register a handler, "stream_select()" won't fail.
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue