# CartoDB/cartodb20 — services/table-geocoder/lib/geocoder_cache.rb
require 'fileutils'

require_relative 'exceptions'
require_relative '../../../lib/carto/http/client'

module CartoDB
  # Caches geocoding results in a shared remote table (reached through the
  # CartoDB SQL API) so that addresses already resolved by previous jobs can
  # be reused without hitting the external geocoder service again.
  #
  # Flow: `run` pulls cached coordinates for the rows still pending
  # (`get_cache_results`), loads them into a local temp table and copies the
  # hits back into the user table; after a geocoding job, `store` pushes the
  # freshly geocoded rows up to the shared cache table.
  class GeocoderCache

    DEFAULT_BATCH_SIZE = 5000
    DEFAULT_MAX_ROWS   = 1_000_000
    HTTP_CONNECT_TIMEOUT = 60
    HTTP_DEFAULT_TIMEOUT = 600

    attr_reader :connection, :working_dir, :table_name, :hits, :misses,
                :max_rows, :sql_api, :formatter, :cache_results

    # @param arguments [Hash] required keys: :sql_api (Hash with :table_name,
    #   :base_url, :api_key), :connection (Sequel DB connection),
    #   :table_name, :qualified_table_name, :formatter (SQL expression
    #   producing the search text), :usage_metrics, :log.
    #   Optional: :working_dir, :max_rows, :batch_size.
    def initialize(arguments)
      @sql_api = arguments.fetch(:sql_api)
      @connection = arguments.fetch(:connection)
      @table_name = arguments.fetch(:table_name)
      @qualified_table_name = arguments.fetch(:qualified_table_name)
      @working_dir = arguments[:working_dir] || Dir.mktmpdir
      # FileUtils instead of shelling out with backticks: the old
      # `chmod 777 #{dir}` broke (and was shell-injectable) for paths
      # containing spaces or shell metacharacters.
      FileUtils.chmod(0777, @working_dir)
      @formatter = arguments.fetch(:formatter)
      @max_rows = arguments[:max_rows] || DEFAULT_MAX_ROWS
      @batch_size = arguments[:batch_size] || DEFAULT_BATCH_SIZE
      # Local CSV file where cached coordinates are accumulated batch by batch.
      @cache_results = File.join(working_dir, "#{temp_table_name}_results.csv")
      @usage_metrics = arguments.fetch(:usage_metrics)
      @log = arguments.fetch(:log)
      init_rows_count
    end

    # Fetches cached results and applies them to the user table.
    # Always reports usage metrics, even when the cache lookup fails
    # (a failure only disables caching; it must not abort the geocoding job).
    def run
      @log.append_and_store "Started searching previous geocoded results in geocoder cache"
      get_cache_results
      create_temp_table
      load_results_to_temp_table
      @hits = connection.select.from(temp_table_name).where('longitude is not null and latitude is not null').count.to_i
      copy_results_to_table
      @log.append_and_store "Finished geocoder cache job"
    rescue StandardError => e
      @log.append_and_store "Error getting results from geocoder cache: #{e.inspect}"
      handle_cache_exception e
    ensure
      @usage_metrics.incr(:geocoder_cache, :total_requests, @total_rows)
      @usage_metrics.incr(:geocoder_cache, :success_responses, @hits)
      @usage_metrics.incr(:geocoder_cache, :empty_responses, (@total_rows - @hits - @failed_rows))
      @usage_metrics.incr(:geocoder_cache, :failed_responses, @failed_rows)
      update_log_stats
    end

    # Downloads, in batches of @batch_size (capped at @max_rows overall), the
    # cached coordinates for every not-yet-georeferenced row and appends them
    # as CSV to the local cache_results file.
    def get_cache_results
      count = 0
      loop do
        limit = [@batch_size, @max_rows - (count * @batch_size)].min
        rows = connection.fetch(%Q{
            SELECT DISTINCT(md5(#{formatter})) AS searchtext
            FROM #{@qualified_table_name}
            WHERE cartodb_georef_status IS NULL
            LIMIT #{limit} OFFSET #{count * @batch_size}
        }).all
        @total_rows += rows.size
        # Guard against empty batches: without it the generated SQL would be
        # malformed ("VALUES ) SELECT ...") and the error response could end
        # up appended to the CSV cache file.
        unless rows.empty?
          sql   = "WITH addresses(address) AS (VALUES "
          sql << rows.map { |r| "('#{r[:searchtext]}')" }.join(',')
          sql << ") SELECT DISTINCT ON(geocode_string) st_x(g.the_geom) longitude, st_y(g.the_geom) latitude,g.geocode_string FROM addresses a INNER JOIN #{sql_api[:table_name]} g ON md5(g.geocode_string)=a.address"
          # Drop the CSV header line and any blank lines before appending.
          response = run_query(sql, 'csv').gsub(/\A.*/, '').gsub(/^$\n/, '')
          File.open(cache_results, 'a') { |f| f.write(response.force_encoding("UTF-8")) } unless response == "\n"
        end
        break unless rows.size >= @batch_size && (count * @batch_size) + rows.size < @max_rows
        count += 1
      end
    end

    # Uploads the freshly geocoded rows of the user table to the shared cache
    # table, batch by batch, via an upsert (update `updated_at` on existing
    # entries, insert the missing ones).
    def store
      count = 0
      loop do
        sql   = %Q{
           WITH
            -- write the new values
           n(searchtext, the_geom) AS (
              VALUES %%VALUES%%
           ),
            -- update existing rows
           upsert AS (
              UPDATE #{sql_api[:table_name]} o
              SET updated_at = NOW()
              FROM n WHERE o.geocode_string = n.searchtext
              RETURNING o.geocode_string
           )
           -- insert missing rows
           INSERT INTO #{sql_api[:table_name]} (geocode_string,the_geom)
           SELECT n.searchtext, n.the_geom FROM n
           WHERE n.searchtext NOT IN (
            SELECT geocode_string FROM upsert
           );
        }
        rows = connection.fetch(%Q{
          SELECT DISTINCT(quote_nullable(#{formatter})) AS searchtext, the_geom
          FROM #{@qualified_table_name} AS orig
          WHERE orig.cartodb_georef_status IS TRUE AND the_geom IS NOT NULL
          LIMIT #{@batch_size} OFFSET #{count * @batch_size}
        }).all
        unless rows.empty?
          sql.gsub! '%%VALUES%%', rows.map { |r| "(#{r[:searchtext]}, '#{r[:the_geom]}')" }.join(',')
          run_query(sql)
        end
        break unless rows.size >= @batch_size
        count += 1
      end
    rescue StandardError => e
      handle_cache_exception e
    ensure
      drop_temp_table
    end

    # Local staging table for the downloaded cache CSV. Coordinates are kept
    # as text and cast later when building the point geometry.
    def create_temp_table
      connection.run(%Q{
        CREATE TABLE #{temp_table_name} (
          longitude text, latitude text, geocode_string text
        );
      })
    end

    # Bulk-loads the accumulated CSV into the staging table.
    def load_results_to_temp_table
      connection.copy_into(Sequel.lit(temp_table_name), data: File.read(cache_results), format: :csv)
    end

    # Writes the cached coordinates back into the user table, marking the
    # matched rows as georeferenced.
    def copy_results_to_table
      connection.run(%Q{
        UPDATE #{@qualified_table_name} AS dest
        SET the_geom = ST_GeomFromText(
              'POINT(' || orig.longitude || ' ' || orig.latitude || ')', 4326
            ),
            cartodb_georef_status = TRUE
        FROM #{temp_table_name} AS orig
        WHERE #{formatter} = orig.geocode_string
      })
    end

    def drop_temp_table
      connection.run("DROP TABLE IF EXISTS #{temp_table_name}")
    end

    # Timestamp-based name, memoized so every step of a job uses the same table.
    def temp_table_name
      @temp_table_name ||= "geocoding_cache_#{Time.now.to_i}"
    end

    # Executes a query against the remote SQL API and returns the raw
    # response body (CSV when format == 'csv', JSON otherwise).
    def run_query(query, format = '')
      params = { q: query, api_key: sql_api[:api_key], format: format }
      http_client = Carto::Http::Client.get('geocoder_cache',
                                            log_requests: true)
      response = http_client.post(sql_api[:base_url],
                                  body: URI.encode_www_form(params),
                                  connecttimeout: HTTP_CONNECT_TIMEOUT,
                                  timeout: HTTP_DEFAULT_TIMEOUT)
      response.body
    end

    # It handles in such a way that the caching is silently stopped:
    # the error is reported, counters are updated, but nothing is raised.
    def handle_cache_exception(exception)
      drop_temp_table
      # is_a? (not class ==) so Sequel::DatabaseError subclasses match too.
      if exception.is_a?(Sequel::DatabaseError) && exception.message =~ /canceling statement due to statement timeout/
        # for the moment we just wrap the exception to get a specific error in rollbar
        exception = Carto::GeocoderErrors::GeocoderCacheDbTimeoutError.new(exception)
      end
      # In case we get some error we are going to pass all the rows as failed
      @failed_rows = @total_rows
      CartoDB.notify_exception(exception)
    end

    private

    # Resets the per-job counters reported to usage metrics.
    def init_rows_count
      @hits = 0
      @total_rows = 0
      @failed_rows = 0
    end

    def update_log_stats
      @log.append_and_store "Geocoding cache stats update. "\
        "Total rows: #{@total_rows} "\
        "--- Hits: #{@hits} --- Failed: #{@failed_rows}"
    end
  end
end