gdbots/pbjx-php

View on GitHub
src/EventSearch/Elastica/IndexManager.php

Summary

Maintainability
F
3 days
Test Coverage
<?php
declare(strict_types=1);

namespace Gdbots\Pbjx\EventSearch\Elastica;

use Elastica\Client;
use Elastica\Index;
use Elastica\IndexTemplate;
use Elastica\Mapping;
use Gdbots\Pbj\Message;
use Gdbots\Pbj\MessageResolver;
use Gdbots\Pbjx\Exception\EventSearchOperationFailed;
use Gdbots\Schemas\Pbjx\Enum\Code;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;

class IndexManager
{
    /**
     * Our "occurred_at" field is a 16 digit integer (seconds + 6 digits microtime)
     * In order to use elasticsearch time range queries we'll store a derived value
     * of the ISO (in UTC/ZULU) into another field.
     *
     * Generally we use "__" to indicate a derived field but kibana won't recognize it
     * and it's already been debated with no fix yet.
     *
     * @link  https://github.com/elastic/kibana/issues/2551
     * @link  https://github.com/elastic/kibana/issues/4762
     *
     * So for now, we'll use "d__" to indicate a derived field for ES.
     *
     * @const string
     */
    public const OCCURRED_AT_ISO_FIELD_NAME = 'd__occurred_at_iso';

    /**
     * When a search would span more than this many explicit index names
     * we switch to per-year wildcards so the index list (which is sent in
     * the url of a multi-index search) doesn't get too long.
     */
    private const MAX_EXPLICIT_SEARCH_INDICES = 9;

    /**
     * The name of the index (without any time interval) that all events
     * will be written to and searched in.  Stored with a single trailing "-"
     * so an interval suffix can be appended directly.
     *
     * Indexes are prefix-YYYYq# by default.  By extending this class you
     * can customize the prefix for multi-tenant applications.
     * e.g. client1-prefix-yyyymm.
     *
     * Configurable intervals are coming soon (quarterly, monthly, daily)
     *
     * @var string
     */
    protected string $indexPrefix;

    /**
     * Always set by the constructor (a NullLogger is used when none is given).
     */
    protected ?LoggerInterface $logger;

    /**
     * @param string               $indexPrefix Prefix for all index names, with or without trailing dashes.
     * @param LoggerInterface|null $logger      Optional logger, defaults to a NullLogger.
     */
    public function __construct(string $indexPrefix, ?LoggerInterface $logger = null)
    {
        $this->indexPrefix = rtrim($indexPrefix, '-') . '-';
        $this->logger = $logger ?? new NullLogger();
    }

    /**
     * Returns the index prefix (without the trailing "-") which can be
     * used in template creation or finding/updating indexes.
     *
     * @return string
     */
    public function getIndexPrefix(): string
    {
        return rtrim($this->indexPrefix, '-');
    }

    /**
     * Returns the name of the index that should be used when only
     * a date and context is available. Typically this is when
     * deleting events and you don't have a search request or
     * an event and can't derive the index from the usual methods.
     *
     * An "index_name" entry in the context always wins over the
     * name derived from the date.
     *
     * @param \DateTimeInterface $date
     * @param array              $context
     *
     * @return string
     */
    public function getIndexNameFromContext(\DateTimeInterface $date, array $context): string
    {
        return $context['index_name'] ?? $this->indexPrefix . $this->getIndexIntervalSuffix($date);
    }

    /**
     * Returns the name of the index that the event should be written to,
     * derived from the event's "occurred_at" field.
     *
     * @param Message $event
     *
     * @return string
     */
    public function getIndexNameForWrite(Message $event): string
    {
        /** @var \DateTimeInterface $occurredAt */
        $occurredAt = $event->get('occurred_at')->toDateTime();
        return $this->indexPrefix . $this->getIndexIntervalSuffix($occurredAt);
    }

    /**
     * Returns an array of index names (or patterns) that should be
     * queried for the given search request.
     *
     * Reads the "occurred_after" and "occurred_before" fields of the request.
     *
     * @param Message $request
     *
     * @return string[]
     */
    public function getIndexNamesForSearch(Message $request): array
    {
        /** @var \DateTimeInterface|null $after */
        $after = $request->get('occurred_after');
        /** @var \DateTimeInterface|null $before */
        $before = $request->get('occurred_before');

        /*
         * when no lower bound is used, we must assume they meant to search
         * all events to the beginning of time.  this could take forever.
         */
        if (null === $after) {
            return [$this->indexPrefix . '*'];
        }

        // if no upper bound is present, make one up
        $end = $before ?? new \DateTimeImmutable('now', new \DateTimeZone('UTC'));

        return $this->buildIndexNamesForRange($after, $end);
    }

