yiicod/yii2-jobqueue

View on GitHub
JobQueue.php

Summary

Maintainability
A
1 hr
Test Coverage
<?php

namespace yiicod\jobqueue;

use Illuminate\Queue\Capsule\Manager;
use Illuminate\Queue\Connectors\ConnectorInterface;
use yii\base\BootstrapInterface;
use yii\base\Component;
use yiicod\jobqueue\connectors\MongoThreadConnector;

/**
 * Yii component for laravel 5 queues to work with mongodb
 *
 * @author Orlov Alexey <aaorlov88@gmail.com>
 * @author Virchenko Maksim <muslim1992@gmail.com>
 */
class JobQueue extends Component implements BootstrapInterface
{
    /**
     * Available connections
     *
     * @var array
     */
    public $connections = [
        'thread' => [
            'driver' => 'mongo-thread',
            'table' => 'yii_jobs_thread',
            'queue' => 'default',
            'expire' => 60,
            'limit' => 15, // Or 1
            'connection' => 'mongodb',
        ],
    ];

    /**
     * Encryption key
     *
     * @var string
     */
    public $privateKey = 'rc5lgpue80sr17nx';

    /**
     * Manager instance
     *
     * @var Manager
     */
    private $manager = null;

    /**
     * Initialize
     *
     * @param \yii\base\Application $app
     */
    public function bootstrap($app)
    {
        $this->connect();
    }

    /**
     * Connect queue manager for mongo database
     *
     * @return Manager
     */
    protected function connect()
    {
        /* @var  Manager manager */
        $this->manager = new Manager();

        $this->addConnector('mongo-thread', new MongoThreadConnector());

        foreach ($this->connections as $name => $params) {
            $this->manager->addConnection($params, $name);
        }

        //Set as global to access
        $this->manager->setAsGlobal();
    }

    /**
     * Add connector
     *
     * @param string $name
     * @param Closure $resolver
     */
    public function addConnector(string $name, ConnectorInterface $connector)
    {
        $this->manager->addConnector($name, function () use ($connector) {
            return $connector;
        });
    }

    /**
     * Get queue manager instance
     *
     * @return mixed
     */
    public function getQueueManager()
    {
        return $this->manager->getQueueManager();
    }

    /**
     * Push new job to queue
     *
     * @param mixed $job
     * @param array $data
     * @param string $queue
     * @param string $connection
     *
     * @return mixed
     */
    public static function push($job, $data = [], $queue = 'default', $connection = 'default')
    {
        return Manager::push($job, $data, $queue, $connection);
    }

    /**
     * Push new job to queue if this job is not exist
     *
     * @param mixed $job
     * @param array $data
     * @param string $queue
     * @param string $connection
     *
     * @return mixed
     */
    public static function pushUnique($job, $data = [], $queue = 'default', $connection = 'default')
    {
        if (false === Manager::connection($connection)->exists($job, $data, $queue)) {
            return Manager::push($job, $data, $queue, $connection);
        }

        return null;
    }

    /**
     * Push a new an array of jobs onto the queue.
     *
     * @param  array $jobs
     * @param  mixed $data
     * @param  string $queue
     * @param  string $connection
     *
     * @return mixed
     */
    public static function bulk($jobs, $data = '', $queue = null, $connection = null)
    {
        return Manager::bulk($jobs, $data, $queue, $connection);
    }

    /**
     * Push a new job onto the queue after a delay.
     *
     * @param  \DateTime|int $delay
     * @param  string $job
     * @param  mixed $data
     * @param  string $queue
     * @param  string $connection
     *
     * @return mixed
     */
    public static function later($delay, $job, $data = '', $queue = null, $connection = null)
    {
        return Manager::later($delay, $job, $data, $queue, $connection);
    }

    /**
     * Push a new job into the queue after a delay if job does not exist.
     *
     * @param  \DateTime|int $delay
     * @param  string $job
     * @param  mixed $data
     * @param  string $queue
     * @param  string $connection
     *
     * @return mixed
     */
    public static function laterUnique($delay, $job, $data = '', $queue = null, $connection = null)
    {
        if (false === Manager::connection($connection)->exists($job, $data, $queue)) {
            return Manager::later($delay, $job, $data, $queue, $connection);
        }

        return null;
    }
}