Queue/RedisQueue.php
<?php
namespace SfCod\QueueBundle\Queue;
use DateInterval;
use DateTime;
use Predis\Client;
use Predis\Collection\Iterator\HashKey;
use SfCod\QueueBundle\Base\JobResolverInterface;
use SfCod\QueueBundle\Base\RandomizeTrait;
use SfCod\QueueBundle\Entity\Job;
use SfCod\QueueBundle\Job\JobContract;
use SfCod\QueueBundle\Job\JobContractInterface;
use SfCod\QueueBundle\Service\RedisDriver;
/**
* Class RedisQueue
*
* @author Virchenko Maksim <muslim1992@gmail.com>
*
* @package SfCod\QueueBundle\Queue
*/
class RedisQueue extends Queue
{
use RandomizeTrait;
/**
* Job resolver
*
* @var JobResolverInterface
*/
private $resolver;
/**
* @var RedisDriver
*/
private $redis;
/**
* The collection that holds the jobs.
*
* @var string
*/
private $collection;
/**
* The name of the default queue.
*
* @var string
*/
private $queue = 'default';
/**
* The expiration time of a job.
*
* @var int|null
*/
private $expire = 60;
/**
* @var int
*/
private $limit = 15;
/**
* Create a new redis queue instance.
*
* @param JobResolverInterface $resolver
* @param RedisDriver $redis
* @param string $collection
* @param string $queue
* @param int $expire
* @param int $limit
*/
public function __construct(
JobResolverInterface $resolver,
RedisDriver $redis,
string $collection = 'queue_jobs',
string $queue = 'default',
int $expire = 60,
int $limit = 15
) {
$this->resolver = $resolver;
$this->redis = $redis;
$this->collection = $collection;
$this->expire = $expire;
$this->queue = $queue;
$this->limit = $limit;
}
/**
* Get the size of the queue.
*
* @param string|null $queue
*
* @return int
*/
public function size(?string $queue = null): int
{
return (int)$this->getClient()->zcount($this->buildKey($queue), '-inf', '+inf');
}
/**
* Get redis client
*
* @return Client
*/
private function getClient(): Client
{
return $this->redis->getClient();
}
/**
* Build collection:queue:postfix key
*
* @param string|null $queue
* @param string|null $postfix
*
* @return string
*/
private function buildKey(?string $queue = 'default', ?string $postfix = null)
{
return "$this->collection:$queue" . ($postfix ? ":$postfix" : '');
}
/**
* Push a new job onto the queue.
*
* @param string $job
* @param mixed $data
* @param string|null $queue
*
* @return mixed
*
* @throws \Exception
*/
public function push(string $job, array $data = [], ?string $queue = null)
{
return $this->pushRaw($this->createPayload($job, $data), $queue);
}
/**
* Push a raw payload onto the queue.
*
* @param string $payload
* @param string|null $queue
* @param array $options
*
* @return mixed
*
* @throws \Exception
*/
public function pushRaw(string $payload, ?string $queue = null, array $options = [])
{
return $this->pushToDatabase(0, $queue, $payload);
}
/**
* Push job to database
*
* @param DateInterval|int $delay
* @param string|null $queue
* @param string $payload
* @param int $attempts
*
* @throws \Exception
*/
private function pushToDatabase($delay, ?string $queue, string $payload, int $attempts = 0)
{
$id = $this->getRandomId();
$pipeline = $this->getClient()->pipeline(['atomic' => true])
->hset(
$this->buildKey($queue, 'payload'),
$id,
$payload
)
->zadd($this->buildKey($queue), [
$id => $this->getAvailableAt($delay),
]);
if ($attempts > 0) {
$pipeline->zadd($this->buildKey($queue, 'attempted'), [
$id => $attempts,
]);
}
$pipeline->execute();
}
/**
* Get the "available at" UNIX timestamp.
*
* @param DateInterval|int $delay
*
* @return int
*/
private function getAvailableAt($delay = 0)
{
return $delay instanceof DateInterval
? (new DateTime())->add($delay)->getTimestamp()
: $this->currentTime() + $delay;
}
/**
* Push a new job onto the queue after a delay.
*
* @param DateInterval|int $delay
* @param string $job
* @param array $data
* @param string|null $queue
*
* @return mixed
*
* @throws \Exception
*/
public function later($delay, string $job, array $data = [], ?string $queue = null)
{
return $this->pushToDatabase($delay, $queue, $this->createPayload($job, $data));
}
/**
* Pop the next job off of the queue.
*
* @param string|null $queue
*
* @return JobContractInterface|null
*/
public function pop(?string $queue = null): ?JobContractInterface
{
$ids = $this->getClient()->zrangebyscore($this->buildKey($queue), 0, $this->currentTime(), ['LIMIT' => [0, $this->limit]]);
if (empty($ids)) {
return null;
}
foreach ($ids as $id) {
$reservedAt = $this->getClient()->zscore($this->buildKey($queue, 'reserved'), $id);
$isAvailable = null === $reservedAt;
$isReservedButExpired = false === ($reservedAt > ($this->currentTime() - $this->expire));
// Take first available or reserved but expired job
if ($isAvailable || $isReservedButExpired) {
return $this->getJobById($queue, $id);
}
}
return null;
}
/**
* Get job by its id
*
* @param string $queue
* @param string $id
*
* @return JobContractInterface|null
*/
public function getJobById(string $queue, string $id): ?JobContractInterface
{
$job = $this->getClient()->hget($this->buildKey($queue, 'payload'), $id);
if (!$job) {
return null;
} else {
$reservedAt = $this->getClient()->zscore($this->buildKey($queue, 'reserved'), $id);
$attempts = $this->getClient()->zscore($this->buildKey($queue, 'attempted'), $id);
return new JobContract(
$this->resolver,
$this,
$this->buildJob($id, $queue, $attempts ?? 0, json_decode($job, true), $reservedAt)
);
}
}
/**
* Build job from database record
*
* @param string $id
* @param string $queue
* @param int $attempts
* @param array $payload
* @param int|null $reservedAt
*
* @return Job
*/
private function buildJob(string $id, string $queue, int $attempts, array $payload, ?int $reservedAt = null): Job
{
$job = new Job();
$job->setId($id);
$job->setAttempts($attempts);
$job->setQueue($queue);
$job->setReserved((bool)$reservedAt);
$job->setReservedAt($reservedAt);
$job->setPayload($payload);
return $job;
}
/**
* Check if job exists in the queue.
*
* @param string $job
* @param array $data
* @param string|null $queue
*
* @return bool
*/
public function exists(string $job, array $data = [], ?string $queue = null): bool
{
$cursor = new HashKey($this->getClient(), $this->buildKey($queue, 'payload'));
$payload = $this->createPayload($job, $data);
foreach ($cursor as $key => $value) {
if ($value === $payload) {
return true;
}
}
return false;
}
/**
* Check if can run process depend on limits
*
* @param JobContractInterface $job
*
* @return bool
*/
public function canRunJob(JobContractInterface $job): bool
{
return $this->getClient()->zcount(
$this->buildKey($job->getQueue(), 'reserved'),
'-inf',
'+inf'
) < $this->limit || $job->reserved();
}
/**
* Mark the given job ID as reserved.
*
* @param JobContractInterface $job
*
* @throws \Exception
*/
public function markJobAsReserved(JobContractInterface $job)
{
$this->getClient()->pipeline(['atomic' => true])
->zadd($this->buildKey($job->getQueue(), 'reserved'), [
$job->getJobId() => $this->currentTime(),
])
->zincrby($this->buildKey($job->getQueue(), 'attempted'), 1, $job->getJobId())
->execute();
}
/**
* Delete a reserved job from the queue.
*
* @param string $queue
* @param string $id
*
* @return bool
*
* @throws \Exception
*/
public function deleteReserved(string $queue, string $id): bool
{
$this->getClient()->pipeline(['atomic' => true])
->hdel($this->buildKey($queue, 'payload'), [$id])
->zrem($this->buildKey($queue, 'reserved'), $id)
->zrem($this->buildKey($queue, 'attempted'), $id)
->zrem($this->buildKey($queue), $id)
->execute();
return true;
}
/**
* Release a reserved job back onto the queue.
*
* @param JobContractInterface $job
* @param DateInterval|int $delay
*
* @return mixed
*
* @throws \Exception
*/
public function release(JobContractInterface $job, $delay)
{
return $this->pushToDatabase($delay, $job->getQueue(), $job->getRawBody(), $job->attempts());
}
}