# lib/rbhive/t_c_l_i_connection.rb
# suppress warnings
old_verbose, $VERBOSE = $VERBOSE, nil
raise 'Thrift is not loaded' unless defined?(Thrift)
raise 'RBHive is not loaded' unless defined?(RBHive)
# require thrift autogenerated files
require File.join(File.dirname(__FILE__), *%w[.. thrift t_c_l_i_service_constants])
require File.join(File.dirname(__FILE__), *%w[.. thrift t_c_l_i_service])
require File.join(File.dirname(__FILE__), *%w[.. thrift sasl_client_transport])
# restore warnings
$VERBOSE = old_verbose
# Monkey patch thrift to set an infinite read timeout.
# flush is reopened so the request is sent with read_timeout = nil;
# everything else posts the buffered bytes and stores the reply for reading.
module Thrift
  class HTTPClientTransport < BaseTransport
    def flush
      secure = @url.scheme == 'https'

      http = Net::HTTP.new(@url.host, @url.port)
      http.use_ssl = secure
      # nil disables the read timeout so long-running requests are not cut off
      http.read_timeout = nil
      http.verify_mode = @ssl_verify_mode if secure

      response = http.post(@url.request_uri, @outbuf, @headers)
      payload = Bytes.force_binary_encoding(response.body)

      # Expose the reply to readers and start a fresh outgoing buffer
      @inbuf = StringIO.new(payload)
      @outbuf = Bytes.empty_byte_buffer
    end
  end
end
module RBHive
# Lookup table from a supported :hive_version option value to the Thrift
# protocol version sent when the session is opened.
HIVE_THRIFT_MAPPING = {
  # Numeric Hive versions (10 is Hive 0.10, the default)
  10 => 0,
  11 => 1,
  12 => 2,
  13 => 6,
  # CDH aliases (presumably Cloudera distribution releases - confirm)
  cdh4: 0,
  cdh5: 4,
  # Explicit protocol names
  PROTOCOL_V1: 0,
  PROTOCOL_V2: 1,
  PROTOCOL_V3: 2,
  PROTOCOL_V4: 3,
  PROTOCOL_V5: 4,
  PROTOCOL_V6: 5,
  PROTOCOL_V7: 6
}
# Opens a HiveServer2 connection, yields it to the given block and always
# tears the session and the transport down afterwards.
#
# server  - host handed to TCLIConnection.new
# port    - port handed to TCLIConnection.new (defaults to 10_000)
# options - Hash of connection options; a :logger entry, when present, is
#           removed from the hash and used as the logger, otherwise a
#           StdOutLogger is created
#
# Returns the value of the block. Errors raised while opening or by the block
# propagate to the caller; IOErrors raised during cleanup are ignored.
def tcli_connect(server, port = 10_000, options)
  logger = options.key?(:logger) ? options.delete(:logger) : StdOutLogger.new
  connection = RBHive::TCLIConnection.new(server, port, options, logger)
  begin
    connection.open
    connection.open_session
    yield(connection)
  ensure
    # Each cleanup step is guarded separately so that a failing session close
    # can no longer prevent the transport from being closed.
    begin
      connection.close_session if connection.session
    rescue IOError
      # best effort: the session may already be gone
    end
    begin
      connection.close
    rescue IOError
      # best effort: the transport may already be closed
    end
  end
end
module_function :tcli_connect
# Minimal logger used when no :logger is supplied: every severity method
# (fatal, error, warn, info, debug) writes the message to STDOUT.
class StdOutLogger
  %i[fatal error warn info debug].each do |severity|
    define_method(severity) { |message| STDOUT.puts(message) }
  end
