external/cls.fr/bin/npolar-argos-publish-xml
#!/usr/bin/env ruby
# Ruby-based command to publish CLS Argos-system XML tracking data
#
# Usage:
# cd ~/npolar/api.npolar.no/external/cls.fr
# ./bin/npolar-argos-publish-xml /mnt/datasets/Tracking/ARGOS/ws-argos.cls.fr/2018/program-11660 https://api.npolar.no/tracking/arctic-fox
#
# For more information:
# https://github.com/npolar/api.npolar.no/tree/master/external/cls.fr/README.md
require "bundler/setup"
require "logger"
require "nokogiri"
require "uri"
require "argos"
require "npolar/api/client"
log = Logger.new(STDERR)
#log.level = Logger::WARN
log.level = Logger::DEBUG
def decode_lambda
lambda {|d, decoder=nil|
if d.key? "decoder"
return d
end
if decoder.nil?
if d["platform_model"] =~ /^KiwiSat303/i
decoder = Argos::KiwiSat303Decoder.new
elsif d["platform_model"] =~ /^(KiwiSat202|GPSARGOS)/i
d["platform_model"] = "KiwiSat202"
decoder = Argos::KiwiSat202Decoder.new
end
end
if decoder.nil?
return d
end
unless d["sensor_data"].nil? and d["sensor_hex"].nil?
if d.key?("sensor_hex") and d["sensor_hex"].size >= 2
decoder.sensor_data = d["sensor_hex"].scan(/[0-9a-f]{2}/i).map {|h| h.to_i(16) }
else
decoder.sensor_data = d["sensor_data"] # .map {|c| c = c.rjust(2, '0')}.join("")
end
d["decoder"] = decoder.class.name
d["parser"] = "argos-ruby-#{Argos::VERSION}"
decoder.data.each do |k,v|
d[k]=v
end
#puts decoder.data.to_json
d["sensor_variables"] = decoder.data.keys
end
d
}
end
def latest(api_client, platform=nil)
param = { q: "",
format: "json",
variant: "atom",
limit: 1,
"filter-type" => "xml",
"date-day" => "measured",
fields: "platform,measured",
sort: "-measured"
}
if not platform.nil?
param[:limit] = "all"
param[:"filter-platform"] = platform
param[:"fields"] += ",_id,_rev,platform_model"
end
api_client.param = param
api_client.authorization = true
response = api_client.get
existing = JSON.parse(response.body)
if existing.nil? or not existing.key? "feed"
raise "Failed: #{response.status}"
end
existing["feed"]
end
def deployments(platforms)
client = Npolar::Api::Client::JsonApiClient.new("https://api.npolar.no/tracking/deployment")
log = Logger.new(STDERR)
log.level = Logger::WARN
client.log = log
param = { q: "",
format: "json",
variant: "array",
limit: "all",
"filter-platform" => platforms.join("|"),
sort: "platform"
}
client.param = param
response = client.get
if 200 != response.status
raise "Failed accessing #{uri}"
end
JSON.parse(response.body)
end
begin
if not ENV.key?("NPOLAR_API_USERNAME") or not ENV.key?("NPOLAR_API_PASSWORD")
raise "Please set NPOLAR_API_USERNAME and NPOLAR_API_PASSWORD"
end
if ARGV.size < 2
raise "Usage: #{__dir__} /path/to/followit/archive https://api.npolar.no/tracking/endpoint"
end
if not File.exist? ARGV[0]
raise "Source does not exist: #{ARGV[0]}"
end
if ARGV[1] !~ URI::regexp
raise "Invalid destination URI #{ARGV[1]}"
end
source = ARGV[0]
uri = ARGV[1]
glob = File.join(source, "**/*.xml")
api_client = Npolar::Api::Client::JsonApiClient.new(uri)
api_client.log = log
# get latest date already published
latest_feed = latest(api_client)
last = DateTime.new(1000)
if not latest_feed["entries"].any?
log.debug "0 existing documents"
last = DateTime.new(1000)
else
last = DateTime.parse latest_feed["entries"][0]["measured"]
log.debug "#{last.to_time.utc.iso8601} = last measured observation in #{uri} [totalResults: #{latest_feed["opensearch"]["totalResults"]}]"
end
#
# argos_json_xslt = File.join(File.dirname(Argos.method(:library_version).source_location[0]), "argos", "_xslt", "argos-json.xslt")
# if not File.exists? argos_json_xslt
# raise "Cannot find Argis XML to JSON stylesheet: #{argos_json_xslt}"
# end
argos_xml = Argos::Xml.new
docs = [] # Holds all local XML messages
add = {} # Hash with platform as key, holds messages to add (newer in XML that latest in API for that platform)
c=0
# or (File.mtime(f) < last.to_time.utc )
Dir[glob].reject {|f|
(File.directory? f or File.size(f) == 0)
}.map do |f|
log.debug f
argos_xml.xml = f
msgs = argos_xml.to_a.map {|d| decode_lambda.call(d)}
if msgs.any?
c = c + msgs.size
#log.debug "#{msgs.size} messages / #{c} <- #{f} [file modified #{File.mtime(f).to_json}]"
docs += msgs
end
end
if docs.any?
platforms = docs.map {|d| d["platform"]}.uniq.sort
depl = deployments(platforms)
log.debug "Platforms (N=#{platforms.size}) in source XML: #{platforms.to_json} - found #{depl.size} deployments"
platforms.each do |platform|
deployments_for_platform = depl.select {|d| d["platform"] == platform }
# Only add known platforms (ie. with deployment metadata)
if deployments_for_platform.any?
# 113909 has this problem
if deployments_for_platform.size > 1
log.warn "Multiple deployments for platform #{platform}:\n#{deployments_for_platform.to_json}"
end
local_xml_msgs_for_platform = docs.select {|d| d["platform"] == platform }.sort_by {|d|
d["measured"]
}
api_latest_feed_for_platform = latest(api_client, platform)
remote_api_msgs_for_platform = api_latest_feed_for_platform["opensearch"]["totalResults"]
if api_latest_feed_for_platform["entries"].any?
last_remote = api_latest_feed_for_platform["entries"][0]["measured"]
else
last_remote = nil
end
first_local = local_xml_msgs_for_platform.first["measured"]
last_local = local_xml_msgs_for_platform.last["measured"]
days_local = local_xml_msgs_for_platform.map {|m| Date.parse(m["measured"]).iso8601 }.uniq
log.debug "Platform #{platform} local days (#{days_local.size}): #{days_local.to_json}, local XML: #{source} remote API: #{uri}"
log.debug "Platform #{platform} local messages (XML): #{local_xml_msgs_for_platform.size}, remote messages (API): #{remote_api_msgs_for_platform}"
log.debug "Platform #{platform} [first local, last local, last remote]: [#{first_local.to_json}, #{last_local.to_json}, #{last_remote.to_json}]"
log.debug "Platform #{platform} deployments (N=#{deployments_for_platform.size}): #{deployments_for_platform}"
remote_days_not_before_or_after_local_days = api_latest_feed_for_platform["facets"].select {|f|
f.keys[0] == "day-measured"
}[0]["day-measured"].map {|f| f["term"]}.sort.uniq.select {|isodate|
(DateTime.parse(isodate) <= DateTime.parse(last_local)) and (DateTime.parse(isodate) >= DateTime.parse(first_local)) # might be newer messages in API for example next year...
}
log.debug "Remote days (#{remote_days_not_before_or_after_local_days.size}) #{remote_days_not_before_or_after_local_days} [not before or after local days]"
#DateTime.parse(m["measured"]) > DateTime.parse(last_remote||"1000-01-01")
days_local.each do |day|
in_api_for_this_day = api_latest_feed_for_platform["entries"].select{|e|
Date.parse(e["measured"]).iso8601 == day
}
if not add.key? platform
add[platform] = []
end
local_xml_msgs_for_platform_for_this_day = local_xml_msgs_for_platform.select{|x|
Date.parse(x["measured"]).iso8601 == day
}
adding = 0
if in_api_for_this_day.size == 0 and local_xml_msgs_for_platform_for_this_day.any?
adding = local_xml_msgs_for_platform_for_this_day.size
add[platform] += local_xml_msgs_for_platform_for_this_day
elsif in_api_for_this_day.size > 0 and local_xml_msgs_for_platform_for_this_day.size != in_api_for_this_day.size
log.fatal "Messages in API (#{in_api_for_this_day.size}) != XML on disk (#{local_xml_msgs_for_platform_for_this_day.size})"
elsif in_api_for_this_day.size == local_xml_msgs_for_platform_for_this_day.size
# @todo merge in id, _id and _rev
api_ks202 = in_api_for_this_day.select {|api| api["platform_model"] =~ /^(GPSARGOS|KiwiSat202)/i }
local_ks202 = local_xml_msgs_for_platform_for_this_day.select {|l| l["platform_model"] =~ /^(GPSARGOS|KiwiSat202)/i }
if local_ks202.any? and local_ks202.size == api_ks202.size
# Uncomment the following block to UPDATE existing data in API
# id_revs = api_ks202.map {|api| [api["_id"], api["_rev"]]}
# i = 0
# adding = local_ks202.size
# add[platform] += local_ks202.map {|l|
#
# log.info id_revs[i]
# #if l["platform_model"] = "GPSARGOS"
# l["platform_model"] = "KiwiSat202"
# #end
# l["id"] = id_revs[i][0]
# l["_id"] = id_revs[i][0]
# l["_rev"] = id_revs[i][1]
# i += 1
# l
# }
end
end
log.debug "#{day} (platform #{platform}): messages in API: #{in_api_for_this_day.size}, in XML: #{local_xml_msgs_for_platform_for_this_day.size}, adding #{adding} [add total: #{add[platform].size}]"
end
else
log.warn "Missing deployment metadata for Argos platform #{platform}"
end
end
log.debug "New data for platforms (N=#{add.size}): #{add.keys}"
add.each do |platform,msgs|
log.info "#{msgs.size} new messages for platform #{platform}"
if msgs.any?
log.debug "About to POST #{msgs.size} messages to #{uri}"
api_client.param = {}
response = api_client.post(msgs)
log.debug response.map {|r| r.status }
end
end
end
exit(true)
rescue => e
log.fatal e
exit(false)
end