Syncleus/apex

View on GitHub
src/apex/kiss/kiss_tcp.py

Summary

Maintainability
A
0 mins
Test Coverage
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""KISS Core Classes."""

# These imports are for python3 compatibility inside python2
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging
import socket

from apex.kiss import constants as kiss_constants
from .kiss import Kiss

__author__ = 'Jeffrey Phillips Freeman (WI2ARD)'
__maintainer__ = 'Jeffrey Phillips Freeman (WI2ARD)'
__email__ = 'jeffrey.freeman@syncleus.com'
__license__ = 'Apache License, Version 2.0'
__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors'
__credits__ = []
__version__ = '0.0.2'


class KissTcp(Kiss):

    """KISS TCP Object Class."""

    logger = logging.getLogger(__name__)
    logger.setLevel(kiss_constants.LOG_LEVEL)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(kiss_constants.LOG_LEVEL)
    formatter = logging.Formatter(kiss_constants.LOG_FORMAT)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    logger.propagate = False

    def __init__(self,
                 strip_df_start=True,
                 host=None,
                 tcp_port=8000):
        super(KissTcp, self).__init__(strip_df_start)

        self.host = host
        self.tcp_port = tcp_port
        self.socket = None

        self.logger.info('Using interface_mode=TCP')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.socket.close()

    def _read_interface(self):
        return self.socket.recv(kiss_constants.READ_BYTES)

    def _write_interface(self, data):
        self.socket.write(data)

    def connect(self, mode_init=None, **kwargs):
        """
        Initializes the KISS device and commits configuration.

        See http://en.wikipedia.org/wiki/KISS_(TNC)#Command_codes
        for configuration names.

        :param **kwargs: name/value pairs to use as initial config values.
        """
        self.logger.debug('kwargs=%s', kwargs)

        address = (self.host, self.tcp_port)
        self.socket = socket.create_connection(address)

    def close(self):
        super(KissTcp, self).close()

        if not self.socket:
            raise RuntimeError('Attempting to close before the class has been started.')

        self.socket.shutdown()
        self.socket.close()

    def shutdown(self):
        self.socket.shutdown()