barracudanetworks/jobrunner

View on GitHub
src/JobRunner.php

Summary

Maintainability
B
6 hrs
Test Coverage
<?php

namespace Barracuda\JobRunner;

use ReflectionClass;
use Exception;
use InvalidArgumentException;

use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;

class JobRunner
{
    /**
     * @var array
     */
    private $jobs = array();

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @var \fork_daemon
     */
    private $fork_daemon;

    /**
     * @param \fork_daemon    $fork_daemon Instance of ForkDaemon.
     * @param LoggerInterface $logger      Optionally, a logger.
     */
    public function __construct(
        \fork_daemon $fork_daemon = null,
        LoggerInterface $logger = null
    )
    {
        $this->logger = is_null($logger) ? new NullLogger() : $logger;
        $this->fork_daemon = is_null($fork_daemon) ? new \fork_daemon() : $fork_daemon;
    }

    /**
     * Adds a job to the JobRunner instance.
     *
     * @param JobDefinition  $definition Job definition (e.g. interval).
     * @throws InvalidArgumentException If the given class does not subclass Job.
     * @return void
     */
    public function addJob(JobDefinition $definition)
    {
        $class = $definition->getClassName();
        $reflection = new ReflectionClass($class);
        if (!$reflection->isSubclassOf(Job::class))
        {
            $this->logger->error("{$reflection->getShortName()} does not subclass " . Job::class);

            throw new InvalidArgumentException("{$reflection->getShortName()} must subclass " . Job::class);
        }

        if (isset($this->jobs[$class]))
        {
            $this->logger->warning("{$reflection->getShortName()} is already registered, skipping");
            return;
        }

        // If we have a run_time and interval set, we will ignore the interval when checking if job can run.
        if (!is_null($definition->getInterval()) && !is_null($definition->getRunTime()))
        {
            $definition->setInterval(null);
            $this->logger->warning("Both run_time and interval are set for {$reflection->getShortName()} — " .
                "prioritizing run_time");
        }

        // Set internal definitions
        $definition->setLastRunTimeStart(null);
        $definition->setLastRunTimeFinish(null);
        $definition->setReflection($reflection);

        // Add to job list, using defaults where necessary
        $this->jobs[$class] = $definition;
        $this->createJobBuckets($class);

        $this->logger->info("Registered job {$reflection->getShortName()} -- " .
            ($definition->getEnabled() ? "enabled" : "disabled"));
    }

    /**
     * This is the main function that will run jobs. It should be called in the
     * main event loop.
     *
     * @return void
     */
    public function run()
    {
        $this->logger->debug("Looking for jobs to run");

        foreach ($this->jobs as $class => $definition)
        {
            // Check if it's time to run the job
            if ($this->canJobRun($class))
            {
                $this->queueJob($class);
            }
        }

        $this->logger->debug("No more jobs to run");
    }

    /**
     * Adds a job to the fork_daemon work list so we'll start it.
     *
     * @param string $class Job class to start.
     * @return void
     */
    protected function queueJob($class)
    {
        $this->logger->info("Adding job {$this->jobs[$class]->getReflection()->getShortName()} to work list");

        // Update runtime now, so that subsequent calls to run()
        // dont kick the job off multiple times
        $this->jobs[$class]->setLastRunTimeStart(time());

        $this->fork_daemon->addwork(array($class), $class, $class);
        $this->fork_daemon->process_work(false, $class);

    }

    /**
     * Called by fork_daemon when there is work to be processed.
     *
     * @param array $work Fork daemon can only add work as an array, so this
     *                    should have 1 item in it - the class name of a
     *                    registered Job.
     * @return void
     */
    public function processWork(array $work)
    {
        // There should only be one element, so we'll only operate with the first
        $class = array_pop($work);
        if (!isset($this->jobs[$class]))
        {
            $this->logger->warning("Unknown work unit in fork_daemon, is something else adding work?", $work);
            return;
        }

        $this->logger->info("Running job {$this->jobs[$class]->getReflection()->getShortName()}");

        try
        {
            // Try to run the job
            $job = $this->instantiateJob($class);
            if ($job instanceof Job)
            {
                // Pass relevant info to the job from the parent before calling start
                $job->setLastFinishRunTime($this->jobs[$class]->getLastRunTimeFinish());
                $job->setLastStartRunTime($this->jobs[$class]->getLastRunTimeStart());

                $job->start();
            }
        }
        // Catching the very general Exception here so that we might also catch
        // exceptions in the job's code.
        catch (Exception $e)
        {
            $this->logger->error("Exception while trying to run {$this->jobs[$class]->getReflection()->getShortName()}: " .
                $e->getMessage());

            return;
        }
    }

