# schwehr/libais
#
# View on GitHub
# test/nmea_queue_test.py
#
# Summary
#
# Maintainability
# B
# 5 hrs
# Test Coverage
"""Tests for ais.nmea_queue."""

import contextlib
import unittest

import pytest
import six
from six.moves import StringIO

import ais
from ais import nmea
from ais import nmea_queue

# Sample input: NMEA sentences with no surrounding metadata.  Contains one
# $GPZDA sentence, five single-sentence VDM messages and one VDM message split
# across two sentences (the "2,1" / "2,2" pair at the end).
# NOTE: the "# pylint" line sits inside the string literal, so it is part of
# the data; tests that split this block keep only lines containing a comma.
BARE_NMEA = """
# pylint: disable=line-too-long
$GPZDA,203003.00,12,07,2009,00,00,*47
!AIVDM,1,1,,B,23?up2001gGRju>Ap:;R2APP08:c,0*0E
!BSVDM,1,1,,A,15Mj23`PB`o=Of>KjvnJg8PT0L2R,0*7E
!SAVDM,1,1,,B,35Mj2p001qo@5tVKLBWmIDJT01:@,0*33
!AIVDM,1,1,,A,B5NWV1P0<vSE=I3QdK4bGwoUoP06,0*4F
!SAVDM,1,1,,A,403Owi1utn1W0qMtr2AKStg020S:,0*4B
!SAVDM,2,1,4,A,55Mub7P00001L@;SO7TI8DDltqB222222222220O0000067<0620@jhQDTVG,0*43
!SAVDM,2,2,4,A,30H88888880,2*49
"""

# Sample input: sentences preceded by a backslash-delimited tag block
# (\...\).  A raw string is used so the backslashes stay literal.  The block
# includes grouped ("g:") multi-sentence messages, one of which is interleaved
# with an unrelated single-sentence message, plus a lone "g:4-4-..." line
# whose other group members are not present.
# NOTE: the "# pylint" line sits inside the string literal, so it is part of
# the data; tests that split this block keep only lines containing a comma.
TAG_BLOCK = r"""
# pylint: disable=line-too-long
\n:440661,s:r3669963,c:1428537660*0F\$GPZDA,000253,09,04,2015,+00,00*6C
\g:1-2-4372,s:rORBCOMM109,c:1426032000,T:2015-03-11 00.00.00*32\!AIVDM,2,1,2,B,576u>F02>hOUI8AGR20tt<j104p4l62222222216H14@@Hoe0JPEDp1TQH88,0*16
\s:rORBCOMM999u,c:1426032000,T:2015-03-11 00.00.00*36\!AIVDM,1,1,,,;5Qu0v1utmGssvvkA`DRgm100000,0*46
\g:2-2-4372,s:rORBCOMM109,c:1426032000,T:2015-03-11 00.00.00*31\!AIVDM,2,2,2,B,88888888880,2*25
\g:1-2-27300,n:636994,s:b003669710,c:1428621738*5F\!SAVDM,2,1,2,B,55Mw@A7J1adAL@?;7WPl58F0U<h4pB222222220t1PN5553fN4g?`4iSp5Rc,0*26
\g:2-2-27300,n:636995*15\!SAVDM,2,2,2,B,iP`88888880,2*5E
\n:636996,s:b003669710,c:1428621738*19\!SAVDM,1,1,,B,35Mv4LPP@Go?FFtEbDDWQmlT20k@,0*04
\g:4-4-993623,n:577969*22\$ARVSI,r003669930,,233948.825272,1831,-97,0*24
\n:80677,s:b003669952,c:1428884269*2A\!SAVDM,1,1,,B,K8VSqb9LdU28WP8<,0*17
"""

