gdbots/pbjx-php

View on GitHub
src/EventStore/DynamoDb/DynamoDbEventStore.php

Summary

Maintainability
F
6 days
Test Coverage
<?php
declare(strict_types=1);

namespace Gdbots\Pbjx\EventStore\DynamoDb;

use Aws\CommandPool;
use Aws\DynamoDb\DynamoDbClient;
use Aws\Exception\AwsException;
use Aws\ResultInterface;
use Gdbots\Pbj\Marshaler\DynamoDb\ItemMarshaler;
use Gdbots\Pbj\Message;
use Gdbots\Pbj\Util\ClassUtil;
use Gdbots\Pbj\Util\NumberUtil;
use Gdbots\Pbj\WellKnown\Identifier;
use Gdbots\Pbj\WellKnown\Microtime;
use Gdbots\Pbjx\Event\EnrichContextEvent;
use Gdbots\Pbjx\EventStore\EventStore;
use Gdbots\Pbjx\EventStore\StreamSlice;
use Gdbots\Pbjx\Exception\EventNotFound;
use Gdbots\Pbjx\Exception\EventStoreOperationFailed;
use Gdbots\Pbjx\Exception\OptimisticCheckFailed;
use Gdbots\Pbjx\Pbjx;
use Gdbots\Pbjx\PbjxEvents;
use Gdbots\Schemas\Pbjx\Enum\Code;
use Gdbots\Schemas\Pbjx\StreamId;
use GuzzleHttp\Promise\PromiseInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Symfony\Component\EventDispatcher\EventDispatcher;

class DynamoDbEventStore implements EventStore
{
    /**
     * The name of the DynamoDb table to write to.  This is the default value
     * and can change based on context provided.
     */
    protected string $tableName;
    /** Used by putEvents to trigger the lifecycle of each event before it is marshaled. */
    protected Pbjx $pbjx;
    /** Client through which every DynamoDb request in this class is issued. */
    protected DynamoDbClient $client;
    /** Dispatches the EnrichContextEvent (see enrichContext) before an operation runs. */
    protected EventDispatcher $dispatcher;
    /** Receives unmarshaling and scan errors; a NullLogger when none is injected. */
    protected LoggerInterface $logger;
    /** Converts events to DynamoDb items and back; its validation flag is toggled per operation. */
    protected ItemMarshaler $marshaler;

    /**
     * @param Pbjx                 $pbjx       Used to trigger event lifecycles before events are stored.
     * @param DynamoDbClient       $client     Client used for all DynamoDb requests.
     * @param string               $tableName  Default table name; a "table_name" context value overrides it.
     * @param EventDispatcher      $dispatcher Dispatches the enrich context event.
     * @param LoggerInterface|null $logger     Optional logger, a NullLogger is used when omitted.
     */
    public function __construct(
        Pbjx $pbjx,
        DynamoDbClient $client,
        string $tableName,
        EventDispatcher $dispatcher,
        ?LoggerInterface $logger = null
    ) {
        $this->marshaler = new ItemMarshaler();
        $this->logger = $logger ?? new NullLogger();
        $this->dispatcher = $dispatcher;
        $this->tableName = $tableName;
        $this->client = $client;
        $this->pbjx = $pbjx;
    }

    /**
     * Creates the DynamoDb table used by this event store.
     *
     * @param array $context Enriched first, then used to resolve the table name.
     */
    final public function createStorage(array $context = []): void
    {
        $enriched = $this->enrichContext(__FUNCTION__, $context);
        (new EventStoreTable())->create($this->client, $this->getTableNameForWrite($enriched));
    }

    /**
     * Returns the description of the DynamoDb table used by this event store.
     *
     * @param array $context Enriched first, then used to resolve the table name.
     *
     * @return string Whatever EventStoreTable::describe produces for the table.
     */
    final public function describeStorage(array $context = []): string
    {
        $enriched = $this->enrichContext(__FUNCTION__, $context);
        return (new EventStoreTable())->describe($this->client, $this->getTableNameForWrite($enriched));
    }

