node_tools/async_funcs.py

Summary

Maintainability
B
5 hrs
Test Coverage
# coding: utf-8

"""asyncio-specific wrapper functions."""

import asyncio
import aiohttp
import logging
import time


logger = logging.getLogger(__name__)


async def bootstrap_mbr_node(client, ctlr_id, node_id, deque, ex=False):
    """
    Wrapper for bootstrapping a new member node; adds one network for
    each node and adds each node to its (new) network.  Updates net/id
    tries with new data.
    :notes: Since we *always* provide a new (ZT) network, we do *not*
            handle existing nodes here.  The `deque` parameter is
            required for handle_net_cfg().  When a non-exit node finds
            no well-formed dangling (exit) net, the error is logged and
            the node is bootstrapped on its src net only.  Nothing is
            done if the new network ID is missing from the controller's
            network list.
    :param client: ztcli_api client object
    :param ctlr_id: node ID of controller node
    :param node_id: node ID
    :param deque: netobj queue
    :param ex: True if node is an exit node
    """
    from node_tools import ctlr_data as ct

    from node_tools.ctlr_funcs import get_network_id
    from node_tools.ctlr_funcs import handle_net_cfg
    from node_tools.ctlr_funcs import set_network_cfg
    from node_tools.trie_funcs import find_dangling_nets
    from node_tools.trie_funcs import get_dangling_net_data
    from node_tools.trie_funcs import update_id_trie

    # create the new network and apply the controller rules to it
    await add_network_object(client, ctlr_id=ctlr_id)
    net_id = get_network_id(client.data)
    logger.debug('BOOTSTRAP: added network id {}'.format(net_id))
    net_rules = ct.rules
    await config_network_object(client, net_rules, net_id)
    logger.debug('BOOTSTRAP: added network rules {}'.format(net_rules))
    await get_network_object_ids(client)
    logger.debug('BOOTSTRAP: got network list {}'.format(client.data))

    net_list = client.data
    if net_id in net_list:
        # Get details about each network
        await get_network_object_data(client, net_id)
        logger.debug('BOOTSTRAP: got network data {}'.format(client.data))
        trie_nets = [net_id]
        # These stay None unless a well-formed dangling net is found
        # below; the id trie update further down checks for that instead
        # of failing on unbound names.
        exit_net = None
        exit_node = None

        await add_network_object(client, net_id, node_id)
        logger.debug('BOOTSTRAP: added node id {} to gw net'.format(node_id))
        await get_network_object_ids(client, net_id)
        member_dict = client.data
        logger.debug('BOOTSTRAP: got node dict {}'.format(member_dict))
        await get_network_object_data(client, net_id, node_id)
        logger.debug('BOOTSTRAP: got node data {}'.format(client.data))

        ipnet, _, gw = handle_net_cfg(deque)
        await config_network_object(client, ipnet, net_id)

        # A dedicated exit node is a special case, otherwise, each new node
        # gets a src_net here, and still *needs* a exit_net.  We also need to
        # update the src net needs after linking the next mbr node.
        if not ex:
            data_list = find_dangling_nets(ct.id_trie)
            logger.debug('BOOTSTRAP: got exit net {}'.format(data_list))
            if len(data_list) == 2:
                exit_net = data_list[0]
                exit_node = data_list[1]
                await add_network_object(client, exit_net, node_id)
                logger.debug('BOOTSTRAP: added node id {} to exit net {}'.format(node_id, exit_net))
                netcfg = get_dangling_net_data(ct.net_trie, exit_net)
                gw_cfg = set_network_cfg(netcfg.host)
                logger.debug('BOOTSTRAP: got node addr {} for exit net'.format(gw_cfg))
                await config_network_object(client, gw_cfg, exit_net, node_id)
                trie_nets = [net_id, exit_net]
            else:
                logger.error('BOOTSTRAP: malformed exit net data {}'.format(data_list))

        await config_network_object(client, gw, net_id, node_id)
        logger.debug('BOOTSTRAP: set gw addr {} for src net {}'.format(gw, net_id))

        await update_mbr_data(client, ct.net_trie, net_id, node_id)
        logger.debug('BOOTSTRAP: loaded net trie with {} and {} data'.format(node_id, net_id))

        node_needs = [False, False]
        net_needs = [False, True]
        # Only record the exit net link when one was actually made above
        if not ex and exit_net is not None:
            update_id_trie(ct.id_trie, [exit_net], [node_id, exit_node], needs=[False, False], nw=True)

        update_id_trie(ct.id_trie, trie_nets, [node_id], needs=node_needs)
        update_id_trie(ct.id_trie, [net_id], [node_id], needs=net_needs, nw=True)


