schwehr/libais

View on GitHub
utils/benthos_validate.py

Summary

Maintainability
F
1 wk
Test Coverage
#!/usr/bin/env python

"""
Validator to ensure libais can produce Benthos's expected input
"""

import json
import os
import unittest
import sys

# In order to ease test maintenance as outputs and inputs change the data structure below contains a test for every
# field, a value that will pass the test, and a value that will fail the test.  All information is pulled from GPSD
# (http://catb.org/gpsd/AIVDM.html) and assumes two things:
#   1. Fieldnames are unique across all messages
#   2. Fields appearing in different message types contain the same information.  For instance, the field 'sog'
#      appears in multiple types but is always speed over ground in the same units in every message.
#
# Keys are fieldnames and values are dictionaries containing three keys:
#   1. test - a function that verifies a value is acceptable for this field
#   2. good - a value that will pass the test function
#   3. bad  - a value that will not pass the test function
#
# Some fields are flags with values 0 or 1 that may be switched to Python's bool in the future, but in the meantime
# we want to be positive these values are ints.  Since bool subclasses int, expressions like `False in (1, 0)` and
# `True in (0, 1)` both evaluate as `True`, which could yield unexpected results.  Any test that expects an int
# also checks to make sure that int is NOT a bool, even if the field is a range and will never be bool.  Better to be
# safe here than be forced to debug some potentially ambiguous bugs elsewhere.
def _is_strict_int(x):

    """
    Return True when `x` is an int that is not a bool.

    bool subclasses int, so a bare `isinstance(x, int)` would also accept
    True and False.  Every integer field below must hold a real int.
    """

    return isinstance(x, int) and not isinstance(x, bool)


