schwehr/libais

View on GitHub
test/vdm_test.py

Summary

Maintainability
D
3 days
Test Coverage
#!/usr/bin/env python

"""Tests for ais.vdm."""

import unittest

from ais import vdm
import six


class TestCase(unittest.TestCase):
  """Base test case adding a subset assertion with (actual, expected) order."""

  def assertDictContainsSubset2(self, actual, expected):
    """Assert that every key/value pair of expected is present in actual.

    assertDictContainsSubset was deprecated because of incorrect arg order.
    This method has the correct order.

    Args:
      actual: The dict produced by the code under test.  It may hold extra
        keys that are not in expected.
      expected: A dict of the key/value pairs that actual must contain.

    Raises:
      AssertionError: If either argument is not a dict, if a key of expected
        is missing from actual, or if a value differs.
    """
    self.assertIsInstance(expected, dict)
    self.assertIsInstance(actual, dict)
    for key, value in expected.items():
      # Check for the key first so that a missing key is reported as a test
      # failure rather than surfacing as a KeyError.
      self.assertIn(key, actual, 'missing key: %s' % (key,))
      self.assertEqual(actual[key], value, 'kv: %s, %s' % (key, value))


class VdmRegexTest(unittest.TestCase):
  """Check that VDM_RE splits bare NMEA VDM sentences into named groups."""

  def testRegexSingleLineMessage(self):
    # A one sentence message has an empty sequence id field, so the seq_id
    # group is expected to be None.  Every other group is a string.
    sentence = '!AIVDM,1,1,,A,14VIk0002sMM04vE>V9jGimn08RP,0*0D'
    groups = vdm.VDM_RE.match(sentence).groupdict()
    want = {
        'vdm': '!AIVDM,1,1,,A,14VIk0002sMM04vE>V9jGimn08RP,0*0D',
        'talker': 'AI',
        'vdm_type': 'VDM',
        'sen_tot': '1',
        'sen_num': '1',
        'seq_id': None,
        'chan': 'A',
        'body': '14VIk0002sMM04vE>V9jGimn08RP',
        'fill_bits': '0',
        'checksum': '0D'}
    self.assertEqual(groups, want)

  def testRegexTwoLineMessage(self):
    # Each sentence of a two part message is matched on its own; both carry
    # the same seq_id.
    # pylint: disable=line-too-long
    first = '!AIVDM,2,1,2,B,5KNp?6@00000=Phb220u1@Tlv0TV22222222220N0h:22400000000000000,0*2A'
    second = '!AIVDM,2,2,2,B,00000000000,2*25'

    want_first = {
        'vdm': '!AIVDM,2,1,2,B,5KNp?6@00000=Phb220u1@Tlv0TV22222222220N0h:22400000000000000,0*2A',
        'talker': 'AI',
        'vdm_type': 'VDM',
        'sen_tot': '2',
        'sen_num': '1',
        'seq_id': '2',
        'chan': 'B',
        'body': '5KNp?6@00000=Phb220u1@Tlv0TV22222222220N0h:22400000000000000',
        'fill_bits': '0',
        'checksum': '2A'}
    want_second = {
        'vdm': '!AIVDM,2,2,2,B,00000000000,2*25',
        'talker': 'AI',
        'vdm_type': 'VDM',
        'sen_tot': '2',
        'sen_num': '2',
        'seq_id': '2',
        'chan': 'B',
        'body': '00000000000',
        'fill_bits': '2',
        'checksum': '25'}

    # Match both sentences before asserting on either result.
    got_first = vdm.VDM_RE.match(first).groupdict()
    got_second = vdm.VDM_RE.match(second).groupdict()
    self.assertEqual(got_first, want_first)
    self.assertEqual(got_second, want_second)


class VdmLinesTest(unittest.TestCase):
  """Check which input lines VdmLines passes on."""

  def testVdmLinesGenerator(self):
    # Of the four inputs only the AIVDM sentence is expected back, and it
    # should arrive without its trailing space and newline.
    candidates = (
        '',
        '!AIVDM,2,2,2,B,00000000000,2*25 \n',
        '$ARVSI,r003669945,5,201704.05687473,0152,-085,0*2E',
        r'\g:3-3-42349,n:111460*1E\$',
    )
    vdm_lines = vdm.VdmLines(candidates)
    first = six.next(vdm_lines)
    self.assertEqual(first, '!AIVDM,2,2,2,B,00000000000,2*25')
    # Nothing else qualifies, so the generator must now be exhausted.
    with self.assertRaises(StopIteration):
      six.next(vdm_lines)