    /**
     * Returns the suffix that should be used for routing writes
     * and search requests for a given date.
     *
     * todo: make this configureable, right now we're using quarterly by default
     *
     * @param \DateTimeInterface $date
     *
     * @return string e.g. "2020q3"
     */
    public function getIndexIntervalSuffix(\DateTimeInterface $date): string
    {
        // months 1-3 => 1, 4-6 => 2, 7-9 => 3, 10-12 => 4 (integer math, no float formatting)
        $quarter = intdiv((int)$date->format('n') + 2, 3);
        return $date->format('Y') . 'q' . $quarter;
    }

    /**
     * Creates an index template in elastic search.
     *
     * @link https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
     *
     * @param Client $client
     * @param string $name
     */
    final public function updateTemplate(Client $client, string $name): void
    {
        $params = [
            'template' => '*' . $name . '*',
            'settings' => [
                'number_of_shards'   => 1,
                'number_of_replicas' => 1,
                'index'              => [
                    'analysis' => [
                        'analyzer'   => $this->getCustomAnalyzers(),
                        'normalizer' => $this->getCustomNormalizers(),
                    ],
                ],
            ],
            'mappings' => $this->createMapping()->toArray(),
        ];

        $this->beforeUpdateTemplate($params);
        $template = new IndexTemplate($client, $name);
        $template->create($params);

        // the hook receives $params by reference and may have removed or
        // renamed the "template" key, so don't assume it's still there.
        $pattern = $params['template'] ?? $params['index_patterns'] ?? $name;
        if (is_array($pattern)) {
            $pattern = implode(',', $pattern);
        }

        $this->logger->info(sprintf('Successfully created index template [%s].', $pattern));
    }

    /**
     * Updates an existing index settings and all of its mappings.
     * The index template handles this for any newly created indices but
     * the existing ones need to be updated directly.
     *
     * This is generally only needed for indices that you're still actively
     * writing data to.  If this fails you'll need to delete the index and
     * re-index all data for that time frame.
     *
     * The name can contain wildcards.
     *
     * @param Client $client
     * @param string $name
     *
     * @throws EventSearchOperationFailed When the mapping can't be put for any
     *                                    reason other than the index not existing.
     */
    final public function updateIndex(Client $client, string $name): void
    {
        $index = new Index($client, $name);

        try {
            $index->setMapping($this->createMapping(), [
                'ignore_unavailable' => true,
                'allow_no_indices'   => true,
            ]);
        } catch (\Throwable $e) {
            // a missing index is not an error, the template will handle it on creation.
            if (str_contains($e->getMessage(), 'no such index')) {
                $this->logger->info(sprintf('No index exists yet [%s] in ElasticSearch. Ignoring.', $name));
                return;
            }

            throw new EventSearchOperationFailed(
                sprintf('Failed to put mapping for index [%s] into ElasticSearch.', $name),
                Code::INTERNAL->value,
                $e
            );
        }

        $this->logger->info(sprintf('Successfully put mapping for index [%s]', $name));
        $this->updateAnalyzers($index, $name);
        $this->updateNormalizers($index, $name);
    }

    /**
     * Checks if an existing index is missing any custom analyzers
     * and if it is, updates settings to include them.
     *
     * Failures are logged, not thrown.
     *
     * @param Index  $index
     * @param string $name
     */
    protected function updateAnalyzers(Index $index, string $name): void
    {
        // when any analyzer is missing, the full custom set is sent (not just the missing ones).
        $this->addMissingAnalysisComponents($index, $name, 'analyzer', $this->getCustomAnalyzers(), true);
    }

    /**
     * Checks if an existing index is missing any custom normalizers
     * and if it is, updates settings to include them.
     *
     * Failures are logged, not thrown.
     *
     * @param Index  $index
     * @param string $name
     */
    protected function updateNormalizers(Index $index, string $name): void
    {
        // only the missing normalizers are sent.
        $this->addMissingAnalysisComponents($index, $name, 'normalizer', $this->getCustomNormalizers(), false);
    }

    /**
     * Override to customize the template params before it's pushed to elastic search.
     *
     * @param array $params Template params, passed by reference.
     */
    protected function beforeUpdateTemplate(array &$params): void
    {
        // Override to customize the template params before it's pushed to elastic search.
    }

    /**
     * Builds the mapping from every schema using the "indexed" mixin and
     * adds the derived ISO date field used for time range queries.
     *
     * @return Mapping
     */
    protected function createMapping(): Mapping
    {
        $builder = $this->getMappingBuilder();
        foreach (MessageResolver::findAllUsingMixin('gdbots:pbjx:mixin:indexed:v1') as $curie) {
            $builder->addSchema(MessageResolver::resolveCurie($curie)::schema());
        }

        $mapping = $builder->build();
        $properties = $mapping->getProperties();
        $properties[self::OCCURRED_AT_ISO_FIELD_NAME] = MappingBuilder::TYPES['date'];
        $mapping->setProperties($properties);

        return $mapping;
    }