# Keys are fieldnames and values are dictionaries with three keys:
#   test - callable returning a truthy value when the field's value is acceptable
#   good - a sample value that passes `test`
#   bad  - a sample value that fails `test`
#
# NOTE(review): several upper bounds are written as an inclusive `2 ** n`.  If `n` is meant to be the bit width of
# an unsigned field the largest representable value would be `2 ** n - 1` - confirm before tightening, because some
# of the 'good' samples (e.g. repeat == 4) rely on the inclusive bound.
MSG_VALIDATION_LOOKUP = {

    # Comments about where fields are pulled from just reference where the description was found in the GPSD
    # specification http://catb.org/gpsd/AIVDM.html
    # They're not critical but if there is a discrepancy between what is expected and how the test is written
    # it may help us figure out why.

    # Pulled from types 1, 2, and 3 GPSD spec
    'type': {
        'test': lambda x: _is_strict_int(x) and x in range(1, 28),
        'good': 5,
        'bad': -1
    },
    'repeat': {
        'test': lambda x: _is_strict_int(x) and 0 <= x <= 2 ** 2,
        'good': 4,
        'bad': -1
    },
    'mmsi': {
        'test': lambda x: isinstance(x, str) and len(x) <= 2 ** 30,
        'good': '12345678902345678',
        'bad': 1234
    },
    'status': {
        'test': lambda x: _is_strict_int(x) and x in range(0, 16),
        'good': 2,
        'bad': -10
    },
    'turn': {
        'test': lambda x: _is_strict_int(x) and x in range(-127, 129),
        'good': 125,
        'bad': -1000
    },
    'sog': {
        # Either a float inside the closed range or the single out-of-range value 1022 (any numeric type)
        'test': lambda x: (isinstance(x, float) and 0 <= x <= 102.2) or x == 1022,
        'good': 1022,
        'bad': 103
    },
    'accuracy': {
        'test': lambda x: _is_strict_int(x) and x in (0, 1),
        'good': 0,
        'bad': 2,
    },
    'lat': {
        # Either a float inside the closed range or the single out-of-range value 91
        'test': lambda x: (isinstance(x, float) and -90 <= x <= 90) or x == 91,
        'good': 91,
        'bad': -100
    },
    'lon': {
        # TODO: Should -180 be a valid value?  Maybe `-180 < x` instead?
        'test': lambda x: (isinstance(x, float) and -180 <= x <= 180) or x == 181,
        'good': 181,
        'bad': -180.1
    },
    'course': {
        # Either an int in 0..359 or the single out-of-range value 3600.
        # NOTE(review): an earlier TODO here asked whether -90 should be valid; that question looks copied from a
        # latitude check and does not apply to this range.
        'test': lambda x: (_is_strict_int(x) and 0 <= x <= 359) or x == 3600,
        'good': 3600,
        'bad': 360
    },
    'heading': {
        # Either an int in 0..359 or the single out-of-range value 511
        'test': lambda x: (_is_strict_int(x) and 0 <= x <= 359) or x == 511,
        'good': 511,
        'bad': -102
    },
    'second': {
        'test': lambda x: _is_strict_int(x) and x in range(0, 64),
        'good': 63,
        'bad': 64
    },
    'maneuver': {
        # NOTE(review): 0 is rejected here - confirm that is intended
        'test': lambda x: _is_strict_int(x) and x in (1, 2),
        'good': 2,
        'bad': 3
    },
    'raim': {
        # TODO: bool is more Pythonic if the field is actually boolean and not state
        'test': lambda x: _is_strict_int(x) and x in (0, 1),
        'good': 0,
        'bad': -2
    },
    'radio': {
        # TODO: What will this value be?
        'test': lambda x: x is None,
        'good': None,
        'bad': False
    },

    # Pulled from type 5 GPSD spec
    'ais_version': {
        # TODO: Should always be 0 right now.  The other vals are reserved.
        'test': lambda x: _is_strict_int(x) and x in (0, 1, 2, 3),
        'good': 2,
        'bad': True
    },
    'imo': {
        'test': lambda x: isinstance(x, str) and len(x) <= 2 ** 30,
        'good': 'value',
        'bad': True,
    },
    'callsign': {
        'test': lambda x: isinstance(x, str) and len(x) <= 2 ** 42,
        'good': 'good',
        'bad': json
    },
    'shipname': {
        'test': lambda x: isinstance(x, str) and len(x) <= 2 ** 120,
        'good': 'good value',
        'bad': False
    },
    'shiptype': {
        'test': lambda x: _is_strict_int(x) and x in range(0, 100),
        'good': 87,
        'bad': str
    },
    'to_bow': {
        'test': lambda x: _is_strict_int(x) and 0 <= x <= 2 ** 9,
        'good': 1,
        'bad': -1
    },
    'to_stern': {
        'test': lambda x: _is_strict_int(x) and 0 <= x <= 2 ** 9,
        'good': 0,
        'bad': tuple
    },
    'to_port': {
        'test': lambda x: _is_strict_int(x) and 0 <= x <= 2 ** 6,
        'good': 1,
        'bad': -34
    },
    'to_starboard': {
        'test': lambda x: _is_strict_int(x) and 0 <= x <= 2 ** 6,
        'good': 0,
        'bad': False,
    },
    'epfd': {
        'test': lambda x: _is_strict_int(x) and x in range(0, 9),
        'good': 8,
        'bad': 10
    },
    'month': {
        'test': lambda x: _is_strict_int(x) and x in range(0, 13),
        'good': 12,
        'bad': 'asdf'
    },
    'day': {
        'test': lambda x: _is_strict_int(x) and x in range(0, 32),
        'good': 1,
        'bad': ':'
    },
    'hour': {
        'test': lambda x: _is_strict_int(x) and x in range(0, 25),
        'good': 23,
        'bad': ','
    },
    'minute': {
        'test': lambda x: _is_strict_int(x) and x in range(0, 61),
        'good': 54,
        'bad': '*'
    },
    'draught': {
        # Lower bound is exclusive: 0 is rejected
        'test': lambda x: _is_strict_int(x) and 0 < x <= 2 ** 8,
        'good': 1,
        'bad': 2 ** 8 + 1
    },
    'destination': {
        'test': lambda x: isinstance(x, str) and len(x) <= 2 ** 120,
        'good': 'v',
        'bad': 2 * 2
    },
    'dte': {
        # TODO: Switch to a more Pythonic bool if this is actually boolean and not a status
        'test': lambda x: _is_strict_int(x) and x in (0, 1),
        'good': 0,
        'bad': 8
    },

    # Pulled from type 18 GPSD spec
    'reserved': {
        'test': lambda x: x is None,
        'good': None,
        'bad': 1
    },
    'regional': {
        'test': lambda x: x is None,
        'good': None,
        'bad': -1
    },
    'speed': {
        # Lower bound added so negative values are rejected like every other unsigned field in this table
        'test': lambda x: _is_strict_int(x) and 0 <= x <= 2 ** 10,
        'good': 0,
        'bad': str
    },
    'cs': {
        # Not bool - state
        'test': lambda x: _is_strict_int(x) and x in (0, 1),
        'good': 0,
        'bad': 7
    },
    'display': {
        # Not bool - state
        'test': lambda x: _is_strict_int(x) and x in (0, 1),
        'good': 1,
        'bad': 'j'
    },
    'dsc': {
        # TODO: Switch to a more Pythonic bool?
        'test': lambda x: _is_strict_int(x) and x in (0, 1),
        'good': 1,
        'bad': -45
    },
    'band': {
        # TODO: Switch to a more Pythonic bool?
        'test': lambda x: _is_strict_int(x) and x in (0, 1),
        'good': 0,
        'bad': 4
    },
    'msg22': {
        # TODO: Switch to a more Pythonic bool?
        'test': lambda x: _is_strict_int(x) and x in (0, 1),
        'good': 0,
        'bad': -2
    },
    'assigned': {
        # TODO: Switch to a more Pythonic bool?
        'test': lambda x: _is_strict_int(x) and x in (0, 1),
        'good': 1,
        'bad': -33
    },

    # All type 19 are covered between 1, 2, 3, and 18

    # Pulled from type 24 GPSD spec
    'partno': {
        'test': lambda x: _is_strict_int(x) and x in (0, 1),
        'good': 0,
        'bad': -1
    },
    'vendorid': {
        'test': lambda x: isinstance(x, str) and len(x) <= 2 ** 18,
        'good': 'this is a gooooooooood value',
        'bad': int
    },
    'model': {
        'test': lambda x: isinstance(x, str) and len(x) <= 2 ** 4,
        'good': 'something',
        'bad': 333
    },
    'serial': {
        'test': lambda x: isinstance(x, str) and len(x) <= 2 ** 20,
        'good': 'pawoeiras',
        'bad': -1
    },
    'mothership_mmsi': {
        'test': lambda x: isinstance(x, str) and len(x) <= 2 ** 30,
        'good': 'done ... finally ...',
        'bad': -200
    },

    # Pulled from type 27 GPSD spec
    'gnss': {
        # Not bool - state
        'test': lambda x: _is_strict_int(x) and x in (0, 1),
        'good': 0,
        'bad': 3
    }
}


