# alot/ui.py
# Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com>
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
import logging
import os
import signal
import codecs
import contextlib
import asyncio
import traceback
import urwid
from .settings.const import settings
from .buffers import BufferlistBuffer
from .buffers import SearchBuffer
from .commands import globals
from .commands import commandfactory
from .commands import CommandCanceled, SequenceCanceled
from .commands import CommandParseError
from .helper import split_commandline
from .helper import string_decode
from .helper import get_xdg_env
from .widgets.globals import CompleteEdit
from .widgets.globals import ChoiceWidget
async def periodic(callable_, period, *args, **kwargs):
    """
    Call `callable_` over and over, sleeping `period` between two calls.

    The loop never ends on its own. If a call returns a coroutine, that
    coroutine is awaited before going to sleep. Exceptions raised by a call
    are logged and otherwise ignored, so a failing run does not end the loop.

    :param callable_: function or coroutine function to invoke
    :param period: value handed to :func:`asyncio.sleep` after every call
    :param args: positional arguments passed on to `callable_`
    :param kwargs: keyword arguments passed on to `callable_`
    """
    async def run_once():
        outcome = callable_(*args, **kwargs)
        if asyncio.iscoroutine(outcome):
            await outcome

    while True:
        try:
            await run_once()
        except Exception as err:
            logging.error('error in loop hook %s', str(err))
        await asyncio.sleep(period)
class UI:
"""
This class integrates all components of alot and offers
methods for user interaction like :meth:`prompt`, :meth:`notify` etc.
It handles the urwid widget tree and mainloop (we use asyncio) and is
responsible for opening, closing and focussing buffers.
"""
def __init__(self, dbman, initialcmdline):
    """
    Set up interface state and widgets, then run the urwid main loop.

    Note that this constructor ends with a call to `self.mainloop.run()`,
    so everything the interface does happens from within this call.

    The colour mode is not a parameter: it is read from the `colourmode`
    setting.

    :param dbman: :class:`~alot.db.DBManager`
    :param initialcmdline: commandline applied after setting up interface
    :type initialcmdline: str
    """
    self.dbman = dbman
    """Database Manager (:class:`~alot.db.manager.DBManager`)"""
    self.buffers = []
    """list of active buffers"""
    self.current_buffer = None
    """points to currently active :class:`~alot.buffers.Buffer`"""
    self.db_was_locked = False
    """flag used to prevent multiple 'index locked' notifications"""
    self.mode = 'global'
    """interface mode identifier - type of current buffer"""
    self.commandprompthistory = []
    """history of the command line prompt"""
    self.senderhistory = []
    """history of the sender prompt"""
    self.recipienthistory = []
    """history of the recipients prompt"""
    self.input_queue = []
    """stores partial keyboard input"""
    self.last_commandline = None
    """saves the last executed commandline"""
    # define empty notification pile
    self._notificationbar = None
    # should we show a status bar?
    self._show_statusbar = settings.get('show_statusbar')
    # pass keypresses to the root widget and never interpret bindings
    self._passall = False
    # indicates "input lock": only focus move commands are interpreted
    self._locked = False
    self._unlock_callback = None  # will be called after input lock ended
    self._unlock_key = None  # key that ends input lock
    # alarm handle for callback that clears input queue (to cancel alarm)
    self._alarm = None
    # force urwid to pass key events as unicode, independent of LANG
    urwid.set_encoding('utf-8')
    # create root widget: an (initially empty) frame wrapped in the global
    # theming attribute
    global_att = settings.get_theming_attribute('global', 'body')
    mainframe = urwid.Frame(urwid.SolidFill())
    self.root_widget = urwid.AttrMap(mainframe, global_att)
    # both signals are forwarded to `handle_signal` via `_handle_signal`
    signal.signal(signal.SIGINT, self._handle_signal)
    signal.signal(signal.SIGUSR1, self._handle_signal)
    # load histories from <XDG_CACHE_HOME or ~/.cache>/alot/history/
    self._cache = os.path.join(
        get_xdg_env('XDG_CACHE_HOME', os.path.expanduser('~/.cache')),
        'alot', 'history')
    self._cmd_hist_file = os.path.join(self._cache, 'commands')
    self._sender_hist_file = os.path.join(self._cache, 'senders')
    self._recipients_hist_file = os.path.join(self._cache, 'recipients')
    size = settings.get('history_size')
    self.commandprompthistory = self._load_history_from_file(
        self._cmd_hist_file, size=size)
    self.senderhistory = self._load_history_from_file(
        self._sender_hist_file, size=size)
    self.recipienthistory = self._load_history_from_file(
        self._recipients_hist_file, size=size)
    # set up main loop
    # NOTE(review): urwid is driven by a Twisted event loop here while the
    # tasks below are scheduled on the asyncio loop; presumably the Twisted
    # reactor is installed on top of asyncio elsewhere - confirm.
    self.mainloop = urwid.MainLoop(
        self.root_widget,
        handle_mouse=settings.get('handle_mouse'),
        event_loop=urwid.TwistedEventLoop(),
        unhandled_input=self._unhandled_input,
        input_filter=self._input_filter)
    loop = asyncio.get_event_loop()
    # Create a task for the periodic hook
    loop_hook = settings.get_hook('loop_hook')
    if loop_hook:
        # In Python 3.7 a nice alias `asyncio.create_task` was added
        loop.create_task(
            periodic(
                loop_hook, settings.get('periodic_hook_frequency'),
                ui=self))
    # set up colours
    colourmode = int(settings.get('colourmode'))
    logging.info('setup gui in %d colours', colourmode)
    self.mainloop.screen.set_terminal_properties(colors=colourmode)
    # clear the screen before the initial frame
    self.mainloop.screen.clear()
    logging.debug('fire first command')
    # the initial command line is only scheduled here; it gets applied once
    # the loop is running
    loop.create_task(self.apply_commandline(initialcmdline))
    # start urwids mainloop
    self.mainloop.run()
