ducky/devices/keyboard.py

Summary

Maintainability
C
1 day
Test Coverage
"""
Keyboard controller - provides events for pressed and released keys.
"""

import enum
import io

from collections import deque

from . import DeviceFrontend, DeviceBackend, MMIOMemoryPage
from ..errors import InvalidResourceError
from ..mm import UINT8_FMT, addr_to_page, UINT32_FMT, u32_t
from ..hdt import HDTEntry_Device

DEFAULT_IRQ = 0x01
DEFAULT_MMIO_ADDRESS = 0x8000

class KeyboardPorts(enum.IntEnum):
  STATUS = 0x00
  DATA   = 0x01

  LAST   = 0x01

class HDTEntry_Keyboard(HDTEntry_Device):
  _fields_ = HDTEntry_Device.ENTRY_HEADER + [
    ('mmio_address', u32_t)
  ]

  def __init__(self, logger, config, section):
    super(HDTEntry_Keyboard, self).__init__(logger, section, 'Virtual keyboard controller')

    self.mmio_address = config.getint(section, 'mmio-address', DEFAULT_MMIO_ADDRESS)

    logger.debug('%s: mmio-address=%s', self.__class__.__name__, UINT32_FMT(self.mmio_address))

class KeyboardMMIOMemoryPage(MMIOMemoryPage):
  def read_u8(self, offset):
    self.DEBUG('%s.read_u8: offset=%s', self.__class__.__name__, UINT8_FMT(offset))

    if offset == KeyboardPorts.STATUS:
      return 0x00

    if offset == KeyboardPorts.DATA:
      b = self._device._read_char()
      if b is None:
        self.DEBUG('%s.get: empty input, signal it downstream', self.__class__.__name__)
        return 0xFF

      self.DEBUG('%s.get: input byte is %i', self.__class__.__name__, b)
      return b

    self.WARN('%s.read_u8: attempt to read raw offset: offset=%s', self.__class__.__name__, UINT8_FMT(offset))
    return 0x00

class ControlMessages(enum.IntEnum):
  HALT = 1025

  CONTROL_MESSAGE_FIRST = 1024

class Frontend(DeviceFrontend):
  def __init__(self, machine, name):
    super(Frontend, self).__init__(machine, 'input', name)

    self._comm_queue = machine.comm_channel.get_queue(name)
    self._streams = []
    self._stream = None

    self.backend = machine.get_device_by_name(name)

  @staticmethod
  def create_from_config(machine, config, section):
    slave = config.get(section, 'slave', default = section)

    return Frontend(machine, slave)

  def boot(self):
    super(Frontend, self).boot()

    self._open_input()
    self.backend.boot()

  def halt(self):
    self._close_input()
    self.backend.halt()

    super(Frontend, self).halt()

  def enqueue_stream(self, stream):
    self.machine.DEBUG('%s.enqueue_input: stream=%s', self.__class__.__name__, stream)

    if not stream.has_poll_support():
      raise InvalidResourceError('Keyboard stream must support polling')

    self._streams.append(stream)

  def _close_input(self):
    self.machine.DEBUG('%s._close_input: input=%s', self.__class__.__name__, self._stream)

    if self._stream is None:
      return

    self._stream.unregister_with_reactor(self.machine.reactor)
    self._stream = None

  def _open_input(self):
    self.machine.DEBUG('%s._open_input', self.__class__.__name__)

    self._close_input()

    if not self._streams:
      self.machine.DEBUG('%s._open_input: no additional input streams', self.__class__.__name__)
      self._comm_queue.write_in(ControlMessages.HALT)

      # if not self.queue or self.queue[-1] != ControlMessages.HALT:
      #   self.machine.DEBUG('signal halt')
      #   self.queue.append(ControlMessages.HALT)
      return

    self._stream = self._streams.pop(0)
    self.machine.DEBUG('%s._open_input: stream=%r', self.__class__.__name__, self._stream)

    self._stream.register_with_reactor(self.machine.reactor, on_read = self._handle_raw_input, on_error = self._handle_input_error)

  def _handle_input_error(self):
    self.machine.DEBUG('%s._handle_input_error')

    self._open_input()

  def _handle_raw_input(self):
    self.machine.DEBUG('%s._handle_raw_input', self.__class__.__name__)

    assert self._stream is not False

    buff = self._stream.read(size = io.DEFAULT_BUFFER_SIZE)
    self.machine.DEBUG('%s._handle_raw_input: buff=%s (%s)', self.__class__.__name__, buff, type(buff))

    if buff is None:
      self.machine.DEBUG('%s._handle_raw_input: nothing to do, no input', self.__class__.__name__)
      return

    if not buff:
      # EOF
      self._open_input()
      return

    self.machine.DEBUG('%s._handle_raw_input: adding %i chars', self.__class__.__name__, len(buff))

    self._comm_queue.write_in(buff)

    self.machine.trigger_irq(self.backend)

class Backend(DeviceBackend):
  def __init__(self, machine, name, mmio_address = None, irq = None):
    super(Backend, self).__init__(machine, 'input', name)

    self._mmio_address = mmio_address or DEFAULT_MMIO_ADDRESS
    self._mmio_page = None
    self.irq = irq or DEFAULT_IRQ

    self._comm_queue = machine.comm_channel.create_queue(name)
    self._key_queue = deque()

  @staticmethod
  def create_from_config(machine, config, section):
    return Backend(machine, section,
                   mmio_address = config.getint(section, 'mmio-address', DEFAULT_MMIO_ADDRESS),
                   irq = config.getint(section, 'irq', DEFAULT_IRQ))

  @staticmethod
  def create_hdt_entries(logger, config, section):
    return [HDTEntry_Keyboard(logger, config, section)]

  def __repr__(self):
    return 'basic keyboard controller on [%s] as %s' % (UINT32_FMT(self._mmio_address), self.name)

  def boot(self):
    self.machine.DEBUG('%s.boot', self.__class__.__name__)

    self._mmio_page = KeyboardMMIOMemoryPage(self, self.machine.memory, addr_to_page(self._mmio_address))
    self.machine.memory.register_page(self._mmio_page)

    self.machine.tenh('hid: %s', self)

  def halt(self):
    self.machine.DEBUG('%s.halt', self.__class__.__name__)

    self.machine.memory.unregister_page(self._mmio_page)

  def _process_input_events(self):
    self.machine.DEBUG('%s.__process_input_events', self.__class__.__name__)

    while True:
      e = self._comm_queue.read_in()
      if e is None:
        return

      if isinstance(e, (list, bytearray, bytes)):
        for key in e:
          self._key_queue.append(key)

      elif isinstance(e, ControlMessages):
        self._key_queue.append(e)

      else:
        raise InvalidResourceError('Unknown message: e=%s, type=%s' % (e, type(e)))

  def _read_char(self):
    q = self._key_queue

    if not q:
      self._process_input_events()

      if not q:
        return None

    b = q.popleft()

    if b == ControlMessages.HALT:
      self.machine.halt()

      return None

    return b