BallAerospace/COSMOS

View on GitHub
cosmos/lib/cosmos/packets/parsers/xtce_converter.rb

Summary

Maintainability
D
1 day
Test Coverage
# encoding: ascii-8bit

# Copyright 2022 Ball Aerospace & Technologies Corp.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# This program may also be used under the terms of a commercial or
# enterprise edition license of COSMOS if purchased from the
# copyright holder

require 'nokogiri'
require 'cosmos/packets/parsers/xtce_parser'

module Cosmos
  class XtceConverter
    attr_accessor :current_target_name

    # Output a previously parsed definition file into the XTCE format
    #
    # @param commands [Hash<String=>Packet>] Hash of all the command packets
    #   keyed by the packet name.
    # @param telemetry [Hash<String=>Packet>] Hash of all the telemetry packets
    #   keyed by the packet name.
    #   that were created while parsing the configuration
    # @param output_dir [String] The name of the output directory to generate
    #   the XTCE files. A file is generated for each target.
    def self.convert(commands, telemetry, output_dir)
      XtceConverter.new(commands, telemetry, output_dir)
    end

    private

    def initialize(commands, telemetry, output_dir)
      FileUtils.mkdir_p(output_dir)

      # Build target list
      targets = []
      telemetry.each { |target_name, packets| targets << target_name }
      commands.each { |target_name, packets| targets << target_name }
      targets.uniq!

      targets.each do |target_name|
        next if target_name == 'UNKNOWN'

        # Reverse order of packets for the target so things are expected (reverse) order for xtce
        XtceParser.reverse_packet_order(target_name, commands)
        XtceParser.reverse_packet_order(target_name, telemetry)

        FileUtils.mkdir_p(File.join(output_dir, target_name, 'cmd_tlm'))
        filename = File.join(output_dir, target_name, 'cmd_tlm', target_name.downcase + '.xtce')
        begin
          File.delete(filename)
        rescue
          # Doesn't exist
        end

        # Create the xtce file for this target
        builder = Nokogiri::XML::Builder.new(:encoding => 'UTF-8') do |xml|
          xml['xtce'].SpaceSystem("xmlns:xtce" => "http://www.omg.org/space/xtce",
                                  "xmlns:xsi" => "http://www.w3.org/2001/XMLSchema-instance",
                                  "name" => target_name,
                                  "xsi:schemaLocation" => "http://www.omg.org/space/xtce http://www.omg.org/spec/XTCE/20061101/06-11-06.xsd") do
            create_telemetry(xml, telemetry, target_name)
            create_commands(xml, commands, target_name)
          end # SpaceSystem
        end # builder
        File.open(filename, 'w') do |file|
          file.puts builder.to_xml
        end
      end
    end

    def create_telemetry(xml, telemetry, target_name)
      # Gather and make unique all the packet items
      unique_items = telemetry[target_name] ? get_unique(telemetry[target_name]) : {}

      xml['xtce'].TelemetryMetaData do
        xml['xtce'].ParameterTypeSet do
          unique_items.each do |item_name, item|
            to_xtce_type(item, 'Parameter', xml)
          end
        end

        xml['xtce'].ParameterSet do
          unique_items.each do |item_name, item|
            to_xtce_item(item, 'Parameter', xml)
          end
        end

        if telemetry[target_name]
          xml['xtce'].ContainerSet do
            telemetry[target_name].each do |packet_name, packet|
              attrs = { :name => (packet_name + '_Base'), :abstract => "true" }
              xml['xtce'].SequenceContainer(attrs) do
                process_entry_list(xml, packet, :TELEMETRY)
              end

              attrs = { :name => packet_name }
              attrs['shortDescription'] = packet.description if packet.description
              xml['xtce'].SequenceContainer(attrs) do
                xml['xtce'].EntryList
                xml['xtce'].BaseContainer(:containerRef => (packet_name + '_Base')) do
                  if packet.id_items && packet.id_items.length > 0
                    xml['xtce'].RestrictionCriteria do
                      xml['xtce'].ComparisonList do
                        packet.id_items.each do |item|
                          xml['xtce'].Comparison(:parameterRef => item.name, :value => item.id_value)
                        end
                      end
                    end
                  end
                end
              end # SequenceContainer
            end # telemetry.each
          end # ContainerSet
        end # if telemetry[target_name]
      end # TelemetryMetaData
    end

    def create_commands(xml, commands, target_name)
      return unless commands[target_name]

      xml['xtce'].CommandMetaData do
        xml['xtce'].ArgumentTypeSet do
          get_unique(commands[target_name]).each do |arg_name, arg|
            to_xtce_type(arg, 'Argument', xml)
          end
        end
        xml['xtce'].MetaCommandSet do
          commands[target_name].each do |packet_name, packet|
            attrs = { :name => packet_name + "_Base", :abstract => "true" }
            xml['xtce'].MetaCommand(attrs) do
              xml['xtce'].ArgumentList do
                packet.sorted_items.each do |item|
                  next if item.data_type == :DERIVED

                  to_xtce_item(item, 'Argument', xml)
                end
              end # ArgumentList
              xml['xtce'].CommandContainer(:name => "#{target_name}_#{packet_name}_CommandContainer") do
                process_entry_list(xml, packet, :COMMAND)
              end
            end # Abstract MetaCommand

            attrs = { :name => packet_name }
            attrs['shortDescription'] = packet.description if packet.description
            xml['xtce'].MetaCommand(attrs) do
              xml['xtce'].BaseMetaCommand(:metaCommandRef => packet_name + "_Base") do
                if packet.id_items && packet.id_items.length > 0
                  xml['xtce'].ArgumentAssignmentList do
                    packet.id_items.each do |item|
                      xml['xtce'].ArgumentAssignment(:argumentName => item.name, :argumentValue => item.id_value)
                    end
                  end # ArgumentAssignmentList
                end
              end # BaseMetaCommand
            end # Actual MetaCommand
          end # commands.each
        end # MetaCommandSet
      end # CommandMetaData
    end

    def get_unique(items)
      unique = {}
      items.each do |packet_name, packet|
        packet.sorted_items.each do |item|
          next if item.data_type == :DERIVED

          unique[item.name] ||= []
          unique[item.name] << item
        end
      end
      unique.each do |item_name, unique_items|
        if unique_items.length <= 1
          unique[item_name] = unique_items[0]
          next
        end
        # TODO: need to make sure all the items in the array are exactly the same
        unique[item_name] = unique_items[0]
      end
      unique
    end

    # This method is almost the same for commands and telemetry except for the
    # XML element name: [Array]ArgumentRefEntry vs [Array]ParameterRefEntry,
    # and XML reference: argumentRef vs parameterRef.
    # Thus we build the name and use send to dynamically dispatch.
    def process_entry_list(xml, packet, cmd_vs_tlm)
      if cmd_vs_tlm == :COMMAND
        type = "Argument"
      else # :TELEMETRY
        type = "Parameter"
      end
      xml['xtce'].EntryList do
        packed = packet.packed?
        packet.sorted_items.each do |item|
          next if item.data_type == :DERIVED

          # TODO: Handle nonunique item names
          if item.array_size
            # Requiring parameterRef for argument arrays appears to be a defect in the schema
            xml['xtce'].public_send("Array#{type}RefEntry".intern, :parameterRef => item.name) do
              set_fixed_value(xml, item) if !packed
              xml['xtce'].DimensionList do
                xml['xtce'].Dimension do
                  xml['xtce'].StartingIndex do
                    xml['xtce'].FixedValue(0)
                  end
                  xml['xtce'].EndingIndex do
                    xml['xtce'].FixedValue((item.array_size / item.bit_size) - 1)
                  end
                end
              end
            end
          else
            if packed
              xml['xtce'].public_send("#{type}RefEntry".intern, "#{type.downcase}Ref".intern => item.name)
            else
              xml['xtce'].public_send("#{type}RefEntry".intern, "#{type.downcase}Ref".intern => item.name) do
                set_fixed_value(xml, item)
              end
            end
          end
        end
      end
    end

    def set_fixed_value(xml, item)
      if item.bit_offset >= 0
        xml['xtce'].LocationInContainerInBits(:referenceLocation => 'containerStart') do
          xml['xtce'].FixedValue(item.bit_offset)
        end
      else
        xml['xtce'].LocationInContainerInBits(:referenceLocation => 'containerEnd') do
          xml['xtce'].FixedValue(-item.bit_offset)
        end
      end
    end

    def to_xtce_type(item, param_or_arg, xml)
      # TODO: Spline Conversions
      case item.data_type
      when :INT, :UINT
        to_xtce_int(item, param_or_arg, xml)
      when :FLOAT
        to_xtce_float(item, param_or_arg, xml)
      when :STRING
        to_xtce_string(item, param_or_arg, xml, 'String')
      when :BLOCK
        to_xtce_string(item, param_or_arg, xml, 'Binary')
      when :DERIVED
        raise "DERIVED data type not supported in XTCE"
      end

      # Handle arrays
      if item.array_size
        # The above will have created the type for the array entries.   Now we create the type for the actual array.
        attrs = { :name => (item.name + '_ArrayType') }
        attrs[:shortDescription] = item.description if item.description
        attrs[:arrayTypeRef] = (item.name + '_Type')
        attrs[:numberOfDimensions] = '1' # COSMOS Only supports one-dimensional arrays
        xml['xtce'].public_send('Array' + param_or_arg + 'Type', attrs)
      end
    end

    def to_xtce_limits(item, xml)
      return unless item.limits && item.limits.values

      item.limits.values.each do |limits_set, limits_values|
        if limits_set == :DEFAULT
          xml['xtce'].DefaultAlarm do
            xml['xtce'].StaticAlarmRanges do
              xml['xtce'].WarningRange(:minInclusive => limits_values[1], :maxInclusive => limits_values[2])
              xml['xtce'].CriticalRange(:minInclusive => limits_values[0], :maxInclusive => limits_values[3])
            end
          end
        end
      end
    end

    def to_xtce_int(item, param_or_arg, xml)
      attrs = { :name => (item.name + '_Type') }
      attrs[:initialValue] = item.default if item.default and !item.array_size
      attrs[:shortDescription] = item.description if item.description
      if item.states and item.default and item.states.key(item.default)
        attrs[:initialValue] = item.states.key(item.default) and !item.array_size
      end
      if item.data_type == :INT
        signed = 'true'
        encoding = 'twosCompliment'
      else
        signed = 'false'
        encoding = 'unsigned'
      end
      if item.states
        xml['xtce'].public_send('Enumerated' + param_or_arg + 'Type', attrs) do
          to_xtce_endianness(item, xml)
          to_xtce_units(item, xml)
          xml['xtce'].IntegerDataEncoding(:sizeInBits => item.bit_size, :encoding => encoding)
          xml['xtce'].EnumerationList do
            item.states.each do |state_name, state_value|
              # Skip the special COSMOS 'ANY' enumerated state
              next if state_value == 'ANY'

              xml['xtce'].Enumeration(:value => state_value, :label => state_name)
            end
          end
        end
      else
        if (item.read_conversion and item.read_conversion.class == PolynomialConversion) or (item.write_conversion and item.write_conversion.class == PolynomialConversion)
          type_string = 'Float' + param_or_arg + 'Type'
        else
          type_string = 'Integer' + param_or_arg + 'Type'
          attrs[:signed] = signed
        end
        xml['xtce'].public_send(type_string, attrs) do
          to_xtce_endianness(item, xml)
          to_xtce_units(item, xml)
          if (item.read_conversion and item.read_conversion.class == PolynomialConversion) or (item.write_conversion and item.write_conversion.class == PolynomialConversion)
            xml['xtce'].IntegerDataEncoding(:sizeInBits => item.bit_size, :encoding => encoding) do
              to_xtce_conversion(item, xml)
            end
          else
            xml['xtce'].IntegerDataEncoding(:sizeInBits => item.bit_size, :encoding => encoding)
          end
          to_xtce_limits(item, xml)
          if item.range
            xml['xtce'].ValidRange(:minInclusive => item.range.first, :maxInclusive => item.range.last)
          end
        end # Type
      end # if item.states
    end

    def to_xtce_float(item, param_or_arg, xml)
      attrs = { :name => (item.name + '_Type'), :sizeInBits => item.bit_size }
      attrs[:initialValue] = item.default if item.default and !item.array_size
      attrs[:shortDescription] = item.description if item.description
      xml['xtce'].public_send('Float' + param_or_arg + 'Type', attrs) do
        to_xtce_endianness(item, xml)
        to_xtce_units(item, xml)
        if (item.read_conversion and item.read_conversion.class == PolynomialConversion) or (item.write_conversion and item.write_conversion.class == PolynomialConversion)
          xml['xtce'].FloatDataEncoding(:sizeInBits => item.bit_size, :encoding => 'IEEE754_1985') do
            to_xtce_conversion(item, xml)
          end
        else
          xml['xtce'].FloatDataEncoding(:sizeInBits => item.bit_size, :encoding => 'IEEE754_1985')
        end
        to_xtce_limits(item, xml)
        if item.range
          xml['xtce'].ValidRange(:minInclusive => item.range.first, :maxInclusive => item.range.last)
        end
      end
    end

    def to_xtce_string(item, param_or_arg, xml, string_or_binary)
      # TODO: COSMOS Variably sized strings are not supported in XTCE
      attrs = { :name => (item.name + '_Type') }
      attrs[:characterWidth] = 8 if string_or_binary == 'String'
      if item.default && !item.array_size
        unless item.default.is_printable?
          attrs[:initialValue] = '0x' + item.default.simple_formatted
        else
          attrs[:initialValue] = item.default.inspect
        end
      end
      attrs[:shortDescription] = item.description if item.description
      xml['xtce'].public_send(string_or_binary + param_or_arg + 'Type', attrs) do
        # Don't call to_xtce_endianness for Strings or Blocks
        to_xtce_units(item, xml)
        if string_or_binary == 'String'
          xml['xtce'].StringDataEncoding(:encoding => 'UTF-8') do
            xml['xtce'].SizeInBits do
              xml['xtce'].Fixed do
                xml['xtce'].FixedValue(item.bit_size.to_s)
              end
            end
          end
        else
          xml['xtce'].BinaryDataEncoding do
            xml['xtce'].SizeInBits do
              xml['xtce'].FixedValue(item.bit_size.to_s)
            end
          end
        end
      end
    end

    def to_xtce_item(item, param_or_arg, xml)
      if item.array_size
        xml['xtce'].public_send(param_or_arg, :name => item.name, "#{param_or_arg.downcase}TypeRef" => item.name + '_ArrayType')
      else
        xml['xtce'].public_send(param_or_arg, :name => item.name, "#{param_or_arg.downcase}TypeRef" => item.name + '_Type')
      end
    end

    def to_xtce_units(item, xml)
      if item.units
        xml['xtce'].UnitSet do
          xml['xtce'].Unit(item.units, :description => item.units_full)
        end
      else
        xml['xtce'].UnitSet
      end
    end

    def to_xtce_endianness(item, xml)
      if item.endianness == :LITTLE_ENDIAN and item.bit_size > 8
        xml['xtce'].ByteOrderList do
          (((item.bit_size - 1) / 8) + 1).times do |byte_significance|
            xml['xtce'].Byte(:byteSignificance => byte_significance)
          end
        end
      end
    end

    def to_xtce_conversion(item, xml)
      if item.read_conversion
        conversion = item.read_conversion
      else
        conversion = item.write_conversion
      end
      if conversion && conversion.class == PolynomialConversion
        xml['xtce'].DefaultCalibrator do
          xml['xtce'].PolynomialCalibrator do
            conversion.coeffs.each_with_index do |coeff, index|
              xml['xtce'].Term(:coefficient => coeff, :exponent => index)
            end
          end
        end
      end
    end
  end
end