# lib/argos/ds.rb
module Argos
# Argos DS|DAT file parser
#
# Usage
#
# ds = Argos::Ds.new
# puts ds.parse(filename).to_json
#
# @author Espen Egeland
# @author Conrad Helgeland
#
class Ds < Array
include Argos::Ascii
attr_writer :log, :filename, :bundle
attr_reader :bundle, :filename, :filter, :filtername, :valid, :filesize, :updated, :sha1, :messages, :multiplicates, :errors
START_REGEX = /^\d{5} \d{5,6} +\d+ +\d+/
START_REGEX_LEGACY = /\s+\d\.\d{3}\s\d{9}\s+\w{4}$/
LOCATION_CLASS = [nil, "0","1","2","3","A","B","G","Z"]
# Creates an empty Ds: starts with an empty @errors list and a Logger that
# writes to STDERR (replaceable through the log writer)
def initialize
@errors = []
@log = Logger.new(STDERR)
end
# @return [Boolean] true when a filter has been assigned (@filter is not nil)
def filter?
  !@filter.nil?
end
# Sets the filter applied to each parsed message in #parse
#
# @param filter [#call, String] either a callable (lambda/Proc), or Ruby source
#   code containing "lambda" or "Proc" that evaluates to a callable.
#   Any other value is ignored and leaves the current filter untouched.
def filter=(filter)
  if filter.respond_to? :call
    @filter = filter
  elsif filter.is_a?(String) and filter =~ /lambda|Proc/
    # Only Strings are matched: Object#=~ no longer exists on Ruby >= 3.2,
    # so matching e.g. an Integer would raise NoMethodError.
    @filtername = filter
    # SECURITY: eval executes arbitrary Ruby code. Only pass filter strings
    # from trusted sources (never unchecked user input).
    @filter = eval(filter)
  end
end
# Returns the logger, creating a STDERR logger first when @log is nil
# @return [Logger] (or whatever object was assigned through the log writer)
def log
  @log = Logger.new(STDERR) if @log.nil?
  @log
end
# Parses an Argos DS file and returns self (Argos::Ds, an Array subclass)
#
# The parser first collects all messages (stored in @messages), then #unfold
# turns them into measurement documents which are appended to self,
# de-duplicated and sorted by :measured.
#
# @param filename [String] Filename of the Argos DS file (defaults to @filename)
# @return [Argos::Ds]
# @raise [ArgumentError] if the file is missing, or contains no DS message header
# @raise [RuntimeError] if a message lacks program/platform, or the file is in legacy format
def parse(filename=nil)
  self.clear # Needed if you parse multiple times
  @messages = []
  @valid = false
  filename = @filename if filename.nil?
  # Validate *before* File.realpath: realpath raises TypeError (nil) or
  # Errno::ENOENT (missing file), which made this ArgumentError unreachable.
  # File.exist? is used because File.exists? was removed in Ruby 3.2.
  if filename.nil? or not File.exist? filename
    raise ArgumentError, "Missing ARGOS DS file: \"#{filename}\""
  end
  filename = File.realpath(filename)
  @filename = filename
  @sha1 = Digest::SHA1.file(filename).hexdigest
  contact = []
  # Block form guarantees the file handle is closed, also when parsing raises
  File.open(filename) do |file|
    @filesize = file.size
    @updated = file.mtime.utc
    log.debug "Parsing ARGOS DS file #{filename} source:#{sha1} (#{filesize} bytes)"
    if filter?
      log.debug "Using filter: #{@filtername.nil? ? filter : @filtername }"
    end
    # gets returns nil on an empty file, where readline would raise EOFError
    firstline = file.gets
    file.rewind
    if firstline =~ START_REGEX_LEGACY
      return parse_legacy(file)
    end
    file.each_with_index do |line, c|
      line = line.strip
      if line =~ START_REGEX
        # A new header line: finish the message collected so far
        @valid = true
        if contact.any?
          item = parse_message(contact)
          if self.class.valid_item? item
            if not filter? or filter.call(item)
              @messages << item
            end
          else
            raise "Argos DS message #{filename}:#{c} lacks required program and/or platform"
          end
        end
        contact = [line]
      else
        # Measurement line, like: 2010-12-14 15:11:34 1 00 37 01 52
        if contact.any? and line != ""
          contact << line
        end
      end
    end
  end
  if false == @valid
    message = "Cannot parse file: #{filename}"
    raise ArgumentError, message
  end
  # The last message is not followed by a header line, so it is handled here
  last = parse_message(contact)
  if last
    if not filter? or filter.call(last)
      @messages << last
    end
  end
  log.debug "Parsed #{@messages.size} Argos DS messages into #{self.class.name} Array"
  @segments = @messages.size
  unfold.each do |d|
    self << d
  end
  @multiplicates = group_by { |e| e }.select { |k, v| v.size > 1 }.map(&:first)
  if multiplicates.any?
    log.warn "#{multiplicates.size} multiplicates in source sha1 #{sha1} #{filename}): #{multiplicates.map {|a|a[:id]} }"
    self.uniq!
    log.info "Unique DS messages: #{self.size} sha1: #{sha1} #{filename}"
  end
  self.sort_by! {|ds| ds[:measured]}
  self
