sfcod/jobqueue

View on GitHub
Failer/MongoFailedJobProvider.php

Summary

Maintainability
A
0 mins
Test Coverage
<?php

namespace SfCod\QueueBundle\Failer;

use Exception;
use MongoDB\Collection;
use MongoDB\DeleteResult;
use SfCod\QueueBundle\Entity\Job;
use SfCod\QueueBundle\Service\MongoDriver;

/**
 * Mongo provider for failed jobs
 *
 * @author Virchenko Maksim <muslim1992@gmail.com>
 */
class MongoFailedJobProvider implements FailedJobProviderInterface
{
    /**
     * Mongo driver used to reach the database that stores failed jobs.
     *
     * @var MongoDriver
     */
    protected $mongo;

    /**
     * Name of the collection that stores failed jobs.
     *
     * @var string
     */
    protected $collection;

    /**
     * Create a new Mongo-backed failed job provider.
     *
     * @param MongoDriver $mongo driver providing access to the database
     * @param string $collection name of the collection holding failed jobs
     */
    public function __construct(MongoDriver $mongo, string $collection = 'queue_jobs_failed')
    {
        $this->mongo = $mongo;
        $this->collection = $collection;
    }

    /**
     * Log a failed job into storage.
     *
     * Only the exception message is persisted (not the stack trace), together
     * with the current unix timestamp in the "failed_at" field.
     *
     * @param string $connection
     * @param string $queue
     * @param string $payload
     * @param Exception $exception
     *
     * @return void
     */
    public function log(string $connection, string $queue, string $payload, Exception $exception)
    {
        $this->getCollection()->insertOne([
            'connection' => $connection,
            'queue' => $queue,
            'payload' => $payload,
            'exception' => $exception->getMessage(),
            'failed_at' => time(),
        ]);
    }

    /**
     * Get a list of all of the failed jobs, sorted by "_id" in descending order.
     *
     * @return Job[]
     */
    public function all(): array
    {
        $result = [];
        $jobs = $this->getCollection()->find([], [
            'sort' => ['_id' => -1],
        ]);

        foreach ($jobs as $job) {
            $result[] = $this->buildJob($job);
        }

        return $result;
    }

    /**
     * Get a single failed job.
     *
     * @param string $id string form of the job's ObjectID; it is passed
     *                   directly to the ObjectID constructor
     *
     * @return Job|null the failed job, or null when no document has this id
     */
    public function find(string $id)
    {
        $data = $this->getCollection()->findOne(['_id' => new \MongoDB\BSON\ObjectID($id)]);

        // findOne() yields null when nothing matches; building a Job from
        // null would dereference a non-object, so report "not found" instead.
        if (null === $data) {
            return null;
        }

        return $this->buildJob($data);
    }

    /**
     * Delete a single failed job from storage.
     *
     * @param string $id string form of the job's ObjectID; it is passed
     *                   directly to the ObjectID constructor
     *
     * @return bool true when a document was deleted, false when none matched
     */
    public function forget(string $id)
    {
        $result = $this->getCollection()->deleteOne(['_id' => new \MongoDB\BSON\ObjectID($id)]);

        if ($result instanceof DeleteResult) {
            return (bool)$result->getDeletedCount();
        }

        // Fallback kept for any result that is not a DeleteResult.
        return true;
    }

    /**
     * Flush all of the failed jobs from storage.
     *
     * The whole collection is dropped, so any indexes defined on it are
     * removed together with the documents.
     */
    public function flush()
    {
        $this->getCollection()->drop();
    }

    /**
     * Get the Mongo collection that stores failed jobs.
     *
     * @return Collection mongo collection
     */
    protected function getCollection(): Collection
    {
        return $this->mongo->getDatabase()->selectCollection($this->collection);
    }

    /**
     * Build a job entity from a database document.
     *
     * Optional fields missing from the document fall back to defaults:
     * attempts = 0, reserved = false, reserved_at = null. The payload is
     * JSON-decoded into an associative array.
     *
     * @param object $data document exposing _id, queue and payload properties
     *
     * @return Job
     */
    protected function buildJob($data): Job
    {
        $job = new Job();
        $job->setId($data->_id);
        $job->setQueue($data->queue);
        $job->setAttempts($data->attempts ?? 0);
        $job->setReserved($data->reserved ?? false);
        $job->setReservedAt($data->reserved_at ?? null);
        $job->setPayload(json_decode($data->payload, true));

        return $job;
    }
}