markusressel/barcode-server

View on GitHub
barcode_server/barcode.py

Summary

Maintainability
A
35 mins
Test Coverage
import asyncio
import logging
import uuid
from datetime import datetime
from pathlib import Path
from typing import Callable, Dict, List, Optional

import evdev
from evdev import *

from barcode_server.config import AppConfig
from barcode_server.keyevent_reader import KeyEventReader
from barcode_server.stats import SCAN_COUNT, DEVICES_COUNT, DEVICE_DETECTION_TIME

LOGGER = logging.getLogger(__name__)


class BarcodeEvent:
    """
    A single scanned barcode together with the device it came from,
    a random UUID string as id and a timestamp
    """

    def __init__(self, input_device: InputDevice, barcode: str, date: datetime = None):
        """
        :param input_device: the device the barcode was read from
        :param barcode: the scanned barcode text
        :param date: time of the scan, defaults to datetime.now() when omitted
        """
        self.id = str(uuid.uuid4())

        if date is None:
            date = datetime.now()
        self.date = date

        # the very same device object is exposed under both attribute names
        self.input_device = self.device = input_device
        self.barcode = barcode


class BarcodeReader:
    """
    Detects input devices matching the configured name patterns or device paths
    and reads barcodes from them.

    Every detected device gets its own reader task. Each barcode that was read
    is wrapped in a BarcodeEvent and handed to all registered listeners.
    """

    def __init__(self, config: AppConfig):
        """
        :param config: application config providing DEVICE_PATTERNS and DEVICE_PATHS
        """
        self.config = config
        self.devices = {}
        self.listeners = set()

        self._keyevent_reader = KeyEventReader()

        self._main_task = None
        # "Device Path" -> reader task
        self._device_tasks = {}
        # The event loop only keeps weak references to tasks, so fire-and-forget
        # tasks are stored here until they are done.
        self._background_tasks = set()

    async def start(self):
        """
        Start detecting and reading barcode scanner devices.
        Does nothing if detection is already running.
        """
        if self._main_task is not None and not self._main_task.done():
            # a second detection loop would only compete with the first one
            return
        self._main_task = asyncio.create_task(self._detect_and_read())

    async def stop(self):
        """
        Stop detecting and reading barcode scanner devices.
        Returns after the cancelled tasks have finished, i.e. after the
        readers had the chance to release their devices.
        """
        if self._main_task is None:
            return

        pending = [self._main_task, *self._device_tasks.values()]
        self._main_task = None
        self._device_tasks.clear()

        for task in pending:
            task.cancel()
        # cancel() only requests cancellation; wait until the tasks are really done
        await asyncio.gather(*pending, return_exceptions=True)

    async def _detect_and_read(self):
        """
        Detect barcode scanner devices and start readers for them.
        Runs until cancelled; after an unexpected error detection is retried
        with a longer delay.
        """
        while True:
            try:
                self.devices = self._find_devices(self.config.DEVICE_PATTERNS.value, self.config.DEVICE_PATHS.value)
                DEVICES_COUNT.set(len(self.devices))

                for path, d in self.devices.items():
                    if path in self._device_tasks:
                        # this device already has a reader
                        continue
                    LOGGER.info(
                        f"Reading: {d.path}: Name: {d.name}, "
                        f"Vendor: {d.info.vendor:04x}, Product: {d.info.product:04x}")
                    self._device_tasks[path] = asyncio.create_task(self._start_reader(d))

                await asyncio.sleep(1)
            except asyncio.CancelledError:
                # Before Python 3.8 CancelledError is a subclass of Exception;
                # without this clause the loop could not be cancelled there.
                raise
            except Exception as e:
                LOGGER.exception(e)
                await asyncio.sleep(10)

    async def _start_reader(self, input_device):
        """
        Start a reader for a specific device.
        Reads lines from the device until cancelled or until reading fails.
        On failure the device is unregistered, so that detection can start
        a new reader for it later.
        :param input_device: the input device
        """
        try:
            # become the sole recipient of all incoming input events
            input_device.grab()
            while True:
                barcode = await self._read_line(input_device)
                if barcode:
                    event = BarcodeEvent(input_device, barcode)
                    self._spawn(self._notify_listeners(event))
        except asyncio.CancelledError:
            # regular shutdown, not an error (see _detect_and_read)
            raise
        except Exception as e:
            LOGGER.exception(e)
            # stop() may have removed the entry already
            self._device_tasks.pop(input_device.path, None)
        finally:
            try:
                # release device
                input_device.ungrab()
            except Exception as e:
                # best effort only, but leave a trace for debugging
                LOGGER.debug("Could not release %s: %s", input_device.path, e)

    @staticmethod
    @DEVICE_DETECTION_TIME.time()
    def _find_devices(patterns: List, paths: List[str]) -> Dict[str, InputDevice]:
        """
        Finds all input devices whose name matches one of the given patterns
        and adds the devices at the manually defined paths.
        :param patterns: list of patterns (objects with a "match" method,
                         e.g. compiled regular expressions) to match the device name against
        :param paths: list of device paths to add regardless of the device name
        :return: Map of ("Device Path" -> InputDevice) items
        """
        # create InputDevice instances for all devices of the system
        candidates = [evdev.InputDevice(fn) for fn in evdev.list_devices()]

        # filter by device name
        result = {
            d.path: d
            for d in candidates
            if any(pattern.match(d.name) for pattern in patterns)
        }

        # add manually defined paths
        for path in paths:
            if path in result:
                # already found by name, no need to open the device a second time
                continue
            try:
                if Path(path).exists():
                    device = evdev.InputDevice(path)
                    result[device.path] = device
                else:
                    LOGGER.warning(f"Path doesn't exist: {path}")
            except Exception as e:
                LOGGER.exception(e)

        return result

    async def _read_line(self, input_device: InputDevice) -> Optional[str]:
        """
        Read a single line (ENTER stops input) from the given device
        :param input_device: the device to listen on
        :return: a barcode
        """
        # Using a thread executor here is a workaround for
        # input_device.async_read_loop() skipping input events sometimes,
        # so we use a synchronous method instead.
        # While not perfect, it has a much higher success rate.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._keyevent_reader.read_line, input_device)

    def add_listener(self, listener: Callable):
        """
        Add a barcode event listener
        :param listener: async callable taking a single argument, the BarcodeEvent
        """
        self.listeners.add(listener)

    async def _notify_listeners(self, event: BarcodeEvent):
        """
        Notifies all listeners about the scanned barcode.
        Each listener runs in its own task, so listeners don't block each other.
        :param event: barcode event
        """
        SCAN_COUNT.inc()
        LOGGER.info(f"{event.input_device.name} ({event.input_device.path}): {event.barcode}")
        for listener in self.listeners:
            self._spawn(listener(event))

    def _spawn(self, coro) -> asyncio.Task:
        """
        Run a coroutine as a fire-and-forget task while keeping a reference to it
        :param coro: the coroutine to run
        :return: the created task
        """
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._on_background_task_done)
        return task

    def _on_background_task_done(self, task: asyncio.Task):
        """
        Forget a finished fire-and-forget task and log its error, if any
        :param task: the finished task
        """
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            LOGGER.error("Background task failed", exc_info=error)