artur-graniszewski/ZEUS-for-PHP

View on GitHub
src/Zeus/Kernel/ProcessManager/Process.php

Summary

Maintainability
A
0 mins
Test Coverage
<?php

namespace Zeus\Kernel\ProcessManager;

use Zend\EventManager\EventInterface;
use Zend\EventManager\EventManagerInterface;
use Zend\Log\LoggerInterface;
use Zeus\Kernel\IpcServer\Message;
use Zeus\Kernel\ProcessManager\Status\ProcessState;

/**
 * A single worker process driven by scheduler events.
 *
 * The instance configures itself when SchedulerEvent::EVENT_PROCESS_INIT is triggered, then runs
 * a bounded loop that triggers SchedulerEvent::EVENT_PROCESS_LOOP until the number of finished
 * tasks reaches the configured limit, and finally triggers SchedulerEvent::EVENT_PROCESS_EXIT.
 * State changes are reported through EVENT_PROCESS_MESSAGE / _RUNNING / _WAITING events.
 *
 * @package Zeus\Kernel\ProcessManager
 * @internal
 */
final class Process
{
    /** @var int Time to live before terminating (# of requests left till the auto-shutdown) */
    protected $ttl;

    /** @var ProcessState|null Current state; created while handling EVENT_PROCESS_INIT */
    protected $status;

    /** @var string Identifier taken from the "uid" parameter of the init event */
    protected $processId;

    /** @var EventManagerInterface|null Event manager supplied to attach() */
    protected $events;

    /** @var LoggerInterface|null Logger supplied to setLogger() */
    protected $logger;

    /** @var SchedulerEvent|null Init event instance, re-used (renamed and re-parameterised) for every trigger */
    protected $event;

    /**
     * Sets the process identifier.
     *
     * @param string $processId
     * @return $this
     */
    public function setId($processId)
    {
        $this->processId = $processId;

        return $this;
    }

    /**
     * Sets the logger used for exception reports and the shutdown notice.
     *
     * @param LoggerInterface $logger
     * @return $this
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;

        return $this;
    }

    /**
     * Stores the event manager and registers two EVENT_PROCESS_INIT listeners on it: one that
     * initialises this object from the event, and one that starts the main loop.
     *
     * NOTE(review): the listeners are presumably ordered by PRIORITY_INITIALIZE / PRIORITY_FINALIZE
     * so that initialisation runs first - the constants are defined on SchedulerEvent; confirm there.
     *
     * @param EventManagerInterface $eventManager
     * @return $this
     */
    public function attach(EventManagerInterface $eventManager)
    {
        $this->events = $eventManager;
        $this->events->attach(SchedulerEvent::EVENT_PROCESS_INIT, function(SchedulerEvent $event) {
            $config = $event->getScheduler()->getConfig();
            $event->setProcess($this);
            $this->setId($event->getParam('uid'));
            $this->setConfig($config);
            $this->status = new ProcessState($config->getServiceName());
            // keep the event: every later notification re-uses this very instance
            $this->event = $event;
        }, SchedulerEvent::PRIORITY_INITIALIZE);

        $this->events->attach(SchedulerEvent::EVENT_PROCESS_INIT, function(SchedulerEvent $event) {
            $this->mainLoop();
        }, SchedulerEvent::PRIORITY_FINALIZE);

        return $this;
    }

    /**
     * Reads the task limit from the configuration.
     *
     * @param ConfigInterface $config
     * @return $this
     */
    public function setConfig(ConfigInterface $config)
    {
        // set time to live counter
        $this->ttl = $config->getMaxProcessTasks();

        return $this;
    }

    /**
     * Process constructor.
     *
     * Installs a global exception handler that routes uncaught exceptions to terminateProcess().
     */
    public function __construct()
    {
        // terminateProcess() is protected. A closure created here stays bound to this object and
        // its class scope, so invoking the handler does not depend on the scope in which PHP
        // resolves an array callable such as [$this, 'terminateProcess'].
        set_exception_handler(function ($exception) {
            $this->terminateProcess($exception);
        });
    }

    /**
     * Triggers EVENT_PROCESS_MESSAGE carrying the given message together with the process id
     * and a snapshot of the current status.
     *
     * @param int $type Message type; also used as the payload priority
     * @param mixed $message
     * @return $this
     */
    protected function sendMessage($type, $message)
    {
        $payload = [
            'isEvent' => false,
            'type' => $type,
            'priority' => $type,
            'message' => $message,
            'extra' => [
                'uid' => $this->getId(),
                'logger' => __CLASS__,
                'status' => $this->status->toArray()
            ]
        ];

        $event = $this->event;
        $event->setParams($payload);
        $event->setName(SchedulerEvent::EVENT_PROCESS_MESSAGE);
        $this->events->triggerEvent($event);

        return $this;
    }

    /**
     * Marks the process as running and triggers EVENT_PROCESS_RUNNING.
     *
     * When the process is already running, a call with an unchanged description within the same
     * second is a no-op; otherwise the status time is refreshed. When the process was not running,
     * the finished-task counter is incremented by one (this is the counter that bounds mainLoop()).
     *
     * @param string|null $statusDescription
     * @return $this
     */
    public function setRunning($statusDescription = null)
    {
        if ($this->status->getCode() === ProcessState::RUNNING) {
            $now = time();
            if ($statusDescription === $this->status->getStatusDescription() && $this->status->getTime() === $now) {
                // nothing new to report
                return $this;
            }

            $this->status->setTime($now);
        } else {
            $this->getStatus()->incrementNumberOfFinishedTasks(1);
        }

        $this->status->setStatusDescription($statusDescription);
        $this->sendStatus(ProcessState::RUNNING, $statusDescription);
        $event = $this->event;
        $event->setName(SchedulerEvent::EVENT_PROCESS_RUNNING);
        $event->setParams($this->status->toArray());
        $this->events->triggerEvent($event);

        return $this;
    }

