CartoDB/cartodb20

View on GitHub
app/models/table.rb

Summary

Maintainability
F
1 wk
Test Coverage
# Proxies management of a table in the users database
require 'forwardable'

require_relative './table/column_typecaster'
require_relative './table/privacy_manager'
require_relative './table/relator'
require_relative './table/user_table'
require_relative './visualization/member'
require_relative './visualization/collection'
require_relative './visualization/overlays'
require_relative './visualization/table_blender'
require_relative '../../services/importer/lib/importer/query_batcher'
require_relative '../../services/importer/lib/importer/cartodbfy_time'
require_relative '../../services/importer/lib/importer/column'
require_relative '../../services/datasources/lib/datasources/decorators/factory'
require_relative '../../services/table-geocoder/lib/internal-geocoder/latitude_longitude'
require_relative '../model_factories/layer_factory'
require_relative '../model_factories/map_factory'
require_relative '../../lib/cartodb/stats/user_tables'
require_relative '../../lib/cartodb/stats/importer'
require_dependency 'carto/table_utils'
require_dependency 'carto/valid_table_name_proposer'

class Table
  extend Forwardable
  include Carto::TableUtils
  include ::LoggerHelper

  # TODO Part of a service along with schema
  # INFO: created_at and updated_at cannot be dropped from existing tables without dropping the triggers first
  # Columns every table must keep.
  CARTODB_REQUIRED_COLUMNS = %w{cartodb_id the_geom}.freeze
  # Columns that column-level operations (drop/modify/rename) refuse to touch.
  CARTODB_COLUMNS = %w{cartodb_id created_at updated_at the_geom}.freeze
  # Column skipped when the schema and the row column list are built.
  THE_GEOM_WEBMERCATOR = :the_geom_webmercator
  THE_GEOM = :the_geom
  CARTODB_ID = :cartodb_id
  DATATYPE_DATE = 'date'.freeze

  # TTLs passed to the cache when storing geometry types: empty results are kept for a much shorter time
  NO_GEOMETRY_TYPES_CACHING_TIMEOUT = 5.minutes
  GEOMETRY_TYPES_PRESENT_CACHING_TIMEOUT = 24.hours
  # NOTE(review): presumably expressed in milliseconds (seconds in an hour * 1000) -- confirm against
  # transaction_with_timeout
  STATEMENT_TIMEOUT = 1.hour * 1000

  DEFAULT_THE_GEOM_TYPE = 'geometry'

  # Values accepted by the_geom_type= after normalization
  VALID_GEOMETRY_TYPES = %W{ geometry multipolygon point multilinestring }

  # Methods listed in the INTERFACE constants are forwarded to the relator and to the wrapped model
  def_delegators :relator, *CartoDB::TableRelator::INTERFACE
  def_delegators :@user_table, *::UserTable::INTERFACE

  attr_accessor :user_table

  # Builds the service around a user table model.
  #
  # @param args [Hash] either carries :user_table (an existing model to wrap) or the attributes handed
  #   to `model_class.new`; :user_id, when present, is assigned explicitly
  def initialize(args = {})
    wrapped = args[:user_table]
    # TODO: This won't work, you need to UserTable.new.set_fields(args, args.keys)
    wrapped = model_class.new(args) if wrapped.nil?
    @user_table = wrapped

    owner_id = args[:user_id]
    self.user_id = owner_id if owner_id.present?

    # TODO: this probably makes sense only if user_table is not passed as argument
    @user_table.set_service(self)
  end

  # This is here just for testing purposes (being able to test this service against both models)
  # @return [Class] the model class instantiated by #initialize when no :user_table is given
  def model_class
    Carto::UserTable
  end

  # forwardable does not work well with this one
  # Explicit pass-through to the wrapped model's layers.
  def layers
    @user_table.layers
  end

  # Persists the wrapped model, preferring the raising variant when the model offers one.
  #
  # @return [Table] self
  def save
    # TODO: kept for compatibility reasons on tests on both models, until 100% removal of ::UserTable support
    @user_table.respond_to?(:save!) ? @user_table.save! : @user_table.save

    self
  end

  # Applies +args+ to the wrapped model.
  #
  # @param args [Hash] attributes to update
  # @return [Table] self
  def update(args)
    # Sequel and ActiveRecord expose differently-behaving #update methods, so the entry point
    # depends on which model is being wrapped
    active_record_backed = @user_table.is_a?(Carto::UserTable)
    active_record_backed ? @user_table.update_attributes(args) : @user_table.update(args)
    self
  end

  # Reloads the wrapped model.
  #
  # @return [Table] self, so calls can be chained
  def reload
    @user_table.reload
    self
  end

  # ----------------------------------------------------------------------------

  # @return [String] cache key under which #geometry_types stores its result (derived from #redis_key)
  def geometry_types_key
    "#{redis_key}:geometry_types"
  end

  # Returns the geometry types of the table, cached under #geometry_types_key.
  #
  # On a cache miss the types are queried and stored; an empty result is cached for
  # NO_GEOMETRY_TYPES_CACHING_TIMEOUT, a non-empty one for GEOMETRY_TYPES_PRESENT_CACHING_TIMEOUT.
  #
  # @return [Array] the cached or freshly queried geometry types
  def geometry_types
    cached = cache.get(geometry_types_key)
    return JSON.parse(cached) if cached.present?

    types = query_geometry_types
    timeout = types.empty? ? NO_GEOMETRY_TYPES_CACHING_TIMEOUT : GEOMETRY_TYPES_PRESENT_CACHING_TIMEOUT
    # Serialize explicitly: the read path uses JSON.parse, so the stored value must be JSON rather than
    # whatever string form the cache client derives from a Ruby Array
    cache.setex(geometry_types_key, timeout, JSON.generate(types))

    types
  end

  # @return [Boolean] true when any column reported by #schema has the type 'raster'
  def is_raster?
    schema.any? { |_column_name, column_type| column_type == 'raster' }
  end

  # Transient flags and import settings; plain accessors defined on the service object itself
  attr_accessor :force_schema,
                :import_from_file,
                :import_from_url,
                :import_from_query,
                :import_from_table_copy,
                :importing_encoding,
                :the_geom_type_value,
                :migrate_existing_table,
                # This flag is used to register table changes only, without performing operations in the
                # database (for example when the table is renamed or created). For removal see
                # keep_user_database_table
                :register_table_only,
                :new_table,
                # Handy for rakes and custom ghost table registers, won't delete user table in case of error
                :keep_user_database_table

  # Getter by table uuid using canonical visualizations
  # @param table_id String
  # @param viewer_user ::User
  # Looks a table up by uuid through the viewer's canonical visualization.
  #
  # @param table_id [String] uuid of the user table
  # @param viewer_user [::User] user whose visualizations are searched
  # @return [Object, nil] the table of the matching canonical visualization; nil when there is no
  #   viewer, no user table with that id, or no canonical visualization for the viewer
  def self.get_by_id(table_id, viewer_user)
    return nil unless viewer_user

    # An unknown id yields no record; bail out instead of calling #service on nil
    user_table = Carto::UserTable.where(id: table_id).first
    return nil if user_table.nil?

    table_service = user_table.service
    return nil if table_service.nil?

    canonical = CartoDB::Visualization::Collection.new.fetch(
      user_id: viewer_user.id,
      map_id: table_service.map_id,
      type: Carto::Visualization::TYPE_CANONICAL
    ).first
    canonical.nil? ? nil : canonical.table
  end

  # Getter by table uuid using canonical visualizations. No privacy checks
  # @param table_id String
  # @return [Object, nil] the service of the user table with the given id, or nil when none exists
  def self.get_by_table_id(table_id)
    found = Carto::UserTable.where(id: table_id).first
    found.nil? ? nil : found.service
  end

  # Get a list of tables given an array with the names
  # (can be fully qualified).
  # it also needs the user used to search a table when the
  # name is not qualified
  # Resolves each (optionally schema-qualified) name to a ::UserTable record.
  #
  # A qualified name is searched under the user whose username equals the schema, when such a user
  # exists; otherwise the viewer's own tables are searched.
  #
  # @param names [Array<String>]
  # @param viewer_user [::User]
  # @return [Array] one entry per name; nil where no table matched
  def self.get_all_by_names(names, viewer_user)
    names.map do |qualified_name|
      lookup_user_id = viewer_user.id
      table_name, table_schema = Table.table_and_schema(qualified_name)
      schema_owner = ::User.where(username: table_schema).first unless table_schema.nil?
      lookup_user_id = schema_owner.id unless schema_owner.nil?
      ::UserTable.where(user_id: lookup_user_id, name: table_name).first
    end
  end

  # TODO: REFACTOR THIS patch introduced to continue with #3664
  # Same lookup as get_all_by_names, but the records are fetched through Carto::UserTable.
  #
  # @param names [Array<String>] table names, optionally schema-qualified
  # @param viewer_user [::User] fallback owner for names without a matching schema user
  # @return [Array] one entry per name; nil where no table matched
  def self.get_all_user_tables_by_names(names, viewer_user)
    names.map do |qualified_name|
      lookup_user_id = viewer_user.id
      table_name, table_schema = Table.table_and_schema(qualified_name)
      schema_owner = ::User.where(username: table_schema).first unless table_schema.nil?
      lookup_user_id = schema_owner.id unless schema_owner.nil?
      Carto::UserTable.where(user_id: lookup_user_id, name: table_name).first
    end
  end

  # Splits a possibly schema-qualified name into its table and schema parts.
  #
  # Double quotes are stripped from the schema; the 'public' schema is reported as nil.
  #
  # @param table_name [String] e.g. 'tbl', 'public.tbl' or '"some-user".tbl'
  # @return [Array] [table_name, schema_or_nil]
  def self.table_and_schema(table_name)
    return [table_name, nil] unless table_name =~ /\./

    table_name, schema = table_name.split('.').reverse
    # String#split drops trailing empty fields, so a name such as "tbl." has no schema part at all
    return [table_name, nil] if schema.nil?

    # remove quotes from schema
    schema = schema.delete('"')
    [table_name, (schema if schema != 'public')]
  end

  # Memoized data import of this table: the record referenced by the wrapped model's data_import_id,
  # or a new, unsaved DataImport for the owner when no such record is found.
  def data_import
    return @data_import if @data_import

    existing = DataImport.where(id: @user_table.data_import_id).first
    @data_import = existing || DataImport.new(user_id: owner.id)
  end

  ## Callbacks

  # Migrates an existing database table into a CartoDB table through CartoDB::Migrator.
  #
  # Does nothing (and returns nil) unless migrate_existing_table is set or +uniname+ is given.
  #
  # @param uniname [String, nil] name to use for the table; derived through get_valid_name when absent
  # @return [String, nil] whatever name the migrator reports for the migrated table
  def import_to_cartodb(uniname = nil)
    return unless migrate_existing_table.present? || uniname

    data_import.data_type = DataImport::TYPE_EXTERNAL_TABLE if data_import.data_type.nil?
    data_import.data_source = migrate_existing_table || uniname
    data_import.save

    # A unique name is required; self.name takes precedence over the name of the table being migrated
    uniname ||= get_valid_name(name ? name : migrate_existing_table)

    # Column names are sanitized up front so the result is consistent
    sanitize_columns(table_name: uniname, database_schema: owner.database_schema, connection: owner.in_database)

    # The table exists at this point; hand it to the migrator to CartoDBfy it
    migrator_overrides = {
      'host' => owner.database_host,
      'database' => owner.database_name,
      :logger => ::Rails.logger,
      'username' => owner.database_username,
      'password' => owner.database_password,
      :schema => owner.database_schema,
      :current_name => migrate_existing_table || uniname,
      :suggested_name => uniname,
      :debug => Rails.env.development?,
      :remaining_quota => owner.remaining_quota,
      :remaining_tables => owner.remaining_table_quota,
      :data_import_id => data_import.id
    }
    base_settings = ::SequelRails.configuration.environment_for(Rails.env)
    migrator = CartoDB::Migrator.new(base_settings.merge(migrator_overrides).symbolize_keys)
    imported_name = migrator.migrate!

    data_import.reload
    data_import.save
    imported_name
  end

  # TODO: basically most if not all of what the import_cleanup does is done by cartodbfy.
  # Consider deletion.
  # Normalizes a freshly imported table in four steps, each in its own transaction:
  #   1. drops the existing primary key constraint, if any (as superuser)
  #   2. converts every "character varying" column to text
  #   3. when an ogc_fid/gid column qualifies as cartodb_id candidate, adds cartodb_id, copies the
  #      values over and restarts the serial sequence after the maximum copied value
  #   4. drops that auxiliary column
  def import_cleanup
      # When tables are created using ogr2ogr they are added a ogc_fid or gid primary key
      # In that case:
      #  - If cartodb_id already exists, remove ogc_fid
      #  - If cartodb_id does not exist, treat this field as the auxiliary column
      aux_cartodb_id_column = [:ogc_fid, :gid].find { |col| valid_cartodb_id_candidate?(col) }

      # Remove primary key
      owner.transaction_with_timeout(statement_timeout: STATEMENT_TIMEOUT, as: :superuser) do |user_database|
        existing_pk = user_database[%Q{
          SELECT c.conname AS pk_name
          FROM pg_class r, pg_constraint c, pg_namespace n
          WHERE r.oid = c.conrelid AND contype='p' AND relname = '#{name}'
          AND r.relnamespace = n.oid and n.nspname= '#{owner.database_schema}'
        }].first

      # Unwrap the constraint name from the result row (still inside the transaction block)
      existing_pk = existing_pk[:pk_name] unless existing_pk.nil?

        user_database.run(%{
          ALTER TABLE #{qualified_table_name} DROP CONSTRAINT "#{existing_pk}"
        }) unless existing_pk.nil?
      end

      # All normal fields casted to text
      self.schema(reload: true, cartodb_types: false).each do |column|
        if column[1] =~ /^character varying/
          owner.transaction_with_timeout(statement_timeout: STATEMENT_TIMEOUT) do |user_database|
            user_database.run(%{ALTER TABLE #{qualified_table_name} ALTER COLUMN "#{column[0]}" TYPE text})
          end
        end
      end

      # If there's an auxiliary column, copy to cartodb_id and restart the sequence to the max(cartodb_id)+1
      if aux_cartodb_id_column.present?
        begin
          already_had_cartodb_id = false
          owner.transaction_with_timeout(statement_timeout: STATEMENT_TIMEOUT) do |user_database|
            user_database.run(%{ALTER TABLE #{qualified_table_name} ADD COLUMN cartodb_id SERIAL})
          end
        rescue StandardError
          # NOTE(review): any failure of the ADD COLUMN is interpreted as "cartodb_id already exists"
          already_had_cartodb_id = true
        end
        unless already_had_cartodb_id
          owner.transaction_with_timeout(statement_timeout: STATEMENT_TIMEOUT) do |user_database|
            user_database.run(%{
              UPDATE #{qualified_table_name}
              SET cartodb_id = CAST(#{aux_cartodb_id_column} AS INTEGER)
            })

            cartodb_id_sequence_name = user_database[%{
              SELECT pg_get_serial_sequence('#{owner.database_schema}.#{name}', 'cartodb_id')
            }].first[:pg_get_serial_sequence]
            max_cartodb_id = user_database[%{SELECT max(cartodb_id) FROM #{qualified_table_name}}].first[:max]
            # only reset the sequence on real imports.

            # max is nil for an empty table, in which case the sequence is left untouched
            if max_cartodb_id
              user_database.run("ALTER SEQUENCE #{cartodb_id_sequence_name} RESTART WITH #{max_cartodb_id + 1}")
            end
          end
        end
        # The auxiliary column is dropped whether or not its values were copied
        owner.transaction_with_timeout(statement_timeout: STATEMENT_TIMEOUT) do |user_database|
          user_database.run(%{ALTER TABLE #{qualified_table_name} DROP COLUMN #{aux_cartodb_id_column}})
        end
      end
  end

  # Creation callback: prepares the physical table before the model is created.
  #
  # Either migrates an existing table (migrate_existing_table set) or, unless register_table_only is
  # set, creates a new table in the user database. Any StandardError is routed through
  # handle_creation_error, which re-raises it.
  #
  # @raise [CartoDB::QuotaExceeded] when the owner is over the table quota
  def before_create
    raise CartoDB::QuotaExceeded if owner.over_table_quota?

    # The Table model only migrates now, never imports
    if migrate_existing_table.present?
      if @user_table.data_import_id.nil? #needed for non ui-created tables
        @data_import  = DataImport.new(:user_id => self.user_id)
        @data_import.updated_at = Time.now
        @data_import.save
      else
        @data_import  = DataImport.find(:id=>@user_table.data_import_id)
      end

      importer_result_name = import_to_cartodb(name)

      # Record the name reported by the migration on both the import and the table
      @data_import.reload
      @data_import.table_name = importer_result_name
      @data_import.save

      self[:name] = importer_result_name

      set_the_geom_column!

      import_cleanup
      self.cartodbfy

      @data_import.save
    else
      if !register_table_only.present?
        create_table_in_database!
        set_the_geom_column!(self.the_geom_type)
        self.cartodbfy
      end
    end

    self.schema(reload:true)
    set_table_id
  rescue StandardError => e
    self.handle_creation_error(e)
  end

  # Creation callback run once the model exists: grants the tiler user read access, closes off the
  # associated data import (copying its privacy to the table when set), lets the datasource decorator
  # adjust the map layers, and updates stats. Any StandardError is routed through
  # handle_creation_error, which re-raises it.
  def after_create
    grant_select_to_tiler_user

    @force_schema = nil
    self.new_table = true

    # finally, close off the data import
    if @user_table.data_import_id && !register_table_only.present?
      @data_import = DataImport.find(id: @user_table.data_import_id)
      @data_import.table_id   = id
      @data_import.table_name = name
      @data_import.save

      # The import may request a privacy; it is only accepted when valid for the owner
      if !@data_import.privacy.nil?
        if !self.owner.valid_privacy?(@data_import.privacy)
          raise "Error: User '#{self.owner.username}' doesn't have private tables enabled"
        end
        @user_table.privacy = @data_import.privacy
      end

      @user_table.save

      decorator = CartoDB::Datasources::Decorators::Factory.decorator_for(@data_import.service_name)
      if !decorator.nil? && decorator.decorates_layer?
        self.map.layers.each do |layer|
          decorator.decorate_layer!(layer)
          layer.save if decorator.layer_eligible?(layer)  # skip .save if nothing changed
        end
      end
    end
    add_table_to_stats

    update_table_pg_stats

  rescue StandardError => e
    handle_creation_error(e)
  end

  # Save callback: mirrors the table visualization's updated_at onto the wrapped model, when the
  # table has a visualization.
  def before_save
    return unless table_visualization

    @user_table.updated_at = table_visualization.updated_at
  end

  # Save callback: syncs tags and name changes, applies any privacy change, and refreshes the table
  # metadata when either the privacy or the name changed.
  def after_save
    manage_tags
    update_name_changes

    privacy_manager = CartoDB::TablePrivacyManager.new(@user_table)
    privacy_manager.apply_privacy_change(self, previous_privacy, privacy_changed?)

    metadata_outdated = privacy_changed? || !@name_changed_from.nil?
    update_cdb_tablemetadata if metadata_outdated
  end

  # Copies the current table name onto the table visualization and stores the visualization.
  def propagate_namechange_to_table_vis
    table_visualization.name = name
    table_visualization.store
  end

  # Grants SELECT on the table to CartoDB::TILE_DB_USER, running as superuser in the owner's database.
  def grant_select_to_tiler_user
    owner.in_database(:as => :superuser).run(%Q{GRANT SELECT ON #{qualified_table_name} TO #{CartoDB::TILE_DB_USER};})
  end

  # Runs VACUUM ANALYZE on the table over a direct connection.
  #
  # Failures are reported through CartoDB.notify_exception instead of being raised.
  #
  # @return [Object, false] the result of the connection block, or false when an error was reported
  def optimize
    connection_options = { statement_timeout: STATEMENT_TIMEOUT }
    owner.db_service.in_database_direct_connection(connection_options) do |direct_connection|
      direct_connection.run(%Q{
        VACUUM ANALYZE #{qualified_table_name}
        })
    end
  rescue StandardError => error
    CartoDB.notify_exception(error, { user: owner })
    false
  end

  # Cleans up after a failed table creation and re-raises the error.
  #
  # The physical table is dropped unless the table has no name, the error says the relation already
  # exists, or keep_user_database_table is set. Progress is appended to the data import log when a
  # data import is attached.
  #
  # @param e [StandardError] the error that aborted the creation
  # @raise [StandardError] always re-raises +e+
  def handle_creation_error(e)
    log_info(message: 'table#create error', exception: e)

    droppable = !(name.blank? || e.message =~ /relation .* already exists/)
    if droppable
      @data_import.log.append("Import ERROR: Dropping table #{qualified_table_name}") if @data_import
      remove_table_from_user_database unless keep_user_database_table
    end

    @data_import.log.append("Import ERROR: #{e.message} Trace: #{e.backtrace}") if @data_import
    raise e
  end

  # Destroy callback: removes tags, stats, the cached geometry types, the physical table (unless
  # keep_user_database_table is set) and the related templates.
  def after_destroy
    table_tags = Carto::Tag.where(user_id: user_id, table_id: id)
    table_tags.each(&:destroy)
    remove_table_from_stats

    cache.del(geometry_types_key)

    # Metadata is refreshed while the physical table can still be found
    update_cdb_tablemetadata if real_table_exists?
    remove_table_from_user_database unless keep_user_database_table

    related_templates.each(&:destroy)
  end

  # Drops the table and its overviews from the owner's database inside a single superuser transaction.
  def remove_table_from_user_database
    owner.in_database(as: :superuser) do |superuser_connection|
      superuser_connection.transaction do
        # DDL needs exclusive locks; stop waiting for them after a short time
        superuser_connection.run(%{SET LOCAL lock_timeout = '1s'})
        overviews = Carto::OverviewsService.new(superuser_connection)
        overviews.delete_overviews(qualified_table_name)
        superuser_connection.run(%{DROP TABLE IF EXISTS #{qualified_table_name}})
      end
    end
  end

  # @return [Boolean] true when get_table_id yields a value, i.e. is not nil
  def real_table_exists?
    !get_table_id.nil?
  end

  # Assigns a new (downcased) table name to the wrapped model.
  #
  # Blank values and values equal to the current name are ignored. Unless register_table_only is
  # set, the name goes through get_valid_name first. For already-saved tables the previous name is
  # remembered in @name_changed_from and the table metadata is refreshed.
  #
  # @param value [String, nil] requested name
  def name=(value)
    value &&= value.downcase
    return if value == @user_table.name || value.blank?

    new_name = register_table_only ? value : get_valid_name(value)

    # Name changes are only tracked once the table has been saved
    unless new_record?
      previous_name = @user_table.name
      @name_changed_from = previous_name if previous_name.present?
      update_cdb_tablemetadata
    end

    @user_table.name = new_name
  end

  # @return [Boolean] whatever the wrapped model reports for privacy_changed?
  def privacy_changed?
    @user_table.privacy_changed?
  end

  # @return [String] base cache key of this table, built from its id
  def redis_key
    "rails:table:#{id}"
  end

  # TODO: change name and refactor for ActiveRecord
  # @return [Object] dataset over the table (sequel_qualified_table_name) in the owner's database
  def sequel
    owner.in_database.from(sequel_qualified_table_name)
  end

  # Reads the row estimate (pg_class.reltuples) of the table.
  #
  # @param user [Object, nil] user whose connection is used; defaults to the owner
  # @return [Integer] the estimated number of rows
  def rows_estimated(user=nil)
    connection = (user || owner).in_database
    estimate = connection["SELECT reltuples::integer FROM pg_class WHERE oid = '#{name}'::regclass"].first
    estimate[:reltuples]
  end

  # Preferred: `actual_row_count`
  # Alias kept for existing callers; simply forwards to actual_row_count.
  def rows_counted
    actual_row_count
  end

  # Returns table size in bytes
  # Memoized table size as computed by Table.table_size.
  #
  # @param user [Object, nil] user whose connection is used on the first call; defaults to the owner
  def table_size(user=nil)
    size_source = user || owner
    @table_size ||= Table.table_size(name, connection: size_source.in_database)
  end

  # Half of pg_total_relation_size for the named relation.
  #
  # @param name [String] relation name
  # @param options [Hash] :connection is the database connection to query
  # @return [Integer, nil] the halved size, or nil when the query raises Sequel::DatabaseError
  def self.table_size(name, options)
    connection = options[:connection]
    size_row = connection['SELECT pg_total_relation_size(?) AS size', name].first
    size_row[:size] / 2
  rescue Sequel::DatabaseError
    nil
  end

  # Describes the table columns, ordered for display.
  #
  # Each entry is [name, type]; geometry columns carry two extra elements ('geometry' and the value
  # of the_geom_type). the_geom_webmercator is left out. Order: cartodb_id, the_geom, the remaining
  # columns alphabetically, then created_at/updated_at.
  #
  # @param options [Hash] :reload is forwarded to the database schema call (default true);
  #   cartodb_types: false keeps the database types instead of converting them
  # @return [Array<Array>]
  def schema(options = {})
    first_columns     = []
    middle_columns    = []
    last_columns      = []
    owner.in_database.schema(name, schema: owner.database_schema, reload: options.fetch(:reload, true)).each do |column|
      next if column[0] == THE_GEOM_WEBMERCATOR

      calculate_the_geom_type if column[0] == :the_geom

      # Any "geometry(...)" database type is reported as plain 'geometry'
      col_db_type = column[1][:db_type].starts_with?('geometry') ? 'geometry' : column[1][:db_type]
      col = [
        column[0],
        # Default/unset or set to true means we want cartodb types
        (options.include?(:cartodb_types) && options[:cartodb_types] == false ? col_db_type : col_db_type.convert_to_cartodb_type),
        col_db_type == 'geometry' ? 'geometry' : nil,
        col_db_type == 'geometry' ? the_geom_type : nil
      ].compact

      # Make sensible sorting for UI
      case column[0]
        when :cartodb_id
          first_columns.insert(0,col)
        when :the_geom
          # insert(1, ...) pads with nil when cartodb_id has not been seen; the final compact removes it
          first_columns.insert(1,col)
        when :created_at, :updated_at
          last_columns.insert(-1,col)
        else
          middle_columns << col
      end
    end

    # sort middle columns alphabetically
    middle_columns.sort! {|x,y| x[0].to_s <=> y[0].to_s}

    # group columns together and return
    (first_columns + middle_columns + last_columns).compact
  end

  # Inserts a row, widening a column type and retrying when the database rejects a value.
  #
  # @param raw_attributes [Hash] column => value; :id is removed when the table has no id column
  # @return [Object] the primary key returned by the insert
  # @raise [CartoDB::InvalidAttributes] when an attribute does not match any table column
  # @raise [RuntimeError] with the database message when it reports "Quota exceeded by"
  def insert_row!(raw_attributes)
    primary_key = nil
    owner.in_database do |user_database|
      # NOTE: this local shadows #schema; the call further down still reaches the method because it
      # is written with arguments
      schema = user_database.schema(name, schema: owner.database_schema, reload: true).map{|c| c.first}
      raw_attributes.delete(:id) unless schema.include?(:id)
      attributes = raw_attributes.dup.select{ |k,v| schema.include?(k.to_sym) }
      if attributes.keys.size != raw_attributes.keys.size
        raise CartoDB::InvalidAttributes.new("Invalid rows: #{(raw_attributes.keys - attributes.keys).join(',')}")
      end
      begin
        primary_key = user_database.from(name).insert(make_sequel_compatible(attributes))
      rescue Sequel::DatabaseError => e
        message = e.message.split("\n")[0]
        raise message if message =~ /Quota exceeded by/

        invalid_column = nil

        # If the type don't match the schema of the table is modified for the next valid type
        # The offending value is taken from the quoted text at the end of the error message
        invalid_value = (m = message.match(/"([^"]+)"$/)) ? m[1] : nil
        if invalid_value
          invalid_column = attributes.invert[invalid_value] # which is the column of the name that raises error
        else
          m = message.match(/PGError: ERROR:  value too long for type (.+)$/)
          if m
            candidate = schema(cartodb_types: false).select{ |c| c[1].to_s == m[1].to_s }.first
            if candidate
              invalid_column = candidate[0]
            end
          end
        end

        if invalid_column.nil?
          raise e
        else
          # NOTE(review): unlike update_row! there is no retry cap here -- confirm get_new_column_type
          # always converges
          new_column_type = get_new_column_type(invalid_column)
          user_database.set_column_type(self.name, invalid_column.to_sym, new_column_type)
          retry
        end
      end
    end
    update_the_geom!(raw_attributes, primary_key)
    primary_key
  end

  # Upper bound on the retries update_row! performs after changing a column type
  MAX_UPDATE_ROW_RETRIES = 3

  # Updates the row with the given cartodb_id, changing a column type and retrying (at most
  # MAX_UPDATE_ROW_RETRIES times) when the database rejects a value.
  #
  # @param row_id [Object] cartodb_id of the row
  # @param raw_attributes [Hash] column => value; :id is removed when the table has no id column
  # @return [Integer] number of rows updated (1 when only the_geom was given)
  # @raise [CartoDB::InvalidAttributes] when an attribute does not match any table column
  def update_row!(row_id, raw_attributes)
    retries = 0

    rows_updated = 0
    owner.in_database do |user_database|
      schema = user_database.schema(name, schema: owner.database_schema, reload: true).map{|c| c.first}
      raw_attributes.delete(:id) unless schema.include?(:id)

      attributes = raw_attributes.dup.select{ |k,v| schema.include?(k.to_sym) }
      if attributes.keys.size != raw_attributes.keys.size
        raise CartoDB::InvalidAttributes, "Invalid rows: #{(raw_attributes.keys - attributes.keys).join(',')}"
      end

      # Nothing but the_geom (or nothing at all): no UPDATE is issued here, the geometry is written
      # by update_the_geom! below
      if attributes.except(THE_GEOM).empty?
        if attributes.size == 1 && attributes.keys == [THE_GEOM]
          rows_updated = 1
        end
      else
        begin
          # update row
          rows_updated = user_database.from(name).filter(:cartodb_id => row_id).update(make_sequel_compatible(attributes))
        rescue Sequel::DatabaseError => e
          # If the type don't match the schema of the table is modified for the next valid type
          # TODO: STOP THIS MADNESS
          message = e.message.split("\n")[0]

          invalid_value = (m = message.match(/"([^"]+)"$/)) ? m[1] : nil
          if invalid_value
            invalid_column = attributes.invert[invalid_value] # which is the column of the name that raises error
            new_column_type = get_new_column_type(invalid_column)
            if new_column_type
              user_database.set_column_type self.name, invalid_column.to_sym, new_column_type
              # Once the cap is hit the failure is only logged; the error is not re-raised
              if (retries += 1) > MAX_UPDATE_ROW_RETRIES
                log_error(message: 'Max update_row! retries reached',
                                      user_id: user_id,
                                      qualified_table_name: qualified_table_name,
                                      row_id: row_id,
                                      raw_attributes: raw_attributes,
                                      exception: e)
              else
                retry
              end
            end
          else
            raise e
          end
        end
      end
    end
    update_the_geom!(raw_attributes, row_id)
    rows_updated
  end

  # make all identifiers SEQUEL Compatible
  # https://github.com/Vizzuality/cartodb/issues/331
  # Prepares attributes for a Sequel insert/update: the_geom is dropped, nulls are converted and every
  # key is wrapped in Sequel.identifier.
  #
  # @param attributes [Hash] column => value
  # @return [Hash] Sequel identifier => value
  def make_sequel_compatible(attributes)
    writable = attributes.except(THE_GEOM).convert_nulls
    compatible = {}
    writable.each { |column, value| compatible[Sequel.identifier(column)] = value }
    compatible
  end

  # Adds a column to the table and refreshes the table metadata.
  #
  # @param options [Hash] :name is the requested column name (sanitized before use), :type the
  #   requested type
  # @return [Hash] :name (sanitized), :type (database type) and :cartodb_type of the new column
  # @raise [CartoDB::InvalidColumnName] when the importer rejects the name
  # @raise [CartoDB::InvalidType] when the failure message starts with a PG error marker
  def add_column!(options)
    requested_name = options[:name]
    raise CartoDB::InvalidColumnName if CartoDB::Importer2::Column.rejected?(requested_name)

    requested_type = options[:type]
    type = requested_type.convert_to_db_type
    cartodb_type = requested_type.convert_to_cartodb_type
    # FIXME: consider CartoDB::Importer2::Column.get_valid_column_name with CURRENT_COLUMN_SANITIZATION_VERSION
    column_name = CartoDB::Importer2::Column.sanitize_name(requested_name.to_s)
    owner.in_database.add_column name, column_name, type

    update_cdb_tablemetadata
    { name: column_name, type: type, cartodb_type: cartodb_type }
  rescue StandardError => e
    raise CartoDB::InvalidType, e.message if e.message =~ /^(PG::Error|PGError|PG::UndefinedObject)/

    raise e
  end

  # Drops a column from the table and refreshes the table metadata.
  #
  # @param options [Hash] :name is the column to drop (converted with to_s)
  # @raise [RuntimeError] when the column is one of CARTODB_COLUMNS
  def drop_column!(options)
    column_name = options[:name].to_s
    # An explicit message instead of a bare `raise`, in line with rename_column and modify_column!
    raise 'This column cannot be dropped' if CARTODB_COLUMNS.include?(column_name)

    owner.in_database.drop_column name, column_name

    update_cdb_tablemetadata
  end

  # Renames a column and/or converts its type, then refreshes the table metadata.
  #
  # @param options [Hash] :name is the current column name, :new_name the optional new name and
  #   :type the type handed to convert_column_datatype
  # @return [Hash] :name, :type and :cartodb_type of the column after the change
  # @raise [RuntimeError] when the column is one of CARTODB_COLUMNS
  def modify_column!(options)
    old_name = options.fetch(:name, '').to_s
    new_name = options.fetch(:new_name, '').to_s

    # FIXME: consider CartoDB::Importer2::Column.get_valid_column_name with CURRENT_COLUMN_SANITIZATION_VERSION
    old_sanitized_name = CartoDB::Importer2::Column.sanitize_name(old_name)
    raise 'This column cannot be modified' if CARTODB_COLUMNS.include?(old_sanitized_name.to_s)

    # The column keeps its current name unless a rename actually takes place. Passing a new_name
    # equal to the current name must not leave the type conversion without a column to work on.
    column_name = old_sanitized_name
    if new_name.present? && new_name != old_name
      column_name = CartoDB::Importer2::Column.sanitize_name(new_name)
      rename_column(old_sanitized_name, column_name)
    end

    convert_column_datatype(owner.in_database, name, column_name, options[:type])
    column_type = column_type_for(column_name)

    update_cdb_tablemetadata
    { name: column_name, type: column_type, cartodb_type: column_type.convert_to_cartodb_type }
  end #modify_column!

  # @param column_name [String, Symbol]
  # @return [String] database type (cartodb_types: false) of the column, read from a reloaded schema
  def column_type_for(column_name)
    wanted = column_name.to_sym
    matching_column = schema(cartodb_types: false, reload: true).find { |column| column[0] == wanted }
    matching_column[1]
  end #column_type_for

  # @param db [Object] database connection
  # @param table_name [String]
  # @param owner [Object] provides the database_schema to look in
  # @return [Array<String>] column names of the table, read from a reloaded schema
  def self.column_names_for(db, table_name, owner)
    columns = db.schema(table_name, schema: owner.database_schema, reload: true)
    columns.map { |column| column[0].to_s }
  end #column_names

  def rename_column(old_name, new_name='')
    raise 'Please provide a column name' if new_name.empty?
    raise 'This column cannot be renamed' if CARTODB_COLUMNS.include?(old_name.to_s)

    if CartoDB::Importer2::Column.reserved_or_unsupported?(new_name) || CARTODB_COLUMNS.include?(new_name)
      raise CartoDB::InvalidColumnName, 'That column name is reserved, please choose a different one'
    end

    self.owner.in_database do |user_database|
      if Table.column_names_for(user_database, name, self.owner).include?(new_name)
        raise 'Column already exists'
      end
      user_database.execute %{
          ALTER TABLE "#{name}" RENAME COLUMN "#{old_name}" TO "#{new_name}"
      }
    end
  end #rename_column

  # Converts a column to +new_type+ by running a CartoDB::ColumnTypecaster in the owner's schema.
  #
  # @return [Object] whatever the typecaster's #run returns
  def convert_column_datatype(database, table_name, column_name, new_type)
    typecaster = CartoDB::ColumnTypecaster.new(
      user_database: database,
      schema: owner.database_schema,
      table_name: table_name,
      column_name: column_name,
      new_type: new_type
    )
    typecaster.run
  end

  # Fetches one page of rows together with a (possibly estimated) total row count.
  #
  # @param options [Hash] pagination options read by CartoDB::Pagination.get_page_and_per_page, plus
  #   :order_by (default 'cartodb_id'), :mode ('asc' unless given otherwise) and the pair
  #   :filter_column / :filter_value
  # @return [Hash] :id, :name, :total_rows and :rows
  #
  # NOTE(review): the filter and order_by options are interpolated into the SQL text as given --
  # confirm callers only pass trusted values
  def records(options = {})
    rows = []
    records_count = 0
    page, per_page = CartoDB::Pagination.get_page_and_per_page(options)
    order_by_column = options[:order_by] || 'cartodb_id'
    mode = (options[:mode] || 'asc').downcase == 'asc' ? 'ASC' : 'DESC NULLS LAST'

    # Blank filter options are discarded, so both must be present for the WHERE clause to be complete
    filters = options.slice(:filter_column, :filter_value).reject{|k,v| v.blank?}.values
    where = filters.present? ? "WHERE (#{filters.first})|| '' ILIKE '%#{filters.second}%'" : ''

    owner.in_database do |user_database|
      # Builds the comma-separated, table-qualified column list, leaving out the_geom_webmercator
      columns_sql_builder = <<-SQL
      SELECT array_to_string(ARRAY(SELECT '"#{name}"' || '.' || quote_ident(c.column_name)
        FROM information_schema.columns As c
        WHERE table_name = '#{name}'
        AND c.column_name <> 'the_geom_webmercator'
        ), ',') AS column_names
      SQL

      column_names = user_database[columns_sql_builder].first[:column_names].split(',')
      the_geom_index = column_names.index("\"#{name}\".the_geom")
      # Points are returned as GeoJSON; any other non-null geometry is replaced by the 'GeoJSON' marker
      if the_geom_index
        column_names[the_geom_index] = <<-STR
            CASE
            WHEN GeometryType(the_geom) = 'POINT' THEN
              ST_AsGeoJSON(the_geom,8)
            WHEN (the_geom IS NULL) THEN
              NULL
            ELSE
              'GeoJSON'
            END the_geom
        STR
      end
      select_columns = column_names.join(',')

      # Counting results can be really expensive, so we estimate
      #
      # See https://github.com/Vizzuality/cartodb/issues/716
      #
      max_countable_rows = 65535 # up to this number we accept to count
      rows_count_is_estimated = true
      rows_count = rows_estimated
      if rows_count <= max_countable_rows
        rows_count = rows_counted
        rows_count_is_estimated = false
      end

      # If we force to get the name from an schema, we avoid the problem of having as
      # table name a reserved word, such 'as'
      #
      # NOTE: we fetch one more row to verify estimated rowcount is not short
      #
      # NOTE(review): `page` is used directly as the OFFSET, so it is presumably already a row offset
      # rather than a page number -- confirm against CartoDB::Pagination
      rows = user_database[%Q{
        SELECT #{select_columns} FROM #{qualified_table_name} #{where} ORDER BY "#{order_by_column}" #{mode} LIMIT #{per_page}+1 OFFSET #{page}
      }].all

      # Tweak estimation if needed
      fetched = rows.length
      fetched += page if page

      have_more = rows.length > per_page
      rows.pop if have_more

      records_count = rows_count
      if rows_count_is_estimated
        if have_more
          records_count = fetched > rows_count ? fetched : rows_count
        else
          records_count = fetched
        end
      end

      # TODO: cache row count !!
      # See https://github.com/Vizzuality/cartodb/issues/459
    end

    {
      :id         => id,
      :name       => name,
      :total_rows => records_count,
      :rows       => rows
    }
  end

  # Fetches a single row by cartodb_id.
  #
  # the_geom is returned as GeoJSON; date columns are read as text and parsed with DateTime.parse.
  #
  # @param identifier [Object] cartodb_id of the row
  # @return [Hash] column => value
  # @raise [RuntimeError] when no row matches
  def record(identifier)
    row = nil
    owner.in_database do |user_database|
      projections = schema.map do |column_name, column_type|
        if column_name == THE_GEOM
          "ST_AsGeoJSON(the_geom,8) as the_geom"
        elsif column_type == DATATYPE_DATE
          %{CAST("#{column_name}" AS text) AS "#{column_name}"}
        else
          %{"#{column_name}"}
        end
      end
      select_sql = projections.join(',')
      # Selecting from the schema-qualified name keeps reserved words (such as 'as') usable as table names
      row = user_database["SELECT #{select_sql} FROM #{qualified_table_name} WHERE cartodb_id = #{identifier}"].first
    end
    raise if row.nil?

    # Only the [name, type] pair of each schema entry matters here; geometry entries carry extra data
    type_by_column = schema.map { |column| column.first(2) }.to_h
    row.each_with_object({}) do |(column_name, value), parsed|
      is_date = type_by_column[column_name] == DATATYPE_DATE
      parsed[column_name] = is_date && value ? DateTime.parse(value) : value
    end
  end

  # Hands +query+ to the owner's db_service (run_pg_query) and returns its result.
  def run_query(query)
    owner.db_service.run_pg_query(query)
  end

  # Georeferences the table from a latitude and a longitude column.
  #
  # @param options [Hash] :latitude_column and :longitude_column, both required
  # @return [Array] the reloaded schema
  # @raise [InvalidArgument] when either column option is blank
  def georeference_from!(options = {})
    raise InvalidArgument if options[:latitude_column].blank? || options[:longitude_column].blank?

    set_the_geom_column!('point')

    owner.transaction_with_timeout(statement_timeout: STATEMENT_TIMEOUT) do |user_conn|
      geocoder = CartoDB::InternalGeocoder::LatitudeLongitude.new(user_conn)
      geocoder.geocode(owner.database_schema, name, options[:latitude_column], options[:longitude_column])
    end
    schema(reload: true)
  end

  # @return [String, nil] the stored geometry type (the_geom_type_value)
  def the_geom_type
    self.the_geom_type_value
  end

  # Normalizes and stores the geometry type.
  #
  # 'geometry' and 'point' are kept, 'line' becomes 'multilinestring', 'multipoint' becomes 'point'
  # and any other type gets a 'multi' prefix unless it already has one. Matching is case-insensitive.
  #
  # @param value [String] geometry type name
  # @raise [CartoDB::InvalidGeomType] when the normalized type is not in VALID_GEOMETRY_TYPES (the
  #   normalized value is stored before the check)
  def the_geom_type=(value)
    # Downcase once so that every branch, including the 'multi' prefix check, sees the same
    # normalized text; checking the raw value turned 'MultiPolygon' into 'multimultipolygon'
    normalized = value.downcase
    normalized =
      case normalized
      when 'geometry', 'point'
        normalized
      when 'line'
        'multilinestring'
      when 'multipoint'
        'point'
      else
        normalized =~ /^multi/ ? normalized : "multi#{normalized}"
      end

    self.the_geom_type_value = normalized
    raise CartoDB::InvalidGeomType.new(normalized) unless VALID_GEOMETRY_TYPES.include?(normalized)
  end

  # if the table is already renamed, we just need to update the name attribute
  # Writes the name attribute directly (bypassing #name=) and saves.
  #
  # @param name [String] the name the table already has in the database
  def synchronize_name(name)
    self[:name] = name
    save
  end

  # @return [Integer] memoized oid of the table, resolved through a regclass cast
  def oid
    return @oid if @oid

    oid_row = owner.in_database["SELECT '#{qualified_table_name}'::regclass::oid"].first
    @oid = oid_row[:oid]
  end

  # DB Triggers and things

  # @param trigger_name [String]
  # @return [Boolean] true when information_schema.triggers lists a trigger with that name on this
  #   table in the owner's database
  def has_trigger?(trigger_name)
    triggers = owner.in_database(as: :superuser).select('trigger_name').from(:information_schema__triggers)
    matching = triggers.where(event_object_catalog: owner.database_name,
                              event_object_table: name,
                              trigger_name: trigger_name)
    matching.count > 0
  end

  # @param column_name [String]
  # @return [Boolean] true when one of the entries of pg_indexes covers +column_name+
  def has_index?(column_name)
    indexed_columns = pg_indexes.map { |index| index[:column] }
    indexed_columns.include?(column_name)
  end

  # Lists the indexes of the table, one entry per indexed column.
  #
  # @return [Array<Hash>] rows with :column (indexed column name), :name (index name) and :valid
  def pg_indexes
    owner.in_database(as: :superuser).fetch(%{
      SELECT
        a.attname as column, i.relname as name, ix.indisvalid as valid
      FROM
        pg_class t, pg_class i, pg_index ix, pg_attribute a, pg_namespace n
      WHERE
        t.oid = ix.indrelid
        AND i.oid = ix.indexrelid
        AND a.attrelid = t.oid
        AND a.attnum = ANY(ix.indkey)
        AND t.relkind = 'r'
        AND t.relname = '#{name}'
        AND n.nspname = '#{owner.database_schema}'
        AND t.relnamespace = n.oid;
    }).all
  end

  # Creates an index on one column of this table.
  #
  # @param column column to index
  # @param [String] prefix prepended to the generated index name
  # @param [Boolean] concurrent adds CONCURRENTLY to the statement when true
  # @return whatever `owner.in_database.execute` returns
  def create_index(column, prefix = '', concurrent: false)
    modifier = concurrent ? 'CONCURRENTLY' : ''
    statement = %{CREATE INDEX #{modifier} "#{index_name(column, prefix)}" ON "#{name}"("#{column}")}
    owner.in_database.execute(statement)
  end

  # Drops the index that create_index would have generated for the same
  # column and prefix.
  #
  # @param column indexed column
  # @param [String] prefix prefix used when the index was named
  # @param [Boolean] concurrent adds CONCURRENTLY to the statement when true
  # @return whatever `owner.in_database.execute` returns
  def drop_index(column, prefix = '', concurrent: false)
    modifier = concurrent ? 'CONCURRENTLY' : ''
    statement = %{DROP INDEX #{modifier} "#{index_name(column, prefix)}"}
    owner.in_database.execute(statement)
  end

  # Runs cartodb.CDB_CartodbfyTable on this table inside a transaction bounded
  # by STATEMENT_TIMEOUT, timing the call under the 'cartodbfy' stat.
  #
  # When @data_import is set, the elapsed wall-clock time is also added to the
  # CartodbfyTime accumulator for that import.
  #
  # @raise [CartoDB::CartoDBfyInvalidID] when the failure message contains
  #   "Error: invalid cartodb_id"; any other StandardError is re-raised as is
  def cartodbfy
    started_at = Time.now
    schema_name = owner.database_schema
    table_name = "#{owner.database_schema}.#{name}"

    importer_stats.timing('cartodbfy') do
      owner.transaction_with_timeout(statement_timeout: STATEMENT_TIMEOUT) do |user_conn|
        user_conn.run(%Q{
          SELECT cartodb.CDB_CartodbfyTable('#{schema_name}'::TEXT,'#{table_name}'::REGCLASS);
        })
      end
    end

    elapsed = Time.now - started_at
    CartoDB::Importer2::CartodbfyTime.instance(@data_import.id).add(elapsed) if @data_import
  rescue StandardError => exception
    raise CartoDB::CartoDBfyInvalidID if exception.message =~ /Error: invalid cartodb_id/

    raise exception
  end

  # Runs ANALYZE on the whole table.
  #
  # A statement-timeout failure is logged through log_info and swallowed;
  # every other StandardError is re-raised.
  def update_table_pg_stats
    owner.in_database.execute(%{ANALYZE #{qualified_table_name};})
  rescue StandardError => exception
    timed_out = exception.message =~ /canceling statement due to statement timeout/i
    raise exception unless timed_out

    log_info(exception: exception, message: 'Analyze in import raised statement timeout')
  end

  # Runs ANALYZE restricted to the the_geom column.
  #
  # Two failures are logged through log_info and swallowed: a statement
  # timeout, and an error whose cause is PG::UndefinedColumn. Every other
  # StandardError is re-raised.
  def update_table_geom_pg_stats
    owner.in_database.execute(%{ANALYZE #{qualified_table_name}(the_geom);})
  rescue StandardError => exception
    tolerated_reason =
      if exception.message =~ /canceling statement due to statement timeout/i
        'Analyze in import raised statement timeout'
      elsif exception.cause.is_a?(PG::UndefinedColumn)
        'Analyze in import raised column does not exist'
      end
    raise exception if tolerated_reason.nil?

    log_info(exception: exception, message: tolerated_reason)
  end

  # The ::User looked up by this table's user_id, memoized in @owner.
  #
  # @return [::User]
  def owner
    return @owner if @owner

    @owner = ::User[user_id]
  end

  # The 'tile_style' option of the map's first data layer.
  #
  # NOTE(review): raises NoMethodError when the map has no data layers.
  def table_style
    first_data_layer = map.data_layers.first
    first_data_layer.options['tile_style']
  end

  # Reads updated_at for this table from cartodb.cdb_tablemetadata.
  #
  # @return the :updated_at value of the first matching row, or nil when the
  #   lookup raises any StandardError (including when no row is found)
  def data_last_modified
    metadata_table = Sequel.qualify(:cartodb, :cdb_tablemetadata)
    row = owner.in_database
               .select(:updated_at)
               .from(metadata_table)
               .where(tabname: Sequel.lit("'#{name}'::regclass"))
               .first
    row[:updated_at]
  rescue StandardError
    nil
  end

  # Simplify certain privacy values for the vizjson
  # @return [String] 'PUBLIC' when privacy equals UserTable::PRIVACY_LINK,
  #   otherwise @user_table.privacy_text
  def privacy_text_for_vizjson
    return 'PUBLIC' if privacy == UserTable::PRIVACY_LINK

    @user_table.privacy_text
  end

  # Memoized CartoDB::TableRelator built from SequelRails.connection and this
  # table.
  def relator
    return @relator if @relator

    @relator = CartoDB::TableRelator.new(SequelRails.connection, self)
  end

  # Stores the result of get_table_id in @user_table.table_id.
  #
  # @return the assigned id
  def set_table_id
    @user_table.table_id = get_table_id
  end

  # Looks up the pg_class oid of the ordinary table (relkind 'r') with this
  # name in the owner's schema.
  #
  # @return the oid, or nil when no such relation is found
  def get_table_id
    filters = { relkind: 'r', nspname: owner.database_schema, relname: name }
    row = owner.in_database
               .select(:pg_class__oid)
               .from(:pg_class)
               .join_table(:inner, :pg_namespace, oid: :relnamespace)
               .where(filters)
               .first
    row[:oid] unless row.nil?
  end

  # @return [Boolean] true while @name_changed_from holds a previous name
  #   (update_name_changes resets it to nil)
  def changing_name?
    @name_changed_from.present?
  end

  # Applies a pending rename (tracked in @name_changed_from) to the user
  # database and to everything that references the table by name.
  #
  # Steps, in order, when the old and new names differ:
  #   1. reload
  #   2. unless register_table_only: rename the overviews and the physical table
  #   3. propagate the new name to analyses, the table visualization and layers
  # @name_changed_from is always reset to nil at the end.
  #
  # @raise [CartoDB::BaseCartoDBError] when step 2 fails for any reason
  # @raise [StandardError] whatever step 3 raised, after logging it
  def update_name_changes
    if @name_changed_from.present? && @name_changed_from != name
      reload

      unless register_table_only
        begin
          Carto::OverviewsService.new(owner.in_database).rename_overviews @name_changed_from, name
          # Underscore prefixes have a special meaning in PostgreSQL, hence the ugly hack:
          # such names are reached through an intermediate "t<9 random digits><name>" rename
          if name.start_with?('_')
            temp_name = "t" + "#{9.times.map { rand(9) }.join}" + name
            owner.in_database.rename_table(@name_changed_from, temp_name)
            owner.in_database.rename_table(temp_name, name)
          else
            owner.in_database.rename_table(@name_changed_from, name)
          end
        rescue StandardError => exception
          # Any failure here is reported with the same "doesn't exist" message,
          # wrapping the original exception
          exception_to_raise = CartoDB::BaseCartoDBError.new(
              "Table update_name_changes(): '#{@name_changed_from}' doesn't exist", exception)
          CartoDB::notify_exception(exception_to_raise, user: owner)
          raise exception_to_raise
        end
      end

      begin
        propagate_name_change_to_analyses
        propagate_namechange_to_table_vis
        if @user_table.layers.blank?
          # A table without layers is only reported; despite the variable name,
          # this exception is notified and NOT raised
          exception_to_raise = CartoDB::TableError.new("Attempt to rename table without layers #{qualified_table_name}")
          CartoDB::notify_exception(exception_to_raise, user: owner)
        else
          @user_table.layers.each do |layer|
            layer.rename_table(@name_changed_from, name).save
          end
        end
      rescue StandardError => exception
        log_error(
          exception: exception, message: 'Error renaming visualization',
          current_user: owner, error_detail: "Renaming from #{@name_changed_from} to #{name}"
        )
        raise exception
      end
    end
    @name_changed_from = nil
  end

  # @see https://github.com/jeremyevans/sequel#qualifying-identifiers-columntable-names
  # @return the result of Sequel.qualify for the owner's schema and
  #   @user_table.name
  def sequel_qualified_table_name
    schema_name = owner.database_schema
    Sequel.qualify(schema_name, @user_table.name)
  end

  # @return the owner's schema and @user_table.name combined by
  #   safe_schema_and_table_quoting
  def qualified_table_name
    schema_name = owner.database_schema
    safe_schema_and_table_quoting(schema_name, @user_table.name)
  end

  # @return the owner's database_schema
  def database_schema
    owner.database_schema
  end

  # INFO: Qualified but without double quotes
  # Accepts exactly "<schema>.<table>", where each part is made of lowercase
  # letters, digits, '-' and '_'.
  #
  # The pattern is anchored with \A and \z so the WHOLE string must match.
  # With ^ and $ (line anchors) a valid first line followed by a newline and
  # arbitrary text was accepted.
  #
  # @param [String, nil] name
  # @return [Boolean] false for nil
  def self.is_qualified_name_valid?(name)
    /\A[a-z\-_0-9]+\.[a-z\-_0-9]+\z/.match?(name)
  end

  ############################### Sharing tables ##############################

  # @param [::User] organization_user Gives read permission to this user
  # @return the result of perform_table_permission_change
  def add_read_permission(organization_user)
    pg_function = 'CDB_Organization_Add_Table_Read_Permission'
    perform_table_permission_change(pg_function, organization_user)
  end

  # @param [::User] organization_user Gives read and write permission to this user
  # @return the result of perform_table_permission_change
  def add_read_write_permission(organization_user)
    pg_function = 'CDB_Organization_Add_Table_Read_Write_Permission'
    perform_table_permission_change(pg_function, organization_user)
  end

  # @param [::User] organization_user Removes all permissions to this user
  # @return the result of perform_table_permission_change
  def remove_access(organization_user)
    pg_function = 'CDB_Organization_Remove_Access_Permission'
    perform_table_permission_change(pg_function, organization_user)
  end

  # Runs the organization-wide "add read permission" function for this table.
  #
  # @return the result of perform_organization_table_permission_change
  def add_organization_read_permission
    pg_function = 'CDB_Organization_Add_Table_Organization_Read_Permission'
    perform_organization_table_permission_change(pg_function)
  end

  # Runs the organization-wide "add read/write permission" function for this
  # table.
  #
  # @return the result of perform_organization_table_permission_change
  def add_organization_read_write_permission
    pg_function = 'CDB_Organization_Add_Table_Organization_Read_Write_Permission'
    perform_organization_table_permission_change(pg_function)
  end

  # Runs the organization-wide "remove access" function for this table.
  #
  # @return the result of perform_organization_table_permission_change
  def remove_organization_access
    pg_function = 'CDB_Organization_Remove_Organization_Access_Permission'
    perform_organization_table_permission_change(pg_function)
  end

  # Estimated row count and size
  # Reads the planner's row estimate (reltuples) and the relation size from
  # pg_class. For non-raster tables the total relation size is halved.
  #
  # SQL errors never propagate: the exception is handed to
  # CartoDB.notify_exception and both values come back as nil.
  #
  # @return [Hash] with the keys :size and :row_count
  def row_count_and_size
    # Keep in sync with lib/sql/scripts-available/CDB_Quota.sql -> CDB_UserDataSize()
    relation_size = "pg_total_relation_size('\"' || ? || '\".\"' || relname || '\"')"
    size_calc = is_raster? ? relation_size : "#{relation_size} / 2"

    stats_sql = %{
            SELECT
              #{size_calc} AS size,
              reltuples::integer AS row_count
            FROM pg_class
            JOIN pg_catalog.pg_namespace n on n.oid = pg_class.relnamespace
            WHERE relname = ?
            AND n.nspname = ?
          }
    stats = owner.in_database.fetch(stats_sql, owner.database_schema, name, database_schema).first

    stats.nil? ? { size: nil, row_count: nil } : stats
  rescue StandardError => exception
    # INFO: we don't want code to fail because of SQL error
    CartoDB.notify_exception(exception)
    { size: nil, row_count: nil }
  end

  # @return the :row_count entry of row_count_and_size (nil when unavailable)
  def estimated_row_count
    stats = row_count_and_size
    stats[:row_count] unless stats.nil?
  end

  # @return the result of `sequel.count` — presumably an exact row count, as
  #   opposed to estimated_row_count (TODO confirm what `sequel` wraps)
  def actual_row_count
    sequel.count
  end

  # @return all pg_stats rows for this table in the owner's schema
  def pg_stats
    stats_sql = 'SELECT * FROM pg_stats where schemaname = ? AND tablename = ?'
    owner.in_database.fetch(stats_sql, owner.database_schema, name).all
  end

  # Turns an identifier such as "my_table_name" into "My Table Name".
  #
  # @param [String, nil] name
  # @return [String, nil] the input itself when it is nil or false
  def beautify_name(name)
    return name unless name

    words = name.tr('_', ' ').split
    words.map { |word| word.capitalize }.join(' ')
  end

  # Calls CDB_TableMetadataTouch for this table's table_id as superuser.
  # Failures are logged through log_error and not re-raised.
  def update_cdb_tablemetadata
    touch_sql = %{ SELECT CDB_TableMetadataTouch(#{table_id}::oid::regclass) }
    owner.in_database(as: :superuser).run(touch_sql)
  rescue StandardError => e
    failed_table = { id: table_id, name: name, oid: get_table_id }
    log_error(message: 'update_cdb_tablemetadata failed', exception: e, current_user: owner, table: failed_table)
  end

  # Copies the attribution text to every data layer of @user_table whose
  # 'table_name' option equals this table's name, saving each one.
  # This includes both the canonical and derived visualizations.
  #
  # @param attributions value stored under the layer's 'attribution' option
  def propagate_attribution_change(attributions)
    data_layers = @user_table.layers.select { |layer| layer.data_layer? }

    data_layers.each do |layer|
      next unless layer.options['table_name'] == name

      layer.options['attribution'] = attributions
      layer.save
    end
  end

  # @return @user_table's table_visualization
  def table_visualization
    @user_table.table_visualization
  end

  # Refreshes the geometry column statistics, then stores the table bounds
  # (or Carto::BoundingBoxUtils::DEFAULT_BOUNDS when none are returned) as the
  # bbox of the table visualization, through the SequelRails connection.
  def update_bounding_box
    update_table_geom_pg_stats

    bounds = Carto::BoundingBoxService.new(owner, name).table_bounds
    bounds ||= Carto::BoundingBoxUtils::DEFAULT_BOUNDS
    polygon_sql = Carto::BoundingBoxUtils.to_polygon(bounds[:minx], bounds[:miny], bounds[:maxx], bounds[:maxy])

    SequelRails.connection.run(
      %{UPDATE visualizations SET bbox = #{polygon_sql} WHERE id = '#{table_visualization.id}';}
    )
  end

  # Apply the sanitization defined for this table.
  # If the table_name parameter is passed, the sanitization which was originally applied to self
  # is applied to the table so named.
  # If no table_name parameter is passed, the normalization is applied to self.
  # This allows re-applying the same normalization to imported tables.
  # @param [String] table_name table to sanitize; defaults to this table's name
  # @param [Hash] options remaining keywords, forwarded as one options hash to
  #   the class-level sanitize_columns (which requires :connection)
  # @return the result of the class-level sanitize_columns
  def sanitize_columns(table_name: name, **options)
    self.class.sanitize_columns(table_name, column_sanitization_version, options)
  end

  # @return the Carto::Visualization query restricted to the ids of
  #   user_table.affected_visualizations
  def visualizations
    affected_ids = user_table.affected_visualizations.map { |visualization| visualization.id }
    Carto::Visualization.where(id: affected_ids)
  end

  private

  # Tells whether an existing column contains no NULL values.
  #
  # The column name is emitted as a double-quoted identifier (embedded quotes
  # doubled), so names that are reserved words or contain upper-case or
  # special characters are checked instead of failing with a SQL error.
  #
  # @param col_name column to inspect; must be one of column_names
  # @return [Boolean] false when the column does not exist or holds a NULL
  def valid_cartodb_id_candidate?(col_name)
    return false unless column_names.include?(col_name)

    quoted_column = %("#{col_name.to_s.gsub('"', '""')}")
    owner.transaction_with_timeout(statement_timeout: STATEMENT_TIMEOUT, as: :superuser) do |db|
      return db["SELECT 1 FROM #{qualified_table_name} WHERE #{quoted_column} IS NULL LIMIT 1"].first.nil?
    end
  end

  # @return [Array] the first element of every schema entry
  def column_names
    schema.map { |column| column.first }
  end

  # Visualizations that own a persisted layer of this table.
  #
  # @return [Array] unique, non-nil visualizations of the Carto::Layer
  #   records found for each persisted layer
  def related_visualizations
    carto_layers = layers.map do |layer|
      layer.persisted? ? Carto::Layer.find(layer.id) : nil
    end

    layer_visualizations = carto_layers.flatten.compact.uniq.map { |carto_layer| carto_layer.visualization }
    layer_visualizations.compact.uniq
  end

  # Tells every analysis of every related visualization that the table was
  # renamed from @name_changed_from to the current name.
  def propagate_name_change_to_analyses
    previous_name = @name_changed_from

    related_visualizations.each do |visualization|
      visualization.analyses.each { |analysis| analysis.update_table_name(previous_name, name) }
    end
  end

  # Builds the index name used by create_index and drop_index.
  #
  # @param column indexed column
  # @param [String] prefix prepended verbatim
  # @return [String] "<prefix><table name>_<column>"
  def index_name(column, prefix)
    "#{prefix}#{name}_#{column}"
  end

  # INFO: @user_table.initial_value(:privacy) weirdly returns incorrect value so using changes index instead
  #
  # @return @user_table.privacy_was when the privacy changed, nil otherwise
  def previous_privacy
    @user_table.privacy_was if privacy_changed?
  end

  # Memoized CartoDB::Stats::Importer.instance.
  def importer_stats
    return @importer_stats if @importer_stats

    @importer_stats = CartoDB::Stats::Importer.instance
  end

  # Returns the stored geometry type or, when none is stored, derives one
  # from the first entry of geometry_types (lower-cased, first 'st_' removed),
  # falling back to DEFAULT_THE_GEOM_TYPE. The derived value is assigned
  # through the_geom_type=.
  #
  # @return [String] the stored type, or the value handed to the_geom_type=
  def calculate_the_geom_type
    return the_geom_type if the_geom_type.present?

    detected = geometry_types.first
    self.the_geom_type = detected.present? ? detected.downcase.sub('st_', '') : DEFAULT_THE_GEOM_TYPE
  end

  # Runs distinct_geometry_sql against the user database and collects the
  # geometry type names.
  #
  # @return [Array] the :st_geometrytype of each row; [] on any StandardError
  #   (for instance when the_geom does not exist)
  def query_geometry_types
    rows = owner.in_database[distinct_geometry_sql].all
    rows.map { |row| row[:st_geometrytype] }
  rescue StandardError
    []
  end

  # SQL that returns the distinct ST_GeometryType values found in a sample of
  # the table: the innermost query takes 10000 rows, of which up to 10 rows
  # with a non-null the_geom are inspected.
  #
  # @return [String]
  def distinct_geometry_sql
    %{
      SELECT DISTINCT ST_GeometryType(the_geom) FROM (
        SELECT the_geom
        FROM (
          SELECT the_geom
          FROM #{qualified_table_name}
          LIMIT 10000
        ) as limited
        WHERE (the_geom is not null) LIMIT 10
      ) as not_null
    }
  end

  # Memoized reference to the $tables_metadata global.
  def cache
    return @cache if @cache

    @cache = $tables_metadata
  end

  # Gets a valid postgresql table name for a given database
  # See http://www.postgresql.org/docs/9.5/static/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
  # @param [String] contendent desired table name
  # @return the proposal of Carto::ValidTableNameProposer, given the names of
  #   the owner's tables as already taken
  def get_valid_name(contendent)
    # We only want to check for UserTables names
    taken_names = owner.tables.map { |table| table.name }

    proposer = Carto::ValidTableNameProposer.new
    proposer.propose_valid_table_name(contendent, taken_names: taken_names)
  end

  # @return the data import's column_sanitization_version, or
  #   NO_COLUMN_SANITIZATION_VERSION when there is no import or no version
  def column_sanitization_version
    import_version = data_import&.column_sanitization_version
    import_version || CartoDB::Importer2::Column::NO_COLUMN_SANITIZATION_VERSION
  end

  # Gives every column of a table a valid name and converts columns whose
  # db type is 'unknown' to text.
  #
  # @param [String] table_name
  # @param column_sanitization_version forwarded to ensure_column_has_valid_name
  # @param [Hash] options :connection (required) and :database_schema
  #   (defaults to 'public')
  # @return the schema array that was iterated
  # @raise [KeyError] when options has no :connection
  def self.sanitize_columns(table_name, column_sanitization_version, options={})
    connection = options.fetch(:connection)
    database_schema = options.fetch(:database_schema, 'public')
    table_schema = connection.schema(table_name, schema: database_schema, reload: true)

    table_schema.each do |raw_name, column_info|
      db_type = column_info[:db_type]
      valid_name = ensure_column_has_valid_name(table_name, raw_name.to_s, column_sanitization_version, options)
      next unless db_type == 'unknown'

      typecaster = CartoDB::ColumnTypecaster.new(
        user_database:  connection,
        schema:         database_schema,
        table_name:     table_name,
        column_name:    valid_name,
        new_type:       'text'
      )
      typecaster.run
    end
  end

  # Renames a column when get_valid_column_name proposes a different name.
  #
  # @param [String] table_name
  # @param [String] column_name current column name
  # @param column_sanitization_version forwarded to get_valid_column_name
  # @param [Hash] options :connection (required) and :database_schema
  #   (defaults to 'public')
  # @return [String] the valid column name (the original when unchanged)
  def self.ensure_column_has_valid_name(table_name, column_name, column_sanitization_version, options={})
    connection = options.fetch(:connection)
    database_schema = options.fetch(:database_schema, 'public')

    valid_column_name = get_valid_column_name(table_name, column_name, column_sanitization_version, options)
    return valid_column_name if valid_column_name == column_name

    connection.run(%Q{ALTER TABLE "#{database_schema}"."#{table_name}" RENAME COLUMN "#{column_name}" TO "#{valid_column_name}";})
    valid_column_name
  end

  # Asks CartoDB::Importer2::Column for a valid name for the candidate, given
  # the table's current column names.
  #
  # @return the name proposed by CartoDB::Importer2::Column.get_valid_column_name
  def self.get_valid_column_name(table_name, candidate_column_name, column_sanitization_version, options={})
    existing_names = get_column_names(table_name, options)
    CartoDB::Importer2::Column.get_valid_column_name(candidate_column_name, column_sanitization_version, existing_names)
  end

  # @param [String] table_name
  # @param [Hash] options :connection (required) and :database_schema
  #   (defaults to 'public')
  # @return [Array<String>] column names from a freshly reloaded schema
  def self.get_column_names(table_name, options={})
    connection = options.fetch(:connection)
    database_schema = options.fetch(:database_schema, 'public')

    columns = connection.schema(table_name, schema: database_schema, reload: true)
    columns.map { |column_name, *_details| column_name.to_s }
  end

  # Maps a column's CartoDB type to the database type it should be cast to
  # next: "number" -> "double precision", "string" -> "text".
  #
  # The schema is read once. The previous implementation also fetched
  # schema(cartodb_types: false) and discarded the result, and crashed with
  # NoMethodError when the column was not part of the schema.
  #
  # @param [String, Symbol] invalid_column column name
  # @return [String, nil] nil when the column is unknown or its CartoDB type
  #   has no mapping
  def get_new_column_type(invalid_column)
    next_cartodb_type = {
      "number" => "double precision",
      "string" => "text"
    }

    column_name = invalid_column.to_sym
    # Schema entries start with [name, cartodb_type, ...]
    _, cartodb_column_type = schema.find { |column| column.first == column_name }
    next_cartodb_type[cartodb_column_type]
  end

  # Decides the geometry type of the_geom, normalizes the stored geometries to
  # a supported type and records the result in the_geom_type.
  #
  # @param [String, nil] type geometry type to enforce; when nil it is
  #   detected from the table
  # @return nil without touching anything when no type could be determined
  #   (a non-geometry the_geom column was renamed to the_geom_str)
  # @raise [RuntimeError] when the resulting type is not in VALID_GEOMETRY_TYPES
  def set_the_geom_column!(type = nil)
    if type.nil?
      if self.schema(reload: true).flatten.include?(THE_GEOM)
        if self.schema.select{ |k| k[0] == THE_GEOM }.first[1] == 'geometry'
          # Detect the type from the first non-null geometry, if there is one
          row = owner.in_database["select GeometryType(#{THE_GEOM}) FROM #{qualified_table_name} where #{THE_GEOM} is not null limit 1"].first
          if row
            type = row[:geometrytype]
          else
            type = DEFAULT_THE_GEOM_TYPE
          end
        else
          # the_geom exists but is not a geometry column: move it out of the way.
          # type stays nil, so the method returns right after this branch
          owner.in_database.execute %{
              ALTER TABLE #{qualified_table_name} RENAME COLUMN "#{THE_GEOM}" TO "the_geom_str"
          }
        end
      else # No the_geom column: fall back to DEFAULT_THE_GEOM_TYPE
        type = DEFAULT_THE_GEOM_TYPE
      end
    end
    return if type.nil?

    # if the geometry is MULTIPOINT we convert it to POINT
    if type.to_s.downcase == 'multipoint'
      owner.db_service.in_database_direct_connection(statement_timeout: STATEMENT_TIMEOUT) do |user_database|
        user_database.run("UPDATE #{qualified_table_name} SET the_geom = ST_GeometryN(the_geom,1);")
      end
      type = 'point'
    end

    # if the geometry is LINESTRING or POLYGON we convert it to MULTILINESTRING and MULTIPOLYGON resp.
    if %w(linestring polygon).include?(type.to_s.downcase)
      owner.db_service.in_database_direct_connection(statement_timeout: STATEMENT_TIMEOUT) do |user_database|
        user_database.run("UPDATE #{qualified_table_name} SET the_geom = ST_Multi(the_geom);")
        # Re-read the type after the conversion.
        # NOTE(review): .first is nil when every the_geom is null, which would raise here
        type = user_database["select GeometryType(#{THE_GEOM}) FROM #{qualified_table_name} where #{THE_GEOM} is not null limit 1"].first[:geometrytype]
      end
    end

    raise "Error: unsupported geometry type #{type.to_s.downcase} in CARTO" unless VALID_GEOMETRY_TYPES.include?(type.to_s.downcase)

    type = type.to_s.upcase

    # The type is stored lower-cased; unsaved records are left for the caller to persist
    self.the_geom_type = type.downcase
    @user_table.save_changes unless @user_table.new_record?
  end

  # Creates the physical table in the owner's database.
  #
  # Without force_schema the table gets cartodb_id, name and description
  # columns. With force_schema (a comma-separated column definition list) each
  # definition is kept, except that PRIMARY KEY becomes UNIQUE and
  # double-quoted column names are passed through Column.sanitize_name;
  # cartodb_id is always prepended as the primary key.
  #
  # NOTE(review): force_schema is split on every comma, so a definition that
  # itself contains a comma (e.g. numeric(10,2)) would be broken apart.
  def create_table_in_database!
    # Only computes a name when none is set yet
    self.name ||= get_valid_name(self.name)

    owner.in_database do |user_database|
      if force_schema.blank?
        user_database.create_table sequel_qualified_table_name do
          column :cartodb_id, 'SERIAL PRIMARY KEY'
          String :name
          String :description, :text => true
        end
      else
        sanitized_force_schema = force_schema.split(',').map do |column|
          # Convert existing primary key into a unique key
          # ($1 is the quoted column name, $2 the rest of the definition)
          if column =~ /\A\s*\"([^\"]+)\"(.*)\z/
            "#{CartoDB::Importer2::Column.sanitize_name $1} #{$2.gsub(/primary\s+key/i,'UNIQUE')}"
          else
            column.gsub(/primary\s+key/i,'UNIQUE')
          end
        end
        sanitized_force_schema.unshift('cartodb_id SERIAL PRIMARY KEY')
        user_database.run(<<-SQL
          CREATE TABLE #{qualified_table_name} (#{sanitized_force_schema.join(', ')});
        SQL
        )
      end
    end
  end

  # Writes a GeoJSON geometry into the_geom of one row.
  #
  # Fixes over the previous version:
  # * JSON.parse produces String keys, so the CRS is now looked up and
  #   defaulted under 'crs'. Checking :crs never matched, which made the
  #   EPSG:4326 default override any CRS supplied by the caller and emitted a
  #   duplicate "crs" key.
  # * The GeoJSON text and the primary key are emitted through the
  #   connection's `literal`, so quotes inside the payload can no longer break
  #   out of the SQL statement.
  #
  # @param [Hash] attributes row attributes; only attributes[THE_GEOM] is
  #   read. Nothing happens when it is blank or the placeholder 'GeoJSON'
  # @param primary_key cartodb_id of the row to update
  # @raise [CartoDB::InvalidGeoJSONFormat] when parsing or the update fails
  def update_the_geom!(attributes, primary_key)
    geojson = attributes[THE_GEOM]
    return unless geojson.present? && geojson != 'GeoJSON'

    begin
      geometry = JSON.parse(geojson)
      unless geometry['crs'].present?
        geometry['crs'] = { 'type' => 'name', 'properties' => { 'name' => 'EPSG:4326' } }
      end

      user_database = owner.in_database(:as => :superuser)
      geojson_literal = user_database.literal(JSON.generate(geometry))
      user_database.run(%Q{UPDATE #{qualified_table_name} SET the_geom =
      ST_Transform(ST_GeomFromGeoJSON(#{geojson_literal}),4326) where cartodb_id =
      #{user_database.literal(primary_key)}})
    rescue StandardError => e
      raise CartoDB::InvalidGeoJSONFormat, "Invalid geometry: #{e.message}"
    end
  end

  # Synchronizes the Carto::Tag records of this table with the
  # comma-separated list held in @user_table.tags.
  #
  # A blank list destroys every tag of the table. Otherwise tags missing from
  # the list are destroyed and names without a tag get a new one.
  def manage_tags
    if @user_table[:tags].blank?
      return Carto::Tag.where(user_id: user_id, table_id: id).each(&:destroy)
    end

    pending_names = @user_table.tags.split(',')
    current_tags = Carto::Tag.where(user_id: user_id, table_id: id).all

    # Keep the tags that are still requested, destroy the rest
    current_tags.each do |tag|
      if pending_names.include?(tag.name)
        pending_names.delete(tag.name)
      else
        tag.destroy
      end
    end

    # Whatever is left in pending_names has no tag in this table yet
    pending_names.each do |pending_name|
      created_tag = Carto::Tag.new(name: pending_name)
      created_tag.user_id = user_id
      created_tag.table_id = id
      created_tag.save
    end
  end

  # Adds one table to the global, per-user, per-host and per-plan counters.
  def add_table_to_stats
    table_owner = owner

    CartoDB::Stats::UserTables.instance.update_tables_counter(1)
    CartoDB::Stats::UserTables.instance.update_tables_counter_per_user(1, table_owner.username)
    CartoDB::Stats::UserTables.instance.update_tables_counter_per_host(1)
    CartoDB::Stats::UserTables.instance.update_tables_counter_per_plan(1, table_owner.account_type)
  end

  # Removes one table from the global, per-user, per-host and per-plan counters.
  def remove_table_from_stats
    table_owner = owner

    CartoDB::Stats::UserTables.instance.update_tables_counter(-1)
    CartoDB::Stats::UserTables.instance.update_tables_counter_per_user(-1, table_owner.username)
    CartoDB::Stats::UserTables.instance.update_tables_counter_per_host(-1)
    CartoDB::Stats::UserTables.instance.update_tables_counter_per_plan(-1, table_owner.account_type)
  end

  ############################### Sharing tables ##############################

  # @param [String] cartodb_pg_func
  # @param [::User] organization_user
  # Runs the permission function once per (schema, table) pair yielded by
  # Carto::TableAndFriends.apply for this table, passing the organization
  # user's database role as the last argument.
  def perform_table_permission_change(cartodb_pg_func, organization_user)
    source_schema = owner.database_schema
    source_table = name
    target_role = organization_user.database_username

    Carto::TableAndFriends.apply(owner.in_database, source_schema, source_table) do |friend_schema, friend_name|
      perform_cartodb_function(cartodb_pg_func, friend_schema, friend_name, target_role)
    end
  end

  # Runs the organization-wide permission function once per (schema, table)
  # pair yielded by Carto::TableAndFriends.apply for this table.
  #
  # @param [String] cartodb_pg_func
  def perform_organization_table_permission_change(cartodb_pg_func)
    source_schema = owner.database_schema
    source_table = name

    Carto::TableAndFriends.apply(owner.in_database, source_schema, source_table) do |friend_schema, friend_name|
      perform_cartodb_function(cartodb_pg_func, friend_schema, friend_name)
    end
  end

  # Calls a function of the cartodb schema in the owner's database, passing
  # every argument as a single-quoted SQL string.
  #
  # NOTE(review): the arguments are not escaped; callers in this file pass
  # schema, table and role names.
  #
  # @param [String] cartodb_pg_func function name, without the schema
  # @param args function arguments, in order
  def perform_cartodb_function(cartodb_pg_func, *args)
    owner.in_database do |user_database|
      quoted_args = args.join("','")
      statement = "SELECT cartodb.#{cartodb_pg_func}('#{quoted_args}');"
      user_database.run(statement)
    end
  end
end