hSaria/cPing

View on GitHub
cping/layouts/modern.py

Summary

Maintainability
A
0 mins
Test Coverage
'''Curses-based interactive window.'''
import curses
import math
import sys

import cping.layouts
import cping.protocols
import cping.utils

# String placed between adjacent columns when a table row is joined into a line
COLUMN_DELIMITER = '  '
# Smallest width (in characters) to which every table column is padded
COLUMN_WIDTH_MINIMUM = 6


class Layout(cping.layouts.Layout):
    '''Curses-based interactive window.'''
    # Maps a color name ('green', 'red', 'yellow') to its curses attribute.
    # Filled in by `initialize_colors`; stays empty without color support.
    colors = {}

    def __call__(self):
        # Perform curses initialization code and call render, then clean up
        curses.wrapper(self.render)

    @staticmethod
    def initialize_colors():
        '''Populate the `Layout.colors` dictionary with the attribute numbers.

        Nothing is registered when the terminal has no color support, because
        `curses.init_pair` raises in that case. `render_sparkline` falls back
        to `curses.A_NORMAL` for any color missing from the dictionary.
        '''
        if not curses.has_colors():
            return

        curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK)
        Layout.colors['green'] = curses.color_pair(1)
        curses.init_pair(2, curses.COLOR_RED, curses.COLOR_BLACK)
        Layout.colors['red'] = curses.color_pair(2)
        curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK)
        Layout.colors['yellow'] = curses.color_pair(3)

    @staticmethod
    def render_sparkline(window, line, column, host, length):
        '''Render a sparkline at the requested position.

        Each of the last `length` results of `host` is drawn as one character:
        red for a latency of -1, yellow for a result flagged with an error, and
        green otherwise.

        Args:
            window (curses.window): Window to which the sparkline is rendered.
            line (int): y coordinate.
            column (int): x coordinate of the start of the sparkline.
            host (cping.protocols.Host): Source of the sparkline's data.
            length (int): The maximum length of the sparkline.
        '''
        for result in list(host.results)[-length:]:
            if result['latency'] == -1:
                color = 'red'
                point = '.' if sys.platform == 'win32' else '░'
            else:
                color = 'yellow' if result['error'] else 'green'
                point = '!'

                # Non-Windows platforms get a point scaled to the host's stats
                if sys.platform != 'win32':
                    point = cping.utils.sparkline_point(
                        result['latency'] * 1000,
                        host.results_summary['min'],
                        host.results_summary['max'],
                        host.results_summary['stdev'],
                    )

            # Unknown colors (e.g. no color support) are rendered normally
            color = Layout.colors.get(color, curses.A_NORMAL)
            window.addstr(line, column, point, color)

            # Shift to next point
            column += 1

    @staticmethod
    def render_table(window, table, selection):
        '''Calls `window.addnstr` to render the table based on the selection.

        The last line of the window is reserved for the footer; the remaining
        lines show the page of `table` that contains `selection`. Nothing is
        rendered when the window is too short to fit a row above the footer.

        Args:
            window (curses.window): Window to which the table is rendered.
            table (list): The results table (i.e. `get_table`).
            selection (int): The index of the selected row.
        '''
        lines, columns = window.getmaxyx()

        # Rows available per page once the footer line is taken out
        page_size = lines - 1

        if page_size < 1:
            # Paginating with a page size of zero would divide by zero
            return

        page = get_table_page(table, page_size, selection)
        page_count = math.ceil(len(table) / page_size)
        page_number = (selection // page_size) + 1

        footer = get_table_footer(page_count, page_number)
        footer = footer.ljust(columns)

        try:
            window.erase()

            # Add the page to the window
            for index, row in enumerate(page):
                # Extend the header to the end of the screen
                if page_number == 1 and index == 0:
                    row['line'] = row['line'].ljust(columns)

                # Highlight selection; account for pagination
                if index == selection % page_size:
                    row['attrs'] |= curses.A_BOLD

                window.addnstr(index, 0, row['line'], columns, row['attrs'])

                # Add the sparkline if this isn't the header
                if row.get('host'):
                    sparkline_length = columns - len(row['line'])

                    if sparkline_length > 0:
                        row['host'].set_results_length(sparkline_length)
                        Layout.render_sparkline(
                            window,
                            index,
                            len(row['line']),
                            row['host'],
                            sparkline_length,
                        )

            # Add a footer to the bottom of the screen
            window.addnstr(lines - 1, 0, footer, columns, curses.A_STANDOUT)
            window.refresh()
        except curses.error:
            # Triggers when excessively resizing the window
            pass

    def render(self, window):
        '''Start rendering the layout. Blocking function meant to be called with
        `curses.wrapper(self.render)`.

        Loops until `q` is pressed, redrawing the table and handling one key
        (or a `getch` timeout) per iteration.

        Args:
            window (curses.window): Window on which the layout is rendered.
        '''
        # pylint: disable=too-many-branches,not-an-iterable  # linter bug
        Layout.initialize_colors()

        # Set the timeout (ms) for `window.getch`
        window.timeout(int(self.protocol.interval * 1000))

        # State tracking
        button = selection = show_address = sort_key = 0

        while button != ord('q'):
            table = get_table(self.hosts, sort_key, show_address=show_address)
            Layout.render_table(window, table, selection)

            # Clear burst mode to avoid sticking while waiting on getch
            for host in self.hosts:
                host.burst_mode.clear()

            button = window.getch()

            if button == curses.KEY_UP:
                selection = max(selection - 1, 0)
            elif button == curses.KEY_DOWN:
                # Row 0 is the header, so the last host is at `len(self.hosts)`
                selection = min(selection + 1, len(self.hosts))
            elif button == ord('a'):
                show_address = not show_address
            elif button == ord('b'):
                # Burst mode (all hosts if the header is selected)
                if selection > 0:
                    table[selection]['host'].burst_mode.set()
                else:
                    for host in self.hosts:
                        host.burst_mode.set()
            elif button == ord('s'):
                # Start or stop the selected host (all if header selected)
                if selection > 0:
                    if table[selection]['host'].is_running():
                        table[selection]['host'].stop(block=True)
                    else:
                        table[selection]['host'].start()
                elif any(host.is_running() for host in self.hosts):
                    for host in self.hosts:
                        host.stop(block=True)
                else:
                    interval = self.protocol.interval
                    cping.utils.stagger_start(self.hosts, interval)
            elif button in range(48, 48 + 7):
                # Sorting: 48 is the '0' key, so this is effectively `range(7)`
                sort_key = get_table_sort_key(button % 48, sort_key)

            # Flush input buffer to remove queued keys pressed during processing
            curses.flushinp()


def get_host_columns(host, show_address=False):
    '''Returns a list of strings containing host, min, avg, max, stdev, loss.

    Statistics without a value are shown as a dash. Loss is formatted as a
    whole percentage; the other statistics use two decimal places.

    Args:
        host (cping.protocols.Host): Host from which to get the details.
        show_address (bool): Append the host's address (read from `addrinfo`)
            to the first column when it differs from the host's name.
    '''
    label = str(host)

    if show_address and hasattr(host, 'addrinfo'):
        address = host.addrinfo[4][0]

        # No point in repeating the address if it is already the name
        if host.name != address:
            label = f'{label} {address}'

    cells = [label]

    for stat in ('min', 'avg', 'max', 'stdev', 'loss'):
        value = host.results_summary[stat]

        if value is None:
            cells.append('-  ')
        elif stat == 'loss':
            cells.append(f'{value:.0%} ')
        else:
            cells.append(f'{value:.2f}')

    return cells


def get_table(hosts, sort_key=0, show_address=False):
    '''Returns a list of dictionaries, one for each row.

    The first row is the header; one row per host follows, in the order given
    by `sort_hosts`. Every row has the keys `columns`, `attrs`, and `line` (the
    aligned text to display). Host rows also carry the `host` key.

    Args:
        hosts (list): The hosts upon which the table is based.
        sort_key (int): Passed to `sort_hosts`.
        show_address (bool): Passed to `get_host_columns`.
    '''
    titles = [' HOST ', 'MIN ', 'AVG ', 'MAX ', 'STD ', 'LOSS ']

    # Mark the sorted column with the direction of the sort
    if isinstance(sort_key, int) and 0 < abs(sort_key) <= len(titles):
        arrow = '▲' if sort_key > 0 else '▼'
        titles[abs(sort_key) - 1] += arrow

    # The header is always the first row of the table
    rows = [{'columns': titles, 'attrs': curses.A_STANDOUT}]

    for host in sort_hosts(hosts, sort_key):
        cells = get_host_columns(host, show_address=show_address)

        # Hosts that are not running are underlined
        attrs = curses.A_NORMAL if host.is_running() else curses.A_UNDERLINE
        rows.append({'host': host, 'columns': cells, 'attrs': attrs})

    # Widest cell of each column across all rows, with a lower bound
    widths = [
        max(COLUMN_WIDTH_MINIMUM, *(len(row['columns'][index]) for row in rows))
        for index in range(6)
    ]

    for row in rows:
        if row.get('host') and row['host'].status:
            # A host with a status shows it instead of the statistics, sized
            # to the header's line (the header was aligned before any host)
            width = len(rows[0]['line'])
            text = COLUMN_DELIMITER.join([row['columns'][0], row['host'].status])
            row['line'] = text.ljust(width)[:width]
            continue

        cells = row['columns']

        # The Host column is left justified; the statistics are right justified
        for index, (cell, width) in enumerate(zip(cells[:6], widths)):
            cells[index] = cell.ljust(width) if index == 0 else cell.rjust(width)

        row['line'] = COLUMN_DELIMITER.join(cells)

    return rows


def get_table_footer(page_count, page_number):
    '''Returns a footer (string) for a table.

    The footer is upper-cased and lists the keyboard shortcuts. It is prefixed
    with the page indicator only when there is more than one page.

    Args:
        page_count (int): The total number of pages.
        page_number (int): Currently selected page.
    '''
    shortcuts = (
        '[B]urst mode (hold)',
        '[S]tart/[S]top',
        'Show [a]ddress',
        '[Q]uit',
        '[▲/▼] Change selection',
        '[1-6] Order table',
    )

    prefix = ''

    if page_count > 1:
        prefix = f' Page {page_number}/{page_count} | '

    return (prefix + ', '.join(shortcuts)).upper()


def get_table_page(table, size, selection):
    '''Returns a subset of the table based on the selection.

    The table is split into consecutive pages of `size` rows; the page that
    holds the row at index `selection` is returned.

    Args:
        table (list): The list of rows to filter.
        size (int): Number of rows to return in the page. A size below 1
            cannot hold any row, so an empty page is returned.
        selection (int): The index of the item to be highlighted.
    '''
    if size < 1:
        # Avoids dividing by zero when there is no room for any row
        return []

    selected_page = selection // size

    return [
        row for index, row in enumerate(table)
        if index // size == selected_page
    ]


def get_table_sort_key(new, current):
    '''Returns the sorting index to use after `new` is requested.

    A positive index means ascending, a negative one means descending, and `0`
    means no sorting. Requesting the column that is already sorted cycles it
    from ascending to descending, and from descending to no sorting. Requesting
    any other column sorts by it in ascending order.

    Args:
        new (int): the sorting key being requested.
        current (int): the sorting key currently being used.
    '''
    same_column = isinstance(current, int) and abs(current) == new

    if not same_column:
        # Currently None or some other key; change to ascending
        return new

    # Descending resets the sorting; ascending flips to descending
    return 0 if current < 0 else -current


def host_results_sort_key(host, key):
    '''Returns `host.results_summary.get(key)`, or 10**6 when that is None.'''
    summary_value = host.results_summary.get(key)

    if summary_value is None:
        return 10**6

    return summary_value


def sort_hosts(hosts, sort_key=0):
    '''Returns a new list with `hosts` sorted according to `sort_key`.

    The sign of `sort_key` selects the direction (negative is descending). A
    key that maps to no column leaves the hosts in their original order.

    Args:
        hosts (list): The list of hosts to be sorted.
        sort_key (int): The column by which the table is sorted. Starting
            at 1, the column numbers map to: host, min, avg, max, stdev, and
            loss.
    '''
    column = abs(sort_key or 0)
    descending = isinstance(sort_key, int) and sort_key < 0

    # Columns 2 through 6 are sorted by a statistic of the results summary
    stat_by_column = dict(enumerate(('min', 'avg', 'max', 'stdev', 'loss'), 2))
    stat = stat_by_column.get(column)

    if column == 1:
        def key(host):
            return cping.utils.natural_ordering_sort_key(str(host))
    elif stat is not None:
        def key(host):
            return host_results_sort_key(host, stat)
    else:
        # A constant key keeps the original order, as the sort is stable
        def key(host):
            return 0

    ordered = list(hosts)
    ordered.sort(key=key, reverse=descending)
    return ordered