intracom-telecom-sdn/multinet

View on GitHub
util/multinet_requests.py

Summary

Maintainability
A
2 hrs
Test Coverage
"""
Multinet utility functions to communicate with the master and the worker machines
"""

import requests
import sys
import multiprocessing
import json
import time
import logging
import argparse


# Lower the root logger's threshold to DEBUG so that the debug/info messages
# logged by the helpers in this module are not filtered out by level.
# NOTE: this runs at import time and changes the root logger's level.
logging.getLogger().setLevel(logging.DEBUG)

def parse_arguments():
    """Parse the command line arguments of the script.

    Command line Args:
      --json-config (str): Compulsory argument. The path to the JSON
      configuration file. Stored as ``json_config``.
      --serial-requests (bool): Optional flag, False when omitted. Stored as
      ``is_serial``. Selects whether the master sends the requests to the
      workers serially (one worker at a time) or in parallel (to all workers
      simultaneously).

    Returns:
      argparse.Namespace: An object holding the values of all arguments.
    """
    # Each entry is (flag, keyword arguments forwarded to add_argument).
    option_specs = (
        ('--json-config', {
            'required': True,
            'type': str,
            'dest': 'json_config',
            'action': 'store',
            'help': 'Configuration file (JSON)',
        }),
        ('--serial-requests', {
            'required': False,
            'dest': 'is_serial',
            'action': 'store_true',
            'default': False,
            'help': 'Is request in serial execution mode',
        }),
    )

    cli_parser = argparse.ArgumentParser(
        formatter_class=argparse.RawTextHelpFormatter)
    for flag, options in option_specs:
        cli_parser.add_argument(flag, **options)

    return cli_parser.parse_args()


def parse_json_conf(json_config):
    """Load a JSON configuration file and return its parsed content.

    The path is the value of the --json-config command line argument.

    Args:
      json_config (str): The path to the JSON configuration file

    Returns:
      dict: The parsed json configuration
    """
    with open(json_config) as handle:
        return json.load(handle)


def dpid_offset_range(num_vms):
    """Generate the list of dpid offsets, one per virtual machine.

    The returned offsets are the consecutive integers 0 .. num_vms - 1.
    NOTE(review): the previous documentation stated that every VM is
    allocated 1000 unique dpid offsets; this function only returns the
    per-VM index, so any scaling presumably happens in the consumer of the
    offset -- confirm against the worker code.

    Args:
      num_vms (int): The number of virtual machines

    Returns:
      list: The dpid offset range
    """
    # range() exists on both Python 2 and 3 (xrange is Python 2 only);
    # list() guarantees a real list on Python 3 as well.
    return list(range(num_vms))


def make_post_request(host_ip, host_port, route, data=None):
    """Make a POST request to a remote REST server and log the response.

    Args:
      host_ip (str): The ip of the remote REST server
      host_port (int): The port of the remote REST server
      route (str): The REST API endpoint
      data (dict): A dictionary or a list with any additional data. When
        given, it is serialized to JSON and sent as the request body; when
        None, the request is sent without a body.

    Returns:
      dict: The 'status_code' and the 'text' of the HTTP response
    """
    url = 'http://{0}:{1}/{2}'.format(host_ip, host_port, route)
    route_name = route.split('/')[0]
    logging.info('[{0}_topology_handler][url] {1}'.format(route_name, url))

    # timeout=None: wait for the server's answer without a time limit.
    request_kwargs = {'timeout': None}
    if data is not None:
        request_kwargs['data'] = json.dumps(data)
        request_kwargs['headers'] = {'Content-type': 'application/json',
                                     'Accept': 'text/plain'}

    # The context manager closes the session (and its connection pool) even
    # when the request raises, so no sockets are leaked.
    with requests.Session() as session:
        # Do not pick up proxy/authentication settings from the environment.
        session.trust_env = False
        post_call = session.post(url, **request_kwargs)
        try:
            logging.info('[{0}_topology_handler][response status code] {1}'.
                         format(route_name, post_call.status_code))
            logging.info('[{0}_topology_handler][response data] {1}'.
                         format(route_name, post_call.text))
            # Return plain data instead of the Response object, so the result
            # can be put on a multiprocessing queue.
            response = {'status_code': post_call.status_code,
                        'text': post_call.text}
        finally:
            post_call.close()
    return response


def make_post_request_runner(host_ip, host_port, route, data, queue):
    """Process entry point that performs one POST request.

    Performs the POST request and stores its result in the given queue, so
    that the parent process can collect it. Used for multiprocessing.

    Args:
      host_ip (str): The IP address of the REST server
      host_port (int): The port of the REST server
      route (str): The REST API endpoint
      data (dict): Any additional data to send as JSON
      queue (multiprocessing.Queue): The queue where all the responses are stored

    Returns:
      int: Always 0
    """
    response = make_post_request(host_ip, host_port, route, data)
    queue.put(response)
    return 0


