EventGhost/EventGhost

View on GitHub
eg/Classes/SerialThread.py

Summary

Maintainability
D
2 days
Test Coverage
# -*- coding: utf-8 -*-
#
# This file is part of EventGhost.
# Copyright © 2005-2020 EventGhost Project <http://www.eventghost.net/>
#
# EventGhost is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 2 of the License, or (at your option)
# any later version.
#
# EventGhost is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along
# with EventGhost. If not, see <http://www.gnu.org/licenses/>.

from threading import Condition, currentThread, Lock, Thread
from time import clock, sleep

# Local imports
import eg
from eg.WinApi.Dynamic import (
    # ctypes stuff
    byref,
    create_string_buffer,
    FormatError,
    pointer,
    sizeof,

    # constants
    CLRDTR,
    CLRRTS,
    DTR_CONTROL_DISABLE,
    ERROR_IO_PENDING,
    EV_BREAK,
    EV_CTS,
    EV_DSR,
    EV_ERR,
    EV_RING,
    EV_RLSD,
    EV_RXCHAR,
    EV_RXFLAG,
    EV_TXEMPTY,
    EVENPARITY,
    FILE_ATTRIBUTE_NORMAL,
    FILE_FLAG_OVERLAPPED,
    GENERIC_READ,
    GENERIC_WRITE,
    INFINITE,
    INVALID_HANDLE_VALUE,
    MARKPARITY,
    NOPARITY,
    ODDPARITY,
    ONE5STOPBITS,
    ONESTOPBIT,
    OPEN_EXISTING,
    QS_ALLINPUT,
    SETDTR,
    SETRTS,
    SPACEPARITY,
    TWOSTOPBITS,
    WAIT_OBJECT_0,
    WAIT_TIMEOUT,

    # types
    COMMCONFIG,
    COMMTIMEOUTS,
    COMSTAT,
    DCB,
    DWORD,
    HANDLE,
    OVERLAPPED,

    # functions
    ClearCommError,
    CloseHandle,
    CreateEvent,
    CreateFile,
    EscapeCommFunction,
    GetCommState,
    GetCommTimeouts,
    GetDefaultCommConfig,
    GetLastError,
    GetOverlappedResult,
    MsgWaitForMultipleObjects,
    ReadFile,
    ResetEvent,
    SetCommMask,
    SetCommState,
    SetCommTimeouts,
    SetEvent,
    WaitCommEvent,
    WaitForSingleObject,
    WriteFile,
)

# parity string to dcb value dict
PARITY_S2V_DICT = {
    'N': NOPARITY,
    'O': ODDPARITY,
    'E': EVENPARITY,
    'M': MARKPARITY,
    'S': SPACEPARITY,
}
# parity dcb value to string dict
PARITY_V2S_DICT = {
    NOPARITY: 'N',
    ODDPARITY: 'O',
    EVENPARITY: 'E',
    MARKPARITY: 'M',
    SPACEPARITY: 'S',
}
# stop bits string to dcb value dict
STOPBITS_S2V_DICT = {'1': ONESTOPBIT, '1.5': ONE5STOPBITS, '2': TWOSTOPBITS}
# stop bits dcb value to string dict
STOPBITS_V2S_DICT = {ONESTOPBIT: '1', ONE5STOPBITS: '1.5', TWOSTOPBITS: '2'}

class SerialError(eg.Exception):
    """
    Base class for SerialThread related exceptions.
    """
    def __init__(self, msg=None):
        if msg is None:
            errno = GetLastError()
            strerror = FormatError(errno)
        else:
            errno = 0
            strerror = msg
        eg.Exception.__init__(self, errno, strerror)