def _error_handler(self, exception):
    """
    React to an exception that came up while applying a command.

    Parse errors and cancelled commands are shown to the user as error
    notifications. A cancelled sequence is raised again so that
    :meth:`apply_commandline` can deal with it. Everything else is written
    to the log (including the traceback) and announced to the user.

    :param exception: the exception to handle
    :raises SequenceCanceled: if `exception` is of that type
    """
    if isinstance(exception, CommandParseError):
        self.notify(str(exception), priority='error')
        return
    if isinstance(exception, CommandCanceled):
        self.notify("operation cancelled", priority='error')
        return
    if isinstance(exception, SequenceCanceled):
        # must travel further up to apply_commandline, which handles it
        # explicitly
        raise exception
    logging.error(traceback.format_exc())
    self.notify("{}\n(check the log for details)".format(exception),
                priority='error')
def _input_filter(self, keys, raw):
    """
    handles keypresses.
    This function gets triggered directly by :class:`urwid.MainLoop`
    upon user input and is supposed to pass on its `keys` parameter
    to let the root widget handle keys. We intercept the input here
    to trigger custom commands as defined in our keybindings.

    :param keys: the keys reported by urwid; only the first one is used
                 when looking up keybindings
    :param raw: raw input as handed over by urwid (only logged here)
    :returns: the keys that are passed on to the widgets, or `None` if
              nothing is passed on
    """
    logging.debug("Got key (%s, %s)", keys, raw)
    # work around: escape triggers this twice, with keys = raw = []
    # the first time..
    if not keys:
        return
    # let widgets handle input if key is virtual window resize keypress
    # or we are in "passall" mode
    elif 'window resize' in keys or self._passall:
        return keys
    # end "lockdown" mode if the right key was pressed
    elif self._locked and keys[0] == self._unlock_key:
        self._locked = False
        self.mainloop.widget = self.root_widget
        if callable(self._unlock_callback):
            self._unlock_callback()
    # otherwise interpret keybinding
    else:
        def clear(*_):
            """Callback that resets the input queue."""
            if self._alarm is not None:
                self.mainloop.remove_alarm(self._alarm)
            self.input_queue = []

        # coroutine wrapper so that the command line can be scheduled as a
        # task from the synchronous `fire` below
        async def _apply_fire(cmdline):
            try:
                await self.apply_commandline(cmdline)
            except CommandParseError as e:
                self.notify(str(e), priority='error')

        # Used both directly and as alarm callback; the first argument is
        # ignored. Resets the input queue, then either schedules `cmdline`
        # or, while the input is locked, lets plain move keys through.
        def fire(_, cmdline):
            clear()
            logging.debug("cmdline: '%s'", cmdline)
            if not self._locked:
                loop = asyncio.get_event_loop()
                loop.create_task(_apply_fire(cmdline))
            # move keys are always passed
            elif cmdline in ['move up', 'move down', 'move page up',
                             'move page down']:
                # strip the leading 'move ' to get the key to pass on
                return [cmdline[5:]]

        key = keys[0]
        # mouse events arrive as tuples; build a string from the first two
        # entries so they can be part of a key sequence
        if key and 'mouse' in key[0]:
            key = key[0] + ' %i' % key[1]
        self.input_queue.append(key)
        keyseq = ' '.join(self.input_queue)
        candidates = settings.get_mapped_input_keysequences(self.mode,
                                                            prefix=keyseq)
        if keyseq in candidates:
            # case: current input queue is a mapped keysequence
            # get binding and interpret it if non-null
            cmdline = settings.get_keybinding(self.mode, keyseq)
            if cmdline:
                if len(candidates) > 1:
                    # longer sequences with this prefix exist as well:
                    # wait for more input and only fire after the timeout
                    timeout = float(settings.get('input_timeout'))
                    if self._alarm is not None:
                        self.mainloop.remove_alarm(self._alarm)
                    self._alarm = self.mainloop.set_alarm_in(
                        timeout, fire, cmdline)
                else:
                    return fire(self.mainloop, cmdline)
        elif not candidates:
            # case: no sequence with prefix keyseq is mapped
            # just clear the input queue
            clear()
        else:
            # case: some sequences with proper prefix keyseq is mapped
            # forget the partial input again if nothing follows in time
            timeout = float(settings.get('input_timeout'))
            if self._alarm is not None:
                self.mainloop.remove_alarm(self._alarm)
            self._alarm = self.mainloop.set_alarm_in(timeout, clear)
    # update statusbar
    self.update()
