mirror of
https://we.phorge.it/source/arcanist.git
synced 2024-11-22 06:42:41 +01:00
Move Phage to FuturePool
Summary: Ref T11968. Phage has another "sustained pool of Futures" use case, and needs some slight adjustments after Future API changes. Test Plan: Ran `bin/phage status ...`, got a clean result instead of a JSON decoding failure. Maniphest Tasks: T11968 Differential Revision: https://secure.phabricator.com/D21058
This commit is contained in:
parent
099c2ae648
commit
32005f26a4
7 changed files with 76 additions and 21 deletions
|
@ -866,6 +866,7 @@ phutil_register_library_map(array(
|
|||
'phutil_decode_mime_header' => 'utils/utils.php',
|
||||
'phutil_deprecated' => 'init/lib/moduleutils.php',
|
||||
'phutil_describe_type' => 'utils/utils.php',
|
||||
'phutil_encode_log' => 'utils/utils.php',
|
||||
'phutil_error_listener_example' => 'error/phlog.php',
|
||||
'phutil_escape_uri' => 'utils/utils.php',
|
||||
'phutil_escape_uri_path_component' => 'utils/utils.php',
|
||||
|
|
|
@ -43,7 +43,16 @@ final class PhutilJSONProtocolChannel extends PhutilProtocolChannel {
|
|||
* @task protocol
|
||||
*/
|
||||
protected function encodeMessage($message) {
|
||||
$message = json_encode($message);
|
||||
if (!is_array($message)) {
|
||||
throw new Exception(
|
||||
pht(
|
||||
'JSON protocol message must be an array, got some other '.
|
||||
'type ("%s").',
|
||||
phutil_describe_type($message)));
|
||||
}
|
||||
|
||||
$message = phutil_json_encode($message);
|
||||
|
||||
$len = sprintf(
|
||||
'%0'.self::SIZE_LENGTH.'.'.self::SIZE_LENGTH.'d',
|
||||
strlen($message));
|
||||
|
@ -67,6 +76,21 @@ final class PhutilJSONProtocolChannel extends PhutilProtocolChannel {
|
|||
$len = substr($this->buf, 0, self::SIZE_LENGTH);
|
||||
$this->buf = substr($this->buf, self::SIZE_LENGTH);
|
||||
|
||||
if (!preg_match('/^\d+\z/', $len)) {
|
||||
$full_buffer = $len.$this->buf;
|
||||
$full_length = strlen($full_buffer);
|
||||
|
||||
throw new Exception(
|
||||
pht(
|
||||
'Protocol channel expected %s-character, zero-padded '.
|
||||
'numeric frame length, got something else ("%s"). Full '.
|
||||
'buffer (of length %s) begins: %s',
|
||||
new PhutilNumber(self::SIZE_LENGTH),
|
||||
phutil_encode_log($len),
|
||||
new PhutilNumber($full_length),
|
||||
phutil_encode_log(substr($len.$this->buf, 0, 128))));
|
||||
}
|
||||
|
||||
$this->mode = self::MODE_OBJECT;
|
||||
$this->byteLengthOfNextChunk = (int)$len;
|
||||
break;
|
||||
|
|
|
@ -229,7 +229,7 @@ final class PhutilDeferredLog extends Phobject {
|
|||
if ($saw_percent) {
|
||||
$saw_percent = false;
|
||||
if (array_key_exists($c, $map)) {
|
||||
$result .= addcslashes($map[$c], "\0..\37\\\177..\377");
|
||||
$result .= phutil_encode_log($map[$c]);
|
||||
} else {
|
||||
$result .= '-';
|
||||
}
|
||||
|
|
|
@ -34,6 +34,10 @@ final class FuturePool
|
|||
return $this;
|
||||
}
|
||||
|
||||
public function getFutures() {
|
||||
return $this->futures;
|
||||
}
|
||||
|
||||
public function hasFutures() {
|
||||
return (bool)$this->futures;
|
||||
}
|
||||
|
|
|
@ -4,27 +4,28 @@ final class PhagePHPAgent extends Phobject {
|
|||
|
||||
private $stdin;
|
||||
private $master;
|
||||
private $exec = array();
|
||||
private $futurePool;
|
||||
|
||||
public function __construct($stdin) {
|
||||
$this->stdin = $stdin;
|
||||
}
|
||||
|
||||
public function execute() {
|
||||
$future_pool = $this->getFuturePool();
|
||||
|
||||
while (true) {
|
||||
if ($this->exec) {
|
||||
$iterator = new FutureIterator($this->exec);
|
||||
$iterator->setUpdateInterval(0.050);
|
||||
foreach ($iterator as $key => $future) {
|
||||
if ($future_pool->hasFutures()) {
|
||||
while ($future_pool->hasFutures()) {
|
||||
$future = $future_pool->resolve();
|
||||
|
||||
if ($future === null) {
|
||||
foreach ($this->exec as $read_key => $read_future) {
|
||||
$this->readFuture($read_key, $read_future);
|
||||
foreach ($future_pool->getFutures() as $read_future) {
|
||||
$this->readFuture($read_future);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
break;
|
||||
} else {
|
||||
$this->resolveFuture($key, $future);
|
||||
}
|
||||
$this->resolveFuture($future);
|
||||
}
|
||||
} else {
|
||||
PhutilChannel::waitForAny(array($this->getMaster()));
|
||||
|
@ -34,6 +35,22 @@ final class PhagePHPAgent extends Phobject {
|
|||
}
|
||||
}
|
||||
|
||||
private function getFuturePool() {
|
||||
if (!$this->futurePool) {
|
||||
$this->futurePool = $this->newFuturePool();
|
||||
}
|
||||
return $this->futurePool;
|
||||
}
|
||||
|
||||
private function newFuturePool() {
|
||||
$future_pool = new FuturePool();
|
||||
|
||||
$future_pool->getIteratorTemplate()
|
||||
->setUpdateInterval(0.050);
|
||||
|
||||
return $future_pool;
|
||||
}
|
||||
|
||||
private function getMaster() {
|
||||
if (!$this->master) {
|
||||
$raw_channel = new PhutilSocketChannel(
|
||||
|
@ -77,9 +94,10 @@ final class PhagePHPAgent extends Phobject {
|
|||
$future->setTimeout(ceil($timeout));
|
||||
}
|
||||
|
||||
$future->isReady();
|
||||
$future->setFutureKey($key);
|
||||
|
||||
$this->exec[$key] = $future;
|
||||
$this->getFuturePool()
|
||||
->addFuture($future);
|
||||
break;
|
||||
case 'EXIT':
|
||||
$this->terminateAgent();
|
||||
|
@ -87,8 +105,9 @@ final class PhagePHPAgent extends Phobject {
|
|||
}
|
||||
}
|
||||
|
||||
private function readFuture($key, ExecFuture $future) {
|
||||
private function readFuture(ExecFuture $future) {
|
||||
$master = $this->getMaster();
|
||||
$key = $future->getFutureKey();
|
||||
|
||||
list($stdout, $stderr) = $future->read();
|
||||
$future->discardBuffers();
|
||||
|
@ -114,7 +133,8 @@ final class PhagePHPAgent extends Phobject {
|
|||
}
|
||||
}
|
||||
|
||||
private function resolveFuture($key, ExecFuture $future) {
|
||||
private function resolveFuture(ExecFuture $future) {
|
||||
$key = $future->getFutureKey();
|
||||
$result = $future->resolve();
|
||||
$master = $this->getMaster();
|
||||
|
||||
|
@ -127,8 +147,6 @@ final class PhagePHPAgent extends Phobject {
|
|||
'stderr' => $result[2],
|
||||
'timeout' => (bool)$future->getWasKilledByTimeout(),
|
||||
));
|
||||
|
||||
unset($this->exec[$key]);
|
||||
}
|
||||
|
||||
public function __destruct() {
|
||||
|
@ -136,9 +154,12 @@ final class PhagePHPAgent extends Phobject {
|
|||
}
|
||||
|
||||
private function terminateAgent() {
|
||||
foreach ($this->exec as $key => $future) {
|
||||
$pool = $this->getFuturePool();
|
||||
|
||||
foreach ($pool->getFutures() as $future) {
|
||||
$future->resolveKill();
|
||||
}
|
||||
|
||||
exit(0);
|
||||
}
|
||||
|
||||
|
|
|
@ -47,6 +47,7 @@ final class PhagePHPAgentBootloader extends PhageAgentBootloader {
|
|||
'xsprintf/PhutilCommandString.php',
|
||||
'future/Future.php',
|
||||
'future/FutureIterator.php',
|
||||
'future/FuturePool.php',
|
||||
'future/exec/PhutilExecutableFuture.php',
|
||||
'future/exec/ExecFuture.php',
|
||||
'future/exec/CommandException.php',
|
||||
|
|
|
@ -1926,3 +1926,7 @@ function phutil_is_noninteractive() {
|
|||
|
||||
return false;
|
||||
}
|
||||
|
||||
function phutil_encode_log($message) {
|
||||
return addcslashes($message, "\0..\37\\\177..\377");
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue