lib/cassandra_object/adapters/cassandra_schemaless_adapter.rb
gem 'cassandra-driver'
require 'cassandra'
require 'logger'
module CassandraObject
module Adapters
class CassandraSchemalessAdapter < AbstractAdapter
class QueryBuilder
def initialize(adapter, scope)
@adapter = adapter
@scope = scope
end
def select_string
selected_values = @scope.select_values.select { |sv| sv == :column1 || sv == :values }
if selected_values.any?
(['KEY'] | selected_values) * ','
else
'*'
end
end
def to_query_async
# empty ids
return nil if !@scope.id_values.present? && !@scope.where_values.present? && !@scope.is_all && !@scope.limit_value.present?
if @scope.id_values.empty?
str = [
"SELECT #{select_string} FROM #{@scope.klass.column_family}",
where_string_async(nil)
]
str << 'ALLOW FILTERING' if @scope.klass.allow_filtering
return [] << str.delete_if(&:blank?) * ' '
end
str = [
"SELECT #{select_string} FROM #{@scope.klass.column_family}",
where_string_async(@scope.id_values)
]
str << 'ALLOW FILTERING' if @scope.klass.allow_filtering
[str.delete_if(&:blank?) * ' ']
end
def where_string_async(ids)
conditions = []
if ids.present?
conditions << if ids.size > 1
"#{@adapter.primary_key_column} IN (#{ids.map { |id| "'#{id}'" }.join(',')})"
else
"#{@adapter.primary_key_column} = '#{ids.first}'"
end
end
select_values = @scope.select_values.select { |sv| sv != :column1 }
if select_values.size > 0
select_str = select_values.size > 1 ? "column1 IN (#{select_values.map { |sv| '?' }.join(',')})" : 'column1 = ?'
conditions << select_str
end
conditions += @scope.where_values.select.each_with_index { |_, i| i.even? }
conditions.any? ? "WHERE #{conditions.join(' AND ')}" : nil
end
end
def primary_key_column
'key'
end
def cassandra_cluster_options
cluster_options = config.slice(*[
:auth_provider,
:client_cert,
:compression,
:compressor,
:connect_timeout,
:connections_per_local_node,
:connections_per_remote_node,
:consistency,
:write_consistency,
:credentials,
:futures_factory,
:hosts,
:load_balancing_policy,
:logger,
:page_size,
:passphrase,
:password,
:port,
:private_key,
:protocol_version,
:reconnection_policy,
:retry_policy,
:schema_refresh_delay,
:schema_refresh_timeout,
:server_cert,
:ssl,
:timeout,
:trace,
:username,
:heartbeat_interval,
:idle_timeout
])
{
# load_balancing_policy: 'Cassandra::LoadBalancing::Policies::%s',
reconnection_policy: 'Cassandra::Reconnection::Policies::%s',
retry_policy: 'Cassandra::Retry::Policies::%s'
}.each do |policy_key, class_template|
params = cluster_options[policy_key]
if params
if params.is_a?(Hash)
cluster_options[policy_key] = (class_template % [params[:policy].classify]).constantize.new(*params[:params] || [])
else
cluster_options[policy_key] = (class_template % [params.classify]).constantize.new
end
end
end
# Setting defaults
cluster_options.merge!({
heartbeat_interval: cluster_options.keys.include?(:heartbeat_interval) ? cluster_options[:heartbeat_interval] : 30,
idle_timeout: cluster_options[:idle_timeout] || 60,
max_schema_agreement_wait: 1,
consistency: cluster_options[:consistency] || :local_one,
write_consistency: cluster_options[:write_consistency] || cluster_options[:consistency] || :local_one,
protocol_version: cluster_options[:protocol_version] || 3,
page_size: cluster_options[:page_size] || 10000
})
cluster_options
end
def connection
@connection ||= begin
cluster = Cassandra.cluster cassandra_cluster_options
cluster.connect config[:keyspace]
end
end
def execute(statement, arguments = [])
consistency = config[:write_consistency] || config[:consistency]
# puts "schemaless adapter: #{consistency}"
ActiveSupport::Notifications.instrument('cql.cassandra_object', cql: statement) do
connection.execute statement, arguments: arguments, consistency: consistency, page_size: config[:page_size]
end
end
def execute_async(queries, arguments = [], per_page = nil, next_cursor = nil)
consistency = config[:consistency]
# puts "schemaless adapter async: #{consistency}"
retries = 0
per_page ||= config[:page_size]
futures = queries.map { |q|
ActiveSupport::Notifications.instrument('cql.cassandra_object', cql: q) do
connection.execute_async q, arguments: arguments, consistency: consistency, page_size: per_page, paging_state: next_cursor
end
}
futures.map do |future|
begin
rows = future.get
rows
rescue StandardError => e
retries += 1
sleep 0.01
retry if retries <= 3
raise e
end
end
end
def pre_select(scope, per_page = nil, next_cursor = nil)
query = "SELECT DISTINCT #{primary_key_column} FROM #{scope.klass.column_family}"
query << " LIMIT #{scope.limit_value}" if scope.limit_value == 1
ids = []
new_next_cursor = nil
execute_async([query], nil, per_page, next_cursor).each do |item|
item.rows.each { |x| ids << x[primary_key_column] }
new_next_cursor = item.paging_state unless item.last_page?
end
{ ids: ids, new_next_cursor: new_next_cursor }
end
def select(scope)
queries = QueryBuilder.new(self, scope).to_query_async
queries.compact! if queries.present?
raise CassandraObject::RecordNotFound if !queries.present?
arguments = scope.select_values.select { |sv| sv != :column1 }.map(&:to_s)
arguments += scope.where_values.select.each_with_index { |_, i| i.odd? }.reject { |c| c.empty? }.map(&:to_s)
records = execute_async(queries, arguments).map do |item|
# pagination
elems = []
loop do
item.rows.each { |x| elems << x }
break if item.last_page?
item = item.next_page
end
elems
end
{ results: records.flatten! }
end
def select_paginated(scope)
queries = QueryBuilder.new(self, scope).to_query_async
queries.compact! if queries.present?
raise CassandraObject::RecordNotFound if !queries.present?
arguments = scope.select_values.select { |sv| sv != :column1 }.map(&:to_s)
arguments += scope.where_values.select.each_with_index { |_, i| i.odd? }.reject { |c| c.empty? }.map(&:to_s)
new_next_cursor = nil
records = []
execute_async(queries, arguments, scope.limit_value, scope.next_cursor).each do |item|
new_next_cursor = item.paging_state unless item.last_page?
item.rows.each { |x| records << x }
end
{ results: records, new_next_cursor: new_next_cursor }
end
def insert(table, id, attributes, ttl = nil)
write(table, id, attributes, ttl)
end
def update(table, id, attributes, ttl = nil)
write(table, id, attributes, ttl)
end
def write(table, id, attributes, ttl)
queries = []
attributes.each do |column, value|
if !value.nil?
query = "INSERT INTO #{table} (#{primary_key_column},column1,value) VALUES (?,?,?)"
query += " USING TTL #{ttl}" if !ttl.nil?
args = [id.to_s, column.to_s, value.to_s]
queries << { query: query, arguments: args }
else
queries << { query: "DELETE FROM #{table} WHERE #{primary_key_column} = ? AND column1= ?", arguments: [id.to_s, column.to_s] }
end
end
execute_batchable(queries)
end
def delete(table, ids)
ids = [ids] if !ids.is_a?(Array)
arguments = nil
arguments = ids if ids.size == 1
statement = "DELETE FROM #{table} WHERE #{create_ids_where_clause(ids)}" # .gsub('?', ids.map { |id| "'#{id}'" }.join(','))
execute(statement, arguments)
end
def execute_batch(statements)
consistency = config[:write_consistency] || config[:consistency]
# puts "schemaless execute batch #{consistency}"
raise 'Statements is empty!' if statements.empty?
batch = connection.batch do |b|
statements.each do |statement|
b.add(statement[:query], arguments: statement[:arguments])
end
end
connection.execute(batch, consistency: consistency, page_size: config[:page_size])
end
# SCHEMA
def create_table(table_name, params = {})
stmt = "CREATE TABLE #{table_name} (" +
'key text,' +
'column1 text,' +
'value text,' +
'PRIMARY KEY (key, column1)' +
')'
# WITH COMPACT STORAGE
schema_execute statement_with_options(stmt, params[:options]), config[:keyspace]
end
def drop_table(table_name, confirm = false)
count = (schema_execute "SELECT count(*) FROM #{table_name}", config[:keyspace]).rows.first['count']
if confirm || count == 0
schema_execute "DROP TABLE #{table_name}", config[:keyspace]
else
raise "The table #{table_name} is not empty! If you want to drop it add the option confirm = true"
end
end
def schema_execute(cql, keyspace)
schema_db = Cassandra.cluster cassandra_cluster_options
connection = schema_db.connect keyspace
connection.execute cql, consistency: config[:write_consistency] || config[:consistency]
end
def cassandra_version
@cassandra_version ||= execute('select release_version from system.local').rows.first['release_version'].to_f
end
# /SCHEMA
def statement_create_with_options(stmt, options)
if !options.nil?
statement_with_options stmt, options
else
# standard
if cassandra_version < 3
"#{stmt} WITH COMPACT STORAGE
AND bloom_filter_fp_chance = 0.001
AND CLUSTERING ORDER BY (column1 ASC)
AND caching = '{\"keys\":\"ALL\", \"rows_per_partition\":\"NONE\"}'
AND comment = ''
AND compaction = {'min_sstable_size': '52428800', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
AND compression = {'chunk_length_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.0
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 1.0
AND speculative_retry = 'NONE';"
else
"#{stmt} WITH bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'ALL'}
AND comment = ''
AND compaction = {'class': 'SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND crc_check_chance = 1.0
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';
"
end
end
end
def create_ids_where_clause(ids)
return ids if ids.empty?
ids = ids.first if ids.is_a?(Array) && ids.one?
sql = ids.is_a?(Array) ? "#{primary_key_column} IN (#{ids.map { |id| "'#{id}'" }.join(',')})" : "#{primary_key_column} = ?"
sql
end
end
end
end