end
class TCLIConnection
attr_reader :client
# Builds the Thrift transport, protocol and client for a HiveServer2 server.
# Nothing is opened here; see #open and #open_session.
#
# server  - host name
# port    - port (defaults to 10_000)
# options - Hash; recognised keys are :transport, :sasl_params, :hive_version
#           and :timeout. Missing defaults are written into this hash.
# logger  - object receiving info messages (defaults to a StdOutLogger)
#
# Raises if options is not a Hash, if :sasl is requested without
# :sasl_params, or if the Hive version is not in HIVE_THRIFT_MAPPING.
def initialize(server, port = 10_000, options = {}, logger = StdOutLogger.new)
  options ||= {} # backwards compatibility
  raise "'options' parameter must be a hash" unless options.is_a?(Hash)

  if options[:transport] == :sasl && options[:sasl_params].nil?
    raise ":transport is set to :sasl, but no :sasl_params option was supplied"
  end

  # Defaults to buffered transport, Hive 0.10, 1800 second timeout
  { transport: :buffered, hive_version: 10, timeout: 1800 }.each do |key, default|
    options[key] ||= default
  end
  @options = options

  # Look up the appropriate Thrift protocol version for the supplied Hive version
  @thrift_protocol_version = thrift_hive_protocol(options[:hive_version])

  # The logger must be in place before the transport is built, as that step logs
  @logger = logger
  @transport = thrift_transport(server, port)
  @protocol = Thrift::BinaryProtocol.new(@transport)
  @client = Hive2::Thrift::TCLIService::Client.new(@protocol)
  @session = nil

  @logger.info("Connecting to HiveServer2 #{server} on port #{port}")
end
# Translates a Hive version identifier into its Thrift protocol version
# using HIVE_THRIFT_MAPPING. Raises when the version is not listed.
def thrift_hive_protocol(version)
  protocol = HIVE_THRIFT_MAPPING[version]
  raise("Invalid Hive version") unless protocol

  protocol
end
# Builds the Thrift transport selected by @options[:transport].
#
# server - host name
# port   - port
#
# Returns a buffered, SASL or HTTP transport. Raises a RuntimeError naming
# the offending value when the transport type is not recognised.
def thrift_transport(server, port)
  kind = @options[:transport]
  @logger.info("Initializing transport #{kind}")
  case kind
  when :buffered
    Thrift::BufferedTransport.new(thrift_socket(server, port, @options[:timeout]))
  when :sasl
    Thrift::SaslClientTransport.new(thrift_socket(server, port, @options[:timeout]),
                                    parse_sasl_params(@options[:sasl_params]))
  when :http
    Thrift::HTTPClientTransport.new("http://#{server}:#{port}/cliservice")
  else
    # The original interpolated an undefined `transport` here, so the
    # intended message was never produced.
    raise "Unrecognised transport type '#{kind}'"
  end
end
# Creates a Thrift::Socket for server:port, assigns it the given timeout
# and returns it.
def thrift_socket(server, port, timeout)
  Thrift::Socket.new(server, port).tap { |sock| sock.timeout = timeout }
end
# Processes SASL connection params: a Hash is returned as a new hash with
# symbolized keys, anything else yields nil.
def parse_sasl_params(sasl_params)
  return nil unless sasl_params.is_a?(Hash)

  # Symbolize keys
  sasl_params.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
end
# Opens the underlying Thrift transport and returns its result.
def open
  @transport.open
end
# Closes the underlying Thrift transport and returns its result.
def close
  @transport.close
end
# Sends an OpenSession request for the configured protocol version and
# remembers the server's response in @session.
def open_session
  request = prepare_open_session(@thrift_protocol_version)
  @session = @client.OpenSession(request)
end
# Sends a CloseSession request for the current session, then forgets the
# session. Returns nil.
def close_session
  request = prepare_close_session
  @client.CloseSession(request)
  @session = nil
end
# Returns the handle of the open session, or the falsy @session value
# itself (nil) when no session is open.
def session
  return @session unless @session

  @session.sessionHandle
end
# Returns the Thrift client built in the constructor. This duplicates the
# reader generated by `attr_reader :client` at the top of the class.
def client
  @client
end
# Runs a statement in the current session.
#
# query - statement to execute
#
# Returns the ExecuteStatement response. Raises via raise_error_if_failed!
# when the response reports a failure.
def execute(query)
  @logger.info("Executing Hive Query: #{query}")
  response = client.ExecuteStatement(prepare_execute_statement(query))
  raise_error_if_failed!(response)
  response
end
# Sets the mapred.job.priority setting for the session.
def priority=(priority)
  property = "mapred.job.priority"
  set(property, priority)
