mirror of
https://we.phorge.it/source/arcanist.git
synced 2024-11-22 14:52:40 +01:00
Make "FutureIterator" queue management more formal
Summary: Depends on D21035. Ref T11968. This allows a "FutureIterator" to hold futures which are blocked because of unresolved dependencies, and makes the resolution process more structured. Test Plan: Ran unit tests, created this revision. Maniphest Tasks: T11968 Differential Revision: https://secure.phabricator.com/D21036
This commit is contained in:
parent
6b75562c3e
commit
cb80f69715
4 changed files with 266 additions and 125 deletions
|
@ -8,8 +8,11 @@
|
||||||
abstract class Future extends Phobject {
|
abstract class Future extends Phobject {
|
||||||
|
|
||||||
private $hasResult = false;
|
private $hasResult = false;
|
||||||
|
private $hasStarted = false;
|
||||||
|
private $hasEnded = false;
|
||||||
private $result;
|
private $result;
|
||||||
private $exception;
|
private $exception;
|
||||||
|
private $futureKey;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Is this future's process complete? Specifically, can this future be
|
* Is this future's process complete? Specifically, can this future be
|
||||||
|
@ -36,16 +39,28 @@ abstract class Future extends Phobject {
|
||||||
'timeout.'));
|
'timeout.'));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ($this->hasException()) {
|
||||||
|
throw $this->getException();
|
||||||
|
}
|
||||||
|
|
||||||
if (!$this->hasResult()) {
|
if (!$this->hasResult()) {
|
||||||
$graph = new FutureIterator(array($this));
|
$graph = new FutureIterator(array($this));
|
||||||
$graph->resolveAll();
|
$graph->resolveAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($this->hasException()) {
|
return $this->getResult();
|
||||||
throw $this->getException();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return $this->getResult();
|
final public function startFuture() {
|
||||||
|
if ($this->hasStarted) {
|
||||||
|
throw new Exception(
|
||||||
|
pht(
|
||||||
|
'Future has already started; futures can not start more '.
|
||||||
|
'than once.'));
|
||||||
|
}
|
||||||
|
$this->hasStarted = true;
|
||||||
|
|
||||||
|
$this->isReady();
|
||||||
}
|
}
|
||||||
|
|
||||||
final public function updateFuture() {
|
final public function updateFuture() {
|
||||||
|
@ -66,6 +81,23 @@ abstract class Future extends Phobject {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final public function endFuture() {
|
||||||
|
if (!$this->hasException() && !$this->hasResult()) {
|
||||||
|
throw new Exception(
|
||||||
|
pht(
|
||||||
|
'Trying to end a future which has no exception and no result. '.
|
||||||
|
'Futures must resolve before they can be ended.'));
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($this->hasEnded) {
|
||||||
|
throw new Exception(
|
||||||
|
pht(
|
||||||
|
'Future has already ended; futures can not end more '.
|
||||||
|
'than once.'));
|
||||||
|
}
|
||||||
|
$this->hasEnded = true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieve a list of sockets which we can wait to become readable while
|
* Retrieve a list of sockets which we can wait to become readable while
|
||||||
* a future is resolving. If your future has sockets which can be
|
* a future is resolving. If your future has sockets which can be
|
||||||
|
@ -153,5 +185,27 @@ abstract class Future extends Phobject {
|
||||||
return ($this->exception !== null);
|
return ($this->exception !== null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final public function setFutureKey($key) {
|
||||||
|
if ($this->futureKey !== null) {
|
||||||
|
throw new Exception(
|
||||||
|
pht(
|
||||||
|
'Future already has a key ("%s") assigned.',
|
||||||
|
$key));
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->futureKey = $key;
|
||||||
|
|
||||||
|
return $this;
|
||||||
|
}
|
||||||
|
|
||||||
|
final public function getFutureKey() {
|
||||||
|
static $next_key = 1;
|
||||||
|
|
||||||
|
if ($this->futureKey === null) {
|
||||||
|
$this->futureKey = sprintf('Future/%d', $next_key++);
|
||||||
|
}
|
||||||
|
|
||||||
|
return $this->futureKey;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,17 +28,22 @@
|
||||||
* @task iterator Iterator Interface
|
* @task iterator Iterator Interface
|
||||||
* @task internal Internals
|
* @task internal Internals
|
||||||
*/
|
*/
|
||||||
final class FutureIterator extends Phobject implements Iterator {
|
final class FutureIterator
|
||||||
|
extends Phobject
|
||||||
|
implements Iterator {
|
||||||
|
|
||||||
protected $wait = array();
|
private $hold = array();
|
||||||
protected $work = array();
|
private $wait = array();
|
||||||
protected $futures = array();
|
private $work = array();
|
||||||
protected $key;
|
|
||||||
|
|
||||||
protected $limit;
|
private $futures = array();
|
||||||
|
private $key;
|
||||||
|
|
||||||
protected $timeout;
|
private $limit;
|
||||||
protected $isTimeout = false;
|
|
||||||
|
private $timeout;
|
||||||
|
private $isTimeout = false;
|
||||||
|
private $hasRewound = false;
|
||||||
|
|
||||||
|
|
||||||
/* -( Basics )------------------------------------------------------------- */
|
/* -( Basics )------------------------------------------------------------- */
|
||||||
|
@ -52,7 +57,15 @@ final class FutureIterator extends Phobject implements Iterator {
|
||||||
*/
|
*/
|
||||||
public function __construct(array $futures) {
|
public function __construct(array $futures) {
|
||||||
assert_instances_of($futures, 'Future');
|
assert_instances_of($futures, 'Future');
|
||||||
$this->futures = $futures;
|
|
||||||
|
$respect_keys = !phutil_is_natural_list($futures);
|
||||||
|
|
||||||
|
foreach ($futures as $map_key => $future) {
|
||||||
|
if ($respect_keys) {
|
||||||
|
$future->setFutureKey($map_key);
|
||||||
|
}
|
||||||
|
$this->addFuture($future);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -63,8 +76,18 @@ final class FutureIterator extends Phobject implements Iterator {
|
||||||
* @task basics
|
* @task basics
|
||||||
*/
|
*/
|
||||||
public function resolveAll() {
|
public function resolveAll() {
|
||||||
|
// If a caller breaks out of a "foreach" and then calls "resolveAll()",
|
||||||
|
// interpret it to mean that we should iterate over whatever futures
|
||||||
|
// remain.
|
||||||
|
|
||||||
|
if ($this->hasRewound) {
|
||||||
|
while ($this->valid()) {
|
||||||
|
$this->next();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
iterator_to_array($this);
|
iterator_to_array($this);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add another future to the set of futures. This is useful if you have a
|
* Add another future to the set of futures. This is useful if you have a
|
||||||
|
@ -74,24 +97,20 @@ final class FutureIterator extends Phobject implements Iterator {
|
||||||
* @param Future @{class:Future} to add to iterator
|
* @param Future @{class:Future} to add to iterator
|
||||||
* @task basics
|
* @task basics
|
||||||
*/
|
*/
|
||||||
public function addFuture(Future $future, $key = null) {
|
public function addFuture(Future $future) {
|
||||||
if ($key === null) {
|
$key = $future->getFutureKey();
|
||||||
$this->futures[] = $future;
|
|
||||||
$this->wait[] = last_key($this->futures);
|
if (isset($this->futures[$key])) {
|
||||||
} else if (!isset($this->futures[$key])) {
|
throw new Exception(
|
||||||
$this->futures[$key] = $future;
|
pht(
|
||||||
$this->wait[] = $key;
|
'This future graph already has a future with key "%s". Each '.
|
||||||
} else {
|
'future must have a unique key.',
|
||||||
throw new Exception(pht('Invalid key %s', $key));
|
$key));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start running the future if we don't have $this->limit futures running
|
$this->futures[$key] = $future;
|
||||||
// already. updateWorkingSet() won't start running the future if there's no
|
$this->hold[$key] = $key;
|
||||||
// limit, so we'll manually poke it here in that case.
|
|
||||||
$this->updateWorkingSet();
|
|
||||||
if (!$this->limit) {
|
|
||||||
$future->isReady();
|
|
||||||
}
|
|
||||||
return $this;
|
return $this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,6 +169,15 @@ final class FutureIterator extends Phobject implements Iterator {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public function setMaximumWorkingSetSize($limit) {
|
||||||
|
$this->limit = $limit;
|
||||||
|
return $this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getMaximumWorkingSetSize() {
|
||||||
|
return $this->limit;
|
||||||
|
}
|
||||||
|
|
||||||
/* -( Iterator Interface )------------------------------------------------- */
|
/* -( Iterator Interface )------------------------------------------------- */
|
||||||
|
|
||||||
|
|
||||||
|
@ -157,9 +185,12 @@ final class FutureIterator extends Phobject implements Iterator {
|
||||||
* @task iterator
|
* @task iterator
|
||||||
*/
|
*/
|
||||||
public function rewind() {
|
public function rewind() {
|
||||||
$this->wait = array_keys($this->futures);
|
if ($this->hasRewound) {
|
||||||
$this->work = null;
|
throw new Exception(
|
||||||
$this->updateWorkingSet();
|
pht('Future graphs can not be rewound.'));
|
||||||
|
}
|
||||||
|
$this->hasRewound = true;
|
||||||
|
|
||||||
$this->next();
|
$this->next();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -168,95 +199,93 @@ final class FutureIterator extends Phobject implements Iterator {
|
||||||
*/
|
*/
|
||||||
public function next() {
|
public function next() {
|
||||||
$this->key = null;
|
$this->key = null;
|
||||||
if (!count($this->wait)) {
|
|
||||||
|
$this->updateWorkingSet();
|
||||||
|
|
||||||
|
if (!$this->work) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$read_sockets = array();
|
|
||||||
$write_sockets = array();
|
|
||||||
|
|
||||||
$start = microtime(true);
|
$start = microtime(true);
|
||||||
$timeout = $this->timeout;
|
$timeout = $this->timeout;
|
||||||
$this->isTimeout = false;
|
$this->isTimeout = false;
|
||||||
|
|
||||||
$check = $this->getWorkingSet();
|
$working_set = array_select_keys($this->futures, $this->work);
|
||||||
|
|
||||||
$resolve = null;
|
while (true) {
|
||||||
do {
|
// Update every future first. This is a no-op on futures which have
|
||||||
$read_sockets = array();
|
// already resolved or failed, but we want to give futures an
|
||||||
$write_sockets = array();
|
// opportunity to make progress even if we can resolve something.
|
||||||
$can_use_sockets = true;
|
|
||||||
$wait_time = 1;
|
|
||||||
foreach ($check as $wait => $key) {
|
|
||||||
$future = $this->futures[$key];
|
|
||||||
|
|
||||||
|
foreach ($working_set as $future_key => $future) {
|
||||||
$future->updateFuture();
|
$future->updateFuture();
|
||||||
|
|
||||||
if ($future->hasException()) {
|
|
||||||
if ($resolve === null) {
|
|
||||||
$resolve = $wait;
|
|
||||||
}
|
}
|
||||||
continue;
|
|
||||||
|
// Check if any future has resolved or failed. If we have any such
|
||||||
|
// futures, we'll return the first one from the iterator.
|
||||||
|
|
||||||
|
$resolve_key = null;
|
||||||
|
foreach ($working_set as $future_key => $future) {
|
||||||
|
if ($future->hasException()) {
|
||||||
|
$resolve_key = $future_key;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($future->hasResult()) {
|
if ($future->hasResult()) {
|
||||||
if ($resolve === null) {
|
$resolve_key = $future_key;
|
||||||
$resolve = $wait;
|
break;
|
||||||
}
|
}
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
$got_sockets = false;
|
// We've found a future to resolve, so we're done here for now.
|
||||||
$socks = $future->getReadSockets();
|
|
||||||
if ($socks) {
|
if ($resolve_key !== null) {
|
||||||
$got_sockets = true;
|
$this->moveFutureToDone($resolve_key);
|
||||||
foreach ($socks as $socket) {
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We don't have any futures to resolve yet. Check if we're reached
|
||||||
|
// an update interval.
|
||||||
|
|
||||||
|
$wait_time = 1;
|
||||||
|
if ($timeout !== null) {
|
||||||
|
$elapsed = microtime(true) - $start;
|
||||||
|
|
||||||
|
if ($elapsed > $timeout) {
|
||||||
|
$this->isTimeout = true;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
$wait_time = min($wait_time, $timeout - $elapsed);
|
||||||
|
}
|
||||||
|
|
||||||
|
// We're going to wait. If possible, we'd like to wait with sockets.
|
||||||
|
// If we can't, we'll just sleep.
|
||||||
|
|
||||||
|
$read_sockets = array();
|
||||||
|
$write_sockets = array();
|
||||||
|
foreach ($working_set as $future_key => $future) {
|
||||||
|
$sockets = $future->getReadSockets();
|
||||||
|
foreach ($sockets as $socket) {
|
||||||
$read_sockets[] = $socket;
|
$read_sockets[] = $socket;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
$socks = $future->getWriteSockets();
|
$sockets = $future->getWriteSockets();
|
||||||
if ($socks) {
|
foreach ($sockets as $socket) {
|
||||||
$got_sockets = true;
|
|
||||||
foreach ($socks as $socket) {
|
|
||||||
$write_sockets[] = $socket;
|
$write_sockets[] = $socket;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If any currently active future had neither read nor write sockets,
|
$use_sockets = ($read_sockets || $write_sockets);
|
||||||
// we can't wait for the current batch of items using sockets.
|
if ($use_sockets) {
|
||||||
if (!$got_sockets) {
|
foreach ($working_set as $future) {
|
||||||
$can_use_sockets = false;
|
|
||||||
} else {
|
|
||||||
$wait_time = min($wait_time, $future->getDefaultWait());
|
$wait_time = min($wait_time, $future->getDefaultWait());
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if ($resolve === null) {
|
|
||||||
|
|
||||||
// Check for a setUpdateInterval() timeout.
|
|
||||||
if ($timeout !== null) {
|
|
||||||
$elapsed = microtime(true) - $start;
|
|
||||||
if ($elapsed > $timeout) {
|
|
||||||
$this->isTimeout = true;
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
$wait_time = $timeout - $elapsed;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($can_use_sockets) {
|
|
||||||
$this->waitForSockets($read_sockets, $write_sockets, $wait_time);
|
$this->waitForSockets($read_sockets, $write_sockets, $wait_time);
|
||||||
} else {
|
} else {
|
||||||
usleep(1000);
|
usleep(1000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} while ($resolve === null);
|
|
||||||
|
|
||||||
$this->key = $this->wait[$resolve];
|
|
||||||
unset($this->wait[$resolve]);
|
|
||||||
|
|
||||||
$this->updateWorkingSet();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -292,39 +321,96 @@ final class FutureIterator extends Phobject implements Iterator {
|
||||||
|
|
||||||
/* -( Internals )---------------------------------------------------------- */
|
/* -( Internals )---------------------------------------------------------- */
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @task internal
|
|
||||||
*/
|
|
||||||
protected function getWorkingSet() {
|
|
||||||
if ($this->work === null) {
|
|
||||||
return $this->wait;
|
|
||||||
}
|
|
||||||
|
|
||||||
return $this->work;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @task internal
|
* @task internal
|
||||||
*/
|
*/
|
||||||
protected function updateWorkingSet() {
|
protected function updateWorkingSet() {
|
||||||
if (!$this->limit) {
|
$limit = $this->getMaximumWorkingSetSize();
|
||||||
|
$work_count = count($this->work);
|
||||||
|
|
||||||
|
// If we're already working on the maximum number of futures, we just have
|
||||||
|
// to wait for something to resolve. There's no benefit to updating the
|
||||||
|
// queue since we can never make any meaningful progress.
|
||||||
|
|
||||||
|
if ($limit) {
|
||||||
|
if ($work_count >= $limit) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
$old = $this->work;
|
// If any futures that are currently held are no longer blocked by
|
||||||
$this->work = array_slice($this->wait, 0, $this->limit, true);
|
// dependencies, move them from "hold" to "wait".
|
||||||
|
|
||||||
// If we're using a limit, our futures are sleeping and need to be polled
|
foreach ($this->hold as $future_key) {
|
||||||
// to begin execution, so poll any futures which weren't in our working set
|
if (!$this->canMoveFutureToWait($future_key)) {
|
||||||
// before.
|
continue;
|
||||||
foreach ($this->work as $work => $key) {
|
}
|
||||||
if (!isset($old[$work])) {
|
|
||||||
$this->futures[$key]->isReady();
|
$this->moveFutureToWait($future_key);
|
||||||
|
}
|
||||||
|
|
||||||
|
$wait_count = count($this->wait);
|
||||||
|
$hold_count = count($this->hold);
|
||||||
|
|
||||||
|
if (!$work_count && !$wait_count && $hold_count) {
|
||||||
|
throw new Exception(
|
||||||
|
pht(
|
||||||
|
'Future graph is stalled: some futures are held, but no futures '.
|
||||||
|
'are waiting or working. The graph can never resolve.'));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Figure out how many futures we can start. If we don't have a limit,
|
||||||
|
// we can start every waiting future. If we do have a limit, we can only
|
||||||
|
// start as many futures as we have slots for.
|
||||||
|
|
||||||
|
if ($limit) {
|
||||||
|
$work_limit = min($limit, $wait_count);
|
||||||
|
} else {
|
||||||
|
$work_limit = $wait_count;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we're ready to start futures, start them now.
|
||||||
|
|
||||||
|
if ($work_limit) {
|
||||||
|
foreach ($this->wait as $future_key) {
|
||||||
|
$this->moveFutureToWork($future_key);
|
||||||
|
|
||||||
|
$work_limit--;
|
||||||
|
if (!$work_limit) {
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private function canMoveFutureToWait($future_key) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private function moveFutureToWait($future_key) {
|
||||||
|
unset($this->hold[$future_key]);
|
||||||
|
$this->wait[$future_key] = $future_key;
|
||||||
|
}
|
||||||
|
|
||||||
|
private function moveFutureToWork($future_key) {
|
||||||
|
unset($this->wait[$future_key]);
|
||||||
|
$this->work[$future_key] = $future_key;
|
||||||
|
|
||||||
|
$this->futures[$future_key]->startFuture();
|
||||||
|
}
|
||||||
|
|
||||||
|
private function moveFutureToDone($future_key) {
|
||||||
|
$this->key = $future_key;
|
||||||
|
unset($this->work[$future_key]);
|
||||||
|
|
||||||
|
// Before we return, do another working set update so we start any
|
||||||
|
// futures that are ready to go as soon as we can.
|
||||||
|
|
||||||
|
$this->updateWorkingSet();
|
||||||
|
|
||||||
|
$this->futures[$future_key]->endFuture();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wait for activity on one of several sockets.
|
* Wait for activity on one of several sockets.
|
||||||
|
|
|
@ -61,7 +61,7 @@ final class ExecFutureTestCase extends PhutilTestCase {
|
||||||
// NOTE: This tests interactions between the resolve() timeout and the
|
// NOTE: This tests interactions between the resolve() timeout and the
|
||||||
// resolution timeout, which are somewhat similar but not identical.
|
// resolution timeout, which are somewhat similar but not identical.
|
||||||
|
|
||||||
$future = $this->newSleep(32000)->start();
|
$future = $this->newSleep(32000);
|
||||||
$future->setTimeout(32000);
|
$future->setTimeout(32000);
|
||||||
|
|
||||||
// We expect this to return in 0.01s.
|
// We expect this to return in 0.01s.
|
||||||
|
@ -77,7 +77,7 @@ final class ExecFutureTestCase extends PhutilTestCase {
|
||||||
// do this, we'll hang when exiting until our subprocess exits (32000
|
// do this, we'll hang when exiting until our subprocess exits (32000
|
||||||
// seconds!)
|
// seconds!)
|
||||||
$future->setTimeout(0.01);
|
$future->setTimeout(0.01);
|
||||||
$future->resolve();
|
$iterator->resolveAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testTerminateWithoutStart() {
|
public function testTerminateWithoutStart() {
|
||||||
|
|
|
@ -15,7 +15,8 @@ abstract class ArcanistFutureLinter extends ArcanistLinter {
|
||||||
$limit = $this->getFuturesLimit();
|
$limit = $this->getFuturesLimit();
|
||||||
$this->futures = id(new FutureIterator(array()))->limit($limit);
|
$this->futures = id(new FutureIterator(array()))->limit($limit);
|
||||||
foreach ($this->buildFutures($paths) as $path => $future) {
|
foreach ($this->buildFutures($paths) as $path => $future) {
|
||||||
$this->futures->addFuture($future, $path);
|
$future->setFutureKey($path);
|
||||||
|
$this->futures->addFuture($future);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue