gdbots/ncr-php

View on GitHub
src/Repository/DynamoDb/DynamoDbNcr.php

Summary

Maintainability
F
5 days
Test Coverage
<?php
declare(strict_types=1);

namespace Gdbots\Ncr\Repository\DynamoDb;

use Aws\CommandPool;
use Aws\DynamoDb\DynamoDbClient;
use Aws\Exception\AwsException;
use Aws\ResultInterface;
use Gdbots\Ncr\Exception\NodeNotFound;
use Gdbots\Ncr\Exception\OptimisticCheckFailed;
use Gdbots\Ncr\Exception\RepositoryIndexNotFound;
use Gdbots\Ncr\Exception\RepositoryOperationFailed;
use Gdbots\Ncr\IndexQuery;
use Gdbots\Ncr\IndexQueryResult;
use Gdbots\Ncr\Ncr;
use Gdbots\Pbj\Marshaler\DynamoDb\ItemMarshaler;
use Gdbots\Pbj\Message;
use Gdbots\Pbj\SchemaQName;
use Gdbots\Pbj\Util\ClassUtil;
use Gdbots\Pbj\Util\NumberUtil;
use Gdbots\Pbj\WellKnown\NodeRef;
use Gdbots\Pbjx\Event\EnrichContextEvent;
use Gdbots\Pbjx\PbjxEvents;
use Gdbots\Schemas\Pbjx\Enum\Code;
use GuzzleHttp\Promise\PromiseInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Symfony\Component\EventDispatcher\EventDispatcher;

final class DynamoDbNcr implements Ncr
{
    private DynamoDbClient $client;
    private EventDispatcher $dispatcher;
    private TableManager $tableManager;
    private array $config;
    private LoggerInterface $logger;
    private ItemMarshaler $marshaler;
    private ?IndexQueryFilterProcessor $filterProcessor = null;

    public function __construct(
        DynamoDbClient $client,
        EventDispatcher $dispatcher,
        TableManager $tableManager,
        array $config = [],
        ?LoggerInterface $logger = null
    ) {
        // defaults
        $config += [
            'batch_size'  => 100,
            'concurrency' => 25,
        ];

        $config['batch_size'] = NumberUtil::bound($config['batch_size'], 2, 100);
        $config['concurrency'] = NumberUtil::bound($config['concurrency'], 1, 50);

        $this->client = $client;
        $this->dispatcher = $dispatcher;
        $this->tableManager = $tableManager;
        $this->config = $config;
        $this->logger = $logger ?: new NullLogger();
        $this->marshaler = new ItemMarshaler();
    }

    public function createStorage(SchemaQName $qname, array $context = []): void
    {
        $context = $this->enrichContext(__FUNCTION__, $context);
        $tableName = $this->tableManager->getNodeTableName($qname, $context);
        $this->tableManager->getNodeTable($qname)->create($this->client, $tableName);
    }

    public function describeStorage(SchemaQName $qname, array $context = []): string
    {
        $context = $this->enrichContext(__FUNCTION__, $context);
        $tableName = $this->tableManager->getNodeTableName($qname, $context);
        return $this->tableManager->getNodeTable($qname)->describe($this->client, $tableName);
    }

    public function hasNode(NodeRef $nodeRef, bool $consistent = false, array $context = []): bool
    {
        $context = $this->enrichContext(__FUNCTION__, $context);
        $tableName = $this->tableManager->getNodeTableName($nodeRef->getQName(), $context);

        try {
            $response = $this->client->getItem([
                'ConsistentRead'           => $consistent,
                'TableName'                => $tableName,
                'ProjectionExpression'     => '#node_ref',
                'ExpressionAttributeNames' => ['#node_ref' => NodeTable::HASH_KEY_NAME],
                'Key'                      => [NodeTable::HASH_KEY_NAME => ['S' => $nodeRef->toString()]],
            ]);
        } catch (\Throwable $e) {
            if ($e instanceof AwsException) {
                $errorName = $e->getAwsErrorCode() ?: ClassUtil::getShortName($e);
                if ('ResourceNotFoundException' === $errorName) {
                    return false;
                } elseif ('ProvisionedThroughputExceededException' === $errorName) {
                    $code = Code::RESOURCE_EXHAUSTED->value;
                } else {
                    $code = Code::UNAVAILABLE->value;
                }
            } else {
                $errorName = ClassUtil::getShortName($e);
                $code = Code::INTERNAL->value;
            }

            throw new RepositoryOperationFailed(
                sprintf(
                    '%s while checking for [%s] in DynamoDb table [%s].',
                    $errorName,
                    $nodeRef,
                    $tableName
                ),
                $code,
                $e
            );
        }

        return isset($response['Item']);
    }

