1
0
Fork 0
mirror of https://we.phorge.it/source/arcanist.git synced 2024-09-20 00:49:11 +02:00

Rough cut of hgdaemon

Summary:
This need a bunch of work, but is a sort of minimum viable product for the hgdaemon.

The two ProtocolChannels implement the client and server halves of the protocol.

The Server implements the server logic, the Client implements the client logic.

Test Plan: Launched a server and client on the same repo, got hg output in ~2-3ms instead of 100ms+.

Reviewers: csilvers, btrahan, vrana

Reviewed By: csilvers

CC: aran

Differential Revision: https://secure.phabricator.com/D2665
This commit is contained in:
epriestley 2012-06-07 18:23:57 -07:00
parent 6433f7ff3d
commit f9d55d809f
7 changed files with 953 additions and 0 deletions

View file

@ -0,0 +1,48 @@
#!/usr/bin/env php
<?php
/*
* Copyright 2012 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
require_once dirname(dirname(__FILE__)).'/__init_script__.php';
$args = new PhutilArgumentParser($argv);
$args->parseStandardArguments();
$args->parse(
array(
array(
'name' => 'repository',
'wildcard' => true,
),
));
$repo = $args->getArg('repository');
if (count($repo) !== 1) {
throw new Exception("Specify exactly one working copy!");
}
$repo = head($repo);
$daemon = new ArcanistHgProxyClient($repo);
$t_start = microtime(true);
$result = $daemon->executeCommand(
array('log', '--template', '{node}', '--rev', 2));
$t_end = microtime(true);
var_dump($result);
echo "\nExecuted in ".((int)(1000 * ($t_end - $t_start)))."ms.\n";

View file

@ -0,0 +1,39 @@
#!/usr/bin/env php
<?php
/*
* Copyright 2012 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
require_once dirname(dirname(__FILE__)).'/__init_script__.php';
$args = new PhutilArgumentParser($argv);
$args->parseStandardArguments();
$args->parse(
array(
array(
'name' => 'repository',
'wildcard' => true,
),
));
$repo = $args->getArg('repository');
if (count($repo) !== 1) {
throw new Exception("Specify exactly one working copy!");
}
$repo = head($repo);
$daemon = new ArcanistHgProxyServer($repo);
$daemon->start();

View file

@ -52,6 +52,10 @@ phutil_register_library_map(array(
'ArcanistGitAPI' => 'repository/api/ArcanistGitAPI.php',
'ArcanistGitHookPreReceiveWorkflow' => 'workflow/ArcanistGitHookPreReceiveWorkflow.php',
'ArcanistHelpWorkflow' => 'workflow/ArcanistHelpWorkflow.php',
'ArcanistHgClientChannel' => 'hgdaemon/ArcanistHgClientChannel.php',
'ArcanistHgProxyClient' => 'hgdaemon/ArcanistHgProxyClient.php',
'ArcanistHgProxyServer' => 'hgdaemon/ArcanistHgProxyServer.php',
'ArcanistHgServerChannel' => 'hgdaemon/ArcanistHgServerChannel.php',
'ArcanistHookAPI' => 'repository/hookapi/ArcanistHookAPI.php',
'ArcanistInstallCertificateWorkflow' => 'workflow/ArcanistInstallCertificateWorkflow.php',
'ArcanistJSHintLinter' => 'lint/linter/ArcanistJSHintLinter.php',
@ -164,6 +168,8 @@ phutil_register_library_map(array(
'ArcanistGitAPI' => 'ArcanistRepositoryAPI',
'ArcanistGitHookPreReceiveWorkflow' => 'ArcanistBaseWorkflow',
'ArcanistHelpWorkflow' => 'ArcanistBaseWorkflow',
'ArcanistHgClientChannel' => 'PhutilProtocolChannel',
'ArcanistHgServerChannel' => 'PhutilProtocolChannel',
'ArcanistInstallCertificateWorkflow' => 'ArcanistBaseWorkflow',
'ArcanistJSHintLinter' => 'ArcanistLinter',
'ArcanistLandWorkflow' => 'ArcanistBaseWorkflow',

View file

@ -0,0 +1,186 @@
<?php
/*
* Copyright 2012 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Channel to a Mercurial "cmdserver" client. For a detailed description of the
* "cmdserver" protocol, see @{class:ArcanistHgServerChannel}. This channel
* implements the other half of the protocol: it decodes messages from the
* client and encodes messages from the server.
*
* Because the proxy server speaks the exact same protocol that Mercurial
* does and fully decodes both sides of the protocol, we need this half of the
* decode/encode to talk to clients. Without it, we wouldn't be able to
* determine when a client request had completed and was ready for transmission
* to the Mercurial server.
*
* (Technically, we could get away without re-encoding messages from the
* server, but the serialization is not complicated and having a general
* implementation of encoded/decode for both the client and server dialects
* seemed useful.)
*
* @task protocol Protocol Implementation
*/
final class ArcanistHgClientChannel extends PhutilProtocolChannel {
const MODE_COMMAND = 'command';
const MODE_LENGTH = 'length';
const MODE_ARGUMENTS = 'arguments';
private $command;
private $byteLengthOfNextChunk;
private $buf = '';
private $mode = self::MODE_COMMAND;
/* -( Protocol Implementation )-------------------------------------------- */
/**
* Encode a message for transmission to the client. The message should be
* a pair with the channel name and the a block of data, like this:
*
* array('o', '<some data...>');
*
* We encode it like this:
*
* o
* 1234 # Length, as a 4-byte unsigned long.
* <data: 1234 bytes>
*
* For a detailed description of the cmdserver protocol, see
* @{class:ArcanistHgServerChannel}.
*
* @param pair<string,string> The <channel, data> pair to encode.
* @return string Encoded string for transmission to the client.
*
* @task protocol
*/
protected function encodeMessage($argv) {
if (!is_array($argv) || count($argv) !== 2) {
throw new Exception("Message should be <channel, data>.");
}
$channel = head($argv);
$data = last($argv);
$len = strlen($data);
$len = pack('N', $len);
return "{$channel}{$len}{$data}";
}
/**
* Decode a message received from the client. The message looks like this:
*
* runcommand\n
* 8 # Length, as a 4-byte unsigned long.
* log\0
* -l\0
* 5
*
* We decode it into a list in PHP, which looks like this:
*
* array(
* 'runcommand',
* 'log',
* '-l',
* '5',
* );
*
* @param string Bytes from the server.
* @return list<list<string>> Zero or more complete commands.
*
* @task protocol
*/
protected function decodeStream($data) {
$this->buf .= $data;
// The first part is terminated by "\n", so we don't always know how many
// bytes we need to look for. This makes parsing a bit of a pain.
$messages = array();
do {
$continue_parsing = false;
switch ($this->mode) {
case self::MODE_COMMAND:
// We're looking for "\n", which indicates the end of the command
// name, like "runcommand". Next, we'll expect a length.
$pos = strpos($this->buf, "\n");
if ($pos === false) {
break;
}
$this->command = substr($this->buf, 0, $pos);
$this->buf = substr($this->buf, $pos + 1);
$this->mode = self::MODE_LENGTH;
$continue_parsing = true;
break;
case self::MODE_LENGTH:
// We're looking for a byte length, as a 4-byte big-endian unsigned
// integer. Next, we'll expect that many bytes of data.
if (strlen($this->buf) < 4) {
break;
}
$len = substr($this->buf, 0, 4);
$len = unpack('N', $len);
$len = head($len);
$this->buf = substr($this->buf, 4);
$this->mode = self::MODE_ARGUMENTS;
$this->byteLengthOfNextChunk = $len;
$continue_parsing = true;
break;
case self::MODE_ARGUMENTS:
// We're looking for the data itself, which is a block of bytes
// of the given length. These are arguments delimited by "\0". Next
// we'll expect another command.
if (strlen($this->buf) < $this->byteLengthOfNextChunk) {
break;
}
$data = substr($this->buf, 0, $this->byteLengthOfNextChunk);
$this->buf = substr($this->buf, $this->byteLengthOfNextChunk);
$message = array_merge(array($this->command), explode("\0", $data));
$this->mode = self::MODE_COMMAND;
$this->command = null;
$this->byteLengthOfNextChunk = null;
$messages[] = $message;
$continue_parsing = true;
break;
}
} while ($continue_parsing);
return $messages;
}
}