# Message types 1, 2, and 3 expect exactly the same fields.  The shared definition is an immutable tuple and every
# type below receives its own list copy, so mutating one type's list never leaks into another.
_TYPE_1_2_3_FIELDS = (
    'type', 'repeat', 'mmsi', 'status', 'turn', 'sog', 'accuracy', 'lat', 'lon', 'course', 'heading', 'second',
    'maneuver', 'raim', 'radio'
)


# Keys are message types and values are lists of fields that type expects.  Each fieldname appears at most once per
# type so a field is never validated (or logged) twice for the same message.
MSG_TYPE_FIELDS = {
    1: list(_TYPE_1_2_3_FIELDS),
    2: list(_TYPE_1_2_3_FIELDS),
    3: list(_TYPE_1_2_3_FIELDS),
    5: [
        'type', 'repeat', 'mmsi', 'ais_version', 'imo', 'callsign', 'shipname', 'shiptype', 'to_bow', 'to_stern',
        'to_port', 'to_starboard', 'epfd', 'month', 'day', 'hour', 'minute', 'draught', 'destination', 'dte'
    ],
    # 'assigned' used to be listed twice for type 18; the trailing duplicate was dropped.
    # NOTE(review): 'dte' also looks like it may have been copied from the type 19 list - confirm it is really
    # expected for type 18 before removing it.
    18: [
        'type', 'repeat', 'mmsi', 'reserved', 'speed', 'accuracy', 'lon', 'lat', 'course', 'heading', 'second',
        'regional', 'cs', 'display', 'dsc', 'band', 'msg22', 'assigned', 'raim', 'radio', 'dte'
    ],
    19: [
        'type', 'repeat', 'mmsi', 'reserved', 'speed', 'accuracy', 'lon', 'lat', 'course', 'heading', 'second',
        'regional', 'shipname', 'shiptype', 'to_bow', 'to_stern', 'to_port', 'to_starboard', 'epfd', 'raim', 'dte',
        'assigned'
    ],
    24: [
        'type', 'repeat', 'mmsi', 'partno', 'shipname', 'shiptype', 'vendorid', 'model', 'serial', 'callsign',
        'to_bow', 'to_stern', 'to_port', 'to_starboard', 'mothership_mmsi'
    ],
    27: [
        'type', 'repeat', 'mmsi', 'accuracy', 'raim', 'status', 'lon', 'lat', 'speed', 'course', 'gnss'
    ]
}


def _describe_message(msg):

    """
    Render a message for the error log.

    JSON is used when possible.  A message holding a value `json.dumps()`
    cannot serialize falls back to `repr()` so that logging a failure never
    raises in the middle of validation.
    """

    try:
        return json.dumps(msg)
    except (TypeError, ValueError):
        return repr(msg)