async def close_mbr_net(client, node_lst, boot_lst, min_nodes=5):
    """
    Wrapper for closing the bootstrap chain or adding it to an existing
    (closed) network. This should run in the ctlr state runner *after*
    the other handlers.
    :notes: `boot_lst` is indexed at both ends before any checks, so it
            must not be empty.  The short pause before the last config
            message uses asyncio.sleep() so the event loop is not
            blocked while waiting.
    :param client: ztcli_api client object
    :param node_lst: list of all active nodes
    :param boot_lst: list of bootstrap nodes
    :param min_nodes: minimum number of nodes for a closed network
    """
    from node_tools import ctlr_data as ct
    from node_tools import state_data as st

    from node_tools.ctlr_funcs import unset_network_cfg
    from node_tools.network_funcs import publish_cfg_msg
    from node_tools.trie_funcs import cleanup_state_tries
    from node_tools.trie_funcs import find_dangling_nets
    from node_tools.trie_funcs import find_exit_net
    from node_tools.trie_funcs import get_neighbor_ids
    from node_tools.trie_funcs import get_target_node_id

    # head is the last bootstrap node, tail is the first
    head_id = boot_lst[-1]
    tail_id = boot_lst[0]
    head_exit_net = find_exit_net(ct.id_trie)[0]
    head_src_net, _, _, _ = get_neighbor_ids(ct.net_trie, head_id)
    tail_exit_net = find_dangling_nets(ct.id_trie)[0]
    deauth = unset_network_cfg()

    # if true, we only have a boot list
    if len(node_lst) == len(boot_lst):
        # check if we have enough nodes for a network
        if len(boot_lst) >= min_nodes:
            logger.debug('CLOSURE: creating network from boot_list {}'.format(boot_lst))
            for mbr_id in [head_id, tail_id]:
                st.wait_cache.set(mbr_id, True, 90)
            # detach and connect head to tail
            await config_network_object(client, deauth, head_exit_net, head_id)
            cleanup_state_tries(ct.net_trie, ct.id_trie, head_exit_net, head_id, mbr_only=True)
            logger.debug('CLOSURE: deauthed head id {} from exit net {}'.format(head_id, head_exit_net))

            await connect_mbr_node(client, head_id, head_src_net, tail_exit_net, tail_id)
            publish_cfg_msg(ct.id_trie, head_id, addr='127.0.0.1')
        else:
            logger.debug('CLOSURE: not enough bootstrap nodes to wrap')
    else:
        logger.debug('CLOSURE: adding bootstrap list {} to network'.format(boot_lst))
        tgt_id = get_target_node_id(node_lst, boot_lst)
        _, tgt_exit_net, tgt_src_node, tgt_exit_node = get_neighbor_ids(ct.net_trie, tgt_id)
        tgt_src_net, _, _, _ = get_neighbor_ids(ct.net_trie, tgt_src_node)

        for mbr_id in [tgt_id, tail_id, head_id, tgt_exit_node]:
            st.wait_cache.set(mbr_id, True, 120)
        # detach and connect tgt to tail
        await config_network_object(client, deauth, tgt_exit_net, tgt_id)
        cleanup_state_tries(ct.net_trie, ct.id_trie, tgt_exit_net, tgt_id, mbr_only=True)
        logger.debug('CLOSURE: deauthed tgt id {} from tgt exit net {}'.format(tgt_id, tgt_exit_net))

        await connect_mbr_node(client, tgt_id, tgt_src_net, tail_exit_net, tail_id)
        publish_cfg_msg(ct.id_trie, tgt_id, addr='127.0.0.1')

        # detach and connect head to tgt exit net
        await config_network_object(client, deauth, head_exit_net, head_id)
        cleanup_state_tries(ct.net_trie, ct.id_trie, head_exit_net, head_id, mbr_only=True)
        logger.debug('CLOSURE: deauthed node id {} from head exit net {}'.format(head_id, head_exit_net))

        await connect_mbr_node(client, head_id, head_src_net, tgt_exit_net, tgt_exit_node)
        # brief pause between the two config messages; must not block
        # the event loop, hence asyncio.sleep rather than time.sleep
        await asyncio.sleep(0.02)
        publish_cfg_msg(ct.id_trie, head_id, addr='127.0.0.1')


