Failer/MongoFailedJobProvider.php
<?php
namespace SfCod\QueueBundle\Failer;
use Exception;
use MongoDB\Collection;
use MongoDB\DeleteResult;
use SfCod\QueueBundle\Entity\Job;
use SfCod\QueueBundle\Service\MongoDriver;
/**
 * Failed job provider that persists failed queue jobs in a MongoDB collection.
 *
 * Every failed job is stored as one document holding the connection name,
 * queue name, raw payload, exception message and failure timestamp.
 *
 * @author Virchenko Maksim <muslim1992@gmail.com>
 */
class MongoFailedJobProvider implements FailedJobProviderInterface
{
    /**
     * Mongo driver used to reach the database.
     *
     * @var MongoDriver
     */
    protected $mongo;

    /**
     * Name of the collection that stores failed jobs.
     *
     * @var string
     */
    protected $collection;

    /**
     * Create a new Mongo failed job provider.
     *
     * @param MongoDriver $mongo driver giving access to the database
     * @param string $collection name of the collection holding failed jobs
     */
    public function __construct(MongoDriver $mongo, string $collection = 'queue_jobs_failed')
    {
        $this->mongo = $mongo;
        $this->collection = $collection;
    }

    /**
     * Log a failed job into storage.
     *
     * Only the exception message is persisted (not the trace); the failure
     * time is stored as a unix timestamp.
     *
     * @param string $connection
     * @param string $queue
     * @param string $payload
     * @param Exception $exception
     *
     * @return void
     */
    public function log(string $connection, string $queue, string $payload, Exception $exception)
    {
        $this->getCollection()->insertOne([
            'connection' => $connection,
            'queue' => $queue,
            'payload' => $payload,
            'exception' => $exception->getMessage(),
            'failed_at' => time(),
        ]);
    }

    /**
     * Get a list of all of the failed jobs, sorted by `_id` descending.
     *
     * @return Job[]
     */
    public function all(): array
    {
        $result = [];

        $jobs = $this->getCollection()->find([], [
            'sort' => ['_id' => -1],
        ]);

        foreach ($jobs as $job) {
            $result[] = $this->buildJob($job);
        }

        return $result;
    }

    /**
     * Get a single failed job.
     *
     * @param string $id string form of the document's ObjectID
     *
     * @return Job|null the job, or null when no document has the given id
     *
     * @throws \MongoDB\Driver\Exception\InvalidArgumentException when $id is not a valid ObjectID string
     */
    public function find(string $id)
    {
        $data = $this->getCollection()->findOne(['_id' => new \MongoDB\BSON\ObjectID($id)]);

        // findOne() yields null when nothing matches; building a job from
        // null would dereference properties on a non-object.
        if (null === $data) {
            return null;
        }

        return $this->buildJob($data);
    }

    /**
     * Delete a single failed job from storage.
     *
     * @param string $id string form of the document's ObjectID
     *
     * @return bool true when a document was deleted, false when none matched
     *
     * @throws \MongoDB\Driver\Exception\InvalidArgumentException when $id is not a valid ObjectID string
     */
    public function forget(string $id)
    {
        $result = $this->getCollection()->deleteOne(['_id' => new \MongoDB\BSON\ObjectID($id)]);

        if ($result instanceof DeleteResult) {
            return $result->getDeletedCount() > 0;
        }

        // Fallback kept from the original implementation for a result that
        // is not a DeleteResult.
        return true;
    }

    /**
     * Flush all of the failed jobs from storage.
     *
     * This drops the whole collection rather than deleting documents one by one.
     */
    public function flush()
    {
        $this->getCollection()->drop();
    }

    /**
     * Get the Mongo collection that stores failed jobs.
     *
     * @return Collection mongo collection
     */
    protected function getCollection(): Collection
    {
        return $this->mongo->getDatabase()->selectCollection($this->collection);
    }

    /**
     * Build job from database data.
     *
     * Optional fields missing from the document fall back to defaults:
     * attempts = 0, reserved = false, reserved_at = null. The payload is
     * decoded from JSON into an associative array.
     *
     * @param object $data stored document; fields are read via property access
     *
     * @return Job
     */
    protected function buildJob($data): Job
    {
        $job = new Job();
        $job->setId($data->_id);
        $job->setQueue($data->queue);
        $job->setAttempts($data->attempts ?? 0);
        $job->setReserved($data->reserved ?? false);
        $job->setReservedAt($data->reserved_at ?? null);
        $job->setPayload(json_decode($data->payload, true));

        return $job;
    }
}