lib/parallel_dhis2.rb

Summary

Maintainability
A
25 mins
Test Coverage
A
94%
# frozen_string_literal: true

class ParallelDhis2
  # A class that takes a regular `Dhis2::Client` and allows us to use
  # parallel requests to send data values. It does this by using the
  # `typhoeus` gem, which handles all the parallelization without us
  # having to worry about it.
  #
  #      client = Dhis2::Client.new(dhis_configuration)
  #      parallel_dhis2 = ParallelDhis2.new(client)
  #      parallel_dhis2.post_data_value_sets(array_of_data_values)
  #
  # This class tries to borrow as much configuration from the regular
  # DHIS2 client as possible, it also tries to mimick the behavior of
  # the regular client.
  #
  # This means that the response gotten from `post_data_value_sets`
  # will be a `Dhis2::Status` as well, the only difference will be
  # that the `Dhis2::Status#raw_status` will contain a rolled up
  # version of the parallel requests being made.
  #
  # If HTTP errors are encountered, similar to the normal client,
  # exceptions will be raised, these are not the same ones that the
  # `Dhis2::Client` is raising (they would raise `RestClient`
  # exceptions, while here we're using our own namespace).
  #
  # Cookies!
  #
  # If the internal `Dhis2::Client` already obtained a JSESSION_ID, we
  # will reuse it by fetching it out of its cookie jar. The parallel
  # publisher will also run with cookies writing/gathering enabled, so
  # if we have already found a cookie we will use it. If not the first
  # request will create a new one, and then reuse it in the other
  # ones.
  #
  # The generated cookie will never be written back to the `Dhis2::Client`
  class HttpError < StandardError; end
  class TimedOut < HttpError; end
  class HttpException < HttpError; end

  # Amount of items to include in a single request
  SEND_VALUES_PER = 1000
  # Max amount of concurrent request, others will be queued until
  # queue has room again
  MAX_CONCURRENT_REQUESTS = 20

  class RollUpResponses
    ERROR = "ERROR"
    WARNING = "WARNING"
    SUCCESS = "SUCCESS"

    # Takes an array of Dhis2-responses and rolls them up into one
    # Dhis2-response which has the same layout as each of the
    # individual ones. It will have the status of the worst one. So
    # Error > Warning > Success.
    def initialize(responses)
      @responses = responses
    end

    def call
      return {} if @responses.empty?

      response = {
        "status"            => status,
        "description"       => description,
        "import_count"      => import_count,
        "response_type"     => response_type,
        "import_options"    => import_options,
        "data_set_complete" => data_set_complete
      }
      response.merge!("conflicts" => conflicts) if conflicts.any?
      response
    end

    def rolled_up
      @rolled_up ||= @responses.each_with_object({}) do |response, result|
        response.each do |key, value|
          result[key] ||= []
          result[key] << value
        end
      end
    end

    def status
      return ERROR if rolled_up["status"].include? ERROR
      return WARNING if rolled_up["status"].include? WARNING
      return SUCCESS if rolled_up["status"].include? SUCCESS

      ERROR
    end

    def conflicts
      (rolled_up["conflicts"] || []).flatten
    end

    def description
      rolled_up["description"].uniq.join(" && ") + " [parallel]"
    end

    def import_count
      # "{\"deleted\": 0, \"ignored\": 3, \"updated\": 411, \"imported\": 0}"
      rolled_up["import_count"].each_with_object({}) do |hash, result|
        hash.each do |key, value|
          result[key] ||= 0
          result[key] += value
        end
      end
    end

    def response_type
      # Always ImportSummary
      rolled_up["response_type"].uniq.first
    end

    def import_options
      rolled_up["import_options"].flatten
    end

    def data_set_complete
      # Practically always false
      (rolled_up["data_set_complete"] || []).compact.uniq.first
    end
  end

  class ClientWrapper
    # Wraps a regular Dhis2::Client in a loving embrace so we can get
    # access to `base_url`, `verify_ssl` and others.
    #
    # The tests are setup to break if a nicer interface than directly
    # getting the instance variables is set up.
    def initialize(dhis2_client)
      @client = dhis2_client
    end

    def user
      CGI.unescape(base_uri.user)
    end

    def password
      CGI.unescape(base_uri.password)
    end

    def url
      uri = base_uri.dup
      uri.user = nil
      uri.password = nil
      uri.to_s
    end

    def base_uri
      @base_uri ||= URI.parse(base_url)
    end

    # Returns a fully authenticated url to the DHIS2-instance
    def base_url
      @client.instance_variable_get(:@base_url)
    end

    def debug?
      @client.instance_variable_get(:@debug)
    end

    # Returns one of these:
    #  OpenSSL::SSL::VERIFY_NONE
    #  OpenSSL::SSL::VERIFY_PEER
    def verify_ssl_settings
      @client.instance_variable_get(:@verify_ssl)
    end

    def ssl_verify_peer?
      verify_ssl_settings == OpenSSL::SSL::VERIFY_PEER
    end

    def time_out_settings
      @client.instance_variable_get(:@timeout)
    end

    def cookies
      @client.class.class_variable_get(:@@cookie_jar)[base_url] || {}
    end

    # Most likely it will return:
    #       {Accept: "json", Content-Type: "application/json"}
    def post_headers
      headers = @client.send(:headers, "post", {})
      headers.delete(:params)
      headers["Accept"] = headers.delete(:accept).to_s
      headers["Content-Type"] = "application/#{headers.delete(:content_type)}"
      headers
    end
  end

  # dhis2_client - An instance of `Dhis2::Client`
  def initialize(dhis2_client)
    @client = ClientWrapper.new(dhis2_client)
    @cookie_jar = Tempfile.new
  end

  def cookie_file_path
    @cookie_jar.path
  end

  def prepare_payload(payload)
    Dhis2::Case.deep_change(payload, :camelize).to_json
  end

  def headers
    result = @client.post_headers
    if (session_id = @client.cookies.fetch("JSESSIONID", nil))
      result["Cookie"] = "JSESSIONID=#{session_id}"
    end
    result
  end

  # Mostly here as a sanity check if the client starts misbehaving,
  # will issue a single request and return the response, which will
  # have `body` and `code` and `return_code`, which helps debugging.
  def get(url = "/api/system/info")
    url = File.join(@client.url, url)
    request = Typhoeus::Request.new(url,
                                    method:         :get,
                                    headers:        headers,
                                    ssl_verifypeer: @client.ssl_verify_peer?,
                                    cookiefile:     cookie_file_path, # read from
                                    cookiejar:      cookie_file_path, # write to
                                    userpwd:        [@client.user, @client.password].join(":"))
    request.run
    request.response
  end

  def build_post_request(url, payload)
    body = prepare_payload(payload)
    Typhoeus::Request.new(url,
                          method:         :post,
                          headers:        headers,
                          body:           body,
                          timeout:        @client.time_out_settings,
                          ssl_verifypeer: @client.ssl_verify_peer?,
                          cookiefile:     cookie_file_path, # read from
                          cookiejar:      cookie_file_path, # write to
                          userpwd:        [@client.user, @client.password].join(":"))
  end

  def post_data_value_sets(all_values)
    hydra = Typhoeus::Hydra.new(max_concurrency: MAX_CONCURRENT_REQUESTS)
    requests = []
    url = File.join(@client.url, "api", "dataValueSets")
    all_values.each_slice(SEND_VALUES_PER).with_index do |values, i|
      request = build_post_request(url, dataValues: values)
      if @client.debug?
        request.on_complete do |response|
          # rubocop:disable Style/FormatStringToken
          #
          # I like that I can use the %02d part, right in the
          # formatting string, that's why I've disabled rubocop here.
          message = format("[parallel_dhis2] %s [%02d] completed. (%s took %s)",
                           url,
                           i + 1,
                           response.code,
                           response.total_time)
          # rubocop:enable Style/FormatStringToken
          puts message
        end
      end

      hydra.queue(request)
      requests << request
    end
    # This blocks until all requests are done.
    hydra.run

    responses = requests.map(&:response)
    parsed_responses = parse_responses(responses)
    raw_status = RollUpResponses.new(parsed_responses).call
    Dhis2::Status.new(raw_status)
  end

  def parse_responses(responses)
    check_for_errors!(responses)

    parsed = responses.map do |response|
      next if [nil, ""].include?(response.body)

      parsed_response = JSON.parse(response.body)
      Dhis2::Case.deep_change(parsed_response, :underscore)
    end
    parsed.compact
  end

  def check_for_errors!(responses)
    responses.each do |response|
      next if response.success?

      if response.timed_out?
        message = "#{response.effective_url} timed out"
        raise TimedOut, message
      else
        message = "#{response.effective_url} returned #{response.code}: #{response.return_message}"
        raise HttpException, message
      end
    end
  end
end