# Sample input: VDM sentences with extra comma-separated metadata fields
# appended after the checksum (for example a station name and a trailing
# integer timestamp).  Includes one message split across two sentences.
# NOTE: the "# pylint" line sits inside the string literal, so it is part of
# the data; tests that split this block keep only lines containing a comma.
USCG = r"""
# pylint: disable=line-too-long
!SAVDM,1,1,,A,15N4OMPP01I<cGrA1v>Id?vF060l,0*22,b003669978,1429287189
!SAVDM,2,1,4,B,54h@7?02BAF=`L4wN21<eTH4hj2222222222220U4HG6553U06T0C3H0Q@@j,0*5D,d-86,S389,t161310.00,T10.377780,D07MN-MI-LAKBS1,1429287190
!SAVDM,2,2,4,B,88888888880,2*39,d-86,S389,t161310.00,T10.377780,D07MN-MI-LAKBS1,1429287190
!AIVDM,1,1,,B,3592u`iP03GWEflBRosm0Ov@0000,0*70,d-107,S0297,t161407.00,T07.92201452,r11CSDO1,1429287248
!SAVDM,1,1,,B,K8VSqb9LdU28WP8<,0*17,rMySat,1429287258
"""

# Sample input: one line of each flavor, in order: a sentence with trailing
# USCG-style metadata, a bare sentence, a tag block sentence, and a line of
# plain text that is not NMEA at all.
MIXED = r"""
!SAVDM,1,1,,A,15N4OMPP01I<cGrA1v>Id?vF060l,0*22,b003669978,1429287189
!SAVDM,1,1,,A,403Owi1utn1W0qMtr2AKStg020S:,0*4B
\n:80677,s:b003669952,c:1428884269*2A\!SAVDM,1,1,,B,K8VSqb9LdU28WP8<,0*17
random text
"""


