SpeciesFileGroup/taxonworks
app/models/import_dataset/darwin_core.rb

class ImportDataset::DarwinCore < ImportDataset
  # self.abstract_class = true # TODO: Why causes app/views/shared/data/project/_show.html.erb to fail when visiting /import_datasets/list if uncommented?

  validate :core_records_are_readable, on: :create

  after_create -> (dwc) { ImportDatasetStageJob.perform_later(dwc) }

  before_destroy :destroy_namespace

  CHECKLIST_ROW_TYPE = 'http://rs.tdwg.org/dwc/terms/Taxon'.freeze
  OCCURRENCES_ROW_TYPE = 'http://rs.tdwg.org/dwc/terms/Occurrence'.freeze

  def initialize(params)
    import_settings = params&.delete(:import_settings)
    super(params)

    self.metadata = {
      core_headers: [],
      namespaces: {
        core: nil,
        eventID: nil
      }
    }

    set_import_settings(import_settings || {})
  end

  # @return [ActiveRecord::Relation]
  #   The dataset record fields belonging to this dataset's core records.
  def core_records_fields
    dataset_record_fields.with_record_class(core_records_class)
  end

  # @param [Hash] params
  #   Parameters for the new dataset. Expects :source (the uploaded DwC-A or tabular file) and may include :import_settings.
  # @return [Checklist, Occurrences, Unknown]
  # Returns the appropriate ImportDataset::DarwinCore subclass instantiated (not saved) for the supplied params.
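  # @example Hypothetical controller usage (params shape assumed, not defined in this file):
  #   dataset = ImportDataset::DarwinCore.create_with_subtype_detection(
  #     source: params[:source],
  #     import_settings: { col_sep: "\t", quote_char: '"' }
  #   )
  #   dataset.save! unless dataset.is_a?(ImportDataset::DarwinCore::Unknown)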
  def self.create_with_subtype_detection(params)
    core_type = nil

    return Unknown.new unless params[:source]

    begin
      path = params[:source].tempfile.path
      if path =~ /\.zip\z/i
        dwc = ::DarwinCore.new(path)
        core_type = dwc.core.data[:attributes][:rowType]

        ### Check all files are readable
        [dwc.core, *dwc.extensions].each do |table|
          table.read { |data, errors| raise 'Errors found when reading data' unless errors.empty? }
        end
      else
        if path =~ /\.(xlsx?|ods)\z/i
          headers = CSV.parse(Roo::Spreadsheet.open(path).to_csv, headers: true, header_converters: lambda {|f| f.strip}).headers
        else
          col_sep = default_if_absent(params.dig(:import_settings, :col_sep), "\t")
          quote_char = default_if_absent(params.dig(:import_settings, :quote_char), '"')
          headers = CSV.read(path, headers: true, col_sep: col_sep, quote_char: quote_char, encoding: 'bom|utf-8', header_converters: lambda {|f| f.strip}).headers
        end

        row_type = params.dig(:import_settings, :row_type)
        if row_type
          core_type = row_type
        elsif headers.include? 'occurrenceID'
          core_type = OCCURRENCES_ROW_TYPE
        elsif headers.include? 'taxonID'
          core_type = CHECKLIST_ROW_TYPE
        end
      end
    rescue Errno::ENOENT, RuntimeError => e # TODO: dwc-archive gem should probably detect missing (or wrongly mapped) files and raise its own exception
      return Unknown.new(params.merge({error_message: e.message}))
    end

    case core_type
    when OCCURRENCES_ROW_TYPE
      Occurrences.new(params)
    when CHECKLIST_ROW_TYPE
      Checklist.new(params)
    else
      Unknown.new(params.merge({error_message: "unknown DwC-A core type '#{core_type}'."}))
    end
  end

  # @return [String]
  #   The import UUID. Sets up the import dataset for importing; if an import has already started, the same UUID is
  #   returned (unless the last activity was more than 10 minutes ago, in which case a new UUID is generated).
  # Do not call if there are changes that have not been persisted.
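  # @example Sketch of a caller (block body is illustrative):
  #   uuid = dataset.start_import do
  #     # runs while holding the row lock, after the status transition has been persisted
  #   end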
  def start_import(&block)
    with_lock do
      case self.status
      when 'Ready'
        self.status = 'Importing'
        self.metadata['import_uuid'] = SecureRandom.uuid
      when 'Importing'
        self.metadata['import_uuid'] = SecureRandom.uuid if self.updated_at < 10.minutes.ago
      else
        raise 'Invalid initial state'
      end
      save!

      yield if block_given?
    end

    self.metadata['import_uuid']
  end

  # Sets the import dataset to stop importing data. Do not call if there are changes that have not been persisted.
  def stop_import
    with_lock do
      if self.status == 'Importing'
        self.status = 'Ready'
        self.metadata.except!('import_uuid', 'import_start_id', 'import_filters', 'import_retry_errored')
        save!
      end
    end
  end

  # @return [Hash]
  # @param [Integer] max_time
  #   Maximum time to spend processing records (compared against the elapsed time in milliseconds).
  # @param [Integer] max_records
  #   Maximum number of records to be processed.
  # @param [Boolean] retry_errored
  #   Also look for errored records when importing (by default only records with status 'Ready' are considered).
  # @param [Hash] filters
  #   (Column-index, value) pairs of filters to apply when searching for records to import (default none).
  # @param [Integer] record_id
  #   Indicates a single record to be imported (default none). When used, filters are ignored.
  # Returns the updated dataset records. Do not call if there are changes that have not been persisted.
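  # @example Hypothetical invocation (a 5000 ms budget over up to 100 records; the filter value is illustrative):
  #   dataset.import(5000, 100, retry_errored: true, filters: { '0' => 'Aus bus' })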
  def import(max_time, max_records, retry_errored: nil, filters: nil, record_id: nil)
    imported = []

    lock_time = Time.now
    old_uuid = self.metadata['import_uuid']
    start_import do
      lock_time = Time.now - lock_time
      filters = self.metadata['import_filters'] if filters.nil?
      retry_errored = self.metadata['import_retry_errored'] if retry_errored.nil?
      start_id = self.metadata['import_start_id'] if retry_errored

      status = ['Ready']
      status << 'Errored' if retry_errored
      records = add_filters(core_records.where(status:), filters).order(:id).limit(max_records) #.preload_fields

      records = records.where(id: start_id..) if start_id
      records = core_records.where(id: record_id, status: %w{Ready Errored}) if record_id

      records = records.all
      start_time = Time.now - lock_time

      dwc_data_attributes = project.preferences['model_predicate_sets'].map do |model, predicate_ids|
        [model,
          Predicate.where(id: predicate_ids)
            .select { |p| /^http:\/\/rs\.tdwg\.org\/dwc\/terms\/.*/ =~ p.uri }
            .map { |p| [p.uri.split('/').last, p] }
            .to_h
        ]
      end.to_h

      records.each do |record|
        imported << record.import(dwc_data_attributes)

        break if 1000.0*(Time.now - start_time).abs > max_time
      end

      if imported.any? && record_id.nil?
        reload
        self.metadata.merge!({
          'import_start_id' => imported.last.id + 1,
          'import_filters' => filters,
          'import_retry_errored' => retry_errored
        })
        save!

        new_uuid = self.metadata['import_uuid']
        ImportDatasetImportJob.perform_later(self, new_uuid, max_time, max_records) unless old_uuid == new_uuid
      else
        self.stop_import
      end
    end

    imported
  end

  # @return [Hash]
  # Returns a hash with the record counts grouped by status
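  # @example Illustrative counts (statuses come from the staging/import workflow):
  #   dataset.progress # => { "Ready" => 90, "Errored" => 10 }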
  def progress(filters: nil)
    add_filters(core_records, filters).group(:status).count
  end

  # Stages DwC-A records into DB.
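  # @example Sketch; normally performed through ImportDatasetStageJob (enqueued by the after_create hook above):
  #   dataset.stage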
  def stage
    if status == 'Staging' # ActiveJob being retried could cause this state
      transaction do
        core_records_fields.delete_all
        dataset_records.delete_all
      end
    end

    update!(status: 'Staging') if status == 'Uploaded'

    if status != 'Ready'
      perform_staging
      update!(status: 'Ready')
    end
  end

  # Sets import settings for this dataset and returns the resulting settings hash.
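  # @example Hypothetical settings (keys match the ones read back elsewhere in this class):
  #   dataset.set_import_settings('col_sep' => ',', 'quote_char' => '"', 'nomenclatural_code' => 'icn')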
  def set_import_settings(import_settings)
    metadata['import_settings'] ||= {}
    import_settings.each { |k, v| metadata['import_settings'].merge!({k => v}) }

    metadata['import_settings']
  end

  # @return [Namespace]
  #   The namespace used for core record identifiers, created on first use (its id is persisted in metadata).
  def get_core_record_identifier_namespace
    id = metadata.dig('namespaces', 'core')

    if id.nil? || (@core_record_identifier_namespace ||= Namespace.find_by(id:)).nil?
      random = SecureRandom.hex(4)
      project_name = Project.find(Current.project_id).name

      namespace_name = "#{core_records_identifier_name} namespace for \"#{description}\" dataset in \"#{project_name}\" project [#{random}]"

      @core_record_identifier_namespace = Namespace.create!(
        name: namespace_name,
        short_name: "#{core_records_identifier_name}-#{random}",
        verbatim_short_name: core_records_identifier_name,
        delimiter: ':'
      )

      metadata.deep_merge!({
        'namespaces' => {
          'core' => @core_record_identifier_namespace.id
        }
      })
      save!
    end

    @core_record_identifier_namespace
  end

  # @return [Symbol]
  #   The nomenclatural code selected in the import settings, defaulting to :iczn.
  def default_nomenclatural_code
    self.metadata.dig('import_settings', 'nomenclatural_code')&.downcase&.to_sym || :iczn
  end

  protected

  # @return [Array(Hash, Hash)]
  #   The records and headers, each keyed by :core and :extensions, read from a DwC-A zip file or a standalone table file.
  def get_records(path)
    records = { core: [], extensions: {} }
    headers = { core: [], extensions: {} }

    if path =~ /\.zip\z/i
      dwc = ::DarwinCore.new(path)

      headers[:core] = get_dwc_headers(dwc.core)
      records[:core] = get_dwc_records(dwc.core)

      dwc.extensions.each do |extension|
        type = extension.properties[:rowType]
        records[:extensions][type] = get_dwc_records(extension)
        headers[:extensions][type] = get_dwc_headers(extension)
      end
    elsif path =~ /\.(csv|txt|tsv|xlsx?|ods)\z/i
      # only strip whitespace on the headers with lambda functions because whitespace is stripped from the data elsewhere
      if path =~ /\.(csv|txt|tsv)\z/i
        records[:core] = CSV.read(path, headers: true, col_sep: get_col_sep, quote_char: get_quote_char, encoding: 'bom|utf-8', header_converters: lambda {|f| f&.strip})
      else
        records[:core] = CSV.parse(Roo::Spreadsheet.open(path).to_csv, headers: true, header_converters: lambda {|f| f&.strip})
      end
      records[:core] = records[:core].map { |r| r.to_h }
      headers[:core] = records[:core].first&.keys || []
    else
      raise 'Unsupported input format'
    end

    return records, headers
  end

  # @return [Array<String>]
  #   Normalized headers for a DwC-A table: the id column, declared fields, headers from the file itself, and virtual
  #   columns for declared default values.
  def get_dwc_headers(table)
    headers = []

    headers[table.id[:index]] = 'id' if table.id
    table.fields.each { |f| headers[f[:index]] = get_normalized_dwc_term(f) if f[:index] }

    table.read_header.first&.each_with_index { |f, i| headers[i] ||= f.strip }

    get_dwc_default_values(table).each.with_index(headers.length) { |f, i| headers[i] = get_normalized_dwc_term(f) }

    headers
  end

  # @return [Array<Hash>]
  #   The table rows as (header => value) hashes, appending declared default values as extra columns.
  def get_dwc_records(table)
    headers = get_dwc_headers(table)
    defaults = get_dwc_default_values(table)

    table.read.first.map do |row|
      record = {}
      row.each_with_index { |v, i| record[headers[i]] = v }
      defaults.each.with_index(headers.length - defaults.length) { |f, i| record[headers[i]] = f[:default] }
      record
    end
  end

  def get_field_mapping(field_name)
    get_fields_mapping[field_name.to_s.downcase]
  end

  private

  # Returns value, unless it is nil or empty, in which case returns default.
  def self.default_if_absent(value, default)
    return default if value.nil? || value.empty?
    value
  end

  def get_col_sep
    DarwinCore.default_if_absent(metadata.dig("import_settings", "col_sep"), "\t")
  end

  def get_quote_char
    DarwinCore.default_if_absent(metadata.dig("import_settings", "quote_char"), "\"")
  end

  # Two-way lookup built from the core headers: maps downcased header names to column indices and indices back to the
  # original header names.
  def get_fields_mapping
    @fields_mapping ||= metadata['core_headers']
      .reject(&:nil?)
      .each.with_index.inject({}) { |m, (h, i)| m.merge({ h.downcase => i, i => h}) }
  end

  def get_dwc_default_values(table)
    table.fields.select { |f| f.has_key? :default }
  end

  def get_normalized_dwc_term(field)
    # TODO: Think what to do about complex namespaces like "/std/Iptc4xmpExt/2008-02-29/" (currently returning the full URI as header)
    term = field[:term].match(/\/([^\/]+)\/terms\/.*(?<=\/)([^\/]+)\/?$/)
    #headers[field[:index]] = term ? term[1..2].join(":") : field[:term]
    term ? term[2] : field[:term]
  end

  def destroy_namespace
    Namespace.find_by(id: metadata.dig('namespaces', 'core'))&.destroy # If in use or gone no deletion happens
  end

  # Narrows records to those whose field at each filtered column index has the given value.
  def add_filters(records, filters)
    filters&.each do |key, value|
      records = records.where(id: core_records_fields.at(key.to_i).having_value(value).select(:dataset_record_id))
    end
    records
  end

  def core_records_are_readable
    begin
      get_records(source.staged_path)
    rescue RuntimeError
      errors.add(:source, "A problem occurred when reading the data file. If this is a text file please make sure the selected string and field delimiters are correct.")
    end
    true
  end

  # Validates that the staged file contains this subclass' MINIMUM_FIELD_SET headers.
  def check_field_set
    if source.staged?
      if source.staged_path =~ /\.zip\z/i
        headers = get_dwc_headers(::DarwinCore.new(source.staged_path).core)
      elsif source.staged_path =~ /\.(xlsx?|ods)\z/i
        headers = CSV.parse(Roo::Spreadsheet.open(source.staged_path).to_csv, headers: true).headers
      else
        headers = CSV.read(source.staged_path, headers: true, col_sep: get_col_sep, quote_char: get_quote_char, encoding: 'bom|utf-8').headers
      end

      missing_headers = self.class::MINIMUM_FIELD_SET - headers

      missing_headers.each do |header|
        errors.add(:source, "required field #{header} missing.")
      end
    end
  end
end