fdev/bc125csv

View on GitHub
bc125csv/scanner.py

Summary

Maintainability
C
7 hrs
Test Coverage
from __future__ import print_function
from __future__ import division

import re
import sys

try:
    import pyudev
except ImportError: # pragma: no cover
    sys.exit("Failed to import pyudev (https://pyudev.readthedocs.org/):,"
        " install using:\n  pip install pyudev")

try:
    import serial
except ImportError: # pragma: no cover
    sys.exit("Failed to import pyserial (http://pyserial.sourceforge.net/),"
        " install using:\n  pip install pyserial")


# CTCSS (Continuous Tone-Coded Squelch System) tones, as strings in Hz.
# Channel.tq looks a tone up as CTCSS_TONES[tqcode - 64] for tqcodes 64-113,
# so the order (and the length, 50 entries) of this list is significant.
CTCSS_TONES = [
    "67.0","69.3","71.9","74.4","77.0","79.7","82.5","85.4","88.5","91.5",
    "94.8","97.4","100.0","103.5","107.2","110.9","114.8","118.8","123.0",
    "127.3","131.8","136.5","141.3","146.2","151.4","156.7","159.8","162.2",
    "165.5","167.9","171.3","173.8","177.3","179.9","183.5","186.2","189.9",
    "192.8","196.6","199.5","203.5","206.5","210.7","218.1","225.7","229.1",
    "233.6","241.8","250.3","254.1",
]

# DCS (Digital-Coded Squelch) codes, kept as zero-padded strings.
# Channel.tq looks a code up as DCS_CODES[tqcode - 128] for tqcodes 128-231,
# so the order (and the length, 104 entries) of this list is significant.
DCS_CODES = [
    "023","025","026","031","032","036","043","047","051","053",
    "054","065","071","072","073","074","114","115","116","122",
    "125","131","132","134","143","145","152","155","156","162",
    "165","172","174","205","212","223","225","226","243","244",
    "245","246","251","252","255","261","263","265","266","271",
    "274","306","311","315","325","331","332","343","346","351",
    "356","364","365","371","411","412","413","423","431","432",
    "445","446","452","454","455","462","464","465","466","503",
    "506","516","523","526","532","546","565","606","612","624",
    "627","631","632","654","662","664","703","712","723","731",
    "732","734","743","754",
]

# Values of the udev "ID_MODEL" property that DeviceLookup.is_scanner accepts.
SUPPORTED_MODELS = ("BC125AT", "UBC125XLT", "UBC126AT")


class Channel(object):
    """
    Representation of a channel in the scanner.

    The attributes mirror the fields of the scanner's CIN command:

    index      -- channel number (int)
    name       -- channel name (str)
    frequency  -- frequency as a decimal string, e.g. "129.0000"
    modulation -- modulation name, "AUTO" by default
    tqcode     -- numeric CTCSS/DCS code (see the ``tq`` property)
    delay      -- delay setting (int)
    lockout    -- whether the channel is locked out (bool)
    priority   -- whether the channel is a priority channel (bool)
    """

    def __init__(self, index, name, frequency, modulation="AUTO", tqcode=0,
        delay=2, lockout=False, priority=False):
        self.index = index
        self.name = name
        self.frequency = frequency
        self.modulation = modulation
        self.tqcode = tqcode
        self.delay = delay
        self.lockout = lockout
        self.priority = priority

    @property
    def tq(self):
        """Readable CTCSS tone and DCS code.

        Returns None when ``tqcode`` does not fall in any known range.
        """
        code = self.tqcode
        if code == 0:
            return "none"
        if code == 127:
            return "search"
        if code == 240:
            return "no tone"
        if 64 <= code <= 113:
            # Codes 64-113 index the CTCSS table.
            return CTCSS_TONES[code - 64] + " Hz"
        if 128 <= code <= 231:
            # Codes 128-231 index the DCS table.
            return "DCS " + DCS_CODES[code - 128]
        return None

    @property
    def freqcode(self):
        """Frequency code in CIN format (nnnnmmmm).

        The last four digits are always the decimals. The fractional part
        of ``frequency`` is therefore right-padded with zeros (or cut off)
        to exactly four digits before the dot is dropped, so "129.5"
        becomes "01295000" rather than "00001295".
        """
        whole, _, fraction = self.frequency.partition(".")
        fraction = fraction.ljust(4, "0")[:4]
        return (whole + fraction).zfill(8)

    def __repr__(self):
        return "CH%03d: %s %s" % (self.index, self.frequency, self.modulation)


