Mirocow/yii2-queue

View on GitHub
components/QueueComponent.php

Summary

Maintainability
D
1 day
Test Coverage
<?php

namespace mirocow\queue\components;

use Amp\Loop;
use mirocow\queue\exceptions\QueueException;
use mirocow\queue\models\MessageModel;
use yii\di\ServiceLocator;

/**
 * Main component of Yii queue
 *
 * Class QueueComponent
 * @package mirocow\queue\components
 * @property $regChannels ServiceLocator
 * @property $regWorkers ServiceLocator
 */
class QueueComponent extends \yii\base\Component implements \mirocow\queue\interfaces\QueueInterface
{

    /**
     * Version.
     *
     * @var string
     */
    const VERSION = '0.0.3';

    const OS_TYPE_LINUX = 0;

    const OS_TYPE_WIN = 1;

    /**
     * @var string
     */
    public $queueName = 'queue';

    /**
     * @var array
     */
    public $channels = [];

    /**
     * @var array
     */
    public $workers = [];

    /**
     * @var int
     */
    public $timer_tick = 10;

    /**
     * @var string
     */
    public $pidFile = '';

    /**
     * @var int
     */
    public $delayForIfRiseException = 300;

    /**
     * @var bool
     */
    public $daemonize = true;

    /**
     * @var bool
     */
    public $multithreading = true;

    /**
     * @var bool
     */
    public $repeatIfRiseException = false;

    /**
     * Daemonize.
     *
     * @var bool
     */
    protected static $_daemonize = false;

    /**
     * OS.
     *
     * @var string
     */
    protected static $_OS = self::OS_TYPE_LINUX;

    /**
     * Standard output stream
     * @var resource
     */
    protected static $_outputStream = null;

    /**
     * If $outputStream support decorated
     * @var bool
     */
    protected static $_outputDecorated = null;

    /**
     * @var null
     */
    protected $regWorkers = null;

    /**
     * @var null
     */
    protected $regChannels = null;

    /**
     * @var null
     */
    private $_pid = null;

    /**
     * @var array
     */
    private $_children = [];

    /**
     * @var bool
     */
    private $isChild = false;

    /**
     * @var bool
     */
    private $_active = true;

    /**
     * @throws QueueException
     */
    public function init()
    {
        parent::init();

        self::$_daemonize = $this->daemonize;

        if (!empty($this->channels)) {

            $this->regChannels = new ServiceLocator();

            foreach ($this->channels as $channelName => $channel) {
                $channel['queueName'] = $this->queueName;
                $channel['channelName'] = $channelName;
                $this->regChannels->set($channelName, $channel);
            }

        } else throw new QueueException("Empty channels!");

        if (!empty($this->workers)) {

            $this->regWorkers = new ServiceLocator();
            foreach ($this->workers as $workerName => $worker) {
                $worker['workerName'] = $workerName;
                $this->regWorkers->set($workerName, $worker);
            }

        } else throw new QueueException("Empty workers!");

    }

    /**
     * @param string $name
     * @return ChannelComponent
     * @throws QueueException
     */
    public function getChannel($name = 'default')
    {
        $name = empty($name) ? ($this->getChannelNamesList()[0] ?: '') : $name;

        if (isset($this->channels[$name])) {
            return $this->regChannels->{$name};
        } else {
            throw new QueueException("Channel `{$name}` not exist! Pls configure it before usage.");
        }
    }

    /**
     * @param $name
     * @return mixed
     * @throws QueueException
     */
    public function getWorker($name)
    {
        $name = empty($name) ? ($this->getChannelNamesList()[0] ?: '') : $name;

        if (isset($this->workers[$name])) {
            return $this->regWorkers->get($name);
        } else {
            throw new QueueException("Worker $name not exist!");
        }
    }

    /**
     * @return array
     */
    public function getChannelNamesList()
    {
        return array_keys($this->channels);
    }

    /**
     * @return array
     */
    public function getWorkerNamesList()
    {
        return array_keys($this->workers);
    }

    /**
     * @param $pid
     */
    public function setPid($pid)
    {
        $this->_pid = $pid;
    }

    /**
     * @return null
     */
    public function getPid()
    {
        return $this->_pid;
    }

    /**
     * @param MessageModel $messageModel
     * @var $watcherId
     * @var integer $pid
     */
    private function processMessage(MessageModel $messageModel, $watcherId = null, $pid)
    {
        /** @var WorkerComponent $worker */
        if ($worker = $this->getWorker($messageModel->worker)) {
            pcntl_signal_dispatch();
            try {
                $this->log("Process {$pid} are working...\n");
                $worker->setMessage($messageModel);
                $worker->setWatcherId($watcherId);
                $worker->run();
                $this->log("Process {$pid} finished\n");
            } catch (\Throwable $e) {
                $this->log("Rise exception \"" . $e->getMessage() . "\" in child {$pid} process\n");
                $this->log("File " . $e->getFile() . " (" . $e->getLine() . ")\n");
                if ($this->repeatIfRiseException) {
                    $channel->push($message, $this->delayForIfRiseException);
                }
                throw $e;
            } finally {
                unset($worker);
            }
        }
    }

