gooddata/gooddata-ruby

View on GitHub
lib/gooddata/rest/connection.rb

Summary

Maintainability
F
4 days
Test Coverage
# encoding: UTF-8
#
# Copyright (c) 2010-2021 GoodData Corporation. All rights reserved.
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

require 'terminal-table'
require 'securerandom'
require 'monitor'
require 'thread_safe'
require 'rest-client'

require_relative '../version'
require_relative '../exceptions/exceptions'

require_relative '../helpers/global_helpers'

require_relative 'phmap'

module GoodData
  module Rest
    class RestRetryError < StandardError
    end

    # Wrapper of low-level HTTP/REST client/library
    class Connection
      include MonitorMixin

      HTTPS_PROTOCOL = 'https://'
      HTTP_PROTOCOL = 'http://'
      DEFAULT_URL = 'https://secure.gooddata.com'
      LOGIN_PATH = '/gdc/account/login'
      TOKEN_PATH = '/gdc/account/token'
      KEYS_TO_SCRUB = [:password, :verifyPassword, :authorizationToken]
      API_LEVEL = 2

      ID_LENGTH = 16

      DEFAULT_HEADERS = {
        :content_type => :json,
        :accept => [:json, :zip],
        :user_agent => GoodData.gem_version_string
      }

      DEFAULT_WEBDAV_HEADERS = {
        :user_agent => GoodData.gem_version_string
      }

      CERTIFICATE_STORE = OpenSSL::X509::Store.new.freeze
      CERTIFICATE_STORE.set_default_paths

      DEFAULT_LOGIN_PAYLOAD = {
        :headers => DEFAULT_HEADERS,
        :verify_ssl => true,
        :ssl_cert_store => CERTIFICATE_STORE
      }

      RETRYABLE_ERRORS = [
        Net::HTTPBadResponse,
        RestClient::InternalServerError,
        RestClient::RequestTimeout,
        RestClient::MethodNotAllowed,
        SystemCallError,
        Timeout::Error
      ]

      RETRY_TIME_INITIAL_VALUE = 1
      RETRY_TIME_COEFFICIENT = 1.5
      RETRYABLE_ERRORS << Net::ReadTimeout if Net.const_defined?(:ReadTimeout)

      RETRYABLE_ERRORS << OpenSSL::SSL::SSLErrorWaitReadable if OpenSSL::SSL.const_defined?(:SSLErrorWaitReadable)

      class << self
        def construct_login_payload(username, password)
          res = {
            'postUserLogin' => {
              'login' => username,
              'password' => password,
              'remember' => 1,
              'verify_level' => 2
            }
          }
          res
        end

        # Generate random string with URL safe base64 encoding
        #
        # @param [String] length Length of random string to be generated
        #
        # @return [String] Generated random string
        def generate_string(length = ID_LENGTH)
          SecureRandom.urlsafe_base64(length)
        end

        # Retry block if exception thrown
        def retryable(options = {}, &_block)
          opts = { :tries => 12, :on => RETRYABLE_ERRORS }.merge(options)

          retry_exception = opts[:on]
          retries = opts[:tries]

          unless retry_exception.is_a?(Array)
            retry_exception = [retry_exception]
          end

          retry_time = RETRY_TIME_INITIAL_VALUE
          begin
            return yield
          rescue RestClient::Unauthorized, RestClient::Forbidden => e # , RestClient::Unauthorized => e
            raise e unless options[:refresh_token]
            raise e if options[:dont_reauth]

            options[:refresh_token].call # (dont_reauth: true)
            if (retries -= 1) > 0
              retry
            else
              fail e
            end
          rescue RestClient::TooManyRequests, RestClient::ServiceUnavailable, *retry_exception => e
            GoodData.logger.warn "#{e.message}, retrying in #{retry_time} seconds"
            sleep retry_time
            retry_time *= RETRY_TIME_COEFFICIENT
            # 10 requests with 1.5 coefficent should take ~ 3 mins to finish
            if (retries -= 1) > 0
              retry
            else
              fail e
            end
          end
          yield
        end
      end

      attr_reader :request_params

      # backward compatibility
      alias_method :cookies, :request_params
      alias_method :headers, :request_params
      attr_reader :server
      attr_reader :stats
      attr_reader :user
      attr_reader :verify_ssl

      def initialize(opts)
        super()
        @stats = ThreadSafe::Hash.new
        @execution_id = opts[:execution_id]

        headers = opts[:headers] || {}
        @webdav_headers = DEFAULT_WEBDAV_HEADERS.merge(headers)

        @user = nil
        @server = nil
        @opts = opts
        @verify_ssl = @opts[:verify_ssl] == false || @opts[:verify_ssl] == OpenSSL::SSL::VERIFY_NONE ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER

        # Initialize headers
        reset_headers!

        @at_exit_handler_installed = nil
      end

      # Connect using username and password
      def connect(username, password, options = {})
        server = options[:server] || Helpers::AuthHelper.read_server
        options = DEFAULT_LOGIN_PAYLOAD.merge(options)
        headers = options[:headers] || {}

        options = options.merge(headers)
        @server = RestClient::Resource.new fix_server_url(server), options

        # Install at_exit handler first
        unless @at_exit_handler_installed
          begin
            at_exit { disconnect if @user }
          rescue RestClient::Unauthorized
            GoodData.logger.info 'Already logged out'
          ensure
            @at_exit_handler_installed = true
          end
        end

        # Reset old cookies first
        if options[:sst_token]
          headers = {
            :x_gdc_authsst => options[:sst_token],
            :x_gdc_authtt => options[:tt_token]
          }
          merge_headers!(headers)
          get('/gdc/account/token', @request_params)

          @user = get('/gdc/account/profile/current')
          GoodData.logger.info("Connected using SST to server #{@server.url} to profile \"#{@user['accountSetting']['login']}\"")
          @auth = {}
          refresh_token :dont_reauth => true
        elsif  options[:headers][:x_gdc_authsst]
          @request_params = options[:headers]
          @user = get('/gdc/account/profile/current')
          GoodData.logger.info("Connected using SST to server #{@server.url} to profile \"#{@user['accountSetting']['login']}\"")
          @auth = {}
          refresh_token :dont_reauth => true
        else
          GoodData.logger.info("Connected using username \"#{username}\" to server #{@server.url}")
          credentials = Connection.construct_login_payload(username, password)
          generate_session_id
          @auth = post(LOGIN_PATH, credentials, :dont_reauth => true)['userLogin']

          refresh_token :dont_reauth => true
          @user = get(@auth['profile'])
        end
        GoodData.logger.info('Connection successful')
      rescue RestClient::Unauthorized => e
        GoodData.logger.info('Bad Login or Password')
        GoodData.logger.info('Connection failed')
        raise e
      rescue RestClient::Forbidden => e
        GoodData.logger.info('Connection failed')
        raise e
      end

      # Disconnect
      def disconnect
        # TODO: Wrap somehow
        url = @auth['state']

        begin
          clear_session_id
          delete(url, :x_gdc_authsst => sst_token, :tries => 0) if url
        rescue RestClient::Unauthorized
          begin
            refresh_token
            delete(url, :x_gdc_authsst => sst_token, :tries => 0) if url
          rescue RestClient::Unauthorized
            GoodData.logger.info 'Already disconnected'
          end
        end

        @auth = nil
        @server = nil
        @user = nil

        reset_headers!
      end

      # @param what Address of the remote file.
      # @param where Full path to the target file.
      # @option [Bool] :url_encode ('true') URL encode the address.
      def download(what, where, options = {})
        # handle the path (directory) given in what
        ilast_slash = what.rindex('/')
        if ilast_slash.nil?
          what_dir = ''
        else
          # take the directory from the path
          what_dir = what[0..ilast_slash - 1]
          # take the filename from the path
          what = what[ilast_slash + 1..-1]
        end

        option_dir = options[:directory] || ''
        option_dir = option_dir[0..-2] if option_dir[-1] == '/'

        # join the otion dir with the what_dir
        # [option dir empty, what dir empty] => the joined dir
        dir_hash = {
          [true, true] => '',
          [true, false] => what_dir,
          [false, true] => option_dir,
          [false, false] => "#{what_dir}/#{option_dir}"
        }
        dir = dir_hash[[option_dir.empty?, what_dir.empty?]]

        staging_uri = options[:staging_url].to_s

        base_url = dir.empty? ? staging_uri : URI.join("#{server}", staging_uri, "#{dir}/").to_s
        sanitized_what = options[:url_encode] == false ? what : CGI.escape(what)
        url = URI.join("#{server}", base_url, sanitized_what).to_s

        b = proc do |f|
          raw = {
            :headers => @webdav_headers.merge(:x_gdc_authtt => headers[:x_gdc_authtt]),
            :method => :get,
            :url => url,
            :verify_ssl => verify_ssl
          }
          RestClient::Request.execute(raw) do |chunk, _x, response|
            if response.code.to_s == '202'
              fail RestRetryError, 'Got 202, retry'
            elsif response.code.to_s != '200'
              fail ArgumentError, "Error downloading #{url}. Got response: #{response.code} #{response} #{response.body}"
            end
            f.write chunk
          end
        end

        GoodData::Rest::Connection.retryable(:tries => Helpers::GD_MAX_RETRY, :refresh_token => proc { refresh_token }, :on => RestRetryError) do
          if where.is_a?(IO) || where.is_a?(StringIO)
            b.call(where)
          else
            # Assume it is a string or file
            File.open(where, 'w') do |f|
              b.call(f)
            end
          end
        end
      end

      def refresh_token(_options = {})
        begin # rubocop:disable Style/RedundantBegin
          # avoid infinite loop GET fails with 401
          response = get(TOKEN_PATH, :x_gdc_authsst => sst_token, :dont_reauth => true)
          # Remove when TT sent in headers. Currently we need to parse from body
          merge_headers!(:x_gdc_authtt => GoodData::Helpers.get_path(response, %w(userToken token)))
        rescue Exception => e # rubocop:disable Style/RescueException
          raise e
        end
      end

      # Returns server URI
      #
      # @return [String] server uri
      def server_url
        @server && @server.url
      end

      # HTTP DELETE
      #
      # @param uri [String] Target URI
      def delete(uri, options = {})
        request(:delete, uri, nil, options)
      end

      # Helper for logging error
      #
      # @param e [RuntimeException] Exception to log
      # @param uri [String] Uri on which the request failed
      # @param params [Hash] Additional params
      def log_error(e, uri, params, options = {})
        return if e.response && e.response.code == 401 && !uri.include?('token') && !uri.include?('login')

        if options[:do_not_log].nil? || options[:do_not_log].index(e.class).nil?
          GoodData.logger.error(format_error(e, params))
        end
      end

      def request(method, uri, data, options = {}, &user_block)
        request_id = options[:request_id] || generate_request_id
        log_info(options.merge(request_id: request_id))
        stats_on = options[:stats_on]

        payload = data.is_a?(Hash) ? data.to_json : data

        GoodData.rest_logger.info "#{method.to_s.upcase}: #{@server.url}#{uri}, #{scrub_params(data, KEYS_TO_SCRUB)}"
        profile method.to_s.upcase, uri, request_id, stats_on do
          b = proc do
            params = fresh_request_params(request_id).merge(options)

            params['X-GDC-VERSION'] = API_LEVEL

            case method
            when :get
              @server[uri].get(params, &user_block)
            when :put
              @server[uri].put(payload, params)
            when :delete
              @server[uri].delete(params)
            when :post
              @server[uri].post(payload, params)
            end
          end
          process_response(options, &b)
        end
      end

      # HTTP GET
      #
      # @param uri [String] Target URI
      def get(uri, options = {}, &user_block)
        request(:get, uri, nil, options, &user_block)
      end

      # HTTP PUT
      #
      # @param uri [String] Target URI
      def put(uri, data, options = {})
        request(:put, uri, data, options)
      end

      # HTTP POST
      #
      # @param uri [String] Target URI
      def post(uri, data = nil, options = {})
        request(:post, uri, data, options)
      end

      # Reader method for SST token
      #
      # @return uri [String] SST token
      def sst_token
        request_params[:x_gdc_authsst]
      end

      def stats_table(values = stats)
        sorted = values.sort_by { |_k, v| v[:avg] }
        Terminal::Table.new :headings => %w(title avg min max total calls) do |t|
          overall = {
            :avg => 0,
            :calls => 0,
            :total => 0
          }

          sorted.each do |l|
            avg = l[1][:avg]
            min = l[1][:min]
            max = l[1][:max]
            total = l[1][:total]
            calls = l[1][:calls]

            row = [
              l[0],
              sprintf('%.3f', avg),
              sprintf('%.3f', min),
              sprintf('%.3f', max),
              sprintf('%.3f', total),
              calls
            ]

            overall[:min] = min if overall[:min].nil? || min < overall[:min]
            overall[:max] = max if overall[:max].nil? || max > overall[:max]
            overall[:total] += total
            overall[:calls] += calls
            overall[:avg] += avg

            t.add_row row
          end

          overall[:avg] = overall[:avg] / sorted.length
          row = [
            'TOTAL',
            sprintf('%.3f', overall[:avg]),
            sprintf('%.3f', overall[:min]),
            sprintf('%.3f', overall[:max]),
            sprintf('%.3f', overall[:total]),
            overall[:calls]
          ]

          t.add_row :separator
          t.add_row row
        end
      end

      # Reader method for TT token
      #
      # @return uri [String] TT token
      def tt_token
        request_params[:x_gdc_authtt]
      end

      # Uploads a file to GoodData server

      def upload(file, options = {})
        dir = options[:directory] || ''
        staging_uri = options[:staging_url].to_s
        url = dir.empty? ? staging_uri : URI.join("#{server}", staging_uri, "#{dir}/").to_s
        # Make a directory, if needed
        create_webdav_dir_if_needed url unless dir.empty?

        webdav_filename = options[:filename] || File.basename(file)
        do_stream_file URI.join("#{server}", url, CGI.escape(webdav_filename)), file
      end

      def generate_request_id
        "#{@execution_id}:#{session_id}:#{call_id}"
      end

      def enrich_error_message(exception)
        begin
          return exception unless exception.response

          response = JSON.parse(exception.response.body, symbolize_names: true)
          return exception unless exception.message && response[:error] && response[:error][:message]

          request_id = exception.response.headers[:x_gdc_request] || 'unknown'

          exception.message = exception.message + ': ' + response[:error][:message] % response[:error][:parameters] + ' request_id: ' + request_id
        rescue JSON::ParserError # rubocop:disable Lint/HandleExceptions
        end
        exception
      end

      private

      def create_webdav_dir_if_needed(url)
        return if webdav_dir_exists?(url)

        method = :mkcol
        b = proc do
          raw = {
            :method => method,
            :url => url,
            :headers => @webdav_headers.merge(:x_gdc_authtt => headers[:x_gdc_authtt]),
            :verify_ssl => verify_ssl
          }
          RestClient::Request.execute(raw)
        end

        GoodData::Rest::Connection.retryable(:tries => Helpers::GD_MAX_RETRY, :refresh_token => proc { refresh_token }) do
          b.call
        end
      end

      def do_stream_file(uri, filename, _options = {})
        GoodData.logger.info "Uploading file user storage #{uri}"

        request = RestClient::Request.new(:method => :put,
                                          :url => uri.to_s,
                                          :verify_ssl => verify_ssl,
                                          :headers => @webdav_headers.merge(:x_gdc_authtt => headers[:x_gdc_authtt]),
                                          :payload => File.new(filename, 'rb'))

        begin
          request.execute
        rescue => e
          raise "Error when uploading file #{filename}. Error: #{e}"
        end
      end

      def format_error(e, params = {})
        return e unless e.respond_to?(:response)
        error = MultiJson.load(e.response)
        message = GoodData::Helpers.interpolate_error_message(error)
        if error && error['error'] && error['error']['errorClass'] == 'com.gooddata.security.authorization.AuthorizationFailedException'
          message = "#{message}, accessing with #{user['accountSetting']['login']}"
        end
        <<-ERR

