Mezzle/queuejitsu

View on GitHub
src/Worker/AbstractWorker.php

Summary

Maintainability
A
35 mins
Test Coverage
<?php
/*
 * Copyright (c) 2017 - 2020 Martin Meredith
 * Copyright (c) 2017 Stickee Technology Limited
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

declare(strict_types=1);

namespace QueueJitsu\Worker;

use Laminas\EventManager\EventManagerAwareInterface;
use Laminas\EventManager\EventManagerAwareTrait;
use Psr\Log\LoggerInterface;
use QueueJitsu\Exception\ForkFailureException;

/**
 * Class AbstractWorker
 *
 * @package QueueJitsu\Worker
 */
abstract class AbstractWorker implements EventManagerAwareInterface
{
    use EventManagerAwareTrait;

    /**
     * @var bool|null|int $child
     */
    protected $child;

    /**
     * @var bool $finish
     */
    protected $finish = false;

    /**
     * @var string|false $hostname
     */
    protected $hostname;

    /**
     * @var string $id
     */
    protected $id;

    /**
     * @var int $interval
     */
    protected $interval = 5;

    /**
     * @var \Psr\Log\LoggerInterface $log
     */
    protected $log;

    /**
     * @var \QueueJitsu\Worker\WorkerManager $manager
     */
    protected $manager;

    /**
     * @var bool $paused
     */
    protected $paused = false;

    /**
     * @var string $worker_name
     */
    protected $worker_name;

    /**
     * AbstractWorker constructor.
     *
     * @param \Psr\Log\LoggerInterface $log
     * @param \QueueJitsu\Worker\WorkerManager $manager
     */
    public function __construct(LoggerInterface $log, WorkerManager $manager)
    {
        $this->log = $log;
        $this->hostname = gethostname();
        $this->worker_name = sprintf('%s:%d', $this->hostname, getmypid());
        $this->id =
            sprintf('%s:%s', $this->worker_name, $this->getWorkerIdentifier());
        $this->manager = $manager;
    }

    /**
     * __invoke
     *
     * @param int $interval
     */
    public function __invoke($interval = 5): void
    {
        $this->interval = $interval;

        $this->updateProcLine('Starting');
        $this->startup();

        while (true) {
            if ($this->finish) {
                break;
            }

            if ($this->paused) {
                $this->sleep();

                continue;
            }

            $this->loop();
        }

        $this->manager->unregisterWorker($this->getId());
    }

    /**
     * getId
     *
     * @return string
     */
    public function getId(): string
    {
        return $this->id;
    }

    /**
     * continueProcessing
     */
    public function continueProcessing(): void
    {
        $this->log->info('CONT received; resuming job processing');
        $this->paused = false;
    }

    /**
     * pauseProcessing
     */
    public function pauseProcessing(): void
    {
        $this->log->info('USR2 received; pausing job processing');
        $this->paused = true;
    }

    /**
     * shutdownNow
     */
    public function shutdownNow(): void
    {
        $this->log->warning('Forced Shutdown Started');
        $this->shutdown();
        $this->killChild();
    }

    /**
     * shutdown
     */
    public function shutdown(): void
    {
        $this->finish = true;
        $this->log->info('Exiting...');
    }

    /**
     * killChild
     */
    public function killChild(): void
    {
        if (is_null($this->child)) {
            $this->log->debug('No child to kill');

            return;
        }

        $this->log->debug(sprintf('Finding child at %d', $this->child));

        $output = null;
        $return_code = null;

        // Check if pid is running
        $executed =
            exec(
                sprintf('ps -o pid,state -p %d', $this->child),
                $output,
                $return_code
            );

        if ($executed && $return_code !== 1) {
            $this->log->debug(sprintf('Killing child at %d', $this->child));
            posix_kill((int)$this->child, SIGKILL);
            $this->child = null;

            return;
        }

        $this->log->error(
            sprintf('Child %d not found, restarting', $this->child)
        );
        $this->shutdown();
    }

    /**
     * workingOn
     *
     * @return string
     */
    abstract protected function getWorkerIdentifier(): string;

    /**
     * updateProcLine
     *
     * @param string $status
     */
    protected function updateProcLine(string $status): void
    {
        if (function_exists('cli_set_process_title') && posix_getuid() === 0) {
            cli_set_process_title(
                sprintf('qjitsu-%s: %s', $this->getWorkerType(), $status)
            );
        }
    }

    /**
     * getWorkerType
     *
     * @return string
     */
    protected function getWorkerType(): string
    {
        return 'worker';
    }

    /**
     * startup
     */
    protected function startup(): void
    {
        $this->log->info(sprintf('Starting worker %s', $this->id));

        $this->registerSignalHandlers();
        $this->pruneDeadWorkers();

        $this->getEventManager()->trigger('beforeFirstFork', $this);
        $this->manager->registerWorker($this->id);
    }

    /**
     * registerSignalHandlers
     *
     * @return bool
     */
    protected function registerSignalHandlers(): bool
    {
        if (!function_exists('pcntl_signal')) {
            $this->log->warning(
                'Signal Handling is not supported on this system'
            );

            return false;
        }

        pcntl_async_signals(true);

        pcntl_signal(SIGTERM, [$this, 'shutdownNow']);
        pcntl_signal(SIGINT, [$this, 'shutdownNow']);
        pcntl_signal(SIGQUIT, [$this, 'shutdown']);
        pcntl_signal(SIGUSR1, [$this, 'killChild']);
        pcntl_signal(SIGUSR2, [$this, 'pauseProcessing']);
        pcntl_signal(SIGCONT, [$this, 'continueProcessing']);

        $this->log->debug('Registered Signals');

        return true;
    }

    /**
     * pruneDeadWorkers
     */
    protected function pruneDeadWorkers(): void
    {
        $this->manager->pruneDeadWorkers();
    }

    /**
     * sleep
     */
    protected function sleep(): void
    {
        $this->log->debug(sprintf('Sleeping for %d', $this->interval));

        $waitString = sprintf('Waiting for %s', $this->getWorkerIdentifier());
        $procline = $this->paused ? 'Paused' : $waitString;
        $this->updateProcLine($procline);

        usleep($this->interval * 1000000);
    }

    /**
     * loop
     */
    abstract protected function loop(): void;

    /**
     * finishedWorking
     */
    protected function finishedWorking(): void
    {
        $this->manager->finishedWorking($this);
    }

    /**
     * fork
     *
     * @throws \QueueJitsu\Exception\ForkFailureException
     *
     * @return bool|int
     */
    protected function fork()
    {
        if (!function_exists('pcntl_fork')) {
            return false;
        }

        $pid = pcntl_fork();

        if ($pid === -1) {
            /** @noinspection ExceptionsAnnotatingAndHandlingInspection */
            throw new ForkFailureException('Unable to for a child worker');
        }

        return $pid;
    }

    /**
     * setTask
     *
     * @param mixed $data
     */
    protected function setTask($data): void
    {
        $this->manager->setTask($this, $data);
    }
}