async def apply_commandline(self, cmdline):
    """
    Interpret a command line string.

    The line is split into separate command strings; each of them is turned
    into a :class:`Command <alot.commands.Command>` and applied before the
    next one is looked at.

    Errors are not propagated: a cancelled sequence results in an error
    notification, everything else is passed to :meth:`_error_handler`.

    :param cmdline: command line to interpret
    :type cmdline: str
    """
    # leading spaces carry no meaning
    cmdline = cmdline.lstrip()

    try:
        # Commands are applied strictly in sequence: every application is
        # awaited before the next command string is translated.
        for cmdstring in split_commandline(cmdline):
            logging.debug('%s command string: "%s"', self.mode,
                          str(cmdstring))
            # translate cmdstring into :class:`Command`
            cmd = commandfactory(cmdstring, self.mode)
            # remember the whole line for use with the 'repeat' command
            if cmd.repeatable:
                self.last_commandline = cmdline
            await self.apply_command(cmd)
    except Exception as e:
        if not isinstance(e, SequenceCanceled):
            self._error_handler(e)
        else:
            self.notify("sequence of operations cancelled",
                        priority='error')
@staticmethod
def _unhandled_input(key):
"""
Called by :class:`urwid.MainLoop` if a keypress was passed to the root
widget by `self._input_filter` but is not handled in any widget. We
keep it for debugging purposes.
"""
logging.debug('unhandled input: %s', key)
def show_as_root_until_keypress(self, w, key, afterwards=None):
"""
Replaces root widget by given :class:`urwid.Widget` and makes the UI
ignore all further commands apart from cursor movement.
If later on `key` is pressed, the old root widget is reset, callable
`afterwards` is called and normal behaviour is resumed.
"""
self.mainloop.widget = w
self._unlock_key = key
self._unlock_callback = afterwards
self._locked = True
def prompt(self, prefix, text='', completer=None, tab=0, history=None):
    """
    Prompt for text input.

    The prompt is laid over the current main widget and all keypresses go
    to it until the edit widget reports that it was left. The returned
    :class:`asyncio.Future` is then resolved with whatever the edit widget
    hands to its exit callback.

    :param prefix: text to print before the input field
    :type prefix: str
    :param text: initial content of the input field
    :type text: str
    :param completer: completion object to use
    :type completer: :meth:`alot.completion.Completer`
    :param tab: number of tabs to press initially
                (to select completion results)
    :type tab: int
    :param history: history to be used for up/down keys
    :type history: list of str
    :rtype: asyncio.Future
    """
    if not history:
        history = []
    result = asyncio.get_event_loop().create_future()
    previous_root = self.mainloop.widget

    def finish(entered):
        """Put the main screen back and resolve the future."""
        self.mainloop.widget = previous_root
        self._passall = False
        result.set_result(entered)

    def report_completion_error(err):
        logging.error(err)
        self.notify('completion error: %s' % str(err),
                    priority='error')
        self.update()

    prefix = prefix + settings.get('prompt_suffix')

    # label on the left, edit field taking the remaining width
    label = urwid.Text(prefix, align='left')
    editor = CompleteEdit(completer, on_exit=finish,
                          edit_text=text, history=history,
                          on_error=report_completion_error)
    # hit some tabs
    for _ in range(tab):
        editor.keypress((0,), 'tab')

    row = urwid.Columns(
        [
            ('fixed', len(prefix), label),
            ('weight', 1, editor),
        ])
    row = urwid.AttrMap(
        row, settings.get_theming_attribute('global', 'prompt'))

    # lay the prompt over the main widget and route all input to it
    self.mainloop.widget = urwid.Overlay(row, previous_root,
                                         ('fixed left', 0),
                                         ('fixed right', 0),
                                         ('fixed bottom', 1),
                                         None)
    self._passall = True
    return result
