cosmos/lib/cosmos/packets/packet_config.rb
# encoding: ascii-8bit
# Copyright 2022 Ball Aerospace & Technologies Corp.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# This program may also be used under the terms of a commercial or
# enterprise edition license of COSMOS if purchased from the
# copyright holder
require 'cosmos/config/config_parser'
require 'cosmos/packets/packet'
require 'cosmos/packets/parsers/packet_parser'
require 'cosmos/packets/parsers/packet_item_parser'
require 'cosmos/packets/parsers/limits_parser'
require 'cosmos/packets/parsers/limits_response_parser'
require 'cosmos/packets/parsers/state_parser'
require 'cosmos/packets/parsers/format_string_parser'
require 'cosmos/packets/parsers/processor_parser'
require 'cosmos/packets/parsers/xtce_parser'
require 'cosmos/packets/parsers/xtce_converter'
require 'cosmos/conversions'
require 'cosmos/processors'
require 'nokogiri'
require 'ostruct'
module Cosmos
# Reads a command or telemetry configuration file and builds a hash of packets.
class PacketConfig
# @return [String] The name of this configuration. To be used by higher
# level classes to store information about the current PacketConfig.
attr_accessor :name
# @return [Hash<String=>Packet>] Hash of all the telemetry packets
# keyed by the packet name.
attr_reader :telemetry
# @return [Hash<String=>Packet>] Hash of all the command packets
# keyed by the packet name.
attr_reader :commands
# @return [Hash<String=>Array(String, String, String)>] Hash of all the
# limits groups keyed by the group name. The value is a three element
# array consisting of the target_name, packet_name, and item_name.
attr_reader :limits_groups
# @return [Array<Symbol>] The defined limits sets for all items in the
# packet. This will always include :DEFAULT.
attr_reader :limits_sets
# @return [Array<String>] Array of strings listing all the warnings
# that were created while parsing the configuration file.
attr_reader :warnings
# @return [Hash<String=>Hash<String=>Array(Packet)>>] Hash of hashes keyed
# first by the target name and then by the item name. This results in an
# array of packets containing that target and item. This structure is
# used to perform lookups when the packet and item are known but the
# packet is not.
attr_reader :latest_data
# @return [Hash<String>=>Hash<Array>=>Packet] Hash keyed by target name
# that returns a hash keyed by an array of id values. The id values resolve to the packet
# defined by that identification. Command version
attr_reader :cmd_id_value_hash
# @return [Hash<String>=>Hash<Array>=>Packet] Hash keyed by target name
# that returns a hash keyed by an array of id values. The id values resolve to the packet
# defined by that identification. Telemetry version
attr_reader :tlm_id_value_hash
COMMAND = "Command"
TELEMETRY = "Telemetry"
def initialize
@name = nil
@telemetry = {}
@commands = {}
@limits_groups = {}
@limits_sets = [:DEFAULT]
# Hash of Hashes. First index by target name and then item name.
# Returns an array of packets with that target and item.
@latest_data = {}
@warnings = []
@cmd_id_value_hash = {}
@tlm_id_value_hash = {}
# Create unknown packets
@commands['UNKNOWN'] = {}
@commands['UNKNOWN']['UNKNOWN'] = Packet.new('UNKNOWN', 'UNKNOWN', :BIG_ENDIAN)
@telemetry['UNKNOWN'] = {}
@telemetry['UNKNOWN']['UNKNOWN'] = Packet.new('UNKNOWN', 'UNKNOWN', :BIG_ENDIAN)
reset_processing_variables()
end
#########################################################################
# The following methods process a command or telemetry packet config file
#########################################################################
# Processes a COSMOS configuration file and uses the keywords to build up
# knowledge of the commands, telemetry, and limits groups.
#
# @param filename [String] The name of the configuration file
# @param process_target_name [String] The target name. Pass nil when parsing
# an xtce file to automatically determine the target name.
def process_file(filename, process_target_name)
# Handle .xtce files
if File.extname(filename).to_s.downcase == ".xtce"
XtceParser.process(@commands, @telemetry, @warnings, filename, process_target_name)
return
end
# Partial files are included into another file and thus aren't directly processed
return if File.basename(filename)[0] == '_' # Partials start with underscore
@converted_type = nil
@converted_bit_size = nil
@proc_text = ''
@building_generic_conversion = false
process_target_name = process_target_name.upcase
parser = ConfigParser.new("https://ballaerospace.github.io/cosmos-website/docs/v5")
parser.instance_variable_set(:@target_name, process_target_name)
parser.parse_file(filename) do |keyword, params|
if @building_generic_conversion
case keyword
# Complete a generic conversion
when 'GENERIC_READ_CONVERSION_END', 'GENERIC_WRITE_CONVERSION_END'
parser.verify_num_parameters(0, 0, keyword)
@current_item.read_conversion =
GenericConversion.new(@proc_text,
@converted_type,
@converted_bit_size) if keyword.include? "READ"
@current_item.write_conversion =
GenericConversion.new(@proc_text,
@converted_type,
@converted_bit_size) if keyword.include? "WRITE"
@building_generic_conversion = false
# Add the current config.line to the conversion being built
else
@proc_text << parser.line << "\n"
end # case keyword
else # not building generic conversion
case keyword
# Start a new packet
when 'COMMAND'
finish_packet()
@current_packet = PacketParser.parse_command(parser, process_target_name, @commands, @warnings)
@current_cmd_or_tlm = COMMAND
when 'TELEMETRY'
finish_packet()
@current_packet = PacketParser.parse_telemetry(parser, process_target_name, @telemetry, @latest_data, @warnings)
@current_cmd_or_tlm = TELEMETRY
# Select an existing packet for editing
when 'SELECT_COMMAND', 'SELECT_TELEMETRY'
usage = "#{keyword} <TARGET NAME> <PACKET NAME>"
finish_packet()
parser.verify_num_parameters(2, 2, usage)
target_name = process_target_name
target_name = params[0].upcase if target_name == 'SYSTEM'
packet_name = params[1].upcase
@current_packet = nil
if keyword.include?('COMMAND')
@current_cmd_or_tlm = COMMAND
if @commands[target_name]
@current_packet = @commands[target_name][packet_name]
end
else
@current_cmd_or_tlm = TELEMETRY
if @telemetry[target_name]
@current_packet = @telemetry[target_name][packet_name]
end
end
raise parser.error("Packet not found", usage) unless @current_packet
# Start the creation of a new limits group
when 'LIMITS_GROUP'
usage = "LIMITS_GROUP <GROUP NAME>"
parser.verify_num_parameters(1, 1, usage)
@current_limits_group = params[0].to_s.upcase
@limits_groups[@current_limits_group] = [] unless @limits_groups.include?(@current_limits_group)
# Add a telemetry item to the limits group
when 'LIMITS_GROUP_ITEM'
usage = "LIMITS_GROUP_ITEM <TARGET NAME> <PACKET NAME> <ITEM NAME>"
parser.verify_num_parameters(3, 3, usage)
@limits_groups[@current_limits_group] << [params[0].to_s.upcase, params[1].to_s.upcase, params[2].to_s.upcase] if @current_limits_group
#######################################################################
# All the following keywords must have a current packet defined
#######################################################################
when 'SELECT_ITEM', 'SELECT_PARAMETER', 'DELETE_ITEM', 'DELETE_PARAMETER', 'ITEM',\
'PARAMETER', 'ID_ITEM', 'ID_PARAMETER', 'ARRAY_ITEM', 'ARRAY_PARAMETER', 'APPEND_ITEM',\
'APPEND_PARAMETER', 'APPEND_ID_ITEM', 'APPEND_ID_PARAMETER', 'APPEND_ARRAY_ITEM',\
'APPEND_ARRAY_PARAMETER', 'ALLOW_SHORT', 'HAZARDOUS', 'PROCESSOR', 'META',\
'DISABLE_MESSAGES', 'HIDDEN', 'DISABLED'
raise parser.error("No current packet for #{keyword}") unless @current_packet
process_current_packet(parser, keyword, params)
#######################################################################
# All the following keywords must have a current item defined
#######################################################################
when 'STATE', 'READ_CONVERSION', 'WRITE_CONVERSION', 'POLY_READ_CONVERSION',\
'POLY_WRITE_CONVERSION', 'SEG_POLY_READ_CONVERSION', 'SEG_POLY_WRITE_CONVERSION',\
'GENERIC_READ_CONVERSION_START', 'GENERIC_WRITE_CONVERSION_START', 'REQUIRED',\
'LIMITS', 'LIMITS_RESPONSE', 'UNITS', 'FORMAT_STRING', 'DESCRIPTION',\
'MINIMUM_VALUE', 'MAXIMUM_VALUE', 'DEFAULT_VALUE', 'OVERFLOW', 'OVERLAP'
raise parser.error("No current item for #{keyword}") unless @current_item
process_current_item(parser, keyword, params)
else
# blank config.lines will have a nil keyword and should not raise an exception
raise parser.error("Unknown keyword '#{keyword}'") if keyword
end # case keyword
end # if building_generic_conversion
end
# Complete the last defined packet
finish_packet()
end
# Convert the PacketConfig back to COSMOS configuration files for each target
def to_config(output_dir)
FileUtils.mkdir_p(output_dir)
@telemetry.each do |target_name, packets|
next if target_name == 'UNKNOWN'
FileUtils.mkdir_p(File.join(output_dir, target_name, 'cmd_tlm'))
filename = File.join(output_dir, target_name, 'cmd_tlm', target_name.downcase + '_tlm.txt')
begin
File.delete(filename)
rescue
# Doesn't exist
end
packets.each do |packet_name, packet|
File.open(filename, 'a') do |file|
file.puts packet.to_config(:TELEMETRY)
file.puts ""
end
end
end
@commands.each do |target_name, packets|
next if target_name == 'UNKNOWN'
FileUtils.mkdir_p(File.join(output_dir, target_name, 'cmd_tlm'))
filename = File.join(output_dir, target_name, 'cmd_tlm', target_name.downcase + '_cmd.txt')
begin
File.delete(filename)
rescue
# Doesn't exist
end
packets.each do |packet_name, packet|
File.open(filename, 'a') do |file|
file.puts packet.to_config(:COMMAND)
file.puts ""
end
end
end
# Put limits groups into SYSTEM target
if @limits_groups.length > 0
FileUtils.mkdir_p(File.join(output_dir, 'SYSTEM', 'cmd_tlm'))
filename = File.join(output_dir, 'SYSTEM', 'cmd_tlm', 'limits_groups.txt')
File.open(filename, 'w') do |file|
@limits_groups.each do |limits_group_name, limits_group_items|
file.puts "LIMITS_GROUP #{limits_group_name.to_s.quote_if_necessary}"
limits_group_items.each do |target_name, packet_name, item_name|
file.puts " LIMITS_GROUP_ITEM #{target_name.to_s.quote_if_necessary} #{packet_name.to_s.quote_if_necessary} #{item_name.to_s.quote_if_necessary}"
end
file.puts ""
end
end
end
end # def to_config
def to_xtce(output_dir)
XtceConverter.convert(@commands, @telemetry, output_dir)
end
# Add current packet into hash if it exists
def finish_packet
finish_item()
if @current_packet
@warnings += @current_packet.check_bit_offsets
if @current_cmd_or_tlm == COMMAND
PacketParser.check_item_data_types(@current_packet)
@commands[@current_packet.target_name][@current_packet.packet_name] = @current_packet
hash = @cmd_id_value_hash[@current_packet.target_name]
hash = {} unless hash
@cmd_id_value_hash[@current_packet.target_name] = hash
update_id_value_hash(hash)
else
@telemetry[@current_packet.target_name][@current_packet.packet_name] = @current_packet
hash = @tlm_id_value_hash[@current_packet.target_name]
hash = {} unless hash
@tlm_id_value_hash[@current_packet.target_name] = hash
update_id_value_hash(hash)
end
@current_packet = nil
@current_item = nil
end
end
protected
def update_id_value_hash(hash)
if @current_packet.id_items.length > 0
key = []
@current_packet.id_items.each do |item|
key << item.id_value
end
hash[key] = @current_packet
else
hash['CATCHALL'.freeze] = @current_packet
end
end
def reset_processing_variables
@current_cmd_or_tlm = nil
@current_packet = nil
@current_item = nil
@current_limits_group = nil
end
def process_current_packet(parser, keyword, params)
case keyword
# Select or delete an item in the current packet
when 'SELECT_PARAMETER', 'SELECT_ITEM', 'DELETE_PARAMETER', 'DELETE_ITEM'
if (@current_cmd_or_tlm == COMMAND) && (keyword.split('_')[1] == 'ITEM')
raise parser.error("#{keyword} only applies to telemetry packets")
end
if (@current_cmd_or_tlm == TELEMETRY) && (keyword.split('_')[1] == 'PARAMETER')
raise parser.error("#{keyword} only applies to command packets")
end
usage = "#{keyword} <#{keyword.split('_')[1]} NAME>"
finish_item()
parser.verify_num_parameters(1, 1, usage)
begin
if keyword.include?("SELECT")
@current_item = @current_packet.get_item(params[0])
else # DELETE
@current_packet.delete_item(params[0])
end
rescue # Rescue the default execption to provide a nicer error message
raise parser.error("#{params[0]} not found in #{@current_cmd_or_tlm.downcase} packet #{@current_packet.target_name} #{@current_packet.packet_name}", usage)
end
# Start a new telemetry item in the current packet
when 'ITEM', 'PARAMETER', 'ID_ITEM', 'ID_PARAMETER', 'ARRAY_ITEM', 'ARRAY_PARAMETER',\
'APPEND_ITEM', 'APPEND_PARAMETER', 'APPEND_ID_ITEM', 'APPEND_ID_PARAMETER',\
'APPEND_ARRAY_ITEM', 'APPEND_ARRAY_PARAMETER'
start_item(parser)
# Allow this packet to be received with less data than the defined length
# without generating a warning.
when 'ALLOW_SHORT'
@current_packet.short_buffer_allowed = true
# Mark the current command as hazardous
when 'HAZARDOUS'
usage = "HAZARDOUS <HAZARDOUS DESCRIPTION (Optional)>"
parser.verify_num_parameters(0, 1, usage)
@current_packet.hazardous = true
@current_packet.hazardous_description = params[0] if params[0]
# Define a processor class that will be called once when a packet is received
when 'PROCESSOR'
ProcessorParser.parse(parser, @current_packet, @current_cmd_or_tlm)
when 'DISABLE_MESSAGES'
usage = "#{keyword}"
parser.verify_num_parameters(0, 0, usage)
@current_packet.messages_disabled = true
# Store user defined metadata for the packet or a packet item
when 'META'
usage = "META <META NAME> <META VALUES (optional)>"
parser.verify_num_parameters(1, nil, usage)
if params.length > 1
meta_values = params[1..-1]
else
meta_values = []
end
meta_values.each_with_index do |value, index|
if String === value
meta_values[index] = value.to_utf8
end
end
if @current_item
# Item META
@current_item.meta[params[0].to_s.upcase] = meta_values
else
# Packet META
@current_packet.meta[params[0].to_s.upcase] = meta_values
end
when 'HIDDEN'
usage = "#{keyword}"
parser.verify_num_parameters(0, 0, usage)
@current_packet.hidden = true
when 'DISABLED'
usage = "#{keyword}"
parser.verify_num_parameters(0, 0, usage)
@current_packet.hidden = true
@current_packet.disabled = true
end
end
def process_current_item(parser, keyword, params)
case keyword
# Add a state to the current telemety item
when 'STATE'
StateParser.parse(parser, @current_packet, @current_cmd_or_tlm, @current_item, @warnings)
# Apply a conversion to the current item after it is read to or
# written from the packet
when 'READ_CONVERSION', 'WRITE_CONVERSION'
usage = "#{keyword} <conversion class filename> <custom parameters> ..."
parser.verify_num_parameters(1, nil, usage)
begin
# require should be performed in target.txt
klass = params[0].filename_to_class_name.to_class
raise parser.error("#{params[0].filename_to_class_name} class not found. Did you require the file in target.txt?", usage) unless klass
conversion = klass.new(*params[1..(params.length - 1)])
@current_item.public_send("#{keyword.downcase}=".to_sym, conversion)
if klass != ProcessorConversion and (conversion.converted_type.nil? or conversion.converted_bit_size.nil?)
msg = "Read Conversion #{params[0].filename_to_class_name} on item #{@current_item.name} does not specify converted type or bit size"
@warnings << msg
Logger.instance.warn @warnings[-1]
end
rescue Exception => err
raise parser.error(err)
end
# Apply a polynomial conversion to the current item
when 'POLY_READ_CONVERSION', 'POLY_WRITE_CONVERSION'
usage = "#{keyword} <C0> <C1> <C2> ..."
parser.verify_num_parameters(1, nil, usage)
@current_item.read_conversion = PolynomialConversion.new(*params) if keyword.include? "READ"
@current_item.write_conversion = PolynomialConversion.new(*params) if keyword.include? "WRITE"
# Apply a segmented polynomial conversion to the current item
# after it is read from the telemetry packet
when 'SEG_POLY_READ_CONVERSION'
usage = "SEG_POLY_READ_CONVERSION <Lower Bound> <C0> <C1> <C2> ..."
parser.verify_num_parameters(2, nil, usage)
if !(@current_item.read_conversion &&
SegmentedPolynomialConversion === @current_item.read_conversion)
@current_item.read_conversion = SegmentedPolynomialConversion.new
end
@current_item.read_conversion.add_segment(params[0].to_f, *params[1..-1])
# Apply a segmented polynomial conversion to the current item
# before it is written to the telemetry packet
when 'SEG_POLY_WRITE_CONVERSION'
usage = "SEG_POLY_WRITE_CONVERSION <Lower Bound> <C0> <C1> <C2> ..."
parser.verify_num_parameters(2, nil, usage)
if !(@current_item.write_conversion &&
SegmentedPolynomialConversion === @current_item.write_conversion)
@current_item.write_conversion = SegmentedPolynomialConversion.new
end
@current_item.write_conversion.add_segment(params[0].to_f, *params[1..-1])
# Start the definition of a generic conversion.
# All config.lines following this config.line are considered part
# of the conversion until an end of conversion marker is found
when 'GENERIC_READ_CONVERSION_START', 'GENERIC_WRITE_CONVERSION_START'
usage = "#{keyword} <Converted Type (optional)> <Converted Bit Size (optional)>"
parser.verify_num_parameters(0, 2, usage)
@proc_text = ''
@building_generic_conversion = true
@converted_type = nil
@converted_bit_size = nil
if params[0]
@converted_type = params[0].upcase.intern
raise parser.error("Invalid converted_type: #{@converted_type}.") unless [:INT, :UINT, :FLOAT, :STRING, :BLOCK].include? @converted_type
end
@converted_bit_size = Integer(params[1]) if params[1]
if @converted_type.nil? or @converted_bit_size.nil?
msg = "Generic Conversion on item #{@current_item.name} does not specify converted type or bit size"
@warnings << msg
Logger.instance.warn @warnings[-1]
end
# Define a set of limits for the current telemetry item
when 'LIMITS'
@limits_sets << LimitsParser.parse(parser, @current_packet, @current_cmd_or_tlm, @current_item, @warnings)
@limits_sets.uniq!
# Define a response class that will be called when the limits state of the
# current item changes.
when 'LIMITS_RESPONSE'
LimitsResponseParser.parse(parser, @current_item, @current_cmd_or_tlm)
# Define a printf style formatting string for the current telemetry item
when 'FORMAT_STRING'
FormatStringParser.parse(parser, @current_item)
# Define the units of the current telemetry item
when 'UNITS'
usage = "UNITS <FULL UNITS NAME> <ABBREVIATED UNITS NAME>"
parser.verify_num_parameters(2, 2, usage)
@current_item.units_full = params[0]
@current_item.units = params[1]
# Update the description for the current telemetry item
when 'DESCRIPTION'
usage = "DESCRIPTION <DESCRIPTION>"
parser.verify_num_parameters(1, 1, usage)
@current_item.description = params[0]
# Mark the current command parameter as required.
# This means it must be given a value and not just use its default.
when 'REQUIRED'
usage = "REQUIRED"
parser.verify_num_parameters(0, 0, usage)
if @current_cmd_or_tlm == COMMAND
@current_item.required = true
else
raise parser.error("#{keyword} only applies to command parameters")
end
# Update the mimimum value for the current command parameter
when 'MINIMUM_VALUE'
if @current_cmd_or_tlm == TELEMETRY
raise parser.error("#{keyword} only applies to command parameters")
end
usage = "MINIMUM_VALUE <MINIMUM VALUE>"
parser.verify_num_parameters(1, 1, usage)
min = ConfigParser.handle_defined_constants(
params[0].convert_to_value, @current_item.data_type, @current_item.bit_size
)
@current_item.range = Range.new(min, @current_item.range.end)
# Update the maximum value for the current command parameter
when 'MAXIMUM_VALUE'
if @current_cmd_or_tlm == TELEMETRY
raise parser.error("#{keyword} only applies to command parameters")
end
usage = "MAXIMUM_VALUE <MAXIMUM VALUE>"
parser.verify_num_parameters(1, 1, usage)
max = ConfigParser.handle_defined_constants(
params[0].convert_to_value, @current_item.data_type, @current_item.bit_size
)
@current_item.range = Range.new(@current_item.range.begin, max)
# Update the default value for the current command parameter
when 'DEFAULT_VALUE'
if @current_cmd_or_tlm == TELEMETRY
raise parser.error("#{keyword} only applies to command parameters")
end
usage = "DEFAULT_VALUE <DEFAULT VALUE>"
parser.verify_num_parameters(1, 1, usage)
if (@current_item.data_type == :STRING) ||
(@current_item.data_type == :BLOCK)
@current_item.default = params[0]
else
@current_item.default = ConfigParser.handle_defined_constants(
params[0].convert_to_value, @current_item.data_type, @current_item.bit_size
)
end
# Update the overflow type for the current command parameter
when 'OVERFLOW'
usage = "OVERFLOW <OVERFLOW VALUE - ERROR, ERROR_ALLOW_HEX, TRUNCATE, or SATURATE>"
parser.verify_num_parameters(1, 1, usage)
@current_item.overflow = params[0].to_s.upcase.intern
when 'OVERLAP'
parser.verify_num_parameters(0, 0, 'OVERLAP')
@current_item.overlap = true
end
end
def start_item(parser)
finish_item()
@current_item = PacketItemParser.parse(parser, @current_packet, @current_cmd_or_tlm, @warnings)
end
# Finish updating packet item
def finish_item
if @current_item
@current_packet.set_item(@current_item)
if @current_cmd_or_tlm == TELEMETRY
target_latest_data = @latest_data[@current_packet.target_name]
target_latest_data[@current_item.name] ||= []
latest_data_packets = target_latest_data[@current_item.name]
latest_data_packets << @current_packet unless latest_data_packets.include?(@current_packet)
end
@current_item = nil
end
end
end
end