async def cleanup_orphans(client):
    """
    Remove orphaned networks and nodes reported by find_orphans(); meant
    to run after node juggling and before the tries are updated (in the
    main netstate context).
    :notes: a plain string orphan is deleted as a network ID; a tuple
            orphan is handled as a (net ID, node ID) pair and is left
            alone while its node ID still has a wait cache entry.
    :param client: ztcli_api client object
    """
    from node_tools import ctlr_data as ct
    from node_tools import state_data as st

    from node_tools.trie_funcs import cleanup_state_tries
    from node_tools.trie_funcs import find_orphans

    net_list, node_list = find_orphans(ct.net_trie, ct.id_trie)
    logger.debug('got net_list {} and node_list: {}'.format(net_list, node_list))

    for orphan in net_list or ():
        if isinstance(orphan, str):
            orphan_net, orphan_mbr = orphan, None
        elif isinstance(orphan, tuple):
            # still waiting on this node, so skip it for now
            if st.wait_cache.get(orphan[1]) is not None:
                continue
            orphan_net, orphan_mbr = orphan[0], orphan[1]
        else:
            # anything else is ignored
            continue
        await delete_network_object(client, orphan_net)
        cleanup_state_tries(ct.net_trie, ct.id_trie, orphan_net, orphan_mbr)
        logger.warning('CLEANUP: removed orphan: {}'.format(orphan))

    for orphan in node_list or ():
        del ct.id_trie[orphan]
        logger.warning('CLEANUP: removed orphan: {}'.format(orphan))


async def connect_mbr_node(client, node_id, src_net, exit_net, gw_node):
    """
    Wrapper to reconnect an existing member node to an (upstream) exit
    net; adds the node to the exit net, configures its address there,
    and records the new links in the id trie.
    :notes: uses both Tries (required by offline_mbr_node below)
    :param client: ztcli_api client object
    :param node_id: node ID
    :param src_net: src_net ID for node
    :param exit_net: network ID of the gateway net to connect
    :param gw_node: node ID of the gateway host
    """
    from node_tools import ctlr_data as ct

    from node_tools.ctlr_funcs import set_network_cfg
    from node_tools.trie_funcs import get_dangling_net_data
    from node_tools.trie_funcs import update_id_trie

    # join the node to the exit net
    await add_network_object(client, exit_net, node_id)
    logger.debug('CONNECT: added neighbor {} to exit net {}'.format(node_id, exit_net))

    # build the member config from the exit net's dangling-net data
    exit_data = get_dangling_net_data(ct.net_trie, exit_net)
    mbr_cfg = set_network_cfg(exit_data.host)
    logger.debug('CONNECT: got cfg {} for exit net'.format(mbr_cfg))
    await config_network_object(client, mbr_cfg, exit_net, node_id)

    logger.debug('CONNECT: net_trie keys: {}'.format(list(ct.net_trie)))
    logger.debug('CONNECT: id_trie keys: {}'.format(list(ct.id_trie)))

    # network entry first, then the node entry
    exit_members = [node_id, gw_node]
    update_id_trie(ct.id_trie, [exit_net], exit_members, needs=[False, False], nw=True)
    node_nets = [src_net, exit_net]
    update_id_trie(ct.id_trie, node_nets, [node_id], needs=[False, False])