    public function getNode(NodeRef $nodeRef, bool $consistent = false, array $context = []): Message
    {
        $context = $this->enrichContext(__FUNCTION__, $context);
        $tableName = $this->tableManager->getNodeTableName($nodeRef->getQName(), $context);

        try {
            $response = $this->client->getItem([
                'ConsistentRead' => $consistent,
                'TableName'      => $tableName,
                'Key'            => [NodeTable::HASH_KEY_NAME => ['S' => $nodeRef->toString()]],
            ]);
        } catch (\Throwable $e) {
            if ($e instanceof AwsException) {
                $errorName = $e->getAwsErrorCode() ?: ClassUtil::getShortName($e);
                if ('ResourceNotFoundException' === $errorName) {
                    throw NodeNotFound::forNodeRef($nodeRef, $e);
                } elseif ('ProvisionedThroughputExceededException' === $errorName) {
                    $code = Code::RESOURCE_EXHAUSTED->value;
                } else {
                    $code = Code::UNAVAILABLE->value;
                }
            } else {
                $errorName = ClassUtil::getShortName($e);
                $code = Code::INTERNAL->value;
            }

            throw new RepositoryOperationFailed(
                sprintf(
                    '%s while getting [%s] from DynamoDb table [%s].',
                    $errorName,
                    $nodeRef,
                    $tableName
                ),
                $code,
                $e
            );
        }

        if (!isset($response['Item']) || empty($response['Item'])) {
            throw NodeNotFound::forNodeRef($nodeRef);
        }

        $skipValidation = filter_var($context['skip_validation'] ?? !$consistent, FILTER_VALIDATE_BOOLEAN);
        $this->marshaler->skipValidation($skipValidation);

        try {
            $node = $this->marshaler->unmarshal($response['Item']);
        } catch (\Throwable $e) {
            $this->logger->error(
                'Item returned from DynamoDb table [{table_name}] for [{node_ref}] could not be unmarshaled.',
                [
                    'exception'  => $e,
                    'item'       => $response['Item'],
                    'context'    => $context,
                    'table_name' => $tableName,
                    'node_ref'   => (string)$nodeRef,
                ]
            );

            $this->marshaler->skipValidation(false);
            throw NodeNotFound::forNodeRef($nodeRef, $e);
        }

        $this->marshaler->skipValidation(false);
        return $node;
    }

