laravel/framework

View on GitHub
src/Illuminate/Bus/DynamoBatchRepository.php

Summary

Maintainability
D
2 days
Test Coverage
<?php

namespace Illuminate\Bus;

use Aws\DynamoDb\DynamoDbClient;
use Aws\DynamoDb\Marshaler;
use Carbon\CarbonImmutable;
use Closure;
use Illuminate\Support\Str;

class DynamoBatchRepository implements BatchRepository
{
    /**
     * The batch factory instance.
     *
     * @var \Illuminate\Bus\BatchFactory
     */
    protected $factory;

    /**
     * The DynamoDB client instance.
     *
     * @var \Aws\DynamoDb\DynamoDbClient
     */
    protected $dynamoDbClient;

    /**
     * The application name.
     *
     * Stored as the "application" partition key of every batch record.
     *
     * @var string
     */
    protected $applicationName;

    /**
     * The table to use to store batch information.
     *
     * @var string
     */
    protected $table;

    /**
     * The time-to-live value for batch records, in seconds.
     *
     * A null value disables all time-to-live handling.
     *
     * @var int|null
     */
    protected $ttl;

    /**
     * The name of the time-to-live attribute for batch records.
     *
     * @var string|null
     */
    protected $ttlAttribute;

    /**
     * The DynamoDB marshaler instance.
     *
     * @var \Aws\DynamoDb\Marshaler
     */
    protected $marshaler;

    /**
     * Create a new batch repository instance.
     *
     * @param  \Illuminate\Bus\BatchFactory  $factory
     * @param  \Aws\DynamoDb\DynamoDbClient  $dynamoDbClient
     * @param  string  $applicationName
     * @param  string  $table
     * @param  int|null  $ttl
     * @param  string|null  $ttlAttribute
     */
    public function __construct(
        BatchFactory $factory,
        DynamoDbClient $dynamoDbClient,
        string $applicationName,
        string $table,
        ?int $ttl,
        ?string $ttlAttribute
    ) {
        $this->factory = $factory;
        $this->dynamoDbClient = $dynamoDbClient;
        $this->applicationName = $applicationName;
        $this->table = $table;
        $this->ttl = $ttl;
        $this->ttlAttribute = $ttlAttribute;
        $this->marshaler = new Marshaler;
    }

    /**
     * Retrieve a list of batches.
     *
     * Batches are queried for the current application in descending sort key
     * ("id") order. When "$before" is given, only batches whose ID is less
     * than that value are returned.
     *
     * @param  int  $limit
     * @param  mixed  $before
     * @return \Illuminate\Bus\Batch[]
     */
    public function get($limit = 50, $before = null)
    {
        $condition = 'application = :application';

        if ($before) {
            $condition = 'application = :application AND id < :id';
        }

        $result = $this->dynamoDbClient->query([
            'TableName' => $this->table,
            'KeyConditionExpression' => $condition,
            // The nested array_filter calls drop the ":id" placeholder when no
            // "$before" value was given so it is only sent when referenced...
            'ExpressionAttributeValues' => array_filter([
                ':application' => ['S' => $this->applicationName],
                ':id' => array_filter(['S' => $before]),
            ]),
            'Limit' => $limit,
            'ScanIndexForward' => false,
        ]);

        return array_map(
            fn ($b) => $this->toBatch($this->marshaler->unmarshalItem($b, mapAsObject: true)),
            $result['Items']
        );
    }

    /**
     * Retrieve information about an existing batch.
     *
     * @param  string  $batchId
     * @return \Illuminate\Bus\Batch|null
     */
    public function find(string $batchId)
    {
        if ($batchId === '') {
            return null;
        }

        $request = [
            'TableName' => $this->table,
            'Key' => $this->batchKey($batchId),
        ];

        $b = $this->dynamoDbClient->getItem($request);

        if (! isset($b['Item'])) {
            // If we didn't find it via a standard read, attempt consistent read...
            $b = $this->dynamoDbClient->getItem($request + ['ConsistentRead' => true]);

            if (! isset($b['Item'])) {
                return null;
            }
        }

        $batch = $this->marshaler->unmarshalItem($b['Item'], mapAsObject: true);

        return $batch ? $this->toBatch($batch) : null;
    }

    /**
     * Store a new pending batch.
     *
     * The record is written with zeroed job counters and then read back
     * through "find" to build the returned batch instance.
     *
     * @param  \Illuminate\Bus\PendingBatch  $batch
     * @return \Illuminate\Bus\Batch
     */
    public function store(PendingBatch $batch)
    {
        $id = (string) Str::orderedUuid();

        // A single timestamp keeps "created_at" and the expiry time consistent...
        $now = time();

        $record = [
            'id' => $id,
            'name' => $batch->name,
            'total_jobs' => 0,
            'pending_jobs' => 0,
            'failed_jobs' => 0,
            'failed_job_ids' => [],
            'options' => $this->serialize($batch->options ?? []),
            'created_at' => $now,
            'cancelled_at' => null,
            'finished_at' => null,
        ];

        if (! is_null($this->ttl)) {
            // Kept as an integer here so the marshaler does not store it as a string...
            $record[$this->ttlAttribute] = $now + $this->ttl;
        }

        $this->dynamoDbClient->putItem([
            'TableName' => $this->table,
            'Item' => $this->marshaler->marshalItem(
                array_merge(['application' => $this->applicationName], $record)
            ),
        ]);

        return $this->find($id);
    }