async def offline_mbr_node(client, node_id):
    """
    Wrapper for handling an offline member node; removes the mbr node
    and its left-hand (src) net, and relinks both neighbor nodes.
    This should run in the ctlr state runner *before* the state trie
    updates happen.
    :notes: if the neighbor lookup fails, or yields no nets at all, a
            message is logged and nothing is changed.
    :param client: ztcli_api client object
    :param node_id: node ID
    """
    from node_tools import ctlr_data as ct
    from node_tools import state_data as st

    from node_tools.ctlr_funcs import unset_network_cfg
    from node_tools.network_funcs import publish_cfg_msg
    from node_tools.trie_funcs import cleanup_state_tries
    from node_tools.trie_funcs import get_neighbor_ids

    try:
        node_net, exit_net, src_node, exit_node = get_neighbor_ids(ct.net_trie, node_id)
        node_nets = [node_net, exit_net]
        logger.debug('OFFLINE: got node_nets {} and nodes {} {}'.format(node_nets, src_node, exit_node))
        if exit_node is not None:
            st.wait_cache.set(exit_node, True, 65)
            logger.debug('OFFLINE: added exit_node {} to wait cache'.format(exit_node))
    except Exception as exc:
        logger.error('OFFLINE: {}'.format(exc))
        node_nets = [None, None]

    deauth = unset_network_cfg()

    # nothing usable came back from the neighbor lookup
    if node_nets == [None, None]:
        logger.warning('OFFLINE: node {} has missing net list {}'.format(node_id, node_nets))
        return

    # no upstream (src) neighbor: drop the node and its dangling net
    if src_node is None:
        if exit_net is not None:
            await config_network_object(client, deauth, exit_net, node_id)
            cleanup_state_tries(ct.net_trie, ct.id_trie, exit_net, node_id, mbr_only=True)
            logger.debug('OFFLINE: deauthed node id {} from exit net {}'.format(node_id, exit_net))
        await delete_network_object(client, node_net)
        cleanup_state_tries(ct.net_trie, ct.id_trie, node_net, node_id)
        logger.debug('OFFLINE: removed dangling net {}'.format(node_net))
        return

    # look up the src neighbor's own net before the tries are modified
    src_net, _, _, _ = get_neighbor_ids(ct.net_trie, src_node)

    await config_network_object(client, deauth, exit_net, node_id)
    cleanup_state_tries(ct.net_trie, ct.id_trie, exit_net, node_id, mbr_only=True)
    logger.debug('OFFLINE: deauthed node id {} from exit net {}'.format(node_id, exit_net))
    await delete_network_object(client, node_net)
    cleanup_state_tries(ct.net_trie, ct.id_trie, node_net, node_id)
    logger.debug('OFFLINE: removed network id {} and node {}'.format(node_net, node_id))

    # relink the src neighbor to the offline node's old exit net
    await connect_mbr_node(client, src_node, src_net, exit_net, exit_node)
    publish_cfg_msg(ct.id_trie, src_node, addr='127.0.0.1')


async def update_mbr_data(client, net_trie, net_id, mbr_id):
    """
    Wrapper to update net state trie during bootstrap.  Fetches the
    network data and then the member data, storing each in the net trie
    (does not remove any stale trie data).
    :notes: network data is stored under `net_id` and member data under
            `net_id + mbr_id`.
    :param client: ztcli_api client object
    :param net_trie: zt network/member data
    :param net_id: network ID
    :param mbr_id: node ID
    """
    # network record first
    await get_network_object_data(client, net_id)
    logger.debug('loading network: {}'.format(net_id))
    net_record = client.data
    net_trie[net_id] = net_record

    # then the member record, keyed by the combined net and member IDs
    await get_network_object_data(client, net_id, mbr_id)
    logger.debug('loading member: {}'.format(mbr_id))
    mbr_record = client.data
    net_trie[net_id + mbr_id] = mbr_record


