tumugi/tumugi-plugin-bigquery

View on GitHub
lib/tumugi/plugin/bigquery/client.rb

Summary

Maintainability
C
7 hrs
Test Coverage
require 'kura'
require 'json'
require_relative './error'

Tumugi::Config.register_section('bigquery', :project_id, :client_email, :private_key, :private_key_file)

module Tumugi
  module Plugin
    module Bigquery
      class Client
        attr_reader :project_id

        def initialize(project_id: nil, client_email: nil, private_key: nil, private_key_file: nil)
          @project_id = project_id

          if client_email.nil? && private_key.nil? && !private_key_file.nil?
            @client = Kura.client(private_key_file)
            if @project_id.nil?
              key = JSON.parse(File.read(private_key_file))
              @project_id = key['project_id']
            end
          else
            # This method call style is needed for jruby.
            # JRuby cannot handle correctly if method using keyword hash and last hash argument.
            # see https://bugs.ruby-lang.org/issues/7529
            @client = Kura.client(project_id = { "project_id" => @project_id, "client_email" => client_email, "private_key" => private_key },
                                  client_email = nil, private_key = nil, {http_options: {timeout: 60}})
          end
        rescue Kura::ApiError => e
          process_error(e)
        end

        def projects(limit: 1000, &blk)
          @client.projects(limit: limit, &blk)
        rescue Kura::ApiError => e
          process_error(e)
        end

        def datasets(project_id: nil, all: false, limit: 1000, &blk)
          @client.datasets(project_id: project_id || @project_id, all: all, limit: limit, &blk)
        rescue Kura::ApiError => e
          process_error(e)
        end

        def dataset(dataset_id, project_id: nil, &blk)
          @client.dataset(dataset_id, project_id: project_id || @project_id, &blk)
        rescue Kura::ApiError => e
          process_error(e)
        end

        def dataset_exist?(dataset_id, project_id: nil)
          !@client.dataset(dataset_id, project_id: project_id || @project_id).nil?
        rescue Kura::ApiError => e
          process_error(e)
        end

        def insert_dataset(dataset_id, project_id: nil, &blk)
          @client.insert_dataset(dataset_id, project_id: project_id || @project_id, &blk)
        rescue Kura::ApiError => e
          process_error(e)
        end

        def delete_dataset(dataset_id, project_id: nil, delete_contents: true, &blk)
          @client.delete_dataset(dataset_id, project_id: project_id || @project_id, delete_contents: delete_contents, &blk)
        rescue Kura::ApiError => e
          process_error(e)
        end

        def patch_dataset(dataset_id, project_id: nil,
                          access: nil,
                          description: :na,
                          default_table_expiration_ms: :na,
                          friendly_name: :na,
                          &blk)
          @client.patch_dataset(dataset_id, project_id: project_id || @project_id,
                                access: access,
                                description: description,
                                default_table_expiration_ms: default_table_expiration_ms,
                                friendly_name: friendly_name,
                                &blk)
        rescue Kura::ApiError => e
          process_error(e)
        end

        def tables(dataset_id, project_id: nil, limit: 1000, &blk)
          @client.tables(dataset_id, project_id: project_id || @project_id, limit: limit, &blk)
        rescue Kura::ApiError => e
          process_error(e)
        end

        def table(dataset_id, table_id, project_id: nil)
          @client.table(dataset_id, table_id, project_id: project_id || @project_id)
        rescue Kura::ApiError => e
          process_error(e)
        end

        def table_exist?(dataset_id, table_id, project_id: nil)
          !@client.table(dataset_id, table_id, project_id: project_id || @project_id).nil?
        rescue Kura::ApiError => e
          process_error(e)
        end

        def insert_table(dataset_id, table_id,
                          project_id: nil,
                          expiration_time: nil,
                          friendly_name: nil,
                          schema: nil,
                          description: nil,
                          query: nil,
                          external_data_configuration: nil,
                          &blk)
          @client.insert_table(dataset_id, table_id,
                                project_id: project_id || @project_id,
                                expiration_time: expiration_time,
                                friendly_name: friendly_name,
                                schema: schema,
                                description: description,
                                query: query,
                                external_data_configuration: external_data_configuration,
                                &blk)
        rescue Kura::ApiError => e
          process_error(e)
        end

        def delete_table(dataset_id, table_id, project_id: nil, &blk)
          @client.delete_table(dataset_id, table_id, project_id: project_id || @project_id, &blk)
        rescue Kura::ApiError => e
          process_error(e)
        end

        def patch_table(dataset_id, table_id,
                        project_id: nil,
                        expiration_time: :na,
                        friendly_name: :na,
                        description: :na,
                        &blk)
          @client.patch_table(dataset_id, table_id,
                              project_id: project_id || @project_id,
                              expiration_time: expiration_time,
                              friendly_name: friendly_name,
                              description: description,
                              &blk)
        rescue Kura::ApiError => e
          process_error(e)
        end

        def list_tabledata(dataset_id, table_id, project_id: nil, start_index: 0, max_result: 100, page_token: nil, schema: nil, &blk)
          @client.list_tabledata(dataset_id, table_id,
                                  project_id: project_id || @project_id,
                                  start_index: start_index,
                                  max_result: max_result,
                                  page_token: page_token,
                                  schema: schema,
                                  &blk)
        rescue Kura::ApiError => e
          process_error(e)
        end

        def insert_tabledata(dataset_id, table_id, rows, project_id: nil, ignore_unknown_values: false, skip_invalid_rows: false, template_suffix: nil)
          @client.insert_tabledata(dataset_id, table_id, rows,
                                  project_id: project_id || @project_id,
                                  ignore_unknown_values: ignore_unknown_values,
                                  skip_invalid_rows: skip_invalid_rows,
                                  template_suffix: template_suffix)
        rescue Kura::ApiError => e
          process_error(e)
        end

        def insert_job(configuration, job_id: nil, project_id: nil, media: nil, wait: nil, &blk)
          @client.insert_job(configuration, job_id: job_id, project_id: project_id || @project_id, media: media, wait: wait, &blk)
        rescue Kura::ApiError => e
          process_error(e)
        end

        def query(sql, mode: :truncate,
                  dataset_id: nil, table_id: nil,
                  allow_large_results: true,
                  flatten_results: true,
                  priority: "INTERACTIVE",
                  use_query_cache: true,
                  use_legacy_sql: true,
                  user_defined_function_resources: nil,
                  project_id: nil,
                  job_project_id: nil,
                  job_id: nil,
                  wait: nil,
                  dry_run: false,
                  &blk)
          @client.query(sql, mode: mode,
                        dataset_id: dataset_id, table_id: table_id,
                        allow_large_results: allow_large_results,
                        flatten_results: flatten_results,
                        priority: priority,
                        use_query_cache: use_query_cache,
                        use_legacy_sql: use_legacy_sql,
                        user_defined_function_resources: user_defined_function_resources,
                        project_id: project_id || @project_id,
                        job_project_id: job_project_id || @project_id,
                        job_id: job_id,
                        wait: wait,
                        dry_run: dry_run,
                        &blk)
        rescue Kura::ApiError => e
          process_error(e)
        end

        def load(dataset_id, table_id, source_uris=nil,
                 schema: nil,
                 field_delimiter: ",",
                 mode: :append,
                 allow_jagged_rows: false,
                 max_bad_records: 0,
                 ignore_unknown_values: false,
                 allow_quoted_newlines: false,
                 quote: '"',
                 skip_leading_rows: 0,
                 source_format: "CSV",
                 project_id: nil,
                 job_project_id: nil,
                 job_id: nil,
                 file: nil, wait: nil,
                 dry_run: false,
                 &blk)
          @client.load(dataset_id, table_id, source_uris=source_uris,
                       schema: schema,
                       field_delimiter: field_delimiter,
                       mode: mode,
                       allow_jagged_rows: allow_jagged_rows,
                       max_bad_records: max_bad_records,
                       ignore_unknown_values: ignore_unknown_values,
                       allow_quoted_newlines: allow_quoted_newlines,
                       quote: quote,
                       skip_leading_rows: skip_leading_rows,
                       source_format: source_format,
                       project_id: project_id || @project_id,
                       job_project_id: job_project_id || @project_id,
                       job_id: job_id,
                       file: file,
                       wait: wait,
                       dry_run: dry_run,
                       &blk)
        rescue Kura::ApiError => e
          process_error(e)
        end

        def extract(dataset_id, table_id, dest_uris,
                    compression: "NONE",
                    destination_format: "CSV",
                    field_delimiter: ",",
                    print_header: true,
                    project_id: nil,
                    job_project_id: nil,
                    job_id: nil,
                    wait: nil,
                    dry_run: false,
                    &blk)
          @client.extract(dataset_id, table_id, dest_uris,
                          compression: compression,
                          destination_format: destination_format,
                          field_delimiter: field_delimiter,
                          print_header: print_header,
                          project_id: project_id || @project_id,
                          job_project_id: job_project_id || @project_id,
                          job_id: job_id,
                          wait: wait,
                          dry_run: dry_run,
                          &blk)
        rescue Kura::ApiError => e
          process_error(e)
        end

        def copy(src_dataset_id, src_table_id, dest_dataset_id, dest_table_id,
                 mode: :truncate,
                 src_project_id: nil,
                 dest_project_id: nil,
                 job_project_id: dest_project_id,
                 job_id: nil,
                 wait: nil,
                 dry_run: false,
                 &blk)
          @client.copy(src_dataset_id, src_table_id, dest_dataset_id, dest_table_id,
                       mode: mode,
                       src_project_id: src_project_id || @project_id,
                       dest_project_id: dest_project_id || @project_id,
                       job_project_id: job_project_id || @project_id,
                       job_id: job_id,
                       wait: wait,
                       dry_run: dry_run,
                       &blk)
        rescue Kura::ApiError => e
          process_error(e)
        end

        def job(job_id, project_id: nil, &blk)
          @client.job(job_id, project_id: project_id || @project_id, &blk)
        rescue Kura::ApiError => e
          process_error(e)
        end

        def cancel_job(job_id, project_id: nil, &blk)
          @client.cancel_job(job_id, project_id: project_id || @project_id, &blk)
        rescue Kura::ApiError => e
          process_error(e)
        end

        def wait_job(job, timeout=60*10, project_id: nil, &blk)
          @client.wait_job(job, timeout, project_id: project_id || @project_id, &blk)
        rescue Kura::ApiError => e
          process_error(e)
        end

        private

        def process_error(e)
          raise Tumugi::Plugin::Bigquery::BigqueryError.new(e.message, e.reason)
        end
      end
    end
  end
end