    /**
     * Marks the process as waiting and triggers EVENT_PROCESS_WAITING.
     *
     * A call that changes neither the state nor the description is a no-op. Note that the event
     * is triggered before the status code is updated by sendStatus().
     *
     * @param string|null $statusDescription
     * @return $this
     */
    public function setWaiting($statusDescription = null)
    {
        if ($this->status->getCode() === ProcessState::WAITING
            &&
            $statusDescription === $this->status->getStatusDescription()
        ) {
            return $this;
        }

        $this->status->setStatusDescription($statusDescription);
        $event = $this->event;
        $event->setName(SchedulerEvent::EVENT_PROCESS_WAITING);
        $event->setParams($this->status->toArray());
        $this->events->triggerEvent($event);
        $this->sendStatus(ProcessState::WAITING, $statusDescription);

        return $this;
    }

    /**
     * Logs an exception (error level) and its stack trace (debug level).
     *
     * @param \Exception|\Throwable $exception
     * @return $this
     */
    protected function reportException($exception)
    {
        $this->logger->err(sprintf("Exception (%d): %s in %s on line %d",
            $exception->getCode(),
            // keep the log entry on a single line by escaping control characters
            addcslashes($exception->getMessage(), "\t\n\r\0\x0B"),
            $exception->getFile(),
            $exception->getLine()
        ));
        $this->logger->debug(sprintf("Stack Trace:\n%s", $exception->getTraceAsString()));

        return $this;
    }

    /**
     * Announces the shutdown: sends the EXITING status and triggers EVENT_PROCESS_EXIT, attaching
     * the exception (if any) to the event parameters under the "exception" key.
     *
     * Also serves as the target of the global exception handler installed by the constructor.
     *
     * @param \Exception|\Throwable|null $exception
     * @throws \Exception|\Throwable The given exception, when the process was never initialised
     */
    protected function terminateProcess($exception = null)
    {
        if ($exception && (null === $this->status || null === $this->event || null === $this->events)) {
            // The init event never ran, so there is nothing to notify. Re-throw so PHP reports
            // the original failure instead of a secondary null-dereference error from below.
            throw $exception;
        }

        // The shutdown notice is informational only; a missing logger must not abort the exit sequence.
        if ($this->logger) {
            $this->logger->debug(sprintf("Shutting down after finishing %d tasks", $this->status->getNumberOfFinishedTasks()));
        }

        // process is terminating, time to live equals zero
        $this->ttl = 0;

        $this->sendStatus(ProcessState::EXITING);

        $payload = $this->status->toArray();

        if ($exception) {
            $payload['exception'] = $exception;
        }

        $event = $this->event;
        $event->setName(SchedulerEvent::EVENT_PROCESS_EXIT);
        $event->setParams($payload);

        $this->events->triggerEvent($event);
    }

    /**
     * Listen for incoming requests.
     *
     * Triggers EVENT_PROCESS_LOOP repeatedly while the task limit has not been reached, switching
     * back to the waiting state after every iteration, then terminates the process. Exceptions
     * raised by loop listeners are logged and do not break the loop.
     *
     * @return $this
     */
    public function mainLoop()
    {
        $this->events->attach(SchedulerEvent::EVENT_PROCESS_LOOP, function(EventInterface $event) {
            $this->sendStatus($this->status->getCode());
        });

        $this->setWaiting();

        // handle only a finite number of requests and terminate gracefully to avoid potential memory/resource leaks
        while ($this->ttl - $this->status->getNumberOfFinishedTasks() > 0) {
            try {
                $event = $this->event;
                $event->setName(SchedulerEvent::EVENT_PROCESS_LOOP);
                $event->setParams($this->status->toArray());
                $this->events->triggerEvent($event);
            } catch (\Exception $exception) {
                // separate \Exception branch keeps the code working where \Throwable does not exist
                $this->reportException($exception);
            } catch (\Throwable $exception) {
                $this->reportException($exception);
            }
            $this->setWaiting($this->getStatus()->getStatusDescription());
        }

        $this->terminateProcess();

        return $this;
    }

    /**
     * Returns the process identifier set via setId().
     *
     * @return string
     */
    public function getId()
    {
        return $this->processId;
    }

    /**
     * Stores the new status code, refreshes the status object and, only when the code actually
     * changed, sends an IS_STATUS message with the description (empty string when none is given).
     *
     * @param int $statusCode
     * @param string|null $statusDescription
     * @return $this
     */
    protected function sendStatus($statusCode, $statusDescription = null)
    {
        $oldStatus = $this->status->getCode();
        $this->status->setCode($statusCode);
        $this->status->updateStatus();

        // send new status to Scheduler only if it changed
        if ($oldStatus !== $statusCode) {
            $this->sendMessage(Message::IS_STATUS, $statusDescription ? $statusDescription : '');
        }

        return $this;
    }

    /**
     * Returns the event manager passed to attach(), or null before attach() was called.
     *
     * @return EventManagerInterface|null
     */
    public function getEventManager()
    {
        return $this->events;
    }

    /**
     * Returns the current state object, or null before the init event was handled.
     *
     * @return ProcessState|null
     */
    public function getStatus()
    {
        return $this->status;
    }
}