remomueller/aqueduct-postgresql

View on GitHub
lib/aqueduct/wrappers/postgresql.rb

Summary

Maintainability
A
2 hrs
Test Coverage
require 'aqueduct'

module Aqueduct
  module Wrappers
    class Postgresql
      include Aqueduct::Wrapper

      def sql_codes
        { text: 'text', numeric: 'numeric', open: '"', close: '"' }
      end

      def connect
        @db_connection = PG.connect( host: @source.host, user: @source.username, password: @source.password, dbname: @source.database, port: @source.port )
      end

      def disconnect
        @db_connection.finish if @db_connection
        true
      end

      def query(sql_statement)
        results = []
        total_count = 0
        if @db_connection
          results = @db_connection.exec(sql_statement).values
          total_count = results.size
        end
        [results, total_count]
      end

      def connected?
        result = false
        error = ''
        begin
          db_connection = PG.connect( host: @source.host, user: @source.username, password: @source.password, dbname: @source.database, port: @source.port )
          status = db_connection.status
        rescue PG::Error => e
          error = "#{e.error}"
        ensure
          result = true if status == 0
          db_connection.finish if db_connection
        end
        { result: result, error: error }
      end

      def get_table_metadata
        result = {}
        error = ''
        begin
          db_connection = PG.connect( host: @source.host, user: @source.username, password: @source.password, dbname: @source.database, port: @source.port )
          if db_connection
            tables = []
            results = db_connection.exec("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public';")
            tables = results.collect{|r| r["table_name"]}

            tables.sort{|table_a, table_b| table_a.downcase <=> table_b.downcase}.each do |my_table|
              results = db_connection.exec("SELECT column_name, data_type FROM information_schema.columns WHERE table_name ='#{my_table}';")
              columns = results.collect{ |r| { column: r["column_name"], datatype: r['data_type'] } }
              result[my_table] = columns.sort{|a,b| a[:column].downcase <=> b[:column].downcase}
            end
          end
        rescue PG::Error => e
          error = "#{e.error}"
        ensure
          db_connection.finish if db_connection
        end
        { result: result, error: error }
      end

      def tables
        tables = []
        error = ''
        begin
          db_connection = PG.connect( host: @source.host, user: @source.username, password: @source.password, dbname: @source.database, port: @source.port )
          if db_connection
            results = db_connection.exec("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public';")
            tables = results.collect{|r| r["table_name"]}
          end
        rescue PG::Error => e
          error = "#{e.error}"
        ensure
          db_connection.finish if db_connection
        end
        { result: tables, error: error }
      end

      def table_columns(table)
        columns = []
        error = ''
        begin
          db_connection = PG.connect( host: @source.host, user: @source.username, password: @source.password, dbname: @source.database, port: @source.port )
          if db_connection
            results = db_connection.exec("SELECT column_name, data_type FROM information_schema.columns WHERE table_name ='#{table}';")
            columns = results.collect{ |r| { column: r["column_name"], datatype: r['data_type'] } }
          end
        rescue PG::Error => e
          error = "Error retrieving column information. Please make sure that this database is configured correctly."
        ensure
          db_connection.finish if db_connection
        end
        { columns: columns, error: error }
      end

      def get_all_values_for_column(table, column)
        values = []
        error = ''
        begin
          db_connection = PG.connect( host: @source.host, user: @source.username, password: @source.password, dbname: @source.database, port: @source.port )
          if db_connection
            results = db_connection.exec("SELECT column_name FROM information_schema.columns WHERE table_name ='#{table}';")
            columns = results.collect{ |r| r["column_name"] }
            column_found = columns.include?(column)

            if not column_found
              error += " <i>#{column}</i> does not exist in <i>#{@source.database}.#{table}</i>"
            else
              results = db_connection.exec("SELECT CAST(\"#{column}\" AS text) FROM \"#{table}\";")
              values = results.collect{|r| r[column.to_s]}
            end
          end
        rescue PG::Error => e
          error = "#{e.error}"
        ensure
          if db_connection
            db_connection.finish
          else
            error += " unable to connect to <i>#{@source.name}</i>"
          end
        end
        { values: values, error: error }
      end

      def column_values(table, column)
        error = ''
        result = []
        begin
          db_connection = PG.connect( host: @source.host, user: @source.username, password: @source.password, dbname: @source.database, port: @source.port )

          results = db_connection.exec("SELECT column_name FROM information_schema.columns WHERE table_name ='#{table}';")
          columns = results.collect{ |r| r["column_name"] }
          column_found = columns.include?(column)

          if column_found
            results = db_connection.exec("SELECT CAST(\"#{column}\" AS text) FROM \"#{table}\" GROUP BY \"#{column}\";")
            result = results.collect{|r| r[column.to_s]}
          end
        rescue PG::Error => e
          error = "Error: #{e.inspect}"
        ensure
          db_connection.finish if db_connection
        end
        { result: result, error: error }
      end

      def count(query_concepts, conditions, tables, join_conditions, concept_to_count)
        result = 0
        error = ''
        sql_conditions = ''
        begin
          t = Time.now
          if tables.size > 0
            sql_conditions = "SELECT count(#{concept_to_count ? 'DISTINCT ' + concept_to_count : '*'}) as record_count FROM #{tables.join(', ')} WHERE #{join_conditions.join(' and ')}#{' and ' unless join_conditions.blank?}#{conditions}"
            Rails.logger.info sql_conditions
            db_connection = PG.connect( host: @source.host, user: @source.username, password: @source.password, dbname: @source.database, port: @source.port )
            if db_connection
              results = db_connection.exec(sql_conditions)
              result = results[0]["record_count"].to_i
            end
          else
            error = "Database [#{@source.name}] Error: No tables for concepts. Database not fully mapped."
          end
        rescue PG::Error => e
          error = "Database [#{@source.name}] Error: #{e}"
        ensure
          db_connection.finish if db_connection
        end
        { result: result, error: error, sql_conditions: sql_conditions }
      end
    end
  end
end