class ScannerException(Exception):
    """Raised when a scanner command fails or returns unexpected data."""


class Scanner(serial.Serial, object):
    """
    Wrap around Serial to provide compatible readline and helper methods.

    Commands are written as one line terminated by a carriage return and
    answered by one carriage-return terminated line. The helper methods
    raise ScannerException when the scanner reports an error or answers
    with something unexpected.
    """

    RE_CIN = re.compile(r"""
        # CIN,[INDEX],[NAME],[FRQ],[MOD],[CTCSS/DCS],[DLY],[LOUT],[PRI]
        ^ # No characters before
        CIN,
        (?P<index>\d{1,3}),
        (?P<name>[^,]{0,16}),
        (?P<freq>\d{5,8}), # 4 decimals, so at least 5 digits
        (?P<modulation>AUTO|AM|FM|NFM),
        (?P<tq>\d{1,3}),
        (?P<delay>-10|-5|0|1|2|3|4|5),
        (?P<lockout>0|1),
        (?P<priority>0|1) # no comma!
        $ # No characters after
        """, flags=re.VERBOSE)

    # An error response either starts with "ERR" or ends with ",NG".
    RE_ERROR = re.compile(r"(^ERR|,NG$)")

    def __init__(self, port, baudrate=9600): # pragma: no cover
        super(Scanner, self).__init__(port=port, baudrate=baudrate)

    def writeread(self, command): # pragma: no cover
        """Write a command line to the scanner and return its response line."""
        self.write((command + "\r").encode())
        self.flush()
        return self.readlinecr()

    def send(self, command):
        """Send a command; return the response, or None on an error response."""
        result = self.writeread(command)
        # search() is required here: match() only tests at position 0, so
        # the ",NG$" alternative could never fire for e.g. "CIN,NG".
        if self.RE_ERROR.search(result):
            return None
        return result

    def readlinecr(self): # pragma: no cover
        """
        Read one response line, terminated by a carriage return.

        The Serial class might be based on serial.FileLike, which allows
        one to override the eol character, or on io.RawIOBase, which
        doesn't. Reading byte by byte up to "\\r" works with both.

        Raises ScannerException when a read returns no data (which happens
        when a read timeout is configured and expires) instead of looping
        forever.
        """
        line = bytearray()
        while True:
            byte = self.read(1)
            if not byte:
                raise ScannerException("No response from scanner.")
            if byte == b"\r":
                # Decode once, so multi-byte sequences are not split.
                return bytes(line).decode()
            line += byte

    def enter_programming(self):
        """Switch the scanner to programming mode."""
        # send() returns None on error, which also differs from "PRG,OK".
        if self.send("PRG") != "PRG,OK":
            raise ScannerException("Failed to enter programming mode.")

    def exit_programming(self):
        """Leave programming mode."""
        if self.send("EPG") != "EPG,OK":
            raise ScannerException("Failed to leave programming mode.")

    def get_model(self):
        """Get model name from scanner."""
        result = self.send("MDL")
        if not result or not result.startswith("MDL,"):
            raise ScannerException("Could not get model name.")
        # Strip the "MDL," prefix.
        return result[4:]

    def get_channel(self, index):
        """Read channel object from scanner.

        Returns a Channel, or None when the channel is empty (its
        frequency field is all zeros). Raises ScannerException on an
        error response or when the response does not match RE_CIN.
        """
        result = self.send("CIN,%d" % index)

        # Error occurred
        if not result:
            raise ScannerException("Could not read channel %d." % index)

        # Try to match result
        match = self.RE_CIN.match(result)
        if not match:
            raise ScannerException("Unexpected data for channel %d." % index)
        data = match.groupdict()

        # Return on empty channel
        if data["freq"] == "00000000":
            return None

        # Convert 1290000 to 129.0000
        frequency = "%s.%s" % (data["freq"][:-4].lstrip("0"), data["freq"][-4:])

        return Channel(
            index=int(data["index"]),
            name=data["name"].strip(),
            frequency=frequency,
            modulation=data["modulation"],
            tqcode=int(data["tq"]),
            delay=int(data["delay"]),
            lockout=data["lockout"] == "1",
            priority=data["priority"] == "1",
        )

    def set_channel(self, channel):
        """Write channel object to scanner.

        Raises ScannerException unless the scanner answers "CIN,OK".
        """
        command = ",".join(map(str, [
            "CIN",
            channel.index,
            channel.name,
            channel.freqcode,
            channel.modulation,
            channel.tqcode,
            channel.delay,
            int(channel.lockout),
            int(channel.priority),
        ]))

        # Write to scanner
        if self.send(command) != "CIN,OK":
            raise ScannerException("Could not write to channel %d." % channel.index)

    def delete_channel(self, index):
        """Delete channel from scanner.

        Raises ScannerException unless the scanner answers "DCH,OK".
        """
        # Only delete if channel has data
        # Unnecessary deletes are slow
        if self.get_channel(index) is None:
            return

        if self.send("DCH,%d" % index) != "DCH,OK":
            raise ScannerException("Could not delete channel %d." % index)


