weheartit/cassie

View on GitHub
lib/cassie.rb

Summary

Maintainability
A
1 hr
Test Coverage
# frozen_string_literal: true

require "monitor"

require "cassandra"

# This class provides a lightweight wrapper around the Cassandra driver. It provides
# a foundation for maintaining a connection and constructing CQL statements.
class Cassie
  require File.expand_path("../cassie/config.rb", __FILE__)
  require File.expand_path("../cassie/subscribers.rb", __FILE__)
  require File.expand_path("../cassie/model.rb", __FILE__)
  require File.expand_path("../cassie/schema.rb", __FILE__)
  require File.expand_path("../cassie/testing.rb", __FILE__)
  require File.expand_path("../cassie/railtie.rb", __FILE__) if defined?(Rails)

  # Error class for a record that could not be found. It is not raised in this
  # file; presumably raised by the model layer — confirm against Cassie::Model.
  class RecordNotFound < StandardError
  end

  # Error carrying an invalid record. The message includes the record's class name
  # and its errors hash; the record itself is available via #record.
  class RecordInvalid < StandardError
    attr_reader :record

    def initialize(record)
      super("Errors on #{record.class.name}: #{record.errors.to_hash.inspect}")
      @record = record
    end
  end

  # Message passed to subscribers with the statement, options, and time for each statement
  # to execute. Note that if statements are batched they will be packed into one message
  # with a Cassandra::Statements::Batch statement and empty options.
  # elapsed_time is a Float number of seconds.
  class Message
    attr_reader :statement, :options, :elapsed_time

    def initialize(statement, options, elapsed_time)
      @statement = statement
      @options = options
      @elapsed_time = elapsed_time
    end
  end

  attr_reader :config, :subscribers
  attr_accessor :consistency

  class << self
    # A singleton instance that can be shared to communicate with a Cassandra cluster.
    # It is built lazily from the config set by configure!.
    def instance
      unless defined?(@instance) && @instance
        instance = new(@config)
        @instance = instance
      end
      @instance
    end

    # Call this method to load the Cassie::Config from the specified file for the
    # specified environment. Any existing singleton instance is detached first and
    # then disconnected, so the next call to instance builds a fresh one.
    def configure!(options)
      if defined?(@instance) && @instance
        old_instance = @instance
        @instance = nil
        old_instance.disconnect
      end
      @config = Cassie::Config.new(options)
    end

    # This method can be used to set a consistency level for all Cassandra queries
    # within a block that don't explicitly define them. It can be used where consistency
    # is important (i.e. on validation queries) but where a higher level method
    # doesn't provide an option to set it. The level is stored per thread and the
    # previous value is restored when the block exits, even on exception.
    def consistency(level)
      save_val = Thread.current[:cassie_consistency]
      begin
        Thread.current[:cassie_consistency] = level
        yield
      ensure
        Thread.current[:cassie_consistency] = save_val
      end
    end

    # Get a Logger compatible object if it has been set.
    def logger
      @logger if defined?(@logger)
    end

    # Set a logger with a Logger compatible object.
    attr_writer :logger
  end

  # @param config configuration object; must respond to cluster, default_keyspace,
  #   sanitized_cluster and max_prepared_statements (presumably a Cassie::Config).
  # The default consistency is taken from config.cluster[:consistency], else :local_one.
  def initialize(config)
    @config = config
    @monitor = Monitor.new
    @session = nil
    @prepared_statements = {}
    @last_prepare_warning = monotonic_time
    @subscribers = Subscribers.new
    @consistency = ((config.cluster || {})[:consistency] || :local_one)
  end

  # Open a connection to the Cassandra cluster.
  #
  # The cluster object is built outside the monitor. If another thread has already
  # established a session by the time the lock is taken, the newly built cluster is
  # closed instead of being discarded with its connections still open.
  def connect
    start_time = monotonic_time
    cluster_config = config.cluster
    cluster_config = cluster_config.merge(logger: logger) if logger
    cluster = Cassandra.cluster(cluster_config)
    logger&.info("Cassie.connect with #{config.sanitized_cluster} in #{((monotonic_time - start_time) * 1000).round}ms")
    @monitor.synchronize do
      if @session
        # Lost the race to another connect; release the redundant cluster.
        cluster.close
      else
        @session = cluster.connect(config.default_keyspace)
        @prepared_statements = {}
      end
    end
  end

  # Close the connections to the Cassandra cluster.
  #
  # The session reference and prepared statement cache are cleared under the lock;
  # the old session is closed after the lock is released.
  # NOTE(review): only the session is closed here. The Cassandra::Cluster created in
  # #connect is not retained, so it is never closed — confirm whether the driver
  # releases the cluster's own connections when its session closes.
  def disconnect
    logger&.info("Cassie.disconnect from #{config.sanitized_cluster}")
    old_session = nil
    @monitor.synchronize do
      old_session = @session
      @session = nil
      @prepared_statements = {}
    end
    old_session&.close
  end

  # Return true if the connection to the Cassandra cluster has been established.
  def connected?
    !!@session
  end

  # Force reconnection. If you're using this code in conjunction with a forking server
  # environment like passenger or unicorn you should call this method after forking.
  #
  # The session is captured before taking the lock. If another thread has already
  # replaced it by the time the lock is acquired, no second reconnect is performed.
  def reconnect
    existing_session = @session
    @monitor.synchronize do
      if @session == existing_session
        disconnect
        connect
      end
    end
  end

  # Prepare a CQL statement for repeated execution. Prepared statements
  # are cached on the driver until the connection is closed. Calling
  # prepare multiple times with the same CQL string will return
  # the prepared statement from a cache.
  #
  # @param cql [String] the CQL text to prepare
  # @return the prepared statement returned by the session
  # @raise [ArgumentError] if cql is not a String
  def prepare(cql)
    raise ArgumentError, "CQL must be a string" unless cql.is_a?(String)

    statement = @prepared_statements[cql]
    cache_filled_up = false
    unless statement
      @monitor.synchronize do
        # Re-check under the lock: another thread may have prepared this CQL while
        # we were waiting, in which case we reuse it instead of preparing it again.
        statement = @prepared_statements[cql]
        unless statement
          statement = session.prepare(cql)
          @prepared_statements[cql] = statement
          if @prepared_statements.size > config.max_prepared_statements
            # Cache is full. Clear out the oldest values. Ideally we'd remove the least recently used,
            # but that would require additional overhead on each query. This method will eventually
            # keep the most active queries in the cache and is overall more efficient.
            @prepared_statements.delete(@prepared_statements.first[0])
            cache_filled_up = true
          end
        end
      end
    end

    if cache_filled_up && logger && monotonic_time > @last_prepare_warning + 10
      # Set a throttle on how often this message is logged so we don't kill performance even more.
      @last_prepare_warning = monotonic_time
      logger.warn("Cassie.prepare cache filled up. Consider increasing the size from #{config.max_prepared_statements}.")
    end

    statement
  end

  # Declare and execute a batch statement. Any insert, update, or delete
  # calls made within the block will add themselves to the batch which
  # is executed at the end of the block. Nested batch calls simply join
  # the outermost batch.
  def batch(options = nil)
    if Thread.current[:cassie_batch]
      yield
    else
      begin
        batch = []
        Thread.current[:cassie_batch] = batch
        yield
        unless batch.empty?
          batch_statement = session.logged_batch
          batch.each do |cql, values|
            if values.blank?
              # No bind values: add the raw CQL string.
              batch_statement.add(cql)
            else
              # values is present on this branch, so bind unconditionally.
              batch_statement.add(prepare(cql).bind(Array(values)))
            end
          end
          execute(batch_statement, nil, options)
        end
      ensure
        Thread.current[:cassie_batch] = nil
      end
    end
  end

  # Find rows using the CQL statement. If the statement is a string
  # and values are provided then the statement will be executed as a prepared
  # statement. In general all statements should be executed this way.
  #
  # If you have a statement without arguments, then you should call
  # prepare before and pass the prepared statement if you plan on
  # executing the same query multiple times.
  def find(cql, values = nil, options = nil)
    execute(cql, values, options)
  end

  # Insert a row from a hash into a table. Columns whose value is nil are omitted.
  #
  # You can specify a ttl for the created row by supplying a :ttl option.
  #
  # If this method is called inside a batch block it will be executed in the batch.
  #
  # @raise [ArgumentError] if values_hash contains no non-nil values
  def insert(table, values_hash, options = nil)
    columns = []
    values = []
    values_hash.each do |column, value|
      next if value.nil?

      columns << column
      values << value
    end
    # Without this guard an all-nil hash would produce invalid CQL.
    raise ArgumentError, "No non-nil values to insert into #{table}" if columns.empty?

    cql = "INSERT INTO #{table} (#{columns.join(", ")}) VALUES (#{question_marks(columns.size)})"

    if options&.include?(:ttl)
      options = options.dup
      ttl = options.delete(:ttl)
      if ttl
        cql += " USING TTL ?"
        values << Integer(ttl)
      end
    end

    batch_or_execute(cql, values, options)
  end

  # Update a row in a table. The values to update should be passed in the
  # values_hash while the primary key should be passed in the key_hash.
  # values_hash may also be a raw CQL SET fragment (String), in which case it
  # contributes no bind values.
  #
  # You can specify a ttl for the created row by supplying a :ttl option.
  #
  # If this method is called inside a batch block it will be executed in the batch.
  def update(table, values_hash, key_hash, options = nil)
    key_cql, key_values = key_clause(key_hash)
    update_cql = []
    update_values = []
    if values_hash.is_a?(String)
      update_cql << values_hash
    else
      values_hash.each do |column, value|
        update_cql << "#{column} = ?"
        update_values << value
      end
    end
    values = update_values + key_values

    cql = "UPDATE #{table}"

    if options&.include?(:ttl)
      options = options.dup
      ttl = options.delete(:ttl)
      if ttl
        cql += " USING TTL ?"
        # The TTL placeholder precedes SET in the CQL, so its value goes first.
        values.unshift(Integer(ttl))
      end
    end

    cql += " SET #{update_cql.join(", ")} WHERE #{key_cql}"

    batch_or_execute(cql, values, options)
  end

  # Delete a row from a table. You should pass the primary key value
  # in the key_hash.
  #
  # If this method is called inside a batch block it will be executed in the batch.
  def delete(table, key_hash, options = nil)
    key_cql, key_values = key_clause(key_hash)
    cql = "DELETE FROM #{table} WHERE #{key_cql}"
    batch_or_execute(cql, key_values, options)
  end

  # Execute an arbitrary CQL statement. If values are passed and the statement is a
  # string, it will be prepared and executed as a prepared statement.
  #
  # A consistency set via Cassie.consistency (or the instance default) is applied
  # unless options already specify one. On NoHostsAvailable the connection is
  # re-established and the error re-raised. Subscribers receive a Message with the
  # statement, final options and elapsed seconds whenever a statement was built.
  def execute(cql, values = nil, options = nil)
    start_time = monotonic_time
    statement = nil
    begin
      statement = if cql.is_a?(String)
        if values.present?
          prepare(cql)
        else
          Cassandra::Statements::Simple.new(cql)
        end
      else
        cql
      end

      if values.present?
        values = Array(values)
        options = (options ? options.merge(arguments: values) : {arguments: values})
      end

      # Set a default consistency from a block context if it isn't explicitly set.
      statement_consistency = current_consistency
      if statement_consistency
        if options
          options = options.merge(consistency: statement_consistency) if options[:consistency].nil?
        else
          options = {consistency: statement_consistency}
        end
      end

      session.execute(statement, options || {})
    rescue Cassandra::Errors::NoHostsAvailable => e
      reconnect
      raise e
    ensure
      if statement.is_a?(Cassandra::Statement) && !subscribers.empty?
        payload = Message.new(statement, options, monotonic_time - start_time)
        subscribers.each { |subscriber| subscriber.call(payload) }
      end
    end
  end

  # Return the current consistency level that has been set for statements.
  # A thread-local level from Cassie.consistency takes precedence over the instance default.
  def current_consistency
    Thread.current[:cassie_consistency] || consistency
  end

  private

  def logger
    self.class.logger
  end

  # Seconds from a monotonic clock; immune to wall-clock adjustments when measuring durations.
  def monotonic_time
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def session
    unless connected?
      # Check again inside the monitor lock so we don't get in a race condition
      # where another thread has already established the connection.
      @monitor.synchronize do
        connect unless connected?
      end
    end
    @session
  end

  # Queue the statement on the current thread's batch if one is open (returning nil);
  # otherwise execute it immediately. Options are not carried into a batch.
  def batch_or_execute(cql, values, options = nil)
    batch = Thread.current[:cassie_batch]
    if batch
      batch << [cql, values]
      nil
    else
      execute(cql, values, options)
    end
  end

  # Comma-separated bind placeholders, e.g. question_marks(3) => "?,?,?".
  # Returns "" for zero (the previous form raised on a negative string repeat).
  def question_marks(size)
    Array.new(size, "?").join(",")
  end

  # Build a "k1 = ? AND k2 = ?" clause and its ordered bind values from key/value pairs.
  def key_clause(key_hash)
    cql = []
    values = []
    key_hash.each do |key, value|
      cql << "#{key} = ?"
      values << value
    end
    [cql.join(" AND "), values]
  end

  # Extract the CQL from a statement. Batch-like statements (responding to
  # statements) are flattened recursively and joined with "; "; previous
  # guards against revisiting the same statement.
  def statement_cql(statement, previous = nil)
    cql = nil
    if statement.respond_to?(:cql)
      cql = statement.cql
    elsif statement.respond_to?(:statements) && (previous.nil? || !previous.include?(statement))
      previous ||= []
      previous << statement
      cql = statement.statements.map { |s| statement_cql(s, previous) }.join("; ")
    end
    cql
  end
end