    /**
     * Fetches a single event by its id.
     *
     * The item key is resolved first (see getItemKeyByEventId) and the item
     * is then loaded with a getItem call on the read table.
     *
     * @param Identifier $eventId
     * @param array      $context
     *
     * @return Message
     *
     * @throws EventNotFound             When no key or item exists, the table is
     *                                   missing, or the item cannot be unmarshaled.
     * @throws EventStoreOperationFailed On any other failure while calling DynamoDb.
     */
    final public function getEvent(Identifier $eventId, array $context = []): Message
    {
        $context = $this->enrichContext(__FUNCTION__, $context);
        $tableName = $this->getTableNameForRead($context);
        $key = $this->getItemKeyByEventId($eventId, $context);
        if (null === $key) {
            throw new EventNotFound();
        }

        try {
            $response = $this->client->getItem(['TableName' => $tableName, 'Key' => $key]);
        } catch (\Throwable $e) {
            $isAwsError = $e instanceof AwsException;
            $errorName = $e instanceof AwsException
                ? ($e->getAwsErrorCode() ?: ClassUtil::getShortName($e))
                : ClassUtil::getShortName($e);

            // a missing table is reported the same way as a missing event
            if ($isAwsError && 'ResourceNotFoundException' === $errorName) {
                throw new EventNotFound();
            }

            $code = match (true) {
                !$isAwsError                                            => Code::INTERNAL->value,
                'ProvisionedThroughputExceededException' === $errorName => Code::RESOURCE_EXHAUSTED->value,
                default                                                 => Code::UNAVAILABLE->value,
            };

            $message = sprintf(
                '%s while getting [%s] from DynamoDb table [%s] using key [%s].',
                $errorName,
                $eventId,
                $tableName,
                json_encode($key)
            );

            throw new EventStoreOperationFailed($message, $code, $e);
        }

        if (empty($response['Item'])) {
            throw new EventNotFound();
        }

        try {
            // single event reads are always validated
            $this->marshaler->skipValidation(false);
            return $this->marshaler->unmarshal($response['Item']);
        } catch (\Throwable $e) {
            $logContext = [
                'exception'  => $e,
                'item'       => $response['Item'],
                'context'    => $context,
                'table_name' => $tableName,
                'event_id'   => (string)$eventId,
            ];
            $this->logger->error(
                'Item returned from DynamoDb table [{table_name}] for [{event_id}] could not be unmarshaled.',
                $logContext
            );

            throw new EventNotFound();
        }
    }

    /**
     * Fetches several events, one getEvent call per id.
     *
     * Ids that cannot be found are silently left out of the result.
     *
     * @param Identifier[] $eventIds
     * @param array        $context
     *
     * @return Message[] Found events keyed by their event_id as a string.
     */
    final public function getEvents(array $eventIds, array $context = []): array
    {
        $context = $this->enrichContext(__FUNCTION__, $context);

        // todo: optimize using batchGetItem
        $found = [];

        foreach ($eventIds as $id) {
            try {
                $message = $this->getEvent($id, $context);
                $found[(string)$message->get('event_id')] = $message;
            } catch (EventNotFound) {
                // a missing event is simply omitted from the result
            }
        }

        return $found;
    }

    /**
     * Deletes a single event by its id.
     *
     * The item key is resolved via getItemKeyByEventId and the delete is
     * issued against the write table.  Nothing happens when the key cannot
     * be found or DynamoDb reports ResourceNotFoundException.
     *
     * @param Identifier $eventId
     * @param array      $context
     *
     * @throws EventStoreOperationFailed On any other failure while calling DynamoDb.
     */
    final public function deleteEvent(Identifier $eventId, array $context = []): void
    {
        $context = $this->enrichContext(__FUNCTION__, $context);
        $tableName = $this->getTableNameForWrite($context);
        $key = $this->getItemKeyByEventId($eventId, $context);
        if (null === $key) {
            return;
        }

        try {
            $this->client->deleteItem(['TableName' => $tableName, 'Key' => $key]);
        } catch (\Throwable $e) {
            $isAwsError = $e instanceof AwsException;
            $errorName = $e instanceof AwsException
                ? ($e->getAwsErrorCode() ?: ClassUtil::getShortName($e))
                : ClassUtil::getShortName($e);

            // nothing left to delete, which is the outcome the caller wanted
            if ($isAwsError && 'ResourceNotFoundException' === $errorName) {
                return;
            }

            $code = match (true) {
                !$isAwsError                                            => Code::INTERNAL->value,
                'ProvisionedThroughputExceededException' === $errorName => Code::RESOURCE_EXHAUSTED->value,
                default                                                 => Code::UNAVAILABLE->value,
            };

            $message = sprintf(
                '%s while deleting [%s] from DynamoDb table [%s] using key [%s].',
                $errorName,
                $eventId,
                $tableName,
                json_encode($key)
            );

            throw new EventStoreOperationFailed($message, $code, $e);
        }
    }

