matheuswhite/bluebees

View on GitHub
bluebees/client/node/commands/config.py

Summary

Maintainability
B
6 hrs
Test Coverage
from bluebees.client.node.node_data import node_name_list
from bluebees.client.application.application_data import ApplicationData, app_name_list
from bluebees.client.node.node_data import NodeData
from bluebees.client.network.network_data import NetworkData
from bluebees.common.file import file_helper
from bluebees.common.utils import check_hex_string, order, run_seq
from bluebees.client.data_paths import base_dir, node_dir, app_dir, net_dir
from bluebees.client.mesh_layers.element import Element
from bluebees.client.mesh_layers.mesh_context import SoftContext
from bluebees.client.mesh_layers.access_layer import check_opcode, check_parameters, \
                                            OpcodeLengthError, \
                                            OpcodeBadFormat, OpcodeReserved, \
                                            ParametersLengthError
from tqdm import tqdm
import click
import asyncio
import traceback
import ruamel


def validate_name(ctx, param, value):
    if not value:
        raise click.BadParameter('This option is required')
    if not node_name_list() or value not in node_name_list():
        raise click.BadParameter(f'The "{value}" node not exist')
    return value


def validate_app(value):
    if not app_name_list() or value not in app_name_list():
        raise click.BadParameter(f'The "{value}" application not exist')


def validate_model(value, index):
    try:
        id_ = value['id']
        index_order = str(index) + order(index)

        if type(id_) != str:
            raise click.BadParameter('The id value must be a hex string')
        elif not check_hex_string(id_):
            raise click.BadParameter(f'On {index_order} model, id is bad '
                                     f'formatting of hex string')
        elif len(id_) not in [4, 8]:
            raise click.BadParameter(f'On {index_order} model, the id '
                                     f'length must be 2 or 4 bytes')
    except KeyError:
        raise click.BadParameter('id keyword not found')

    try:
        if type(value['publish']) != str:
            raise click.BadParameter(f'On {index_order} model, the publish '
                                     f'value must be a hex string')
        elif not check_hex_string(value['publish']):
            raise click.BadParameter(f'On {index_order} model, bad formatting'
                                     f' of hex string, on publish value')
        elif len(value['publish']) != 4:
            raise click.BadParameter(f'On {index_order} model, the publish '
                                     f'value length must be 2 bytes')
    except KeyError:
        pass

    try:
        if type(value['subscribe']) != ruamel.yaml.comments.CommentedSeq:
            raise click.BadParameter(f'On {index_order} model, the subscribe '
                                     f'value must be a list of hex string')
        for i, addr in enumerate(value['subscribe']):
            if type(addr) != str:
                raise click.BadParameter(f'On {index_order} model, the '
                                         f'{i+1}{order(i+1)} subscribe value '
                                         f'must be a hex string')
            elif not check_hex_string(addr):
                raise click.BadParameter(f'On {index_order} model, bad '
                                         f'formatting of hex string, on '
                                         f'{i+1}{order(i+1)} subscribe value'
                                         f' model')
            elif len(addr) != 4:
                raise click.BadParameter(f'On {index_order} model, the '
                                         f'{i+1}{order(i+1)} subscribe value '
                                         f'length must be 2 bytes')
    except KeyError:
        pass

    try:
        if not app_name_list() or \
           value['application'] not in app_name_list():
            raise click.BadParameter(f'On {index_order} model, the '
                                     f'"{value["application"]}" application '
                                     f'not exist')
    except KeyError:
        raise click.BadParameter('application keyword not found')


