Syncleus/apex

View on GitHub
src/apex/kiss/kiss.py

Summary

Maintainability
B
4 hrs
Test Coverage
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""KISS Core Classes."""

# These imports are for python3 compatibility inside python2
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging
import threading
from abc import ABCMeta
from abc import abstractmethod
from six import with_metaclass

from apex.kiss import constants as kiss_constants

__author__ = 'Jeffrey Phillips Freeman (WI2ARD)'
__maintainer__ = 'Jeffrey Phillips Freeman (WI2ARD)'
__email__ = 'jeffrey.freeman@syncleus.com'
__license__ = 'Apache License, Version 2.0'
__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors'
__credits__ = []


class Kiss(with_metaclass(ABCMeta, object)):

    """
    Abstract KISS Object Class.

    Implements KISS framing (FEND delimiting, FESC escaping on transmit and
    the port/command byte) on top of a byte-oriented transport. Concrete
    subclasses provide the transport by implementing ``_read_interface`` and
    ``_write_interface``.

    Received frames are handled as lists of integer byte values.
    """

    logger = logging.getLogger(__name__)
    logger.setLevel(kiss_constants.LOG_LEVEL)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(kiss_constants.LOG_LEVEL)
    formatter = logging.Formatter(kiss_constants.LOG_FORMAT)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    logger.propagate = False

    def __init__(self, strip_df_start=True):
        """
        Set up per-instance state.

        :param strip_df_start: when true, received frames have their leading
            DATA_FRAME byte(s) and surrounding whitespace removed before
            being queued.
        :type strip_df_start: bool
        """
        self.strip_df_start = strip_df_start
        self.exit_kiss = False
        self.lock = threading.Lock()
        # Receive state is per instance: a class-level list would be shared
        # by every Kiss object, mixing frames from different interfaces.
        self.frame_buffer = []
        # Bytes received since the last FEND; kept between reads so that a
        # frame arriving in several pieces is not lost.
        self._partial_frame = []

    @staticmethod
    def __strip_df_start(frame):
        """
        Strips KISS DATA_FRAME start (0x00) and whitespace from frame.

        The frame is modified in place and also returned. A frame that
        consists only of DATA_FRAME bytes and/or whitespace ends up empty.

        :param frame: APRS/AX.25 frame as a mutable sequence of byte values.
        :type frame: list
        :returns: APRS/AX.25 frame sans DATA_FRAME start (0x00).
        :rtype: list
        """
        # Every loop checks for emptiness first so that short or blank
        # frames cannot raise IndexError.
        while frame and frame[0] == kiss_constants.DATA_FRAME:
            del frame[0]
        while frame and chr(frame[0]).isspace():
            del frame[0]
        while frame and chr(frame[-1]).isspace():
            del frame[-1]
        return frame

    @staticmethod
    def __escape_special_codes(raw_code_bytes):
        """
        Escape special codes, per KISS spec.

        "If the FEND or FESC codes appear in the data to be transferred, they
        need to be escaped. The FEND code is then sent as FESC, TFEND and the
        FESC is then sent as FESC, TFESC."
        - http://en.wikipedia.org/wiki/KISS_(TNC)#Description

        :param raw_code_bytes: iterable of byte values to escape.
        :returns: new list with FEND and FESC replaced by their escape
            sequences.
        :rtype: list
        """
        encoded_bytes = []
        for raw_code_byte in raw_code_bytes:
            # Compare by value: identity ("is") only works for integers by
            # accident of the interpreter's small-int cache.
            if raw_code_byte == kiss_constants.FESC:
                encoded_bytes.extend(kiss_constants.FESC_TFESC)
            elif raw_code_byte == kiss_constants.FEND:
                encoded_bytes.extend(kiss_constants.FESC_TFEND)
            else:
                encoded_bytes.append(raw_code_byte)
        return encoded_bytes

    @staticmethod
    def __command_byte_combine(port, command_code):
        """
        Constructs the command byte for the tnc which includes the tnc port and command code.

        The KISS command byte carries the port in its high nibble and the
        command in its low nibble, so each value must fit in four bits.

        :param port: integer from 0 to 15 indicating the TNC port
        :type port: int
        :param command_code: A command code constant, a value from 0 to 15
        :type command_code: int
        :return: An integer combining the two values into a single byte
        :raises ValueError: if either value does not fit in a nibble
        """
        if port > 15 or port < 0:
            raise ValueError('port out of range')
        elif command_code > 15 or command_code < 0:
            raise ValueError('command_Code out of range')
        # OR the nibbles together; AND would always discard the port.
        return (port << 4) | command_code

    @abstractmethod
    def _read_interface(self):
        """
        Read the data currently pending on the underlying interface.

        Must return a sequence of integer byte values, or ``None`` / an empty
        sequence when nothing is available (which ends the read loop).
        """
        pass

    @abstractmethod
    def _write_interface(self, data):
        """
        Write ``data`` to the underlying interface.

        Called with a list of byte values forming a complete KISS frame, and
        with ``kiss_constants.MODE_END`` by ``close``. The return value is
        passed back to the caller of ``write`` / ``_write_setting``.
        """
        pass

    def connect(self, mode_init=None, **kwargs):
        """
        Initializes the KISS device and commits configuration.

        The base implementation does nothing; concrete classes override it
        to open their interface and apply the configuration.

        See http://en.wikipedia.org/wiki/KISS_(TNC)#Command_codes
        for configuration names.

        :param mode_init: not used by the base implementation.
        :param **kwargs: name/value pairs to use as initial config values.
        """
        pass

    def close(self):
        """
        Send the KISS mode-end sequence if ``exit_kiss`` has been set to True.
        """
        if self.exit_kiss is True:
            self._write_interface(kiss_constants.MODE_END)

    def _write_setting(self, name, value):
        """
        Writes KISS Command Codes to attached device.

        http://en.wikipedia.org/wiki/KISS_(TNC)#Command_Codes

        :param name: KISS Command Code Name as a string.
        :param value: KISS Command Code Value to write: a single int, a
            string, or a sequence of byte values.
        :returns: whatever ``_write_interface`` returns.
        :raises AttributeError: if ``name`` is not a known command constant.
        """
        self.logger.debug('Configuring %s = %s', name, repr(value))

        # Normalise the value to a list of byte values so it can be escaped
        # and concatenated like the rest of the frame.
        if isinstance(value, int):
            value = [value]
        elif isinstance(value, str):
            value = [ord(char) for char in value]
        else:
            value = list(value)

        return self._write_interface(
            [kiss_constants.FEND] +
            [getattr(kiss_constants, name.upper())] +
            Kiss.__escape_special_codes(value) +
            [kiss_constants.FEND]
        )

    def __queue_frame(self, frame):
        """
        Append one FEND-delimited frame to frame_buffer if it should be kept.

        Only frames whose first byte is 0x00 are kept; anything else,
        including empty frames produced by back-to-back FENDs, is dropped.
        """
        if not frame or frame[0] != 0:
            return
        if self.strip_df_start:
            frame = Kiss.__strip_df_start(frame)
            if not frame:
                # Nothing left after removing the command byte.
                return
        self.frame_buffer.append(frame)

    def __fill_buffer(self):
        """
        Reads any pending data in the interface and stores it in the frame_buffer

        Keeps reading until the interface returns None or an empty result.
        Every FEND closes the frame collected so far; bytes after the last
        FEND are retained in ``_partial_frame`` for the next call.

        NOTE(review): received FESC TFEND / FESC TFESC sequences are not
        translated back to FEND / FESC here - confirm whether callers handle
        that before relying on frames that may contain escaped bytes.
        """
        read_data = self._read_interface()
        while read_data is not None and len(read_data):
            for read_byte in read_data:
                if read_byte == kiss_constants.FEND:
                    # Detach the finished frame before queuing it so the
                    # partial buffer never aliases a queued frame.
                    frame, self._partial_frame = self._partial_frame, []
                    self.__queue_frame(frame)
                else:
                    self._partial_frame.append(read_byte)
            # Get anymore data that is waiting
            read_data = self._read_interface()

    def read(self):
        """
        Return the next received frame, reading from the interface if needed.

        :returns: the oldest buffered frame, or None if no complete frame is
            available.
        """
        with self.lock:
            if not self.frame_buffer:
                self.__fill_buffer()

            if self.frame_buffer:
                return self.frame_buffer.pop(0)
            return None

    def write(self, frame_bytes, port=0):
        """
        Writes frame to KISS interface.

        :param frame_bytes: Frame to write, as an iterable of byte values.
        :param port: TNC port to send the frame on, 0 to 15.
        :type port: int
        :returns: whatever ``_write_interface`` returns.
        :raises ValueError: if ``port`` is out of range.
        """
        with self.lock:
            command_byte = Kiss.__command_byte_combine(
                port, kiss_constants.DATA_FRAME)
            kiss_packet = (
                [kiss_constants.FEND] +
                [command_byte] +
                Kiss.__escape_special_codes(frame_bytes) +
                [kiss_constants.FEND]
            )

            return self._write_interface(kiss_packet)