    /**
     * Queries one stream for a slice of events.
     *
     * Context options read here:
     *  - "reindexing" (default false): only items having the indexed attribute.
     *  - "skip_validation" (default true): passed to the marshaler while unmarshaling.
     *  - "curie": only items whose _schema contains this value (surrounding "*" trimmed).
     *  - "s16", "s32", "s64", "s128", "s256": only items whose shard attribute equals the value.
     *
     * When any filter applies the query "Limit" is removed, so the slice is
     * not capped at $count in that case.  Items that fail to unmarshal are
     * logged and left out of the slice.
     *
     * @param StreamId       $streamId
     * @param Microtime|null $since      Exclusive starting point; defaults to "0" when reading
     *                                   forward and to the current time when reading backward.
     * @param int            $count      Bounded to the range 1-100.
     * @param bool           $forward    Ascending range key order when true.
     * @param bool           $consistent Passed as ConsistentRead.
     * @param array          $context
     *
     * @return StreamSlice
     *
     * @throws EventStoreOperationFailed When the query fails.
     */
    final public function getStreamSlice(StreamId $streamId, ?Microtime $since = null, int $count = 25, bool $forward = true, bool $consistent = false, array $context = []): StreamSlice
    {
        $context['stream_id'] = $streamId;
        $context = $this->enrichContext(__FUNCTION__, $context);
        $tableName = $this->getTableNameForRead($context);
        $reindexing = filter_var($context['reindexing'] ?? false, FILTER_VALIDATE_BOOLEAN);
        $skipValidation = filter_var($context['skip_validation'] ?? true, FILTER_VALIDATE_BOOLEAN);
        $count = NumberUtil::bound($count, 1, 100);

        if (null !== $since) {
            $rangeStart = $since->toString();
        } elseif ($forward) {
            $rangeStart = '0';
        } else {
            $rangeStart = Microtime::create()->toString();
        }

        $names = [
            '#hash'  => EventStoreTable::HASH_KEY_NAME,
            '#range' => EventStoreTable::RANGE_KEY_NAME,
        ];
        $values = [
            ':v_id'   => ['S' => $this->streamIdToHashKey($streamId)],
            ':v_date' => ['N' => $rangeStart],
        ];
        $filters = [];

        if ($reindexing) {
            $names['#indexed'] = EventStoreTable::INDEXED_KEY_NAME;
            $filters[] = 'attribute_exists(#indexed)';
        }

        if (isset($context['curie'])) {
            $names['#schema'] = '_schema';
            $values[':v_curie'] = ['S' => trim($context['curie'], '*')];
            $filters[] = 'contains(#schema, :v_curie)';
        }

        foreach (['s16', 's32', 's64', 's128', 's256'] as $shard) {
            if (!isset($context[$shard])) {
                continue;
            }

            $names["#{$shard}"] = $shard;
            $values[":v_{$shard}"] = ['N' => (string)((int)$context[$shard])];
            $filters[] = "#{$shard} = :v_{$shard}";
        }

        $params = [
            'TableName'                 => $tableName,
            'ExpressionAttributeNames'  => $names,
            'KeyConditionExpression'    => sprintf('#hash = :v_id AND #range %s :v_date', $forward ? '>' : '<'),
            'ExpressionAttributeValues' => $values,
            'ScanIndexForward'          => $forward,
            'Limit'                     => $count,
            'ConsistentRead'            => $consistent,
        ];

        if ([] !== $filters) {
            // the limit is dropped whenever a filter expression is in play
            $params['FilterExpression'] = implode(' AND ', $filters);
            unset($params['Limit']);
        }

        try {
            $response = $this->client->query($params);
        } catch (\Throwable $e) {
            $isAwsError = $e instanceof AwsException;
            $errorName = $e instanceof AwsException
                ? ($e->getAwsErrorCode() ?: ClassUtil::getShortName($e))
                : ClassUtil::getShortName($e);

            $code = match (true) {
                !$isAwsError                                            => Code::INTERNAL->value,
                'ProvisionedThroughputExceededException' === $errorName => Code::RESOURCE_EXHAUSTED->value,
                default                                                 => Code::UNAVAILABLE->value,
            };

            $message = sprintf(
                '%s while getting StreamSlice from DynamoDb table [%s] for stream [%s].',
                $errorName,
                $tableName,
                $streamId
            );

            throw new EventStoreOperationFailed($message, $code, $e);
        }

        if (!$response['Count']) {
            return new StreamSlice([], $streamId, $forward, $consistent);
        }

        $events = [];
        $this->marshaler->skipValidation($skipValidation);

        foreach ($response['Items'] as $item) {
            try {
                $events[] = $this->marshaler->unmarshal($item);
            } catch (\Throwable $e) {
                $logContext = [
                    'exception'  => $e,
                    'item'       => $item,
                    'context'    => $context,
                    'table_name' => $tableName,
                    'stream_id'  => (string)$streamId,
                ];
                $this->logger->error(
                    'Item returned from DynamoDb table [{table_name}] from stream [{stream_id}] could not be unmarshaled.',
                    $logContext
                );
            }
        }

        // restore validation for whatever uses the marshaler next
        $this->marshaler->skipValidation(false);

        $hasMore = $response['Count'] >= $count;
        return new StreamSlice($events, $streamId, $forward, $consistent, $hasMore);
    }