class ParseTest(TestCase):
  """Check vdm.Parse on a bare single sentence message."""

  def testSingleLineParse(self):
    # Only the listed fields are checked; the parsed result may hold more.
    # The sentence counters and fill bits are expected as ints here.
    want = {
        'talker': 'AI',
        'vdm_type': 'VDM',
        'sen_tot': 1,
        'sen_num': 1,
        'seq_id': None,
        'chan': 'A',
        'body': 'B7OeqLP0mB4<LFKO7noASwcUoP06',
        'fill_bits': 0,
        'checksum': '51'}
    parsed = vdm.Parse('!AIVDM,1,1,,A,B7OeqLP0mB4<LFKO7noASwcUoP06,0*51')
    self.assertDictContainsSubset2(parsed, want)


class BareQueueTest(unittest.TestCase):

  def setUp(self):
    # Build a fresh queue for every test.  The tests in this class expect
    # the line numbers reported by the queue to start at 1.
    self.queue = vdm.BareQueue()

  def testPassThroughText(self):
    # Pass through anything that does not totally match a NMEA sentence.
    # Each such line should come straight back out carrying only its line
    # number and its right-stripped text.
    non_nmea = (
        '',
        '\n',
        ' \n',
        ' \n\r',
        '# comment',
        '$GPZDA,050004,29,11,2012,-5,00*',  # No checksum.
        'GPZDA,050013,29,11,2012,-5,00*5D',  # No initial [$!].
    )
    for index, text in enumerate(non_nmea):
      self.queue.put(text)
      self.assertEqual(self.queue.qsize(), 1)
      # Line numbers reported by the queue are 1 based.
      want = {
          'line_nums': [index + 1],
          'lines': [text.rstrip()]}
      got = self.queue.get()
      failure = ('pass through fail. %d "%s"\n  %s != %s'
                 % (index, text, got, want))
      self.assertEqual(got, want, failure)
      self.assertEqual(self.queue.qsize(), 0)

  def testSingleLineNmeaSentencesIgnored(self):
    # NMEA sentences that are not VDM are not decoded: each one is expected
    # back with just its 1 based line number and its text.
    sentences = (
        '$GPGGA,000000,4308.1252,N,07056.3763,W,2,9,0.9,35.2,M,,,,*0A',
        '$HCHDT,25.6,T*18',
        '$GPVTG,269.5,T,284.9,M,0.0,N,0.0,K,D*29',
        '$WIMDA,29.1166,I,0.9860,B,19.6,C,,,,,,,175.8,T,191.2,M,2.0,N,1.0,M*2B',
        '$GPVTG,268.6,T,284.0,M,0.1,N,0.1,K,D*2',
        '$GPGGA,000003,4308.1252,N,07056.3763,W,2,9,0.9,35.3,M,,,,*08')

    for position, sentence in enumerate(sentences, 1):
      self.queue.put(sentence)
      self.assertEqual(self.queue.qsize(), 1)
      got = self.queue.get()
      self.assertEqual(
          got, {'line_nums': [position], 'lines': [sentence.rstrip()]})
      self.assertEqual(self.queue.qsize(), 0)

  def testSingleLineVdm(self):
    # Fully check a single line AIS VDM message going through the
    # queue and being decoded.  The line number is handed to put()
    # explicitly and must be echoed back in line_nums.
    sentence = '!SAVDM,1,1,,B,K8VSqb9LdU28WP8P,0*7B'
    sentence_num = 100

    self.queue.put(sentence, sentence_num)
    self.assertEqual(self.queue.qsize(), 1)

    want_decoded = {
        'cog': 136,
        'gnss': True,
        'id': 27,
        'md5': '6e1a4872825054e91ee7cfcfb9cc87e0',
        'mmsi': 577305000,
        'nav_status': 5,
        'position_accuracy': 1,
        'raim': False,
        'repeat_indicator': 0,
        'sog': 0,
        'spare': 0,
        'x': -90.20666666666666,
        'y': 29.145}
    want_match = {
        'body': 'K8VSqb9LdU28WP8P',
        'chan': 'B',
        'checksum': '7B',
        'fill_bits': 0,
        'sen_num': 1,
        'sen_tot': 1,
        'seq_id': None,
        'talker': 'SA',
        'vdm': '!SAVDM,1,1,,B,K8VSqb9LdU28WP8P,0*7B',
        'vdm_type': 'VDM'}
    want = {
        'line_nums': [sentence_num],
        'lines': [sentence.rstrip()],
        'decoded': want_decoded,
        'matches': [want_match]}

    got = self.queue.get()
    self.assertEqual(got, want)
    self.assertEqual(self.queue.qsize(), 0)

  def testManySingleLineVdms(self):
    # Pass a series of independent one line messages through a queue.
    # All of these lines should be decoded.  The only interaction
    # between messages is that the line number is incremented.
    # Only do spot checks on the results.
    sentences = (
        '!AIVDM,1,1,,A,15N:pmP002Jd``FGB:hm619`00R5,0*42',
        '!BSVDO,1,1,,B,4h3OdJQutqGssIw1T`JFhg700d09,0*34',
        '!SAVDM,1,1,,B,78KDut1BAFeu,0*5A',
        '!AIVDM,1,1,,B,9oVAuAI5;rRRv2OqTi?1uoP?=a@1,0*74',
        '!AIVDM,1,1,,A,:4`bLl0p3;Qd,0*77',
        '!AIVDM,1,1,,A,;3P<f6iuiq00aOUu8DOD@j100000,0*44',
        '!AIVDM,1,1,,B,B6:VU2P0<:;2r84N5obLOwR2P0S9,0*23')

    # Every sentence is complete by itself, so the queue grows by one each.
    for queued, sentence in enumerate(sentences, 1):
      self.queue.put(sentence)
      self.assertEqual(self.queue.qsize(), queued)

    results = []
    while not self.queue.empty():
      results.append(self.queue.get())

    self.assertEqual(len(results), len(sentences))
    first_line_nums = [result['line_nums'][0] for result in results]
    self.assertEqual(first_line_nums, [1, 2, 3, 4, 5, 6, 7])
    for result in results:
      self.assertEqual(len(result['line_nums']), 1)
      self.assertEqual(len(result['lines']), 1)
      self.assertEqual(len(result['matches']), 1)

    # Spot check each message.  The names carry the expected message id.
    msg_1, msg_4, msg_7, msg_9, msg_10, msg_11, msg_18 = results

    self.assertEqual(msg_1['decoded']['id'], 1)
    self.assertEqual(msg_1['decoded']['true_heading'], 36)
    self.assertEqual(msg_1['matches'][0]['chan'], 'A')

    self.assertEqual(msg_4['decoded']['id'], 4)
    self.assertEqual(msg_4['decoded']['fix_type'], 7)
    self.assertEqual(msg_4['matches'][0]['vdm_type'], 'VDO')

    self.assertEqual(msg_7['decoded']['id'], 7)
    self.assertEqual(msg_7['decoded']['acks'][0], (345070303, 1))
    self.assertEqual(msg_7['matches'][0]['talker'], 'SA')

    self.assertEqual(msg_9['decoded']['id'], 9)
    self.assertEqual(msg_9['decoded']['alt'], 2324)
    self.assertEqual(msg_9['matches'][0]['sen_num'], 1)

    self.assertEqual(msg_10['decoded']['id'], 10)
    self.assertEqual(msg_10['decoded']['dest_mmsi'], 235089435)
    self.assertEqual(msg_10['matches'][0]['sen_tot'], 1)

    self.assertEqual(msg_11['decoded']['id'], 11)
    self.assertEqual(msg_11['decoded']['year'], 2012)
    self.assertIsNone(msg_11['matches'][0]['seq_id'])

    self.assertEqual(msg_18['decoded']['id'], 18)
    self.assertEqual(msg_18['decoded']['band_flag'], 1)
    self.assertEqual(msg_18['matches'][0]['checksum'], '23')

  def testTwoLineMessage(self):
    # The first sentence alone must not produce output; the decoded message
    # only becomes available once the second sentence has been queued.
    sentences = (
        # pylint: disable=line-too-long
        '!ABVDM,2,1,2,A,55NJPwP00001L@K?77@DhhU>0@5HU>222222220O18@374B<08CCm2EPH0kk,0*6D',
        '!ABVDM,2,2,2,A,UQCU8888880,2*3F')
    self.assertEqual(len(sentences), 2)

    for sentence, want_size in zip(sentences, (0, 1)):
      self.queue.put(sentence)
      self.assertEqual(self.queue.qsize(), want_size)

    want_decoded = {
        'ais_version': 0,
        'callsign': 'WDF3114',
        'destination': 'MOTIVA CONVENT      ',
        'dim_a': 9,
        'dim_b': 16,
        'dim_c': 3,
        'dim_d': 7,
        'draught': 3.299999952316284,
        'dte': 0,
        'eta_day': 4,
        'eta_hour': 12,
        'eta_minute': 0,
        'eta_month': 1,
        'fix_type': 1,
        'id': 5,
        'imo_num': 0,
        'md5': '403a171048302b7f1515f09131238db4',
        'mmsi': 367436030,
        'name': 'ELLIS DAVIS         ',
        'repeat_indicator': 0,
        'spare': 0,
        'type_and_cargo': 31}
    want_first_match = {
        # pylint: disable=line-too-long
        'vdm': '!ABVDM,2,1,2,A,55NJPwP00001L@K?77@DhhU>0@5HU>222222220O18@374B<08CCm2EPH0kk,0*6D',
        'talker': 'AB',
        'vdm_type': 'VDM',
        'sen_tot': 2,
        'sen_num': 1,
        'seq_id': 2,
        'chan': 'A',
        'body': ('55NJPwP00001L@K?77@DhhU>0@5HU>222222220O18@374B<08C'
                 'Cm2EPH0kk'),
        'fill_bits': 0,
        'checksum': '6D'}
    want_second_match = {
        'vdm': '!ABVDM,2,2,2,A,UQCU8888880,2*3F',
        'talker': 'AB',
        'vdm_type': 'VDM',
        'sen_tot': 2,
        'sen_num': 2,
        'seq_id': 2,
        'chan': 'A',
        'body': 'UQCU8888880',
        'fill_bits': 2,
        'checksum': '3F'}
    want = {
        'line_nums': [1, 2],
        'lines': [sentence.rstrip() for sentence in sentences],
        'decoded': want_decoded,
        'matches': [want_first_match, want_second_match]}

    got = self.queue.get()
    self.assertEqual(got, want)
    self.assertEqual(self.queue.qsize(), 0)

  def testThreeLineMessage(self):
    # Nothing may come out of the queue until the third and final sentence
    # has been queued; then exactly one fully decoded message is expected.
    sentences = (
        # pylint: disable=line-too-long
        '!AIVDM,3,1,4,A,81mg=5@0EP:4R40807P>0<D1>MNt00000f>FNVfnw7>6>FNU=?B5PD5HDPD8,0*26',
        '!AIVDM,3,2,4,A,1Dd2J09jL08JArJH5P=E<D9@<5P<9>0`bMl42Q0d2Pc2T59CPCE@@?C54PD?,0*60',
        '!AIVDM,3,3,4,A,d0@d0IqhH:Pah:U54PD?75D85Bf00,0*03')
    self.assertEqual(len(sentences), 3)

    for sentence, want_size in zip(sentences, (0, 0, 1)):
      self.queue.put(sentence)
      self.assertEqual(self.queue.qsize(), want_size)

    # One point sub area followed by eight text sub areas.
    point_area = {
        'precision': 4,
        'radius': 0,
        'sub_area_type': 0,
        'sub_area_type_str': 'point',
        'x': -69.8,
        'y': 42.849983333333334}
    text_chunks = (
        '12345678901234',
        'MORE TEXT THAT',
        ' SPANS ACROSS@',
        ' MULTIPLE LIN@',
        'ES.  THE TEXT ',
        'IS SUPPOSED TO',
        ' BE CONCATENAT',
        'ED TOGETHER.@@')
    text_areas = [
        {'sub_area_type': 5, 'sub_area_type_str': 'text', 'text': chunk}
        for chunk in text_chunks]

    want_decoded = {
        'dac': 1,
        'day': 1,
        'duration_minutes': 1,
        'fi': 22,
        'hour': 0,
        'id': 8,
        'link_id': 10,
        'md5': 'f463fcabb7540e412af5d7238f955b0e',
        'minute': 1,
        'mmsi': 123456789,
        'month': 1,
        'notice_type': 9,
        'notice_type_str': 'Caution Area: Marine event',
        'repeat_indicator': 0,
        'spare': 0,
        'sub_areas': [point_area] + text_areas}

    # Fields that are identical in all three sentence matches.
    shared = {
        'talker': 'AI',
        'seq_id': 4,
        'vdm_type': 'VDM',
        'sen_tot': 3,
        'chan': 'A',
        'fill_bits': 0}
    want_matches = [
        dict(
            shared,
            sen_num=1,
            body=('81mg=5@0EP:4R40807P>0<D1>MNt00000f>FNVfnw7>6>FNU=?B'
                  '5PD5HDPD8'),
            checksum='26',
            # pylint: disable=line-too-long
            vdm='!AIVDM,3,1,4,A,81mg=5@0EP:4R40807P>0<D1>MNt00000f>FNVfnw7>6>FNU=?B5PD5HDPD8,0*26'),
        dict(
            shared,
            sen_num=2,
            body=('1Dd2J09jL08JArJH5P=E<D9@<5P<9>0`bMl42Q0d2Pc2T59CPCE@@'
                  '?C54PD?'),
            checksum='60',
            # pylint: disable=line-too-long
            vdm='!AIVDM,3,2,4,A,1Dd2J09jL08JArJH5P=E<D9@<5P<9>0`bMl42Q0d2Pc2T59CPCE@@?C54PD?,0*60'),
        dict(
            shared,
            sen_num=3,
            body='d0@d0IqhH:Pah:U54PD?75D85Bf00',
            checksum='03',
            vdm='!AIVDM,3,3,4,A,d0@d0IqhH:Pah:U54PD?75D85Bf00,0*03')]

    want = {
        'line_nums': [1, 2, 3],
        'lines': [sentence.rstrip() for sentence in sentences],
        'decoded': want_decoded,
        'matches': want_matches}
    got = self.queue.get()

    self.assertEqual(got, want)
    self.assertEqual(self.queue.qsize(), 0)

  def testInterspersedMessages(self):
    # The grand test of mixing messages between each other.  The
    # queue should correctly use all lines here to decode 4 messages.
    sentences = (
        # pylint: disable=line-too-long
        '!AIVDM,3,1,4,A,81mg=5@0EP:4R40807P>0<D1>MNt00000f>FNVfnw7>6>FNU=?B5PD5HDPD8,0*26',
        '!SAVDM,2,1,7,B,54QBqQ403dR4dP`j220`tPr1N098uLr2222222168pqB16Ne0<PPC52CClQH,0*6E',
        '!AIVDM,3,2,4,A,1Dd2J09jL08JArJH5P=E<D9@<5P<9>0`bMl42Q0d2Pc2T59CPCE@@?C54PD?,0*60',
        '!SAVDM,1,1,1,A,BEN:gg00bekP?aVR9C9UCwUUoP00,0*05',
        '!SAVDM,2,1,6,A,55NVS2000001L@??W3DU8tr0D4LhF22222222200000006hd07SClR1@A80j,0*57',
        '!SAVDM,2,2,6,A,E6H3Pp88880,2*10',
        '!AIVDM,3,3,4,A,d0@d0IqhH:Pah:U54PD?75D85Bf00,0*03',
        '!SAVDM,2,2,7,B,88888888880,2*3A')
    self.assertEqual(len(sentences), 8)

    # Queue size expected right after each sentence above has been put.
    sizes_after_put = [0, 0, 0, 1, 1, 2, 3, 4]
    for sentence, want_size in zip(sentences, sizes_after_put):
      self.queue.put(sentence)
      self.assertEqual(self.queue.qsize(), want_size)

    results = []
    while not self.queue.empty():
      results.append(self.queue.get())

    self.assertEqual(len(results), 4)

    # Messages are expected in the order in which they were completed.
    got_line_nums = [result['line_nums'] for result in results]
    self.assertEqual(got_line_nums, [[4], [5, 6], [1, 3, 7], [2, 8]])

    # Spot check each message.
    first, second, third, fourth = results

    self.assertEqual(first['decoded']['id'], 18)
    self.assertAlmostEqual(first['decoded']['x'], -122.767435)
    self.assertEqual(first['matches'][0]['seq_id'], 1)

    self.assertEqual(second['decoded']['id'], 5)
    self.assertEqual(second['decoded']['name'], 'IRON EAGLE          ')
    self.assertEqual(second['matches'][0]['seq_id'], 6)

    self.assertEqual(third['decoded']['id'], 8)
    self.assertEqual(
        third['decoded']['sub_areas'][8]['text'], 'ED TOGETHER.@@')
    self.assertEqual(third['matches'][0]['seq_id'], 4)

    self.assertEqual(fourth['decoded']['id'], 5)
    self.assertEqual(fourth['decoded']['callsign'], 'KHJL   ')
    self.assertEqual(fourth['matches'][0]['seq_id'], 7)

  def testUnhandledSingleLineVdmMessageType(self):
    # AIS 6:669:11 not handled.  Putting such a sentence is expected to
    # leave the queue empty.
    unhandled = '!AIVDM,1,1,,B,6B?n;be:cbapalgc;i6?Ow4,2*4A'
    self.queue.put(unhandled)
    queued = self.queue.qsize()
    self.assertEqual(queued, 0)


# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()