    public function getNodes(array $nodeRefs, bool $consistent = false, array $context = []): array
    {
        $context = $this->enrichContext(__FUNCTION__, $context);

        if (empty($nodeRefs)) {
            return [];
        } elseif (count($nodeRefs) === 1) {
            try {
                $nodeRef = array_shift($nodeRefs);
                return [(string)$nodeRef => $this->getNode($nodeRef, $consistent, $context)];
            } catch (NodeNotFound $e) {
                return [];
            } catch (\Throwable $e) {
                throw $e;
            }
        }

        $batch = (new BatchGetItemRequest($this->client))
            ->batchSize($this->config['batch_size'])
            ->concurrency($this->config['concurrency'])
            ->consistentRead($consistent)
            ->onError(function (AwsException $e) use ($context) {
                $errorName = $e->getAwsErrorCode() ?: ClassUtil::getShortName($e);
                $this->logger->error(
                    sprintf('%s while processing BatchGetItemRequest.', $errorName),
                    ['exception' => $e, 'context' => $context]
                );
            });

        foreach ($nodeRefs as $nodeRef) {
            $tableName = $this->tableManager->getNodeTableName($nodeRef->getQName(), $context);
            $batch->addItemKey($tableName, [NodeTable::HASH_KEY_NAME => ['S' => $nodeRef->toString()]]);
        }

        $skipValidation = filter_var($context['skip_validation'] ?? !$consistent, FILTER_VALIDATE_BOOLEAN);
        $this->marshaler->skipValidation($skipValidation);
        $nodes = [];

        foreach ($batch->getItems() as $item) {
            try {
                $nodeRef = NodeRef::fromString($item[NodeTable::HASH_KEY_NAME]['S']);
                $nodes[$nodeRef->toString()] = $this->marshaler->unmarshal($item);
            } catch (\Throwable $e) {
                $this->logger->error(
                    'Item returned from DynamoDb table could not be unmarshaled.',
                    ['exception' => $e, 'item' => $item, 'context' => $context]
                );
            }
        }

        $this->marshaler->skipValidation(false);
        return $nodes;
    }

    public function putNode(Message $node, ?string $expectedEtag = null, array $context = []): void
    {
        $node->freeze();
        $nodeRef = $node->generateNodeRef();

        $context = $this->enrichContext(__FUNCTION__, $context);
        $tableName = $this->tableManager->getNodeTableName($nodeRef->getQName(), $context);
        $table = $this->tableManager->getNodeTable($nodeRef->getQName());
        $this->marshaler->skipValidation(false);

        $params = ['TableName' => $tableName];
        if (null !== $expectedEtag) {
            $params['ExpressionAttributeValues'] = [':v_etag' => ['S' => (string)$expectedEtag]];
            $params['ConditionExpression'] = 'etag = :v_etag';
        }

        try {
            $item = $this->marshaler->marshal($node);
            $item[NodeTable::HASH_KEY_NAME] = ['S' => $nodeRef->toString()];
            $table->beforePutItem($item, $node);
            $params['Item'] = $item;
            $this->client->putItem($params);
        } catch (\Throwable $e) {
            if ($e instanceof AwsException) {
                $errorName = $e->getAwsErrorCode() ?: ClassUtil::getShortName($e);
                if ('ConditionalCheckFailedException' === $errorName) {
                    throw new OptimisticCheckFailed(
                        sprintf(
                            'NodeRef [%s] in DynamoDb table [%s] did not have expected etag [%s].',
                            $nodeRef,
                            $tableName,
                            $expectedEtag
                        ),
                        $e
                    );
                } elseif ('ProvisionedThroughputExceededException' === $errorName) {
                    $code = Code::RESOURCE_EXHAUSTED->value;
                } else {
                    $code = Code::UNAVAILABLE->value;
                }
            } else {
                $errorName = ClassUtil::getShortName($e);
                $code = Code::INTERNAL->value;
            }

            throw new RepositoryOperationFailed(
                sprintf(
                    '%s while putting [%s] into DynamoDb table [%s].',
                    $errorName,
                    $nodeRef,
                    $tableName
                ),
                $code,
                $e
            );
        }
    }

    public function deleteNode(NodeRef $nodeRef, array $context = []): void
    {
        $context = $this->enrichContext(__FUNCTION__, $context);
        $tableName = $this->tableManager->getNodeTableName($nodeRef->getQName(), $context);

        try {
            $this->client->deleteItem([
                'TableName' => $tableName,
                'Key'       => [NodeTable::HASH_KEY_NAME => ['S' => $nodeRef->toString()]],
            ]);
        } catch (\Throwable $e) {
            if ($e instanceof AwsException) {
                $errorName = $e->getAwsErrorCode() ?: ClassUtil::getShortName($e);
                if ('ResourceNotFoundException' === $errorName) {
                    // if it's already deleted, it's fine
                    return;
                } elseif ('ProvisionedThroughputExceededException' === $errorName) {
                    $code = Code::RESOURCE_EXHAUSTED->value;
                } else {
                    $code = Code::UNAVAILABLE->value;
                }
            } else {
                $errorName = ClassUtil::getShortName($e);
                $code = Code::INTERNAL->value;
            }

            throw new RepositoryOperationFailed(
                sprintf(
                    '%s while deleting [%s] from DynamoDb table [%s].',
                    $errorName,
                    $nodeRef,
                    $tableName
                ),
                $code,
                $e
            );
        }
    }

