dpla/heidrun

View on GitHub
app/harvesters/nypl_harvester.rb

Summary

Maintainability
A
35 mins
Test Coverage
##
# A harvester for NYPL's API.
#
# Harvesting asks the API for the collection UUIDs listed under
# `/items/roots`, pages through the capture records of each collection, and
# fetches the MODS item record referenced by every capture.  Each harvested
# record is the MODS element with the capture record appended inside an
# `<extension>` child.
#
class NyplHarvester
  include Krikri::Harvester

  DEFAULT_URI = 'http://api.repo.nypl.org/api/v1'
  DEFAULT_NAME = 'nypl'
  DEFAULT_BATCHSIZE = 10
  DEFAULT_THREAD_COUNT = 5

  ##
  # Initialize, and set default options as appropriate.
  #
  # The hashes passed in are copied before defaults are applied, so the
  # caller's `opts` (and its nested `:nypl` hash) are left unmodified.
  #
  # @param opts [Hash] a hash of options as defined by {.expected_opts}
  #
  # @raise [KeyError] if `opts` has no `:nypl` key
  # @raise [RuntimeError] if the `:nypl` options have no `:apikey`
  #
  # @example
  #    Typical instantiation, good for most cases:
  #
  #        NyplHarvester.new(nypl: { apikey: '[key]' })
  #
  # Parameters to 'opts':
  #
  # - uri:        See Krikri::Harvester#initialize.
  #               Defaults to "http://api.repo.nypl.org/api/v1"
  # - name:       See Krikri::Harvester#initialize.  Defaults to "nypl"
  # - nypl:
  #   - apikey:     The authorization token required to access the API.
  #   - batchsize:  The number of records to fetch with each API request.
  #                 Maps to the 'per_page' parameter on NYPL's API.
  #                 Defaults to 10.
  #   - threads:    The number of record pages to fetch in parallel.
  #                 Defaults to 5.
  #
  def initialize(opts = {})
    # Work on copies: applying defaults must not write into hashes owned by
    # the caller.
    opts = opts.dup
    nypl_opts = opts.fetch(:nypl).dup
    nypl_opts[:threads] ||= DEFAULT_THREAD_COUNT
    nypl_opts[:batchsize] ||= DEFAULT_BATCHSIZE

    opts[:name] ||= DEFAULT_NAME
    # Hand the superclass the defaulted :nypl options as well.
    opts[:nypl] = nypl_opts
    @opts = nypl_opts

    unless @opts[:apikey]
      msg = ':apikey option is required but missing'
      Krikri::Logger.log(:error, msg)
      raise msg
    end

    super({ uri: DEFAULT_URI }.merge(opts))

    @http = Krikri::AsyncUriGetter.new
  end

  ##
  # @return [Hash] a description of the options accepted under the `:nypl`
  #   key
  #
  # @see Krikri::Harvester.expected_opts
  def self.expected_opts
    {
      key: :nypl,
      opts: {
        apikey: { type: :string, required: true },
        threads: { type: :integer, required: false },
        batchsize: { type: :integer, required: false }
      }
    }
  end

  ##
  # Sums the `numResults` value reported by the API for every collection.
  #
  # @return [Integer] the total of all per-collection result counts
  #
  # @raise [RuntimeError] if any request returns a non-200 status
  #
  # @see Krikri::Harvester#count
  def count
    # The value of the `with_response` block is used as the per-collection
    # count.
    collection_counts = list_collections.map do |collection_uuid|
      req = request("/items/#{collection_uuid}", page: 1, per_page: 0)
      req.with_response do |response|
        if response.status == 200
          Integer(Nokogiri::XML(response.body)
                   .xpath('//nyplAPI/response/numResults')[0].text)
        else
          msg = "request failed for URI #{req.uri}"
          Krikri::Logger.log(:error, msg)
          raise msg
        end
      end
    end

    collection_counts.reduce(0, :+)
  end

  ##
  # Pages that fail to load (non-200 status) are logged and skipped; they do
  # not stop the enumeration.
  #
  # @return [Enumerator] an enumerator of the records targeted by this
  #   harvester.
  #
  # @raise [RuntimeError] during enumeration, if the collection list or a
  #   collection's page count cannot be fetched
  def records
    Enumerator.new do |en|
      list_collections.each do |collection_uuid|
        page_count = collection_page_count(collection_uuid, @opts[:batchsize])

        # Fetch `@opts[:threads]` pages of records in parallel.
        (1..page_count).each_slice(@opts[:threads]) do |pages|
          requests = pages.map do |page|
            request("/items/#{collection_uuid}",
                    page: page,
                    per_page: @opts[:batchsize])
          end

          # Not strictly necessary, but just to ensure we wait for the
          # outstanding threads to finish before launching any others so we're
          # not hitting the API harder than we planned.
          requests.each(&:join)

          requests.each do |req|
            req.with_response do |response|
              if response.status == 200
                enumerate_records(response.body).each { |doc| en << doc }
              else
                msg = "request failed for URI #{req.uri}"
                Krikri::Logger.log(:error, msg)
              end
            end
          end
        end
      end
    end
  end

  private

  ##
  # Determine the number of pages of records we'll need to fetch for a given
  # collection.
  #
  # @param collection_uuid [String] The identifier of the collection
  #
  # @param page_size [Integer] The page size we're using
  #
  # @return [Integer] the `totalPages` value reported by the API
  #
  # @raise [RuntimeError] if the request returns a non-200 status
  #
  def collection_page_count(collection_uuid, page_size)
    req = request("/items/#{collection_uuid}", page: 1, per_page: page_size)
    req.with_response do |response|
      if response.status == 200
        Integer(Nokogiri::XML(response.body)
                 .xpath('//nyplAPI/request/totalPages').text)
      else
        msg = "request failed for URI #{req.uri}"
        Krikri::Logger.log(:error, msg)
        raise msg
      end
    end
  end

  ##
  # Convert an (XML) page of capture records into OriginalRecords
  #
  # For every capture on the page, the MODS item record at the capture's
  # `apiUri` (with `.xml` appended) is fetched and the capture element is
  # added to it under a new `<extension>` child.  Captures whose MODS record
  # could not be fetched, or whose response holds no MODS element, are
  # skipped.
  #
  # @param response_xml [String] A page of capture records
  #
  # @return [Array<OriginalRecord>]
  #
  def enumerate_records(response_xml)
    record_list = Nokogiri::XML(response_xml)
                  .xpath('//nyplAPI/response/capture')
                  .map do |capture|
      {
        identifier: capture.xpath('./uuid').text,
        mods_item_url: capture.xpath('./apiUri').text + '.xml',
        capture_record: capture
      }
    end

    mods_records =
      fetch_item_mods_records(record_list.map { |r| r[:mods_item_url] })

    record_list.zip(mods_records).map do |record, mods_record|
      # We won't have a record if the fetch failed for some reason.
      next if mods_record.nil?

      item_record = mods_record.xpath('//nyplAPI/response/mods')[0]

      # A successful response without a MODS element would otherwise fail
      # with NoMethodError below and abort the harvest; skip it like any
      # other record that could not be retrieved.
      if item_record.nil?
        Krikri::Logger.log(:error,
                           'No MODS record found at ' \
                           "#{record[:mods_item_url]}. Record skipped")
        next
      end

      item_record.add_namespace('mods', 'http://www.loc.gov/mods/v3')
      item_record.add_child('<extension />')[0]
        .add_child(record[:capture_record])

      @record_class.build(mint_id(record[:identifier]), item_record.to_xml)
    end.compact
  end

  ##
  # Fetches the given URLs in groups of `@opts[:threads]`.
  #
  # @param urls [Array<String>] a list of mods URLs to fetch
  #
  # @return [Array<Nokogiri::XML::Document, nil>] the list of parsed MODS
  #   records, in the same order as `urls`. The list may contain nils if
  #   fetching a given URL fails.
  #
  def fetch_item_mods_records(urls)
    urls.each_slice(@opts[:threads]).flat_map do |suburls|
      requests = suburls.map { |url| request(url) }
      requests.map do |req|
        req.with_response do |response|
          if response.status == 200
            Nokogiri::XML(response.body)
          else
            Krikri::Logger.log(:error,
                               "Failure when fetching #{req.uri}. " \
                               'Record skipped')
            nil
          end
        end
      end
    end
  end

  ##
  # Queues a GET request with the NYPL authorization header.
  #
  # An endpoint starting with '/' is treated as relative: it is appended to
  # the harvester's base URI and given an `.xml` suffix.  Anything else is
  # used as given.
  #
  # @param endpoint_uri [String] the (relative or absolute) URI to be fetched
  #
  # @param params [Hash<String, String>] Additional URL parameters to be sent
  # with the request
  #
  # @return the request object returned by `Krikri::AsyncUriGetter#add_request`
  #
  def request(endpoint_uri, params = {})
    abs_url = if endpoint_uri.start_with?('/')
                "#{uri}#{endpoint_uri}.xml"
              else
                endpoint_uri
              end

    request_uri = URI.parse(abs_url)
    request_uri.query = URI.encode_www_form(params) unless params.empty?

    @http.add_request(uri: request_uri, headers: headers)
  end

  ##
  # @return [Hash] A suitable Authorization header for NYPL
  #
  def headers
    {
      'Authorization' => "Token token=\"#{@opts.fetch(:apikey)}\""
    }
  end

  ##
  # @return [Array<String>] A list of collection UUIDs eligible for harvest
  #
  # @raise [RuntimeError] if the request returns a non-200 status
  #
  def list_collections
    request('/items/roots').with_response do |response|
      if response.status == 200
        Nokogiri::XML(response.body)
          .xpath('//nyplAPI/response/uuids/uuid')
          .map(&:text)
      else
        msg = "couldn't fetch collection list: #{response.body}"
        Krikri::Logger.log(:error, msg)
        raise msg
      end
    end
  end
end