    /**
     * Writes events to a stream using a single transactWriteItems call.
     *
     * An empty $events array is ignored.  When $expectedEtag is provided the
     * head of the stream is checked first (see optimisticCheck).
     *
     * @param StreamId    $streamId
     * @param Message[]   $events
     * @param string|null $expectedEtag
     * @param array       $context
     *
     * @throws OptimisticCheckFailed     When the expected etag does not match.
     * @throws EventStoreOperationFailed When the write fails.
     */
    final public function putEvents(StreamId $streamId, array $events, ?string $expectedEtag = null, array $context = []): void
    {
        if (0 === count($events)) {
            // nothing to write
            return;
        }

        $context['stream_id'] = $streamId;
        $context = $this->enrichContext(__FUNCTION__, $context);

        if (null !== $expectedEtag) {
            $this->optimisticCheck($streamId, $expectedEtag, $context);
        }

        // events being written are always validated
        $this->marshaler->skipValidation(false);
        $tableName = $this->getTableNameForWrite($context);
        $hashKey = $this->streamIdToHashKey($streamId);

        $transactItems = [];
        foreach ($events as $event) {
            $this->pbjx->triggerLifecycle($event);

            $item = $this->marshaler->marshal($event);
            $item[EventStoreTable::HASH_KEY_NAME] = ['S' => $hashKey];

            $isIndexed = $event::schema()->hasMixin('gdbots:pbjx:mixin:indexed');
            if ($isIndexed) {
                $item[EventStoreTable::INDEXED_KEY_NAME] = ['BOOL' => true];
            }

            // last chance for subclasses to add derived fields to the item
            $this->beforePutItem($item, $streamId, $event, $context);

            $put = ['Item' => $item, 'TableName' => $tableName];
            $transactItems[] = ['Put' => $put];
        }

        try {
            // todo: implement ClientRequestToken
            $this->client->transactWriteItems(['TransactItems' => $transactItems]);
        } catch (\Throwable $e) {
            $isAwsError = $e instanceof AwsException;
            $errorName = $e instanceof AwsException
                ? ($e->getAwsErrorCode() ?: ClassUtil::getShortName($e))
                : ClassUtil::getShortName($e);

            $code = match (true) {
                !$isAwsError                                            => Code::INTERNAL->value,
                'ProvisionedThroughputExceededException' === $errorName => Code::RESOURCE_EXHAUSTED->value,
                default                                                 => Code::DATA_LOSS->value,
            };

            $message = sprintf(
                '%s while putting events into DynamoDb table [%s] for stream [%s].',
                $errorName,
                $tableName,
                $streamId
            );

            throw new EventStoreOperationFailed($message, $code, $e);
        }
    }

