archan937/clickhouse

View on GitHub
lib/clickhouse/connection/client.rb

Summary

Maintainability
A
0 mins
Test Coverage
module Clickhouse
  class Connection
    module Client
      include ActiveSupport::NumberHelper

      def connect!
        ping! unless connected?
      end

      def ping!
        ensure_authentication
        status = client.get("/").status
        if status != 200
          raise ConnectionError, "Unexpected response status: #{status}"
        end
        true
      rescue Faraday::Error => e
        raise ConnectionError, e.message
      end

      def connected?
        instance_variables.include?(:@client) && !!@client
      end

      def get(query)
        request(:get, query)
      end

      def post(query, body = nil)
        request(:post, query, body)
      end

      def url
        "#{@config[:scheme]}://#{@config[:host]}:#{@config[:port]}"
      end

    private

      def client
        @client ||= Faraday.new(:url => url)
      end

      def ensure_authentication
        username, password = @config.values_at(:username, :password)
        client.basic_auth(username || "default", password) if username || password
      end

      def path(query)
        params = @config.select{|k, _v| k == :database}
        params[:query] = query
        params[:output_format_write_statistics] = 1
        query_string = params.collect{|k, v| "#{k}=#{CGI.escape(v.to_s)}"}.join("&")

        "/?#{query_string}"
      end

      def request(method, query, body = nil)
        connect!
        query = query.strip
        start = Time.now

        response = client.send(method, path(query), body)
        status = response.status
        duration = Time.now - start
        query, format = Utils.extract_format(query)
        response = parse_body(format, response.body)
        stats = parse_stats(response)

        write_log duration, query, stats
        raise QueryError, "Got status #{status} (expected 200): #{response}" unless status == 200
        response

      rescue Faraday::Error => e
        raise ConnectionError, e.message
      end

      def parse_body(format, body)
        case format
        when "JSON", "JSONCompact"
          body = JSON.parse(body) if body.strip[0] == "{"
        end
        body
      end

      def parse_stats(response)
        return {} unless response.is_a?(Hash)

        options = {:locale => :en, :precision => 2, :significant => false}
        stats = response["statistics"].merge("rows" => response["rows"])
        factor = 1 / stats["elapsed"]

        stats["elapsed"] = number_to_human_duration(stats["elapsed"])
        stats["rows_per_second"] = number_to_human(stats["rows_read"] * factor, options).downcase
        stats["data_per_second"] = number_to_human_size(stats["bytes_read"] * factor, options)
        stats["rows_read"] = number_to_human(stats["rows_read"], options).downcase
        stats["data_read"] = number_to_human_size(stats["bytes_read"], options)

        stats
      end

      def write_log(duration, query, stats)
        duration = number_to_human_duration(duration)

        rows,
        elapsed,
        rows_read,
        data_read,
        rows_per_second,
        data_per_second = stats.values_at(*%w(
          rows
          elapsed
          rows_read
          data_read
          rows_per_second
          data_per_second
        ))

        line1 = "\n \e[1mSQL (#{duration})\e[0m  #{query};"
        line2 = "\n  \e[1m#{rows} #{"row".pluralize(rows)} in set. Elapsed: #{elapsed}. Processed: #{rows_read} rows, #{data_read} (#{rows_per_second} rows/s, #{data_per_second}/s)\e[0m" if rows

        log :debug, "#{line1}#{line2} "
      end

      def number_to_human_duration(number)
        if (amount = number * 1000.0) < 1000
          round = (amount < 1) ? 3 : 1
          "#{amount.round(round)}ms"
        else
          "#{number.round(1)}s"
        end
      end

    end
  end
end