CartoDB/cartodb20

View on GitHub
services/importer/lib/importer/column.rb

Summary

Maintainability
C
1 day
Test Coverage
require 'active_support/time'

require_relative './job'
require_relative './string_sanitizer'
require_relative './exceptions'
require_relative './query_batcher'

require_relative '../../../../lib/carto/db/sanitize'

module CartoDB
  module Importer2
    class Column
      DEFAULT_SRID    = 4326
      WKB_RE          = /^\d{2}/
      GEOJSON_RE      = /{.*(type|coordinates).*(type|coordinates).*}/m
      WKT_RE          = /POINT|LINESTRING|POLYGON/
      KML_MULTI_RE    = /<Line|<Polygon/
      KML_POINT_RE    = /<Point>/
      DEFAULT_SCHEMA  = 'cdb_importer'
      DIRECT_STATEMENT_TIMEOUT = 1.hour * 1000
      RESERVED_COLUMN_NAMES = Carto::DB::Sanitize::RESERVED_COLUMN_NAMES
      PG_IDENTIFIER_MAX_LENGTH = Carto::DB::Sanitize::MAX_IDENTIFIER_LENGTH

      REJECTED_COLUMN_NAMES = Carto::DB::Sanitize::REJECTED_COLUMN_NAMES

      NO_COLUMN_SANITIZATION_VERSION = 0
      INITIAL_COLUMN_SANITIZATION_VERSION = 1
      CURRENT_COLUMN_SANITIZATION_VERSION = 2

      def initialize(db, table_name, column_name, user, schema = DEFAULT_SCHEMA, job = nil, logger = nil, capture_exceptions = true)
        @job          = job || Job.new({logger: logger})
        @db           = db
        @table_name   = table_name
        @column_name  = column_name.to_sym
        @schema       = schema
        @capture_exceptions = capture_exceptions
        @user = user

        @from_geojson_with_transform = false
      end

      def mark_as_from_geojson_with_transform
        @from_geojson_with_transform = true
      end

      def type
        db.schema(table_name, reload: true, schema: schema)
          .select { |column_details|
            column_details.first == column_name
          }.last.last.fetch(:db_type)
      end

      def geometrify
        job.log 'geometrifying'
        raise "empty column #{column_name}" if empty?
        convert_from_wkt                    if wkt?
        convert_from_kml_multi              if kml_multi?
        convert_from_kml_point              if kml_point?
        convert_from_geojson_with_transform if geojson? && @from_geojson_with_transform
        convert_from_geojson                if geojson?

        cast_to('geometry')
        convert_to_2d
        job.log 'geometrified'
        self
      end

      def convert_from_wkt
        #TODO: @capture_exceptions
        job.log 'Converting geometry from WKT to WKB'
        @user.db_service.in_database_direct_connection(statement_timeout: DIRECT_STATEMENT_TIMEOUT) do |user_direct_conn|
          user_direct_conn.run(%Q{
                                 UPDATE #{qualified_table_name}
                                 SET "#{column_name}" = ST_GeomFromText("#{column_name}", #{DEFAULT_SRID})
                                 })
        end
        self
      end

      def convert_from_geojson_with_transform
        # 1) cast to proper null the geom column
        db.run(%Q{
          UPDATE #{qualified_table_name}
          SET "#{column_name}" = NULL
          WHERE "#{column_name}" = ''
        })

        # 2) Normal geojson behavior
        #TODO: @capture_exceptions
        job.log 'Converting geometry from GeoJSON with transform to WKB'
        @user.db_service.in_database_direct_connection(statement_timeout: DIRECT_STATEMENT_TIMEOUT) do |user_direct_conn|
          user_direct_conn.run(%Q{
                                 UPDATE #{qualified_table_name}
                                 SET "#{column_name}" = public.ST_SetSRID(public.ST_GeomFromGeoJSON("#{column_name}"), #{DEFAULT_SRID})
                                 })
        end

        self
      end

      def convert_from_geojson
        #TODO: @capture_exceptions
        job.log 'Converting geometry from GeoJSON to WKB'
        @user.db_service.in_database_direct_connection(statement_timeout: DIRECT_STATEMENT_TIMEOUT) do |user_direct_conn|
          user_direct_conn.run(%Q{
                                 UPDATE #{qualified_table_name}
                                 SET "#{column_name}" = public.ST_SetSRID(public.ST_GeomFromGeoJSON("#{column_name}"), #{DEFAULT_SRID})
                                 })
        end

        self
      end

      def convert_from_kml_point
        #TODO: @capture_exceptions
        job.log 'Converting geometry from KML point to WKB'
        @user.db_service.in_database_direct_connection(statement_timeout: DIRECT_STATEMENT_TIMEOUT) do |user_direct_conn|
          user_direct_conn.run(%Q{
                                 UPDATE #{qualified_table_name}
                                 SET "#{column_name}" = public.ST_SetSRID(public.ST_GeomFromKML("#{column_name}"),#{DEFAULT_SRID})
                                 })
        end
      end

      def convert_from_kml_multi
        #TODO: @capture_exceptions
        job.log 'Converting geometry from KML multi to WKB'
        @user.db_service.in_database_direct_connection(statement_timeout: DIRECT_STATEMENT_TIMEOUT) do |user_direct_conn|
          user_direct_conn.run(%Q{
                                 UPDATE #{qualified_table_name}
                                 SET "#{column_name}" = public.ST_SetSRID(public.ST_Multi(public.ST_GeomFromKML("#{column_name}")),#{DEFAULT_SRID})
                                 })
        end
      end

      def convert_to_2d
        #TODO: @capture_exceptions
        job.log 'Converting to 2D point'

        @user.db_service.in_database_direct_connection(statement_timeout: DIRECT_STATEMENT_TIMEOUT) do |user_direct_conn|
          user_direct_conn.run(%Q{
                                 UPDATE #{qualified_table_name}
                                 SET "#{column_name}" = public.ST_Force2D("#{column_name}")
                                 })
        end

      end

      def wkb?
        !!(sample.to_s =~ WKB_RE)
      end

      def wkt?
        !!(sample.to_s =~ WKT_RE)
      end

      def geojson?
        !!(sample.to_s =~ GEOJSON_RE)
      end

      def kml_point?
        !!(sample.to_s =~ KML_POINT_RE)
      end

      def kml_multi?
        !!(sample.to_s =~ KML_MULTI_RE)
      end

      def cast_to(type)
        job.log "casting #{column_name} to #{type}"

        @user.db_service.in_database_direct_connection(statement_timeout: DIRECT_STATEMENT_TIMEOUT) do |user_direct_conn|
          user_direct_conn.run(%Q{
                                    ALTER TABLE #{qualified_table_name}
                                    ALTER "#{column_name}"
                                    TYPE #{type}
                                    USING "#{column_name}"::#{type}
                                  })
        end
        self
      end

      def sample
        return nil if empty?
        records_with_data.first.fetch(column_name)
      end

      def empty?
        records_with_data.empty?
      end

      def records_with_data
        @records_with_data ||= db[%Q{
          SELECT "#{column_name}" FROM "#{schema}"."#{table_name}"
          WHERE "#{column_name}" IS NOT NULL
          AND "#{column_name}"::text != ''
          LIMIT 1
        }]
      end

      def rename_to(new_name)
        return self if new_name.to_s == column_name.to_s

        job.log "Renaming column #{column_name} TO #{new_name}"

        db.run(%Q{
          ALTER TABLE "#{schema}"."#{table_name}"
          RENAME COLUMN "#{column_name}" TO "#{new_name}"
        })
        @column_name = new_name
      end

      def geometry_type
        sample = db[%Q{
          SELECT public.GeometryType(ST_Force2D("#{column_name}"::geometry))
          AS type
          FROM #{schema}.#{table_name}
          WHERE "#{column_name}" IS NOT NULL
          LIMIT 1
        }].first
        sample && sample.fetch(:type)
      end

      def drop
        db.run(%Q{
          ALTER TABLE #{qualified_table_name}
          DROP COLUMN IF EXISTS "#{column_name}"
        })
      end

      # Replace empty strings by nulls to avoid cast errors
      def empty_lines_to_nulls
        # first timeout crash
        job.log 'replace empty strings by nulls?'
        column_id = column_name.to_sym
        column_type = nil
        db.schema(table_name).each do |colid, coldef|
          if colid == column_id
            column_type = coldef[:type]
          end
        end
        if column_type != nil && column_type == :string
          #TODO: @capture_exceptions
          job.log 'string column found, replacing'
          @user.db_service.in_database_direct_connection(statement_timeout: DIRECT_STATEMENT_TIMEOUT) do |user_direct_conn|
            user_direct_conn.run(%Q{
                   UPDATE #{qualified_table_name}
                   SET "#{column_name}"=NULL
                   WHERE "#{column_name}"=''
                 })
          end
        else
          job.log 'no string column found, nothing replaced'
        end
      end

      def sanitize
        rename_to(sanitized_name)
      end

      def sanitized_name
        self.class.sanitize_name column_name
      end

      def self.reserved?(name)
        RESERVED_COLUMN_NAMES.include?(name.downcase)
      end

      def self.unsupported?(name)
        name !~ /^[a-zA-Z_]/
      end

      def self.reserved_or_unsupported?(name)
        reserved?(name) || unsupported?(name)
      end

      def self.sanitize_name(column_name)
        name = StringSanitizer.sanitize(column_name.to_s, transliterate_cyrillic: true)
        return name unless reserved_or_unsupported?(name)
        "_#{name}"
      end

      def self.rejected?(name)
        REJECTED_COLUMN_NAMES.include?(name) || name =~ /^[0-9]/
      end

      def self.get_valid_column_name(candidate_column_name, column_sanitization_version, column_names)
        return candidate_column_name if column_sanitization_version == NO_COLUMN_SANITIZATION_VERSION

        existing_names = column_names - [candidate_column_name]

        if column_sanitization_version == INITIAL_COLUMN_SANITIZATION_VERSION
          get_valid_column_name_v1(candidate_column_name, existing_names)
        elsif column_sanitization_version == 2
          get_valid_column_name_v2(candidate_column_name, existing_names)
        elsif column_sanitization_version == 3
          get_valid_column_name_v3(candidate_column_name, existing_names)
        else
          raise "Invalid column sanitization version #{column_sanitization_version.inspect}"
        end
      end

      private

      def self.get_valid_column_name_v1(candidate_column_name, existing_names)
        # NOTE: originally not all uses of this sanitization version applied reserved words
        # reserved_words = RESERVED_COLUMN_NAMES
        reserved_words = []

        candidate_column_name = 'untitled_column' if candidate_column_name.blank?
        candidate_column_name = candidate_column_name.to_s.squish

        # Subsequent characters can be letters, underscores or digits
        candidate_column_name = candidate_column_name.gsub(/[^a-z0-9]/,'_').gsub(/_{2,}/, '_')

        # Valid names start with a letter or an underscore
        candidate_column_name = "column_#{candidate_column_name}" unless candidate_column_name[/^[a-z_]{1}/]

        avoid_collisions(candidate_column_name, existing_names, reserved_words)
      end

      def self.get_valid_column_name_v2(candidate_column_name, existing_names)
        new_column_name = sanitize_name(candidate_column_name).gsub(/_{2,}/, '_')
        new_column_name = [0, PG_IDENTIFIER_MAX_LENGTH] if new_column_name.size > PG_IDENTIFIER_MAX_LENGTH
        avoid_collisions(new_column_name, existing_names, RESERVED_COLUMN_NAMES)
      end

      def self.get_valid_column_name_v3(candidate_column_name, existing_names)
          # experimental sanitization
          # this can be configured using the locale files for the current (I18n.locale) locale;
          # for example, for I18n.locale == :en we could add this to config/locales/en.yml
          #   en:
          #     i18n:
          #      transliterate:
          #        rule:
          #          Ж: Zh
          #          ж: zh
          new_column_name = candidate_column_name.parameterize.tr('-','_')
          new_column_name = [0, PG_IDENTIFIER_MAX_LENGTH] if new_column_name.size > PG_IDENTIFIER_MAX_LENGTH
          avoid_collisions(new_column_name, existing_names, RESERVED_COLUMN_NAMES)
      end

      def self.avoid_collisions(name, existing_names, reserved_words, max_length=PG_IDENTIFIER_MAX_LENGTH)
        count = 1
        new_name = name
        while existing_names.include?(new_name) || reserved_words.include?(new_name.downcase)
          suffix = "_#{count}"
          new_name = name[0..max_length-suffix.length] + suffix
          count += 1
        end
        new_name
      end

      attr_reader :job, :db, :table_name, :column_name, :schema

      def qualified_table_name
        %Q("#{schema}"."#{table_name}")
      end
    end
  end
end