    /**
     * @param array $message
     * @param $watcherId
     * @param ChannelComponent $channel
     * @throws \Exception
     * @throws \Throwable
     * @throws \mirocow\queue\exceptions\ChannelException
     */
    protected function child($message = [], $watcherId, ChannelComponent $channel)
    {
        if ($this->multithreading) {
            $pid = pcntl_fork();
        } else {
            $pid = null;
        }

        if ($pid == -1) {
            throw new \Exception("Can`t fork process");
        } else if ($pid) {
            $this->_children[] = $pid;
        } else {
            $pid = posix_getpid();

            if ($this->multithreading) {
                $this->log("Child process starting with PID {$pid} ...\n");
                $this->isChild = true;
            } else {
                $this->log("Worker process starting with PID {$pid} ...\n");
            }

            $this->setSignalHandlers([$this, 'childSignalHandler']);
            $this->processMessage($message, $watcherId, $pid);
            if ($this->isChild) {
                exit(0);
            }
        }
    }

    /**
     * Wait till child process send gignal
     */
    protected function waitChildren()
    {
        while (($signaled_pid = pcntl_waitpid(-1, $status, WNOHANG)) || count($this->_children) > 0) {
            pcntl_signal_dispatch();
            if ($signaled_pid == -1) {
                $this->_children = [];
                break;
            } elseif ($signaled_pid) {
                self::log("Child {$signaled_pid} done\n");
                unset($this->_children[$signaled_pid]);
            }
        }
    }

    /**
     * Main singnal handler
     * @param $signo
     */
    public function mainSignalHandler($signo)
    {
        switch ($signo) {
            case SIGTERM:
            case SIGINT:
            case SIGHUP;
                $this->log("Main process catch signal {$signo}\n");
                $this->sendSignal(SIGINT);
                Loop::stop();
                break;
            default:
                $this->log("Signal {$signo} does not have handlers\n");
        }
    }

    /**
     * Child singanal handler
     * @param $signo
     */
    public function childSignalHandler($signo)
    {
        switch ($signo) {
            case SIGTERM:
            case SIGINT:
            case SIGHUP;
                $this->log("Child process catch signal {$signo}\n");
                break;
            default:
                $this->log("Signal {$signo} does not have handlers\n");
        }
    }

    /**
     * Start daemon
     */
    public function start()
    {
        Loop::run(function () {

            if (true !== $this->addSignals()) {
                throw new \Exception('No signals!');
            }

            self::daemonize();

            $this->setPid(getmypid());

            if ($this->pidFile) {
                file_put_contents($this->pidFile, $this->getPid());
            }

            self::displayUI($this);

            Loop::repeat($this->timer_tick, function ($watcherId) {
                if($this->run(null, $watcherId)){
                    Loop::stop();
                }
            });

        });
    }

    /**
     * Run queue
     * @param string|null $channelName
     * @throws QueueException
     */
    public function run($channelName = null, $watcherId = null)
    {
        if (!$channelName) {
            $channels = $this->getChannelNamesList();
        } else {
            $channels = [$channelName];
        }

        foreach ($channels as $channelName) {
            /** @var ChannelComponent $channel */
            $channel = $this->getChannel($channelName);

            pcntl_signal_dispatch();
            try {
                if ($message = $channel->pop()) {
                    $this->child($message, $watcherId, $channel);
                    $this->setSignalHandlers([$this, 'mainSignalHandler']);
                    if($this->multithreading) {
                        $this->waitChildren();
                    }
                }
            } catch (\Throwable $e) {
                \Yii::error($e, __METHOD__);
                $this->log("Rise exception \"" . $e->getMessage() . "\n");
                if ($this->isChild) {
                    exit(0);
                } else {
                    return true;
                }
            } finally {
                unset($channel);
            }
            // Notify the children of the completion of work
            $this->sendSignal(SIGINT);
            // Set all signals
            pcntl_signal_dispatch();
        }
    }


    /**
     * Set pcntl signal handler
     * @param $handler
     * @return bool
     * @throws \Exception
     */
    protected function setSignalHandlers($handler)
    {
        if (!function_exists('pcntl_signal_dispatch')) {
            throw new \Exception('Not installed function pcntl_signal_dispatch');
        }
        if (!function_exists('pcntl_signal')) {
            throw new \Exception('Not installed function pcntl_signal');
        }
        if (
            pcntl_signal(SIGTERM, $handler) &&
            pcntl_signal(SIGINT, $handler) &&
            pcntl_signal(SIGHUP, $handler)
        ) {
            return true;
        } else {
            throw new \Exception('Not set handler pcntl_signal');
        }
    }