class NmeaQueueTest(unittest.TestCase):
  """Feeds sample lines into NmeaQueue and checks the messages it returns."""

  def _Fill(self, lines):
    """Returns a fresh NmeaQueue after putting each of `lines` in order."""
    queue = nmea_queue.NmeaQueue()
    for line in lines:
      queue.put(line)
    return queue

  def _Drain(self, queue):
    """Returns the messages pulled with get() until the queue is empty."""
    drained = []
    while not queue.empty():
      drained.append(queue.get())
    return drained

  def testTextData(self):
    """Non-NMEA lines come back one message per line, typed as TEXT."""
    passthrough = (
        '', 'a', '123',
        # Look a little like NMEA, but are not full sentences.
        '$GPZDA', '!AIVDM', '*FF',
    )
    queue = self._Fill(passthrough)

    self.assertEqual(queue.qsize(), len(passthrough))
    # Line numbers reported by the queue start at 1.
    for line_num, text in enumerate(passthrough, start=1):
      msg = queue.get()
      self.assertEqual(msg['line_nums'], [line_num])
      self.assertEqual(msg['line_type'], nmea.TEXT)
      self.assertEqual(msg['lines'], [text])

    # After the loop, msg is the final message that was pulled.
    expected_last = {
        'line_nums': [6],
        'line_type': 'TEXT',
        'lines': ['*FF'],
    }
    self.assertEqual(msg, expected_last)

  def testBareSingleLineData(self):
    """Bare sentences: checks the message count and the first two messages."""
    sentences = [row for row in BARE_NMEA.split('\n') if ',' in row]
    queue = self._Fill(sentences)
    self.assertEqual(queue.qsize(), 7)
    msgs = self._Drain(queue)

    expected_zda = {
        'line_nums': [1],
        'line_type': 'BARE',
        'lines': ['$GPZDA,203003.00,12,07,2009,00,00,*47'],
    }
    self.assertEqual(msgs[0], expected_zda)

    expected_decoded = {
        'cog': 52.099998474121094,
        'id': 2,
        'md5': '99c8c2804fde0481e6143051930b66c4',
        'mmsi': 218069000,
        'nav_status': 0,
        'position_accuracy': 0,
        'raim': False,
        'repeat_indicator': 0,
        'rot': 0.0,
        'rot_over_range': False,
        'slot_number': 683,
        'slot_timeout': 2,
        'sog': 11.100000381469727,
        'spare': 0,
        'special_manoeuvre': 0,
        'sync_state': 0,
        'timestamp': 16,
        'true_heading': 48,
        'x': -118.227775,
        'y': 31.24317,
    }
    expected_match = {
        'body': '23?up2001gGRju>Ap:;R2APP08:c',
        'chan': 'B',
        'checksum': '0E',
        'fill_bits': 0,
        'sen_num': 1,
        'sen_tot': 1,
        'seq_id': None,
        'talker': 'AI',
        'vdm_type': 'VDM',
        'vdm': '!AIVDM,1,1,,B,23?up2001gGRju>Ap:;R2APP08:c,0*0E',
    }
    expected_position = {
        'decoded': expected_decoded,
        'line_nums': [2],
        'line_type': 'BARE',
        'lines': ['!AIVDM,1,1,,B,23?up2001gGRju>Ap:;R2APP08:c,0*0E'],
        'matches': [expected_match],
    }
    self.assertEqual(msgs[1], expected_position)

  def testTagBlockLines(self):
    """Tag block sentences: checks count, decoded ids and the last message."""
    sentences = [row for row in TAG_BLOCK.split('\n') if ',' in row]
    queue = self._Fill(sentences)
    self.assertEqual(queue.qsize(), 6)
    msgs = self._Drain(queue)

    # msgs[0] comes from the ZDA line and is deliberately not inspected yet;
    # an assertNotIn('decoded', msgs[0]) check is currently left disabled.
    # TODO(schwehr): Check the ZDA message decoding.
    for index in (1, 2, 3, 4):
      self.assertIn('decoded', msgs[index])
    decoded_ids = [
        entry['decoded']['id'] for entry in msgs[1:] if 'decoded' in entry]
    self.assertEqual(decoded_ids, [11, 5, 5, 3, 27])

    expected_decoded = {
        'cog': 131,
        'gnss': True,
        'id': 27,
        'md5': '50898a3435865cf76f1b502b2821672b',
        'mmsi': 577305000,
        'nav_status': 5,
        'position_accuracy': 1,
        'raim': False,
        'repeat_indicator': 0,
        'sog': 0,
        'spare': 0,
        'x': -90.20666666666666,
        'y': 29.145,
    }
    expected_match = {
        'dest': None,
        'group': None,
        'group_id': None,
        'line_num': 80677,
        'metadata': 'n:80677,s:b003669952,c:1428884269*2A',
        'payload': '!SAVDM,1,1,,B,K8VSqb9LdU28WP8<,0*17',
        'quality': None,
        'rcvr': 'b003669952',
        'rel_time': None,
        'sentence_num': None,
        'sentence_tot': None,
        'tag_checksum': '2A',
        'text': None,
        'text_date': None,
        'time': 1428884269,
    }
    expected_last = {
        'decoded': expected_decoded,
        'line_nums': [9],
        'line_type': 'TAGB',
        'lines': [
            '\\n:80677,s:b003669952,c:1428884269*2A'
            '\\!SAVDM,1,1,,B,K8VSqb9LdU28WP8<,0*17'],
        'matches': [expected_match],
        'times': [1428884269],
    }
    self.assertEqual(msgs[-1], expected_last)

  def testUscgLines(self):
    """USCG-style sentences: checks count, decoded ids and the last message."""
    sentences = [row for row in USCG.split('\n') if ',' in row]
    queue = self._Fill(sentences)

    self.assertEqual(queue.qsize(), 4)
    msgs = self._Drain(queue)

    for entry in msgs:
      self.assertIn('decoded', entry)
    decoded_ids = [entry['decoded']['id'] for entry in msgs]
    self.assertEqual(decoded_ids, [1, 5, 3, 27])

    expected_decoded = {
        'cog': 131,
        'gnss': True,
        'id': 27,
        'md5': '50898a3435865cf76f1b502b2821672b',
        'mmsi': 577305000,
        'nav_status': 5,
        'position_accuracy': 1,
        'raim': False,
        'repeat_indicator': 0,
        'sog': 0,
        'spare': 0,
        'x': -90.20666666666666,
        'y': 29.145,
    }
    expected_match = {
        'body': 'K8VSqb9LdU28WP8<',
        'chan': 'B',
        'checksum': '17',
        'counter': None,
        'fill_bits': 0,
        'hour': None,
        'minute': None,
        'payload': '!SAVDM,1,1,,B,K8VSqb9LdU28WP8<,0*17',
        'receiver_time': None,
        'rssi': None,
        'second': None,
        'sen_num': 1,
        'sen_tot': 1,
        'seq_id': None,
        'signal_strength': None,
        'slot': None,
        'station': 'rMySat',
        'station_type': 'r',
        'talker': 'SA',
        'time': 1429287258,
        'time_of_arrival': None,
        'uscg_metadata': ',rMySat,1429287258',
        'vdm': '!SAVDM,1,1,,B,K8VSqb9LdU28WP8<,0*17',
        'vdm_type': 'VDM',
    }
    expected_last = {
        'decoded': expected_decoded,
        'line_nums': [5],
        'line_type': 'USCG',
        'lines': ['!SAVDM,1,1,,B,K8VSqb9LdU28WP8<,0*17,rMySat,1429287258'],
        'matches': [expected_match],
    }
    self.assertEqual(msgs[3], expected_last)

  def testMixedLines(self):
    """A mix of line flavors is typed per line; all but the text decode."""
    non_blank = [row for row in MIXED.split('\n') if row.strip()]
    queue = self._Fill(non_blank)

    self.assertEqual(queue.qsize(), 4)
    msgs = self._Drain(queue)

    # The final entry is the plain text line, so it is left out here.
    decodable = msgs[:-1]
    for entry in decodable:
      self.assertIn('decoded', entry)
    decoded_ids = [entry['decoded']['id'] for entry in decodable]
    self.assertEqual(decoded_ids, [1, 4, 27])

    seen_types = [entry['line_type'] for entry in msgs]
    expected_types = [nmea.USCG, nmea.BARE, nmea.TAGB, nmea.TEXT]
    self.assertEqual(seen_types, expected_types)


