jeanlescure/hipster_sql_to_hbase

View on GitHub
lib/result_tree_to_hbase_converter.rb

Summary

Maintainability
A
50 mins
Test Coverage
require 'securerandom'
require 'thrift'
require_relative "executor"

require File.join(File.dirname(__FILE__), 'adapter', 'hbase')

module HipsterSqlToHbase
  
  # An ordered list of Thrift call descriptors that can be executed as a group.
  # ResultTreeToHbaseConverter fills it with hashes of the form
  # { :method => "<thrift method name>", :arguments => [...] }.
  class ThriftCallGroup < Array
    # @param arr [Enumerable] the call descriptors to hold, in order
    # @param incr [Boolean] flag forwarded unchanged to Executor#execute
    #   (ResultTreeToHbaseConverter passes true for INSERT call groups)
    def initialize(arr, incr = false)
      super()
      arr.each { |call| push(call) }
      # Per-instance flag; there is intentionally no class-level default.
      @incr = incr
    end

    # Runs every call in this group through HipsterSqlToHbase::Executor.
    #
    # @param host [String, nil] passed through to Executor#execute
    # @param port [Integer, nil] passed through to Executor#execute
    # @return whatever Executor#execute returns
    def execute(host = nil, port = nil)
      HipsterSqlToHbase::Executor.new.execute(self, host, port, @incr)
    end
  end
  
  # This class takes care of all HBase (Thrift) conversion magic by transforming
  # the ResultTree objects into ThriftCallGroup objects.
  class ResultTreeToHbaseConverter
    # Tokens of a SQL LIKE pattern, in priority order:
    # a backslash-escaped %, _ or \; a bare wildcard; a run of ordinary
    # characters; a lone backslash.
    LIKE_TOKEN = %r{\\[%_\\]|[%_]|[^%_\\]+|\\}
    private_constant :LIKE_TOKEN

    # Dispatches to the "<query_type>_sentence" method for the parsed sentence.
    #
    # @param result_tree [Hash] provides :query_type (e.g. :insert, :select,
    #   :create_table) and :query_hash (the sentence-specific data)
    # @return [HipsterSqlToHbase::ThriftCallGroup]
    # @raise [NoMethodError] when no public "<query_type>_sentence" method exists
    def convert(result_tree)
      # public_send keeps the dynamic dispatch away from private helpers.
      public_send("#{result_tree[:query_type]}_sentence", result_tree[:query_hash])
    end

    # Builds one mutateRow call per VALUES tuple of an INSERT sentence. Every
    # row gets a freshly generated random UUID as its row key.
    #
    # @param hash [Hash] :into (table name), :columns (column names) and
    #   :values (list of value tuples, positionally matching :columns)
    # @return [HipsterSqlToHbase::ThriftCallGroup] created with incr = true
    def insert_sentence(hash)
      table = hash[:into]
      calls = hash[:values].map do |value_set|
        # A tuple shorter than :columns yields "" (nil.to_s) for missing values.
        mutations = hash[:columns].each_with_index.map do |col, i|
          HBase::Mutation.new(column: col, value: value_set[i].to_s)
        end
        { :method => "mutateRow", :arguments => [table, SecureRandom.uuid, mutations, {}] }
      end
      HipsterSqlToHbase::ThriftCallGroup.new(calls, true)
    end

    # Builds a single getRowsByScanner call for a SELECT sentence, translating
    # the WHERE clause into an HBase filter-language string.
    #
    # @param hash [Hash] :from (table), :select (columns), optional :where
    #   (possibly nested condition list) and :limit
    # @return [HipsterSqlToHbase::ThriftCallGroup]
    def select_sentence(hash)
      filters = recurse_where(hash[:where] || [])
      arguments = [hash[:from], hash[:select], filters, hash[:limit], {}]
      HipsterSqlToHbase::ThriftCallGroup.new([{ :method => "getRowsByScanner", :arguments => arguments }])
    end

    # Builds a single createTable call whose column descriptors are named after
    # the sentence's columns.
    #
    # @param hash [Hash] :table (table name) and :columns (column names)
    # @return [HipsterSqlToHbase::ThriftCallGroup]
    def create_table_sentence(hash)
      # NOTE(review): insert_sentence uses HBase::Mutation while this uses
      # Hbase::ColumnDescriptor; unless adapter/hbase defines both spellings,
      # one of them will raise NameError — confirm against lib/adapter/hbase.
      descriptors = hash[:columns].map do |col_name|
        Hbase::ColumnDescriptor.new.tap { |descriptor| descriptor.name = col_name }
      end
      HipsterSqlToHbase::ThriftCallGroup.new([{ :method => "createTable", :arguments => [hash[:table], descriptors] }])
    end

    private

    # Flattens a (possibly nested) WHERE list into a filter-language string.
    # Hashes become SingleColumnValueFilter clauses, nested arrays are wrapped
    # in parentheses, strings (connectives) are copied verbatim, and all parts
    # are joined with single spaces.
    #
    # @raise [RuntimeError] on any element that is not a Hash, Array or String
    def recurse_where(where_arr)
      parts = where_arr.map do |val|
        case val
        when Hash then filters_from_key_value_pair(val)
        when Array then "(#{recurse_where(val)})"
        when String then val
        else raise "Recursive where undefined error."
        end
      end
      parts.join(" ")
    end

    # Generates a SingleColumnValueFilter clause from one WHERE condition.
    # The condition hash is read but never modified, so a result tree can be
    # converted any number of times.
    #
    # @param kvp [Hash] :column ("family:qualifier"; everything after the first
    #   ':' is the qualifier), :condition (comparison operator or LIKE), :value
    # @return [String] the parenthesised filter expression
    def filters_from_key_value_pair(kvp)
      family, _separator, qualifier = kvp[:column].partition(':')
      if kvp[:condition].to_s.upcase == 'LIKE'
        operator = '='
        comparator = "regexstring:#{like_to_regex(kvp[:value])}"
      else
        operator = kvp[:condition]
        comparator = "binary:#{kvp[:value]}"
      end
      "(SingleColumnValueFilter(#{filter_literal(family)},#{filter_literal(qualifier)}," \
        "#{operator},#{filter_literal(comparator)},true,true))"
    end

    # Converts a SQL LIKE pattern into a fully anchored regular expression:
    # % becomes .*, _ becomes ., a backslash escapes a following %, _ or \,
    # and every other character is matched literally. Both ends are anchored
    # because HBase's RegexStringComparator matches the pattern anywhere in the
    # value (Matcher#find), whereas LIKE must match the whole value.
    def like_to_regex(pattern)
      body = pattern.to_s.gsub(LIKE_TOKEN) do |token|
        case token
        when '%' then '.*'
        when '_' then '.'
        when '\\' then Regexp.escape(token)
        else
          # Two-character tokens starting with "\" are escapes; others are literal runs.
          token.start_with?('\\') ? Regexp.escape(token[1]) : Regexp.escape(token)
        end
      end
      "^#{body}$"
    end

    # Quotes a value as a filter-language string argument. Embedded single
    # quotes are doubled, which is how the HBase filter language escapes them.
    def filter_literal(value)
      "'#{value.to_s.gsub("'", "''")}'"
    end
  end
end