def handle_post_request(post_call, exit_on_fail=True):
    """Handle the result of a REST request.

    When the status code is outside the 2xx range and the caller asked for
    it, terminate via sys.exit with the status code as exit value.
    Otherwise log the response text at debug level.

    Args:
      post_call (dict): The response to handle, holding the keys
        'status_code' and 'text'
      exit_on_fail (Optional[bool]): True -> Exit on error status code / False -> continue
    """
    status_code = post_call['status_code']
    succeeded = 200 <= status_code < 300

    # Guard clause: bail out on failure only if the caller wants that.
    if exit_on_fail and not succeeded:
        sys.exit(status_code)

    logging.debug(post_call['text'])

def broadcast_cmd(worker_ip_list, worker_port_list, opcode, data=None):
    """Broadcast a POST request to all the workers.

    Sends a POST request to the given endpoint of every worker, either
    serially (one worker at a time, in this process) or in parallel (one
    child process per worker). The mode is taken from data['is_serial'];
    parallel mode is used when that key is missing or data is None.

    For the 'init' opcode each worker receives its own copy of the data with
    an additional 'dpid_offset' entry; the caller's data is not modified.

    Args:
      worker_ip_list (list): A list of IP addresses to broadcast the POST request
      worker_port_list (list): The ports of the workers, paired by position
        with worker_ip_list
      opcode (str): The REST API endpoint
      data (dict): JSON data to go with the request

    Returns:
      list: A list of responses for all the POST requests performed. In
        parallel mode the order is the order in which the workers answered.
    """

    if data is not None and 'is_serial' in data:
        is_serial = data['is_serial']
    else:
        logging.info('[{0}] POST data is None. Setting is_serial to False'.
                     format(opcode))
        is_serial = False

    is_init = opcode == 'init'
    if is_init:
        dpid_offset_list = dpid_offset_range(len(worker_ip_list))

    responses = []
    processes = []
    result_queue = multiprocessing.Queue()

    workers = zip(worker_ip_list, worker_port_list)
    for offset_idx, (worker_ip, worker_port) in enumerate(workers):
        payload = data
        if is_init:
            # Work on a per-worker copy so the caller's dict is left
            # untouched; also tolerates data being None.
            payload = dict(data) if data is not None else {}
            payload['dpid_offset'] = dpid_offset_list[offset_idx]

        if is_serial:
            # Serial send REST requests to workers
            logging.info('[{0}] is running in serial mode'.format(opcode))
            responses.append(make_post_request(worker_ip, worker_port, opcode,
                                               payload))
        else:
            # Parallel send REST requests to workers
            logging.info('[{0}] is running in parallel mode'.format(opcode))
            process = multiprocessing.Process(target=make_post_request_runner,
                                              args=(worker_ip,
                                                    worker_port,
                                                    opcode,
                                                    payload,
                                                    result_queue,))
            processes.append(process)
            process.start()

    if is_serial:
        return responses

    # Drain the queue BEFORE joining: a child that has put a large item on
    # the queue does not terminate until that item has been consumed, so
    # joining first can deadlock.
    responses = [result_queue.get() for _ in processes]
    for process in processes:
        process.join()
    return responses



def aggregate_broadcast_response(responses):
    """Aggregate a list of HTTP responses into one status and one body.

    The aggregate status is 200 when every response has a 2xx status code
    (also for an empty list), otherwise 500. The body collects the text of
    all the responses.

    Args:
      responses (list): A list of responses, each a dict with the keys
        'status_code' and 'text'

    Returns:
      status (int): The aggregate status code
      body (str): A JSON encoded list with the text of all the responses
    """
    all_succeeded = True
    for response in responses:
        if not 200 <= response['status_code'] < 300:
            # One failure is enough to decide the aggregate status.
            all_succeeded = False
            break

    status = 200 if all_succeeded else 500
    texts = [response['text'] for response in responses]
    return status, json.dumps(texts)


def master_cmd(master_ip, master_port, opcode, data=None):
    """Send a command to the master.

    Thin wrapper that performs a POST request against the master.

    Args:
      master_ip (str): The IP address of the master
      master_port (int): The port of the master
      opcode (str): The REST API endpoint (the command we want to send)
      data (dict): Optional data to send along with the command

    Returns:
      dict: The 'status_code' and the 'text' of the HTTP response
    """
    master_response = make_post_request(master_ip, master_port, opcode, data)
    return master_response