end
# Sets the mapred.job.queue.name setting for the session.
def queue=(queue)
  property = "mapred.job.queue.name"
  set(property, queue)
end
# Logs and executes a "SET name=value" statement; returns the result of
# #execute.
def set(name, value)
  @logger.info("Setting #{name}=#{value}")
  statement = "SET #{name}=#{value}"
  execute(statement)
end
# Async execute: submits the query with runAsync set and returns without
# fetching any rows.
#
# Returns a Hash with :session, :guid and :secret that the other async_*
# methods accept to refer to this query again. Raises via
# raise_error_if_failed! when the submission is rejected.
def async_execute(query)
  @logger.info("Executing query asynchronously: #{query}")

  request = Hive2::Thrift::TExecuteStatementReq.new(
    sessionHandle: @session.sessionHandle,
    statement: query,
    runAsync: true
  )
  response = @client.ExecuteStatement(request)
  raise_error_if_failed!(response)

  # Return handles to get hold of this query / session again
  operation_id = response.operationHandle.operationId
  {
    session: @session.sessionHandle,
    guid: operation_id.guid,
    secret: operation_id.secret
  }
end
# Is the query complete? True when its state is :finished.
def async_is_complete?(handles)
  state = async_state(handles)
  state == :finished
end
# Is the query actually running? True when its state is :running.
def async_is_running?(handles)
  state = async_state(handles)
  state == :running
end
# Has the query failed? True when its state is :error.
def async_is_failed?(handles)
  state = async_state(handles)
  state == :error
end
# Has the query been cancelled? True when its state is :cancelled.
def async_is_cancelled?(handles)
  state = async_state(handles)
  state == :cancelled
end
# Asks the server to cancel the operation identified by handles and
# returns the CancelOperation response.
def async_cancel(handles)
  request = prepare_cancel_request(handles)
  @client.CancelOperation(request)
end
# Map states to symbols: asks the server for the operation status and
# translates the Thrift state constant.
#
# Returns one of :finished, :initialized, :running, :cancelled, :closed,
# :error, :unknown, :pending, or :state_not_in_protocol for a value that
# matches none of the known constants. Raises when the response carries no
# state at all.
def async_state(handles)
  status_request = Hive2::Thrift::TGetOperationStatusReq.new(
    operationHandle: prepare_operation_handle(handles)
  )
  state = @client.GetOperationStatus(status_request).operationState

  raise "No operation state found for handles - has the session been closed?" if state.nil?

  case state
  when Hive2::Thrift::TOperationState::FINISHED_STATE    then :finished
  when Hive2::Thrift::TOperationState::INITIALIZED_STATE then :initialized
  when Hive2::Thrift::TOperationState::RUNNING_STATE     then :running
  when Hive2::Thrift::TOperationState::CANCELED_STATE    then :cancelled
  when Hive2::Thrift::TOperationState::CLOSED_STATE      then :closed
  when Hive2::Thrift::TOperationState::ERROR_STATE       then :error
  when Hive2::Thrift::TOperationState::UKNOWN_STATE      then :unknown
  when Hive2::Thrift::TOperationState::PENDING_STATE     then :pending
  else :state_not_in_protocol
  end
end
# Async fetch results from an async execute.
#
# handles  - Hash returned by async_execute
# max_rows - maximum number of rows to fetch (defaults to 100)
#
# Returns the rows from fetch_rows, read from the start of the result.
# Raises when the query has not finished.
def async_fetch(handles, max_rows = 100)
  # Can't get data from an unfinished query
  raise "Can't perform fetch on a query in state: #{async_state(handles)}" unless async_is_complete?(handles)

  # Fetch and return the first rows of the result
  operation = prepare_operation_handle(handles)
  fetch_rows(operation, :first, max_rows)
end
# Fetches the results of a finished async query in batches of *batch_size*
# rows and yields each batch to the given block.
#
# Raises when no block is given or when the query has not finished.
# Returns nil once an empty batch is received.
def async_fetch_in_batch(handles, batch_size = 1000, &block)
  raise "No block given for the batch fetch request!" unless block_given?

  # Can't get data from an unfinished query
  raise "Can't perform fetch on a query in state: #{async_state(handles)}" unless async_is_complete?(handles)

  # Keep pulling the next batch until the server has nothing left
  loop do
    batch = fetch_rows(prepare_operation_handle(handles), :next, batch_size)
    break if batch.empty?

    yield batch
  end