@staticmethod
def exit():
"""
shuts down user interface without cleaning up.
Use a :class:`alot.commands.globals.ExitCommand` for a clean shutdown.
"""
try:
loop = asyncio.get_event_loop()
loop.stop()
except Exception as e:
logging.error('Could not stop loop: %s\nShutting down anyway..',
str(e))
@contextlib.contextmanager
def paused(self):
"""
context manager that pauses the UI to allow running external commands.
If an exception occurs, the UI will be started before the exception is
re-raised.
"""
self.mainloop.stop()
try:
yield
finally:
self.mainloop.start()
# make sure urwid renders its canvas at the correct size
self.mainloop.screen_size = None
self.mainloop.draw_screen()
def buffer_open(self, buf):
    """
    Register a new :class:`~alot.buffers.Buffer` and focus it.

    The hooks `pre_buffer_open` and `post_buffer_open` are called before
    and after, if they are defined.

    :param buf: the buffer to open
    """
    hook = settings.get_hook('pre_buffer_open')
    if hook is not None:
        hook(ui=self, dbm=self.dbman, buf=buf)

    if self.current_buffer is None:
        self.buffers.append(buf)
    else:
        # place the new buffer relative to the current one, mirroring the
        # offset that is used when a buffer gets closed
        shift = -1 * settings.get('bufferclose_focus_offset')
        position = self.buffers.index(self.current_buffer) + shift
        self.buffers.insert(position, buf)
    self.buffer_focus(buf)

    hook = settings.get_hook('post_buffer_open')
    if hook is not None:
        hook(ui=self, dbm=self.dbman, buf=buf)
def buffer_close(self, buf, redraw=True):
    """
    Close the given :class:`~alot.buffers.Buffer`.

    This removes it from the list of buffers and calls its cleanup()
    method. If `buf` is the current buffer, focus moves to the buffer
    selected by the `bufferclose_focus_offset` setting; if it was the last
    open buffer, no buffer is current afterwards.

    The hooks `pre_buffer_close` and `post_buffer_closed` are called before
    and after, if they are defined.

    :param buf: the buffer to close
    :param redraw: passed on to :meth:`buffer_focus` when another buffer
                   gets focussed
    :type redraw: bool
    """
    # call pre_buffer_close hook
    prehook = settings.get_hook('pre_buffer_close')
    if prehook is not None:
        prehook(ui=self, dbm=self.dbman, buf=buf)

    buffers = self.buffers
    success = False
    if buf not in buffers:
        logging.error('tried to close unknown buffer: %s. \n\ni have:%s',
                      buf, self.buffers)
    elif self.current_buffer == buf:
        logging.info('closing current buffer %s', buf)
        index = buffers.index(buf)
        buffers.remove(buf)
        if buffers:
            offset = settings.get('bufferclose_focus_offset')
            nextbuffer = buffers[(index + offset) % len(buffers)]
            self.buffer_focus(nextbuffer, redraw)
        else:
            # That was the last buffer: there is nothing left to focus.
            # Taking the index modulo zero would raise and leave the
            # buffer removed but not cleaned up.
            self.current_buffer = None
            self.mode = 'global'
        buf.cleanup()
        success = True
    else:
        buffers.remove(buf)
        buf.cleanup()
        success = True

    # call post_buffer_closed hook
    posthook = settings.get_hook('post_buffer_closed')
    if posthook is not None:
        posthook(ui=self, dbm=self.dbman, buf=buf, success=success)