    /**
     * Yields the events of one stream in forward order, 100 per slice.
     *
     * Context options read here: "consistent" (default true) and
     * "reindexing" (default false, yields only events whose schema has
     * the gdbots:pbjx:mixin:indexed mixin).
     *
     * @param StreamId       $streamId
     * @param Microtime|null $since    Passed to getStreamSlice as the starting point.
     * @param Microtime|null $until    Iteration ends at the first event whose
     *                                 occurred_at is greater than or equal to this.
     * @param array          $context
     *
     * @return \Generator Yields Message instances.
     */
    final public function pipeEvents(StreamId $streamId, ?Microtime $since = null, ?Microtime $until = null, array $context = []): \Generator
    {
        $context['stream_id'] = $streamId;
        $context = $this->enrichContext(__FUNCTION__, $context);
        $consistent = filter_var($context['consistent'] ?? true, FILTER_VALIDATE_BOOLEAN);
        $reindexing = filter_var($context['reindexing'] ?? false, FILTER_VALIDATE_BOOLEAN);

        while (true) {
            $slice = $this->getStreamSlice($streamId, $since, 100, true, $consistent, $context);
            // the next slice starts after the last event of this one
            $since = $slice->getLastOccurredAt();

            /** @var Message $event */
            foreach ($slice as $event) {
                if (null !== $until && $event->get('occurred_at')->toFloat() >= $until->toFloat()) {
                    return;
                }

                if (!$reindexing || $event::schema()->hasMixin('gdbots:pbjx:mixin:indexed')) {
                    yield $event;
                }
            }

            if (!$slice->hasMore()) {
                break;
            }
        }
    }

    /**
     * Yields every event in the table by draining the queue that
     * doPipeAllEvents fills while it scans.
     *
     * Each yielded value is the two element array [$event, $streamId]
     * that doPipeAllEvents enqueues for every item it could unmarshal.
     *
     * @param Microtime|null $since   When set, only items with a range key greater than this.
     * @param Microtime|null $until   When set, only items with a range key less than this.
     * @param array          $context Enriched here and handed to doPipeAllEvents.
     *
     * @return \Generator
     */
    final public function pipeAllEvents(?Microtime $since = null, ?Microtime $until = null, array $context = []): \Generator
    {
        $context = $this->enrichContext(__FUNCTION__, $context);
        $generator = $this->doPipeAllEvents($since, $until, $context);

        // the first value yielded by doPipeAllEvents is the shared queue itself
        /** @var \SplQueue $queue */
        $queue = $generator->current();

        do {
            // resuming the inner generator runs its next batch of scan commands,
            // whose fulfilled callbacks push results onto the queue
            $generator->next();
            while (!$queue->isEmpty()) {
                yield $queue->dequeue();
            }
        } while ($generator->valid());
    }