end
# Parses one DS segment
# @param contact [Array<String>] header line followed by its measurement lines
# @return [Hash] the document built by #combine_header_with_transmission
def parse_message(contact)
  measurements = process_item_body(contact[1..-1])
  combine_header_with_transmission(measurements, contact.first)
end
# @return [String] type identifier "ds"; #merge stores it under the :type key
def type
"ds"
end
# Builds one DS message document from a header and its parsed measurements
#
# @param measurements [Array<Hash>, nil] measurements belonging to the header
# @param header [String, Array<String>] space-separated header line (or the
#   already split fields) containing
# [0] Program number
# [1] Platform number
# [2] Number of lines of data per satellite pass
# [3] Number of sensors
# [4] Satellite identifier
# [5] Location class (lc)
# [6] Location date 2007-03-02
# [7] Location UTC time
# [8] Latitude (decimal degrees)
# [9] Longitude, may be > 180 like 255.452°, equivalent to 255.452 - 360 = -104.548 (°E)
# [10] Altitude (km)
# [11] Frequency (calculated)
# @return [Hash] document; :warn and :errors are only present when non-empty
#
# The header varies in information elements, often either 0..4|5 or 0..11.
# Header examples (split on " "):
# ["09660", "10788", "4", "3", "D", "0"]
# ["09660", "10788", "5", "3", "H", "2", "1992-04-06", "22:12:16", "78.248", "15.505", "0.000", "401649604"]
# ["09660", "10788", "2", "3", "D"]
# http://www.argos-system.org/files/pmedia/public/r363_9_argos_users_manual-v1.5.pdf page 42
#
# Warning, the parser does not support this header format from 1989 [AUO89.DAT]
# 19890800-19891000: ["09660", "14653", "10", "41", "14", "1", "-.42155E+1", "00", "112", "17DD"]
def combine_header_with_transmission(measurements, header)
  header = header.split(" ") unless header.is_a? Array
  program, platform, lines, sensors, satellite, lc,
    date, time, lat, lon, alt = header

  positioned = nil
  positioned = convert_datetime(date+" "+time) unless date.nil? or time.nil?

  latitude = nil
  latitude = lat.to_f if !lat.nil? && valid_float?(lat)

  longitude = nil
  if !lon.nil? && valid_float?(lon)
    longitude = lon.to_f
    # Normalise 180..360 to the -180..0 range
    longitude -= 360 if longitude.between?(180, 360)
  end

  # Header altitude is multiplied by 1000 (km to m, per the header description)
  altitude = alt.nil? ? nil : alt.to_f*1000

  warnings = []
  warnings << "missing-time" if positioned.nil? and measurements.nil?

  problems = []
  if !latitude.nil? && !longitude.nil?
    in_range = latitude.between?(-90, 90) && longitude.between?(-180, 180)
    problems << "invalid-position" unless in_range
  end
  problems << "invalid-lc" unless LOCATION_CLASS.include? lc

  # Key order matters: #merge derives the document id from the JSON of these keys
  document = {
    program: program.to_i,
    platform: platform.to_i,
    lines: lines.to_i,
    sensors: sensors.to_i,
    satellite: satellite,
    lc: lc,
    positioned: positioned,
    latitude: latitude,
    longitude: longitude,
    altitude: altitude,
    measurements: measurements,
    headers: header.size
  }
  document[:warn] = warnings if warnings.any?
  document[:errors] = problems if problems.any?
  document
