# barcode_server/barcode.py
import asyncio
import logging
import uuid
from datetime import datetime
from pathlib import Path
from typing import Callable, Dict, List, Optional

import evdev
from evdev import *

from barcode_server.config import AppConfig
from barcode_server.keyevent_reader import KeyEventReader
from barcode_server.stats import SCAN_COUNT, DEVICES_COUNT, DEVICE_DETECTION_TIME
# Module-level logger, named after this module
LOGGER = logging.getLogger(__name__)
class BarcodeEvent:
    """
    A single scanned barcode, tagged with a random unique id and a timestamp
    """

    def __init__(self, input_device: InputDevice, barcode: str, date: datetime = None):
        """
        :param input_device: the device the barcode was read from
        :param barcode: the scanned barcode text
        :param date: time of the scan; ``datetime.now()`` is used when omitted
        """
        if date is None:
            date = datetime.now()

        self.id = str(uuid.uuid4())
        self.date = date
        # the same device object is exposed under both attribute names
        self.device = self.input_device = input_device
        self.barcode = barcode
class BarcodeReader:
    """
    Reads barcodes from all USB barcode scanners in the system
    """

    def __init__(self, config: AppConfig):
        """
        :param config: application configuration; its DEVICE_PATTERNS and
                       DEVICE_PATHS values are used for device detection
        """
        self.config = config
        # device path -> InputDevice, refreshed on every detection pass
        self.devices = {}
        self.listeners = set()

        self._keyevent_reader = KeyEventReader()
        self._main_task = None
        # device path -> reader task
        self._device_tasks = {}
        # Strong references to fire-and-forget tasks. The event loop only keeps
        # weak references, so an unreferenced task may be garbage collected
        # before it has finished.
        self._background_tasks = set()

    async def start(self):
        """
        Start detecting and reading barcode scanner devices

        Does nothing if detection is already running.
        """
        if self._main_task is not None:
            return

        self._main_task = asyncio.create_task(self._detect_and_read())

    async def stop(self):
        """
        Stop detecting and reading barcode scanner devices

        Does nothing if detection is not running.
        """
        if self._main_task is None:
            return

        for task in self._device_tasks.values():
            task.cancel()
        self._device_tasks.clear()

        self._main_task.cancel()
        self._main_task = None

    def _spawn(self, coro) -> asyncio.Task:
        """
        Schedule a coroutine as a task and keep it referenced until it is done

        :param coro: the coroutine to run
        :return: the created task
        """
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def _detect_and_read(self):
        """
        Detect barcode scanner devices and start readers for them

        Runs until cancelled. Devices are re-scanned every second; after an
        error the next scan is delayed by ten seconds.
        """
        while True:
            try:
                self.devices = self._find_devices(self.config.DEVICE_PATTERNS.value, self.config.DEVICE_PATHS.value)
                DEVICES_COUNT.set(len(self.devices))

                for path, d in self.devices.items():
                    if path in self._device_tasks:
                        # a reader for this device is already running
                        continue
                    LOGGER.info(
                        f"Reading: {d.path}: Name: {d.name}, "
                        f"Vendor: {d.info.vendor:04x}, Product: {d.info.product:04x}")
                    task = asyncio.create_task(self._start_reader(d))
                    self._device_tasks[path] = task

                await asyncio.sleep(1)
            except Exception as e:
                LOGGER.exception(e)
                await asyncio.sleep(10)

    async def _start_reader(self, input_device):
        """
        Start a reader for a specific device

        Reads lines from the device until an error occurs or the task is
        cancelled. On error the device is removed from the running readers.
        :param input_device: the input device
        """
        try:
            # become the sole recipient of all incoming input events
            input_device.grab()
            while True:
                barcode = await self._read_line(input_device)
                if barcode:
                    event = BarcodeEvent(input_device, barcode)
                    self._spawn(self._notify_listeners(event))
        except Exception as e:
            LOGGER.exception(e)
            # stop() may already have cleared the mapping, so a missing key is fine
            self._device_tasks.pop(input_device.path, None)
        finally:
            try:
                # release device
                input_device.ungrab()
            except Exception as e:
                # best effort: the device may already be gone
                LOGGER.debug("Could not release %s: %s", input_device.path, e)

    @staticmethod
    @DEVICE_DETECTION_TIME.time()
    def _find_devices(patterns: List, paths: List[str]) -> Dict[str, InputDevice]:
        """
        Find all input devices whose name matches one of the given patterns,
        plus the devices at the explicitly given paths

        :param patterns: list of patterns to match the device name against
                         (objects providing a ``match`` method)
        :param paths: list of device paths to add regardless of their name
        :return: Map of ("Device Path" -> InputDevice) items
        """
        devices = []
        for fn in evdev.list_devices():
            candidate = evdev.InputDevice(fn)
            if any(pattern.match(candidate.name) for pattern in patterns):
                devices.append(candidate)
            else:
                # not a barcode scanner, don't keep the device file open
                candidate.close()

        # add manually defined paths
        for path in paths:
            try:
                if Path(path).exists():
                    devices.append(evdev.InputDevice(path))
                else:
                    LOGGER.warning(f"Path doesn't exist: {path}")
            except Exception as e:
                LOGGER.exception(e)

        return {d.path: d for d in devices}

    async def _read_line(self, input_device: InputDevice) -> Optional[str]:
        """
        Read a single line (ENTER stops input) from the given device
        :param input_device: the device to listen on
        :return: a barcode
        """
        # Using a thread executor here is a workaround for
        # input_device.async_read_loop() skipping input events sometimes,
        # so we use a synchronous method instead.
        # While not perfect, it has a much higher success rate.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._keyevent_reader.read_line, input_device)

    def add_listener(self, listener: Callable):
        """
        Add a barcode event listener
        :param listener: async callable taking the BarcodeEvent as its only argument
        """
        self.listeners.add(listener)

    async def _notify_listeners(self, event: BarcodeEvent):
        """
        Notifies all listeners about the scanned barcode

        Each listener runs in its own task, so they do not block each other.
        :param event: barcode event
        """
        SCAN_COUNT.inc()
        LOGGER.info(f"{event.input_device.name} ({event.input_device.path}): {event.barcode}")
        for listener in self.listeners:
            self._spawn(listener(event))