cosmos/lib/cosmos/interfaces/linc_interface.rb
# encoding: ascii-8bit
# Copyright 2022 Ball Aerospace & Technologies Corp.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# This program may also be used under the terms of a commercial or
# enterprise edition license of COSMOS if purchased from the
# copyright holder
require 'cosmos/interfaces/tcpip_client_interface'
require 'uuidtools'
module Cosmos
# Interface for connecting to Ball Aerospace LINC Labview targets
class LincInterface < TcpipClientInterface
# The maximum number of asynchronous commands we can wait for at a time.
# We don't ever expect to get close to this but we need to limit it
# to ensure the Array doesn't grow out of control.
MAX_CONCURRENT_HANDSHAKES = 1000
def initialize(
hostname,
port,
handshake_enabled = true,
response_timeout = 5.0,
read_timeout = nil,
write_timeout = 5.0,
length_bitoffset = 0,
length_bitsize = 16,
length_value_offset = 4,
fieldname_guid = 'HDR_GUID',
endianness = 'BIG_ENDIAN',
fieldname_cmd_length = 'HDR_LENGTH'
)
# Initialize Super Class
super(hostname, port, port, write_timeout, read_timeout, 'LENGTH',
length_bitoffset, length_bitsize, length_value_offset, 1, endianness, 0, nil, nil)
# Configuration Settings
@handshake_enabled = ConfigParser.handle_true_false(handshake_enabled)
@handshake_enableds = nil
@response_timeout = response_timeout.to_f
@length_value_offset = Integer(length_value_offset)
@fieldname_guid = ConfigParser.handle_nil(fieldname_guid)
@fieldname_cmd_length = ConfigParser.handle_nil(fieldname_cmd_length)
# Other instance variables
@ignored_error_codes = {}
@handshake_cmds = []
@handshakes_mutex = Mutex.new
# Call this once now because the first time is slow
UUIDTools::UUID.random_create.raw
end # def initialize
def connect
# Packet definitions need to be retrieved here because @target_names is not filled in until after initialize
unless @handshake_enableds
@handshake_enableds = {}
@target_names.each do |target_name|
@handshake_enableds[target_name] = @handshake_enabled
@ignored_error_codes[target_name] = []
end
end
@handshake_packets = []
@error_packets = []
@error_ignore_commands = nil
@error_handle_commands = nil
@handshake_enable_commands = nil
@handshake_disable_commands = nil
@target_names.each do |target_name|
@handshake_packets << System.telemetry.packet(target_name, 'HANDSHAKE')
@error_packets << System.telemetry.packet(target_name, 'ERROR')
# Handle not defining the interface configuration commands (Targets may not want to support this functionality)
begin
command = System.commands.packet(target_name, 'COSMOS_ERROR_IGNORE')
@error_ignore_commands ||= []
@error_ignore_commands << command
rescue
end
begin
command = System.commands.packet(target_name, 'COSMOS_ERROR_HANDLE')
@error_handle_commands ||= []
@error_handle_commands << command
rescue
end
begin
command = System.commands.packet(target_name, 'COSMOS_HANDSHAKE_EN')
@handshake_enable_commands ||= []
@handshake_enable_commands << command
rescue
end
begin
command = System.commands.packet(target_name, 'COSMOS_HANDSHAKE_DS')
@handshake_disable_commands ||= []
@handshake_disable_commands << command
rescue
end
end
@handshakes_mutex.synchronize do
@handshake_cmds = []
end
# Actually connect
super()
end
def write(packet)
return if linc_interface_command(packet)
raise "Interface not connected" unless connected?()
# Add a GUID to the GUID field if its defined
# A GUID means it's an asychronous packet type.
if @fieldname_guid
guid = get_guid(packet)
else
# If @fieldname_guid is not defined (syncronous) we don't care what the
# GUID is because we're not trying to match it up with anything.
# As soon as we get a response we free the command.
guid = 0
end
# Fix the length field to handle the cases where a variable length packet
# is defined. COSMOS does not do this automatically.
update_length_field(packet) if @fieldname_cmd_length
# Always take the mutex (even if we aren't handshaking)
# We do not want any incoming telemetry to be missed because
# it could be the handshake to this command.
@handshakes_mutex.synchronize do
super(packet) # Send the command
wait_for_response(packet, guid) if @handshake_enableds[packet.target_name]
end
end
def linc_interface_command(packet)
if @error_ignore_commands
@error_ignore_commands.each do |error_ignore_command|
if error_ignore_command.identify?(packet.buffer(false))
linc_cmd = error_ignore_command.clone
linc_cmd.buffer = packet.buffer
code = linc_cmd.read('CODE')
@ignored_error_codes[error_ignore_command.target_name] << code unless @ignored_error_codes[error_ignore_command.target_name].include? code
return true
end
end
end
if @error_handle_commands
@error_handle_commands.each do |error_handle_command|
if error_handle_command.identify?(packet.buffer(false))
linc_cmd = error_handle_command.clone
linc_cmd.buffer = packet.buffer
code = linc_cmd.read('CODE')
@ignored_error_codes[error_handle_command.target_name].delete(code) if @ignored_error_codes[error_handle_command.target_name].include? code
return true
end
end
end
if @handshake_enable_commands
@handshake_enable_commands.each do |handshake_enable_command|
if handshake_enable_command.identify?(packet.buffer(false))
@handshake_enabled = true
@handshake_enableds[handshake_enable_command.target_name] = true
return true
end
end
end
if @handshake_disable_commands
@handshake_disable_commands.each do |handshake_disable_command|
if handshake_disable_command.identify?(packet.buffer(false))
@handshake_enabled = false
@handshake_enableds[handshake_disable_command.target_name] = false
return true
end
end
end
return false
end
def get_guid(packet)
if not packet.read(@fieldname_guid) =~ /[\x01-\xFF]/
# The GUID has not been set already (it has all \x00 values), so make a new one.
# This enables a router GUI to make the GUIDs so it can process handshakes too.
guid = UUIDTools::UUID.random_create.raw
packet.write(@fieldname_guid, guid, :RAW)
else
guid = packet.read(@fieldname_guid)
end
return guid
end
def update_length_field(packet)
length = packet.length - @length_value_offset
packet.write(@fieldname_cmd_length, length, :RAW)
end
def wait_for_response(packet, guid)
# Check the number of commands waiting for handshakes. This is just for sanity
# If the number of commands waiting for handshakes is very large then it can't be real
# So raise an error. Something has gone horribly wrong.
if @handshake_cmds.length > MAX_CONCURRENT_HANDSHAKES
len = @handshake_cmds.length
@handshake_cmds = []
raise "The number of commands waiting for handshakes to #{len}. Clearing all commands!"
end
# Create a handshake command object and add it to the list of commands waiting
handshake_cmd = LincHandshakeCommand.new(@handshakes_mutex, guid)
@handshake_cmds.push(handshake_cmd)
# wait for that handshake. This releases the mutex so that the telemetry and other commands can start running again.
timed_out = handshake_cmd.wait_for_handshake(@response_timeout)
# We now have the mutex again. This interface is blocked for the rest of the command handling,
# which should be quick because it's just checking variables and logging.
# We want to hold the mutex during that so that the commands get logged in order of handshake from here.
if timed_out
# Clean this command out of the array of items that require handshakes.
@handshake_cmds.delete_if { |hsc| hsc == handshake_cmd }
raise "Timeout waiting for handshake from #{System.commands.format(packet, System.targets[packet.target_name].ignored_parameters)}"
end
process_handshake_results(handshake_cmd)
rescue Exception => err
# If anything goes wrong after successfully writing the packet to the LINC target
# ensure that the packet gets updated in the CVT and logged to the packet log writer.
# COSMOS normally only does this if write returns successfully
if packet.identified?
command = System.commands.packet(packet.target_name, packet.packet_name)
else
command = System.commands.packet('UNKNOWN', 'UNKNOWN')
end
command.buffer = packet.buffer
@packet_log_writer_pairs.each do |packet_log_writer_pair|
packet_log_writer_pair.cmd_log_writer.write(packet)
end
raise err
end
def process_handshake_results(handshake_cmd)
status = handshake_cmd.handshake.handshake.read('STATUS')
code = handshake_cmd.handshake.handshake.read('CODE')
source = handshake_cmd.handshake.error_source
# Handle handshake warnings and errors
if status == "OK" and code != 0
unless @ignored_error_codes[handshake_cmd.handshake.handshake.target_name].include? code
Logger.warn "#{@name}: Warning sending command (#{code}): #{source}"
end
elsif status == "ERROR"
unless @ignored_error_codes[handshake_cmd.handshake.handshake.target_name].include? code
raise "Error sending command (#{code}): #{source}"
end
end
end
def read
packet = super()
if packet
@handshake_packets.each do |handshake_packet|
if handshake_packet.identify?(packet.buffer(false))
handshake_packet = handshake_packet.clone
handshake_packet.buffer = packet.buffer
linc_handshake = LincHandshake.new(handshake_packet, handshake_packet.target_name)
# Check for a local handshake
if handshake_packet.read('origin') == "LCL"
handle_local_handshake(linc_handshake)
else
handle_remote_handshake(linc_handshake) if @handshake_enableds[handshake_packet.target_name]
end # if handshake_packet.read('origin') == "LCL"
end # @handshake_packet.identify?(packet.buffer(false))
end
end # if packet
return packet
end
def handle_local_handshake(linc_handshake)
# Update the current value table for this command
command = System.commands.packet(linc_handshake.identified_command.target_name, linc_handshake.identified_command.packet_name)
command.received_time = linc_handshake.identified_command.received_time
command.buffer = linc_handshake.identified_command.buffer
command.received_count += 1
# Put a log of the command onto the server for the user to see
Logger.info("#{@name}: External Command: " + System.commands.format(linc_handshake.identified_command, System.targets[linc_handshake.identified_command.target_name].ignored_parameters))
# Log the command to the command log(s)
@packet_log_writer_pairs.each do |packet_log_writer_pair|
packet_log_writer_pair.cmd_log_writer.write(linc_handshake.identified_command)
end
end
def handle_remote_handshake(linc_handshake)
# This is a remote packet (sent from here).
# Add to the array of handshake packet responses
# The mutex is required by the command task due to the way it
# first looks up the handshake before removing it.
@handshakes_mutex.synchronize do
if @fieldname_guid
# A GUID means it's an asychronous packet type.
# So look at the list of incoming handshakes and pick off (deleting)
# the handshake from the list if it's for this command.
#
# The mutex is required because the telemetry task
# could enqueue a response between the index lookup and the slice
# function which would remove the wrong response. FAIL!
# Loop through all waiting commands to see if this handshake belongs to them
this_handshake_guid = linc_handshake.get_cmd_guid(@fieldname_guid)
handshake_cmd_index = @handshake_cmds.index { |hsc| hsc.get_cmd_guid == this_handshake_guid }
# If command was waiting (ie the loop above found one), then remove it from waiters and signal it
if handshake_cmd_index
handshake_cmd = @handshake_cmds.slice!(handshake_cmd_index)
handshake_cmd.got_your_handshake(linc_handshake)
else
# No command match found! Either it gave up and timed out or this wasn't originated from here.
# Ignore this typically. This case here for clarity.
end
else
# Synchronous version: just pop the array (pull the command off) and send it the handshake
handshake_cmd = @handshakes_cmds.pop
handshake_cmd.got_your_handshake(linc_handshake)
end # of handshaking type check
end
end
end # class LincInterface
# The LincHandshakeCommand class is used only by the LincInterface.
# It is the command with other required items that is passed to the telemetry
# thread so it can match it with the handshake.
class LincHandshakeCommand
attr_accessor :handshake
def initialize(handshakes_mutex, cmd_guid)
@cmd_guid = cmd_guid
@handshakes_mutex = handshakes_mutex
@resource = ConditionVariable.new
@handshake = nil
end
def wait_for_handshake(response_timeout)
timed_out = false
@resource.wait(@handshakes_mutex, response_timeout)
if @handshake
timed_out = false
else
Logger.warn "#{@name}: No handshake - must be timeout."
timed_out = true
end
return timed_out
end
def got_your_handshake(handshake)
@handshake = handshake
@resource.signal
end
def get_cmd_guid
return @cmd_guid
end
end # class LincHandshakeCommand
# The LincHandshake class is used only by the LincInterface. It processes the handshake and
# helps with finding the information regarding the internal command.
class LincHandshake
attr_accessor :handshake
attr_accessor :identified_command
attr_accessor :error_source
def initialize(handshake, interface_target_name)
@handshake = handshake
# Interpret the command field of the handshake packet
# Where DATA is defined as:
# 1 byte target name length
# X byte target name
# 1 byte packet name length
# X byte packet name
# 4 byte packet data length
# X byte packet data
# 4 byte error source length
# X byte error source
data = handshake.read('DATA')
raise "Data field too short for target name length" if data.length == 0
# Get target name length
target_name_length = data[0..0].unpack('C')[0]
raise "Invalid target name length" if target_name_length == 0
data = data[1..-1]
# get target name
raise "Data field too short for target name" if data.length < target_name_length
# target_name = data[0..(target_name_length - 1)] # Unused
data = data[target_name_length..-1]
# get packet name length
raise "Data field too short for packet name length" if data.length == 0
packet_name_length = data[0..0].unpack('C')[0]
raise "Invalid packet name length" if packet_name_length == 0
data = data[1..-1]
# get packet name
raise "Data field too short for packet name" if data.length < packet_name_length
packet_name = data[0..(packet_name_length - 1)]
data = data[packet_name_length..-1]
# get packet data length
raise "Data field too short for packet data length" if data.length < 4
packet_data_length = data[0..3].unpack('N')[0]
raise "Invalid data length" if packet_data_length == 0
data = data[4..-1]
# get packet data
raise "Data field too short for packet data" if data.length < packet_data_length
packet_data = data[0..(packet_data_length - 1)]
data = data[packet_data_length..-1]
# get error source length
raise "Data field too short for error source length" if data.length < 4
error_source_length = data[0..3].unpack('N')[0]
# note it is OK to have a 0 source length
data = data[4..-1]
# get error source - store on object
if error_source_length > 0
@error_source = data[0..(error_source_length - 1)]
else
@error_source = ''
end
# make packet - store on object as a defined packet of type command that this handshakes
@identified_command = System.commands.packet(interface_target_name, packet_name).clone
@identified_command.buffer = packet_data
@identified_command.received_time = Time.at(handshake.read('TIME_SECONDS'), handshake.read('TIME_MICROSECONDS')).sys
end
def get_cmd_guid(fieldname_guid)
return @identified_command.read(fieldname_guid)
end
end
end