    /**
     * Scans the whole table in parallel segments and pushes the results onto a queue.
     *
     * Protocol with the caller (see pipeAllEvents): the first yielded value is
     * the \SplQueue that results are pushed onto, every later yield carries no
     * value and only signals that a batch of scan commands has completed.
     * Each queue entry is the array [$event, $streamId].
     *
     * Context options read here:
     *  - "reindexing" (default false): only items having the indexed attribute.
     *  - "skip_errors" (default false): log failed scan requests instead of failing.
     *  - "skip_validation" (default true): passed to the marshaler while scanning.
     *  - "total_segments" (default 16, bounded 1-64): number of scan segments.
     *  - "concurrency" (default 25, bounded 1-100): concurrency of the command pool.
     *  - "curie": only items whose _schema contains this value (surrounding "*" trimmed).
     *  - "s16", "s32", "s64", "s128", "s256": only items whose shard attribute equals the value.
     *
     * The marshaler's validation flag is always restored to "validate" when the
     * generator finishes, fails, or is destroyed before completing.
     *
     * @param Microtime|null $since   When set, only items with a range key greater than this.
     * @param Microtime|null $until   When set, only items with a range key less than this.
     * @param array          $context Expected to already be enriched by the caller.
     *
     * @return \Generator
     *
     * @throws EventStoreOperationFailed When a scan request fails and "skip_errors" is off.
     */
    protected function doPipeAllEvents(?Microtime $since = null, ?Microtime $until = null, array $context = []): \Generator
    {
        $tableName = $this->getTableNameForRead($context);
        $reindexing = filter_var($context['reindexing'] ?? false, FILTER_VALIDATE_BOOLEAN);
        $skipErrors = filter_var($context['skip_errors'] ?? false, FILTER_VALIDATE_BOOLEAN);
        $skipValidation = filter_var($context['skip_validation'] ?? true, FILTER_VALIDATE_BOOLEAN);
        $totalSegments = NumberUtil::bound($context['total_segments'] ?? 16, 1, 64);
        $concurrency = NumberUtil::bound($context['concurrency'] ?? 25, 1, 100);

        $params = ['ExpressionAttributeNames' => [], 'ExpressionAttributeValues' => []];
        $filterExpressions = [];

        if (null !== $since) {
            $params['ExpressionAttributeNames']['#range'] = EventStoreTable::RANGE_KEY_NAME;
            $params['ExpressionAttributeValues'][':v_date_since'] = ['N' => $since->toString()];
            $filterExpressions[] = '#range > :v_date_since';
        }

        if (null !== $until) {
            $params['ExpressionAttributeNames']['#range'] = EventStoreTable::RANGE_KEY_NAME;
            $params['ExpressionAttributeValues'][':v_date_until'] = ['N' => $until->toString()];
            $filterExpressions[] = '#range < :v_date_until';
        }

        if ($reindexing) {
            $params['ExpressionAttributeNames']['#indexed'] = EventStoreTable::INDEXED_KEY_NAME;
            $filterExpressions[] = 'attribute_exists(#indexed)';
        }

        if (isset($context['curie'])) {
            $params['ExpressionAttributeNames']['#schema'] = '_schema';
            $params['ExpressionAttributeValues'][':v_curie'] = ['S' => trim($context['curie'], '*')];
            $filterExpressions[] = 'contains(#schema, :v_curie)';
        }

        foreach (['s16', 's32', 's64', 's128', 's256'] as $shard) {
            if (isset($context[$shard])) {
                $params['ExpressionAttributeNames']["#{$shard}"] = $shard;
                $params['ExpressionAttributeValues'][":v_{$shard}"] = ['N' => (string)((int)$context[$shard])];
                $filterExpressions[] = "#{$shard} = :v_{$shard}";
            }
        }

        // the expression maps are removed from the request when nothing was added to them
        if (empty($params['ExpressionAttributeNames'])) {
            unset($params['ExpressionAttributeNames']);
        }

        if (empty($params['ExpressionAttributeValues'])) {
            unset($params['ExpressionAttributeValues']);
        }

        if (!empty($filterExpressions)) {
            $params['FilterExpression'] = implode(' AND ', $filterExpressions);
        }

        $params['TableName'] = $tableName;
        $params['TotalSegments'] = $totalSegments;

        // $iter2seg maps a command's position in the current batch ("prev") to
        // its scan segment; "next" collects the mapping for the following batch.
        $pending = [];
        $iter2seg = ['prev' => [], 'next' => []];
        for ($segment = 0; $segment < $totalSegments; $segment++) {
            $params['Segment'] = $segment;
            $iter2seg['prev'][] = $segment;
            $pending[] = $this->client->getCommand('Scan', $params);
        }

        $this->marshaler->skipValidation($skipValidation);

        // The marshaler is shared with every other operation of this store, so
        // its validation flag must be restored no matter how this generator ends:
        // normal completion, an exception from the pool, or the consumer simply
        // abandoning the iteration (finally runs when a suspended generator is
        // destroyed).
        try {
            $queue = new \SplQueue();
            yield $queue;

            $fulfilled = function (ResultInterface $result, int|string $iterKey) use (
                $queue, $tableName, $context, $params, &$pending, &$iter2seg
            ) {
                $segment = $iter2seg['prev'][$iterKey];
                foreach ($result['Items'] as $item) {
                    $streamId = null;

                    try {
                        $streamId = $this->hashKeyToStreamId($item[EventStoreTable::HASH_KEY_NAME]['S']);
                        $event = $this->marshaler->unmarshal($item);
                    } catch (\Throwable $e) {
                        $this->logger->error(
                            'Item returned from DynamoDb table [{table_name}] segment [{segment}] ' .
                            'from stream [{stream_id}] could not be unmarshaled.',
                            [
                                'exception'  => $e,
                                'item'       => $item,
                                'context'    => $context,
                                'table_name' => $tableName,
                                'segment'    => $segment,
                                'stream_id'  => (string)$streamId,
                            ]
                        );

                        continue;
                    }

                    $queue->enqueue([$event, $streamId]);
                }

                if ($result['LastEvaluatedKey']) {
                    // more pages in this segment, schedule the follow up scan for the next batch
                    $params['Segment'] = $segment;
                    $params['ExclusiveStartKey'] = $result['LastEvaluatedKey'];
                    $pending[] = $this->client->getCommand('Scan', $params);
                    $iter2seg['next'][] = $segment;
                } else {
                    $this->logger->info(
                        'Scan of DynamoDb table [{table_name}] segment [{segment}] is complete.',
                        [
                            'context'    => $context,
                            'table_name' => $tableName,
                            'segment'    => $segment,
                        ]
                    );
                }
            };

            $rejected = function (AwsException $exception, int|string $iterKey, PromiseInterface $aggregatePromise) use (
                $tableName, $context, $skipErrors, &$iter2seg
            ) {
                $segment = $iter2seg['prev'][$iterKey];

                $errorName = $exception->getAwsErrorCode() ?: ClassUtil::getShortName($exception);
                if ('ProvisionedThroughputExceededException' === $errorName) {
                    $code = Code::RESOURCE_EXHAUSTED->value;
                } else {
                    $code = Code::UNAVAILABLE->value;
                }

                if ($skipErrors) {
                    // the failed request is only logged; no follow up scan is scheduled for it
                    $this->logger->error(
                        sprintf(
                            '%s while scanning DynamoDb table [{table_name}] segment [{segment}].',
                            $errorName
                        ),
                        [
                            'exception'  => $exception,
                            'context'    => $context,
                            'table_name' => $tableName,
                            'segment'    => $segment,
                        ]
                    );

                    return;
                }

                $this->marshaler->skipValidation(false);
                $aggregatePromise->reject(
                    new EventStoreOperationFailed(
                        sprintf(
                            '%s while scanning DynamoDb table [%s] segment [%s].',
                            $errorName,
                            $tableName,
                            $segment
                        ),
                        $code,
                        $exception
                    )
                );
            };

            while (count($pending) > 0) {
                $commands = $pending;
                $pending = [];
                $pool = new CommandPool($this->client, $commands, [
                    'before'      => function () {
                        gc_collect_cycles();
                    },
                    'fulfilled'   => $fulfilled,
                    'rejected'    => $rejected,
                    'concurrency' => $concurrency,
                ]);
                $pool->promise()->wait();
                $iter2seg['prev'] = $iter2seg['next'];
                $iter2seg['next'] = [];
                yield;
            }

            yield;
        } finally {
            $this->marshaler->skipValidation(false);
        }
    }