def validate_single_cmd(file, index, value):
    try:
        opcode = value['opcode']
        try:
            if type(opcode) != str:
                raise click.BadParameter(f'On {file} file, the opcode of the'
                                         f'{index}{order(index)} command must '
                                         f'be a hex string')
            elif not check_hex_string(opcode):
                raise click.BadParameter(f'On {file} file, bad formatting of '
                                         f'opcode on the {index}{order(index)}'
                                         f' command')
            check_opcode(bytes.fromhex(opcode))
        except OpcodeLengthError:
            raise click.BadParameter(f'On {file} file, the length of opcode in'
                                     f' the {index}{order(index)} command must'
                                     f' be less than 3')
        except OpcodeBadFormat:
            raise click.BadParameter(f'On {file} file, the opcode in'
                                     f' the {index}{order(index)} command is '
                                     f'wrong')
        except OpcodeReserved:
            raise click.BadParameter(f'On {file} file, the opcode in'
                                     f' the {index}{order(index)} command is '
                                     f'wrong')
    except KeyError:
        raise click.BadParameter(f'On {file} file, the {index}{order(index)}'
                                 f' command not contains "opcode" keyword')

    try:
        parameters = value['parameters']
        try:
            if type(parameters) != str:
                raise click.BadParameter(f'On {file} file, the parameters of '
                                         f'the {index}{order(index)} command '
                                         f'must be a hex string')
            elif not check_hex_string(parameters):
                raise click.BadParameter(f'On {file} file, bad formatting of '
                                         f'parameters on the '
                                         f'{index}{order(index)} command')
            check_parameters(bytes.fromhex(opcode), bytes.fromhex(parameters))
        except ParametersLengthError:
            raise click.BadParameter(f'On {file} file, the length of '
                                     f'parameters in the {index}{order(index)}'
                                     f' command is wrong')
    except KeyError:
        raise click.BadParameter(f'On {file} file, the {index}{order(index)}'
                                 f' command not contains "parameters" keyword')

    try:
        application = value['application']
        if type(application) != str:
            raise click.BadParameter(f'On {file} file, the application of '
                                     f'the {index}{order(index)} command '
                                     f'must be a string')
        elif not app_name_list() or application not in app_name_list():
            raise click.BadParameter(f'On {file} file, the application of '
                                     f'the {index}{order(index)} command '
                                     f'not exist')
    except KeyError:
        pass

    try:
        tries = value['tries']
        if type(tries) != int:
            raise click.BadParameter(f'On {file} file, the tries of '
                                     f'the {index}{order(index)} command '
                                     f'must be a integer')
        elif tries <= 0:
            raise click.BadParameter(f'On {file} file, the tries of '
                                     f'the {index}{order(index)} command '
                                     f'must be non-zero and positive')
    except KeyError:
        pass


def validate_post_cmds(value):
    file = file_helper.read(value)
    if not file:
        raise click.BadParameter(f'File "{value}" not found')

    try:
        commands = file['commands']
        if type(commands) != ruamel.yaml.comments.CommentedSeq:
            raise click.BadParameter(f'On {value} file, The "commands" must be'
                                     f' a list')
        for i, cmd in enumerate(commands):
            validate_single_cmd(value, i + 1, cmd)
    except KeyError:
        raise click.BadParameter(f'Not found "commands" keyword on {value}'
                                 f' file')


def parse_config(ctx, param, value):
    if not value:
        raise click.BadParameter('This option is required')
    else:
        cfg = file_helper.read(value)
        if not cfg:
            raise click.BadParameter(f'File "{value}" not found')

        try:
            if type(cfg['applications']) != ruamel.yaml.comments.CommentedSeq:
                raise click.BadParameter(f'The "applications" must be a list')

            for app in cfg['applications']:
                validate_app(app)
        except KeyError:
            cfg['applications'] = []

        try:
            for i, model in enumerate(cfg['models']):
                validate_model(model, i + 1)
        except KeyError:
            cfg['models'] = []

        try:
            if type(cfg['post_cmds']) != ruamel.yaml.comments.CommentedSeq:
                raise click.BadParameter(f'The "post_cmds" must be a list')

            for cmd_file in cfg['post_cmds']:
                validate_post_cmds(cmd_file)
        except KeyError:
            cfg['post_cmds'] = []

        return cfg, value


