schwehr/libais

View on GitHub
ais/stats.py

Summary

Maintainability
A
3 hrs
Test Coverage
#!/usr/bin/env python

import argparse
import datetime
import collections
import logging
import pprint
import sys

from ais import nmea_queue

logger = logging.getLogger('libais')


class TrackRange(object):

  def __init__(self):
    self.min = None
    self.max = None

  def AddValues(self, *values):
    print ('AddValues', values)
    values = [v for v in values if v is not None]
    print ('AV3 ', values, self.min, self.max)
    if not len(values):
      raise ValueError('Must specify at least 1 value.')
    if self.min is None:
      self.min = min(values)
      self.max = max(values)
      return
    self.min = min(self.min, *values)
    self.max = max(self.max, *values)


class Stats(object):

  def __init__(self):
    self.counts = collections.Counter()
    self.queue = nmea_queue.NmeaQueue()
    self.time_range = TrackRange()
    self.time_delta_range = TrackRange()

  def AddFile(self, iterable, filename=None):
    self.counts['files'] += 1

    for line in iterable:
      self.AddLine(line)

  def AddLine(self, line):
    print(line.rstrip())
    self.counts['lines'] += 1
    self.queue.put(line)
    msg = self.queue.GetOrNone()
    if not msg:
      return

    # logging.info('stats found msg: %s', msg)
    # print ()
    # pprint.pprint(msg)
    self.counts[msg['line_type']] += 1
    if 'decoded' in msg:
      decoded = msg['decoded']
      if 'id' in decoded:
        self.counts['msg_VDM_%s' % decoded['id']] += 1
      if 'msg' in decoded:
        self.counts['msg_%s' % decoded['msg']] += 1

    if 'times' in msg:
      times = [t for t in msg['times'] if t is not None]
      if times:
        if self.time_range.min is None:
          self.time_range.AddValues(*times)
          # self.time_delta_range.AddValues(msg['times'])
        else:
          # print (self.time_range.min, self.time_range.max)
          time_delta = max(times) - self.time_range.max
          self.time_delta_range.AddValues(time_delta)
          self.time_range.AddValues(*times)


  def PrintSummary(self):
    pprint.pprint(self.counts)

    logger.info('time_range: [%s to %s]',
                 self.time_range.min,
                 self.time_range.max)

    logger.info('%s', datetime.datetime.utcfromtimestamp(self.time_range.min))
    logger.info('%s', datetime.datetime.utcfromtimestamp(self.time_range.max))

    logger.info('time_delta_range: [%s to %s]',
                 self.time_delta_range.min,
                 self.time_delta_range.max)



def main():
  logger.setLevel(logging.INFO)
  logger.info('in main')

  parser = argparse.ArgumentParser()
  parser.add_argument('filenames', type=str, nargs='+', help='NMEA files')
  args = parser.parse_args()
  logger.info('args: %s', args)

  stats = Stats()
  for filename in args.filenames:
      stats.AddFile(open(filename), filename)

  stats.PrintSummary()