sfcod/jobqueue

View on GitHub
Worker/Worker.php

Summary

Maintainability
A
35 mins
Test Coverage
<?php

namespace SfCod\QueueBundle\Worker;

use Exception;
use SfCod\QueueBundle\Event\JobExceptionOccurredEvent;
use SfCod\QueueBundle\Event\JobFailedEvent;
use SfCod\QueueBundle\Event\JobProcessedEvent;
use SfCod\QueueBundle\Event\JobProcessingEvent;
use SfCod\QueueBundle\Event\WorkerStoppingEvent;
use SfCod\QueueBundle\Exception\FatalThrowableException;
use SfCod\QueueBundle\Exception\MaxAttemptsExceededException;
use SfCod\QueueBundle\Failer\FailedJobProviderInterface;
use SfCod\QueueBundle\Handler\ExceptionHandlerInterface;
use SfCod\QueueBundle\Job\JobContractInterface;
use SfCod\QueueBundle\Queue\QueueInterface;
use SfCod\QueueBundle\Service\JobProcess;
use SfCod\QueueBundle\Service\QueueManager;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Process\Process;
use Throwable;

/**
 * Thread worker for job queues
 *
 * @author Virchenko Maksim <muslim1992@gmail.com>
 */
class Worker
{
    /**
     * Events
     */
    const EVENT_RAISE_BEFORE_JOB = 'job_queue_worker.raise_before_job';
    const EVENT_RAISE_AFTER_JOB = 'job_queue_worker.raise_after_job';
    const EVENT_RAISE_EXCEPTION_OCCURED_JOB = 'job_queue_worker.raise_exception_occurred_job';
    const EVENT_RAISE_FAILED_JOB = 'job_queue_worker.raise_failed_job';
    const EVENT_STOP = 'job_queue_worker.stop';

    /**
     * QueueManager instance
     *
     * @var QueueManager
     */
    private $queueManager;

    /**
     * Logger instance
     *
     * @var ExceptionHandlerInterface
     */
    private $exceptions;

    /**
     * Failer instance
     *
     * @var FailedJobProviderInterface
     */
    private $failer;

    /**
     * @var EventDispatcherInterface
     */
    private $dispatcher;

    /**
     * @var JobProcess
     */
    private $process;

    /**
     * Worker constructor.
     *
     * @param QueueManager $queueManager
     * @param JobProcess $process
     * @param FailedJobProviderInterface $failer
     * @param ExceptionHandlerInterface $exceptions
     * @param EventDispatcherInterface $dispatcher
     */
    public function __construct(QueueManager $queueManager,
                                JobProcess $process,
                                FailedJobProviderInterface $failer,
                                ExceptionHandlerInterface $exceptions,
                                EventDispatcherInterface $dispatcher)
    {
        $this->queueManager = $queueManager;
        $this->process = $process;
        $this->failer = $failer;
        $this->exceptions = $exceptions;
        $this->dispatcher = $dispatcher;
    }

    /**
     * Listen to the given queue in a loop.
     *
     * @param string $connectionName
     * @param string $queue
     * @param Options $options
     */
    public function daemon(string $connectionName, string $queue, Options $options)
    {
        while (true) {
            if (false === $this->runNextJob($connectionName, $queue, $options)) {
                $this->sleep($options->sleep);
            }

            if ($this->memoryExceeded($options->memory)) {
                $this->stop();
            }
        }
    }

    /**
     * Process the next job on the queue.
     *
     * @param string $connectionName
     * @param string $queue
     * @param Options $options
     *
     * @return bool
     */
    public function runNextJob(string $connectionName, string $queue, Options $options)
    {
        $connection = $this->queueManager->connection($connectionName);
        $job = $this->getNextJob($connection, $queue);

        // If we're able to pull a job off of the stack, we will process it and then return
        // from this method. If there is no job on the queue, we will "sleep" the worker
        // for the specified number of seconds, then keep processing jobs after sleep.
        if ($job instanceof JobContractInterface && $connection->canRunJob($job)) {
            $connection->markJobAsReserved($job);
            $this->runInBackground($job, $options);

            return true;
        }

        return false;
    }