    /**
     * Lets listeners enrich the context of an operation, at most once.
     *
     * A context already carrying the "already_enriched" marker is returned
     * untouched; otherwise an EnrichContextEvent is dispatched and the marker
     * is set on the resulting context.
     *
     * @param string $operation Name of the calling operation.
     * @param array  $context
     *
     * @return array
     */
    protected function enrichContext(string $operation, array $context): array
    {
        if (!isset($context['already_enriched'])) {
            $enrichEvent = new EnrichContextEvent('event_store', $operation, $context);
            $context = $this->dispatcher->dispatch($enrichEvent, PbjxEvents::ENRICH_CONTEXT)->all();
            $context['already_enriched'] = true;
        }

        return $context;
    }

    /**
     * Converts a stream id into the hash key value stored in DynamoDb.
     *
     * Extension point: override when stored hash keys use a different
     * format than the current StreamId (the StreamId format gained a
     * vendor: prefix in gdbots/schemas v2.x).
     *
     * @param StreamId $streamId
     *
     * @return string
     */
    protected function streamIdToHashKey(StreamId $streamId): string
    {
        $hashKey = $streamId->toString();
        return $hashKey;
    }

    /**
     * Converts a hash key value read from DynamoDb back into a stream id.
     *
     * Extension point: override when stored hash keys use a different
     * format than the current StreamId (the StreamId format gained a
     * vendor: prefix in gdbots/schemas v2.x).
     *
     * @param string $key
     *
     * @return StreamId
     */
    protected function hashKeyToStreamId(string $key): StreamId
    {
        $streamId = StreamId::fromString($key);
        return $streamId;
    }

    /**
     * Resolves the table name used by read operations.
     *
     * Uses the same table as write operations unless overridden.
     *
     * @param array $context
     *
     * @return string
     */
    protected function getTableNameForRead(array $context): string
    {
        $tableName = $this->getTableNameForWrite($context);
        return $tableName;
    }

    /**
     * Resolves the table name used by write operations.
     *
     * A non-null "table_name" entry in the context wins, otherwise the
     * table name given to the constructor is used.  Override to apply
     * your own logic.
     *
     * @param array $context
     *
     * @return string
     */
    protected function getTableNameForWrite(array $context): string
    {
        if (isset($context['table_name'])) {
            return $context['table_name'];
        }

        return $this->tableName;
    }

