christoph2/pyxcp

View on GitHub
pyxcp/transport/can.py

Summary

Maintainability
A
0 mins
Test Coverage
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
"""
import abc
import functools
import operator
from bisect import bisect_left
from collections import OrderedDict
from time import perf_counter
from time import time

from pyxcp.config import Configuration
from pyxcp.transport.base import BaseTransport


# Flag bit (bit 31) marking an XCP formatted identifier as extended (29-bit).
CAN_EXTENDED_ID = 1 << 31
# Largest values representable by standard resp. extended CAN identifiers.
MAX_11_BIT_IDENTIFIER = 0x7FF
MAX_29_BIT_IDENTIFIER = 0x1FFFFFFF
# Maximum payload length of a classic CAN frame.
MAX_DLC_CLASSIC = 8
# Discrete CAN-FD payload lengths available above MAX_DLC_CLASSIC.
CAN_FD_DLCS = (12, 16, 20, 24, 32, 48, 64)


class IdentifierOutOfRangeError(Exception):
    """Signals an identifier greater than :obj:`MAX_11_BIT_IDENTIFIER` or :obj:`MAX_29_BIT_IDENTIFIER`."""


def isExtendedIdentifier(identifier: int) -> bool:
    """Tell whether an XCP formatted CAN identifier is an extended one.

    Parameters
    ----------
    identifier: int
        Identifier to inspect.

    Returns
    -------
    bool
        ``True`` if the :obj:`CAN_EXTENDED_ID` flag bit is set, ``False`` otherwise.
    """
    flag_bits = identifier & CAN_EXTENDED_ID
    return flag_bits == CAN_EXTENDED_ID


def stripIdentifier(identifier: int) -> int:
    """Get the raw CAN identifier by clearing the flag bits.

    Bits 29..31 are cleared, which includes the :obj:`CAN_EXTENDED_ID` bit;
    all other bits are passed through unchanged.

    Parameters
    ----------
    identifier: int
        XCP formatted identifier.

    Returns
    -------
    int
        Identifier without the three flag bits.
    """
    flag_bits = 0xE0000000  # bits 29, 30 and 31
    return identifier & ~flag_bits


def samplePointToTsegs(tqs: int, samplePoint: float) -> tuple:
    """Split a number of time-quantas into TSEG1 and TSEG2 for a given sample-point.

    TSEG1 is the share of ``tqs`` given by the sample-point, truncated to an
    integer; TSEG2 is whatever remains.

    Parameters
    ----------
    tqs: int
        Number of time-quantas
    samplePoint: float or int
        Sample-point as a percentage value.

    Returns
    -------
    tuple (TSEG1, TSEG2)
    """
    tseg1 = int(tqs * (samplePoint / 100.0))
    return (tseg1, tqs - tseg1)


def padFrame(frame: bytes, padding_value: int, padding_len: int = 0) -> bytes:
    """Pad frame to next discrete DLC value.

    The target length is derived from the larger of ``len(frame)`` and
    ``padding_len``: anything up to :obj:`MAX_DLC_CLASSIC` is padded to
    :obj:`MAX_DLC_CLASSIC`, anything above to the next entry of :obj:`CAN_FD_DLCS`.

    Parameters
    ----------
    frame: bytes
        Payload to be padded.
    padding_value: int
        Value of the fill bytes.
    padding_len: int
        Minimum length taken into account when choosing the target length.

    Returns
    -------
    bytes
        ``frame`` with fill bytes appended (unchanged if already long enough).

    Raises
    ------
    IndexError
        If the required length exceeds the largest entry of :obj:`CAN_FD_DLCS`.

    References:
    -----------
    ISO/DIS 15765 - 4; 8.2 Data length Code (DLC)
    AUTOSAR CP Release 4.3.0, Specification of CAN Transport Layer; 7.3.8 N-PDU padding
    AUTOSAR CP Release 4.3.0, Specification of CAN Driver; [SWS_CAN_00502], [ECUC_Can_00485]
    AUTOSAR CP Release 4.3.0, Requirements on CAN; [SRS_Can_01073], [SRS_Can_01086], [SRS_Can_01160]
    """
    required_len = max(len(frame), padding_len)
    if required_len > MAX_DLC_CLASSIC:
        # CAN-FD: round up to the next discrete length.
        target_len = CAN_FD_DLCS[bisect_left(CAN_FD_DLCS, required_len)]
    else:
        target_len = MAX_DLC_CLASSIC
    missing = target_len - len(frame)
    if missing > 0:
        frame += bytes([padding_value]) * missing
    return frame


class Identifier:
    """Convenience class for XCP formatted CAN identifiers.

    Two instances compare equal (and hash equal) when both the identifier as
    seen on the bus and the extended flag match.

    Parameters
    ----------
    raw_id: int
        Most significant bit of the 32-bit value set (i.e. 0x80000000) signals
        an extended (29-bit) identifier.

    Raises
    ------
    :class:`IdentifierOutOfRangeError`
    """

    def __init__(self, raw_id: int):
        self._raw_id = raw_id
        self._id = stripIdentifier(raw_id)
        self._is_extended = isExtendedIdentifier(raw_id)
        # Validate the stripped value against the range of its identifier kind.
        if self._is_extended:
            if self._id > MAX_29_BIT_IDENTIFIER:
                raise IdentifierOutOfRangeError("29-bit identifier '{}' is out of range".format(self._id))
        else:
            if self._id > MAX_11_BIT_IDENTIFIER:
                raise IdentifierOutOfRangeError("11-bit identifier '{}' is out of range".format(self._id))

    @property
    def id(self) -> int:
        """
        Returns
        -------
        int
            Identifier as seen on bus.
        """
        return self._id

    @property
    def raw_id(self) -> int:
        """
        Returns
        -------
        int
            Raw XCP formatted identifier.
        """
        return self._raw_id

    @property
    def is_extended(self) -> bool:
        """
        Returns
        -------
        bool
            - True - 29-bit identifier.
            - False - 11-bit identifier.
        """
        return self._is_extended

    @staticmethod
    def make_identifier(identifier: int, extended: bool) -> "Identifier":
        """Factory method.

        Parameters
        ----------
        identifier: int
            Identifier as seen on bus.

        extended: bool
            bool
                - True - 29-bit identifier.
                - False - 11-bit identifier.
        Returns
        -------
        :class:`Identifier`

        Raises
        ------
        :class:`IdentifierOutOfRangeError`
        """
        return Identifier(identifier if not extended else (identifier | CAN_EXTENDED_ID))

    def __eq__(self, other):
        # Let Python fall back to its default handling for foreign types
        # instead of failing with an AttributeError.
        if not isinstance(other, Identifier):
            return NotImplemented
        return (self.id == other.id) and (self.is_extended == other.is_extended)

    def __hash__(self):
        # Defining __eq__ alone makes a class unhashable; hash exactly the
        # fields that take part in the equality comparison.
        return hash((self._id, self._is_extended))

    def __str__(self):
        return "Identifier(id = 0x{:08x}, is_extended = {})".format(self.id, self.is_extended)

    def __repr__(self):
        return "Identifier(0x{:08x})".format(self.raw_id)


class Frame:
    """Plain container for a CAN frame: identifier, DLC, payload and timestamp."""

    def __init__(self, id_: "Identifier", dlc: int, data: bytes, timestamp: int):
        self.id = id_
        self.dlc = dlc
        self.data = data
        self.timestamp = timestamp

    def __repr__(self):
        # ``id`` is annotated as an Identifier, which does not support the
        # ``08x`` format spec; format its raw value instead. Plain integers
        # are still accepted and formatted directly.
        numeric_id = getattr(self.id, "raw_id", self.id)
        return "Frame(id = 0x{:08x}, dlc = {}, data = {}, timestamp = {})".format(
            numeric_id, self.dlc, self.data, self.timestamp
        )

    __str__ = __repr__


class CanInterfaceBase(metaclass=abc.ABCMeta):
    """
    Abstract base for CAN interface handlers; subclass it to support an actual CAN device driver.
    """

    # Parameter description handed to Configuration by loadConfig();
    # subclasses may override it.
    PARAMETER_MAP = {}

    @abc.abstractmethod
    def init(self, parent, receive_callback):
        """
        Perform whatever is required to initialize the CAN interface.

        Parameters
        ----------
        parent: :class:`Can`
            Refers to owner.
        receive_callback: callable
            Receive callback function to register with the following argument: payload: bytes
        """

    @abc.abstractmethod
    def transmit(self, payload: bytes):
        """
        Send the given payload on the master CAN id.

        Parameters
        ----------
        payload: bytes
            payload to transmit
        """

    @abc.abstractmethod
    def close(self):
        """Perform whatever is required to disconnect from the CAN interface."""

    @abc.abstractmethod
    def connect(self):
        """Open the connection to the CAN interface."""

    @abc.abstractmethod
    def read(self):
        """Read incoming data."""

    @abc.abstractmethod
    def getTimestampResolution(self):
        """Get timestamp resolution in nano seconds."""

    def loadConfig(self, config):
        """Load configuration data.

        Builds ``self.config`` from :attr:`PARAMETER_MAP` and ``config``;
        either one is replaced by an empty dict when falsy.
        """
        parameter_map = self.PARAMETER_MAP or {}
        user_config = config or {}
        self.config = Configuration(parameter_map, user_config)


class EmptyHeader:
    """Header stand-in: there is no header for XCP on CAN."""

    def pack(self, *args, **kwargs):
        """Ignore all arguments and return an empty byte string."""
        return bytes()


# can.detect_available_configs()


class Can(BaseTransport):
    """XCP transport layer for CAN; bus access is delegated to a registered CAN driver."""

    PARAMETER_MAP = {
        #                           Type            Req'd   Default
        "CAN_DRIVER": (str, True, None),
        "CHANNEL": (str, False, ""),
        "MAX_DLC_REQUIRED": (bool, False, False),
        "MAX_CAN_FD_DLC": (int, False, 64),
        "PADDING_VALUE": (int, False, 0),
        # CAN_USE_DEFAULT_LISTENER defaults to True, in which case the default
        # listener thread is used. If the canInterface implements a listener
        # service of its own, set it to False and the default listener thread
        # won't be started.
        "CAN_USE_DEFAULT_LISTENER": (bool, False, True),
        "CAN_ID_MASTER": (int, True, None),
        "CAN_ID_SLAVE": (int, True, None),
        "CAN_ID_BROADCAST": (int, False, None),
        "BITRATE": (int, False, 250000),
        "RECEIVE_OWN_MESSAGES": (bool, False, False),
    }

    PARAMETER_TO_KW_ARG_MAP = {
        "RECEIVE_OWN_MESSAGES": "receive_own_messages",
        "CHANNEL": "channel",
        "BITRATE": "bitrate",
    }

    MAX_DATAGRAM_SIZE = 7
    HEADER = EmptyHeader()
    HEADER_SIZE = 0

    def __init__(self, config=None, policy=None):
        """Set up the CAN transport and its driver.

        :param config: configuration
        :param policy: passed on unchanged to the base class initializer
        :raises ValueError: if ``CAN_DRIVER`` does not name a registered driver
        """
        super().__init__(config, policy)
        self.loadConfig(config)

        # Resolve the driver class by name among all registered drivers.
        available = registered_drivers()
        driver_name = self.config.get("CAN_DRIVER")
        if driver_name not in available:
            raise ValueError("{} is an invalid driver name -- choose from {}".format(driver_name, list(available)))
        self.canInterface = available[driver_name]()

        self.useDefaultListener = self.config.get("CAN_USE_DEFAULT_LISTENER")
        self.can_id_master = Identifier(self.config.get("CAN_ID_MASTER"))
        self.can_id_slave = Identifier(self.config.get("CAN_ID_SLAVE"))
        self.canInterface.loadConfig(config)
        self.canInterface.init(self, self.dataReceived)
        #
        # Regarding CAN-FD s. AUTOSAR CP Release 4.3.0, Requirements on CAN; [SRS_Can_01160] Padding of bytes due to discrete CAN FD DLC:
        #   "... If a PDU does not exactly match these configurable sizes the unused bytes shall be padded."
        #
        # Hence padding is always applied on CAN-FD interfaces, configured or not.
        self.max_dlc_required = self.config.get("MAX_DLC_REQUIRED") or self.canInterface.is_fd
        self.padding_value = self.config.get("PADDING_VALUE")
        if self.canInterface.is_fd:
            self.padding_len = self.config.get("MAX_CAN_FD_DLC")
        else:
            self.padding_len = MAX_DLC_CLASSIC

    def dataReceived(self, payload: bytes, recv_timestamp: float = None):
        """Hand a received payload to ``processResponse``.

        The counter passed along is ``counterReceived + 1``, wrapped to 16 bits.
        """
        length = len(payload)
        next_counter = (self.counterReceived + 1) & 0xFFFF
        self.processResponse(payload, length, counter=next_counter, recv_timestamp=recv_timestamp)

    def listen(self):
        """Read frames from the CAN interface until the close event is set."""
        while not self.closeEvent.isSet():
            received = self.canInterface.read()
            if received:
                self.dataReceived(received.data, received.timestamp)

    def connect(self):
        """Start the default listener (if enabled) and connect the CAN interface."""
        if self.useDefaultListener:
            self.startListener()
        self.canInterface.connect()
        self.status = 1  # connected

    def send(self, frame):
        """Transmit ``frame``, padded first if padding is required.

        The wall-clock time is recorded right before and right after the transmit call.
        """
        if self.max_dlc_required:
            # XCP on CAN trailer: FILL bytes must be appended.
            frame = padFrame(frame, self.padding_value, self.padding_len)
        self.pre_send_timestamp = time()
        self.canInterface.transmit(payload=frame)
        self.post_send_timestamp = time()

    def closeConnection(self):
        """Close the CAN interface, provided one has been created."""
        if hasattr(self, "canInterface"):
            self.canInterface.close()

    def close(self):
        """Stop the listener, then close the connection."""
        self.finishListener()
        self.closeConnection()


def setDLC(length: int):
    """Map a length to a valid CAN-FD frame length.

    Lengths up to :obj:`MAX_DLC_CLASSIC` are returned unchanged; larger ones are
    rounded up to the next entry of :obj:`CAN_FD_DLCS`.

    :param length: Length value to be mapped to a valid CAN-FD DLC.
                   ( 0 <= length <= 64)
    :raises ValueError: if ``length`` is negative or greater than 64.
    """
    if length < 0:
        raise ValueError("Non-negative length value required.")
    if length <= MAX_DLC_CLASSIC:
        return length
    if length <= 64:
        # First discrete CAN-FD length that can hold ``length`` bytes.
        return next((dlc for dlc in CAN_FD_DLCS if length <= dlc), None)
    raise ValueError("DLC could be at most 64.")


def calculateFilter(ids: list):
    """Calculate an acceptance filter and mask covering all given identifiers.

    The filter consists of the bits set in every identifier; the mask has a
    1 in every bit position where all identifiers agree. The mask is
    29 bits wide if any identifier is extended, 11 bits wide otherwise.

    :param ids: An iterable (usually list or tuple) containing CAN identifiers.
                One-shot iterables (e.g. generators) are supported as well.

    :return: Calculated filter and mask.
    :rtype: tuple (int, int)
    :raises TypeError: if ``ids`` is empty (raised by :func:`functools.reduce`).
    """
    # ``ids`` is walked twice and any() stops at the first extended identifier,
    # so a one-shot iterable would otherwise lose elements before the second pass.
    ids = list(ids)
    any_extended_ids = any(isExtendedIdentifier(i) for i in ids)
    raw_ids = [stripIdentifier(i) for i in ids]
    cfilter = functools.reduce(operator.and_, raw_ids)
    # Bits that differ between identifiers ...
    cmask = functools.reduce(operator.or_, raw_ids) ^ cfilter
    # ... are inverted within the identifier width to yield the "must match" bits.
    cmask ^= MAX_29_BIT_IDENTIFIER if any_extended_ids else MAX_11_BIT_IDENTIFIER
    return (cfilter, cmask)


def try_to_install_system_supplied_drivers():
    """Register available pyxcp CAN drivers.

    Every module found below ``pyxcp.transport.candriver`` is imported on a
    best-effort basis: a module that fails to import is skipped, and the
    failure is reported on the module logger at debug level instead of being
    dropped without any trace.
    """
    import importlib
    import logging
    import pkgutil
    import pyxcp.transport.candriver as cdr

    logger = logging.getLogger(__name__)
    for _, modname, _ in pkgutil.walk_packages(cdr.__path__, "{}.".format(cdr.__name__)):
        try:
            importlib.import_module(modname)
        except Exception as exc:
            # Deliberately non-fatal: one broken driver module must not
            # prevent the remaining ones from being imported.
            logger.debug("Skipping CAN driver module %r: %s", modname, exc)


def registered_drivers():
    """Collect the currently known CAN driver classes.

    Returns
    -------
    dict (name, class)
        Ordered dictionary mapping class names to classes for every direct
        subclass of :class:`CanInterfaceBase` defined so far
        (pyxcp supplied and user-defined).
    """
    driver_classes = CanInterfaceBase.__subclasses__()
    return OrderedDict((cls.__name__, cls) for cls in driver_classes)