#{e}: #{message}
Request ID: #{params[:x_gdc_request]}
Full response:
#{JSON.pretty_generate(error)}
Backtrace:\n#{e.backtrace.join("\n")}
ERR
      rescue MultiJson::ParseError
        "Failed to parse #{e}. Raw response: #{e.response}"
      end

      # generate session id to be passed as the first part to
      # x_gdc_request header
      def session_id
        @session_id ||= Connection.generate_string
      end

      def call_id
        Connection.generate_string
      end

      def generate_session_id
        @session_id = Connection.generate_string
      end

      def clear_session_id
        @session_id = nil
      end

      # log info_message given in options and make sure request_id is there
      def log_info(options)
        # if info_message given, log it with request_id (given or generated)
        if options[:info_message]
          GoodData.logger.info "#{options[:info_message]} Request id: #{options[:request_id]}"
        end
      end

      # request heders with freshly generated request id
      def fresh_request_params(request_id = nil)
        tt = { :x_gdc_authtt => tt_token }
        tt.merge(:x_gdc_request => request_id || generate_request_id)
      end

      def merge_headers!(headers)
        @request_params.merge! headers.slice(:x_gdc_authtt, :x_gdc_authsst)
      end

      def process_response(options = {}, &block)
        retries = options[:tries] || Helpers::GD_MAX_RETRY
        process = options[:process]
        dont_reauth = options[:dont_reauth]
        options = options.reject { |k, _| [:process, :dont_reauth].include?(k) }
        opts = { tries: retries, refresh_token: proc { refresh_token unless dont_reauth } }.merge(options)
        begin
          response = GoodData::Rest::Connection.retryable(opts) do
            block.call
          end
        rescue RestClient::Exception => e
          fail enrich_error_message(e)
        end
        merge_headers! response.headers
        content_type = response.headers[:content_type]
        return response if process == false
        if content_type == 'application/json' || content_type == 'application/json;charset=UTF-8'
          result = response.to_str == '""' ? {} : MultiJson.load(response.to_str)
          GoodData.rest_logger.debug "Request ID: #{response.headers[:x_gdc_request]} - Response: #{result.inspect}"
        elsif ['text/plain;charset=UTF-8', 'text/plain; charset=UTF-8', 'text/plain'].include?(content_type)
          result = response
          GoodData.rest_logger.debug 'Response: plain text'
        elsif content_type == 'application/zip'
          result = response
          GoodData.rest_logger.debug 'Response: a zipped stream'
        elsif content_type == 'text/csv'
          result = response
          GoodData.rest_logger.debug 'Response: CSV text'
        elsif response.headers[:content_length].to_s == '0'
          result = nil
          GoodData.rest_logger.debug 'Response: Empty response possibly 204'
        elsif response.code == 204
          result = nil
          GoodData.rest_logger.debug 'Response: 204 no content'
        elsif response.code == 200 && content_type.nil? && response.body.empty?
          result = nil
          # TMA-696
          GoodData.rest_logger.warn 'Got response status 200 but no content-type and body.'
        else
          fail "Unsupported response content type '%s':\n%s" % [content_type, response.to_str[0..127]]
        end
        result
      end

      def profile(method, path, request_id, stats_on, &block)
        t1 = Time.now
        res = block.call
        t2 = Time.now
        delta = t2 - t1

        add_stat_record method, path, delta, t1, request_id if stats_on
        res
      end

      def reset_headers!
        @request_params = {}
      end

      def scrub_params(params, keys)
        keys = keys.reduce([]) { |acc, elem| acc.concat([elem.to_s, elem.to_sym]) }

        new_params = GoodData::Helpers.deep_dup(params)
        GoodData::Helpers.hash_dfs(new_params) do |k, _key|
          keys.each do |key_to_scrub|
            k[key_to_scrub] = ('*' * k[key_to_scrub].length) if k && k.key?(key_to_scrub) && k[key_to_scrub]
          end
        end
        new_params
      end

      def add_stat_record(method, path, delta, time_stamp, request_id)
        synchronize do
          orig_title = "#{method.to_s.upcase} #{path}"
          title = "#{method.to_s.upcase} #{path}"

          placeholders = true

          if placeholders
            title = self.class.map_placeholders(title)
          end

          stat = stats[title]
          if stat.nil?
            stat = {
              :min => delta,
              :max => delta,
              :total => 0,
              :avg => 0,
              :calls => 0,
              :entries => []
            }
          end

          stat[:min] = delta if delta < stat[:min]
          stat[:max] = delta if delta > stat[:max]
          stat[:total] += delta
          stat[:calls] += 1
          stat[:avg] = stat[:total] / stat[:calls]

          stat[:entries] << orig_title if placeholders

          stats[title] = stat

          endpoint = self.class.map_placeholders path.clone
          duration = delta
          time_stamp = time_stamp.utc.strftime "%Y-%m-%dT%H:%M:%S.%L"
          domain = server_url.gsub %r{http://|https://}, ""
          execution_id = request_id

          GoodData.gd_logger.update_store(domain, method, duration, endpoint)
          GoodData.gd_logger.add Logger::DEBUG, { endpoint: endpoint, duration: duration, domain: domain,
                                                 execution_id: execution_id, time_stamp: time_stamp }, "rest_call"
        end
      end

      def webdav_dir_exists?(url)
        method = :get
        GoodData.logger.debug "#{method}: #{url}"

        GoodData::Rest::Connection.retryable(:tries => Helpers::GD_MAX_RETRY, :refresh_token => proc { refresh_token }) do
          raw = {
            :method => method,
            :url => url,
            :headers => @webdav_headers.merge(:x_gdc_authtt => headers[:x_gdc_authtt]),
            :verify_ssl => verify_ssl
          }.merge(headers)
          begin
            RestClient::Request.execute(raw)
          rescue RestClient::Exception => e
            false if e.http_code == 404
          end
        end
      end

      def fix_server_url(server)
        server = server.chomp('/')
        if server.start_with? HTTP_PROTOCOL
          server = server.sub HTTP_PROTOCOL, HTTPS_PROTOCOL
          GoodData.logger.warn 'You specified the HTTP protocol in your server string. It has been autofixed to HTTPS.'
        end

        server = HTTPS_PROTOCOL + server unless server.start_with? HTTPS_PROTOCOL
        server
      end
    end
  end
end