    public function findNodeRefs(IndexQuery $query, array $context = []): IndexQueryResult
    {
        $context = $this->enrichContext(__FUNCTION__, $context);
        $tableName = $this->tableManager->getNodeTableName($query->getQName(), $context);
        $table = $this->tableManager->getNodeTable($query->getQName());

        if (!$table->hasIndex($query->getAlias())) {
            throw new RepositoryIndexNotFound(
                sprintf(
                    '%s::Index [%s] does not exist on table [%s].',
                    ClassUtil::getShortName($table),
                    $query->getAlias(),
                    $tableName
                )
            );
        }

        $params = $table->getIndex($query->getAlias())->createQuery($query);
        $params['TableName'] = $tableName;
        $unprocessedFilters = [];

        /*
         * When unprocessed_filters exist we must fetch the attributes from the
         * index so we can evaluate them in php before returning the result.
         */
        if (isset($params['unprocessed_filters'])) {
            $unprocessedFilters = $params['unprocessed_filters'];
            unset($params['unprocessed_filters']);
        } else {
            $params['ProjectionExpression'] = '#node_ref';
        }

        try {
            $response = $this->client->query($params);
        } catch (\Throwable $e) {
            if ($e instanceof AwsException) {
                $errorName = $e->getAwsErrorCode() ?: ClassUtil::getShortName($e);
                if ('ProvisionedThroughputExceededException' === $errorName) {
                    $code = Code::RESOURCE_EXHAUSTED->value;
                } else {
                    $code = Code::UNAVAILABLE->value;
                }
            } else {
                $errorName = ClassUtil::getShortName($e);
                $code = Code::INTERNAL->value;
            }

            throw new RepositoryOperationFailed(
                sprintf(
                    '%s on IndexQuery [%s] on DynamoDb table [%s].',
                    $errorName,
                    $query->getAlias(),
                    $tableName
                ),
                $code,
                $e
            );
        }

        if (!isset($response['Items']) || empty($response['Items'])) {
            return new IndexQueryResult($query);
        }

        if ($unprocessedFilters) {
            if (null === $this->filterProcessor) {
                $this->filterProcessor = new IndexQueryFilterProcessor();
            }
            $response['Items'] = $this->filterProcessor->filter($response['Items'], $unprocessedFilters);
        }

        $nodeRefs = [];
        foreach ($response['Items'] as $item) {
            try {
                $nodeRefs[] = NodeRef::fromString($item[NodeTable::HASH_KEY_NAME]['S']);
            } catch (\Throwable $e) {
                $this->logger->error(
                    'NodeRef returned from IndexQuery [{index_alias}] on DynamoDb table [{table_name}] is invalid.',
                    [
                        'exception'   => $e,
                        'item'        => $item,
                        'context'     => $context,
                        'index_alias' => $query->getAlias(),
                        'index_query' => $query->toArray(),
                        'table_name'  => $tableName,
                    ]
                );
            }
        }

        if (isset($response['LastEvaluatedKey'])/* && $response['Count'] !== $response['ScannedCount']*/) {
            $nextCursor = base64_encode(json_encode($response['LastEvaluatedKey']));
        } else {
            $nextCursor = null;
        }

        return new IndexQueryResult($query, array_slice($nodeRefs, 0, $query->getCount()), $nextCursor);
    }

