archan937/clickhouse

View on GitHub
lib/clickhouse/connection/query.rb

Summary

Maintainability
B
4 hrs
Test Coverage
require "clickhouse/connection/query/table"
require "clickhouse/connection/query/result_set"
require "clickhouse/connection/query/result_row"

module Clickhouse
  class Connection
    # Query helpers for a connection.
    #
    # The methods here call +get+ and +post+, which are not defined in this
    # module; they are expected to be provided by whatever includes it.
    module Query

      # Sends +query+ (and an optional request +body+) through +post+.
      #
      # Returns +true+ when the response body is empty, otherwise the
      # response body itself.
      def execute(query, body = nil)
        body = post(query, body)
        body.empty? ? true : body
      end

      # Runs a read query and wraps the response in a ResultSet.
      #
      # The first element returned by +Utils.extract_format+ is used as the
      # statement (presumably the query without its own FORMAT clause --
      # confirm against Utils), and " FORMAT JSONCompact" is appended before
      # the statement is passed to +get+.
      def query(query)
        query = Utils.extract_format(query)[0]
        query += " FORMAT JSONCompact"
        parse_data get(query)
      end

      # Returns the flattened result of "SHOW DATABASES".
      def databases
        query("SHOW DATABASES").flatten
      end

      # Returns the flattened result of "SHOW TABLES".
      def tables
        query("SHOW TABLES").flatten
      end

      # Builds a Table definition from +name+ and the given block and executes
      # the SQL it generates.
      def create_table(name, &block)
        execute(Clickhouse::Connection::Query::Table.new(name, &block).to_sql)
      end

      # Returns the rows of "DESCRIBE TABLE <name>" as an array.
      def describe_table(name)
        query("DESCRIBE TABLE #{name}").to_a
      end

      # Renames one or more tables.
      #
      # Accepts either a Hash of <tt>from => to</tt> pairs or a flat list of
      # names that is read pairwise (<tt>from, to, from, to, ...</tt>).
      #
      # Raises Clickhouse::InvalidQueryError when the number of names is odd.
      def rename_table(*args)
        names = (args[0].is_a?(Hash) ? args[0].to_a : [args]).flatten
        raise Clickhouse::InvalidQueryError, "Odd number of table names" unless (names.size % 2) == 0
        names = Hash[*names].collect{|(from, to)| "#{from} TO #{to}"}
        execute("RENAME TABLE #{names.join(", ")}")
      end

      # Executes "DROP TABLE <name>".
      def drop_table(name)
        execute("DROP TABLE #{name}")
      end

      # Returns +true+ when "EXISTS TABLE <name>" answers with "1".
      def exists_table(name)
        # +execute+ returns +true+ (not a String) for an empty response body,
        # so coerce before stripping instead of raising NoMethodError.
        execute("EXISTS TABLE #{name}").to_s.strip == '1'
      end

      # Inserts rows into +table+ using the CSVWithNames format.
      #
      # Recognised options:
      # * <tt>:csv</tt>   - ready-made CSV payload; used as-is when present.
      # * <tt>:rows</tt>  - rows to serialise (arrays or hashes); when absent
      #   the given block is called with an empty array and its return value
      #   is used as the rows.
      # * <tt>:names</tt> - column names for the CSV header.
      #
      # The passed +options+ hash is only read, never modified.
      def insert_rows(table, options = {})
        csv = options[:csv] || begin
          rows = options[:rows] || yield([])
          generate_csv rows, options[:names]
        end
        execute("INSERT INTO #{table} FORMAT CSVWithNames", csv)
      end

      # Builds a SELECT statement from +options+ (see #to_select_query) and
      # runs it through #query.
      def select_rows(options)
        query to_select_query(options)
      end

      # Returns the first row of #select_rows.
      def select_row(options)
        select_rows(options)[0]
      end

      # Returns the first column of every row of #select_rows.
      def select_values(options)
        select_rows(options).collect{|row| row[0]}
      end

      # Returns the first value of #select_values.
      def select_value(options)
        values = select_values(options)
        values[0] if values
      end

      # Runs the query described by +options+ with the select list replaced by
      # "COUNT(*)" and returns the result as an Integer.
      def count(options)
        options = options.merge(:select => "COUNT(*)")
        select_value(options).to_i
      end

      # Turns an options hash into a SELECT statement, one clause per line.
      #
      # Recognised keys: :select, :from, :where, :group, :having, :order,
      # :limit and :offset. Clauses whose value is nil or empty are skipped;
      # :group and :order are emitted as "GROUP BY" / "ORDER BY".
      def to_select_query(options)
        to_select_options(options).collect do |(key, value)|
          next if value.nil? || (value.respond_to?(:empty?) && value.empty?)

          statement = [key.to_s.upcase]
          statement << "BY" if %W(GROUP ORDER).include?(statement[0])
          statement << to_segment(key, value)
          statement.join(" ")

        end.compact.join("\n").force_encoding("UTF-8")
      end

    private

      # Serialises +rows+ to CSV.
      #
      # When the first row is a Hash, +names+ defaults to that row's keys and
      # every row is written in +names+ order. A header line is written
      # whenever +names+ is available.
      def generate_csv(rows, names = nil)
        hashes = rows[0].is_a?(Hash)

        if hashes
          names ||= rows[0].keys
        end

        CSV.generate do |csv|
          csv << names if names
          rows.each do |row|
            csv << (hashes ? row.values_at(*names) : row)
          end
        end
      end

      # Renders a Ruby value as a SQL literal.
      #
      # nil becomes NULL; everything else starts from +inspect+ with the outer
      # double quotes swapped for single quotes. For Strings, embedded single
      # quotes are additionally backslash-escaped so the literal cannot be
      # terminated early by its own content.
      def inspect_value(value)
        return "NULL" if value.nil?

        literal = value.inspect.gsub(/(^"|"$)/, "'").gsub("\\\"", "\"")
        return literal unless value.is_a?(String)

        # Block form of gsub so the backslash in the replacement is literal.
        "'" + literal[1..-2].gsub("'") { "\\'" } + "'"
      end

      # Normalises +options+ to the supported clause keys, in statement order.
      #
      # :select defaults to "*". :offset is folded into :limit as
      # "offset, limit"; when only :offset is given, the limit defaults to 0.
      def to_select_options(options)
        keys = [:select, :from, :where, :group, :having, :order, :limit, :offset]

        options = Hash[keys.zip(options.values_at(*keys))]
        options[:select] ||= "*"
        options[:limit] ||= 0 if options[:offset]
        options[:limit] = options.values_at(:offset, :limit).compact.join(", ") if options[:limit]
        options.delete(:offset)

        options
      end

      # Renders the value part of a single clause.
      #
      # :select values are joined with ", "; Hash values for :where / :having
      # are expanded via #to_condition_statements; anything else is returned
      # untouched.
      def to_segment(type, value)
        case type
        when :select
          [value].flatten.join(", ")
        when :where, :having
          value.is_a?(Hash) ? to_condition_statements(value) : value
        else
          value
        end
      end

      # Expands a Hash of conditions into an " AND "-joined expression.
      #
      # Per value:
      # * <tt>:empty</tt>            -> <tt>empty(attr)</tt>
      # * Range                      -> lower / upper bound comparisons
      # * Array                      -> <tt>attr IN (...)</tt>
      # * value wrapped in backticks -> raw expression appended to the attribute
      # * anything else              -> <tt>attr = literal</tt>
      def to_condition_statements(value)
        value.collect do |attr, val|
          if val == :empty
            "empty(#{attr})"
          elsif val.is_a?(Range)
            to_range_statements(attr, val)
          elsif val.is_a?(Array)
            "#{attr} IN (#{val.collect{|x| inspect_value(x)}.join(", ")})"
          elsif val.to_s.match(/^`.*`$/)
            # The check above is done on +to_s+, so strip on +to_s+ as well;
            # a Symbol does not respond to +gsub+.
            "#{attr} #{val.to_s.gsub(/(^`|`$)/, "")}"
          else
            "#{attr} = #{inspect_value(val)}"
          end
        end.flatten.join(" AND ")
      end

      # Returns the comparison statements for a Range condition.
      #
      # An exclusive range (<tt>a...b</tt>) uses "<" for the upper bound, and a
      # missing (nil) bound is simply left out.
      def to_range_statements(attr, range)
        statements = []
        statements << "#{attr} >= #{inspect_value(range.begin)}" unless range.begin.nil?
        unless range.end.nil?
          operator = range.exclude_end? ? "<" : "<="
          statements << "#{attr} #{operator} #{inspect_value(range.end)}"
        end
        statements
      end

      # Builds a ResultSet from a parsed response: the rows under "data" plus
      # the column names and types listed under "meta".
      def parse_data(data)
        names = data["meta"].collect{|column| column["name"]}
        types = data["meta"].collect{|column| column["type"]}
        ResultSet.new data["data"], names, types
      end

    end
  end
end