    /**
     * Instantiates a job class (if it's indeed a job).
     *
     * @param string $class The fully qualified path to the class.
     * @return object Instantiated object.
     */
    protected function instantiateJob($class)
    {
        // Make sure this is a real job
        $reflection = $this->getJob($class)->getReflection();

        // Create a new instance
        $job = $reflection->newInstance($this->logger);
        if ($job instanceof ForkingJob)
        {
            // If it's a ForkingJob, give it its own fork_daemon, using the same
            // class that JobRunner is using, and register the `prepareToFork()`
            // method to be called before ForkingJob forks children.
            $fork_daemon = (new ReflectionClass($this->fork_daemon))->newInstance();
            $fork_daemon->register_parent_prefork(array(
                [$job, 'prepareToFork'],
            ));

            // Setup the ForkingJob with the new fork_daemon instance
            $job->setUpForking($fork_daemon);
        }

        return $job;
    }

    /**
     * This function creates a bucket for each job in fork daemon so it is
     * easier to manage if it should run or not.
     *
     * @param string $class Job to create buckets for.
     * @return void
     */
    private function createJobBuckets($class)
    {
        $job = $this->getJob($class);

        $this->fork_daemon->add_bucket($class);
        $this->fork_daemon->max_children_set(1, $class);
        $this->fork_daemon->register_child_run(array($this, 'processWork'), $class);
        $this->fork_daemon->register_parent_child_exit(array($this, 'parentChildExit'), $class);
        $this->fork_daemon->child_max_run_time_set($job->getMaxRunTime(), $class);
    }

    /**
     * Returns true if a job can run, false otherwise.
     *
     * @param string         $class          The job to check.
     * @return bool
     */
    protected function canJobRun($class)
    {
        $job_definition = $this->jobs[$class];
        if ($job_definition->getEnabled() == false)
        {
            return false;
        }

        // If this job is already running, don't start it again
        if (count($this->fork_daemon->work_running($class)) != 0)
        {
            return false;
        }

        $last_run_time = $job_definition->getLastRunTimeStart();

        // If the job is supposed to run at a scheduled time
        if (!is_null($job_definition->getRunTime()))
        {
            $now = new \DateTime();

            // Check if the run time is now
            if ($job_definition->getRunTime() == $now->format('H:i'))
            {
                // If we haven't run before, we can run
                if (is_null($last_run_time))
                {
                    return true;
                }
                else
                {
                    $last_run = new \DateTime();
                    $last_run->setTimestamp($last_run_time);

                    $difference = $last_run->diff($now);

                    // If the last run wasn't this minute, we can run
                    if ($difference->days != 0 || $difference->h != 0 || $difference->i != 0)
                    {
                        return true;
                    }
                }
            }
            return false;
        }

        // If the job runs on an interval, check if it's ready to run
        if (!is_null($job_definition->getInterval()))
        {
            // If it hasn't run yet, run it!
            if (is_null($last_run_time))
            {
                return true;
            }

            if ((time() - $last_run_time) > $job_definition->getInterval())
            {
                return true;
            }
        }

        // No run condition hit
        return false;
    }

    /**
     * Update the last run time of the job after it is finished.
     *
     * @param int $pid The pid of the child exiting.
     * @return void
     */
    public function parentChildExit($pid)
    {
        // Bucket should be named after the job class
        $class = $this->fork_daemon->getForkedChildren()[$pid]['bucket'];

        $this->jobFinished($class);
    }

    /**
     * Called whenever a job exits (according to fork_daemon).
     *
     * @param string        $class          The job that finished.
     * @return void
     */
    protected function jobFinished($class)
    {
        $this->jobs[$class]->setLastRunTimeFinish(time());
        $this->logger->info("Job {$this->jobs[$class]->getReflection()->getShortName()} finished");
    }

    /**
     * Returns all jobs.
     * @return array
     */
    public function getJobs()
    {
        return $this->jobs;
    }

    /**
     * Returns a given job's definition.
     *
     * @param string $class The job class to lookup.
     * @throws InvalidArgumentException If the job isn't registered.
     * @return array
     */
    public function getJob($class)
    {
        if (!isset($this->jobs[$class]))
        {
            throw new InvalidArgumentException("{$class} is not a registered job");
        }

        return $this->jobs[$class];
    }

    /**
     * @return LoggerInterface
     */
    public function getLogger()
    {
        return $this->logger;
    }

    /**
     * @return \fork_daemon
     */
    public function getForkDaemon()
    {
        return $this->fork_daemon;
    }
}