end
# Merge position and all other top-level DS fields with each measurement line
# (containing sensor data)
# The 3 lines below will unfold to *2* documents, each with
# "positioned":2010-03-05T14:19:06Z, "platform": "23695", "latitude":"79.989", etc.
# 23695 074772 3 4 M B 2010-03-05 14:19:06 79.989 12.644 0.036 401639707
# 2010-03-05 14:17:35 1 01 25 37630 36
# 2010-03-05 14:20:38 1 00 28 00 65
#
# @return [Array<Hash>] one document per measurement, sorted by time
def unfold
  # Only messages carrying measurements can be unfolded.
  # The previous predicate (key?(:measurements) or nil?) was true for every
  # document, so the count logged below was always "N / N".
  # NOTE(review): messages without measurements are not part of the result.
  # Including them would also require #parse's final sort on :measured to
  # cope with documents lacking that key.
  with_measurements = messages.reject { |ds| ds[:measurements].nil? }
  log.debug "#{with_measurements.size} / #{messages.size} messages contained measurements"

  unfolded = []
  with_measurements.each do |ds|
    ds[:measurements].each_with_index do |measurement, cardinality|
      # Cardinality 0 will filter out repeated positions due to higher sensor sampling rate...
      unfolded << merge(ds, measurement, cardinality)
    end
  end

  # Sort on measurement time, falling back to position time and program number
  unfolded = unfolded.sort_by {|ds|
    if not ds[:measured].nil?
      DateTime.parse(ds[:measured])
    elsif not ds[:positioned].nil?
      DateTime.parse(ds[:positioned])
    else
      ds[:program]
    end
  }
  log.info "Unfolded #{messages.size} ARGOS DS position and sensor messages into #{unfolded.size} new documents source:#{sha1} #{filename}"
  unfolded
end
# Merges the DS header document with one of its measurements
#
# The result holds the header fields (minus :measurements, :errors and :warn),
# the measurement fields, static metadata (:technology, :type, :cardinality)
# and an :id, the SHA1 hex digest of the JSON of all of the above
# (with :errors, :file and :warn left out, as those may vary between parses).
#
# Possible improvement: base the id on a static list of keys, like
# :program, :platform, :lines, :sensors, :satellite, :lc, :positioned,
# :latitude, :longitude, :altitude, :headers, :measured, :identical,
# :sensor_data, :technology, :type, :source
#
# @param ds [Hash] header document
# @param measurement [Hash] a single measurement of ds
# @param cardinality [Integer] index of the measurement within ds
# @return [Hash] the merged document
def merge(ds, measurement, cardinality)
  header_only = [:measurements, :errors, :warn]
  m = ds.reject { |key, _value| header_only.include?(key) }
  m = m.merge(measurement)
  m = m.merge(technology: "argos", type: type, cardinality: cardinality)

  volatile = [:errors, :file, :warn]
  idbase = m.reject { |key, _value| volatile.include?(key) }
  m[:id] = Digest::SHA1.hexdigest(idbase.to_json)
  m
end
# Parses the measurement lines of one DS message
# Resets the parser state (@buf, @transmission_arr) before delegating to
# #recursive_transmission_parse, and stores its result in @transmission_arr
# @param body_arr [Array<String>, nil] measurement lines
# @return [Array, nil] result of #recursive_transmission_parse
def process_item_body(body_arr)
  @buf, @transmission_arr = "", []
  @transmission_arr = recursive_transmission_parse(body_arr)