def buffer_focus(self, buf, redraw=True):
    """
    Focus the given :class:`~alot.buffers.Buffer`.

    The buffer must already be registered in `self.buffers`; otherwise an
    error is logged and nothing changes. The hooks `pre_buffer_focus` and
    `post_buffer_focus` are called before and after, if they are defined.

    :param buf: the buffer to focus
    :param redraw: whether the screen is redrawn after switching
    :type redraw: bool
    """
    # call pre_buffer_focus hook
    prehook = settings.get_hook('pre_buffer_focus')
    if prehook is not None:
        prehook(ui=self, dbm=self.dbman, buf=buf)

    success = False
    if buf not in self.buffers:
        logging.error('tried to focus unknown buffer')
    else:
        if self.current_buffer != buf:
            self.current_buffer = buf
        self.mode = buf.modename
        if isinstance(self.current_buffer, BufferlistBuffer):
            self.current_buffer.rebuild()
        # honour the caller's choice instead of always redrawing
        self.update(redraw)
        success = True

    # call post_buffer_focus hook
    posthook = settings.get_hook('post_buffer_focus')
    if posthook is not None:
        posthook(ui=self, dbm=self.dbman, buf=buf, success=success)
def get_deep_focus(self, startfrom=None):
"""return the bottom most focussed widget of the widget tree"""
if not startfrom:
startfrom = self.current_buffer
if 'get_focus' in dir(startfrom):
focus = startfrom.get_focus()
if isinstance(focus, tuple):
focus = focus[0]
if isinstance(focus, urwid.Widget):
return self.get_deep_focus(startfrom=focus)
return startfrom
def get_buffers_of_type(self, t):
"""
returns currently open buffers for a given subclass of
:class:`~alot.buffers.Buffer`.
:param t: Buffer class
:type t: alot.buffers.Buffer
:rtype: list
"""
return [x for x in self.buffers if isinstance(x, t)]
def clear_notify(self, messages):
"""
Clears notification popups. Call this to ged rid of messages that don't
time out.
:param messages: The popups to remove. This should be exactly
what :meth:`notify` returned when creating the popup
"""
newpile = self._notificationbar.widget_list
for l in messages:
if l in newpile:
newpile.remove(l)
if newpile:
self._notificationbar = urwid.Pile(newpile)
else:
self._notificationbar = None
self.update()
def choice(self, message, choices=None, select=None, cancel=None,
           msg_position='above', choices_to_return=None):
    """
    Prompt the user to make a choice.

    The prompt is laid over the current main widget and all keypresses go
    to it until a choice is made. The returned :class:`asyncio.Future` is
    then resolved with what the choice widget hands to its callback.

    :param message: string to display before list of choices
    :type message: unicode
    :param choices: dict of possible choices; defaults to yes/no
    :type choices: dict: keymap->choice (both str)
    :param choices_to_return: dict of possible choices to return for the
                              choices given in `choices`
    :type choices_to_return: dict: keymap->choice (key is str and value is
                             any obj)
    :param select: choice to return if enter/return is hit. Ignored if set
                   to `None`.
    :type select: str
    :param cancel: choice to return if escape is hit. Ignored if set to
                   `None`.
    :type cancel: str
    :param msg_position: determines if `message` is above or left of the
                         prompt. Must be `above` or `left`.
    :type msg_position: str
    :rtype: asyncio.Future
    """
    if not choices:
        choices = {'y': 'yes', 'n': 'no'}
    assert select is None or select in choices.values()
    assert cancel is None or cancel in choices.values()
    assert msg_position in ['left', 'above']

    # the future that is handed back to the caller
    answer = asyncio.get_event_loop().create_future()
    previous_root = self.mainloop.widget

    def deliver(picked):
        """Put the main screen back and resolve the future."""
        self.mainloop.widget = previous_root
        self._passall = False
        answer.set_result(picked)

    question = urwid.Text(message)
    options = ChoiceWidget(choices,
                           choices_to_return=choices_to_return,
                           callback=deliver, select=select,
                           cancel=cancel)

    if msg_position == 'left':
        layout = urwid.Columns(
            [
                ('fixed', len(message), question),
                ('weight', 1, options),
            ], dividechars=1)
    else:  # above
        layout = urwid.Pile([question, options])
    theme = settings.get_theming_attribute('global', 'prompt')
    layout = urwid.AttrMap(layout, theme, theme)

    # lay the prompt over the main widget and route all input to it
    self.mainloop.widget = urwid.Overlay(layout, previous_root,
                                         ('fixed left', 0),
                                         ('fixed right', 0),
                                         ('fixed bottom', 1),
                                         None)
    self._passall = True
    return answer