View file

@ -0,0 +1,169 @@
<?php
/*
* Copyright 2012 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Client for an @{class:ArcanistHgProxyServer}. This client talks to a PHP
* process which serves as a proxy in front of a Mercurial server process.
* The PHP proxy allows multiple clients to use the same Mercurial server.
*
* This class presents an API which is similar to the hg command-line API.
*
* Each client is bound to a specific working copy:
*
* $working_copy = '/path/to/some/hg/working/copy/';
* $client = new ArcanistHgProxyClient($working_copy);
*
* For example, to run `hg log -l 5` via a client:
*
* $command = array('log', '-l', '5');
* list($err, $stdout, $stderr) = $client->executeCommand($command);
*
* The advantage of using this complex mechanism is that commands run in this
* way do not need to pay the startup overhead for hg and the Python runtime,
* which is often on the order of 100ms or more per command.
*
* @task construct Construction
* @task exec Executing Mercurial Commands
* @task internal Internals
*/
final class ArcanistHgProxyClient {
private $workingCopy;
private $server;
/* -( Construction )------------------------------------------------------- */
/**
* Build a new client. This client is bound to a working copy. A server
* must already be running on this working copy for the client to work.
*
* @param string Path to a Mercurial working copy.
*
* @task construct
*/
public function __construct($working_copy) {
$this->workingCopy = Filesystem::resolvePath($working_copy);
}
/* -( Executing Merucurial Commands )-------------------------------------- */
/**
* Execute a command (given as a list of arguments) via the command server.
*
* @param list<string> A list of command arguments, like "log", "-l", "5".
* @return tuple<int, string, string> Return code, stdout and stderr.
*
* @task exec
*/
public function executeCommand(array $argv) {
if (!$this->server) {
$this->server = $this->connectToDaemon();
}
$server = $this->server;
// Note that we're adding "runcommand" to make the server run the command.
// Theoretically the server supports other capabilities, but in practice
// we are only concerend with "runcommand".
$server->write(array_merge(array('runcommand'), $argv));
// We'll get back one or more blocks of response data, ending with an 'r'
// block which indicates the return code. Reconstitute these into stdout,
// stderr and a return code.
$stdout = '';
$stderr = '';
$err = 0;
$done = false;
while ($message = $server->waitForMessage()) {
// The $server channel handles decoding of the wire format and gives us
// messages which look like this:
//
// array('o', '<data...>');
list($channel, $data) = $message;
switch ($channel) {
case 'o':
$stdout .= $data;
break;
case 'e':
$stderr .= $data;
break;
case 'd':
// TODO: Do something with this? This is the 'debug' channel.
break;
case 'r':
// NOTE: This little dance is because the value is emitted as a
// big-endian signed 32-bit long. PHP has no flag to unpack() that
// can unpack these, so we unpack a big-endian unsigned long, then
// repack it as a machine-order unsigned long, then unpack it as
// a machine-order signed long. This appears to produce the desired
// result.
$err = head(unpack('N', $data));
$err = pack('L', $err);
$err = head(unpack('l', $err));
$done = true;
break;
}
if ($done) {
break;
}
}
return array($err, $stdout, $stderr);
}
/* -( Internals )---------------------------------------------------------- */
/**
* @task internal
*/
private function connectToDaemon() {
$errno = null;
$errstr = null;
$socket_path = ArcanistHgProxyServer::getPathToSocket($this->workingCopy);
$socket = stream_socket_client('unix://'.$socket_path, $errno, $errstr);
if ($errno || !$socket) {
throw new Exception(
"Unable to connect socket! Error #{$errno}: {$errstr}");
}
$channel = new PhutilSocketChannel($socket);
$server = new ArcanistHgServerChannel($channel);
// The protocol includes a "hello" message with capability and encoding
// information. Read and discard it, we use only the "runcommand" capability
// which is guaranteed to be available.
$hello = $server->waitForMessage();
return $server;
}
}

