lib/Boris/EvalWorker.php
<?php
namespace Boris;
/**
* EvalWorker is responsible for evaluating PHP expressions in forked processes.
*/
class EvalWorker
{
const ABNORMAL_EXIT = 255;
const DONE = "\0";
const EXITED = "\1";
const FAILED = "\2";
const READY = "\3";
private $_socket;
private $_exports = array();
private $_startHooks = array();
private $_failureHooks = array();
private $_ppid;
private $_pid;
private $_cancelled;
private $_inspector;
private $_userExceptionHandler;
/**
* Create a new worker using the given socket for communication.
*
* @param resource $socket
*/
public function __construct($socket)
{
$this->_socket = $socket;
$this->_inspector = new DumpInspector();
stream_set_blocking($socket, 0);
}
/**
* Set local variables to be placed in the workers's scope.
*
* @param array|string $local
* @param mixed $value, if $local is a string
*/
public function setLocal($local, $value = null)
{
if (!is_array($local)) {
$local = array(
$local => $value
);
}
$this->_exports = array_merge($this->_exports, $local);
}
/**
* Set hooks to run inside the worker before it starts looping.
*
* @param array $hooks
*/
public function setStartHooks($hooks)
{
$this->_startHooks = $hooks;
}
/**
* Set hooks to run inside the worker after a fatal error is caught.
*
* @param array $hooks
*/
public function setFailureHooks($hooks)
{
$this->_failureHooks = $hooks;
}
/**
* Set an Inspector object for Boris to output return values with.
*
* @param object $inspector any object the responds to inspect($v)
*/
public function setInspector($inspector)
{
$this->_inspector = $inspector;
}
/**
* Start the worker.
*
* This method never returns.
*/
public function start()
{
$__scope = $this->_runHooks($this->_startHooks);
extract($__scope);
$this->_write($this->_socket, self::READY);
/* Note the naming of the local variables due to shared scope with the user here */
for (;;) {
declare (ticks = 1);
// don't exit on ctrl-c
pcntl_signal(SIGINT, SIG_IGN, true);
$this->_cancelled = false;
$__input = $this->_transform($this->_read($this->_socket));
if ($__input === null) {
continue;
}
$__response = self::DONE;
$this->_ppid = posix_getpid();
$this->_pid = pcntl_fork();
if ($this->_pid < 0) {
throw new \RuntimeException('Failed to fork child labourer');
} elseif ($this->_pid > 0) {
// kill the child on ctrl-c
pcntl_signal(SIGINT, array(
$this,
'cancelOperation'
), true);
pcntl_waitpid($this->_pid, $__status);
if (!$this->_cancelled && $__status != (self::ABNORMAL_EXIT << 8)) {
$__response = self::EXITED;
} else {
$this->_runHooks($this->_failureHooks);
$__response = self::FAILED;
}
} else {
// if the user has installed a custom exception handler, install a new
// one which calls it and then (if the custom handler didn't already exit)
// exits with the correct status.
// If not, leave the exception handler unset; we'll display
// an uncaught exception error and carry on.
$__oldexh = set_exception_handler(array(
$this,
'delegateExceptionHandler'
));
if ($__oldexh && !$this->_userExceptionHandler) {
$this->_userExceptionHandler = $__oldexh; // save the old handler (once)
} else {
restore_exception_handler();
}
// undo ctrl-c signal handling ready for user code execution
pcntl_signal(SIGINT, SIG_DFL, true);
$__pid = posix_getpid();
$__result = eval($__input);
if (posix_getpid() != $__pid) {
// whatever the user entered caused a forked child
// (totally valid, but we don't want that child to loop and wait for input)
exit(0);
}
if (preg_match('/\s*return\b/i', $__input)) {
fwrite(STDOUT, sprintf("%s\n", $this->_inspector->inspect($__result)));
}
$this->_expungeOldWorker();
}
$this->_write($this->_socket, $__response);
if ($__response == self::EXITED) {
exit(0);
}
}
}
/**
* While a child process is running, terminate it immediately.
*/
public function cancelOperation()
{
printf("Cancelling...\n");
$this->_cancelled = true;
posix_kill($this->_pid, SIGKILL);
pcntl_signal_dispatch();
}
/**
* Call the user-defined exception handler, then exit correctly.
*/
public function delegateExceptionHandler($ex)
{
call_user_func($this->_userExceptionHandler, $ex);
exit(self::ABNORMAL_EXIT);
}
// -- Private Methods
private function _runHooks($hooks)
{
extract($this->_exports);
foreach ($hooks as $__hook) {
if (is_string($__hook)) {
eval($__hook);
} elseif (is_callable($__hook)) {
call_user_func($__hook, $this, get_defined_vars());
} else {
throw new \RuntimeException(sprintf('Hooks must be closures or strings of PHP code. Got [%s].', gettype($__hook)));
}
// hooks may set locals
extract($this->_exports);
}
return get_defined_vars();
}
private function _expungeOldWorker()
{
posix_kill($this->_ppid, SIGTERM);
pcntl_signal_dispatch();
}
private function _write($socket, $data)
{
if (!fwrite($socket, $data)) {
throw new \RuntimeException('Socket error: failed to write data');
}
}
private function _read($socket)
{
$read = array(
$socket
);
$except = array(
$socket
);
if ($this->_select($read, $except) > 0) {
if ($read) {
return stream_get_contents($read[0]);
} else if ($except) {
throw new \UnexpectedValueException("Socket error: closed");
}
}
}
private function _select(&$read, &$except)
{
$write = null;
set_error_handler(function()
{
return true;
}, E_WARNING);
$result = stream_select($read, $write, $except, 10);
restore_error_handler();
return $result;
}
private function _transform($input)
{
if ($input === null) {
return null;
}
$transforms = array(
'exit' => 'exit(0)'
);
foreach ($transforms as $from => $to) {
$input = preg_replace('/^\s*' . preg_quote($from, '/') . '\s*;?\s*$/', $to . ';', $input);
}
return $input;
}
}