end
# Closes the session referenced by handles[:session] after validating the
# handles hash. Returns the CloseSession response.
def async_close_session(handles)
  validate_handles!(handles)
  request = Hive2::Thrift::TCloseSessionReq.new(sessionHandle: handles[:session])
  @client.CloseSession(request)
end
# Pull rows from the query result.
#
# op_handle   - operation handle of the executed statement
# orientation - fetch orientation symbol (defaults to :first)
# max_rows    - maximum number of rows to request (defaults to 1000)
#
# Returns a TCLIResultSet built from the fetched rows and the result schema.
# Raises via raise_error_if_failed! when the fetch fails.
def fetch_rows(op_handle, orientation = :first, max_rows = 1000)
  response = @client.FetchResults(prepare_fetch_results(op_handle, orientation, max_rows))
  raise_error_if_failed!(response)

  fetched = response.results.rows
  # The first row is handed to the schema definition alongside the metadata
  schema = TCLISchemaDefinition.new(get_schema_for(op_handle), fetched.first)
  TCLIResultSet.new(fetched, schema)
end
# Performs an explain on the supplied query on the server, returns it as an ExplainResult.
# (Only works on 0.12 if you have this patch - https://issues.apache.org/jira/browse/HIVE-5492)
def explain(query)
  collected = []
  fetch_in_batch("EXPLAIN " + query) do |batch|
    # Keep only the :Explain column of every row in the batch
    collected.push(batch.map { |row| row[:Explain] })
  end
  ExplainResult.new(collected.flatten)
end
# Performs a query on the server, fetches up to *max_rows* rows and returns
# them as produced by fetch_rows.
def fetch(query, max_rows = 100)
  # Execute the query and check the result
  response = execute(query)
  raise_error_if_failed!(response)

  # Read from the start of the result using the statement's operation handle
  fetch_rows(response.operationHandle, :first, max_rows)
end
# Performs a query on the server, fetches the results in batches of *batch_size* rows
# and yields the result batches to a given block as arrays of rows.
#
# query      - statement to execute
# batch_size - number of rows requested per batch (defaults to 1000)
#
# Raises when no block is given, or via raise_error_if_failed! when the
# statement fails. Returns nil once an empty batch is received.
def fetch_in_batch(query, batch_size = 1000, &block)
  raise "No block given for the batch fetch request!" unless block_given?

  # Execute the query and check the result
  exec_result = execute(query)
  raise_error_if_failed!(exec_result)

  # Get search operation handle to fetch the results
  op_handle = exec_result.operationHandle

  # fetch_rows builds its own fetch request for every batch, so the unused
  # request the original prepared here has been dropped.
  loop do
    rows = fetch_rows(op_handle, :next, batch_size)
    break if rows.empty?

    yield rows
  end
end
# Executes the CREATE TABLE statement produced by the given schema object.
def create_table(schema)
  statement = schema.create_table_statement
  execute(statement)
end
# Drops a table. Accepts either a table name or a TableSchema, whose #name
# is used.
def drop_table(name)
  table = name.is_a?(TableSchema) ? name.name : name
  execute("DROP TABLE `#{table}`")
end
# Executes the REPLACE COLUMNS statement produced by the given schema object.
def replace_columns(schema)
  statement = schema.replace_columns_statement
  execute(statement)
end
# Executes the ADD COLUMNS statement produced by the given schema object.
def add_columns(schema)
  statement = schema.add_columns_statement
  execute(statement)
end
# Forwards any call this object does not define to the Thrift client,
# including a block when one is given (the original silently dropped it).
def method_missing(meth, *args, &block)
  client.send(meth, *args, &block)
end

# Keeps respond_to? in line with method_missing: a method counts as
# supported when the Thrift client responds to it.
def respond_to_missing?(meth, include_private = false)
  (!@client.nil? && @client.respond_to?(meth, include_private)) || super
