CartoDB/cartodb20

View on GitHub
services/datasources/lib/datasources/url/arcgis.rb

Summary

Maintainability
D
2 days
Test Coverage
require 'json'
require 'addressable/uri'
require_relative '../base_direct_stream'
require_relative '../../../../../lib/carto/http/client'

module CartoDB
  module Datasources
    module Url

      class ArcGIS < BaseDirectStream

        # Required for all datasources
        DATASOURCE_NAME = 'arcgis'

        ARCGIS_API_LIKE_URL_RE = /\/rest\/services/i

        METADATA_URL     = '%s?f=json'
        FEATURE_IDS_URL  = '%s/query?where=1%%3D1&returnIdsOnly=true&f=json'
        FEATURE_DATA_POST_URL = '%s/query'
        LAYERS_URL       = '%s/layers?f=json'

        MINIMUM_SUPPORTED_VERSION = 10.1

        # In seconds, for connecting
        HTTP_CONNECTION_TIMEOUT = 60
        # In seconds, for the full request
        HTTP_TIMEOUT = 120
        # In seconds, for writting to logs
        LOG_TIMEOUT = 120

        # Amount to multiply or divide
        BLOCK_FACTOR = 2
        MIN_BLOCK_SIZE = 1
        # GeoJSON can get too big in memory, or ArcGIS have mem problems, so keep reasonable number
        MAX_BLOCK_SIZE = 100
        # In seconds, use 0 to disable
        BLOCK_SLEEP_TIME = 0

        # Each retry will be after SLEEP_REQUEST_TIME^(current_retries_count). Set to 0 to disable retrying
        MAX_RETRIES = 5
        SLEEP_REQUEST_TIME = 5
        SKIP_FAILED_IDS = false

        # Used to display more data only (for local debugging purposes)
        DEBUG = false

        VECTOR_LAYER_TYPE = 'Feature Layer'.freeze
        OID_FIELD_TYPE    = 'esriFieldTypeOID'.freeze

        attr_reader :metadata

        # Constructor
        # @param user ::User
        def initialize(user)
          super
          @service_name = DATASOURCE_NAME

          # Fields:
          # @metadata = {
          #   arcgis_version:             nil,
          #   name:                       nil,
          #   description:                nil,
          #   type:                       nil,
          #   geometry_type:              nil,
          #   copyright:                  nil,
          #   fields:                     [],
          #   max_records_per_query:      500,
          #   supported_formats:          [],
          #   advanced_queries_supported: false
          # }
          @metadata = nil

          @user = user

          @url = nil
          @ids_total = 0
          @ids_retrieved = 0
          @block_size = 0
          @current_stream_status = true
          @last_stream_status = true
          @ids = nil
          @log_timer = nil
        end

        # Factory method
        # @param user ::User
        # @return CartoDB::Datasources::Url::ArcGIS
        def self.get_new(user)
          return new(user)
        end

        # @return String
        def to_s
          "<CartoDB::Datasources::Url::ArcGis @url=#{@url} @metadata=#{@metadata} @ids_total=#{@ids_total}" +
          " @ids_retrieved=#{@ids_retrieved} current_block_size=#{block_size(update=false)}>"
        end

        # If will provide a url to download the resource, or requires calling get_resource()
        # @return Bool
        def providers_download_url?
          false
        end

        # Perform the listing and return results
        # @param filter Array : (Optional) filter to specify which resources to retrieve. Leave empty for all supported.
        # @return [ Hash ]
        def get_resources_list(filter=[])
          filter
        end

        # Retrieves a resource and returns its contents
        # @param id string
        # @return mixed
        def get_resource(id)
          raise 'Not supported by this datasource'
        end

        def timed_log(s)
          t2 = Time.now
          if ((@log_timer == nil) || ((t2 - @log_timer) > LOG_TIMEOUT))
            append_and_store(s)
            @log_timer = t2
          end
        end

        def build_empty_initial_stream
          {
            geometryType: @metadata[:geometry_type],
            fields: @metadata[:fields],
            features: []
          }.to_json
        end

        # Initial stream, to be used for container creation (table usually)
        # @param id string
        # @return String
        def initial_stream(id)
          sub_id = get_subresource_id(id)
          @url = sanitize_id(id, sub_id)

          @ids = get_ids_list(@url)

          return build_empty_initial_stream if @ids.empty?

          @ids_total = @ids.length

          first_item = get_by_ids(@url, [@ids.slice!(0)], @metadata[:fields])
          @ids_retrieved += 1

          timed_log("Retrieved the first element (Total elements: #{@ids_total})")

          # Start with a pesimistic setup
          @block_size = [MIN_BLOCK_SIZE * BLOCK_FACTOR, MAX_BLOCK_SIZE, @metadata[:max_records_per_query]].min

          ::JSON.dump(first_item)
        end

        # @param id string
        # @return String|nil Nil if no more items
        def stream_resource(id)
          return nil if @ids.empty?

          retries = 0
          begin
            ids_block = @ids.slice!(0, [@ids.length, block_size].min)

            puts "#{@ids_retrieved}/#{@ids_total} (#{ids_block.length})" if DEBUG

            items = get_by_ids(@url, ids_block, @metadata[:fields])
            timed_log("Downloaded a chunk of #{ids_block.length} ids (#{@ids_retrieved + ids_block.length} so far)")
            @current_stream_status = true
            retries = 0
            sleep(BLOCK_SLEEP_TIME) unless BLOCK_SLEEP_TIME == 0
          rescue ExternalServiceError => exception
            if @block_size == MIN_BLOCK_SIZE && retries >= MAX_RETRIES
              if SKIP_FAILED_IDS
                items = []
              else
                append_and_store("Too many download failures. Giving up.")
                raise exception
              end
            else
              timed_log("FAILED to download a chunk of #{ids_block.length} ids (#{@ids_retrieved} ids already downloaded). Retrying...")
              @current_stream_status = false
              # Add back, next pass will get fewer items
              @ids = ids_block + @ids

              if @block_size == MIN_BLOCK_SIZE
                retries += 1
                sleep_time = SLEEP_REQUEST_TIME ** retries
                puts "Retry delay (#{sleep_time}s)" if DEBUG
                sleep(sleep_time)
              end

              retry
            end
          end

          @ids_retrieved += ids_block.length

          items.length > 0 ? ::JSON.dump(items) : ''
        end

        # @param id string
        # @return Hash
        # @throws DataDownloadError
        # @throws ResponseError
        # @throws InvalidServiceError
        # @throws ServiceDisabledError
        def get_resource_metadata(id)
          if is_multiresource?(id)
            @url = sanitize_id(id)
            {
              # Store original id, not the sanitized one
              id:           id,
              subresources: get_layers_list(@url)
            }
          else
            sub_id = get_subresource_id(id)
            get_subresource_metadata(id, sub_id)
          end
        end

        # Retrieves current filters. Unused as here there's no get_resources_list
        # @return {}
        def filter
          {}
        end

        # Sets current filters. Unused as here there's no get_resources_list
        # @param filter_data {}
        def filter=(filter_data=[])
          filter_data
        end

        # If this datasource accepts a data import instance
        # @return Boolean
        def persists_state_via_data_import?
          false
        end

        # If true, a single resource id might return >1 subresources (each one spawning a table)
        # @param id String
        # @return Bool
        def multi_resource_import_supported?(id)
          is_multiresource?(id)
        end

        private

        def http_client
          @http_client ||= Carto::Http::Client.get('arcgis')
        end

        def append_and_store(str, truncate = true, timestamp = Time.now.utc)
          @logger.append_and_store(str, truncate, timestamp) unless @logger.nil?
        end

        # @param id String
        # @param subresource_id String
        # @return Hash
        # @throws DataDownloadError
        # @throws ResponseError
        # @throws InvalidServiceError
        def get_subresource_metadata(id, subresource_id)
          @url = sanitize_id(id, subresource_id)

          response = http_client.get(METADATA_URL % [@url], http_options)
          validate_response(METADATA_URL % [@url], response)

          # non-rails symbolize keys
          data = ::JSON.parse(response.body).inject({}){|memo,(k,v)| memo[k.to_sym] = v; memo}

          raise ResponseError.new("Invalid layer type: '#{data[:type]}'") if data[:type] != VECTOR_LAYER_TYPE
          raise ResponseError.new("Missing data: 'fields'") if data[:fields].nil?

          if data[:supportedQueryFormats].present?
            supported_formats = data.fetch(:supportedQueryFormats).gsub(' ', '').split(',')
          else
            supported_formats = []
          end

          begin
            @metadata = {
              arcgis_version:             data.fetch(:currentVersion),
              name:                       data.fetch(:name),
              description:                data.fetch(:description, ''),
              type:                       data.fetch(:type),
              geometry_type:              data.fetch(:geometryType),
              copyright:                  data.fetch(:copyrightText, ''),
              fields:                     data.fetch(:fields).try(:map) { |field|
                {
                  name: field['name'],
                  type: field['type']
                }
              },
              max_records_per_query:      data.fetch(:maxRecordCount, 500),
              supported_formats:          supported_formats,
              advanced_queries_supported: data.fetch(:supportsAdvancedQueries, false)
            }
          rescue StandardError => exception
            raise ResponseError.new("Missing data: #{exception.to_s} #{exception.backtrace}")
          end

          raise InvalidServiceError.new("Unsupported ArcGIS version #{@metadata[:arcgis_Version]}, must be >= #{MINIMUM_SUPPORTED_VERSION}") \
            if @metadata[:arcgis_version] < MINIMUM_SUPPORTED_VERSION

          {
            id:       id,
            title:    @metadata[:name],
            url:      nil,
            service:  DATASOURCE_NAME,
            checksum: nil,
            size:     NO_CONTENT_SIZE_PROVIDED,
            filename: filename_from(@metadata[:name])
          }
        end

        # Just detects if id is a full map or a specific layer
        # @param id String
        # @return Bool
        def is_multiresource?(id)
          unless id.rindex('?').nil?
            id = id.slice(0, id.rindex('?'))
          end
          (id =~ /\/([0-9]+\/|[0-9]+)$/).nil?
        end

        def get_subresource_id(id)
          id.match(/([0-9]+\/|[0-9]+)$?/)[0]
        end

        # @param id String
        # @param sub_id String|nil
        # @return String
        # @throws InvalidInputDataError
        def sanitize_id(id, sub_id=nil)
          # http://<host>/<site>/rest/services/<folder>/<serviceName>/<serviceType>/
          # <site> is almost always "arcgis" (according to official doc)
          unless id =~ ARCGIS_API_LIKE_URL_RE
            raise InvalidInputDataError.new("Url doesn't looks as from ArcGIS server")
          end

          unless id.rindex('?').nil?
            id = id.slice(0, id.rindex('?'))
          end
          if is_multiresource?(id) && !sub_id.nil?
            id = (id.end_with?('/') ? id : id + '/') + sub_id
          end

          id
        end

        # @return Array [ { :id, :title} ]
        # @throws DataDownloadError
        # @throws ResponseError
        def get_layers_list(url)
          request_url = LAYERS_URL % [url]
          response = http_client.get(request_url, http_options)
          validate_response(request_url, response)

          begin
            data = ::JSON.parse(response.body).fetch('layers')
          rescue StandardError => exception
            raise ResponseError.new("Missing data: #{exception.to_s} #{request_url} #{exception.backtrace}")
          end

          # We only support vector layers (not raster layers)
          data = data.reject { |layer| layer['type'] != VECTOR_LAYER_TYPE }

          raise ResponseError.new("Empty layers list #{request_url}") if data.length == 0

          begin
            data.collect { |item|
              {
                # Leave prepared all child urls
                id: (url.end_with?('/') ? url : url + '/') + item.fetch('id').to_s,
                title: item.fetch('name')
              }
            }
          rescue StandardError => exception
            raise ResponseError.new("Missing data: #{exception.to_s} #{request_url} #{exception.backtrace}")
          end
        end

        # NOTE: Assumes url is valid
        # NOTE: Returned ids are sorted so they can be chunked into blocks to
        #       be requested by range queries: `(OBJECTID >= ... AND OBJECTID <= ... )`
        # @param url String
        # @return Array
        # @throws DataDownloadError
        # @throws ResponseError
        def get_ids_list(url)
          request_url = FEATURE_IDS_URL % [url]
          response = http_client.get(request_url, http_options)
          validate_response(request_url, response)

          begin
            data = ::JSON.parse(response.body).fetch('objectIds').sort
          rescue StandardError => exception
            raise ResponseError.new("Missing data: #{exception.to_s} #{request_url} #{exception.backtrace}")
          end

          data
        end

        # NOTE: Assumes url is valid
        # @param url String
        # @param ids Array
        # @param fields Array
        # @return Array [ Hash ] (non-symbolized keys)
        # @throws InvalidInputDataError
        # @throws DataDownloadError
        # @throws ExternalServiceError
        def get_by_ids(url, ids, fields)
          raise InvalidInputDataError.new("'ids' empty or invalid") if (ids.nil? || ids.length == 0)
          raise InvalidInputDataError.new("'fields' empty or invalid") if (fields.nil? || fields.length == 0)

          oid_field = fields.find { |field| field[:type] == OID_FIELD_TYPE }

          if ids.length == 1
            ids_field = { objectIds: ids.first }
          else
            if oid_field
              # Note that ids is sorted
              ids_field = { where: "#{oid_field[:name]} >=#{ids.first} AND #{oid_field[:name]} <=#{ids.last}" }
            else
              # This could be innefficient with large number of ids, but it is limited to MAX_BLOCK_SIZE
              ids_field = { objectIds: ids.join(',') }
            end
          end

          prepared_fields = fields.map { |field| "#{field[:name]}" }.join(',')

          prepared_url = FEATURE_DATA_POST_URL % [url]
          # @see http://resources.arcgis.com/en/help/arcgis-rest-api/index.html#/Query_Map_Service_Layer/02r3000000p1000000/
          params_data = {
            outFields:  prepared_fields,
            outSR:      4326,
            f:          'json'
          }

          params_data.merge! ids_field

          puts "#{prepared_url} (POST) Params:#{params_data}" if DEBUG
          response = http_client.post(prepared_url, http_options(params_data, :post))

          # Timeout connecting to ArcGIS
          if response.code == 0
            raise ExternalServiceError.new("TIMEOUT: #{prepared_url} : #{response.body} #{self.to_s}")
          end
          if response.code != 200
            raise DataDownloadError.new("ERROR: #{prepared_url} POST " +
                                        "#{params_data} (#{response.code}) : #{response.body} #{self.to_s}")
          end
          if response.code == 400 && !response.return_message.nil? \
              && response.return_message.downcase.include?('operation is not supported')
            raise UnsupportedOperationError.new("#{request_url} (#{response.code}) : #{response.body}") \
          end

          begin
            body = ::JSON.parse(response.body)
          rescue JSON::ParserError
            begin
              # HACK: JSON spec does not cover Infinity
              body = ::JSON.parse(response.body.gsub(':INF,', ':"Infinity",'))
            rescue JSON::ParserError => e
              # We cannot do much about it, log, skip and continue
              append_and_store("get_by_ids: #{e.inspect}")
              append_and_store("get_by_ids: #{prepared_url} (POST) Params: #{params_data}", _truncate=false)
              return []
            end
          end

          # Arcgis error
          raise ExternalServiceError.new("#{prepared_url} : #{response.body}") if body.include?('error')

          begin
            retrieved_items = body.fetch('features')
            return [] if retrieved_items.nil? || retrieved_items.empty?
            retrieved_fields = body.fetch('fields')
            geometry_type = body.fetch('geometryType')
            spatial_reference = body.fetch('spatialReference')
          rescue StandardError => exception
            raise ResponseError.new("Missing data: #{exception.to_s} #{prepared_url} #{exception.backtrace}")
          end
          raise ResponseError.new("'fields' empty or invalid #{prepared_url}") \
            if (retrieved_fields.nil? || retrieved_fields.length == 0)
          raise ResponseError.new("'features' empty or invalid #{prepared_url}") \
            if (retrieved_items.nil? || !retrieved_items.kind_of?(Array))

          # Fields can be optional, cannot be enforced to always be present
          desired_fields = fields.map { |field| field[:name] }

          {
            geometryType:     geometry_type,
            spatialReference: spatial_reference,
            fields:           retrieved_fields,
            features:         retrieved_items.collect { |item|
              {
                'attributes' => item['attributes'].select{ |k, v| desired_fields.include?(k) },
                'geometry' => item['geometry']
              }
            }
          }
        end

        # By default, will update the block size, incrementing or decrementing it according to stream operation results
        # We decrement faster (BLOCK_FACTOR ** BLOCK_FACTOR) then we increment on success (BLOCK_FACTOR) to
        # reduce the load of the remote server faster
        def block_size(update=true)
          if update
            if @current_stream_status && (@block_size < MAX_BLOCK_SIZE)
              @block_size = [@block_size * BLOCK_FACTOR, MAX_BLOCK_SIZE].min
            end
            if !@current_stream_status && @block_size > MIN_BLOCK_SIZE
              @block_size = [[(@block_size / (BLOCK_FACTOR ** BLOCK_FACTOR)).floor, MIN_BLOCK_SIZE].max, MAX_BLOCK_SIZE].min
            end
            @block_size = [@block_size, @metadata[:max_records_per_query]].min
          end
          @block_size
        end

        def http_options(params={}, method=:get)
          {
            method:           method,
            params:           method == :get ? params : {},
            body:             method == :post ? params : {},
            followlocation:   true,
            ssl_verifypeer:   false,
            accept_encoding:  'gzip',
            headers:          { 'Accept-Charset' => 'utf-8' },
            ssl_verifyhost:   0,
            nosignal:         true,
            connecttimeout:   HTTP_CONNECTION_TIMEOUT,
            timeout:          HTTP_TIMEOUT
          }
        end

        def filename_from(feature_name)
          feature_name.gsub(/[^\w]/, '_').downcase + '.json'
        end

        def validate_response(request_url, response)
          raise ExternalServiceTimeoutError.new("TIMEOUT: #{request_url} : #{response.return_message}") \
            if response.timed_out? || (response.code.zero? && !response.return_message.nil? \
              && response.return_message.downcase.include?('timeout'))

          raise UnsupportedOperationError.new("#{request_url} (#{response.code}) : #{response.body}") \
            if response.code == 400 && !response.return_message.nil? \
              && response.return_message.downcase.include?('operation is not supported')

          raise DataDownloadError.new("#{request_url} (#{response.code}) : #{response.body}") \
            if response.code != 200
        end
      end
    end

    class URLTooLargeError < StandardError
    end
  end
end