lightster/hodor

View on GitHub
src/Hodor/Database/Adapter/Postgres/BufferWorker.php

Summary

Maintainability
A
1 hr
Test Coverage
<?php

namespace Hodor\Database\Adapter\Postgres;

use Hodor\Database\Adapter\BufferWorkerInterface;
use Lstr\YoPdo\YoPdo;

class BufferWorker implements BufferWorkerInterface
{
    /**
     * @var Connection
     */
    private $connection;

    /**
     * @param Connection $connection
     */
    public function __construct(Connection $connection)
    {
        $this->connection = $connection;
    }

    /**
     * @param string $queue_name
     * @param array $job
     */
    public function bufferJob($queue_name, array $job)
    {
        $table_name = 'buffered_jobs';
        $status_column = 'inserted';
        if (isset($job['options']['run_after'])) {
            $table_name = 'scheduled_jobs';
            $status_column = 'scheduled';
        }

        $row = [
            'queue_name'            => $queue_name,
            'job_name'              => $job['name'],
            'job_params'            => json_encode($job['params'], JSON_FORCE_OBJECT),
            'buffered_at'           => $job['meta']['buffered_at'],
            'buffered_from'         => $job['meta']['buffered_from'],
            "{$status_column}_from" => gethostname(),
        ];

        if (isset($job['options']['run_after'])) {
            $row['run_after'] = $job['options']['run_after'];
        }
        if (isset($job['options']['job_rank'])) {
            $row['job_rank'] = $job['options']['job_rank'];
        }
        if (isset($job['options']['mutex_id'])) {
            $row['mutex_id'] = $job['options']['mutex_id'];
        }

        $this->getYoPdo()->transaction()->begin('buffer-job');
        $this->getYoPdo()->insert($table_name, $row);
        $this->getYoPdo()->transaction()->accept('buffer-job');
    }

    /**
     * @return YoPdo
     */
    private function getYoPdo()
    {
        return $this->connection->getYoPdo();
    }
}