lib/result_tree_to_hbase_converter.rb
require 'securerandom'
require 'thrift'
require_relative "executor"
require File.join(File.dirname(__FILE__), 'adapter', 'hbase')
module HipsterSqlToHbase
# An ordered batch of Thrift call descriptors (as built by
# ResultTreeToHbaseConverter: hashes with :method and :arguments keys) that can
# be run in one go through HipsterSqlToHbase::Executor.
class ThriftCallGroup < Array
  # Flag forwarded unchanged as the fourth argument of Executor#execute.
  # The converter sets it to true for INSERT groups; its exact meaning is
  # defined by the executor (not visible here).
  attr_reader :incr

  # @param arr [Enumerable] call descriptors, kept in the given order.
  # @param incr [Boolean] flag forwarded to Executor#execute (default false).
  def initialize(arr, incr = false)
    super()
    concat(arr.to_a)
    @incr = incr
  end

  # Execute every call in this group via a new HipsterSqlToHbase::Executor.
  #
  # @param host [String, nil] passed straight through to the executor.
  # @param port [Integer, nil] passed straight through to the executor.
  # @return whatever Executor#execute returns.
  def execute(host = nil, port = nil)
    HipsterSqlToHbase::Executor.new.execute(self, host, port, @incr)
  end
end
# Converts parsed SQL result trees into HipsterSqlToHbase::ThriftCallGroup
# objects: lists of Thrift call descriptors ({ method: String, arguments: Array })
# that the executor can run against HBase.
class ResultTreeToHbaseConverter
  # SQL LIKE wildcards and the regex fragments they translate to.
  LIKE_WILDCARDS = { "%" => ".*", "_" => "." }.freeze

  # Dispatch to the converter for this sentence type
  # (e.g. query_type :insert -> #insert_sentence).
  #
  # @param result_tree [Hash] expects :query_type and :query_hash keys.
  # @return [HipsterSqlToHbase::ThriftCallGroup]
  # @raise [NoMethodError] when no public "<query_type>_sentence" method exists.
  def convert(result_tree)
    # public_send so a data-derived method name can never reach private helpers.
    public_send("#{result_tree[:query_type]}_sentence", result_tree[:query_hash])
  end

  # Build one "mutateRow" call per value set of an INSERT query. Every row gets
  # a random UUID as its key; column i receives value i stringified, and a
  # missing value becomes "".
  #
  # @param hash [Hash] expects :into (table), :columns and :values (list of value lists).
  # @return [HipsterSqlToHbase::ThriftCallGroup] with its incr flag set to true.
  def insert_sentence(hash)
    table = hash[:into]
    calls = hash[:values].map do |value_set|
      mutations = hash[:columns].zip(value_set).map do |column, value|
        # NOTE(review): `HBase` here vs `Hbase` in create_table_sentence; Ruby
        # constants are case-sensitive -- confirm which module adapter/hbase defines.
        HBase::Mutation.new(column: column, value: value.to_s)
      end
      { method: "mutateRow", arguments: [table, SecureRandom.uuid, mutations, {}] }
    end
    HipsterSqlToHbase::ThriftCallGroup.new(calls, true)
  end

  # Build a single "getRowsByScanner" call for a SELECT query, rendering the
  # WHERE clause (if any) as an HBase filter-language string.
  #
  # @param hash [Hash] expects :from, :select, and optional :where and :limit.
  # @return [HipsterSqlToHbase::ThriftCallGroup]
  def select_sentence(hash)
    filters = recurse_where(hash[:where] || [])
    call = {
      method: "getRowsByScanner",
      arguments: [hash[:from], hash[:select], filters, hash[:limit], {}]
    }
    HipsterSqlToHbase::ThriftCallGroup.new([call])
  end

  # Build a single "createTable" call for a CREATE TABLE query, with one
  # column descriptor per listed column name.
  #
  # @param hash [Hash] expects :table and :columns.
  # @return [HipsterSqlToHbase::ThriftCallGroup]
  def create_table_sentence(hash)
    descriptors = hash[:columns].map do |col_name|
      Hbase::ColumnDescriptor.new.tap { |descriptor| descriptor.name = col_name }
    end
    HipsterSqlToHbase::ThriftCallGroup.new([{ method: "createTable", arguments: [hash[:table], descriptors] }])
  end

  private

  # Render a WHERE clause tree as an HBase filter-language string. Hash nodes
  # become SingleColumnValueFilter expressions, nested arrays are rendered
  # recursively inside parentheses, and strings (presumably AND/OR connectives
  # -- confirm against the parser) are copied verbatim. Nodes are space-joined.
  #
  # @raise [RuntimeError] for any other node type.
  def recurse_where(where_arr)
    where_arr.map do |node|
      case node
      when Hash then filters_from_key_value_pair(node)
      when Array then "(#{recurse_where(node)})"
      when String then node
      else raise "Recursive where undefined error."
      end
    end.join(" ")
  end

  # Build a SingleColumnValueFilter expression for one WHERE condition.
  # LIKE conditions (case-insensitive) compare with an anchored regexstring
  # comparator; every other operator is emitted as-is with a binary comparator.
  # The input hash is left untouched, so a result tree can be converted repeatedly.
  #
  # @param kvp [Hash] :column ("family:qualifier"), :condition, :value.
  # @return [String]
  def filters_from_key_value_pair(kvp)
    family, *qualifier = kvp[:column].split(":")
    if kvp[:condition].to_s.upcase == "LIKE"
      operator = "="
      comparator = "regexstring:#{like_to_regex(kvp[:value])}"
    else
      operator = kvp[:condition]
      comparator = "binary:#{kvp[:value]}"
    end
    column = escape_quotes(family)
    qualifier_name = escape_quotes(qualifier.join(":"))
    "(SingleColumnValueFilter('#{column}','#{qualifier_name}',#{operator},'#{escape_quotes(comparator)}',true,true))"
  end

  # Translate a SQL LIKE pattern into a regex: "%" -> ".*", "_" -> ".", a
  # backslash makes the next character literal, and everything else goes
  # through Regexp.escape. The result is always wrapped in ^...$ because HBase's
  # RegexStringComparator is documented to match when the pattern is found
  # anywhere in the value (verify for the HBase version in use); without the
  # anchors "abc%" would also match "xabc".
  def like_to_regex(pattern)
    body = pattern.to_s.scan(/\\.|./m).map do |token|
      if token.length == 2
        Regexp.escape(token[1]) # backslash-escaped character is literal
      else
        LIKE_WILDCARDS.fetch(token) { Regexp.escape(token) }
      end
    end.join
    "^#{body}$"
  end

  # Double single quotes so text can sit inside a quoted filter-language
  # argument (the HBase filter language escapes ' as '').
  def escape_quotes(text)
    text.to_s.gsub("'", "''")
  end
end
end