#!/usr/bin/env ruby

# Ruby-based command to publish CLS Argos-system XML tracking data
#
# Usage:
# cd ~/npolar/api.npolar.no/external/cls.fr
# ./bin/npolar-argos-publish-xml /mnt/datasets/Tracking/ARGOS/ws-argos.cls.fr/2018/program-11660 https://api.npolar.no/tracking/arctic-fox
#
# For more information:
# https://github.com/npolar/api.npolar.no/tree/master/external/cls.fr/README.md
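#
# Authentication: the target API requires the NPOLAR_API_USERNAME and
# NPOLAR_API_PASSWORD environment variables to be set (checked below before
# any processing starts)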

require "bundler/setup"
require "logger"
require "nokogiri"
require "uri"
require "argos"
require "npolar/api/client"


log = Logger.new(STDERR)
#log.level = Logger::WARN
log.level = Logger::DEBUG

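# Build a lambda that decodes Argos sensor data for a single message hash:
# it selects a KiwiSat303 or KiwiSat202 decoder from "platform_model"
# (GPSARGOS platforms are relabelled and decoded as KiwiSat202), feeds it
# "sensor_hex" (preferred) or "sensor_data", and merges the decoded
# variables, decoder name and parser version back into the message.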
def decode_lambda

  lambda {|d, decoder=nil|

    if d.key? "decoder"
      return d
    end

    if decoder.nil?
      if d["platform_model"] =~ /^KiwiSat303/i
        decoder = Argos::KiwiSat303Decoder.new
      elsif d["platform_model"] =~ /^(KiwiSat202|GPSARGOS)/i
        d["platform_model"] = "KiwiSat202"
        decoder = Argos::KiwiSat202Decoder.new
      end
    end

    if decoder.nil?
      return d
    end

    unless d["sensor_data"].nil? and d["sensor_hex"].nil?

      if d.key?("sensor_hex") and d["sensor_hex"].size >= 2
        decoder.sensor_data = d["sensor_hex"].scan(/[0-9a-f]{2}/i).map {|h| h.to_i(16) }
      else
        decoder.sensor_data = d["sensor_data"] # .map {|c| c = c.rjust(2, '0')}.join("")
      end

      d["decoder"] = decoder.class.name
      d["parser"] = "argos-ruby-#{Argos::VERSION}"

      decoder.data.each do |k,v|
        d[k] = v
      end

      #puts decoder.data.to_json
      d["sensor_variables"] = decoder.data.keys
    end
    d

  }

end

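# Query the destination tracking API for the most recently measured
# XML-sourced documents. Without a platform, only the single newest entry is
# requested; with a platform, all of that platform's entries are returned,
# including _id, _rev and platform_model so existing documents can be
# compared (or updated). Raises unless the response contains a "feed".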
def latest(api_client, platform=nil)
  param = { q: "",
    format: "json",
    variant: "atom",
    limit: 1,
    "filter-type" => "xml",
    "date-day" => "measured",
    fields: "platform,measured",
    sort: "-measured"
  }
  if not platform.nil?
    param[:limit] = "all"
    param[:"filter-platform"] = platform
    param[:"fields"] += ",_id,_rev,platform_model"
  end

  api_client.param = param
  api_client.authorization = true
  response = api_client.get

  existing = JSON.parse(response.body)
  if existing.nil? or not existing.key? "feed"
    raise "Failed: #{response.status}"
  end
  existing["feed"]
end

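# Fetch deployment metadata for the given platform numbers from the
# tracking-deployment service; platforms without deployment metadata are
# skipped during publishing (see the warning further down).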
def deployments(platforms)
  uri = "https://api.npolar.no/tracking/deployment"
  client = Npolar::Api::Client::JsonApiClient.new(uri)

  log = Logger.new(STDERR)
  log.level = Logger::WARN
  client.log = log

  param = { q: "",
    format: "json",
    variant: "array",
    limit: "all",
    "filter-platform" => platforms.join("|"),
    sort: "platform"
  }

  client.param = param
  response = client.get
  if 200 != response.status
    raise "Failed accessing #{uri}"
  end
  JSON.parse(response.body)
end

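# Main flow: validate environment and arguments, parse every XML file under
# the source directory, then for each platform with deployment metadata
# compare the local messages day by day against what the API already holds,
# and POST only the missing days.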
begin

  if not ENV.key?("NPOLAR_API_USERNAME") or not ENV.key?("NPOLAR_API_PASSWORD")
    raise "Please set NPOLAR_API_USERNAME and NPOLAR_API_PASSWORD"
  end
  if ARGV.size < 2
    raise "Usage: #{__dir__} /path/to/followit/archive https://api.npolar.no/tracking/endpoint"
  end
  if not File.exist? ARGV[0]
    raise "Source does not exist: #{ARGV[0]}"
  end
  if ARGV[1] !~ URI::regexp
    raise "Invalid destination URI #{ARGV[1]}"
  end

  source = ARGV[0]
  uri = ARGV[1]
  glob = File.join(source, "**/*.xml")


  api_client = Npolar::Api::Client::JsonApiClient.new(uri)
  api_client.log = log
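
  # NPOLAR_API_USERNAME / NPOLAR_API_PASSWORD are only checked for presence
  # above; the client is expected to pick them up from the environment when
  # authorization is enabled.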

  # get latest date already published
  latest_feed = latest(api_client)

  last = DateTime.new(1000)
  if not latest_feed["entries"].any?
    log.debug "0 existing documents"
  else
    last = DateTime.parse latest_feed["entries"][0]["measured"]
    log.debug "#{last.to_time.utc.iso8601} = last measured observation in #{uri} [totalResults: #{latest_feed["opensearch"]["totalResults"]}]"
  end
  #
  # argos_json_xslt = File.join(File.dirname(Argos.method(:library_version).source_location[0]), "argos", "_xslt", "argos-json.xslt")
  # if not File.exist? argos_json_xslt
  #  raise "Cannot find Argos XML to JSON stylesheet: #{argos_json_xslt}"
  # end
  argos_xml = Argos::Xml.new

  docs = [] # Holds all local XML messages
  add = {}  # Hash with platform as key, holds messages to add (newer in XML than latest in API for that platform)
  c = 0     # Running count of messages parsed from the XML files

  # or (File.mtime(f) < last.to_time.utc )
  Dir[glob].reject {|f|
    (File.directory? f or File.size(f) == 0)
  }.each do |f|
    log.debug f
    argos_xml.xml = f
    msgs = argos_xml.to_a.map {|d| decode_lambda.call(d)}


    if msgs.any?
      c = c + msgs.size
      #log.debug "#{msgs.size} messages / #{c}  <- #{f} [file modified #{File.mtime(f).to_json}]"

      docs += msgs
    end
  end

  if docs.any?

    platforms = docs.map {|d| d["platform"]}.uniq.sort
    depl = deployments(platforms)

    log.debug "Platforms (N=#{platforms.size}) in source XML: #{platforms.to_json} - found #{depl.size} deployments"

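    # Compare local and remote data platform by platform; only platforms with
    # deployment metadata are published.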
    platforms.each do |platform|

      deployments_for_platform = depl.select {|d| d["platform"] == platform }
      # Only add known platforms (ie. with deployment metadata)
      if deployments_for_platform.any?

        # 113909 has this problem
        if deployments_for_platform.size > 1
          log.warn "Multiple deployments for platform #{platform}:\n#{deployments_for_platform.to_json}"
        end

        local_xml_msgs_for_platform = docs.select {|d| d["platform"] == platform }.sort_by {|d|
          d["measured"]
        }

        api_latest_feed_for_platform = latest(api_client, platform)
        remote_api_msgs_for_platform = api_latest_feed_for_platform["opensearch"]["totalResults"]

        if api_latest_feed_for_platform["entries"].any?
          last_remote = api_latest_feed_for_platform["entries"][0]["measured"]
        else
          last_remote = nil
        end

        first_local = local_xml_msgs_for_platform.first["measured"]
        last_local = local_xml_msgs_for_platform.last["measured"]
        days_local = local_xml_msgs_for_platform.map {|m| Date.parse(m["measured"]).iso8601 }.uniq
        log.debug "Platform #{platform} local days (#{days_local.size}): #{days_local.to_json}, local XML: #{source} remote API: #{uri}"
        log.debug "Platform #{platform} local messages (XML): #{local_xml_msgs_for_platform.size}, remote messages (API): #{remote_api_msgs_for_platform}"
        log.debug "Platform #{platform} [first local, last local, last remote]: [#{first_local.to_json}, #{last_local.to_json}, #{last_remote.to_json}]"
        log.debug "Platform #{platform} deployments (N=#{deployments_for_platform.size}): #{deployments_for_platform}"

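        # Collect the remote "day-measured" facet terms that fall inside the
        # local date range; the API may also hold newer messages (e.g. from a
        # later year) which are ignored here.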
        remote_days_not_before_or_after_local_days = api_latest_feed_for_platform["facets"].select {|f|
          f.keys[0] == "day-measured"
        }[0]["day-measured"].map {|f| f["term"]}.sort.uniq.select {|isodate|
          (DateTime.parse(isodate) <= DateTime.parse(last_local)) and (DateTime.parse(isodate) >= DateTime.parse(first_local)) # might be newer messages in API for example next year...
        }
        log.debug "Remote days (#{remote_days_not_before_or_after_local_days.size}) #{remote_days_not_before_or_after_local_days} [not before or after local days]"
        #DateTime.parse(m["measured"]) > DateTime.parse(last_remote||"1000-01-01")

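        # Per-day reconciliation: a day's local messages are queued for POSTing
        # only when the API has nothing for that day; a count mismatch is
        # logged as fatal (but not raised), and equal counts are left alone
        # unless the commented-out KiwiSat202/GPSARGOS update block is enabled.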
        days_local.each do |day|
          in_api_for_this_day = api_latest_feed_for_platform["entries"].select{|e|
            Date.parse(e["measured"]).iso8601 == day
          }

          if not add.key? platform
            add[platform] = []
          end
          local_xml_msgs_for_platform_for_this_day = local_xml_msgs_for_platform.select{|x|
            Date.parse(x["measured"]).iso8601 == day
          }
          adding = 0
          if in_api_for_this_day.size == 0 and local_xml_msgs_for_platform_for_this_day.any?
            adding = local_xml_msgs_for_platform_for_this_day.size
            add[platform] += local_xml_msgs_for_platform_for_this_day
          elsif in_api_for_this_day.size > 0 and local_xml_msgs_for_platform_for_this_day.size != in_api_for_this_day.size
            log.fatal "Messages in API (#{in_api_for_this_day.size}) != XML on disk (#{local_xml_msgs_for_platform_for_this_day.size})"
          elsif in_api_for_this_day.size == local_xml_msgs_for_platform_for_this_day.size
            # @todo merge in id, _id and _rev
            api_ks202 = in_api_for_this_day.select {|api| api["platform_model"] =~ /^(GPSARGOS|KiwiSat202)/i }
            local_ks202 = local_xml_msgs_for_platform_for_this_day.select {|l| l["platform_model"] =~ /^(GPSARGOS|KiwiSat202)/i }
            if local_ks202.any? and local_ks202.size == api_ks202.size

              # Uncomment the following block to UPDATE existing data in API
              # id_revs = api_ks202.map {|api| [api["_id"], api["_rev"]]}
              # i = 0
              # adding = local_ks202.size
              # add[platform] += local_ks202.map {|l|
              #
              #   log.info id_revs[i]
              #   #if l["platform_model"] = "GPSARGOS"
              #     l["platform_model"] = "KiwiSat202"
              #   #end
              #   l["id"] = id_revs[i][0]
              #   l["_id"] = id_revs[i][0]
              #   l["_rev"] = id_revs[i][1]
              #   i += 1
              #   l
              # }

            end
          end

          log.debug "#{day} (platform #{platform}): messages in API: #{in_api_for_this_day.size}, in XML: #{local_xml_msgs_for_platform_for_this_day.size}, adding #{adding} [add total: #{add[platform].size}]"
        end

      else
        log.warn "Missing deployment metadata for Argos platform #{platform}"
      end


    end

    log.debug "New data for platforms (N=#{add.size}): #{add.keys}"

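    # POST the queued messages, one request per platform, and log the HTTP
    # status code of each response.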
    add.each do |platform,msgs|

      log.info "#{msgs.size} new messages for platform #{platform}"

      if msgs.any?
        log.debug "About to POST #{msgs.size} messages to #{uri}"
        api_client.param = {}
        response = api_client.post(msgs)
        log.debug response.map {|r| r.status }
      end
    end

  end
  exit(true)

rescue => e
  log.fatal e
  exit(false)
end