View file

@ -0,0 +1,312 @@
<?php
/*
* Copyright 2012 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Server which @{class:ArcanistHgProxyClient} clients connect to. This
* server binds to a Mercurial working copy and creates a Mercurial process and
* a unix domain socket in that working copy. It listens for connections on
* the socket, reads commands from them, and forwards their requests to the
* Mercurial process. It then returns responses to the original clients.
*
* Note that this server understands the underlying protocol and completely
* decodes messages from both the client and server before re-encoding them
* and relaying them to their final destinations. It must do this (at least
* in part) to determine where messages begin and end. Additionally, this proxy
* sends and receives the Mercurial cmdserver protocol exactly, without
* any extensions or sneakiness.
*
* The advantage of this mechanism is that it avoids the overhead of starting
* a Mercurial process for each Mercurial command, which can exceed 100ms per
* invocation. This server can also accept connections from multiple clients
* and serve them from a single Mercurial server process.
*
* @task construct Construction
* @task server Serving Requests
* @task client Managing Clients
* @task hg Managing Mercurial
* @task internal Internals
*/
final class ArcanistHgProxyServer {
private $workingCopy;
private $socket;
private $hello;
/* -( Construction )------------------------------------------------------- */
/**
* Build a new server. This server is bound to a working copy. The server
* is inactive until you @{method:start} it.
*
* @param string Path to a Mercurial working copy.
*
* @task construct
*/
public function __construct($working_copy) {
$this->workingCopy = Filesystem::resolvePath($working_copy);
}
/* -( Serving Requests )--------------------------------------------------- */
/**
* Start the server. This method does not return.
*
* @return never
*
* @task server
*/
public function start() {
// Create the unix domain socket in the working copy to listen for clients.
$socket = $this->startWorkingCopySocket();
$this->socket = $socket;
// TODO: Daemonize here.
// Start the Mercurial process which we'll forward client requests to.
$hg = $this->startMercurialProcess();
$clients = array();
$this->log(null, 'Listening');
while (true) {
// Wait for activity on any active clients, the Mercurial process, or
// the listening socket where new clients connect.
PhutilChannel::waitForAny(
array_merge($clients, array($hg)),
array(
'read' => array($socket),
'except' => array($socket),
));
if (!$hg->update()) {
throw new Exception("Server exited unexpectedly!");
}
// Accept any new clients.
while ($client = $this->acceptNewClient($socket)) {
$clients[] = $client;
$key = last_key($clients);
$client->setName($key);
$this->log($client, 'Connected');
}
// Update all the active clients.
foreach ($clients as $key => $client) {
$ok = $this->updateClient($client, $hg);
if (!$ok) {
$this->log($client, 'Disconnected');
unset($clients[$key]);
}
}
}
}
/**
* Update one client, processing any commands it has sent us. We fully
* process all commands we've received here before returning to the main
* server loop.
*
* @param ArcanistHgClientChannel The client to update.
* @param ArcanistHgServerChannel The Mercurial server.
*
* @task server
*/
private function updateClient(
ArcanistHgClientChannel $client,
ArcanistHgServerChannel $hg) {
if (!$client->update()) {
// Client has disconnected, don't bother proceeding.
return false;
}
// Read a command from the client if one is available. Note that we stop
// updating other clients or accepting new connections while processing a
// command, since there isn't much we can do with them until the server
// finishes executing this command.
$message = $client->read();
if (!$message) {
return true;
}
$this->log($client, '$ '.$message[0].' '.$message[1]);
$t_start = microtime(true);
// Forward the command to the server.
$hg->write($message);
while (true) {
PhutilChannel::waitForAny(array($client, $hg));
if (!$client->update() || !$hg->update()) {
// If either the client or server has exited, bail.
return false;
}
$response = $hg->read();
if (!$response) {
continue;
}
// Forward the response back to the client.
$client->write($response);
// If the response was on the 'r'esult channel, it indicates the end
// of the command output. We can process the next command (if any
// remain) or go back to accepting new connections and servicing
// other clients.
if ($response[0] == 'r') {
// Update the client immediately to try to get the bytes on the wire
// as quickly as possible. This gives us slightly more throughput.
$client->update();
break;
}
}
// Log the elapsed time.
$t_end = microtime(true);
$t = 1000000 * ($t_end - $t_start);
$this->log($client, '< '.number_format($t, 0).'us');
return true;
}
/* -( Managing Clients )--------------------------------------------------- */
/**
* @task client
*/
public static function getPathToSocket($working_copy) {
return $working_copy.'/.hg/hgdaemon-socket';
}
/**
* @task client
*/
private function startWorkingCopySocket() {
$errno = null;
$errstr = null;
$socket_path = self::getPathToSocket($this->workingCopy);
$socket_uri = 'unix://'.$socket_path;
$socket = @stream_socket_server($socket_uri, $errno, $errstr);
if ($errno || !$socket) {
Filesystem::remove($socket_path);
$socket = @stream_socket_server($socket_uri, $errno, $errstr);
}
if ($errno || !$socket) {
throw new Exception(
"Unable to start socket! Error #{$errno}: {$errstr}");
}
$ok = stream_set_blocking($socket, 0);
if ($ok === false) {
throw new Exception("Unable to set socket nonblocking!");
}
return $socket;
}
/**
* @task client
*/
private function acceptNewClient($socket) {
// NOTE: stream_socket_accept() always blocks, even when the socket has
// been set nonblocking.
$new_client = @stream_socket_accept($socket, $timeout = 0);
if (!$new_client) {
return;
}
$channel = new PhutilSocketChannel($new_client);
$client = new ArcanistHgClientChannel($channel);
$client->write($this->hello);
return $client;
}
/* -( Managing Mercurial )------------------------------------------------- */
/**
* Starts a Mercurial process which can actually handle requests.
*
* @return ArcanistHgServerChannel Channel to the Mercurial server.
* @task hg
*/
private function startMercurialProcess() {
// NOTE: "cmdserver.log=-" makes Mercurial use the 'd'ebug channel for
// log messages.
$command = 'HGPLAIN=1 hg --config cmdserver.log=- serve --cmdserver pipe';
$future = new ExecFuture($command);
$future->setCWD($this->workingCopy);
$channel = new PhutilExecChannel($future);
$hg = new ArcanistHgServerChannel($channel);
// The server sends a "hello" message with capability and encoding
// information. Save it and forward it to clients when they connect.
$this->hello = $hg->waitForMessage();
return $hg;
}
/* -( Internals )---------------------------------------------------------- */
/**
* Close and remove the unix domain socket in the working copy.
*
* @task internal
*/
public function __destruct() {
if ($this->socket) {
@stream_socket_shutdown($this->socket);
@fclose($this->socket);
Filesystem::remove(self::getPathToSocket($this->workingCopy));
$this->socket = null;
}
}
private function log($client, $message) {
if ($client) {
$message = '[Client '.$client->getName().'] '.$message;
} else {
$message = '[Server] '.$message;
}
echo $message."\n";
}
}

