stdin * stdout <- stdout * stderr <- stderr * * You can provide **read and write callbacks** which are invoked as data * is passed through this class. They allow you to inspect and modify traffic. * * IO Channel Passthru Command Channel * stdout -> willWrite -> stdin * stdin <- willRead <- stdout * stderr <- (identity) <- stderr * * Primarily, this means: * * - the **IO Channel** can be a @{class:PhutilProtocolChannel} if the * **write callback** can convert protocol messages into strings; and * - the **write callback** can inspect and reject requests over the channel, * e.g. to enforce policies. * * In practice, this is used when serving repositories to check each command * issued over SSH and determine if it is a read command or a write command. * Writes can then be checked for appropriate permissions. */ final class PhabricatorSSHPassthruCommand extends Phobject { private $commandChannel; private $ioChannel; private $errorChannel; private $execFuture; private $willWriteCallback; private $willReadCallback; public function setCommandChannelFromExecFuture(ExecFuture $exec_future) { $exec_channel = new PhutilExecChannel($exec_future); $exec_channel->setStderrHandler(array($this, 'writeErrorIOCallback')); $this->execFuture = $exec_future; $this->commandChannel = $exec_channel; return $this; } public function setIOChannel(PhutilChannel $io_channel) { $this->ioChannel = $io_channel; return $this; } public function setErrorChannel(PhutilChannel $error_channel) { $this->errorChannel = $error_channel; return $this; } public function setWillReadCallback($will_read_callback) { $this->willReadCallback = $will_read_callback; return $this; } public function setWillWriteCallback($will_write_callback) { $this->willWriteCallback = $will_write_callback; return $this; } public function writeErrorIOCallback(PhutilChannel $channel, $data) { $this->errorChannel->write($data); } public function execute() { $command_channel = $this->commandChannel; $io_channel = $this->ioChannel; $error_channel = $this->errorChannel; if (!$command_channel) { throw new Exception('Set a command channel before calling execute()!'); } if (!$io_channel) { throw new Exception('Set an IO channel before calling execute()!'); } if (!$error_channel) { throw new Exception('Set an error channel before calling execute()!'); } $channels = array($command_channel, $io_channel, $error_channel); // We want to limit the amount of data we'll hold in memory for this // process. See T4241 for a discussion of this issue in general. $buffer_size = (1024 * 1024); // 1MB $io_channel->setReadBufferSize($buffer_size); $command_channel->setReadBufferSize($buffer_size); // TODO: This just makes us throw away stderr after the first 1MB, but we // don't currently have the support infrastructure to buffer it correctly. // It's difficult to imagine this causing problems in practice, though. $this->execFuture->getStderrSizeLimit($buffer_size); while (true) { PhutilChannel::waitForAny($channels); $io_channel->update(); $command_channel->update(); $error_channel->update(); // If any channel is blocked on the other end, wait for it to flush before // we continue reading. For example, if a user is running `git clone` on // a 1GB repository, the underlying `git-upload-pack` may // be able to produce data much more quickly than we can send it over // the network. If we don't throttle the reads, we may only send a few // MB over the I/O channel in the time it takes to read the entire 1GB off // the command channel. That leaves us with 1GB of data in memory. while ($command_channel->isOpen() && $io_channel->isOpenForWriting() && ($command_channel->getWriteBufferSize() >= $buffer_size || $io_channel->getWriteBufferSize() >= $buffer_size || $error_channel->getWriteBufferSize() >= $buffer_size)) { PhutilChannel::waitForActivity(array(), $channels); $io_channel->update(); $command_channel->update(); $error_channel->update(); } // If the subprocess has exited and we've read everything from it, // we're all done. $done = !$command_channel->isOpenForReading() && $command_channel->isReadBufferEmpty(); $in_message = $io_channel->read(); if ($in_message !== null) { $in_message = $this->willWriteData($in_message); if ($in_message !== null) { $command_channel->write($in_message); } } $out_message = $command_channel->read(); if ($out_message !== null) { $out_message = $this->willReadData($out_message); if ($out_message !== null) { $io_channel->write($out_message); } } // If we have nothing left on stdin, close stdin on the subprocess. if (!$io_channel->isOpenForReading()) { $command_channel->closeWriteChannel(); } if ($done) { break; } // If the client has disconnected, kill the subprocess and bail. if (!$io_channel->isOpenForWriting()) { $this->execFuture ->setStdoutSizeLimit(0) ->setStderrSizeLimit(0) ->setReadBufferSize(null) ->resolveKill(); break; } } list($err) = $this->execFuture ->setStdoutSizeLimit(0) ->setStderrSizeLimit(0) ->setReadBufferSize(null) ->resolve(); return $err; } public function willWriteData($message) { if ($this->willWriteCallback) { return call_user_func($this->willWriteCallback, $this, $message); } else { if (strlen($message)) { return $message; } else { return null; } } } public function willReadData($message) { if ($this->willReadCallback) { return call_user_func($this->willReadCallback, $this, $message); } else { if (strlen($message)) { return $message; } else { return null; } } } }