def notify(self, message, priority='normal', timeout=0, block=False):
    """
    Open a notification popup.

    :param message: message to print
    :type message: str
    :param priority: priority string, used to format the popup: currently,
                     'normal' and 'error' are defined. If you use 'X' here,
                     the attribute 'global_notify_X' is used to format the
                     popup.
    :type priority: str
    :param timeout: seconds until message disappears. The default of 0
                    means to use the value of 'notify_timeout' from the
                    settings. A negative value means never time out.
    :type timeout: int
    :param block: this notification blocks until escape is pressed; no
                  timeout is set up in this case
    :type block: bool
    :returns: an urwid widget (this notification); put it into a list and
              hand that to :meth:`clear_notify` for removal
    """
    def make_row(text, prio):
        theme = settings.get_theming_attribute('global', 'notify_' + prio)
        return urwid.AttrMap(urwid.Columns([urwid.Text(text)]), theme)

    popup = make_row(message, priority)
    msgs = [popup]

    # append to the notifications that are displayed already, if any
    if self._notificationbar:
        stacked = self._notificationbar.widget_list + msgs
        self._notificationbar = urwid.Pile(stacked)
    else:
        self._notificationbar = urwid.Pile(msgs)
    self.update()

    def dismiss(*_):
        self.clear_notify(msgs)

    if block:
        # put "cancel to continue" widget as overlay on main widget
        hint = make_row('(escape continues)', priority)
        overlay = urwid.Overlay(hint, self.root_widget,
                                ('fixed left', 0),
                                ('fixed right', 0),
                                ('fixed bottom', 0),
                                None)
        self.show_as_root_until_keypress(overlay, 'esc',
                                         afterwards=dismiss)
    elif timeout >= 0:
        if timeout == 0:
            timeout = settings.get('notify_timeout')
        self.mainloop.set_alarm_in(timeout, dismiss)
    return popup
def update(self, redraw=True):
    """
    Rebuild the main frame and redraw the interface.

    The current buffer (if any) becomes the body. The footer consists of
    the notifications and, if enabled, the status bar; it is removed if
    there is neither.

    :param redraw: redraw the screen afterwards (only done if the screen
                   has been started)
    :type redraw: bool
    """
    # the main urwid.Frame widget sits inside the root AttrMap
    frame = self.root_widget.original_widget

    if self.current_buffer:
        frame.set_body(self.current_buffer)

    footer_rows = []
    if self._notificationbar:
        footer_rows.append(self._notificationbar)
    if self._show_statusbar:
        footer_rows.append(self.build_statusbar())
    frame.set_footer(urwid.Pile(footer_rows) if footer_rows else None)

    # force a screen redraw
    if self.mainloop.screen.started and redraw:
        self.mainloop.draw_screen()