def validate_messages(messages, err=None):

    """
    Determine whether or not every input message conforms to the Benthos spec.

    Each message's 'type' selects the list of expected fields from
    `MSG_TYPE_FIELDS` and every one of those fields is checked with its test
    from `MSG_VALIDATION_LOOKUP`.  All messages and all fields are always
    checked; validation does not stop at the first problem.

    Example:

        >>> import json
        >>> with open('Messages.json') as infile:
        ...     with open('Logfile', 'w') as logfile:
        ...         print(validate_messages(
        ...             (json.loads(msg) for msg in infile), err=logfile))

    Parameters
    ----------
    messages : iter
        An iterable producing one AIS message as a dictionary every iteration.
    err : file, optional
        File-like object where errors are logged and failed messages are written,
        one line per problem.  A message with multiple invalid fields will have
        multiple errors in this file.  Nothing is logged when this is None.

    Returns
    -------
    bool
        True if every message passes.  A message fails when it has no 'type',
        when its type has no entry in `MSG_TYPE_FIELDS`, when it is not a
        mapping at all, or when any expected field is missing or fails its
        test.
    """

    all_valid = True

    for msg in messages:

        # Resolve the list of expected fields.  A missing 'type' key raises KeyError and an object that cannot be
        # indexed by name raises TypeError; both simply mean the message cannot be validated.
        try:
            msg_type = msg['type']
        except (KeyError, TypeError):
            expected_fields = None
        else:
            try:
                expected_fields = MSG_TYPE_FIELDS.get(msg_type)
            except TypeError:
                # Unhashable 'type' value (e.g. a list) - it cannot match any known type
                expected_fields = None

        if expected_fields is None:
            if err is not None:
                # The message is wrapped in a tuple so that a tuple-valued message cannot break %-formatting
                err.write(
                    "No 'type' key in msg or type is invalid or not testable: %s%s" % (msg, os.linesep))
            all_valid = False
            continue

        # Normal field validation
        for field in expected_fields:
            try:
                value = msg[field]
            except KeyError:
                # A missing field is a validation failure, not a crash
                problem = 'missing'
            else:
                problem = None if MSG_VALIDATION_LOOKUP[field]['test'](value) else 'failed'

            if problem is not None:
                if err is not None:
                    err.write("Field `%s' %s: %s%s" % (field, problem, _describe_message(msg), os.linesep))
                all_valid = False

    return all_valid


class TestValidateMessages(unittest.TestCase):

    """
    Exercise `validate_messages()` with the sample values stored in
    `MSG_VALIDATION_LOOKUP` for every message type in `MSG_TYPE_FIELDS`.

    unittest assertion methods are used instead of bare `assert` statements
    so the checks still run when Python is started with `-O`.
    """

    def test_all_types(self):

        """
        For each message type a message built from only 'good' values must
        pass, and swapping any single field for its 'bad' value must fail.
        """

        # Check type field individually since the other tests force it to be correct.  These two checks do not
        # depend on the message type, so they run once rather than once per type.
        self.assertFalse(validate_messages([{'field': 'val'}]))
        self.assertFalse(validate_messages([{'type': MSG_VALIDATION_LOOKUP['type']['bad']}]))

        for msg_type, msg_fields in MSG_TYPE_FIELDS.items():

            # Construct a good message
            good_message = {f: MSG_VALIDATION_LOOKUP[f]['good'] for f in msg_fields}
            good_message['type'] = msg_type

            self.assertTrue(
                validate_messages([good_message]),
                "Supposed 'good' msg failed validation: %s" % (good_message,))

            # A message made entirely of bad values would be rejected no matter which field tripped the
            # validator, so it says nothing about the individual field tests.  Instead start from the good
            # message and corrupt exactly one field at a time; each corrupted copy must be rejected on its own.
            for field in msg_fields:
                if field == 'type':
                    continue
                bad_message = good_message.copy()
                bad_message[field] = MSG_VALIDATION_LOOKUP[field]['bad']
                self.assertFalse(
                    validate_messages([bad_message]),
                    "Field `%s' should have caused message to fail: %s" % (field, bad_message))


if __name__ == '__main__':
    # The only recognized sub-command is `test`, matched case-insensitively.  Any other invocation does nothing.
    cli_args = sys.argv[1:]
    if cli_args and cli_args[0].lower() == 'test':
        # Drop the sub-command before handing control to unittest, which reads sys.argv itself
        sys.argv.pop(1)
        sys.exit(unittest.main())