mipearson/elasticsearch-store

View on GitHub
lib/activesupport/cache/elasticsearch_store.rb

Summary

Maintainability
A
0 mins
Test Coverage
# encoding: utf-8

# Load the elasticsearch client gem. When it is missing, print a hint on
# stderr and let the original LoadError propagate.
begin
  require 'elasticsearch'
rescue LoadError
  $stderr.puts <<-EOF
    You don't have elasticsearch installed in your application.
    Please add it to your Gemfile and run bundle install.
  EOF
  # Bare raise re-raises the LoadError currently being handled.
  raise
end

require 'digest/md5'
require 'active_support/core_ext/array/extract_options'
require 'active_support/core_ext/hash/slice'
require 'active_support/cache'

module ActiveSupport
  module Cache
    # A cache store implementation which stores data in ElasticSearch:
    # http://www.elasticsearch.org/
    #
    # This store is experimental and was developed against a non-clustered
    # ElasticSearch environment.
    class ElasticsearchStore < Store
      # Creates a new ElasticsearchStore object, with the given elasticsearch
      # server addresses. The addresses and the randomize_hosts,
      # retry_on_failure and reload_connections options are passed verbatim
      # to Elasticsearch::Client.new.
      #
      # Defaults to 'localhost:9200' if no addresses are specified.
      #
      # Will create an index named 'cache' unless the :index_name option is
      # specified.
      def initialize(*addresses)
        addresses = addresses.flatten
        # Work on a copy: the keys consumed below are removed from the hash,
        # and the caller's own options hash must not be stripped as a side
        # effect of constructing the store.
        options = addresses.extract_options!.dup

        # Client-level options are forwarded to Elasticsearch::Client.new only
        # when they were explicitly given (nil values are dropped).
        es_options = options.extract!(:retry_on_failure, :randomize_hosts, :reload_connections)
        es_options.delete_if { |_name, value| value.nil? }
        @index_name = options.delete(:index_name) || 'cache'

        if options[:namespace]
          raise ArgumentError, ":namespace option not yet supported"
        end

        # Whatever is left is passed up to the Store superclass.
        super(options)

        es_options[:addresses] = addresses.empty? ? %w(localhost:9200) : addresses

        @client = Elasticsearch::Client.new(es_options)
        # Flipped to true by create_index.
        @index_created = false
      end

      # Clear the entire cache in our Elasticsearch index. This method should
      # be used with care when shared cache is being used.
      def clear(_options = nil)
        # TODO: Support namespaces
        # A match_all query selects every 'entry' document in our index.
        everything = { query: { match_all: {} } }
        @client.delete_by_query(index: @index_name, type: 'entry', body: everything)
      end

      protected

      # Fetches the document stored under +key+ and wraps its "value" in an
      # Entry. Returns nil when the client reports the document as not found.
      def read_entry(key, options) # :nodoc:
        response = @client.get(index: @index_name, type: 'entry', fields: %w(_ttl _source), id: key)

        meta = response['fields'] || {}
        payload = response['_source'] || {}

        # _ttl is treated as milliseconds here; expires_in is handed to Entry
        # in seconds. Stays nil when the response carries no _ttl field.
        expires_in = meta.key?('_ttl') ? meta['_ttl'].to_f / 1000.0 : nil

        Entry.new(payload["value"], options.merge(expires_in: expires_in))
      rescue Elasticsearch::Transport::Transport::Errors::NotFound
        nil
      end

      # Indexes +entry+'s value as a document of type 'entry' with id +key+.
      # When the entry has an expiry, the remaining lifetime is sent along as
      # _ttl in milliseconds. Returns the client's response.
      def write_entry(key, entry, _options) # :nodoc:
        # Create the index (with its mapping) lazily before the first write.
        # create_index sets @index_created, so this runs once per store.
        # Without this call nothing in the class ever invokes create_index.
        create_index unless @index_created

        request = { value: entry.value }
        if entry.expires_at
          expires_in = (entry.expires_at - Time.now.to_f) * 1000
          # If we somehow manage to get a negative expires, force it to 1ms
          # as ES will crack it otherwise.
          request[:_ttl] = [expires_in, 1].max
        end

        @client.index index: @index_name, type: 'entry', id: key, body: request
      end

      # Deletes the document stored under +key+. Returns the client's response
      # on success and false when the client reports the document as not found.
      def delete_entry(key, _options) # :nodoc:
        @client.delete index: @index_name, type: 'entry', id: key
      rescue Elasticsearch::Transport::Transport::Errors::NotFound
        # Deleting a key that is not cached is not an error for a cache store;
        # treat it like read_entry treats a missing document instead of
        # letting the exception escape.
        false
      end

      private

      # Creates the index named @index_name with zero replicas and a mapping
      # for the 'entry' type: _ttl enabled and an unindexed string "value"
      # field. A BadRequest whose message mentions IndexAlreadyExists is
      # swallowed; any other BadRequest is re-raised.
      #
      # @index_created is set only once the index is known to exist, so a
      # failed attempt can be retried later.
      def create_index
        # TODO: Check index mappings to make sure they match ours, warn if they don't.
        @client.indices.create index: @index_name, body: {
          settings: { index: {
            number_of_replicas: 0
          } },
          mappings: { entry: {
            # _ttl is a type-level meta field, so it sits beside :properties
            # rather than inside it.
            _ttl: { enabled: true },
            properties: {
              value: { type: 'string', index: 'no' }
            }
          } }
        }
        @index_created = true
      rescue Elasticsearch::Transport::Transport::Errors::BadRequest => e
        raise unless e.message =~ /IndexAlreadyExists/
        @index_created = true
      end
    end
  end
end