Mezzle/queuejitsu-scheduler

View on GitHub
src/Adapter/RedisAdapter.php

Summary

Maintainability
A
0 mins
Test Coverage
<?php
/*
 * Copyright (c) 2017 - 2020 Martin Meredith
 * Copyright (c) 2017 Stickee Technology Limited
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

declare(strict_types=1);

namespace QueueJitsu\Scheduler\Adapter;

use Cron\CronExpression;
use Predis\Client;
use QueueJitsu\Job\Job;
use Ramsey\Uuid\Uuid;

/**
 * Class RedisAdapter
 *
 * @package QueueJitsu\Scheduler\Adapter
 */
class RedisAdapter implements AdapterInterface
{
    /**
     * Name of the sorted set holding every timestamp that has delayed jobs,
     * and prefix of the per-timestamp list keys ("<name>:<timestamp>").
     */
    public const AT_QUEUE_NAME = '_schdlr_at_';

    /**
     * Name of the sorted set holding cron ids scored by their next run time,
     * and prefix of the per-id payload keys ("<name>:<id>").
     */
    public const CRON_QUEUE_NAME = '_schdlr_cron_';

    /**
     * @var \Predis\Client $client
     */
    private $client;

    /**
     * RedisAdapter constructor.
     *
     * @param \Predis\Client $client Redis connection used for all scheduler storage
     */
    public function __construct(Client $client)
    {
        $this->client = $client;
    }

    /**
     * enqueueAt
     *
     * Appends the job payload to the list for the given timestamp and records
     * the timestamp in the schedule sorted set.
     *
     * @param int $at Unix timestamp at which the job becomes due
     * @param Job $job
     */
    public function enqueueAt(int $at, Job $job): void
    {
        $data = $job->getPayload();

        // Record when the job was scheduled alongside the payload.
        $data['s_time'] = time();

        $key = sprintf('%s:%s', self::AT_QUEUE_NAME, $at);
        $this->client->rpush($key, [json_encode($data)]);
        $this->client->zadd(self::AT_QUEUE_NAME, [$at => $at]);
    }

    /**
     * enqueueCron
     *
     * Stores the cron expression and job payload under a freshly generated id
     * and schedules that id for the expression's next run date.
     *
     * @SuppressWarnings(PHPMD.StaticAccess)
     *
     * @param string $cron Cron expression
     * @param Job $job
     *
     * @throws \RuntimeException
     */
    public function enqueueCron(string $cron, Job $job): void
    {
        $data = ['cron' => $cron, 'job' => $job->getPayload()];

        $id = Uuid::uuid4()->toString();

        $key = sprintf('%s:%s', self::CRON_QUEUE_NAME, $id);

        $this->client->set($key, json_encode($data));

        $this->updateCron($id, $cron);
    }

    /**
     * getNextJob
     *
     * Returns the next due job, choosing between delayed ("at") jobs and cron
     * jobs, or null when nothing is due.
     *
     * @throws \RuntimeException
     *
     * @return null|Job
     */
    public function getNextJob(): ?Job
    {
        // Each check is a Redis round trip, so only ask once.
        $has_at_jobs = $this->hasAtJobsToProcess();

        if ($has_at_jobs && $this->hasCronJobsToProcess()) {
            return $this->findNextJob();
        }

        if ($has_at_jobs) {
            return $this->getNextAtJob();
        }

        return $this->getNextCronJob();
    }

    /**
     * getNextAtTimestamp
     *
     * Returns the lowest scheduled timestamp that is not in the future, or
     * null when no delayed job is due yet.
     *
     * @return int|null
     */
    protected function getNextAtTimestamp(): ?int
    {
        $at = time();

        $items =
            $this->client->zrangebyscore(
                self::AT_QUEUE_NAME,
                '-inf',
                $at,
                ['limit', 0, 1]
            );

        if (empty($items)) {
            return null;
        }

        return (int)$items[0];
    }

    /**
     * findNextJob
     *
     * Picks whichever of the due delayed job and the due cron job has the
     * earlier timestamp; ties go to the delayed job.
     *
     * @throws \RuntimeException
     *
     * @return null|Job
     */
    protected function findNextJob(): ?Job
    {
        $next_at_timestamp = $this->getNextAtTimestamp();
        $cron_id = $this->getNextCronId();

        if ($cron_id === false) {
            return $this->getNextAtJob();
        }

        // The delayed queue may have been drained since the caller checked it.
        if ($next_at_timestamp === null) {
            return $this->getNextCronJob();
        }

        $next_cron_timestamp = $this->getCronTimestamp($cron_id);

        // A null cron timestamp means the id is no longer in the schedule.
        if ($next_cron_timestamp === null || $next_at_timestamp <= $next_cron_timestamp) {
            return $this->getNextAtJob();
        }

        return $this->getNextCronJob();
    }