class VirtualScanner(Scanner):
    """
    Stand-in scanner that answers commands from canned data, so the code
    can be exercised without an actual scanner attached.
    """
    def __init__(self, *args, **kwargs):
        # Deliberately skip the parent constructor: no Serial object
        # is created and no port is opened.
        pass

    def writeread(self, command):
        """Return the canned response for the given command.

        Model and programming-mode commands get fixed replies. Reading
        channels 1-19 and 51-59 yields a populated channel, and reading
        any other numeric channel yields an empty one. Every other
        "CIN," or "DCH," command is acknowledged, and anything else
        answers "ERR".
        """
        # Model / enter programming mode / exit programming mode
        fixed_replies = {
            "MDL": "MDL,VIRTUAL",
            "PRG": "PRG,OK",
            "EPG": "EPG,OK",
        }
        if command in fixed_replies:
            return fixed_replies[command]

        # Get channel: populated data for channels 1-19 and 51-59;
        # channel 55 is locked out, channel 15 is a priority channel.
        if re.match(r"^CIN,([1-9]|1[0-9]|5[1-9])$", command):
            number = int(command[4:])
            template = "CIN,{0},Channel {0},1{0:02d}0000,FM,0,2,{1:d},{2:d}"
            return template.format(number, number == 55, number == 15)

        # Get channel: every other index is empty, with a tq code that
        # cycles through four values.
        if re.match(r"^CIN,[0-9]+$", command):
            number = int(command[4:])
            code = (0, 127, 240, 145)[number % 4]
            return "CIN,{0},,00000000,FM,{1},0,0,0".format(number, code)

        # Set channel / delete channel
        for prefix, reply in (("CIN,", "CIN,OK"), ("DCH,", "DCH,OK")):
            if command.startswith(prefix):
                return reply

        # Other commands give an error
        return "ERR"


class DeviceLookup(object): # pragma: no cover
    """
    Search the udev device list for a compatible scanner.
    """

    def __init__(self):
        self.context = pyudev.Context()

    def is_scanner(self, device):
        """Tell whether the device has vendor id 1965 and a supported model."""
        if device.get("ID_VENDOR_ID") != "1965":
            return False
        return device.get("ID_MODEL") in SUPPORTED_MODELS

    def is_tty(self, device):
        """Tell whether the device belongs to the tty subsystem."""
        subsystem = device.get("SUBSYSTEM")
        return subsystem == "tty"

    def get_device(self):
        """Find a compatible scanner and return its device.

        A scanner that is also a tty device is preferred. If there is
        none, the first scanner device of any kind is returned. The
        result is None when no scanner is found at all.
        """
        enumerate_devices = self.context.list_devices

        # First pass: scanner exposed as a tty
        scanner_tty = next(
            (dev for dev in enumerate_devices()
             if self.is_scanner(dev) and self.is_tty(dev)),
            None)
        if scanner_tty is not None:
            return scanner_tty

        # Second pass: any scanner device
        return next(
            (dev for dev in enumerate_devices() if self.is_scanner(dev)),
            None)