    /**
     * Increment the total number of jobs within the batch.
     *
     * Both the total and the pending job counters are increased by the amount.
     *
     * @param  string  $batchId
     * @param  int  $amount
     * @return void
     */
    public function incrementTotalJobs(string $batchId, int $amount)
    {
        $update = $this->appendTtlUpdate(
            'SET total_jobs = total_jobs + :val, pending_jobs = pending_jobs + :val'
        );

        $this->dynamoDbClient->updateItem(array_filter([
            'TableName' => $this->table,
            'Key' => $this->batchKey($batchId),
            'UpdateExpression' => $update,
            'ExpressionAttributeValues' => array_filter([
                ':val' => ['N' => "$amount"],
                ':ttl' => array_filter(['N' => $this->getExpiryTime()]),
            ]),
            'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
            'ReturnValues' => 'ALL_NEW',
        ]));
    }

    /**
     * Decrement the total number of pending jobs for the batch.
     *
     * @param  string  $batchId
     * @param  string  $jobId
     * @return \Illuminate\Bus\UpdatedBatchJobCounts
     */
    public function decrementPendingJobs(string $batchId, string $jobId)
    {
        $update = $this->appendTtlUpdate('SET pending_jobs = pending_jobs - :inc');

        $batch = $this->dynamoDbClient->updateItem(array_filter([
            'TableName' => $this->table,
            'Key' => $this->batchKey($batchId),
            'UpdateExpression' => $update,
            'ExpressionAttributeValues' => array_filter([
                ':inc' => ['N' => '1'],
                ':ttl' => array_filter(['N' => $this->getExpiryTime()]),
            ]),
            'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
            'ReturnValues' => 'ALL_NEW',
        ]));

        // "ALL_NEW" returns the item as it looks after the update was applied...
        $values = $this->marshaler->unmarshalItem($batch['Attributes']);

        return new UpdatedBatchJobCounts(
            $values['pending_jobs'],
            $values['failed_jobs']
        );
    }

    /**
     * Increment the total number of failed jobs for the batch.
     *
     * The given job ID is also appended to the batch's failed job ID list.
     *
     * @param  string  $batchId
     * @param  string  $jobId
     * @return \Illuminate\Bus\UpdatedBatchJobCounts
     */
    public function incrementFailedJobs(string $batchId, string $jobId)
    {
        $update = $this->appendTtlUpdate(
            'SET failed_jobs = failed_jobs + :inc, failed_job_ids = list_append(failed_job_ids, :jobId)'
        );

        $batch = $this->dynamoDbClient->updateItem(array_filter([
            'TableName' => $this->table,
            'Key' => $this->batchKey($batchId),
            'UpdateExpression' => $update,
            'ExpressionAttributeValues' => array_filter([
                // The job ID is wrapped in an array because it is passed to list_append...
                ':jobId' => $this->marshaler->marshalValue([$jobId]),
                ':inc' => ['N' => '1'],
                ':ttl' => array_filter(['N' => $this->getExpiryTime()]),
            ]),
            'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
            'ReturnValues' => 'ALL_NEW',
        ]));

        // "ALL_NEW" returns the item as it looks after the update was applied...
        $values = $this->marshaler->unmarshalItem($batch['Attributes']);

        return new UpdatedBatchJobCounts(
            $values['pending_jobs'],
            $values['failed_jobs']
        );
    }

    /**
     * Mark the batch that has the given ID as finished.
     *
     * @param  string  $batchId
     * @return void
     */
    public function markAsFinished(string $batchId)
    {
        $update = $this->appendTtlUpdate('SET finished_at = :timestamp');

        $this->dynamoDbClient->updateItem(array_filter([
            'TableName' => $this->table,
            'Key' => $this->batchKey($batchId),
            'UpdateExpression' => $update,
            'ExpressionAttributeValues' => array_filter([
                ':timestamp' => ['N' => (string) time()],
                ':ttl' => array_filter(['N' => $this->getExpiryTime()]),
            ]),
            'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
        ]));
    }

    /**
     * Cancel the batch that has the given ID.
     *
     * Both the cancellation and the finished timestamps are set to the current time.
     *
     * @param  string  $batchId
     * @return void
     */
    public function cancel(string $batchId)
    {
        $update = $this->appendTtlUpdate('SET cancelled_at = :timestamp, finished_at = :timestamp');

        $this->dynamoDbClient->updateItem(array_filter([
            'TableName' => $this->table,
            'Key' => $this->batchKey($batchId),
            'UpdateExpression' => $update,
            'ExpressionAttributeValues' => array_filter([
                ':timestamp' => ['N' => (string) time()],
                ':ttl' => array_filter(['N' => $this->getExpiryTime()]),
            ]),
            'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
        ]));
    }

