byjg/phpthread

View on GitHub
src/Handler/ForkHandler.php

Summary

Maintainability
A
55 mins
Test Coverage
<?php

namespace ByJG\PHPThread\Handler;

use ByJG\Cache\CacheContext;
use ByJG\Cache\Engine\ShmopCacheEngine;
use RuntimeException;

/**
 * Native Implementation of Threads in PHP.
 *
 * A class to spawn a thread. Only works in *nix environments,
 * as Windows platform is missing libpcntl.
 *
 * Forks the process.
 */
class ForkHandler implements ThreadInterface
{
    /**
     * Shared memory key under which the child stores its result.
     * Null until execute() is called and again after getResult() consumed it.
     */
    protected $threadKey;

    /** @var \Closure Code that will run inside the forked child. */
    private $closure;

    /**
     * Value returned by pcntl_fork(): the child's pid in the parent, 0 in the child,
     * null while execute() has not been called.
     */
    private $pid;


    private $maxSharedMemorySize = null;
    private $defaultPermission = null;

    /**
     * Constructor method
     *
     * Empty values fall back to 0x100000 for the size and '0700' for the permission.
     *
     * @param int $maxSharedMemorySize Passed to the shared memory engine as 'max-size'
     * @param string $defaultPermission Passed to the shared memory engine as 'default-permission'
     * @throws RuntimeException When the pcntl extension is not available
     */
    public function __construct($maxSharedMemorySize, $defaultPermission)
    {
        if (!function_exists('pcntl_fork')) {
            throw new RuntimeException('PHP was compiled without --enable-pcntl or you are running on Windows.');
        }

        if (empty($maxSharedMemorySize)) {
            $maxSharedMemorySize = 0x100000;
        }

        if (empty($defaultPermission)) {
            $defaultPermission = '0700';
        }

        $this->maxSharedMemorySize = $maxSharedMemorySize;
        $this->defaultPermission = $defaultPermission;
    }

    /**
     * Set the closure that will be executed in the forked process.
     *
     * @param \Closure $closure
     * @return mixed|void
     */
    public function setClosure(\Closure $closure)
    {
        $this->closure = $closure;
    }

    /**
     * Start the thread.
     *
     * Forks the current process. The parent returns immediately; the child runs
     * the closure with the arguments given to this method, stores the outcome in
     * shared memory and exits without ever returning to the caller.
     *
     * @throws RuntimeException When the process cannot be forked
     */
    public function execute()
    {
        // The key is generated before forking so parent and child agree on it.
        $this->threadKey = 'thread_' . rand(1000, 9999) . rand(1000, 9999) . rand(1000, 9999) . rand(1000, 9999);

        if (($this->pid = pcntl_fork()) == -1) {
            throw new RuntimeException('Couldn\'t fork the process');
        }

        if ($this->pid) {
            // Parent: nothing to do here. The child is reaped later by
            // isAlive(), stop() or waitFinish().
            return;
        }

        // Child.
        $this->runChild(func_get_args());
    }

    /**
     * Body of the forked child. Never returns: the process always exits here,
     * so the child cannot continue running the parent's call stack.
     *
     * @param array $args Arguments forwarded to the closure
     */
    private function runChild(array $args)
    {
        // A handler installed with pcntl_signal() only runs when signals are
        // dispatched. Asynchronous dispatch exists since PHP 7.1; on older
        // runtimes the behavior is unchanged (dispatch would require ticks).
        if (function_exists('pcntl_async_signals')) {
            pcntl_async_signals(true);
        }

        // A closure is used so the private handler is reachable regardless of
        // the scope the interpreter is in when the signal is delivered.
        pcntl_signal(SIGTERM, function ($signal) {
            $this->signalHandler($signal);
        });

        $exitCode = 0;

        try {
            try {
                $return = call_user_func_array($this->closure, $args);

                if (!is_null($return)) {
                    $this->saveResult($return);
                }
            // Executed only in PHP 7, will not match in PHP 5.x
            } catch (\Throwable $t) {
                $this->saveResult($t);
            // Executed only in PHP 5. Remove when PHP 5.x is no longer necessary.
            } catch (\Exception $ex) {
                $this->saveResult($ex);
            }
        // Storing the result itself failed. There is no channel left to report
        // through, so log it and signal the failure with the exit status.
        } catch (\Throwable $t) {
            error_log('ForkHandler: unable to store the thread result: ' . $t->getMessage());
            $exitCode = 1;
        } catch (\Exception $ex) {
            error_log('ForkHandler: unable to store the thread result: ' . $ex->getMessage());
            $exitCode = 1;
        }

        exit($exitCode);
    }

    /**
     * Build a shared memory cache engine configured with this handler's
     * size and permission settings.
     *
     * @return \ByJG\Cache\Engine\ShmopCacheEngine
     */
    protected function getSharedMemoryEngine()
    {
        return new ShmopCacheEngine(
            [
                'max-size' => $this->maxSharedMemorySize,
                'default-permission' => $this->defaultPermission
            ]
        );
    }

    /**
     * Save the thread result in a shared memory block
     *
     * @param mixed $object Need to be serializable
     */
    protected function saveResult($object)
    {
        $this->getSharedMemoryEngine()->set($this->threadKey, $object);
    }

    /**
     * Get the thread result from the shared memory block and erase it.
     *
     * The result can be read only once: the key is cleared before reading, so
     * later calls return null. If the stored value is an exception or error
     * captured in the child, it is re-thrown here.
     *
     * @return mixed
     * @throws \Exception
     * @throws \Throwable
     */
    public function getResult()
    {
        if (is_null($this->threadKey)) {
            return null;
        }

        $key = $this->threadKey;
        $this->threadKey = null;

        $cache = $this->getSharedMemoryEngine();
        $result = $cache->get($key);
        $cache->release($key);

        // \Throwable does not exist in PHP 5.x; instanceof simply yields false there.
        if ($result instanceof \Exception || $result instanceof \Throwable) {
            throw $result;
        }

        return $result;
    }

    /**
     * Kill a thread
     *
     * Does nothing when the thread was never started or is no longer alive.
     *
     * @param int $signal Signal sent to the child process
     * @param bool $wait When true, block until the child has terminated
     */
    public function stop($signal = SIGKILL, $wait = false)
    {
        if (!$this->isAlive()) {
            return;
        }

        posix_kill($this->pid, $signal);

        if ($wait) {
            $status = null;
            pcntl_waitpid($this->pid, $status);
        }
    }

    /**
     * Check if the forked process is alive
     *
     * @return bool False when the thread was never started or has already terminated
     */
    public function isAlive()
    {
        // Without a pid the waitpid/kill calls would receive 0, which targets
        // the whole process group instead of a single child.
        if (empty($this->pid)) {
            return false;
        }

        $status = null;
        return (pcntl_waitpid($this->pid, $status, WNOHANG) === 0);
    }

    /**
     * Handle the signal to the thread
     *
     * @param int $signal
     */
    private function signalHandler($signal)
    {
        if ($signal == SIGTERM) {
            exit(0);
        }
    }

    /**
     * Block until this thread's child process has terminated.
     *
     * Only this handler's own child is waited for, so the exit status of
     * other children is left untouched. Returns immediately when the thread
     * was never started or has already been reaped.
     */
    public function waitFinish()
    {
        // Loop instead of recursing: a wait interrupted by a signal is retried.
        while ($this->isAlive()) {
            $status = null;
            pcntl_waitpid($this->pid, $status);
        }
    }

    /**
     * @return string Fully qualified name of this handler class
     */
    public function getClassName()
    {
        return ForkHandler::class;
    }
}