
View on GitHub


35 mins
Test Coverage

namespace Datashaman\Elasticsearch\Cache;

use Elasticsearch\Client;
use Elasticsearch\Common\Exceptions\Missing404Exception;
use Elasticsearch\Common\Exceptions\ServerErrorResponseException;
use GuzzleHttp\Ring\Future\FutureArray;
use Illuminate\Cache\TaggableStore;
use Illuminate\Contracts\Cache\Store;

class ElasticsearchStore extends TaggableStore implements Store
     * The Elasticsearch client instance.
     * @var \Elasticsearch\Client
    protected $client;

     * The prefix, used in creating the Elasticsearch index name (prefix-cache).
     * @var string
    protected $prefix;

     * The Elasticsearch document type.
     * @var string
    protected $type = 'cache';

     * Create a new Elasticsearch store.
     * @param  \Elasticsearch\Client  $client
     * @param  string                 $prefix
     * @return void
    public function __construct(Client $client, $prefix='')
        $this->client = $client;

    protected function unserialize($value)
        // return json_decode($value, true);
        return is_numeric($value) ? $value : unserialize($value);

    protected function serialize($value)
        // return json_encode($value);
        return is_numeric($value) ? $value : serialize($value);

    protected function resolveWait($result)
        while ($result instanceof FutureArray) {
            $result = $result->wait();
        return $result;

     * Retrieve an item from the cache by key.
     * @param  string|array  $key
     * @return mixed
    public function get($key)
        $tries = 0;

        while (true) {
            try {
                $response = $this->client->get([
                    'index' => $this->getIndex(),
                    'type' => $this->type,
                    'id' => $key,

                return $this->unserialize($response['_source']['value']);
            } catch (Missing404Exception $e) {
                return null;
            } catch (ServerErrorResponseException $e) {
                // assume it'll clear up, sleep for a bit

            if ($tries > 3) {
                return null;

     * Retrieve multiple items from the cache by key.
     * Items not found in the cache will have a null value.
     * @param  array  $keys
     * @return array
    public function many(array $keys)
        $params = [
            'index' => $this->getIndex(),
            'body' => [
                'query' => [
                    'constant_score' => [
                        'filter' => [
                            'ids' => [
                                'type' => $this->type,
                                'values' => $keys,
            'size' => count($keys),

        $response = $this->client->search($params);

        $return = collect($response['hits']['hits'])
            ->reduce(function ($carry, $hit) {
                $carry[$hit['_id']] = $this->unserialize($hit['_source']['value']);
                return $carry;
            }, collect())

        return $return;

     * Store an item in the cache for a given number of minutes.
     * @param  string  $key
     * @param  mixed   $value
     * @param  int     $minutes
     * @return void
    public function put($key, $value, $minutes)
        $value = $this->serialize($value);

        $params = [
            'index' => $this->getIndex(),
            'type' => $this->type,
            'id' => $key,
            'body' => compact('value'),

        if ($minutes > 0) {
            $params['ttl'] = $minutes.'m';


     * Store multiple items in the cache for a given number of minutes.
     * @param  array  $values
     * @param  int  $minutes
     * @return void
    public function putMany(array $values, $minutes)
        $meta = [];

        if ($minutes > 0) {
            $meta['_ttl'] = $minutes.'m';

        $rows = collect();

        foreach ($values as $key => $raw) {
            $meta['_id'] = $key;

            $params['index'] = $meta;
            $rows[] = $params;

            $value = $this->serialize($raw);
            $rows[] = compact('value');

        $body = $rows->map('json_encode')
            ->implode("\n") . "\n";

        $params = [
            'index' => $this->getIndex(),
            'type' => $this->type,
            'body' => $body,


     * Increment the value of an item in the cache.
     * @param  string  $key
     * @param  mixed   $value
     * @return int|bool
    public function increment($key, $value = 1)
        $original = $this->get($key);

        $value += $original;

        $doc = compact('value');

        $params = [
            'index' => $this->getIndex(),
            'type' => $this->type,
            'id' => $key,
            'body' => compact('doc'),
            'retry_on_conflict' => 3,


     * Decrement the value of an item in the cache.
     * @param  string  $key
     * @param  mixed   $value
     * @return int|bool
    public function decrement($key, $value = 1)
        return $this->increment($key, -$value);

     * Store an item in the cache indefinitely.
     * @param  string  $key
     * @param  mixed   $value
     * @return void
    public function forever($key, $value)
        $this->put($key, $value, 0);

     * Remove an item from the cache.
     * @param  string  $key
     * @return bool
    public function forget($key)
        $params = [
            'index' => $this->getIndex(),
            'type' => $this->type,
            'id' => $key,

        return $this->resolveWait($this->client->delete($params));

    public function scrollAndScan(callable $callback, $scroll = '10s', $size = 50, $query = ['match_all' => []])
        $params = [
            'search_type' => 'scan',    // use search_type=scan
            'scroll' => $scroll,        // the length of time to hold the context consistent (make it short, eg 10s)
            'size' => $size,            // how many results *per shard* you want back
            'index' => $this->getIndex(),
            'type' => $this->type,
            'body' => [
                'query' => $query,

        $response = $this->client->search($params);
        $scroll_id = $response['_scroll_id'];

        while (true) {
            $response = $this->client->scroll(compact('scroll_id', 'scroll'));

            if (count($response['hits']['hits']) > 0) {
                call_user_func($callback, $response['hits']['hits']);
                $scroll_id = $response['_scroll_id'];
            } else {

     * Remove all items from the cache.
     * @return void
    public function flush()
        $this->scrollAndScan(function ($hits) {
            collect($hits)->each(function ($hit) {

     * Get the underlying Elasticsearch client.
     * @return \Elasticsearch\Client
    public function getClient()
        return $this->client;

     * Get the prefix. (used as index)
     * @return string
    public function getPrefix()
        return $this->prefix;

     * Set the prefix. (used as index)
     * @param  string  $prefix
     * @return void
    public function setPrefix($prefix)
        $this->prefix = $prefix;

     * Get the Elasticsearch index name.
     * @return string
    public function getIndex()
        return $this->prefix.'-cache';