async def appkey_add(client_element: Element, target: str,
                     application: str) -> bool:
    node_data = NodeData.load(base_dir + node_dir + target + '.yml')
    app_data = ApplicationData.load(base_dir + app_dir + application + '.yml')
    net_data = NetworkData.load(base_dir + net_dir + app_data.network + '.yml')

    net_key = int.from_bytes(net_data.key_index, 'big')
    app_key = int.from_bytes(app_data.key_index, 'big')
    key_index = (net_key | (app_key << 12)).to_bytes(3, 'big')[::-1]

    opcode = b'\x00'
    r_opcode = b'\x80\x03'
    parameters = key_index + app_data.key

    context = SoftContext(src_addr=b'\x00\x01', dst_addr=node_data.addr,
                          node_name=node_data.name,
                          network_name=node_data.network,
                          application_name='',
                          is_devkey=True, ack_timeout=3, segment_timeout=3)

    success = await client_element.send_message(opcode=opcode,
                                                parameters=parameters,
                                                ctx=context)
    if not success:
        return False

    r_content = await client_element.recv_message(opcode=r_opcode,
                                                  segment_timeout=1,
                                                  timeout=3, ctx=context)

    if r_content:
        if r_content[0] == 0 and r_content[1:] == key_index:
            if app_data.name not in node_data.apps:
                node_data.apps.append(app_data.name)
                node_data.save()

            if node_data.name not in app_data.nodes:
                app_data.nodes.append(node_data.name)
                app_data.save()

            return True

    return False


async def model_app_bind(client_element: Element, target: str, model_id: bytes,
                         application: str) -> bool:
    node_data = NodeData.load(base_dir + node_dir + target + '.yml')
    app_data = ApplicationData.load(base_dir + app_dir + application + '.yml')

    key_index = app_data.key_index

    opcode = b'\x80\x3d'
    r_opcode = b'\x80\x3e'
    parameters = node_data.addr[::-1] + key_index[::-1] + model_id[::-1]

    context = SoftContext(src_addr=b'\x00\x01', dst_addr=node_data.addr,
                          node_name=node_data.name,
                          network_name=node_data.network,
                          application_name='',
                          is_devkey=True, ack_timeout=3, segment_timeout=3)

    success = await client_element.send_message(opcode=opcode,
                                                parameters=parameters,
                                                ctx=context)
    if not success:
        return False

    r_content = await client_element.recv_message(opcode=r_opcode,
                                                  segment_timeout=1,
                                                  timeout=3, ctx=context)

    if r_content:
        if r_content[0] == 0 and r_content[1:] == parameters:
            return True

    return False


async def model_publication_set(client_element: Element, target: str,
                                model_id: bytes, addr: bytes,
                                application: str) -> bool:
    node_data = NodeData.load(base_dir + node_dir + target + '.yml')
    app_data = ApplicationData.load(base_dir + app_dir + application + '.yml')

    key_index = app_data.key_index
    default_ttl = b'\x07'
    period = b'\x00'
    default_xmit_int = b'\x40'

    opcode = b'\x03'
    r_opcode = b'\x80\x19'
    parameters = node_data.addr[::-1] + addr[::-1] + key_index[::-1] + \
        default_ttl[::-1] + period[::-1] + default_xmit_int[::-1] + \
        model_id[::-1]

    context = SoftContext(src_addr=b'\x00\x01', dst_addr=node_data.addr,
                          node_name=node_data.name,
                          network_name=node_data.network,
                          application_name='',
                          is_devkey=True, ack_timeout=3, segment_timeout=3)

    success = await client_element.send_message(opcode=opcode,
                                                parameters=parameters,
                                                ctx=context)
    if not success:
        return False

    r_content = await client_element.recv_message(opcode=r_opcode,
                                                  segment_timeout=1,
                                                  timeout=3, ctx=context)

    if r_content:
        if r_content[0] == 0 and r_content[1:] == parameters:
            return True

    return False


async def model_subscription_add(client_element: Element, target: str,
                                 model_id: bytes, addr: bytes) -> bool:
    node_data = NodeData.load(base_dir + node_dir + target + '.yml')

    opcode = b'\x80\x1b'
    r_opcode = b'\x80\x1f'
    parameters = node_data.addr[::-1] + addr[::-1] + model_id[::-1]

    context = SoftContext(src_addr=b'\x00\x01', dst_addr=node_data.addr,
                          node_name=node_data.name,
                          network_name=node_data.network,
                          application_name='',
                          is_devkey=True, ack_timeout=3, segment_timeout=3)

    success = await client_element.send_message(opcode=opcode,
                                                parameters=parameters,
                                                ctx=context)
    if not success:
        return False

    r_content = await client_element.recv_message(opcode=r_opcode,
                                                  segment_timeout=1,
                                                  timeout=3, ctx=context)

    if r_content:
        if r_content[0] == 0 and r_content[1:] == parameters:
            return True

    return False