    /**
     * Delete the batch that has the given ID.
     *
     * @param  string  $batchId
     * @return void
     */
    public function delete(string $batchId)
    {
        $this->dynamoDbClient->deleteItem([
            'TableName' => $this->table,
            'Key' => $this->batchKey($batchId),
        ]);
    }

    /**
     * Execute the given Closure within a storage specific transaction.
     *
     * This repository simply invokes the callback; no transaction is opened.
     *
     * @param  \Closure  $callback
     * @return mixed
     */
    public function transaction(Closure $callback)
    {
        return $callback();
    }

    /**
     * Rollback the last database transaction for the connection.
     *
     * This is a no-op because "transaction" never opens a transaction.
     *
     * @return void
     */
    public function rollBack()
    {
    }

    /**
     * Convert the given raw batch to a Batch object.
     *
     * @param  object  $batch
     * @return \Illuminate\Bus\Batch
     */
    protected function toBatch($batch)
    {
        return $this->factory->make(
            $this,
            $batch->id,
            $batch->name,
            (int) $batch->total_jobs,
            (int) $batch->pending_jobs,
            (int) $batch->failed_jobs,
            $batch->failed_job_ids,
            $this->unserialize($batch->options) ?? [],
            CarbonImmutable::createFromTimestamp($batch->created_at, date_default_timezone_get()),
            // Empty timestamps are passed through untouched instead of being converted...
            $batch->cancelled_at ? CarbonImmutable::createFromTimestamp($batch->cancelled_at, date_default_timezone_get()) : $batch->cancelled_at,
            $batch->finished_at ? CarbonImmutable::createFromTimestamp($batch->finished_at, date_default_timezone_get()) : $batch->finished_at
        );
    }

    /**
     * Create the underlying DynamoDB table.
     *
     * The table uses "application" as its hash key and "id" as its range key.
     * Time-to-live is enabled on the table when a TTL has been configured.
     *
     * @return void
     */
    public function createAwsDynamoTable(): void
    {
        $definition = [
            'TableName' => $this->table,
            'AttributeDefinitions' => [
                [
                    'AttributeName' => 'application',
                    'AttributeType' => 'S',
                ],
                [
                    'AttributeName' => 'id',
                    'AttributeType' => 'S',
                ],
            ],
            'KeySchema' => [
                [
                    'AttributeName' => 'application',
                    'KeyType' => 'HASH',
                ],
                [
                    'AttributeName' => 'id',
                    'KeyType' => 'RANGE',
                ],
            ],
            'BillingMode' => 'PAY_PER_REQUEST',
        ];

        $this->dynamoDbClient->createTable($definition);

        if (! is_null($this->ttl)) {
            $this->dynamoDbClient->updateTimeToLive([
                'TableName' => $this->table,
                'TimeToLiveSpecification' => [
                    'AttributeName' => $this->ttlAttribute,
                    'Enabled' => true,
                ],
            ]);
        }
    }

    /**
     * Delete the underlying DynamoDB table.
     *
     * @return void
     */
    public function deleteAwsDynamoTable(): void
    {
        $this->dynamoDbClient->deleteTable([
            'TableName' => $this->table,
        ]);
    }

    /**
     * Get the composite primary key that identifies the given batch.
     *
     * @param  string  $batchId
     * @return array
     */
    protected function batchKey(string $batchId): array
    {
        return [
            'application' => ['S' => $this->applicationName],
            'id' => ['S' => $batchId],
        ];
    }

    /**
     * Append the time-to-live refresh clause to the given update expression.
     *
     * The clause is added under exactly the same condition that makes
     * "getExpiryTime" and "ttlExpressionAttributeName" yield a value, so the
     * ":ttl" and "#attribute" placeholders are never sent without being used.
     *
     * @param  string  $update
     * @return string
     */
    protected function appendTtlUpdate(string $update): string
    {
        if (is_null($this->ttl)) {
            return $update;
        }

        return $update.", #{$this->ttlAttribute} = :ttl";
    }

    /**
     * Get the expiry time based on the configured time-to-live.
     *
     * @return string|null
     */
    protected function getExpiryTime(): ?string
    {
        return is_null($this->ttl) ? null : (string) (time() + $this->ttl);
    }

    /**
     * Get the expression attribute name for the time-to-live attribute.
     *
     * @return array
     */
    protected function ttlExpressionAttributeName(): array
    {
        return is_null($this->ttl) ? [] : ["#{$this->ttlAttribute}" => $this->ttlAttribute];
    }

    /**
     * Serialize the given value.
     *
     * @param  mixed  $value
     * @return string
     */
    protected function serialize($value)
    {
        return serialize($value);
    }

    /**
     * Unserialize the given value.
     *
     * @param  string  $serialized
     * @return mixed
     */
    protected function unserialize($serialized)
    {
        return unserialize($serialized);
    }

    /**
     * Get the underlying DynamoDB client instance.
     *
     * @return \Aws\DynamoDb\DynamoDbClient
     */
    public function getDynamoClient(): DynamoDbClient
    {
        return $this->dynamoDbClient;
    }

    /**
     * The name of the table that contains the batch records.
     *
     * @return string
     */
    public function getTable(): string
    {
        return $this->table;
    }
}