async def update_state_tries(client, net_trie, id_trie):
    """
    Wrapper to update ctlr state tries from ZT client API.  Walks every
    network known to the client and loads the net/id tries with network
    data and with data for each authorized member (does not remove any
    stale trie data).
    :param client: ztcli_api client object
    :param net_trie: zt network/member data
    :param id_trie: network/node state
    """
    from node_tools.trie_funcs import load_id_trie

    await get_network_object_ids(client)
    logger.debug('{} networks found'.format(len(client.data)))
    all_nets = client.data

    for net_id in all_nets:
        # network details go straight into the net trie
        await get_network_object_data(client, net_id)
        net_trie[net_id] = client.data

        await get_network_object_ids(client, net_id)
        logger.debug('network {} has {} possible member(s)'.format(net_id, len(client.data)))
        candidates = client.data

        authorized = []
        for mbr_id in candidates.keys():
            # member details are only kept for authorized members
            await get_network_object_data(client, net_id, mbr_id)
            if not client.data['authorized']:
                continue
            logger.debug('adding member: {}'.format(mbr_id))
            net_trie[net_id + mbr_id] = client.data
            load_id_trie(net_trie, id_trie, [], [mbr_id])
            authorized.append(mbr_id)

        load_id_trie(net_trie, id_trie, [net_id], authorized, nw=True)
        logger.debug('member key suffixes: {}'.format(net_trie.suffixes(net_id)))


async def unwrap_mbr_net(client, node_lst, boot_lst, min_nodes=5):
    """
    Wrapper for unwrapping the (closed) network when it gets too small.
    This should run in the ctlr state runner *after* the other handlers
    (and when there are not enough nodes to keep a closed network).
    :notes: only acts when there are fewer than `min_nodes` nodes *and*
            the bootstrap list is empty; otherwise it just logs.
    :param client: ztcli_api client object
    :param node_lst: list of all active nodes
    :param boot_lst: list of bootstrap nodes
    :param min_nodes: minimum number of nodes for a closed network
    """
    from node_tools import ctlr_data as ct

    from node_tools.ctlr_funcs import unset_network_cfg
    from node_tools.network_funcs import publish_cfg_msg
    from node_tools.trie_funcs import cleanup_state_tries
    from node_tools.trie_funcs import find_dangling_nets
    from node_tools.trie_funcs import get_neighbor_ids
    from node_tools.trie_funcs import get_target_node_id

    if len(node_lst) >= min_nodes or len(boot_lst) != 0:
        logger.debug('UNWRAP: num nodes at least {} so not unwrapping'.format(min_nodes))
        return

    logger.debug('UNWRAP: creating bootstrap list from network {}'.format(node_lst))
    tgt_id = get_target_node_id(node_lst, boot_lst)
    tgt_net, tgt_exit_net, _, _ = get_neighbor_ids(ct.net_trie, tgt_id)
    dangling = find_dangling_nets(ct.id_trie)
    exit_net = dangling[0]
    exit_node = dangling[1]
    deauth = unset_network_cfg()

    # detach tgt node from its current exit net
    await config_network_object(client, deauth, tgt_exit_net, tgt_id)
    cleanup_state_tries(ct.net_trie, ct.id_trie, tgt_exit_net, tgt_id, mbr_only=True)
    logger.debug('UNWRAP: deauthed node id {} from tgt exit net {}'.format(tgt_id, tgt_exit_net))

    # and connect it back to the exit node
    await connect_mbr_node(client, tgt_id, tgt_net, exit_net, exit_node)
    publish_cfg_msg(ct.id_trie, tgt_id, addr='127.0.0.1')