@pytest.mark.parametrize("nmea", [
    six.text_type(BARE_NMEA.strip()),
    six.text_type(TAG_BLOCK.strip()),
    six.text_type(USCG.strip()),
    six.text_type(MIXED.strip())
])
def test_NmeaFile_against_queue(nmea):
    """Check that reading through ais.open() matches using NmeaQueue directly.

    The same text is pushed line by line into an NmeaQueue and also wrapped in
    a StringIO that is read via ais.open(); both must give the same messages.

    Args:
      nmea: Text of one of the sample blocks defined in this module.  Inside
        this function the name shadows the imported `nmea` module.
    """

    queue = nmea_queue.NmeaQueue()
    for line in nmea.splitlines():
        queue.put(line)

    # Collect messages until GetOrNone() returns a falsy value.
    expected = []
    msg = queue.GetOrNone()
    while msg:
        expected.append(msg)
        msg = queue.GetOrNone()

    with contextlib.closing(StringIO(nmea)) as f, ais.open(f) as src:
        actual = list(src)

    # zip() stops at the shorter sequence, so check the lengths first;
    # otherwise a reader that yields too few (or no) messages would make this
    # test pass without comparing anything.
    assert len(expected) == len(actual)
    for e, a in zip(expected, actual):
        assert e == a


# Run the unittest-based test cases when this file is executed as a script.
# NOTE(review): the pytest-parametrized module-level test function presumably
# is not collected by unittest.main() -- confirm it is run via pytest.
if __name__ == '__main__':
  unittest.main()