    /**
     * Hook called by putEvents for every item right before it is added to
     * the write request.  The item is passed by reference so an override can
     * add derived/virtual fields (indices, a ttl field, etc).
     *
     * @param array    $item     The marshaled DynamoDb item, modifiable in place.
     * @param StreamId $streamId The stream the event is written to.
     * @param Message  $event    The event the item was marshaled from.
     * @param array    $context
     */
    protected function beforePutItem(array &$item, StreamId $streamId, Message $event, array $context): void
    {
        // intentionally empty, subclasses customize here
    }

    /**
     * Looks up the DynamoDb item key (hash + range) of an event through
     * the event id GSI.
     *
     * Getting or deleting an event by id needs this key first.  At this
     * time DynamoDb does not support a batch get from a GSI, so only one
     * event id can be resolved per call.
     *
     * @param Identifier $eventId
     * @param array      $context
     *
     * @return array|null The item key, or null when the index has no match.
     *
     * @throws EventStoreOperationFailed When the index query fails.
     */
    protected function getItemKeyByEventId(Identifier $eventId, array $context = []): ?array
    {
        $tableName = $this->getTableNameForRead($context);
        $indexName = EventStoreTable::GSI_EVENT_ID_NAME;

        try {
            $response = $this->client->query([
                'TableName'                 => $tableName,
                'IndexName'                 => $indexName,
                'Limit'                     => 1,
                'ExpressionAttributeNames'  => [
                    '#hash' => EventStoreTable::GSI_EVENT_ID_HASH_KEY_NAME,
                ],
                'ExpressionAttributeValues' => [
                    ':v_hash' => ['S' => (string)$eventId],
                ],
                'KeyConditionExpression'    => '#hash = :v_hash',
            ]);
        } catch (\Throwable $e) {
            $isAwsError = $e instanceof AwsException;
            $errorName = $e instanceof AwsException
                ? ($e->getAwsErrorCode() ?: ClassUtil::getShortName($e))
                : ClassUtil::getShortName($e);

            $code = match (true) {
                !$isAwsError                                            => Code::INTERNAL->value,
                'ProvisionedThroughputExceededException' === $errorName => Code::RESOURCE_EXHAUSTED->value,
                default                                                 => Code::UNAVAILABLE->value,
            };

            $message = sprintf(
                '%s on IndexQuery [%s] on DynamoDb table [%s].',
                $errorName,
                $indexName,
                $tableName
            );

            throw new EventStoreOperationFailed($message, $code, $e);
        }

        $matches = $response['Items'] ?? null;
        if (empty($matches)) {
            return null;
        }

        // the query is limited to one item, take the last (only) one
        $match = end($matches);
        $hash = $match[EventStoreTable::HASH_KEY_NAME]['S'];
        $range = (string)$match[EventStoreTable::RANGE_KEY_NAME]['N'];

        return [
            EventStoreTable::HASH_KEY_NAME  => ['S' => $hash],
            EventStoreTable::RANGE_KEY_NAME => ['N' => $range],
        ];
    }

    /**
     * Verifies that the head of the stream is at the expected state before
     * events are appended.
     *
     * The newest event is read with a consistent, backward slice of one.
     * The check passes when the expected etag equals either that event's
     * id or the md5 of that id.
     *
     * @param StreamId $streamId
     * @param string   $expectedEtag
     * @param array    $context
     *
     * @throws OptimisticCheckFailed When the stream is empty or the etag doesn't match.
     */
    protected function optimisticCheck(StreamId $streamId, string $expectedEtag, array $context): void
    {
        $head = $this->getStreamSlice($streamId, count: 1, forward: false, consistent: true, context: $context);

        if ($head->count() < 1) {
            $message = sprintf(
                'The DynamoDb table [%s] has no events in stream [%s].',
                $this->getTableNameForRead($context),
                $streamId
            );

            throw new OptimisticCheckFailed($message);
        }

        $lastEvent = $head->getIterator()->current();

        // todo: review this etag strategy (need to make this more explicit/obvious)
        $eventId = (string)$lastEvent->get('event_id');
        $eventIdHash = md5($eventId);
        if (in_array($expectedEtag, [$eventId, $eventIdHash], true)) {
            return;
        }

        $message = sprintf(
            'The last event [%s:%s] in DynamoDb table [%s] from stream [%s] doesn\'t match expected etag [%s].',
            $eventId,
            $eventIdHash,
            $this->getTableNameForRead($context),
            $streamId,
            $expectedEtag
        );

        throw new OptimisticCheckFailed($message);
    }
}