async def send_cmd(client_element: Element, target: str, opcode: bytes,
                   parameters: bytes, application: str):
    node_data = NodeData.load(base_dir + node_dir + target + '.yml')

    app_name = application if application else node_data.apps[0]

    context = SoftContext(src_addr=b'\x00\x01', dst_addr=node_data.addr,
                          node_name=node_data.name,
                          network_name=node_data.network,
                          application_name=app_name,
                          is_devkey=False, ack_timeout=3, segment_timeout=3)

    success = await client_element.send_message(opcode=opcode,
                                                parameters=parameters,
                                                ctx=context)
    return success


async def config_task(target: str, client_element: Element, config: dict):
    tries = 3
    total_steps = len(config['applications'])
    for m in config['models']:
        total_steps += 1
        if 'publication' in m.keys():
            total_steps += 1
        if 'subscription' in m.keys():
            total_steps += len(m['subscription'])
    for cmd_file in config['post_cmds']:
        file = file_helper.read(cmd_file)
        total_steps += len(file['commands'])

    with tqdm(range(total_steps)) as pbar:
        success = True

        for app in config['applications']:
            for t in range(tries):
                success = await appkey_add(client_element, target, app)
                if success:
                    break
            if not success:
                click.echo(click.style(f'\nError on appkey add. Application: '
                                       f'{app}', fg='red'))
                return None
            pbar.update(1)

        for model in config['models']:
            for t in range(tries):
                success = await model_app_bind(client_element, target,
                                               bytes.fromhex(model['id']),
                                               model['application'])
                if success:
                    break
            if not success:
                click.echo(click.style(f'\nError on model app bind. '
                                       f'Model id: 0x{model["id"]}', fg='red'))
                return None
            pbar.update(1)

            if 'publication' in model:
                for t in range(tries):
                    success = await model_publication_set(
                        client_element, target, bytes.fromhex(model['id']),
                        bytes.fromhex(model['publication']),
                        model['application'])
                    if success:
                        break
                if not success:
                    click.echo(click.style(f'\nError on model publication set.'
                                           f' Model id: 0x{model["id"]}',
                                           fg='red'))
                    return None
                pbar.update(1)

            if 'subscription' in model:
                for s in model['subscription']:
                    for t in range(tries):
                        success = await model_subscription_add(
                            client_element, target, bytes.fromhex(model['id']),
                            bytes.fromhex(s))
                        if success:
                            break
                    if not success:
                        click.echo(click.style(f'\nError on model subscription'
                                               f' add. Model id: 0x'
                                               f'{model["id"]}', fg='red'))
                        return None
                    pbar.update(1)

        for post_cmd in config['post_cmds']:
            file = file_helper.read(post_cmd)
            for cmd in file['commands']:
                tries = cmd['tries'] if 'tries' in cmd.keys() else 1
                application = cmd['application'] if \
                    'application' in cmd.keys() else ''
                for t in range(tries):
                    success = await send_cmd(client_element, target,
                                             bytes.fromhex(cmd['opcode']),
                                             bytes.fromhex(cmd['parameters']),
                                             application)
                    await asyncio.sleep(.1)
                if not success:
                    click.echo(click.style(f'\nError on send cmd. cmd file: '
                                           f'{post_cmd}', fg='red'))
                    return None
                pbar.update(1)


@click.command()
@click.option('--name', '-n', type=str, default='', required=True,
              help='Specify the name of node', callback=validate_name)
@click.option('--config', '-c', type=str, default='', required=True,
              help='Specify a YAML config file. A file example is shown in'
                   ' node_config.yml', callback=parse_config)
def config(name, config):
    '''Set the node configuration'''

    click.echo(click.style(f'Config "{name}" node using {config[1]} config '
                           f'file', fg='green'))

    try:
        loop = asyncio.get_event_loop()
        client_element = Element()

        run_seq_t = run_seq([
            client_element.spwan_tasks(loop),
            config_task(name, client_element, config[0])
        ])
        loop.run_until_complete(run_seq_t)
    except KeyboardInterrupt:
        click.echo(click.style('Interruption by user', fg='yellow'))
    except RuntimeError:
        click.echo('Runtime error')
    except Exception:
        click.echo(traceback.format_exc())
    finally:
        client_element.disconnect()
        tasks_running = asyncio.Task.all_tasks()
        for t in tasks_running:
            t.cancel()
        loop.stop()