gooddata/gooddata-ruby

View on GitHub
lib/gooddata/helpers/data_helper.rb

Summary

Maintainability
A
1 hr
Test Coverage
# encoding: UTF-8
#
# Copyright (c) 2010-2017 GoodData Corporation. All rights reserved.
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

require 'csv'
require 'digest'
require 'open-uri'

module GoodData
  module Helpers
    # Describes a piece of data by its type and type-specific options and
    # "realizes" it, i.e. stores it in a local file whose name is a SHA256
    # digest derived from the source description.
    class DataSource
      # Matches a `${name}` placeholder. The name must not contain braces so
      # that a stray `}` later in the query cannot be swallowed into the name.
      PARAM_PLACEHOLDER = /\$\{([^\{\}]+)\}/.freeze
      private_constant :PARAM_PLACEHOLDER

      attr_reader :realized

      class << self
        # Replaces every `${name}` placeholder in the query with the value
        # of the param of the same name.
        #
        # The query is processed in a single pass, so a placeholder-looking
        # text inside a param value is never interpolated again. Values are
        # inserted verbatim (backslashes are not treated as back-references)
        # and converted with `to_s`.
        #
        # @param query [String] query containing `${name}` placeholders
        # @param params [Hash] values keyed by String (preferred) or Symbol
        # @return [String] new string with all placeholders substituted
        # @raise [RuntimeError] when a referenced param is missing or nil
        def interpolate_sql_params(query, params)
          query.gsub(PARAM_PLACEHOLDER) do
            key = Regexp.last_match(1)
            present_key = [key, key.to_sym].find { |candidate| params.key?(candidate) }
            raise "Param #{key} is not present in schedule params yet it is expected to be interpolated in the query" if present_key.nil?
            value = params[present_key]
            raise "Param #{key} is nil yet it is expected to be interpolated in the query" if value.nil?
            # Block form of gsub: the value is inserted literally.
            value.to_s
          end
        end
      end

      # @param opts [Hash, String] options of the data source; the `type` key
      #   selects how the source is realized. A String is a shortcut for
      #   `{ type: :staging, path: <the string> }`.
      def initialize(opts = {})
        opts = opts.is_a?(String) ? { type: :staging, path: opts } : opts
        opts = GoodData::Helpers.symbolize_keys(opts)
        @source = opts[:type]
        @options = opts
        @realized = false
      end

      # Realizes the data source according to its type.
      #
      # The source is marked as realized only after the type-specific
      # handler finished without raising.
      #
      # @param params [Hash] runtime params (clients, values to interpolate)
      # @return [Object] result of the type-specific handler; the handlers
      #   defined in this class return the name of the local file
      # @raise [RuntimeError] when the type is not supported
      # @raise [GoodData::InvalidEnvError] when a cloud resource type is used
      #   and RUBY_PLATFORM does not match /java/
      def realize(params = {})
        result = realize_source(@source && @source.to_s, params)
        @realized = true
        result
      end

      # @return [Boolean] true when the last call of {#realize} succeeded
      def realized?
        @realized == true
      end

      private

      # Dispatches to the handler matching the given source type.
      def realize_source(source, params)
        case source
        when 'ads'
          realize_query(params)
        when 'staging'
          realize_staging(params)
        when 'web'
          realize_link
        when 's3'
          realize_s3(params)
        when 'redshift', 'snowflake', 'bigquery', 'postgresql', 'mssql', 'mysql'
          raise GoodData::InvalidEnvError, "DataSource does not support type \"#{source}\" on the platform #{RUBY_PLATFORM}" unless RUBY_PLATFORM =~ /java/
          # Loaded lazily, only once a cloud resource type is actually requested.
          require_relative '../cloud_resources/cloud_resources'
          realize_cloud_resource(source, params)
        when 'blobStorage'
          require_relative '../cloud_resources/blobstorage/blobstorage_client'
          blob_storage_client = GoodData::BlobStorageClient.new(params)
          blob_storage_client.realize_blob(@options[:file], params)
        else
          raise "DataSource does not support type \"#{source}\""
        end
      end

      # Looks a param up by its String name first and by Symbol second.
      def param_value(params, name)
        params[name] || params[name.to_sym]
      end

      # Runs the block and returns the elapsed wall-clock seconds measured
      # on the monotonic clock (no dependency on the Benchmark library).
      def measure_seconds
        started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        yield
        Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
      end

      # Delegates the query in `@options[:query]` to a client created by
      # the cloud resource factory for the given type.
      def realize_cloud_resource(type, params)
        cloud_resource_client = GoodData::CloudResources::CloudResourceFactory.create(type, params)
        cloud_resource_client.realize_query(@options[:query], params)
      end

      # Interpolates params into `@options[:query]`, executes it through
      # the `ads_client` param and writes the rows into a CSV file named by
      # the SHA256 digest of the interpolated query. The header row is the
      # keys of the first returned row; nothing is written when the query
      # yields no rows.
      #
      # @return [String] name of the CSV file
      # @raise [RuntimeError] when `ads_client` is missing in params
      def realize_query(params)
        query = DataSource.interpolate_sql_params(@options[:query], params)
        dwh = param_value(params, 'ads_client') || raise("Data Source needs a client to ads to be able to query the storage but 'ads_client' is empty.")
        filename = Digest::SHA256.new.hexdigest(query)
        elapsed = measure_seconds do
          CSV.open(filename, 'w') do |csv|
            header = nil
            dwh.execute_select(query) do |row|
              if header.nil?
                header = row.keys
                csv << header
              end
              # values_at keeps every row in the column order of the header
              csv << row.values_at(*header)
            end
          end
        end
        GoodData.logger.info "Realizing SQL query \"#{query}\" took #{elapsed}"
        filename
      end

      # Downloads `@options[:path]` into a file named by the SHA256 digest
      # of the path. A relative path is downloaded through the
      # `gdc_project` param, an absolute URL through `GDC_GD_CLIENT`.
      #
      # @return [String] name of the downloaded file
      # @raise [RuntimeError] when the required param is missing
      def realize_staging(params)
        path = @options[:path]
        url = URI.parse(path)
        filename = Digest::SHA256.new.hexdigest(path)
        client_key = url.relative? ? 'gdc_project' : 'GDC_GD_CLIENT'
        downloader = param_value(params, client_key)
        raise "Data Source needs '#{client_key}' to be able to download \"#{path}\" from staging but it is empty." unless downloader
        downloader.download_file(path, filename)
        filename
      end

      # Downloads `@options[:url]` into a file named by the SHA256 digest
      # of the URL.
      #
      # The link is opened through its parsed URI (open-uri) rather than
      # through Kernel#open, which no longer accepts URLs since Ruby 3.0
      # and would execute a command for a link starting with "|".
      #
      # @return [String] name of the downloaded file
      # @raise [RuntimeError] when the link is not a URL open-uri can open
      def realize_link
        link = @options[:url]
        filename = Digest::SHA256.new.hexdigest(link)
        uri = URI.parse(link)
        raise "DataSource of type \"web\" cannot download \"#{link}\". Only URLs supported by open-uri can be used." unless uri.respond_to?(:open)
        elapsed = measure_seconds do
          # Binary mode and streaming copy: bytes are stored untouched and
          # the response body is not held in memory as one string.
          File.open(filename, 'wb') do |f|
            uri.open { |rf| IO.copy_stream(rf, f) }
          end
        end
        GoodData.logger.info("Realizing web download from \"#{link}\" took #{elapsed}")
        filename
      end

      # Downloads an S3 object into a file named by the SHA256 digest of
      # the JSON form of the options and verifies the downloaded size.
      # The object key is taken from `@options[:key]`, falling back to
      # `@options[:file]`.
      #
      # @return [String] name of the downloaded file
      # @raise [RuntimeError] when the S3 client, bucket or key is missing
      #   or when the downloaded size differs from the size reported by S3
      def realize_s3(params)
        s3_client = params['s3_client'] && params['s3_client']['client']
        raise 'AWS client not present. Perhaps S3Middleware is missing in the brick definition?' if !s3_client || !s3_client.respond_to?(:bucket)
        bucket_name = @options[:bucket]
        key = @options[:key].present? ? @options[:key] : @options[:file]
        raise 'Key "bucket" is missing in S3 datasource' if bucket_name.blank?
        raise 'Key "key" or "file" is missing in S3 datasource' if key.blank?

        GoodData.logger.info("Realizing download from S3. Bucket #{bucket_name}, object with key #{key}.")
        filename = Digest::SHA256.new.hexdigest(@options.to_json)
        bucket = s3_client.bucket(bucket_name)
        obj = bucket.object(key)
        obj.get(response_target: filename, bucket: bucket_name, key: key)
        s3_size = obj.size
        actual_size = File.size(filename)
        GoodData.logger.info("File size in S3: #{s3_size}")
        GoodData.logger.info("Downloaded file size: #{actual_size}")
        raise "Error downloading file #{key}. Expected size #{s3_size}, got #{actual_size}." unless s3_size == actual_size
        GoodData.logger.info('Done downloading file.')
        filename
      end
    end
  end
end