datashaman/elasticsearch-cache

View on GitHub
src/ElasticsearchStore.php

Summary

Maintainability
A
35 mins
Test Coverage
<?php

namespace Datashaman\Elasticsearch\Cache;

use Elasticsearch\Client;
use Elasticsearch\Common\Exceptions\Missing404Exception;
use Elasticsearch\Common\Exceptions\ServerErrorResponseException;
use GuzzleHttp\Ring\Future\FutureArray;
use Illuminate\Cache\TaggableStore;
use Illuminate\Contracts\Cache\Store;

class ElasticsearchStore extends TaggableStore implements Store
{
    /**
     * The Elasticsearch client instance.
     *
     * @var \Elasticsearch\Client
     */
    protected $client;

    /**
     * The prefix, used in creating the Elasticsearch index name (prefix-cache).
     *
     * @var string
     */
    protected $prefix;

    /**
     * The Elasticsearch document type.
     *
     * @var string
     */
    protected $type = 'cache';

    /**
     * Create a new Elasticsearch store.
     *
     * @param  \Elasticsearch\Client  $client
     * @param  string                 $prefix
     * @return void
     */
    public function __construct(Client $client, $prefix='')
    {
        $this->setPrefix($prefix);
        $this->client = $client;
    }

    protected function unserialize($value)
    {
        // return json_decode($value, true);
        return is_numeric($value) ? $value : unserialize($value);
    }

    protected function serialize($value)
    {
        // return json_encode($value);
        return is_numeric($value) ? $value : serialize($value);
    }

    protected function resolveWait($result)
    {
        while ($result instanceof FutureArray) {
            $result = $result->wait();
        }
        return $result;
    }

    /**
     * Retrieve an item from the cache by key.
     *
     * @param  string|array  $key
     * @return mixed
     */
    public function get($key)
    {
        $tries = 0;

        while (true) {
            try {
                $response = $this->client->get([
                    'index' => $this->getIndex(),
                    'type' => $this->type,
                    'id' => $key,
                ]);

                return $this->unserialize($response['_source']['value']);
            } catch (Missing404Exception $e) {
                return null;
            } catch (ServerErrorResponseException $e) {
                // assume it'll clear up, sleep for a bit
                sleep(1);
                $tries++;
            }

            if ($tries > 3) {
                return null;
            }
        }
    }

    /**
     * Retrieve multiple items from the cache by key.
     *
     * Items not found in the cache will have a null value.
     *
     * @param  array  $keys
     * @return array
     */
    public function many(array $keys)
    {
        $params = [
            'index' => $this->getIndex(),
            'body' => [
                'query' => [
                    'constant_score' => [
                        'filter' => [
                            'ids' => [
                                'type' => $this->type,
                                'values' => $keys,
                            ],
                        ],
                    ],
                ],
            ],
            'size' => count($keys),
        ];

        $response = $this->client->search($params);

        $return = collect($response['hits']['hits'])
            ->reduce(function ($carry, $hit) {
                $carry[$hit['_id']] = $this->unserialize($hit['_source']['value']);
                return $carry;
            }, collect())
            ->all();

        return $return;
    }

    /**
     * Store an item in the cache for a given number of minutes.
     *
     * @param  string  $key
     * @param  mixed   $value
     * @param  int     $minutes
     * @return void
     */
    public function put($key, $value, $minutes)
    {
        $value = $this->serialize($value);

        $params = [
            'index' => $this->getIndex(),
            'type' => $this->type,
            'id' => $key,
            'body' => compact('value'),
        ];

        if ($minutes > 0) {
            $params['ttl'] = $minutes.'m';
        }

        $this->resolveWait($this->client->index($params));
    }

    /**
     * Store multiple items in the cache for a given number of minutes.
     *
     * @param  array  $values
     * @param  int  $minutes
     * @return void
     */
    public function putMany(array $values, $minutes)
    {
        $meta = [];

        if ($minutes > 0) {
            $meta['_ttl'] = $minutes.'m';
        }

        $rows = collect();

        foreach ($values as $key => $raw) {
            $meta['_id'] = $key;

            $params['index'] = $meta;
            $rows[] = $params;

            $value = $this->serialize($raw);
            $rows[] = compact('value');
        }

        $body = $rows->map('json_encode')
            ->implode("\n") . "\n";

        $params = [
            'index' => $this->getIndex(),
            'type' => $this->type,
            'body' => $body,
        ];

        $this->resolveWait($this->client->bulk($params));
    }

    /**
     * Increment the value of an item in the cache.
     *
     * @param  string  $key
     * @param  mixed   $value
     * @return int|bool
     */
    public function increment($key, $value = 1)
    {
        $original = $this->get($key);

        $value += $original;

        $doc = compact('value');

        $params = [
            'index' => $this->getIndex(),
            'type' => $this->type,
            'id' => $key,
            'body' => compact('doc'),
            'retry_on_conflict' => 3,
        ];

        $this->resolveWait($this->client->update($params));
    }

    /**
     * Decrement the value of an item in the cache.
     *
     * @param  string  $key
     * @param  mixed   $value
     * @return int|bool
     */
    public function decrement($key, $value = 1)
    {
        return $this->increment($key, -$value);
    }

    /**
     * Store an item in the cache indefinitely.
     *
     * @param  string  $key
     * @param  mixed   $value
     * @return void
     */
    public function forever($key, $value)
    {
        $this->put($key, $value, 0);
    }

    /**
     * Remove an item from the cache.
     *
     * @param  string  $key
     * @return bool
     */
    public function forget($key)
    {
        $params = [
            'index' => $this->getIndex(),
            'type' => $this->type,
            'id' => $key,
        ];

        return $this->resolveWait($this->client->delete($params));
    }

    public function scrollAndScan(callable $callback, $scroll = '10s', $size = 50, $query = ['match_all' => []])
    {
        $params = [
            'search_type' => 'scan',    // use search_type=scan
            'scroll' => $scroll,        // the length of time to hold the context consistent (make it short, eg 10s)
            'size' => $size,            // how many results *per shard* you want back
            'index' => $this->getIndex(),
            'type' => $this->type,
            'body' => [
                'query' => $query,
            ]
        ];

        $response = $this->client->search($params);
        $scroll_id = $response['_scroll_id'];

        while (true) {
            $response = $this->client->scroll(compact('scroll_id', 'scroll'));

            if (count($response['hits']['hits']) > 0) {
                call_user_func($callback, $response['hits']['hits']);
                $scroll_id = $response['_scroll_id'];
            } else {
                break;
            }
        }
    }

    /**
     * Remove all items from the cache.
     *
     * @return void
     */
    public function flush()
    {
        $this->scrollAndScan(function ($hits) {
            collect($hits)->each(function ($hit) {
                $this->forget($hit['_id']);
            });
        });
    }

    /**
     * Get the underlying Elasticsearch client.
     *
     * @return \Elasticsearch\Client
     */
    public function getClient()
    {
        return $this->client;
    }

    /**
     * Get the prefix. (used as index)
     *
     * @return string
     */
    public function getPrefix()
    {
        return $this->prefix;
    }

    /**
     * Set the prefix. (used as index)
     *
     * @param  string  $prefix
     * @return void
     */
    public function setPrefix($prefix)
    {
        $this->prefix = $prefix;
    }

    /**
     * Get the Elasticsearch index name.
     *
     * @return string
     */
    public function getIndex()
    {
        return $this->prefix.'-cache';
    }
}