# Repository: CartoDB/cartodb20 (viewed on GitHub)
# File: services/importer/lib/importer/georeferencer.rb
#
# Code-quality summary: Maintainability D (estimated remediation: 2 days).
# Test Coverage: (no value shown)
require_relative './column'
require_relative './job'
require_relative './query_batcher'
require_relative './content_guesser'

require_relative '../../../../lib/cartodb/stats/importer'
require_relative '../../../dataservices-metrics/lib/geocoder_usage_metrics'

module CartoDB
  module Importer2
    class Georeferencer

      include ::LoggerHelper

      # NOTE(review): not referenced anywhere in this class; presumably a row batch size.
      DEFAULT_BATCH_SIZE = 50_000
      # Column names considered geometry candidates when the caller passes none.
      GEOMETRY_POSSIBLE_NAMES   = %w[geometry the_geom wkb_geometry geom geojson wkt]
      # Schema used when the caller does not provide one.
      DEFAULT_SCHEMA            = 'cdb_importer'
      # Column dropped at the start of #run when it is present.
      THE_GEOM_WEBMERCATOR      = 'the_geom_webmercator'
      # Statement timeout handed to the direct connection in #handle_multipoint
      # (one hour expressed in thousandths, i.e. presumably milliseconds).
      DIRECT_STATEMENT_TIMEOUT  = 1.hour * 1000

      # Builds a georeferencer for a single imported table.
      #
      # @param db               database connection used for every statement
      # @param table_name       name of the table to georeference
      # @param options          options hash; +:tracker+ is read here and the whole
      #                         hash is also handed to the content guesser
      # @param schema           schema that holds the table
      # @param job              job used for logging; when nil a new Job wrapping
      #                         +logger+ is created
      # @param geometry_columns candidate geometry column names; falls back to
      #                         GEOMETRY_POSSIBLE_NAMES
      # @param logger           logger given to the fallback Job
      def initialize(db, table_name, options, schema=DEFAULT_SCHEMA, job=nil, geometry_columns=nil, logger=nil)
        @db = db
        @job = job || Job.new({ logger: logger })
        @table_name = table_name
        @schema = schema
        @geometry_columns = geometry_columns || GEOMETRY_POSSIBLE_NAMES
        @from_geojson_with_transform = false
        @options = options
        # Progress callback; the fallback simply echoes the state it is given.
        @tracker = @options[:tracker] || ->(state) { state }
        @content_guesser = CartoDB::Importer2::ContentGuesser.new(@db, @table_name, @schema, @options, @job)
        @importer_stats = CartoDB::Stats::Importer.instance
      end

      # Replaces the stats collector stored in @importer_stats, which this class
      # calls +timing+ on around the guessing and geocoding steps.
      #
      # @param importer_stats the collector to use from now on
      # @return the collector that was assigned
      def set_importer_stats(importer_stats)
        @importer_stats = importer_stats
      end

      # Raises the @from_geojson_with_transform flag. When it is set,
      # #create_the_geom_from_geometry_column forwards the same mark to the
      # Column it builds.
      #
      # @return [true]
      def mark_as_from_geojson_with_transform
        @from_geojson_with_transform = true
      end

      # Georeferences the table so that it ends up with a +the_geom+ column.
      #
      # Autovacuum is disabled while the column is produced and is re-enabled
      # afterwards, including when one of the steps raises. The strategies are
      # tried in order and the first truthy result wins: an existing geometry
      # column, ip guessing, namedplaces guessing, country guessing and, last,
      # creating an empty +the_geom+ column.
      #
      # @return [Georeferencer] self
      # @raise [GeometryCollectionNotSupportedError] when the geometry type of
      #   the resulting column is GEOMETRYCOLLECTION
      def run
        disable_autovacuum

        begin
          drop_the_geom_webmercator

          the_geom_column_name = create_the_geom_from_geometry_column ||
            create_the_geom_from_ip_guessing ||
            create_the_geom_from_namedplaces_guessing ||
            create_the_geom_from_country_guessing ||
            create_the_geom_in(table_name)
        ensure
          # Never leave the table with autovacuum disabled, even on failure.
          enable_autovacuum
        end

        geometry_type = get_geometry_type(the_geom_column_name || 'the_geom')
        raise GeometryCollectionNotSupportedError if geometry_type == 'GEOMETRYCOLLECTION'

        self
      end

      # Switches autovacuum off for the table and for its TOAST storage,
      # logging the action on the job first.
      def disable_autovacuum
        target = qualified_table_name
        job.log "Disabling autovacuum for #{target}"
        statement = %Q{
         ALTER TABLE #{target} SET (autovacuum_enabled = FALSE, toast.autovacuum_enabled = FALSE);
        }
        db.run(statement)
      end

      # Switches autovacuum back on for the table and for its TOAST storage,
      # logging the action on the job first.
      def enable_autovacuum
        target = qualified_table_name
        job.log "Enabling autovacuum for #{target}"
        statement = %Q{
         ALTER TABLE #{target} SET (autovacuum_enabled = TRUE, toast.autovacuum_enabled = TRUE);
        }
        db.run(statement)
      end

      # Turns an existing geometry-like column into +the_geom+.
      #
      # The first column matching one of the candidate names is converted with
      # Column#geometrify. A pre-existing +the_geom+ whose geometry type cannot
      # be read, or is GEOMETRYCOLLECTION, is renamed to +invalid_the_geom+
      # (unless that name is already taken). The converted column is then
      # renamed to +the_geom+ when that name is free.
      #
      # @return [String, false] the name of the geometry column, or false when
      #   no candidate column exists or the conversion failed
      def create_the_geom_from_geometry_column
        column = nil
        geometry_column_name = geometry_column_in
        return false unless geometry_column_name

        job.log "Creating the_geom from #{geometry_column_name} column"
        column = Column.new(db, table_name, geometry_column_name, user, schema, job)
        column.mark_as_from_geojson_with_transform if @from_geojson_with_transform
        column.empty_lines_to_nulls
        column.geometrify

        column_name = geometry_column_name
        if column_exists_in?(table_name, 'the_geom')
          # An unreadable geometry type is treated the same as an invalid one.
          geometry_type = begin
            get_geometry_type('the_geom')
          rescue StandardError
            nil
          end
          if geometry_type.nil? || geometry_type == 'GEOMETRYCOLLECTION'
            invalid_the_geom = get_column('the_geom')
            unless column_exists_in?(table_name, 'invalid_the_geom')
              invalid_the_geom.rename_to('invalid_the_geom')
            end
          end
        end

        unless column_exists_in?(table_name, 'the_geom')
          column_name = 'the_geom'
          column.rename_to(column_name)
        end

        handle_multipoint(qualified_table_name) if multipoint?
        column_name
      rescue StandardError => exception
        job.log "Error creating the_geom: #{exception}. Trace: #{exception.backtrace}"
        # `column` is still nil when the failure happened before it was built
        # (e.g. while looking the geometry column up): nothing to clean up then.
        # After a statement timeout the column is left untouched.
        if column && /statement timeout/.match(exception.message).nil?
          if column.empty?
            job.log "Dropping empty #{geometry_column_name}"
            column.drop
          else
            # probably this one needs to be kept doing... but how if times out?
            job.log "Renaming #{geometry_column_name} to invalid_the_geom"
            column.rename_to(:invalid_the_geom)
          end
        end
        false
      end

      # Tries to find a country column through the content guesser and, when
      # one is found, geocodes the table by country.
      #
      # Ordinary errors are logged as warnings and reported as +false+ so the
      # next strategy can run. Non-StandardError exceptions (signals, exit
      # requests, ...) are deliberately not swallowed.
      #
      # @return the result of #geocode_countries, or false when guessing is
      #   disabled, the sample is empty, no country column is found or an
      #   error occurred
      def create_the_geom_from_country_guessing
        return false unless @content_guesser.enabled?
        return false if @content_guesser.sample.count == 0

        job.log 'Trying country guessing...'
        begin
          country_column_name = nil
          @importer_stats.timing('guessing') do
            @tracker.call('guessing')
            country_column_name = @content_guesser.country_column
            @tracker.call('importing')
          end
          if country_column_name
            job.log "Found country column: #{country_column_name}"
            create_the_geom_in table_name
            return geocode_countries(country_column_name)
          end
        rescue StandardError => ex
          message = 'create_the_geom_from_country_guessing failed'
          log_warning(message: message, exception: ex)
          job.log "WARNING: #{message}: #{ex.inspect}"
        end
        false
      end

      # Runs the namedplaces guesser and, when it finds a matching column,
      # geocodes the table by named place.
      #
      # Ordinary errors are logged as warnings and reported as +false+ so the
      # next strategy can run. Non-StandardError exceptions (signals, exit
      # requests, ...) are deliberately not swallowed.
      #
      # @return the result of #geocode_namedplaces, or false when guessing is
      #   disabled, the sample is empty, nothing is found or an error occurred
      def create_the_geom_from_namedplaces_guessing
        return false unless @content_guesser.enabled?
        return false if @content_guesser.sample.count == 0

        job.log 'Trying namedplaces guessing...'
        begin
          @importer_stats.timing('guessing') do
            @tracker.call('guessing')
            @content_guesser.namedplaces.run!
            @tracker.call('importing')
          end
          if @content_guesser.namedplaces.found?
            job.log "Found namedplace column: #{@content_guesser.namedplaces.column}"
            create_the_geom_in table_name
            return geocode_namedplaces
          end
        rescue StandardError => ex
          message = 'create_the_geom_from_namedplaces_guessing failed'
          log_warning(exception: ex, message: message)
          job.log "WARNING: #{message}: #{ex.inspect}"
        end
        false
      end

      # Tries to find an ip-address column through the content guesser and,
      # when one is found, geocodes the table by ip.
      #
      # Ordinary errors are logged as warnings and reported as +false+ so the
      # next strategy can run. Non-StandardError exceptions (signals, exit
      # requests, ...) are deliberately not swallowed.
      #
      # @return the result of #geocode_ips, or false when guessing is disabled,
      #   the sample is empty, no ip column is found or an error occurred
      def create_the_geom_from_ip_guessing
        return false unless @content_guesser.enabled?
        return false if @content_guesser.sample.count == 0

        job.log 'Trying ip guessing...'
        begin
          ip_column_name = nil
          @importer_stats.timing('guessing') do
            @tracker.call('guessing')
            ip_column_name = @content_guesser.ip_column
            @tracker.call('importing')
          end
          if ip_column_name
            job.log "Found ip column: #{ip_column_name}"
            return geocode_ips(ip_column_name)
          end
        rescue StandardError => ex
          message = "create_the_geom_from_ip_guessing failed: #{ex.message}"
          log_warning(exception: ex, message: message)
          job.log "WARNING: #{message}: #{ex.inspect}"
        end
        # Explicit false (not nil) when no ip column was found, matching the
        # other guessing strategies.
        false
      end

      # Geocodes the table using +country_column_name+ as the formatter, asking
      # for 'polygon' geometries of kind 'admin0'.
      #
      # @return whatever #geocode returns
      def geocode_countries(country_column_name)
        job.log('Geocoding countries...')
        geocode(country_column_name, 'polygon', 'admin0')
      end

      # Geocodes the table with the column found by the namedplaces guesser,
      # asking for 'point' geometries of kind 'namedplace' and passing along
      # the guesser's country column name and country.
      #
      # @return whatever #geocode returns
      def geocode_namedplaces
        job.log('Geocoding namedplaces...')
        guessed = @content_guesser.namedplaces
        formatter = guessed.column[:column_name]
        geocode(formatter, 'point', 'namedplace', guessed.country_column_name, guessed.country)
      end

      # Geocodes the table using +ip_column_name+ as the formatter, asking for
      # 'point' geometries of kind 'ipaddress'.
      #
      # @return whatever #geocode returns
      def geocode_ips(ip_column_name)
        job.log('Geocoding ips...')
        geocode(ip_column_name, 'point', 'ipaddress')
      end

      # Geocodes the table with the internal geocoder, timing the whole step
      # under "geocoding.<kind>".
      #
      # A Geocoding record is built and asked to run the geocoding; if that
      # raises (or ends in the 'failed' state) the error is logged and the
      # geocoder is run directly as a fallback.
      #
      # @param formatter           value passed as +:formatter+ (callers in this
      #                            class pass a column name)
      # @param geometry_type       value passed as +:geometry_type+ ('point' / 'polygon' here)
      # @param kind                value passed as +:kind+; also part of the timing key
      # @param country_column_name value passed as +:country_column+ (optional)
      # @param country             when present it is wrapped in single quotes
      #                            and passed as +:countries+
      # @return [String] always the literal 'the_geom'
      def geocode(formatter, geometry_type, kind, country_column_name=nil, country=nil)
        geocoder = nil
        @importer_stats.timing("geocoding.#{kind}") do
          @tracker.call('geocoding')
          # Returns early (false) when the_geom already exists.
          create_the_geom_in(table_name)
          orgname = user.organization.nil? ? nil : user.organization.name
          usage_metrics = CartoDB::GeocoderUsageMetrics.new(user.username, orgname)
          config = @options[:geocoder].merge(
            table_schema: schema,
            table_name: table_name,
            qualified_table_name: qualified_table_name,
            connection: db,
            formatter: formatter,
            geometry_type: geometry_type,
            kind: kind,
            max_rows: nil,
            country_column: country_column_name,
            countries: country.present? ? "'#{country}'" : nil,
            usage_metrics: usage_metrics
          )
          # NOTE(review): :country_code is not among the keys merged above, so it
          # is only present if @options[:geocoder] provides it -- confirm intent.
          geocoding = Geocoding.new config.slice(:kind, :geometry_type, :formatter, :table_name, :country_column, :country_code)
          geocoding.user = user
          geocoding.data_import_id = data_import.id unless data_import.nil?
          geocoding.raise_on_save_failure = true
          # merge! also adds :geocoding_model to `config` itself.
          geocoder = CartoDB::InternalGeocoder::Geocoder.new(config.merge!(geocoding_model: geocoding))
          geocoder.set_log(geocoding.log)
          geocoding.force_geocoder(geocoder)
          begin
            geocoding.run_geocoding!(row_count)
            raise "Geocoding failed" if geocoding.state == 'failed'
          rescue StandardError => e
            # Only the plain descriptive settings are logged, not the connection
            # or model objects held in `config`.
            config_info = config.select {|key, value| [:table_schema, :table_name, :qualified_table_name, :formatter, :geometry_type, :kind, :max_rows, :country_column, ].include?(key) }
            log_error(
              message: 'Georeferencer could not register geocoding, fallback to geocoder.run',
              exception: e, config: config_info
            )
            geocoder.run
          end

          job.log "Geocoding finished"
        end
        # NOTE(review): the result of this comparison is discarded; the method
        # returns 'the_geom' regardless of the geocoder state.
        geocoder.state == 'completed'
        'the_geom'
      end

      # Number of rows in the table, as reported by a +count(1)+ query.
      # The value is memoized in @row_count once it is truthy.
      def row_count
        return @row_count if @row_count

        count_query = %Q{select count(1) from #{qualified_table_name}}
        @row_count = db[count_query].first[:count]
      end

      # First DataImport whose +logger+ matches the id of this job's logger.
      # Memoized in @data_import once a record has been found.
      def data_import
        return @data_import if @data_import

        logger_id = @job.logger.id
        @data_import = DataImport.where(logger: logger_id).first
      end

      # First ::User whose id equals #user_id. Memoized in @user once a record
      # has been found.
      def user
        return @user if @user

        @user = ::User.where(id: user_id).first
      end

      # Id of the user, as reported by the logger attached to the job.
      def user_id
        job_logger = @job.logger
        job_logger.user_id
      end

      # Adds a 2-dimensional +the_geom+ geometry column (SRID 4326) to
      # +table_name+ through AddGeometryColumn, unless the column is already
      # there. The log line is written in both cases.
      #
      # @return [String, false] 'the_geom' when the column was added, false
      #   when it already existed
      def create_the_geom_in(table_name)
        job.log 'Creating the_geom column'
        already_present = column_exists_in?(table_name, 'the_geom')
        return false if already_present

        add_column_statement = %Q{
          SELECT public.AddGeometryColumn(
            '#{schema}','#{table_name}','the_geom',4326,'geometry',2
          );
        }
        db.run(add_column_statement)
        'the_geom'
      end

      # Whether +table_name+ has a column called +column_name+ (compared as a
      # symbol against the names returned by #columns_in).
      def column_exists_in?(table_name, column_name)
        existing_columns = columns_in(table_name)
        existing_columns.include?(column_name.to_sym)
      end

      # Column names of +table_name+ in the configured schema. The schema
      # information is requested with +reload: true+ on every call.
      def columns_in(table_name)
        table_definition = db.schema(table_name, reload: true, schema: schema)
        table_definition.map { |column_entry| column_entry.first }
      end

      # Looks for the first column of the table whose name is one of the
      # candidate geometry column names.
      #
      # @return whatever #find_column_in returns for the quoted, comma-joined
      #   candidate names
      def geometry_column_in
        quoted_candidates = geometry_columns.map { |candidate| %('#{candidate}') }
        find_column_in(table_name, quoted_candidates.join(','))
      end

      # Drops the +the_geom_webmercator+ column when the table has one.
      #
      # @return self when there is nothing to drop, otherwise the result of
      #   Column#drop
      def drop_the_geom_webmercator
        if column_exists_in?(table_name, THE_GEOM_WEBMERCATOR)
          job.log 'Dropping the_geom_webmercator column'
          Column.new(db, table_name, THE_GEOM_WEBMERCATOR, user, schema, job).drop
        else
          self
        end
      end

      # Builds a Column wrapper for +column+ of this table, using this
      # georeferencer's connection, user, schema and job.
      #
      # @param column column name; defaults to :the_geom
      # @return [Column]
      def get_column(column = :the_geom)
        Column.new(db, table_name, column, user, schema, job)
      end

      # Geometry type of +column+, as reported by Column#geometry_type.
      #
      # @param column column name; defaults to :the_geom
      def get_geometry_type(column = :the_geom)
        geometry_column = get_column(column)
        geometry_column.geometry_type
      end

      # Queries information_schema.columns for the first column of
      # +table_name+ (in the configured schema) whose lower-cased name is in
      # +possible_names+.
      #
      # @param table_name     table to inspect
      # @param possible_names SQL list of already-quoted names, e.g. "'geom','wkt'"
      # @return the column name of the first match, or false when nothing matches
      def find_column_in(table_name, possible_names)
        lookup_query = %Q{
          SELECT  column_name
          FROM    information_schema.columns
          WHERE   table_name = '#{table_name}'
          AND     table_schema = '#{schema}'
          AND     lower(column_name) in (#{possible_names})
          LIMIT   1
        }
        matching_row = db[lookup_query].first
        return false unless matching_row

        matching_row.fetch(:column_name, false)
      end

      # Replaces every +the_geom+ value of the table by its first component
      # geometry (ST_GeometryN(the_geom, 1)). The update runs on a direct
      # connection obtained from the user's db service, with
      # DIRECT_STATEMENT_TIMEOUT as statement timeout.
      #
      # @param qualified_table_name schema-qualified, quoted table name
      def handle_multipoint(qualified_table_name)
        # TODO: capture_exceptions=true
        job.log 'Converting detected multipoint to point'

        update_statement = %Q{
                                    UPDATE #{qualified_table_name}
                                    SET the_geom = ST_GeometryN(the_geom, 1)
                                    }
        db_service = user.db_service
        db_service.in_database_direct_connection(statement_timeout: DIRECT_STATEMENT_TIMEOUT) do |direct_connection|
          direct_connection.run(update_statement)
        end
      end

      # Whether the first row returned for the table holds a MULTIPOINT in
      # +the_geom+. Only that first row is inspected.
      #
      # Any error raised while querying (and an empty table) yields false.
      #
      # @return [Boolean]
      def multipoint?
        # The alias belongs to the selected expression so that the :geometrytype
        # key below is guaranteed; placed after FROM it would alias the table.
        first_row = db[%Q{
          SELECT public.GeometryType(the_geom) AS geometrytype
          FROM #{qualified_table_name}
        }].first
        return false if first_row.nil?

        is_multipoint = first_row.fetch(:geometrytype) == 'MULTIPOINT'

        job.log 'found MULTIPOINT geometry' if is_multipoint

        is_multipoint
      rescue StandardError
        false
      end

      private

      attr_reader :db, :table_name, :schema, :job, :geometry_columns

      # Table name prefixed by its schema, each part wrapped in double quotes:
      # "schema"."table".
      def qualified_table_name
        format('"%s"."%s"', schema, table_name)
      end

      # Logging context inherited from the parent implementation, extended
      # with the current user under +:current_user+.
      def log_context
        inherited_context = super
        inherited_context.merge(current_user: user)
      end
    end
  end
end