    /**
     * Process the next job on the queue.
     *
     * @param string $connectionName
     * @param string $queue
     * @param string $id
     * @param Options $options
     */
    public function runJobById(string $connectionName, string $queue, string $id, Options $options)
    {
        try {
            $connection = $this->queueManager->connection($connectionName);
            $job = $connection->getJobById($queue, $id);

            // If we're able to pull a job off of the stack, we will process it and then return
            // from this method. If there is no job on the queue, we will "sleep" the worker
            // for the specified number of seconds, then keep processing jobs after sleep.
            if ($job instanceof JobContractInterface) {
                if (false === $job->reserved()) {
                    $connection->markJobAsReserved($job);
                }

                $this->process($connectionName, $job, $options);

                return;
            }
        } catch (Exception $e) {
            $this->exceptions->report($e);
        } catch (Throwable $e) {
            $this->exceptions->report(new FatalThrowableException($e));
        }

        $this->sleep($options->sleep);
    }

    /**
     * Make a Process for the Artisan command for the job id.
     *
     * @param JobContractInterface $job
     * @param Options $options
     */
    public function runInBackground(JobContractInterface $job, Options $options)
    {
        $process = $this->process->getProcess($job, $options);

        $process->run();
    }

    /** Process the given job from the queue.
     *
     * @param string $connectionName
     * @param JobContractInterface $job
     * @param Options $options
     *
     * @return void
     *
     * @throws \Throwable
     */
    public function process(string $connectionName, JobContractInterface $job, Options $options)
    {
        try {
            // First we will raise the before job event and determine if the job has already ran
            // over the its maximum attempt limit, which could primarily happen if the job is
            // continually timing out and not actually throwing any exceptions from itself.
            $this->raiseBeforeJobEvent($connectionName, $job);

            $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
                $connectionName, $job, (int)$options->maxTries
            );

            // Here we will fire off the job and let it process. We will catch any exceptions so
            // they can be reported to the developers logs, etc. Once the job is finished the
            // proper events will be fired to let any listeners know this job has finished.
            $job->fire();

            $this->raiseAfterJobEvent($connectionName, $job);
        } catch (Exception $e) {
            $this->handleJobException($connectionName, $job, $options, $e);
        } catch (Throwable $e) {
            $this->handleJobException(
                $connectionName, $job, $options, new FatalThrowableException($e)
            );
        }
    }

    /**
     * Sleep the script for a given number of seconds.
     *
     * @param int $seconds
     *
     * @return void
     */
    public function sleep(int $seconds)
    {
        sleep($seconds);
    }

    /**
     * Determine if the memory limit has been exceeded.
     *
     * @param int $memoryLimit
     *
     * @return bool
     */
    public function memoryExceeded(int $memoryLimit)
    {
        return (memory_get_usage() / 1024 / 1024) >= $memoryLimit;
    }

    /**
     * Stop listening and bail out of the script.
     */
    public function stop()
    {
        $this->dispatcher->dispatch(new WorkerStoppingEvent(), self::EVENT_STOP);

        exit(0);
    }

    /**
     * Mark the given job as failed if it has exceeded the maximum allowed attempts.
     *
     * This will likely be because the job previously exceeded a timeout.
     *
     * @param string $connectionName
     * @param JobContractInterface $job
     * @param int $maxTries
     *
     * @return void
     */
    protected function markJobAsFailedIfAlreadyExceedsMaxAttempts(string $connectionName, JobContractInterface $job, int $maxTries)
    {
        $maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

        $timeoutAt = $job->timeoutAt();

        if ($timeoutAt && time() <= $timeoutAt) {
            return;
        }

        if (!$timeoutAt && (0 === $maxTries || $job->attempts() <= $maxTries)) {
            return;
        }

        $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException(
            'A queued job has been attempted too many times or run too long. The job may have previously timed out.'
        ));

        throw $e;
    }

    /**
     * Mark the given job as failed and raise the relevant event.
     *
     * @param string $connectionName
     * @param JobContractInterface $job
     * @param Exception $e
     */
    protected function failJob(string $connectionName, JobContractInterface $job, Exception $e)
    {
        if ($job->isDeleted()) {
            return;
        }

        try {
            // If the job has failed, we will delete it, call the "failed" method and then call
            // an event indicating the job has failed so it can be logged if needed. This is
            // to allow every developer to better keep monitor of their failed queue jobs.
            $job->delete();

            $job->failed($e);
        } finally {
            $this->failer->log($connectionName, $job->getQueue(), $job->getRawBody(), $e);
            $this->raiseFailedJobEvent($connectionName, $job, $e);
        }
    }

    /**
     * Handle an exception that occurred while the job was running.
     *
     * @param string $connectionName
     * @param JobContractInterface $job
     * @param Options $options
     * @param Exception $e
     *
     * @return void
     *
     * @throws Exception
     */
    protected function handleJobException(string $connectionName, JobContractInterface $job, Options $options, Exception $e)
    {
        try {
            // First, we will go ahead and mark the job as failed if it will exceed the maximum
            // attempts it is allowed to run the next time we process it. If so we will just
            // go ahead and mark it as failed now so we do not have to release this again.
            if (!$job->hasFailed()) {
                $this->markJobAsFailedIfWillExceedMaxAttempts(
                    $connectionName, $job, (int)$options->maxTries, $e
                );
            }

            $this->raiseExceptionOccurredJobEvent(
                $connectionName, $job, $e
            );
        } finally {
            // If we catch an exception, we will attempt to release the job back onto the queue
            // so it is not lost entirely. This'll let the job be retried at a later time by
            // another listener (or this same one). We will re-throw this exception after.
            if (!$job->isDeleted() && !$job->isReleased() && !$job->hasFailed()) {
                $job->release($options->delay);
            }
        }

        throw $e;
    }

    /**
     * Mark the given job as failed if it has exceeded the maximum allowed attempts.
     *
     * @param string $connectionName
     * @param JobContractInterface $job
     * @param int $maxTries
     * @param Exception $e
     *
     * @return void
     */
    protected function markJobAsFailedIfWillExceedMaxAttempts(string $connectionName, JobContractInterface $job, int $maxTries, Exception $e)
    {
        $maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

        if ($job->timeoutAt() && $job->timeoutAt() <= time()) {
            $this->failJob($connectionName, $job, $e);
        }

        if ($maxTries > 0 && $job->attempts() >= $maxTries) {
            $this->failJob($connectionName, $job, $e);
        }
    }

    /**
     * Get the next job from the queue connection.
     *
     * @param QueueInterface $connection
     * @param string $queue
     *
     * @return JobContractInterface|null
     */
    protected function getNextJob(QueueInterface $connection, string $queue): ?JobContractInterface
    {
        try {
            foreach (explode(',', $queue) as $queue) {
                if (!is_null($job = $connection->pop($queue))) {
                    return $job;
                }
            }
        } catch (Exception $e) {
            $this->exceptions->report($e);
        } catch (Throwable $e) {
            $this->exceptions->report($e = new FatalThrowableException($e));
        }

        return null;
    }

    /**
     * Raise the before queue job event.
     *
     * @param string $connectionName
     * @param JobContractInterface $job
     */
    protected function raiseBeforeJobEvent(string $connectionName, JobContractInterface $job)
    {
        $this->dispatcher->dispatch(new JobProcessingEvent($connectionName, $job), self::EVENT_RAISE_AFTER_JOB);
    }

    /**
     * Raise the after queue job event.
     *
     * @param string $connectionName
     * @param JobContractInterface $job
     */
    protected function raiseAfterJobEvent(string $connectionName, JobContractInterface $job)
    {
        $this->dispatcher->dispatch(new JobProcessedEvent($connectionName, $job), self::EVENT_RAISE_AFTER_JOB);
    }

    /**
     * Raise the exception occurred queue job event.
     *
     * @param string $connectionName
     * @param JobContractInterface $job
     * @param Exception $e
     */
    protected function raiseExceptionOccurredJobEvent(string $connectionName, JobContractInterface $job, Exception $e)
    {
        $this->dispatcher->dispatch(new JobExceptionOccurredEvent($connectionName, $job, $e), self::EVENT_RAISE_EXCEPTION_OCCURED_JOB);
    }

    /**
     * Raise the failed queue job event.
     *
     * @param string $connectionName
     * @param JobContractInterface $job
     * @param Exception $e
     */
    protected function raiseFailedJobEvent(string $connectionName, JobContractInterface $job, Exception $e)
    {
        $this->dispatcher->dispatch(new JobFailedEvent($connectionName, $job, $e), self::EVENT_RAISE_FAILED_JOB);
    }
}