honeybee/honeybee

View on GitHub
src/Infrastructure/DataAccess/Storage/CouchDb/EventStream/EventStreamReader.php

Summary

Maintainability
A
1 hr
Test Coverage
<?php

namespace Honeybee\Infrastructure\DataAccess\Storage\CouchDb\EventStream;

use GuzzleHttp\Exception\RequestException;
use Honeybee\Common\Error\RuntimeError;
use Honeybee\Model\Event\AggregateRootEventList;
use Honeybee\Model\Event\EventStream;
use Honeybee\Infrastructure\Config\Settings;
use Honeybee\Infrastructure\Config\SettingsInterface;
use Honeybee\Infrastructure\DataAccess\Storage\CouchDb\CouchDbStorage;
use Honeybee\Infrastructure\DataAccess\Storage\StorageReaderInterface;
use Honeybee\Infrastructure\DataAccess\Storage\StorageReaderIterator;

class EventStreamReader extends CouchDbStorage implements StorageReaderInterface
{
    const STARTKEY_FILTER = '["%s", {}]';

    const ENDKEY_FILTER = '["%s", 1]';

    protected $next_identifier = null;

    protected $identifier_list;

    public function read($identifier, SettingsInterface $settings = null)
    {
        try {
            $view_params = [
                'startkey' => sprintf(self::STARTKEY_FILTER, $identifier),
                'endkey' => sprintf(self::ENDKEY_FILTER, $identifier),
                'include_docs' => 'true',
                'reduce' => 'false',
                'descending' => 'true',
                'limit' => 5000 // @todo use snapshot size config setting as soon as available
            ];
            if (!$this->config->has('design_doc')) {
                throw new RuntimeError(
                    'Missing setting for "design_doc" that holds the name of the couchdb design document, ' .
                    'that is expected to contain the event_stream view.'
                );
            }
            $view_path = sprintf(
                '/_design/%s/_view/%s',
                $this->config->get('design_doc'),
                $this->config->get('view_name', 'event_stream')
            );
            $response = $this->request($view_path, self::METHOD_GET, [], $view_params);
            $result_data = json_decode($response->getBody(), true);
        } catch (RequestException $error) {
            $error_response = $error->getResponse();
            if ($error_response !== null && $error_response->getStatusCode() === 404) {
                return null;
            } else {
                throw $error;
            }
        }

        if ($result_data['total_rows'] > 0) {
            return $this->createEventStream($identifier, array_reverse($result_data['rows']));
        }

        return null;
    }

    public function readAll(SettingsInterface $settings = null)
    {
        $settings = $settings ?: new Settings;

        if ($settings->get('first', true)) {
            $this->identifier_list = $this->fetchEventStreamIdentifiers();
        }
        $this->next_identifier = key($this->identifier_list);
        next($this->identifier_list);

        if (!$this->next_identifier) {
            return [];
        }

        return [ $this->read($this->next_identifier, $settings) ];
    }

    public function getIterator()
    {
        return new StorageReaderIterator($this);
    }

    protected function createEventStream($identifier, array $event_stream_data)
    {
        $events = new AggregateRootEventList;
        foreach ($event_stream_data as $event_data) {
            $event_data = $event_data['doc'];
            if (!isset($event_data[self::OBJECT_TYPE])) {
                throw new RuntimeError("Missing type key within event data.");
            }
            $event_class = $event_data[self::OBJECT_TYPE];
            $events->addItem(new $event_class($event_data));
        }
        $data['identifier'] = $identifier;
        $data['events'] = $events;

        return new EventStream($data);
    }

    protected function fetchEventStreamIdentifiers()
    {
        $event_stream_keys = [];
        $view_name = sprintf('/_design/default_views/_view/%s', $this->config->get('view_name'));

        $request_params = [
            'group' => 'true',
            'group_level' => 1,
            'reduce' => 'true'
        ];

        $response = $this->request(
            sprintf('/_design/default_views/_view/%s', $this->config->get('view_name')),
            self::METHOD_GET,
            [],
            $request_params
        );
        $result_data = json_decode($response->getBody(), true);

        foreach ($result_data['rows'] as $row) {
            $event_stream_keys[$row['key'][0]] = $row['value'];
        }

        return $event_stream_keys;
    }
}