    /**
     * getNextAtJob
     *
     * Returns the next due delayed job, or null when none is due.
     *
     * @return null|Job
     */
    protected function getNextAtJob(): ?Job
    {
        $next_timestamp = $this->getNextAtTimestamp();

        if ($next_timestamp === null) {
            return null;
        }

        return $this->getNextJobAtTimestamp($next_timestamp);
    }

    /**
     * updateCron
     *
     * (Re)schedules a cron id at the next run date of its expression.
     *
     * @SuppressWarnings(PHPMD.StaticAccess)
     *
     * @param string $id
     * @param string $cron
     *
     * @throws \RuntimeException
     */
    private function updateCron(string $id, string $cron): void
    {
        $cronExpression = CronExpression::factory($cron);
        $next_run = $cronExpression->getNextRunDate()->getTimestamp();

        $this->client->zadd(self::CRON_QUEUE_NAME, [$id => $next_run]);
    }

    /**
     * hasAtJobsToProcess
     *
     * @return bool True when at least one delayed timestamp is due
     */
    private function hasAtJobsToProcess(): bool
    {
        return $this->getNextAtTimestamp() !== null;
    }

    /**
     * hasCronJobsToProcess
     *
     * @return bool True when at least one cron id is due
     */
    private function hasCronJobsToProcess(): bool
    {
        return $this->getNextCronId() !== false;
    }

    /**
     * getNextCronId
     *
     * Returns the id of the cron entry with the lowest next-run time that is
     * not in the future, or false when no cron entry is due.
     *
     * @return string|false
     */
    private function getNextCronId()
    {
        $at = time();

        $items =
            $this->client->zrangebyscore(
                self::CRON_QUEUE_NAME,
                '-inf',
                $at,
                ['limit', 0, 1]
            );

        if (empty($items)) {
            return false;
        }

        return $items[0];
    }

    /**
     * getNextJobAtTimestamp
     *
     * Pops the first job queued for the given timestamp and rebuilds it.
     * Returns null when the list for that timestamp turns out to be empty.
     *
     * @param int $timestamp
     *
     * @return null|Job
     */
    private function getNextJobAtTimestamp(int $timestamp): ?Job
    {
        $key = sprintf('%s:%s', self::AT_QUEUE_NAME, $timestamp);

        $payload = $this->client->lpop($key);

        // Always run cleanup so a timestamp with nothing left is dropped
        // from the schedule, even when the pop itself found no job.
        $this->cleanupTimestamp($timestamp);

        if ($payload === null) {
            return null;
        }

        $item = json_decode($payload, true);

        return new Job(
            $item['class'],
            $item['queue'],
            $item['args'],
            $item['id']
        );
    }

    /**
     * cleanupTimestamp
     *
     * Removes the timestamp's list key and its schedule entry once no jobs
     * remain queued for it. Timestamps that still have jobs are left alone.
     *
     * @param int $timestamp
     */
    private function cleanupTimestamp(int $timestamp): void
    {
        $key = sprintf('%s:%s', self::AT_QUEUE_NAME, $timestamp);

        // Only clean up when the list is empty; doing so while jobs remain
        // would discard them.
        if ((int)$this->client->llen($key) === 0) {
            $this->client->del([$key]);
            $this->client->zrem(self::AT_QUEUE_NAME, $timestamp);
        }
    }

    /**
     * getCronTimestamp
     *
     * Returns the next-run timestamp recorded for a cron id, or null when the
     * id is not in the schedule.
     *
     * @param string $cron_id
     *
     * @return int|null
     */
    private function getCronTimestamp(string $cron_id): ?int
    {
        // ZSCORE yields a single scalar score (or null), not a list.
        $score = $this->client->zscore(self::CRON_QUEUE_NAME, $cron_id);

        if ($score === null) {
            return null;
        }

        return (int)$score;
    }

    /**
     * getNextCronJob
     *
     * Loads the due cron entry, reschedules it for its next run and returns a
     * new Job built from the stored payload. Returns null when no cron entry
     * is due or the entry's payload is missing.
     *
     * @SuppressWarnings(PHPMD.StaticAccess)
     *
     * @throws \RuntimeException
     *
     * @return null|Job
     */
    private function getNextCronJob(): ?Job
    {
        $cron_id = $this->getNextCronId();

        if ($cron_id === false) {
            return null;
        }

        $key = sprintf('%s:%s', self::CRON_QUEUE_NAME, $cron_id);

        $payload = $this->client->get($key);

        if ($payload === null) {
            // The schedule references an id with no stored payload; drop it
            // so it cannot sit at the front of the schedule indefinitely.
            $this->client->zrem(self::CRON_QUEUE_NAME, $cron_id);

            return null;
        }

        $data = json_decode($payload, true);

        $this->updateCron($cron_id, $data['cron']);

        $job = $data['job'];

        return new Job($job['class'], $job['queue'], $job['args']);
    }
}