async def add_network_object(client, net_id=None, mbr_id=None, ctlr_id=None):
    """
    Command wrapper for creating ZT objects under the `controller` endpoint.
    Required arguments are `client` and either one of the following:
        `net_id` *and* `mbr_id` to create a member object *or* just
        `ctlr_id` to create a network object.
    When both forms are supplied the member object takes precedence; if
    neither is complete an error is logged and nothing is sent.
    :param client: ztcli_api client object
    :param net_id: network ID endpoint path
    :param mbr_id: member ID endpoint path
    :param ctlr_id: network controller ID
    """
    from node_tools.ctlr_funcs import name_generator

    want_member = net_id and mbr_id
    if not want_member and not ctlr_id:
        logger.error('One or more required arguments not found!')
        return

    if want_member:
        payload = {'': ''}
        endpoint = 'controller/network/{}/member/{}'.format(net_id, mbr_id)
    else:
        # new networks get a generated name
        # NOTE(review): the '______' suffix presumably asks the controller
        # to choose the rest of the network ID -- confirm against the ZT API
        payload = {'name': name_generator()}
        endpoint = 'controller/network/{}'.format(ctlr_id + '______')

    await client.set_value(payload, endpoint)


async def config_network_object(client, cfg_dict, net_id, mbr_id=None):
    """
    Command wrapper for configuring ZT objects under the `controller` endpoint.
    Required arguments are `client` and `cfg_dict`, plus either one of
    the following:
        `net_id` *and* `mbr_id` to configure a member object *or* just
        `net_id` to configure a network object.
    Without a `net_id` an error is logged and nothing is sent.
    :param client: ztcli_api client object
    :param cfg_dict: dictionary of configuration fragments
    :param net_id: network ID endpoint path
    :param mbr_id: member ID endpoint path
    """
    if not net_id:
        logger.error('One or more required arguments not found!')
        return

    if mbr_id:
        endpoint = 'controller/network/{}/member/{}'.format(net_id, mbr_id)
    else:
        endpoint = 'controller/network/{}'.format(net_id)

    await client.set_value(cfg_dict, endpoint)


async def delete_network_object(client, net_id, mbr_id=None):
    """
    Command wrapper for deleting ZT objects under the `controller` endpoint.
    Required arguments are `client` and either one of the following:
        `net_id` *and* `mbr_id` to delete a member object *or* just
        `net_id` to delete a network object.
    Without a `net_id` an error is logged and nothing is deleted.
    Warning: deleting a network object is permanent.
    :param client: ztcli_api client object
    :param net_id: network ID endpoint path
    :param mbr_id: member ID endpoint path
    """
    if not net_id:
        logger.error('One or more required arguments not found!')
        return

    if mbr_id:
        endpoint = 'controller/network/{}/member/{}'.format(net_id, mbr_id)
    else:
        endpoint = 'controller/network/{}'.format(net_id)

    await client.delete_thing(endpoint)


async def get_network_object_data(client, net_id, mbr_id=None):
    """
    Command wrapper for getting ZT network/member data under the
    `controller` endpoint.
    Required arguments are `client` and either one of the following:
        `net_id` *and* `mbr_id` to get member data *or* just
        `net_id` to get network data.
    Without a `net_id` an error is logged and no request is made.
    :param client: ztcli_api client object
    :param net_id: network ID endpoint path
    :param mbr_id: member ID endpoint path
    """
    if not net_id:
        logger.error('One or more required arguments not found!')
        return

    if mbr_id:
        endpoint = 'controller/network/{}/member/{}'.format(net_id, mbr_id)
    else:
        endpoint = 'controller/network/{}'.format(net_id)

    await client.get_data(endpoint)


async def get_network_object_ids(client, net_id=None):
    """
    Command wrapper for getting ZT network/member objects under the
    `controller` endpoint.
    Only `client` is required; pass `net_id` to request the member list
    of that network instead of the network list.
    :param client: ztcli_api client object
    :param net_id: network ID endpoint path
    """
    endpoint = 'controller/network'
    if net_id:
        endpoint = 'controller/network/{}/member'.format(net_id)

    await client.get_data(endpoint)