end
private
# Builds the OpenSession request. The :sasl_params option, when set, is
# passed to the request constructor (an empty array otherwise), and the
# given protocol version is assigned to client_protocol.
def prepare_open_session(client_protocol)
  sasl = @options[:sasl_params]
  fields = sasl.nil? ? [] : sasl
  ::Hive2::Thrift::TOpenSessionReq.new(fields).tap do |request|
    request.client_protocol = client_protocol
  end
end
# Builds the CloseSession request for the current session handle.
def prepare_close_session
  handle = session
  ::Hive2::Thrift::TCloseSessionReq.new(sessionHandle: handle)
end
# Builds the ExecuteStatement request for the current session. The query is
# converted with to_s and an empty confOverlay is sent.
def prepare_execute_statement(query)
  ::Hive2::Thrift::TExecuteStatementReq.new(
    sessionHandle: session,
    statement: query.to_s,
    confOverlay: {}
  )
end
# Builds the FetchResults request.
#
# handle      - operation handle to read from
# orientation - symbol such as :first or :next; it is upcased and prefixed
#               with FETCH_ to name a TFetchOrientation constant
# rows        - maximum number of rows to request (defaults to 100)
#
# Raises ArgumentError when the orientation does not name a value listed in
# TFetchOrientation::VALUE_MAP.
def prepare_fetch_results(handle, orientation=:first, rows=100)
  orientation_value = "FETCH_#{orientation.to_s.upcase}"
  valid_orientations = ::Hive2::Thrift::TFetchOrientation::VALUE_MAP.values
  unless valid_orientations.include?(orientation_value)
    raise ArgumentError, "Invalid orientation: #{orientation.inspect}"
  end

  # Resolve the constant by name instead of eval-ing a built string
  orientation_const = ::Hive2::Thrift::TFetchOrientation.const_get(orientation_value)
  ::Hive2::Thrift::TFetchResultsReq.new(
    operationHandle: handle,
    orientation: orientation_const,
    maxRows: rows
  )
end
# Rebuilds a Thrift operation handle from a handles hash (see
# async_execute) after validating it. The handle is typed as
# EXECUTE_STATEMENT with hasResultSet set to false.
def prepare_operation_handle(handles)
  validate_handles!(handles)

  identifier = Hive2::Thrift::THandleIdentifier.new(guid: handles[:guid], secret: handles[:secret])
  Hive2::Thrift::TOperationHandle.new(
    operationId: identifier,
    operationType: Hive2::Thrift::TOperationType::EXECUTE_STATEMENT,
    hasResultSet: false
  )
end
# Builds the CancelOperation request for the operation described by handles.
def prepare_cancel_request(handles)
  operation = prepare_operation_handle(handles)
  Hive2::Thrift::TCancelOperationReq.new(operationHandle: operation)
end
# Ensures a handles hash carries the :guid, :secret and :session keys.
# Returns nil when it does; raises with the inspected hash otherwise.
def validate_handles!(handles)
  complete = [:guid, :secret, :session].all? { |key| handles.has_key?(key) }
  raise "Invalid handles hash: #{handles.inspect}" unless complete
end
# Requests the result-set metadata for an operation handle and returns its
# schema.
def get_schema_for(handle)
  request = ::Hive2::Thrift::TGetResultSetMetadataReq.new(operationHandle: handle)
  client.GetResultSetMetadata(request).schema
end
# Raises an exception if given operation result is a failure.
#
# result - any Thrift response exposing status.statusCode and
#          status.errorMessage
#
# Status codes 0 (SUCCESS_STATUS) and 1 (SUCCESS_WITH_INFO_STATUS) are both
# successes in the TCLIService protocol; the original only accepted 0 and
# so raised for calls that succeeded with informational messages.
#
# Returns nil on success. Raises RBHive::TCLIConnectionError with the
# server's error message, or 'Execution failed!' when none is supplied.
def raise_error_if_failed!(result)
  return if [0, 1].include?(result.status.statusCode)

  error_message = result.status.errorMessage || 'Execution failed!'
  raise RBHive::TCLIConnectionError, error_message
end
end
# Raised when HiveServer2 reports a failed operation status.
class TCLIConnectionError < StandardError
end
end