View file

@ -0,0 +1,193 @@
<?php
/*
* Copyright 2012 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Channel to a Mercurial "cmdserver" server. Messages sent to the server
* look like this:
*
* runcommand\n
* 8 # Length, as a 4-byte unsigned long.
* log\0
* -l\0
* 5
*
* In PHP, the format of these messages is an array of arguments:
*
* array(
* 'runcommand',
* 'log',
* '-l',
* '5',
* );
*
* The server replies with messages that look like this:
*
* o
* 1234 # Length, as a 4-byte unsigned long.
* <data: 1234 bytes>
*
* The first character in a message from the server is the "channel". Mercurial
* channels have nothing to do with Phutil channels; they are more similar to
* stdout/stderr. Mercurial has four primary channels:
*
* 'o'utput, like stdout
* 'e'rror, like stderr
* 'r'esult, like return codes
* 'd'ebug, like an external log file
*
* In PHP, the format of these messages is a pair, with the channel and then
* the data:
*
* array('o', '<data...>');
*
* In general, we send "runcommand" requests, and the server responds with
* a series of messages on the "output" channel and then a single response
* on the "result" channel to indicate that output is complete.
*
* @task protocol Protocol Implementation
*/
final class ArcanistHgServerChannel extends PhutilProtocolChannel {
const MODE_CHANNEL = 'channel';
const MODE_LENGTH = 'length';
const MODE_BLOCK = 'block';
private $mode = self::MODE_CHANNEL;
private $byteLengthOfNextChunk = 1;
private $buf = '';
/* -( Protocol Implementation )-------------------------------------------- */
/**
* Encode a message for transmission to the server. The message should be
* formatted as an array, like this:
*
* array(
* 'runcommand',
* 'log',
* '-l',
* '5',
* );
*
*
* We will return the cmdserver version of this:
*
* runcommand\n
* 8 # Length, as a 4-byte unsigned long.
* log\0
* -l\0
* 5
*
* @param list<string> List of command arguments.
* @return string Encoded string for transmission to the server.
*
* @task protocol
*/
protected function encodeMessage($argv) {
if (!is_array($argv)) {
throw new Exception("Message to Mercurial server should be an array.");
}
$command = head($argv);
$args = array_slice($argv, 1);
$args = implode("\0", $args);
$len = strlen($args);
$len = pack('N', $len);
return "{$command}\n{$len}{$args}";
}
/**
* Decode a message received from the server. The message looks like this:
*
* o
* 1234 # Length, as a 4-byte unsigned long.
* <data: 1234 bytes>
*
* ...where 'o' is the "channel" the message is being sent over.
*
* We decode into a pair in PHP, which looks like this:
*
* array('o', '<data...>');
*
* @param string Bytes from the server.
* @return list<pair<string,string>> Zero or more complete messages.
*
* @task protocol
*/
protected function decodeStream($data) {
$this->buf .= $data;
// We always know how long the next chunk is, so this parser is fairly
// easy to implement.
$messages = array();
while ($this->byteLengthOfNextChunk <= strlen($this->buf)) {
$chunk = substr($this->buf, 0, $this->byteLengthOfNextChunk);
$this->buf = substr($this->buf, $this->byteLengthOfNextChunk);
switch ($this->mode) {
case self::MODE_CHANNEL:
// We've received the channel name, one of 'o', 'e', 'r' or 'd' for
// 'output', 'error', 'result' or 'debug' respectively. This is a
// single byte long. Next, we'll expect a length.
$this->channel = $chunk;
$this->byteLengthOfNextChunk = 4;
$this->mode = self::MODE_LENGTH;
break;
case self::MODE_LENGTH:
// We've received the length of the data, as a 4-byte big-endian
// unsigned integer. Next, we'll expect the data itself.
$this->byteLengthOfNextChunk = head(unpack('N', $chunk));
$this->mode = self::MODE_BLOCK;
break;
case self::MODE_BLOCK:
// We've received the data itself, which is a block of bytes of the
// given length. We produce a message from the channel and the data
// and return it. Next, we expect another channel name.
$message = array($this->channel, $chunk);
$this->byteLengthOfNextChunk = 1;
$this->mode = self::MODE_CHANNEL;
$this->channel = null;
$messages[] = $message;
break;
}
}
// Return zero or more messages, which might look something like this:
//
// array(
// array('o', '<...>'),
// array('o', '<...>'),
// array('r', '<...>'),
// );
return $messages;
}
}