lightster/hodor

View on GitHub
src/Hodor/JobQueue/Superqueue.php

Summary

Maintainability
A
0 mins
Test Coverage
<?php

namespace Hodor\JobQueue;

use Hodor\Database\Adapter\FactoryInterface;
use Hodor\Database\Adapter\SuperqueuerInterface;

class Superqueue
{
    /**
     * @var FactoryInterface
     */
    private $database;

    /**
     * @var WorkerQueueFactory
     */
    private $worker_queue_factory;

    /**
     * @var int
     */
    private $jobs_queued = 0;

    /**
     * @param SuperqueuerInterface $database
     * @param WorkerQueueFactory $worker_queue_factory
     */
    public function __construct(SuperqueuerInterface $database, WorkerQueueFactory $worker_queue_factory)
    {
        $this->database = $database;
        $this->worker_queue_factory = $worker_queue_factory;
    }

    /**
     * @return bool
     */
    public function requestProcessLock()
    {
        return $this->database->requestAdvisoryLock('superqueuer', 'default');
    }

    /**
     * @return int
     */
    public function queueJobsFromDatabaseToWorkerQueue()
    {
        $job_generator = $this->database->getJobsToRunGenerator();
        $total_jobs_queued = 0;
        foreach ($job_generator as $job) {
            $this->batchJob($job);
            ++$total_jobs_queued;
        }

        $this->publishBatch();

        return $total_jobs_queued;
    }

    /**
     * @param array $job
     */
    private function batchJob(array $job)
    {
        $db = $this->database;

        if (0 === $this->jobs_queued) {
            $this->worker_queue_factory->beginBatch();
            $db->beginBatch();
        }

        $meta = $db->markJobAsQueued($job);

        $queue = $this->worker_queue_factory->getQueue($job['queue_name']);
        $queue->push($job['job_name'], $job['job_params'], $meta);

        ++$this->jobs_queued;

        $this->publishBatchIfNecessary();
    }

    private function publishBatchIfNecessary()
    {
        if ($this->jobs_queued >= $this->getBatchSize()) {
            $this->publishBatch();
        }
    }

    private function publishBatch()
    {
        if ($this->jobs_queued <= 0) {
            return;
        }

        // the database transaction needs to be committed before the
        // message is pushed to Rabbit MQ to prevent jobs from being
        // processed by workers before they have been moved to buffered_jobs
        $this->database->publishBatch();
        $this->worker_queue_factory->publishBatch();

        $this->jobs_queued = 0;
    }

    /**
     * @return int
     */
    private function getBatchSize()
    {
        return 250;
    }
}