end
# Groups measurement lines into transmissions
# Lines are appended to @buf until the next line contains a timestamp
# (or there is no next line); the buffer is then converted with
# #transmission_package, pushed onto @transmission_arr and emptied.
# @param [Array] body_arr
# @return [Array, nil] @transmission_arr, or nil when body_arr is nil or empty
def recursive_transmission_parse(body_arr)
  return if body_arr.nil? or body_arr.empty?

  body_arr.each_with_index do |line, index|
    @buf = @buf + " " + line
    upcoming = body_arr[index + 1]
    if upcoming =~ /\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}/ or upcoming == nil
      @transmission_arr << transmission_package(@buf)
      @buf = ""
    end
  end
  @transmission_arr
end
def transmission_package(data)
transmission_time = data[/(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})/,1]
transmission_time = convert_datetime(transmission_time)
identical = data.split(" ")[2].to_i
data = data.strip[23,data.length]
if not data.nil?
sensor_data = data.split(" ")
end
{ measured: transmission_time,
identical: identical,
sensor_data: sensor_data
}
end
# @return [String, nil] :positioned of the first positioned document, nil if there is none
def start
  earliest = positioned.first
  earliest.nil? ? nil : earliest[:positioned]
end
# @return [String, nil] :positioned of the last positioned document, nil if there is none
def stop
  latest = positioned.last
  latest.nil? ? nil : latest[:positioned]
end
# @return [String, nil] @sha1, which #parse sets to the SHA1 hex digest of the parsed file
def source
@sha1
end
protected
# Converts a date and time string into an ISO 8601 UTC timestamp
#
# "1999-04-02 01:28:54" => "1999-04-02T01:28:54Z"
#
# Input carrying an explicit UTC offset is converted to UTC. Previously such
# input failed on the "+00:00" replacement and ended up as the fallback date.
#
# Invalid input (nil or an unparsable string, as seen in legacy files like
# AUO89.DAT) is logged as an error and yields the fallback "0000-01-01T00:00:00Z".
#
# @param datetime [String, nil]
# @return [String] ISO 8601 timestamp with "Z" suffix
def convert_datetime(datetime)
  utc = ::DateTime.parse(datetime).new_offset(0)
  utc.iso8601.sub("+00:00", "Z")
rescue StandardError
  log.error "Invalid date #{datetime}"
  DateTime.new(0).xmlschema.gsub(/\+00:00/, "Z")
end
# @return [Array<Hash>] the documents having a non-nil :positioned value
def positioned
  select { |ds| ds.key?(:positioned) && !ds[:positioned].nil? }
end
# Parser for the Argos format used until 1991 (not implemented, always raises)
#
# Example of the legacy format:
#09660 6 09691 2 14286 89 042 17 18 05 1 3 G 0.000 401650000 0VDI
#09660 2 59.891 10.629 401649651 0VDJ
#09660 14286 17 14 26 1 -.72543E+1 00 0VDK
#09660 14286 17 16 52 1 -.72410E+1 00 0VDL
#09660 14286 17 19 18 2 -.72376E+1 00 0VDM
#09660 14286 17 21 44 3 -.72376E+1 00 0VDN
# Header: 09660[program] 6[lines] ????? 2 ????? 89[year] 042[day?] 17 18 05[time?] 1 3[?] G[?] 0.000 \d{9}[f] \d\w{3}[ident]
#
# @param file [File] the opened legacy file (currently unused)
# @raise [RuntimeError] always
def parse_legacy(file)
  raise RuntimeError, "Legacy DS file parser: not implemented"
end
# @param str [Object] candidate, normally a header field String
# @return [Boolean] true when Kernel#Float accepts str, false when it raises
def valid_float?(str)
  Float(str)
  true
rescue StandardError
  false
end
# Checks that a parsed message carries the required keys
# @param item [Object] normally the Hash returned by #parse_message
# @return [Boolean] true when item has both a :program and a :platform key
def self.valid_item?(item)
  # Test for the method that is actually called below (key?, not key)
  return false unless item.respond_to?(:key?)

  item.key?(:program) and item.key?(:platform)
end
end
end