cosmos/lib/cosmos/packets/parsers/xtce_parser.rb
# encoding: ascii-8bit
# Copyright 2022 Ball Aerospace & Technologies Corp.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# This program may also be used under the terms of a commercial or
# enterprise edition license of COSMOS if purchased from the
# copyright holder
require 'nokogiri'
require 'ostruct'
module Cosmos
class XtceParser
attr_accessor :current_target_name
# Processes a XTCE formatted COSMOS configuration file
#
# @param commands [Hash<String=>Packet>] Hash of all the command packets
# keyed by the packet name.
# @param telemetry [Hash<String=>Packet>] Hash of all the telemetry packets
# keyed by the packet name.
# @param warnings [Array<String>] Array of strings listing all the warnings
# that were created while parsing the configuration
# @param filename [String] The name of the configuration file
# @param target_name [String] Override the target name found in the XTCE file
def self.process(commands, telemetry, warnings, filename, target_name = nil)
XtceParser.new(commands, telemetry, warnings, filename, target_name)
end
def self.reverse_packet_order(target_name, cmd_or_tlm_hash)
if cmd_or_tlm_hash[target_name]
packets = []
names_to_remove = []
cmd_or_tlm_hash[target_name].each do |packet_name, packet|
packets << packet
names_to_remove << packet_name
end
cmd_or_tlm_hash[target_name].length.times do |i|
cmd_or_tlm_hash[target_name].delete(names_to_remove[i])
end
packets.reverse_each do |packet|
cmd_or_tlm_hash[target_name][packet.packet_name] = packet
end
end
end
private
def initialize(commands, telemetry, warnings, filename, target_name = nil)
reset_processing_variables()
@commands = commands
@telemetry = telemetry
@warnings = warnings
@current_packet = nil
parse(filename, target_name)
end
def parse(filename, target_name)
# Fortify complains about Path Manipulation here
# We have previously validated the file is a .xtce file in packet_config
# The file is opened read-only and then immediately parsed by Nokogiri
doc = File.open(filename) { |f| Nokogiri::XML(f, nil, nil, Nokogiri::XML::ParseOptions::STRICT | Nokogiri::XML::ParseOptions::NOBLANKS) }
# Determine the @current_target_name
xtce_process_element(doc.root)
@current_target_name = target_name if target_name
doc.root.children.each do |child|
xtce_recurse_element(child) do |element|
xtce_process_element(element)
end
end
finish_packet()
# Remove abstract
if @commands[@current_target_name]
@commands[@current_target_name].delete_if { |packet_name, packet| packet.abstract }
end
if @telemetry[@current_target_name]
@telemetry[@current_target_name].delete_if { |packet_name, packet| packet.abstract }
end
# Reverse order of packets for the target so ids work correctly
XtceParser.reverse_packet_order(@current_target_name, @commands)
XtceParser.reverse_packet_order(@current_target_name, @telemetry)
reset_processing_variables()
end
# Add current packet into hash if it exists
def finish_packet
if @current_packet
@warnings += @current_packet.check_bit_offsets
set_packet_endianness()
if @current_cmd_or_tlm == PacketConfig::COMMAND
PacketParser.check_item_data_types(@current_packet)
@commands[@current_packet.target_name][@current_packet.packet_name] = @current_packet
else
@telemetry[@current_packet.target_name][@current_packet.packet_name] = @current_packet
end
@current_packet = nil
end
end
def set_packet_endianness
item_endianness = @current_packet.sorted_items.collect do |item|
# Ignore COSMOS reserved items
next if Packet::RESERVED_ITEM_NAMES.include?(item.name)
# Strings and Blocks endianness don't matter so ignore them
item.endianness if item.data_type != :STRING && item.data_type != :BLOCK
end
# Compact to get rid of nils from skipping Strings and Blocks
# Uniq to get rid of duplicates which results in an array of 1 or 2 items
item_endianness = item_endianness.compact.uniq
if item_endianness.length == 1 # All items have the same endianness
# default_endianness is read_only since it affects how items are added
# thus we have to use instance_variable_set here to override it
@current_packet.instance_variable_set(:@default_endianness, item_endianness[0])
end
end
def reset_processing_variables
@current_target_name = nil
@current_cmd_or_tlm = nil
@current_type = nil
@current_meta_command = nil
@current_parameter = nil
@current_argument = nil
@parameter_types = {}
@argument_types = {}
@parameters = {}
@arguments = {}
@containers = {}
end
def create_new_type(element)
current_type = OpenStruct.new
element.attributes.each do |att_name, att|
current_type[att.name] = att.value
end
if /Argument/.match?(element.name)
@argument_types[element["name"]] = current_type
else
@parameter_types[element["name"]] = current_type
end
current_type
end
XTCE_IGNORED_ELEMENTS = ['text', 'AliasSet', 'Alias', 'Header']
def xtce_process_element(element)
if XTCE_IGNORED_ELEMENTS.include?(element.name)
return false
end
case element.name
when 'SpaceSystem'
@current_target_name = element["name"].to_s.upcase
when 'TelemetryMetaData'
finish_packet()
@current_cmd_or_tlm = PacketConfig::TELEMETRY
when 'CommandMetaData'
finish_packet()
@current_cmd_or_tlm = PacketConfig::COMMAND
when 'ParameterTypeSet', 'EnumerationList', 'ParameterSet', 'ContainerSet',
'EntryList', 'DefaultCalibrator', 'DefaultAlarm', 'RestrictionCriteria',
'ComparisonList', 'MetaCommandSet', 'ArgumentTypeSet', 'ArgumentList',
'ArgumentAssignmentList', 'LocationInContainerInBits'
# Do Nothing
when 'EnumeratedParameterType', 'EnumeratedArgumentType',
'IntegerParameterType', 'IntegerArgumentType',
'FloatParameterType', 'FloatArgumentType',
'StringParameterType', 'StringArgumentType',
'BinaryParameterType', 'BinaryArgumentType'
@current_type = create_new_type(element)
@current_type.endianness = :BIG_ENDIAN
case element.name
when 'EnumeratedParameterType', 'EnumeratedArgumentType'
@current_type.xtce_encoding = 'IntegerDataEncoding'
@current_type.sizeInBits = 8 # This is undocumented but appears to be the design
when 'IntegerParameterType', 'IntegerArgumentType'
@current_type.xtce_encoding = 'IntegerDataEncoding'
@current_type.sizeInBits = 32
when 'FloatParameterType', 'FloatArgumentType'
@current_type.xtce_encoding = 'FloatDataEncoding'
@current_type.sizeInBits = 32
when 'StringParameterType', 'StringArgumentType'
@current_type.xtce_encoding = 'StringDataEncoding'
when 'BinaryParameterType', 'BinaryArgumentType'
@current_type.xtce_encoding = 'BinaryDataEncoding'
@current_type.sizeInBits = 8 # This is undocumented but appears to be the design
end
when 'ArrayParameterType', 'ArrayArgumentType'
@current_type = create_new_type(element)
when 'ByteOrderList'
byte_list = []
xtce_recurse_element(element) do |block_element|
if block_element.name == 'Byte'
if block_element['byteSignificance']
byte_list << block_element['byteSignificance'].to_i
end
end
true
end
if byte_list[0] == 0
# Little endian will always start with 0 - Its ok if a single byte item is marked little endian
@current_type.endianness = :LITTLE_ENDIAN
end
# Verify ordering of byte list is supported
if byte_list[0] >= byte_list[-1]
ordered_byte_list = byte_list.reverse
else
ordered_byte_list = byte_list.clone
end
if ordered_byte_list[0] != 0
msg = "Invalid ByteOrderList detected: #{byte_list.join(", ")}"
Logger.instance.warn msg
@warnings << msg
else
previous_byte = nil
ordered_byte_list.each do |byte|
if previous_byte
if byte - previous_byte != 1
msg = "Invalid ByteOrderList detected: #{byte_list.join(", ")}"
Logger.instance.warn msg
@warnings << msg
break
end
end
previous_byte = byte
end
end
return false # Already recursed
when "SizeInBits"
xtce_recurse_element(element) do |block_element|
if block_element.name == 'FixedValue'
@current_type.sizeInBits = Integer(block_element.text)
false
else
true
end
end
return false # Already recursed
when 'UnitSet'
xtce_recurse_element(element) do |block_element|
if block_element.name == 'Unit'
units = block_element.text.to_s
description = block_element['description'].to_s
description = units if description.empty?
units = description if units.empty?
@current_type.units ||= ''
if @current_type.units.empty?
@current_type.units << units
else
@current_type.units << ('/' + units)
end
@current_type.units << "^#{block_element['power']}" if block_element['power']
@current_type.units_full ||= ''
if @current_type.units_full.empty?
@current_type.units_full << description
else
@current_type.units_full << ('/' + description)
end
end
true
end
return false # Already recursed
when 'PolynomialCalibrator'
xtce_recurse_element(element) do |block_element|
if block_element.name == 'Term'
exponent = Float(block_element['exponent']).to_i
@current_type.conversion ||= PolynomialConversion.new()
@current_type.conversion.coeffs[exponent] = Float(block_element['coefficient'])
@current_type.conversion.coeffs.each_with_index do |value, index|
@current_type.conversion.coeffs[index] = 0.0 if value.nil?
end
end
true
end
return false # Already recursed
when 'StaticAlarmRanges'
xtce_recurse_element(element) do |block_element|
if block_element.name == 'WarningRange'
@current_type.limits ||= [0.0, 0.0, 0.0, 0.0]
@current_type.limits[1] = Float(block_element['minInclusive']) if block_element['minInclusive']
@current_type.limits[2] = Float(block_element['maxInclusive']) if block_element['maxInclusive']
elsif block_element.name == 'CriticalRange'
@current_type.limits ||= [0.0, 0.0, 0.0, 0.0]
@current_type.limits[0] = Float(block_element['minInclusive']) if block_element['minInclusive']
@current_type.limits[3] = Float(block_element['maxInclusive']) if block_element['maxInclusive']
end
true
end
return false # Already recursed
when "ValidRange"
@current_type.minInclusive = element['minInclusive']
@current_type.maxInclusive = element['maxInclusive']
when 'Enumeration'
@current_type.states ||= {}
@current_type.states[element['label']] = Integer(element['value'])
when 'IntegerDataEncoding', 'FloatDataEncoding', 'StringDataEncoding', 'BinaryDataEncoding'
@current_type.xtce_encoding = element.name
element.attributes.each do |att_name, att|
@current_type[att.name] = att.value
end
@current_type.sizeInBits = 8 unless element.attributes['sizeInBits']
case element.attributes['byteOrder'].to_s
when 'mostSignificantByteFirst'
@current_type.endianness = :BIG_ENDIAN
when 'leastSignificantByteFirst'
@current_type.endianness = :LITTLE_ENDIAN
end
when 'Parameter'
@current_parameter = OpenStruct.new
element.attributes.each do |att_name, att|
@current_parameter[att.name] = att.value
end
@parameters[element["name"]] = @current_parameter
when 'Argument'
@current_argument = OpenStruct.new
element.attributes.each do |att_name, att|
@current_argument[att.name] = att.value
end
@arguments[element["name"]] = @current_argument
when 'ParameterProperties'
element.attributes.each do |att_name, att|
@current_parameter[att.name] = att.value
end
when "SequenceContainer"
finish_packet()
@current_packet = Packet.new(@current_target_name, element['name'], :BIG_ENDIAN, element['shortDescription'])
@current_packet.abstract = ConfigParser.handle_true_false_nil(element['abstract'])
@containers[element['name']] = @current_packet
PacketParser.finish_create_telemetry(@current_packet, @telemetry, {}, @warnings)
# Need to check for a BaseContainer now because if we hit it later it will be too late
xtce_handle_base_container('BaseContainer', element)
when 'LongDescription'
if @current_packet && !@current_packet.description
@current_packet.description = element.text
end
when 'ParameterRefEntry', 'ArgumentRefEntry', 'ArrayParameterRefEntry', 'ArrayArgumentRefEntry'
process_ref_entry(element)
return false # Already recursed
when 'BaseContainer'
# Handled in SequenceContainer/CommandContainer
when 'BaseMetaCommand'
# Handled in MetaCommand
when 'Comparison'
# Need to set ID value for item
item = @current_packet.get_item(element['parameterRef'])
item.id_value = Integer(element['value'])
if @current_cmd_or_tlm == PacketConfig::COMMAND
item.default = item.id_value
end
@current_packet.update_id_items(item)
when 'MetaCommand'
finish_packet()
@current_packet = Packet.new(@current_target_name, element['name'], :BIG_ENDIAN, element['shortDescription'])
@current_packet.abstract = ConfigParser.handle_true_false_nil(element['abstract'])
PacketParser.finish_create_command(@current_packet, @commands, @warnings)
# Need to check for a BaseContainer now because if we hit it later it will be too late
xtce_handle_base_container('BaseMetaCommand', element)
when 'CommandContainer'
@containers[element['name']] = @current_packet
# Need to check for a BaseContainer now because if we hit it later it will be too late
xtce_handle_base_container('BaseContainer', element)
when 'ArgumentAssignment'
# Need to set ID value for item
item = @current_packet.get_item(element['argumentName'])
value = element['argumentValue']
if item.states && item.states[value.to_s.upcase]
item.id_value = item.states[value.to_s.upcase]
item.default = item.id_value
else
item.id_value = Integer(value)
item.default = item.id_value
end
@current_packet.update_id_items(item)
else
puts " Ignoring Unknown: <#{element.name}>"
end # case element.name
return true # Recurse further
end
def process_ref_entry(element)
reference_location, bit_offset = xtce_handle_location_in_container_in_bits(element)
object, type, data_type, array_type = get_object_types(element)
bit_size = Integer(type.sizeInBits)
if array_type
array_bit_size = process_array_type(element, bit_size)
else
array_bit_size = nil # in define_item, nil indicates the item is not an array
end
if bit_offset
case reference_location
when 'containerStart'
item = @current_packet.define_item(object.name, bit_offset, bit_size, data_type, array_bit_size, type.endianness) # overflow = :ERROR, format_string = nil, read_conversion = nil, write_conversion = nil, id_value = nil)
when 'containerEnd'
item = @current_packet.define_item(object.name, -bit_offset, bit_size, data_type, array_bit_size, type.endianness) # overflow = :ERROR, format_string = nil, read_conversion = nil, write_conversion = nil, id_value = nil)
when 'previousEntry', nil
item = @current_packet.define_item(object.name, @current_packet.length + bit_offset, bit_size, data_type, array_bit_size, type.endianness) # overflow = :ERROR, format_string = nil, read_conversion = nil, write_conversion = nil, id_value = nil)
when 'nextEntry'
raise 'nextEntry is not supported'
end
else
item = @current_packet.append_item(object.name, bit_size, data_type, array_bit_size, type.endianness) # overflow = :ERROR, format_string = nil, read_conversion = nil, write_conversion = nil, id_value = nil)
end
item.description = type.shortDescription if type.shortDescription
item.states = type.states if type.states
set_units(item, type)
set_conversion(item, type, data_type)
set_min_max_default(item, type, data_type)
set_limits(item, type)
end
def get_object_types(element)
array_type = nil
if /Parameter/.match?(element.name)
# Look up the parameter and parameter type
parameter = @parameters[element['parameterRef']]
raise "parameterRef #{element['parameterRef']} not found" unless parameter
parameter_type = @parameter_types[parameter.parameterTypeRef]
raise "parameterTypeRef #{parameter.parameterTypeRef} not found" unless parameter_type
if element.name == 'ArrayParameterRefEntry'
array_type = parameter_type
parameter_type = @parameter_types[array_type.arrayTypeRef]
raise "arrayTypeRef #{parameter.arrayTypeRef} not found" unless parameter_type
end
refName = 'parameterRef'
object = parameter
type = parameter_type
else
# Look up the argument and argument type
if element.name == 'ArrayArgumentRefEntry'
# Requiring parameterRef for argument arrays appears to be a defect in the schema
argument = @arguments[element['parameterRef']]
raise "parameterRef #{element['parameterRef']} not found" unless argument
argument_type = @argument_types[argument.argumentTypeRef]
raise "argumentTypeRef #{argument.argumentTypeRef} not found" unless argument_type
array_type = argument_type
argument_type = @argument_types[array_type.arrayTypeRef]
raise "arrayTypeRef #{array_type.arrayTypeRef} not found" unless argument_type
refName = 'parameterRef'
else
argument = @arguments[element['argumentRef']]
raise "argumentRef #{element['argumentRef']} not found" unless argument
argument_type = @argument_types[argument.argumentTypeRef]
raise "argumentTypeRef #{argument.argumentTypeRef} not found" unless argument_type
refName = 'argumentRef'
end
object = argument
type = argument_type
end
data_type = get_data_type(type)
raise "Referenced Parameter/Argument has no xtce_encoding: #{element[refName]}" unless data_type
return [object, type, data_type, array_type]
end
def process_array_type(element, bit_size)
array_num_items = 1
# Need to determine dimensions
xtce_recurse_element(element) do |block_element|
if block_element.name == 'Dimension'
starting_index = 0
ending_index = 0
block_element.children.each do |child_element|
if child_element.name == 'StartingIndex'
child_element.children.each do |child_element2|
if child_element2.name == 'FixedValue'
starting_index = child_element2.text.to_i
end
end
elsif child_element.name == 'EndingIndex'
child_element.children.each do |child_element2|
if child_element2.name == 'FixedValue'
ending_index = child_element2.text.to_i
end
end
array_num_items *= ((ending_index - starting_index).abs + 1)
end
false # Don't recurse again
end
false # Don't recurse again
else
true # Keep recursing
end
end
array_num_items * bit_size
end
def get_data_type(type)
data_type = nil
case type.xtce_encoding
when 'IntegerDataEncoding'
if type.signed == 'false' || type.encoding == 'unsigned'
data_type = :UINT
else
data_type = :INT
end
when 'FloatDataEncoding'
data_type = :FLOAT
when 'StringDataEncoding'
data_type = :STRING
when 'BinaryDataEncoding'
data_type = :BLOCK
end
data_type
end
def set_units(item, type)
if type.units && type.units_full
item.units = type.units
item.units_full = type.units_full
end
end
def set_conversion(item, type, data_type)
if type.conversion && type.conversion.class == PolynomialConversion
if @current_cmd_or_tlm == PacketConfig::COMMAND
item.write_conversion = type.conversion
else
item.read_conversion = type.conversion
end
end
end
def set_min_max_default(item, type, data_type)
return unless @current_cmd_or_tlm == PacketConfig::COMMAND
# Need to set min, max, and default
if data_type == :INT || data_type == :UINT
if data_type == :INT
item.range = (-(2**(Integer(type.sizeInBits) - 1)))..((2**(Integer(type.sizeInBits) - 1)) - 1)
else
item.range = 0..((2**Integer(type.sizeInBits)) - 1)
end
if type.minInclusive && type.maxInclusive
item.range = Integer(type.minInclusive)..Integer(type.maxInclusive)
end
if item.array_size
item.default = []
else
item.default = 0
if item.states && item.states[type.initialValue.to_s.upcase]
item.default = Integer(item.states[type.initialValue.to_s.upcase])
else
item.default = Integer(type.initialValue) if type.initialValue
end
end
elsif data_type == :FLOAT
if Integer(type.sizeInBits) == 32
item.range = -3.402823e38..3.402823e38
else
item.range = -Float::MAX..Float::MAX
end
if type.minInclusive && type.maxInclusive
item.range = Float(type.minInclusive)..Float(type.maxInclusive)
end
if item.array_size
item.default = []
else
item.default = 0.0
item.default = Float(type.initialValue) if type.initialValue
end
elsif data_type == :STRING || data_type == :BLOCK
if item.array_size
item.default = []
else
if type.initialValue
if type.initialValue.upcase.start_with?("0X")
item.default = type.initialValue.hex_to_byte_string
else
# Strip quotes from strings
if type.initialValue[0] == '"' && type.initialValue[-1] == '"'
item.default = type.initialValue[1..-2]
end
end
else
item.default = ''
end
end
end
end
def set_limits(item, type)
return unless @current_cmd_or_tlm == PacketConfig::TELEMETRY
if type.limits
item.limits.enabled = true
values = {}
values[:DEFAULT] = type.limits
item.limits.values = values
end
end
def xtce_format_attributes(element)
string = ''
element.attributes.each do |att_name, att|
string << "#{att.name}:#{att.value} "
end
if string.length > 0
string = '( ' + string + ')'
end
return string
end
def xtce_recurse_element(element, &block)
return unless yield(element)
element.children.each do |child_element|
xtce_recurse_element(child_element, &block)
end
end
def xtce_handle_base_container(base_name, element)
if element.name == base_name
# Need to add BaseContainer items to current_packet
# Lookup the base packet
if base_name == 'BaseMetaCommand'
base_packet = @commands[@current_packet.target_name][element['metaCommandRef'].to_s.upcase]
else
base_packet = @containers[element['containerRef']]
end
if base_packet
count = 0
base_packet.sorted_items.each do |item|
unless ['PACKET_TIMESECONDS', 'PACKET_TIMEFORMATTED', 'RECEIVED_TIMESECONDS', 'RECEIVED_TIMEFORMATTED', 'RECEIVED_COUNT'].include?(item.name)
begin
@current_packet.get_item(item.name)
rescue
# Item hasn't already been added so define it
@current_packet.define(item.clone)
count += 1
end
end
end
return
else
if base_name == 'BaseMetaCommand'
raise "Unknown #{base_name}: #{element['metaCommandRef']}"
else
raise "Unknown #{base_name}: #{element['containerRef']}"
end
end
end
element.children.each do |child_element|
xtce_handle_base_container(base_name, child_element)
end
end
def xtce_handle_location_in_container_in_bits(element)
element.children.each do |child_element|
if child_element.name == 'LocationInContainerInBits'
child_element.children.each do |child_element2|
if child_element2.name == 'FixedValue'
return [child_element['referenceLocation'], Integer(child_element2.text)]
end
end
end
end
return [nil, nil]
end
end
end