    /**
     * Set amphp signal hendler
     * @return bool
     */
    private function addSignals()
    {
        if (php_sapi_name() === "phpdbg") {
            // phpdbg captures SIGINT so don't bother inside the debugger
            return;
        }

        Loop::onSignal(SIGINT, function () {
            $this->log("Server has killed\n");
            Loop::stop();
        });
        Loop::onSignal(SIGTERM, function () {
            $this->log("Server has killed\n");
            Loop::stop();
        });
        Loop::onSignal(SIGHUP, function () {
            $this->log("Server has killed\n");
            Loop::stop();
        });

        return true;
    }

    /**
     * Send signal to children processes
     * @param int $signo\
     */
    private function sendSignal($signo = SIGINT)
    {
        foreach ($this->_children as $_childrenPid) {
            $this->log("Send signal {$signo} to child process {$_childrenPid}\n");
            if(posix_kill($_childrenPid, $signo)){
                $this->log("Child process $_childrenPid signal {$signo} catched\n");
            }
        }
    }

    /**
     * Output debug message (work only YII_DEBUG = true)
     * @param $message
     */
    private function log($message)
    {
        static::safeEcho($message, true);
    }

    /**
     * Run as deamon mode.
     *
     * @throws Exception
     */
    protected static function daemonize()
    {
        if (!static::$_daemonize || static::$_OS !== self::OS_TYPE_LINUX) {
            return;
        }
        umask(0);
        $pid = pcntl_fork();
        if (-1 === $pid) {
            throw new Exception('fork fail');
        } elseif ($pid > 0) {
            exit(0);
        }
        if (-1 === posix_setsid()) {
            throw new Exception("setsid fail");
        }
        // Fork again avoid SVR4 system regain the control of terminal.
        $pid = pcntl_fork();
        if (-1 === $pid) {
            throw new Exception("fork fail");
        } elseif (0 !== $pid) {
            exit(0);
        }
    }

    /**
     * Display staring UI.
     *
     * @param $instance
     * @return void
     */
    protected static function displayUI($instance)
    {
        global $argv;
        if (in_array('-q', $argv)) {
            return;
        }

        $pid = getmypid();

        static::safeEcho("----------------------- YII2-QUEUE -----------------------------\r\n");
        static::safeEcho('Yii2-queue version: ' . static::VERSION . "          PHP version:" . PHP_VERSION . "\r\n");
        static::safeEcho('Process ID: '. $pid . "\r\n");
        static::safeEcho('Channels: '. count($instance->channels) . "\r\n");
        static::safeEcho('Workers: '. count($instance->workers) . "\r\n");
        static::safeEcho('Default queue name: ' . $instance->queueName . "\r\n");
        static::safeEcho("----------------------------------------------------------------\n");

        if (static::$_OS !== self::OS_TYPE_LINUX) {
            return;
        }

        if (static::$_daemonize) {
            static::safeEcho("Input \"kill -2 {$pid}\" to stop. Start success.\n\n");
        } else {
            static::safeEcho("Press Ctrl+C to stop. Start success.\n");
        }
        
    }

    /**
     * Safe Echo.
     * @param $msg
     * @param bool $decorated
     * @return bool
     */
    protected static function safeEcho($msg, $decorated = false)
    {
        $stream = static::outputStream();
        if (!$stream) {
            return false;
        }
        if (!$decorated) {
            $line = $white = $green = $end = '';
            if (static::$_outputDecorated) {
                $line = "\033[1A\n\033[K";
                $white = "\033[47;30m";
                $green = "\033[32;40m";
                $end = "\033[0m";
            }
            $msg = str_replace(['<n>', '<w>', '<g>'], [$line, $white, $green], $msg);
            $msg = str_replace(['</n>', '</w>', '</g>'], $end, $msg);
        } elseif (!static::$_outputDecorated) {
            return false;
        }
        fwrite($stream, $msg);
        fflush($stream);
        return true;
    }

    /**
     * @param null $stream
     * @return bool|resource
     */
    protected static function outputStream($stream = null)
    {
        if (!$stream) {
            $stream = static::$_outputStream ? static::$_outputStream : STDOUT;
        }
        if (!$stream || !is_resource($stream) || 'stream' !== get_resource_type($stream)) {
            return false;
        }
        $stat = fstat($stream);
        if (($stat['mode'] & 0170000) === 0100000) {
            // file
            static::$_outputDecorated = false;
        } else {
            static::$_outputDecorated =
                static::$_OS === self::OS_TYPE_LINUX &&
                function_exists('posix_isatty') &&
                posix_isatty($stream);
        }
        return static::$_outputStream = $stream;
    }

}