def build_statusbar(self):
    """
    Construct and return the statusbar widget.

    The left and right texts are templates taken from the
    `<mode>_statusbar` setting of the current buffer. They are filled with
    the buffer's info plus `buffer_no`, `buffer_type`, `total_messages`,
    `pending_writes` and `input_queue`. Without a current buffer both
    texts are empty. One '|' per pending write is put in front of the
    right text.
    """
    buf = self.current_buffer
    have_buffer = buf is not None

    if have_buffer:
        info = buf.get_info()
        btype = buf.modename
        info['buffer_no'] = self.buffers.index(buf)
    else:
        info = {}
        btype = None
    info['buffer_type'] = btype
    info['total_messages'] = self.dbman.count_messages('*')
    pending = len(self.dbman.writequeue)
    info['pending_writes'] = pending
    info['input_queue'] = ' '.join(self.input_queue)

    left = right = ''
    if have_buffer:
        left, right = settings.get(btype + '_statusbar', ('', ''))
        left = string_decode(left, 'UTF-8').format(**info)
        right = string_decode(right, 'UTF-8').format(**info)
    if pending > 0:
        right = ('|' * pending) + ' ' + right

    columns = urwid.Columns([
        urwid.Text(left, align='left'),
        ('pack', urwid.Text(right, align='right'))])
    return urwid.AttrMap(
        columns, settings.get_theming_attribute('global', 'footer'))
async def apply_command(self, cmd):
"""
applies a command
This calls the pre and post hooks attached to the command,
as well as :meth:`cmd.apply`.
:param cmd: an applicable command
:type cmd: :class:`~alot.commands.Command`
"""
# FIXME: What are we guarding for here? We don't mention that None is
# allowed as a value fo cmd.
if cmd:
if cmd.prehook:
await cmd.prehook(ui=self, dbm=self.dbman, cmd=cmd)
try:
if asyncio.iscoroutinefunction(cmd.apply):
await cmd.apply(self)
else:
cmd.apply(self)
except Exception as e:
self._error_handler(e)
else:
if cmd.posthook:
logging.info('calling post-hook')
await cmd.posthook(ui=self, dbm=self.dbman, cmd=cmd)
def _handle_signal(self, signum, _frame):
"""
Handle UNIX signals: add a new task onto the event loop.
Doing it this way ensures what our handler has access to whatever
synchronization primitives or async calls it may require.
"""
loop = asyncio.get_event_loop()
asyncio.run_coroutine_threadsafe(self.handle_signal(signum), loop)
async def handle_signal(self, signum):
"""
handles UNIX signals
This function currently just handles SIGUSR1. It could be extended to
handle more
:param signum: The signal number (see man 7 signal)
"""
# it is a SIGINT ?
if signum == signal.SIGINT:
logging.info('shut down cleanly')
await self.apply_command(globals.ExitCommand())
elif signum == signal.SIGUSR1:
if isinstance(self.current_buffer, SearchBuffer):
self.current_buffer.rebuild()
self.update()
def cleanup(self):
    """
    Do the final clean up before shutting down.

    This writes the command, sender and recipient prompt histories to
    their files, limited to `history_size` entries each.
    """
    limit = settings.get('history_size')
    targets = (
        (self.commandprompthistory, self._cmd_hist_file),
        (self.senderhistory, self._sender_hist_file),
        (self.recipienthistory, self._recipients_hist_file),
    )
    for entries, histfile in targets:
        self._save_history_to_file(entries, histfile, size=limit)
@staticmethod
def _load_history_from_file(path, size=-1):
"""Load a history list from a file and split it into lines.
:param path: the path to the file that should be loaded
:type path: str
:param size: the number of lines to load (0 means no lines, < 0 means
all lines)
:type size: int
:returns: a list of history items (the lines of the file)
:rtype: list(str)
"""
if size == 0:
return []
if os.path.exists(path):
with codecs.open(path, 'r', encoding='utf-8') as histfile:
lines = [line.rstrip('\n') for line in histfile]
if size > 0:
lines = lines[-size:]
return lines
else:
return []
@staticmethod
def _save_history_to_file(history, path, size=-1):
"""Save a history list to a file for later loading (possibly in another
session).
:param history: the history list to save
:type history: list(str)
:param path: the path to the file where to save the history
:param size: the number of lines to save (0 means no lines, < 0 means
all lines)
:type size: int
:type path: str
:returns: None
"""
if size == 0:
return
if size > 0:
history = history[-size:]
directory = os.path.dirname(path)
if not os.path.exists(directory):
os.makedirs(directory)
# Write linewise to avoid building a large string in menory.
with codecs.open(path, 'w', encoding='utf-8') as histfile:
for line in history:
histfile.write(line)
histfile.write('\n')