    /**
     * Override to provide a customized mapping builder.
     *
     * @return MappingBuilder
     */
    protected function getMappingBuilder(): MappingBuilder
    {
        return new MappingBuilder();
    }

    /**
     * @link http://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-custom-analyzer.html
     *
     * @return array
     */
    protected function getCustomAnalyzers(): array
    {
        return MappingBuilder::getCustomAnalyzers();
    }

    /**
     * @link https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-normalizers.html
     *
     * @return array
     */
    protected function getCustomNormalizers(): array
    {
        return MappingBuilder::getCustomNormalizers();
    }

    /**
     * Accumulates the index names covering the range from $after to $end.
     *
     * Steps through the range a week at a time so this keeps working for any
     * interval suffix that is a week or longer.  If the span results in too
     * many indices, per-year wildcards are returned instead to prevent urls
     * being too long for a GET request (multi-index searches have index in url).
     *
     * @param \DateTimeInterface $after Lower bound of the search.
     * @param \DateTimeInterface $end   Upper bound of the search.
     *
     * @return string[]
     */
    private function buildIndexNamesForRange(\DateTimeInterface $after, \DateTimeInterface $end): array
    {
        $indices = [];

        // immutable copy so the date object held by the request is never modified.
        $cursor = \DateTimeImmutable::createFromInterface($after);

        do {
            $indices[$this->indexPrefix . $this->getIndexIntervalSuffix($cursor)] = true;
            $cursor = $cursor->modify('+1 week');
        } while ($cursor < $end);

        /*
         * the last weekly step can stop short of the interval that $end is in,
         * so add it explicitly.  we use $end itself (not the cursor, which is
         * now up to a week past $end) so we never name an interval beyond the
         * requested range, e.g. next quarter's index that doesn't exist yet.
         */
        if ($end >= $after) {
            $indices[$this->indexPrefix . $this->getIndexIntervalSuffix($end)] = true;
        }

        if (count($indices) <= self::MAX_EXPLICIT_SEARCH_INDICES) {
            return array_keys($indices);
        }

        // every calendar year touched by the range, including the one $end is in.
        $patterns = [];
        $lastYear = (int)$end->format('Y');
        for ($year = (int)$after->format('Y'); $year <= $lastYear; $year++) {
            $patterns[] = $this->indexPrefix . $year . '*';
        }

        return $patterns;
    }

    /**
     * Shared implementation for updateAnalyzers and updateNormalizers.
     *
     * Compares the index's current "analysis.$type.*" settings to the custom
     * definitions and, when any are missing, closes the index, puts the
     * settings and opens the index again.  All failures are logged, not thrown.
     *
     * @param Index  $index
     * @param string $name      Index name, only used for log messages.
     * @param string $type      Analysis settings key, "analyzer" or "normalizer".
     * @param array  $custom    Custom definitions keyed by id.
     * @param bool   $resendAll When true the full $custom set is sent, otherwise only the missing ones.
     */
    private function addMissingAnalysisComponents(
        Index $index,
        string $name,
        string $type,
        array $custom,
        bool $resendAll
    ): void {
        $settings = $index->getSettings();
        $missing = [];

        try {
            foreach ($custom as $id => $definition) {
                if (!$settings->get("analysis.{$type}.{$id}")) {
                    $missing[$id] = $definition;
                }
            }
        } catch (\Throwable $e) {
            $this->logger->error(
                sprintf('Unable to read index [%s] and get %s settings.', $name, $type),
                ['exception' => $e, 'index_name' => $name]
            );

            return;
        }

        if ([] === $missing) {
            $this->logger->info(
                sprintf(
                    'Index [%s] has all custom %ss [%s].',
                    $name,
                    $type,
                    implode(',', array_keys($custom))
                )
            );

            return;
        }

        $this->logger->warning(
            sprintf(
                'Closing index [%s] to add custom %ss [%s].',
                $name,
                $type,
                implode(',', array_keys($missing))
            )
        );

        try {
            $index->close();

            try {
                $index->setSettings(['analysis' => [$type => $resendAll ? $custom : $missing]]);
            } finally {
                // once closed, always try to reopen, even when the settings
                // update failed, so the index isn't left closed.
                $index->open();
            }
        } catch (\Throwable $e) {
            $this->logger->error(
                sprintf('Unable to close index [%s] and update %s settings.', $name, $type),
                ['exception' => $e, 'index_name' => $name]
            );

            return;
        }

        $this->logger->info(sprintf('Successfully added missing %ss to index [%s]', $type, $name));
    }
}