    public function pipeNodes(SchemaQName $qname, array $context = []): \Generator
    {
        $context = $this->enrichContext(__FUNCTION__, $context);
        $context['node_refs_only'] = false;
        $generator = $this->doPipeNodes($qname, $context);

        /** @var \SplQueue $queue */
        $queue = $generator->current();

        do {
            $generator->next();
            while (!$queue->isEmpty()) {
                yield $queue->dequeue();
            }
        } while ($generator->valid());
    }

    public function pipeNodeRefs(SchemaQName $qname, array $context = []): \Generator
    {
        $context = $this->enrichContext(__FUNCTION__, $context);
        $context['node_refs_only'] = true;
        $generator = $this->doPipeNodes($qname, $context);

        /** @var \SplQueue $queue */
        $queue = $generator->current();

        do {
            $generator->next();
            while (!$queue->isEmpty()) {
                yield $queue->dequeue();
            }
        } while ($generator->valid());
    }

    private function doPipeNodes(SchemaQName $qname, array $context): \Generator
    {
        static $alreadyPiped = [];

        $tableName = $context['table_name'] ?? $this->tableManager->getNodeTableName($qname, $context);
        $reindexing = filter_var($context['reindexing'] ?? false, FILTER_VALIDATE_BOOLEAN);
        $reindexAll = filter_var($context['reindex_all'] ?? false, FILTER_VALIDATE_BOOLEAN);
        $skipErrors = filter_var($context['skip_errors'] ?? false, FILTER_VALIDATE_BOOLEAN);
        $skipValidation = filter_var($context['skip_validation'] ?? true, FILTER_VALIDATE_BOOLEAN);
        $totalSegments = NumberUtil::bound($context['total_segments'] ?? 16, 1, 64);
        $concurrency = NumberUtil::bound($context['concurrency'] ?? 25, 1, 100);

        $this->marshaler->skipValidation($skipValidation);
        $queue = new \SplQueue();
        yield $queue;

        if ($reindexing && isset($alreadyPiped[$tableName])) {
            // multiple qnames can be in the same table.
            return;
        }

        $alreadyPiped[$tableName] = true;
        $params = [
            'ExpressionAttributeNames'  => [
                '#node_ref' => NodeTable::HASH_KEY_NAME,
            ],
            'ExpressionAttributeValues' => [
                ':v_qname' => ['S' => $qname->toString() . ':'],
            ],
        ];

        $filterExpressions = $reindexAll ? [] : ['begins_with(#node_ref, :v_qname)'];

        foreach (['s16', 's32', 's64', 's128', 's256'] as $shard) {
            if (isset($context[$shard])) {
                $params['ExpressionAttributeNames']["#{$shard}"] = "__{$shard}";
                $params['ExpressionAttributeValues'][":v_{$shard}"] = ['N' => (string)((int)$context[$shard])];
                $filterExpressions[] = "#{$shard} = :v_{$shard}";
            }
        }

        if (isset($context['status'])) {
            $params['ExpressionAttributeNames']['#status'] = 'status';
            $params['ExpressionAttributeValues'][':v_status'] = ['S' => (string)$context['status']];
            $filterExpressions[] = '#status = :v_status';
        }

        if ($context['node_refs_only']) {
            $params['ProjectionExpression'] = '#node_ref';
        }

        $params['TableName'] = $tableName;
        $params['TotalSegments'] = $totalSegments;
        if (!empty($filterExpressions)) {
            $params['FilterExpression'] = implode(' AND ', $filterExpressions);
        } else {
            unset($params['ExpressionAttributeNames']);
            unset($params['ExpressionAttributeValues']);
        }

        $pending = [];
        $iter2seg = ['prev' => [], 'next' => []];
        for ($segment = 0; $segment < $totalSegments; $segment++) {
            $params['Segment'] = $segment;
            $iter2seg['prev'][] = $segment;
            $pending[] = $this->client->getCommand('Scan', $params);
        }

        $fulfilled = function (ResultInterface $result, int|string $iterKey) use (
            $qname, $queue, $tableName, $context, $params, &$pending, &$iter2seg
        ) {
            $segment = $iter2seg['prev'][$iterKey];

            foreach ($result['Items'] as $item) {
                $node = null;
                try {
                    $nodeRef = NodeRef::fromString($item[NodeTable::HASH_KEY_NAME]['S']);
                    if (!$context['node_refs_only']) {
                        $node = $this->marshaler->unmarshal($item);
                    }
                } catch (\Throwable $e) {
                    $this->logger->error(
                        'Item returned from DynamoDb table [{table_name}] segment [{segment}] ' .
                        'for QName [{qname}] could not be unmarshaled.',
                        [
                            'exception'  => $e,
                            'item'       => $item,
                            'context'    => $context,
                            'table_name' => $tableName,
                            'segment'    => $segment,
                            'qname'      => (string)$qname,
                        ]
                    );

                    continue;
                }

                if ($context['node_refs_only']) {
                    $queue->enqueue($nodeRef);
                } else {
                    $queue->enqueue($node);
                }
            }

            if ($result['LastEvaluatedKey']) {
                $params['Segment'] = $segment;
                $params['ExclusiveStartKey'] = $result['LastEvaluatedKey'];
                $pending[] = $this->client->getCommand('Scan', $params);
                $iter2seg['next'][] = $segment;
            } else {
                $this->logger->info(
                    'Scan of DynamoDb table [{table_name}] segment [{segment}] for QName [{qname}] is complete.',
                    [
                        'context'    => $context,
                        'table_name' => $tableName,
                        'segment'    => $segment,
                        'qname'      => (string)$qname,
                    ]
                );
            }
        };

        $rejected = function (AwsException $exception, int|string $iterKey, PromiseInterface $aggregatePromise) use (
            $qname, $tableName, $context, $skipErrors, &$iter2seg
        ) {
            $segment = $iter2seg['prev'][$iterKey];

            $errorName = $exception->getAwsErrorCode() ?: ClassUtil::getShortName($exception);
            if ('ProvisionedThroughputExceededException' === $errorName) {
                $code = Code::RESOURCE_EXHAUSTED->value;
            } else {
                $code = Code::UNAVAILABLE->value;
            }

            if ($skipErrors) {
                $this->logger->error(
                    sprintf(
                        '%s while scanning DynamoDb table [{table_name}] segment [{segment}] for QName [{qname}].',
                        $errorName
                    ),
                    [
                        'exception'  => $exception,
                        'context'    => $context,
                        'table_name' => $tableName,
                        'segment'    => $segment,
                        'qname'      => (string)$qname,
                    ]
                );

                return;
            }

            $this->marshaler->skipValidation(false);
            $aggregatePromise->reject(
                new RepositoryOperationFailed(
                    sprintf(
                        '%s while scanning DynamoDb table [%s] segment [%s] for QName [%s].',
                        $errorName,
                        $tableName,
                        $segment,
                        (string)$qname
                    ),
                    $code,
                    $exception
                )
            );
        };

        while (count($pending) > 0) {
            $commands = $pending;
            $pending = [];
            $pool = new CommandPool($this->client, $commands, [
                'before'      => function () {
                    gc_collect_cycles();
                },
                'fulfilled'   => $fulfilled,
                'rejected'    => $rejected,
                'concurrency' => $concurrency,
            ]);
            $pool->promise()->wait();
            $iter2seg['prev'] = $iter2seg['next'];
            $iter2seg['next'] = [];
            yield;
        }

        yield;
        $this->marshaler->skipValidation(false);
    }

    protected function enrichContext(string $operation, array $context): array
    {
        if (isset($context['already_enriched'])) {
            return $context;
        }

        $event = new EnrichContextEvent('ncr', $operation, $context);
        $context = $this->dispatcher->dispatch($event, PbjxEvents::ENRICH_CONTEXT)->all();
        $context['already_enriched'] = true;
        return $context;
    }
}