class SerialThread(Thread):
    """
    Eased handling of serial port communication.
    """
    # These functions are bound to the class, so instances or subclasses
    # can replace them. This is needed for code that uses the FTDI.DLL
    # driver for example.
    _ReadFile = ReadFile
    _WriteFile = WriteFile
    _ClearCommError = ClearCommError
    _CreateFile = CreateFile
    _CloseHandle = CloseHandle
    # Cache list for GetAllPorts
    _serialPortList = None
    SerialError = SerialError

    def __init__(self, hFile=0):
        Thread.__init__(self, target=self.ReceiveThreadProc)
        self.hFile = int(hFile)
        self.osWriter = OVERLAPPED()
        self.osWriter.hEvent = CreateEvent(None, 0, 0, None)
        self.osReader = OVERLAPPED()
        self.osReader.hEvent = CreateEvent(None, 1, 0, None)
        self.dwRead = DWORD()
        self.comstat = COMSTAT()
        self.dcb = DCB()
        self.dcb.DCBlength = sizeof(DCB)
        self.oldCommTimeouts = COMMTIMEOUTS()

        self.readEventCallback = None
        self.readEventLock = Lock()
        self.readEventLock.acquire()
        self.readCondition = Condition()
        self.buffer = ""
        self.keepAlive = True
        self.stopEvent = CreateEvent(None, 1, 0, None)
        self.callbackThread = Thread(
            target=self.CallbackThreadProc,
            name="SerialReceiveThreadProc"
        )

    def __enter__(self):
        self.SuspendReadEvents()
        return self

    def __exit__(self, dummyExcType, dummyExcValue, dummyTraceback):
        self.ResumeReadEvents()
        return False

    @eg.LogItWithReturn
    def CallbackThreadProc(self):
        while self.keepAlive:
            self.readCondition.acquire()
            while len(self.buffer):
                if self.readEventLock.acquire(0):
                    if self.readEventCallback:
                        try:
                            self.readEventCallback(self)
                        except Exception:
                            eg.PrintTraceback()
                    self.readEventLock.release()
                    sleep(0.001)
                else:
                    break
            self.readCondition.wait()
            self.readCondition.release()

    def Close(self):
        """
        Closes  the serial port and stops all event processing.
        """
        self.keepAlive = False
        SetEvent(self.stopEvent)
        if currentThread() != self:
            self.join(1.0)
        if self.hFile:
            #Restore original timeout values:
            SetCommTimeouts(self.hFile, self.oldCommTimeouts)
            #Close COM-Port:
            if not self._CloseHandle(self.hFile):
                self.hFile = None
                raise SerialError()

    def Flush(self):
        self.readCondition.acquire()
        self.buffer = ""
        self.readCondition.release()

    @classmethod
    def GetAllPorts(cls):
        """
        Returns a list with all available serial ports.
        """
        serialPortList = cls._serialPortList
        if serialPortList is not None:
            return serialPortList
        serialPortList = []
        commconfig = COMMCONFIG()
        commconfig.dwSize = sizeof(COMMCONFIG)
        lpCC = pointer(commconfig)
        dwSize = DWORD(0)
        lpdwSize = byref(dwSize)
        for i in range(0, 255):
            name = 'COM%d' % (i + 1)
            res = GetDefaultCommConfig(name, lpCC, lpdwSize)
            if res == 1 or (res == 0 and GetLastError() == 122):
                serialPortList.append(i)
        cls._serialPortList = serialPortList
        return serialPortList

    def GetBaudrate(self):
        """
        Returns the currently used baud rate as an integer.
        """
        return self.dcb.BaudRate

    def GetMode(self):
        """
        Returns the currently used serial port mode setting as string.

        See :meth:`Open` for a complete description of the mode string.
        """
        dcb = self.dcb
        return (
            str(dcb.ByteSize) +
            PARITY_V2S_DICT[dcb.Parity] +
            STOPBITS_V2S_DICT[dcb.StopBits]
        )

    def HandleReceive(self, data):
        # read all data currently available
        data += self._ReadAllAvailable()
        # append the data to the buffer and notify waiting threads
        self.readCondition.acquire()
        self.buffer += data
        self.readCondition.notifyAll()
        self.readCondition.release()

    def Open(self, port=0, baudrate=9600, mode="8N1"):
        """
        Opens the serial port. After the port is opened, you have to call
        :meth:`Start` to start the actual event processing of this class.

        :Arguments:
            port
                The port to open as an integer starting at one (0 = COM1)
            baudrate
                The baudrate to use as an integer. Defaults to 9600.
            mode
                The serial port mode settings to use. This should be a
                string created by the joining of the following parts:

                #. byte size
                    '5', '6', '7' or '8'
                #. parity mode
                    * 'N' for no parity,
                    * 'E' for even parity,
                    * 'O' for odd parity,
                    * 'M' for mark parity,
                    * 'S' for space parity
                #. stop bits
                    * '1' for one stop bit
                    * '1.5' for one and a half stop bit
                    * '2' for two stop bits

                Example values: '8N1' (default), '8E1', '7N1.5'
        """
        #the "//./COMx" format is required for devices >= 9
        #not all versions of windows seem to support this properly
        #so that the first few ports are used with the DOS device name
        if port < 9:
            deviceStr = 'COM%d' % (port + 1)
        else:
            deviceStr = '\\\\.\\COM%d' % (port + 1)
        try:
            self.hFile = self._CreateFile(
                deviceStr,
                GENERIC_READ | GENERIC_WRITE,
                0,  # exclusive access
                None,  # default security attributes
                OPEN_EXISTING,
                FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,
                0
            )
        except Exception:
            self.hFile = None    # cause __del__ is called anyway
            raise eg.Exceptions.SerialOpenFailed()
        if self.hFile == INVALID_HANDLE_VALUE:
            self.hFile = None
            raise eg.Exceptions.SerialOpenFailed()
        dcb = self.dcb
        GetCommState(self.hFile, byref(dcb))
        dcb.fBinary = True
        dcb.fParity = False
        dcb.fOutxCtsFlow = False
        dcb.fOutxDsrFlow = False
        dcb.fDtrControl = DTR_CONTROL_DISABLE
        dcb.fDsrSensitivity = False
        dcb.fTXContinueOnXoff = True
        dcb.fOutX = False
        dcb.fInX = False
        dcb.fErrorChar = False
        dcb.fNull = False
        dcb.fRtsControl = False
        dcb.fAbortOnError = False
        dcb.BaudRate = baudrate
        dcb.ByteSize = int(mode[0])
        dcb.Parity = PARITY_S2V_DICT[mode[1].upper()]
        dcb.StopBits = STOPBITS_S2V_DICT[mode[2:]]
        SetCommState(self.hFile, byref(dcb))
        GetCommTimeouts(self.hFile, byref(self.oldCommTimeouts))
        commTimeouts = COMMTIMEOUTS(0, 0, 0, 100, 100)
        SetCommTimeouts(self.hFile, byref(commTimeouts))

    def Read(self, numBytes=1, timeout=0.0):
        """
        Reads data from the serial port.
        """
        self.readCondition.acquire()
        startTime = clock()
        endTime = startTime + timeout
        while len(self.buffer) < numBytes:
            waitTime = endTime - clock()
            if waitTime <= 0:
                numBytes = len(self.buffer)
                break
            self.readCondition.wait(waitTime)
        data = self.buffer[:numBytes]
        self.buffer = self.buffer[numBytes:]
        self.readCondition.release()
        return data

    @eg.LogItWithReturn
    def ReceiveThreadProc(self):
        hFile = self.hFile
        osReader = self.osReader
        lpBuf = create_string_buffer(1)
        dwRead = DWORD()
        pHandles = (HANDLE * 2)(osReader.hEvent, self.stopEvent)

        waitingOnRead = False
        while self.keepAlive:
            #print "ReceiveThreadProc"
            # if no read is outstanding, then issue another one
            if not waitingOnRead:
                #print "ResetEvent"
                ResetEvent(osReader.hEvent)
                returnValue = self._ReadFile(
                    hFile,
                    lpBuf,
                    1,  # we want to get notified as soon as a byte is available
                    byref(dwRead),
                    byref(osReader)
                )
                if not returnValue:
                    err = GetLastError()
                    if err != 0 and err != ERROR_IO_PENDING:
                        raise SerialError()
                    waitingOnRead = True
                else:
                    # read completed immediately
                    if dwRead.value:
                        self.HandleReceive(lpBuf.raw)
                        continue

            ret = MsgWaitForMultipleObjects(
                2, pHandles, 0, 100000, QS_ALLINPUT
            )
            if ret == WAIT_OBJECT_0:
                returnValue = GetOverlappedResult(
                    hFile,
                    byref(osReader),
                    byref(dwRead),
                    0
                )
                #print "GetOverlappedResult", returnValue, dwRead.value
                waitingOnRead = False
                if returnValue and dwRead.value:
                    self.HandleReceive(lpBuf.raw)
            elif ret == WAIT_OBJECT_0 + 1:
                #print "WAIT_OBJECT_1"
                # stop event signaled
                self.readCondition.acquire()
                self.readCondition.notifyAll()
                self.readCondition.release()
                break
            elif ret == WAIT_TIMEOUT:
                #print "WAIT_TIMEOUT"
                pass
            else:
                raise SerialError("Unknown message in ReceiveThreadProc")

    def ReportStatusEvent(self, statusEvent):
        print statusEvent

    def ResumeReadEvents(self):
        """
        Resumes the receive event processing that was disabled before by
        :meth:`SuspendReadEvents`
        """
        self.readEventLock.release()
        self.readCondition.acquire()
        self.readCondition.notifyAll()
        self.readCondition.release()

    def SetBaudrate(self, baudrate):
        """
        Sets the baud rate to an integer value.
        """
        self.dcb.BaudRate = baudrate
        SetCommState(self.hFile, byref(self.dcb))

    def SetDtr(self, flag=True):
        """
        Sets DTR state.
        """
        EscapeCommFunction(self.hFile, SETDTR if flag else CLRDTR)

    def SetMode(self, mode):
        """
        Sets the serial port mode setting from a string.

        See :meth:`Open` for a complete description of the mode string.
        """
        dcb = self.dcb
        dcb.ByteSize = int(mode[0])
        dcb.Parity = PARITY_S2V_DICT[mode[1].upper()]
        dcb.StopBits = STOPBITS_S2V_DICT[mode[2:]]
        SetCommState(self.hFile, byref(dcb))

    def SetReadEventCallback(self, callback):
        """
        Sets a callback function for receive event processing. Should be
        called before :meth:`Start`. The callback should have the form::

            def HandleReceive(serialThread):
                ...

        where serialThread will be the instance of this SerialThread class, so
        you can easily call :meth:`Read` and :meth:`Write` on this object.
        """
        self.readEventCallback = callback
        self.readEventLock.release()

    def SetRts(self, flag=True):
        """
        Sets RTS state.
        """
        EscapeCommFunction(self.hFile, SETRTS if flag else CLRRTS)

    def Start(self):
        """
        Starts the event processing of this class.
        """
        self.callbackThread.start()
        self.start()

    def StatusCheckLoop(self):
        hComm = self.hFile
        waitingOnStatusHandle = False
        dwCommEvent = DWORD()
        dwOvRes = DWORD()
        commMask = (
            EV_BREAK | EV_CTS | EV_DSR | EV_ERR | EV_RING |
            EV_RLSD | EV_RXCHAR | EV_RXFLAG | EV_TXEMPTY
        )

        if not SetCommMask(self.hFile, commMask):
            raise SerialError("error setting communications mask")

        osStatus = OVERLAPPED(0)
        osStatus.hEvent = CreateEvent(None, 1, 0, None)
        if not osStatus.hEvent:
            raise SerialError("error creating event")

        while True:
            # Issue a status event check if one hasn't been issued already.
            if not waitingOnStatusHandle:
                if WaitCommEvent(hComm, byref(dwCommEvent), byref(osStatus)):
                    # WaitCommEvent returned immediately.
                    # Deal with status event as appropriate.
                    self.ReportStatusEvent(dwCommEvent.value)
                else:
                    if GetLastError() == ERROR_IO_PENDING:
                        waitingOnStatusHandle = True
                    else:
                        raise SerialError("error in WaitCommEvent")

            # Check on overlapped operation
            if waitingOnStatusHandle:
                # Wait a little while for an event to occur.
                res = WaitForSingleObject(osStatus.hEvent, 1000)
                if res == WAIT_OBJECT_0:
                    if GetOverlappedResult(
                        hComm, byref(osStatus), byref(dwOvRes), 0
                    ):
                        # Status event is stored in the event flag
                        # specified in the original WaitCommEvent call.
                        # Deal with the status event as appropriate.
                        self.ReportStatusEvent(dwCommEvent.value)
                    else:
                        # An error occurred in the overlapped operation;
                        # call GetLastError to find out what it was
                        # and abort if it is fatal.
                        raise SerialError()
                    # Set waitingOnStatusHandle flag to indicate that a new
                    # WaitCommEvent is to be issued.
                    waitingOnStatusHandle = True
                elif res == WAIT_TIMEOUT:
                    print "timeout"
                else:
                    # This indicates a problem with the OVERLAPPED structure's
                    # event handle.
                    CloseHandle(osStatus.hEvent)
                    raise SerialError("error in the WaitForSingleObject")

        CloseHandle(osStatus.hEvent)

    def Stop(self):
        """
        Stops the event processing of this class.
        """
        self.keepAlive = False
        SetEvent(self.stopEvent)
        if currentThread() != self:
            self.join(1.0)

    def SuspendReadEvents(self):
        """
        Temporarily disables the receive event processing of this class. Call
        :meth:`ResumeReadEvents` to resume processing.
        """
        self.readEventLock.acquire()

    def Write(self, data):
        """
        Writes a string to the port.
        """
        dwWritten = DWORD(0)
        returnValue = self._WriteFile(
            self.hFile,
            data,
            len(data),
            byref(dwWritten),
            byref(self.osWriter)
        )
        if returnValue != 0:
            return
        err = GetLastError()
        if err != 0 and err != ERROR_IO_PENDING:
            raise SerialError()
        if not GetOverlappedResult(
            self.hFile,
            byref(self.osWriter),
            byref(dwWritten),
            1
        ):
            raise SerialError()
        if dwWritten.value != len(data):
            raise self.SerialError("Write timeout")

    def _ReadAllAvailable(self):
        """
        Get all bytes currently in the drivers receive buffer.
        Only used internally.
        """
        sleep(0.001)
        errors = DWORD()
        self._ClearCommError(self.hFile, byref(errors), byref(self.comstat))
        numBytes = self.comstat.cbInQue
        if numBytes == 0:
            return ''
        #ResetEvent(self.osReader.hEvent)
        lpBuffer = create_string_buffer(numBytes)
        numberOfBytesRead = DWORD()
        if not self._ReadFile(
            self.hFile,
            lpBuffer,
            numBytes,
            byref(numberOfBytesRead),
            byref(self.osReader)
        ):
            WaitForSingleObject(self.osReader.hEvent, INFINITE)
        return lpBuffer.raw[:numberOfBytesRead.value]