stress_test/monitor.py
File `monitor.py` has 971 lines of code (exceeds 250 allowed). Consider refactoring.# Copyright (c) 2016 Intracom S.A. Telecom Solutions. All rights reserved.## This program and the accompanying materials are made available under the# terms of the Eclipse Public License v1.0 which accompanies this distribution,# and is available at http://www.eclipse.org/legal/epl-v10.html """ Monitor Class- All monitoring-related functionality is here""" import geventimport gevent.queueimport jsonimport loggingimport reimport queueimport subprocessimport timeimport util.sysstats class Monitor: """ All monitor- related functionality is here """ def __init__(self, controller): """ Creates a Monitor. Options from JSON input file :param controller: object of the Controller class :type controller: object """ self.controller = controller self.global_sample_id = 0 self.repeat_id = 0 self.test_repeats = 0 def system_results(self): """ Collect runtime statistics :returns: experiment statistics in dictionary :rtype: dict """ system_statistics = {} system_statistics['total_memory_bytes'] = \ util.sysstats.sys_total_memory_bytes(self.controller._ssh_conn) system_statistics['controller_cwd'] = \ util.sysstats.proc_cwd(self.controller.pid, self.controller._ssh_conn) system_statistics['controller_java_xopts'] = \ util.sysstats.get_java_options(self.controller.pid, self.controller._ssh_conn) system_statistics['timestamp'] = \ int(subprocess.check_output('date +%s', shell=True, universal_newlines=True).strip()) system_statistics['date'] = \ subprocess.check_output('date', shell=True, universal_newlines=True).strip() system_statistics['used_memory_bytes'] = \ util.sysstats.sys_used_memory_bytes(self.controller._ssh_conn) system_statistics['free_memory_bytes'] = \ util.sysstats.sys_free_memory_bytes(self.controller._ssh_conn) system_statistics['controller_cpu_system_time'] = \ util.sysstats.proc_cpu_system_time(self.controller.pid, self.controller._ssh_conn) system_statistics['controller_cpu_user_time'] = \ util.sysstats.proc_cpu_user_time(self.controller.pid, self.controller._ssh_conn) system_statistics['controller_vm_size'] = \ util.sysstats.proc_vm_size(self.controller.pid, self.controller._ssh_conn) system_statistics['controller_num_fds'] = \ util.sysstats.proc_num_fds(self.controller.pid, self.controller._ssh_conn) system_statistics['controller_num_threads'] = \ util.sysstats.proc_num_threads(self.controller.pid, self.controller._ssh_conn) system_statistics['one_minute_load'] = \ util.sysstats.sys_load_average(self.controller._ssh_conn)[0] system_statistics['five_minute_load'] = \ util.sysstats.sys_load_average(self.controller._ssh_conn)[1] system_statistics['fifteen_minute_load'] = \ util.sysstats.sys_load_average(self.controller._ssh_conn)[2] return system_statistics class Oftraf: """ Oftraf related monitoring """ def __init__(self, controller, oftraf): """ Creates an oftraf monitor object. :param controller: object of the Controller class :param oftraf: object of the Oftraf class :type controller: object :type oftraf: object """ self.oftraf = oftraf self.controller = controller self.exit_flag = False self.results_queue = gevent.queue.Queue(maxsize=1) def of_monitor_thread(self): """ Function executed inside a thread and returns the output in json format, of openflow packets counts """ try: while self.exit_flag is False: oftraf_interval_sec = self.oftraf.interval_ms / 1000 logging.info('[oftraf_monitor_thread] Waiting for {0} seconds.' .format(oftraf_interval_sec)) gevent.sleep(oftraf_interval_sec) logging.info('[oftraf_monitor_thread] ' 'get throughput of controller') response_data = \ json.loads(self.oftraf.oftraf_get_of_counts()) tcp_out_traffic = tuple(response_data['TCP_OF_out_counts']) tcp_in_traffic = tuple(response_data['TCP_OF_in_counts']) out_traffic = tuple(response_data['OF_out_counts']) in_traffic = tuple(response_data['OF_in_counts']) results = {'of_out_traffic': out_traffic, 'of_in_traffic': in_traffic, 'tcp_of_out_traffic': tcp_out_traffic, 'tcp_of_in_traffic': tcp_in_traffic} self.exit_flag = True except: logging.error('[oftraf.monitor_thread] Error monitor thread ' 'failed.') results = {'of_out_traffic': (0, 0), 'of_in_traffic': (0, 0), 'tcp_of_out_traffic': (0, 0), 'tcp_of_in_traffic': (0, 0)} exit() self.results_queue.put(results) def monitor_run_oftraf(self): """ This monitor function is used to collect the results from of_monitor_thread function :returns: Returns the results from the gevent queue :rtype: dict """ # Parallel section self.exit_flag = False monitor_thread = gevent.spawn(self.of_monitor_thread) res = self.results_queue.get(block=True) gevent.joinall([monitor_thread]) gevent.killall([monitor_thread]) return res class Mtcbench(Monitor): """ MTCbench- related monitoring. Subclass of Monitor superclass """ def __init__(self, controller, emulator): """ Creates a MTCbench monitor object. :param controller: object of the Controller class :param emulator: object of the SBEmu subclass :type controller: object :type emulator: object """ super(self.__class__, self).__init__(controller) self.emulator = emulator self.result_queue = gevent.queue.Queue() self.term_success = '__successful_termination__' self.term_fail = '__failed_termination__' self.data_queue = gevent.queue.Queue() def monitor_results_active(self): """ This monitor function is used from south bound active mtcbench \ tests to collect the related key results :returns: Returns the dictionary with the results included into JSON \ input file :rtype: dict """ results = self.system_results() results['global_sample_id'] = \ self.global_sample_id self.global_sample_id += 1 results['repeat_id'] = self.repeat_id results['mtcbench_simulated_hosts'] = \ self.emulator.simulated_hosts results['mtcbench_switches'] = \ self.emulator.get_overall_topo_size() results['mtcbench_threads'] = \ self.emulator.threads results['mtcbench_switches_per_thread'] = \ self.emulator.switches_per_thread results['mtcbench_thread_creation_delay_ms'] = \ self.emulator.thread_creation_delay_ms results['mtcbench_delay_before_traffic_ms'] = \ self.emulator.delay_before_traffic_ms results['controller_statistics_period_ms'] = \ self.controller.stat_period_ms results['test_repeats'] = self.test_repeats results['controller_node_ip'] = self.controller.ip results['controller_port'] = \ str(self.controller.of_port) results['mtcbench_mode'] = self.emulator.mode results['mtcbench_ms_per_test'] = \ self.emulator.ms_per_test results['mtcbench_internal_repeats'] = \ self.emulator.internal_repeats results['mtcbench_warmup'] = self.emulator.warmup return results def monitor_results_idle(self): """ This monitor function is used from south bound idle mtcbench \ tests to collect the related key results :returns: Returns the dictionary with the results included into JSON \ input file :rtype: dict """ results = self.system_results() results['global_sample_id'] = self.global_sample_id self.global_sample_id += 1 results['mtcbench_simulated_hosts'] = \ self.emulator.simulated_hosts results['mtcbench_switches'] = self.emulator.get_overall_topo_size() results['mtcbench_threads'] = self.emulator.threads results['mtcbench_switches_per_thread'] = \ self.emulator.switches_per_thread results['mtcbench_thread_creation_delay_ms'] = \ self.emulator.thread_creation_delay_ms results['controller_statistics_period_ms'] = \ self.controller.stat_period_ms results['mtcbench_delay_before_traffic_ms'] = \ self.emulator.delay_before_traffic_ms results['controller_node_ip'] = self.controller.ip results['controller_port'] = self.controller.of_port results['mtcbench_mode'] = self.emulator.mode results['mtcbench_ms_per_test'] = self.emulator.ms_per_test results['mtcbench_internal_repeats'] = \ self.emulator.internal_repeats results['mtcbench_warmup'] = self.emulator.warmup return results Function `monitor_thread_idle` has a Cognitive Complexity of 16 (exceeds 5 allowed). Consider refactoring.
Cyclomatic complexity is too high in method monitor_thread_idle. (7) def monitor_thread_idle(self, boot_start_time): """ This monitor function is used from south bound idle mtcbench \ tests to put into gevent queue the results during test running :param boot_start_time: The time we begin starting topology switches :type boot_start_time: int """ discovery_deadline = 120 expected_switches = self.emulator.get_overall_topo_size() topology_bootup_time_ms = self.emulator.get_topo_bootup_ms() sleep_before_discovery = float(topology_bootup_time_ms) / 1000 logging.info('[monitor_thread_idle] Monitor thread started') t_start = boot_start_time logging.info('[monitor_thread_idle] Starting discovery') previous_discovered_switches = 0 discovered_switches = 0 time.sleep(sleep_before_discovery) t_discovery_start = time.time() error_code = 0 max_discovered_switches = 0 while True: if (time.time() - t_discovery_start) > discovery_deadline: error_code = 201 logging.info( '[monitor_thread_idle] Deadline of {0} seconds passed, ' 'discovered {1} switches.'.format(discovery_deadline, discovered_switches)) discovery_time = time.time() - t_start - discovery_deadline results = self.monitor_results_idle() results['bootup_time_secs'] = discovery_time results['discovered_switches'] = discovered_switches results['max_discovered_switches'] = max_discovered_switches results['discovered_switches_error_code'] = error_code results['successful_bootup_time'] = -1 self.result_queue.put([results]) return 0 else: new_ssh = self.controller.init_ssh() discovered_switches = \ self.controller.get_oper_switches(new_ssh) if discovered_switches == -1: discovered_switches = previous_discovered_switches if discovered_switches > max_discovered_switches: max_discovered_switches = discovered_switches if discovered_switches != previous_discovered_switches: t_discovery_start = time.time() previous_discovered_switches = discovered_switches if discovered_switches == expected_switches: delta_t = time.time() - t_start logging.info( '[monitor_thread_idle] {0} switches found in ' '{1} seconds'. format(discovered_switches, delta_t)) results = self.monitor_results_idle() results['bootup_time_secs'] = delta_t results['discovered_switches'] = discovered_switches results['max_discovered_switches'] = \ max_discovered_switches results['discovered_switches_error_code'] = error_code results['successful_bootup_time'] = delta_t self.result_queue.put([results]) return 0 gevent.sleep(1) Function `monitor_thread_active` has a Cognitive Complexity of 18 (exceeds 5 allowed). Consider refactoring.
Cyclomatic complexity is too high in method monitor_thread_active. (8) def monitor_thread_active(self): """ This monitor function is used from south bound active mtcbench \ tests to put into gevent queue the results during test running """ internal_repeat_id = 0 logging.info('[monitor_thread_active] monitor thread started') # will hold samples taken in the lifetime of this thread test_samples = [] while True: try: # read messages from queue while TERM_SUCCESS has not been sent line = self.data_queue.get(block=True) if line == self.term_success: logging.info('[monitor_thread_active] successful ' 'termination string returned. Returning ' 'samples and exiting.') self.result_queue.put(test_samples) return else: # look for lines containing a substring like e.g. # 'total = 1.2345 per ms' match = re.search(r'total = (.+) per ms', line) if match is not None or line == self.term_fail: results = self.monitor_results_active() if line == self.term_fail: logging.info('[monitor_thread] returned failed ' 'termination ' 'string returning gathered samples ' 'and exiting.') results['throughput_responses_sec'] = -1 results['internal_repeat_id'] = internal_repeat_id test_samples.append(results) self.result_queue.put(test_samples) return if match is not None: # extract the numeric portion from the above regex results['throughput_responses_sec'] = \ float(match.group(1)) * 1000.0 results['internal_repeat_id'] = internal_repeat_id test_samples.append(results) internal_repeat_id += 1 except queue.Empty as exept: logging.error('[monitor_thread_active] {0}'.format(str(exept))) self.result_queue.put(test_samples) return gevent.sleep(0.5) def monitor_run(self, boot_start_time=None): """ This monitor function is used from both south bound active and idle mtcbench tests to get the results from gevent queue :param boot_start_time: The time we begin starting topology switches :returns: Returns a dictionary, including all the results :rtype: dict :type boot_start_time: int """ logging.info('[MTCbench.monitor_run] creating and starting' ' monitor and MTCbench threads.') # Consumer - producer threads (mtcbench_thread is the producer, # monitor_thread is the consumer) threads = [] if boot_start_time is None: logging.info('[MTCbench.monitor_run] active test monitor is ' 'running') monitor_thread = gevent.spawn(self.monitor_thread_active) threads.append(monitor_thread) mtcbench_thread = gevent.spawn(self.mtcbench_thread, True, self.data_queue) threads.append(mtcbench_thread) else: logging.info('[MTCbench.monitor_run] idle test monitor is running') self.mtcbench_thread(False, None) monitor_thread = \ gevent.spawn(self.monitor_thread_idle, boot_start_time) threads.append(monitor_thread) gevent.joinall(threads) samples = self.result_queue.get() gevent.killall(threads) return samples def mtcbench_thread(self, block_flag=True, data_queue=None): """ Function used to execute MTCBench thread :param block_flag: It is used as a flag. When it is True the emulator \ run will wait for the completition of MTcbench thread running :param data_queue: If not None the results are written into the \ data_queue line by line. In case of None the results are written \ into standard output :returns: Returns a dictionary, including all the results :rtype: dict :type block_flag: boolean :type data_queue: queue """ logging.info('[MTCbench.mtcbench_thread] MTCbench thread started') try: self.emulator.run(self.controller.ip, self.controller.of_port, '[MTCbench.mtcbench_thread]', data_queue, False, block_flag, False) # mtcbench ended, enqueue termination message if data_queue is not None: data_queue.put_nowait(self.term_success) logging.info('[MTCbench.mtcbench_thread] MTCbench thread ended ' 'successfully') except: if data_queue is not None: data_queue.put_nowait(self.term_fail) logging.error('[MTCbench.mtcbench_thread] Exception: ' 'MTCbench_thread exited with error.') return 0 class Multinet(Monitor): """ Multinet- related monitoring. Subclass of Monitor superclass """ def __init__(self, controller, oftraf, emulator): """ Creates a Multinet monitor object. :param controller: object of the Controller class :param oftraf: object of the Oftraf class :param emulator: object of the SBEmu subclass :type controller: object :type oftraf: object :type emulator: object """ Monitor.__init__(self, controller) self.oftraf_node = oftraf self.emulator = emulator self.result_queue = gevent.queue.Queue() Cyclomatic complexity is too high in method monitor_run. (7)
Function `monitor_run` has a Cognitive Complexity of 7 (exceeds 5 allowed). Consider refactoring. def monitor_run(self, reference_results=None, sample_id=None, boot_start_time=None): """ This monitor function is used from both south bound active and idle \ multinet tests to get the results from gevent queue :param reference_results: The results returned from the just previous \ iteration of the test. Used in the frame of a stability test :param sample_id: The id of the sample running. Used in the frame of \ a stability test :param boot_start_time: The time we begin starting topology switches :returns: Returns a dictionary, including all the results :rtype: dict :type reference_results: dict :type sample_id: int :type boot_start_time: int """ logging.info('[Multinet.monitor_run] creating and starting' ' monitoring of Multinet worker events.') if boot_start_time is None and sample_id is None: logging.info('[Multinet.monitor_run] Active test monitor is ' 'running') monitor_thread = gevent.spawn(self.monitor_thread_active) elif self.oftraf_node is None: logging.info('[Multinet.monitor_run] Idle scalability test ' 'monitor is running') monitor_thread = \ gevent.spawn(self.monitor_thread_idle_scalability, boot_start_time) else: logging.info('[Multinet.monitor_run] Idle test stability ' 'monitor is running') monitor_thread = \ gevent.spawn(self.monitor_thread_idle_stability, reference_results, sample_id) gevent.joinall([monitor_thread]) total_results = self.result_queue.get() gevent.killall([monitor_thread]) if boot_start_time is None and sample_id is None: return total_results elif self.oftraf_node is None: return total_results else: return (total_results["current_sample"], total_results["previous_sample"]) Function `monitor_thread_idle_scalability` has a Cognitive Complexity of 16 (exceeds 5 allowed). Consider refactoring.
Cyclomatic complexity is too high in method monitor_thread_idle_scalability. (7) def monitor_thread_idle_scalability(self, boot_start_time): """ This monitor function is used from both idle scalability multinet tests tests to put into gevent queue the results during test running :param boot_start_time: The time we begin starting topology switches :type boot_start_time: int """ discovery_deadline = 120 expected_switches = self.emulator.get_overall_topo_size() topology_bootup_time_ms = self.emulator.get_topo_bootup_ms() sleep_before_discovery = float(topology_bootup_time_ms) / 1000 logging.info('[monitor_thread_idle] Monitor thread started') t_start = boot_start_time logging.info('[monitor_thread_idle] Starting discovery') previous_discovered_switches = 0 discovered_switches = 0 time.sleep(sleep_before_discovery) t_discovery_start = time.time() error_code = 0 max_discovered_switches = 0 while True: results = self.system_results() results['global_sample_id'] = self.global_sample_id self.global_sample_id += 1 results['multinet_workers'] = len(self.emulator.workers_ips) results['multinet_worker_topo_size'] = self.emulator.topo_size results['multinet_topology_type'] = self.emulator.topo_type results['multinet_hosts_per_switch'] = \ self.emulator.topo_hosts_per_switch results['multinet_group_size'] = self.emulator.topo_group_size results['multinet_group_delay_ms'] = \ self.emulator.topo_group_delay_ms results['controller_statistics_period_ms'] = \ self.controller.stat_period_ms results['controller_node_ip'] = self.controller.ip results['controller_port'] = str(self.controller.of_port) if (time.time() - t_discovery_start) > discovery_deadline: error_code = 201 logging.info( '[monitor_thread_idle] Deadline of {0} seconds passed, ' 'discovered {1} switches.'.format(discovery_deadline, discovered_switches)) discovery_time = time.time() - t_start - discovery_deadline results['multinet_size'] = \ self.emulator.topo_size * len(self.emulator.workers_ips) results['bootup_time_secs'] = discovery_time results['discovered_switches'] = discovered_switches results['max_discovered_switches'] = max_discovered_switches results['discovered_switches_error_code'] = error_code results['successful_bootup_time'] = -1 self.result_queue.put([results]) return 0 else: new_ssh = self.controller.init_ssh() discovered_switches = \ self.controller.get_oper_switches(new_ssh) logging.info('Discovered switches: =' .format(discovered_switches)) if discovered_switches == -1: discovered_switches = previous_discovered_switches if discovered_switches > max_discovered_switches: max_discovered_switches = discovered_switches if discovered_switches != previous_discovered_switches: t_discovery_start = time.time() previous_discovered_switches = discovered_switches if discovered_switches == expected_switches: delta_t = time.time() - t_start logging.info( '[monitor_thread_idle] {0} switches found in ' '{1} seconds'. format(discovered_switches, delta_t)) results['multinet_size'] = \ self.emulator.topo_size * len(self.emulator.workers_ips) results['bootup_time_secs'] = delta_t results['discovered_switches'] = discovered_switches results['max_discovered_switches'] = \ max_discovered_switches results['discovered_switches_error_code'] = error_code results['successful_bootup_time'] = delta_t self.result_queue.put([results]) return 0 gevent.sleep(1) Function `monitor_thread_idle_stability` has 28 lines of code (exceeds 25 allowed). Consider refactoring. def monitor_thread_idle_stability(self, reference_results, sample_id): """ This monitor function is used from idle stability multinet tests \ to put the results into gevent queue :param reference_results: The results returned from the just previous \ iteration of the test. Used in the frame of a stability test :param sample_id: The id of the sample running. Used in the frame of a \ stability test :type reference_results: dict :type sample_id: int """ oftraf_mon = Oftraf(self.controller, self.oftraf_node) oftraf_monitor_results = oftraf_mon.monitor_run_oftraf() results = self.system_results() results['global_sample_id'] = self.global_sample_id results['multinet_workers'] = len(self.emulator.workers_ips) results['multinet_size'] = \ self.emulator.topo_size * len(self.emulator.workers_ips) results['multinet_worker_topo_size'] = self.emulator.topo_size results['multinet_topology_type'] = self.emulator.topo_type results['multinet_hosts_per_switch'] = \ self.emulator.topo_hosts_per_switch results['multinet_group_size'] = self.emulator.topo_group_size results['multinet_group_delay_ms'] = self.emulator.topo_group_delay_ms results['controller_statistics_period_ms'] = \ self.controller.stat_period_ms results['controller_node_ip'] = self.controller.ip results['controller_port'] = str(self.controller.of_port) traffic_gen_ms = float(self.oftraf_node.interval_ms) / 1000Similar blocks of code found in 8 locations. Consider refactoring. results['of_out_packets_per_sec'] = \ (abs(float(oftraf_monitor_results['of_out_traffic'][0])) - reference_results['of_out_traffic'][0]) / traffic_gen_msSimilar blocks of code found in 8 locations. Consider refactoring. results['of_out_bytes_per_sec'] = \ (abs(float(oftraf_monitor_results['of_out_traffic'][1])) - reference_results['of_out_traffic'][1]) / traffic_gen_msSimilar blocks of code found in 8 locations. Consider refactoring. results['of_in_packets_per_sec'] = \ (abs(float(oftraf_monitor_results['of_in_traffic'][0])) - reference_results['of_in_traffic'][0]) / traffic_gen_msSimilar blocks of code found in 8 locations. Consider refactoring. results['of_in_bytes_per_sec'] = \ (abs(float(oftraf_monitor_results['of_in_traffic'][1])) - reference_results['of_in_traffic'][1]) / traffic_gen_msSimilar blocks of code found in 8 locations. Consider refactoring. results['tcp_of_out_packets_per_sec'] = \ (abs(float(oftraf_monitor_results['tcp_of_out_traffic'][0])) - reference_results['tcp_of_out_traffic'][0]) / traffic_gen_msSimilar blocks of code found in 8 locations. Consider refactoring. results['tcp_of_out_bytes_per_sec'] = \ (abs(float(oftraf_monitor_results['tcp_of_out_traffic'][1])) - reference_results['tcp_of_out_traffic'][1]) / traffic_gen_msSimilar blocks of code found in 8 locations. Consider refactoring. results['tcp_of_in_packets_per_sec'] = \ (abs(float(oftraf_monitor_results['tcp_of_in_traffic'][0])) - reference_results['tcp_of_in_traffic'][0]) / traffic_gen_msSimilar blocks of code found in 8 locations. Consider refactoring. results['tcp_of_in_bytes_per_sec'] = \ (abs(float(oftraf_monitor_results['tcp_of_in_traffic'][1])) - reference_results['tcp_of_in_traffic'][1]) / traffic_gen_ms results['sample_id'] = sample_id reference_results = oftraf_monitor_results self.result_queue.put({"current_sample": results, "previous_sample": reference_results}) return Function `monitor_thread_active` has 29 lines of code (exceeds 25 allowed). Consider refactoring. def monitor_thread_active(self): """ This monitor function is used from active scalability multinet tests to put the results into gevent queue """ oftraf_mon = Oftraf(self.controller, self.oftraf_node) oftraf_monitor_results = oftraf_mon.monitor_run_oftraf() results = self.system_results() results['global_sample_id'] = self.global_sample_id self.global_sample_id += 1 results['multinet_workers'] = len(self.emulator.workers_ips) results['multinet_size'] = \ self.emulator.topo_size * len(self.emulator.workers_ips) results['multinet_worker_topo_size'] = self.emulator.topo_size results['multinet_topology_type'] = self.emulator.topo_type results['multinet_hosts_per_switch'] = \ self.emulator.topo_hosts_per_switch results['multinet_group_size'] = self.emulator.topo_group_size results['multinet_group_delay_ms'] = self.emulator.topo_group_delay_ms results['controller_statistics_period_ms'] = \ self.controller.stat_period_ms results['controller_node_ip'] = self.controller.ip results['controller_port'] = str(self.controller.of_port) results['interpacket_delay_ms'] = self.emulator.interpacket_delay_ms results['traffic_generation_duration_ms'] = \ self.emulator.traffic_gen_duration_ms traffic_gen_ms = float(self.emulator.traffic_gen_duration_ms) / 1000Similar blocks of code found in 8 locations. Consider refactoring. results['of_out_packets_per_sec'] = \ float(oftraf_monitor_results['of_out_traffic'][0]) / traffic_gen_msSimilar blocks of code found in 8 locations. Consider refactoring. results['of_out_bytes_per_sec'] = \ float(oftraf_monitor_results['of_out_traffic'][1]) / traffic_gen_msSimilar blocks of code found in 8 locations. Consider refactoring. results['of_in_packets_per_sec'] = \ float(oftraf_monitor_results['of_in_traffic'][0]) / traffic_gen_msSimilar blocks of code found in 8 locations. Consider refactoring. results['of_in_bytes_per_sec'] = \ float(oftraf_monitor_results['of_in_traffic'][1]) / traffic_gen_msSimilar blocks of code found in 8 locations. Consider refactoring. results['tcp_of_out_packets_per_sec'] = \ float(oftraf_monitor_results['tcp_of_out_traffic'][0]) / traffic_gen_msSimilar blocks of code found in 8 locations. Consider refactoring. results['tcp_of_out_bytes_per_sec'] = \ float(oftraf_monitor_results['tcp_of_out_traffic'][1]) / traffic_gen_msSimilar blocks of code found in 8 locations. Consider refactoring. results['tcp_of_in_packets_per_sec'] = \ float(oftraf_monitor_results['tcp_of_in_traffic'][0]) / traffic_gen_msSimilar blocks of code found in 8 locations. Consider refactoring. results['tcp_of_in_bytes_per_sec'] = \ float(oftraf_monitor_results['tcp_of_in_traffic'][1]) / traffic_gen_ms self.result_queue.put([results]) return 0 class NBgen(Monitor): """ NB-generator- related monitoring. Subclass of Monitor superclass """ def __init__(self, controller, nbgen, sbemu): """ Creates a NBgen monitor object. :param controller: object of the Controller class :param nbgen: object of the NB-generator class :param sbemu: object of the SBEmu subclass :type controller: object :type nbgen: object :type sbemu: object """ Monitor.__init__(self, controller) self.nbgen_queue = gevent.queue.Queue() self.nbgen = nbgen self.sbemu = sbemu Function `__poll_flows_ds` has a Cognitive Complexity of 10 (exceeds 5 allowed). Consider refactoring. def __poll_flows_ds(self, t_start, expected_flows): """ Monitors operational DS from the time the transmission starts from NB towards the controller until the expected number of flows are found or the deadline is reached. :param t_start: timestamp for begin of discovery :param expected_flows: The number of expected flows to be compared with discovered flows :returns: Returns a float number containing the time in which total flows were discovered otherwise containing -1.0 on failure. :rtype: float :type t_start: float :type expected_flows: int """ t_discovery_start = time.time() previous_discovered_flows = 0 while True:Similar blocks of code found in 2 locations. Consider refactoring. if (time.time() - t_discovery_start) > \ self.nbgen.flows_ds_discovery_deadline: logging.info('[NB_emulator] [Poll_flows thread] Deadline of ' '{0} seconds passed' .format(self.nbgen.flows_ds_discovery_deadline)) self.nbgen.e2e_installation_time = -1.0 self.nbgen_queue.put({'end_to_end_flows_operation_time': -1.0}, block=True) logging.info('[NB_emulator] [Poll_flows thread] End to End ' 'installation time monitor FAILED') return else: new_ssh = self.controller.init_ssh() oper_ds_found_flows = self.controller.get_oper_flows(new_ssh) logging.debug('[NB_emulator] [Poll_flows thread] Found {0}' ' flows at inventory'. format(oper_ds_found_flows)) if (oper_ds_found_flows - previous_discovered_flows) != 0: t_discovery_start = time.time() previous_discovered_flows = oper_ds_found_flows if oper_ds_found_flows == expected_flows: time_interval = time.time() - t_start logging.debug('[NB_emulator] [Poll_flows thread] ' 'Flow-Master {0} flows found in {1} seconds' .format(expected_flows, time_interval)) self.nbgen.e2e_installation_time = time_interval self.nbgen_queue.put( {'end_to_end_flows_operation_time': time_interval}, block=True) logging.info('[NB_emulator] [Poll_flows thread] ' 'End to End installation time is: {0}' .format(self.nbgen.e2e_installation_time)) return gevent.sleep(1) Function `__poll_flows_ds_confirm` has a Cognitive Complexity of 10 (exceeds 5 allowed). Consider refactoring. def __poll_flows_ds_confirm(self, expected_flows): """ Monitors operational DS until the expected number of flows are found or the deadline is reached. :param expected_flows: The number of expected flows to be compared with discovered flows :returns: Returns a float number containing the time in which total flows were discovered otherwise containing -1.0 on failure. :rtype: float :type expected_flows: int """ t_start = time.time() t_discovery_start = time.time() previous_discovered_flows = 0 while True:Similar blocks of code found in 2 locations. Consider refactoring. if (time.time() - t_discovery_start) > \ self.nbgen.flows_ds_discovery_deadline: logging.info('[NB_emulator] [Poll_flows_confirm thread] ' ' Deadline of {0} seconds passed' .format(self.flows_ds_discovery_deadline)) self.nbgen.confirm_time = -1.0 self.nbgen_queue.put({'confirm_time': -1.0}, block=True) logging.info('[NB_emulator] [Poll_flows_confirm thread] ' 'Confirmation time monitoring FAILED') return else: new_ssh = self.controller.init_ssh() oper_ds_found_flows = self.controller.get_oper_flows(new_ssh) logging.debug('[NB_emulator] [Poll_flows_confirm thread] ' 'Found {0} flows at inventory' .format(oper_ds_found_flows)) if (oper_ds_found_flows - previous_discovered_flows) != 0: t_discovery_start = time.time() previous_discovered_flows = oper_ds_found_flows if oper_ds_found_flows == expected_flows: time_interval = time.time() - t_start logging.debug('[NB_emulator] [Poll_flows_confirm thread] ' 'Flow-Master {0} flows found in {1} seconds' .format(expected_flows, time_interval)) self.nbgen.confirm_time = time_interval self.nbgen_queue.put({'confirm_time': time_interval}, block=True) logging.info('[NB_emulator] [Poll_flows_confirm thread] ' 'Confirmation time is: {0}' .format(self.nbgen.confirm_time)) return gevent.sleep(1) Function `__poll_flows_switches` has a Cognitive Complexity of 10 (exceeds 5 allowed). Consider refactoring. def __poll_flows_switches(self, t_start, expected_flows): """ Monitors installed flows into switches of Multinet from the first REST request, until the expected number of flows are found or the deadline is reached. :param t_start: timestamp for beginning of discovery :param expected_flows: The number of expected flows to be compared with discovered flows :returns: Returns a float number containing the time in which total flows were discovered in Multinet switches. Otherwise containing -1.0 on failure. :rtype: float :type t_start: float :type expected_flows: int """ t_discovery_start = time.time() previous_discovered_flows = 0 while True:Similar blocks of code found in 2 locations. Consider refactoring. if (time.time() - t_discovery_start) > \ self.nbgen.flows_ds_discovery_deadline: logging.info('[NB_emulator] [Poll_flows_switches thread] ' 'Deadline of {0} seconds passed' .format(self.flows_ds_discovery_deadline)) self.nbgen.discover_flows_on_switches_time = -1.0 self.nbgen_queue.put({'switch_operation_time': -1.0}, block=True) logging.info('[NB_emulator] [Poll_flows_switches thread] ' 'Discovering flows on switches FAILED') return else: new_ssh = self.sbemu.init_ssh() discovered_flows = self.sbemu.get_flows(new_ssh) logging.debug('[NB_emulator] [Poll_flows_switches thread] ' 'Found {0} flows at topology switches' .format(discovered_flows)) if (discovered_flows - previous_discovered_flows) != 0: t_discovery_start = time.time() previous_discovered_flows = discovered_flows if discovered_flows == expected_flows: time_interval = time.time() - t_start logging.debug('[NB_emulator] [Poll_flows_switches thread]' ' expected flows = {0} \n ' 'discovered flows = {1}' .format(expected_flows, discovered_flows)) self.discover_flows_on_switches_time = time_interval self.nbgen_queue.put( {'switch_operation_time': time_interval}, block=True) logging.info('[NB_emulator] [Poll_flows_switches thread] ' 'Time to discover flows on switches is: {0}' .format(self.nbgen. discover_flows_on_switches_time)) return gevent.sleep(1) def __controller_time(self, t_start): """ Monitors the time for all add REST requests to be sent and their response to be received. :param t_start: timestamp for beginning of discovery :returns: Returns a float number containing the time in which total flows were discovered in Multinet switches. Otherwise containing -1.0 on failure. :rtype: float :type t_start: float """ controller_time = time.time() - t_start return controller_time def monitor_threads_run(self, t_start, total_failed_flows, expected_flows, flow_delete_flag): """ This monitor function is used from north bound tests to get the \ results from gevent queue :param t_start: timestamp for beginning of discovery iteration of \ the test. :param total_failed_flows: The number of failed flows after an add or \ delete function :param expected_flows: The number of expected flows to be compared \ with discovered flows :param flow_delete_flag: Flag, which when is set to True, a delete \ flows action in DS is performed. Otherwise an add flows action is \ performed :returns: Returns a dictionary, including all the results :rtype: dict :type t_start: float :type total_failed_flows: int :type expected_flows: int :type flow_delete_flag: boolean """ logging.info('[NB_emulator] Start polling measurements') monitor_ds = gevent.spawn(self.__poll_flows_ds, t_start, expected_flows) monitor_sw = gevent.spawn(self.__poll_flows_switches, t_start, expected_flows) monitor_ds_confirm = gevent.spawn(self.__poll_flows_ds_confirm, expected_flows) gevent.joinall([monitor_ds, monitor_sw, monitor_ds_confirm]) gevent.killall([monitor_ds, monitor_sw, monitor_ds_confirm]) time_start = time.time() controller_time = self.__controller_time(t_start) discovered_flows = self.sbemu.get_flows() flow_measurement_latency_interval = time.time() - time_start logging.info('[NB_emulator] Flows measurement latency ' 'interval: {0} sec. | Discovered flows: {1}' .format(flow_measurement_latency_interval, discovered_flows)) results_thread = {} results = {} while not self.nbgen_queue.empty(): results_thread.update(self.nbgen_queue.get()) if flow_delete_flag is False: results = self.monitor_results_add(controller_time, results_thread, total_failed_flows) else: results = self.monitor_results_del(controller_time, results_thread, total_failed_flows) return results Function `monitor_results_add` has 27 lines of code (exceeds 25 allowed). Consider refactoring.
Function `monitor_results_add` has a Cognitive Complexity of 6 (exceeds 5 allowed). Consider refactoring. def monitor_results_add(self, add_controller_time, results_thread, total_failed_flows): """ This monitor function is used to create the result dictionary during \ an add flows action :param add_controller_time: time for all add REST requests to be sent \ and their response to be received :param results_thread: The dictionary from monitor_threads_run \ function including the contents from nbgen_queue :param total_failed_flows: The number of failed flows after an add or \ delete function :returns: Returns a dictionary, including all the results :rtype: dict :type add_controller_time: float :type results_thread: dict :type total_failed_flows: int """ results = self.system_results() results['global_sample_id'] = self.global_sample_id results['multinet_workers'] = len(self.sbemu.workers_ips) results['multinet_size'] = \ self.sbemu.topo_size * len(self.sbemu.workers_ips) results['multinet_worker_topo_size'] = self.sbemu.topo_size results['multinet_topology_type'] = self.sbemu.topo_type results['multinet_hosts_per_switch'] = \ self.sbemu.topo_hosts_per_switch results['multinet_group_size'] = self.sbemu.topo_group_size results['multinet_group_delay_ms'] = self.sbemu.topo_group_delay_ms results['controller_statistics_period_ms'] = \ self.controller.stat_period_ms results['controller_node_ip'] = self.controller.ip results['controller_port'] = str(self.controller.of_port) results['interpacket_delay_ms'] = self.sbemu.interpacket_delay_ms results['traffic_generation_duration_ms'] = \ self.sbemu.traffic_gen_duration_ms results['flow_operation_delay_ms'] = \ self.nbgen.flow_operations_delay_ms results['flow_workers'] = \ self.nbgen.flow_workers # Flow scalability tests metrics # ------------------------------------------------------------------ # Add controller time: Time for all ADD REST requests to be sent # and their response to be received results['add_controller_time'] = add_controller_time results['add_controller_rate'] = \ float(self.nbgen.total_flows) / add_controller_time # End-to-end-installation-time: results['end_to_end_installation_time'] = \ results_thread['end_to_end_flows_operation_time']Similar blocks of code found in 3 locations. Consider refactoring. if results_thread['end_to_end_flows_operation_time'] != -1: results['end_to_end_installation_rate'] = \ float(self.nbgen.total_flows) / \ results_thread['end_to_end_flows_operation_time'] else: results['end_to_end_installation_rate'] = -1 # Add switch time: Time from the FIRST REST request until ALL flows # are present in the network results['add_switch_time'] = results_thread['switch_operation_time']Similar blocks of code found in 3 locations. Consider refactoring. if results_thread['switch_operation_time'] != -1: results['add_switch_rate'] = \ float(self.nbgen.total_flows) / \ results_thread['switch_operation_time'] else: results['add_switch_rate'] = -1 results['add_confirm_time'] = results_thread['confirm_time']Similar blocks of code found in 3 locations. Consider refactoring. if results_thread['confirm_time'] != -1: results['add_confirm_rate'] = \ float(self.nbgen.total_flows) / results_thread['confirm_time'] else: results['add_confirm_rate'] = -1 results['total_flows'] = self.nbgen.total_flows results['total_failed_flows_operations_add'] = total_failed_flows return results def monitor_results_del(self, controller_time, results_thread, total_failed_flows): """ This monitor function is used to create the result dictionary during a \ delete flows action :param controller_time: time for all delete REST requests to be sent \ and their response to be received :param results_thread: The dictionary from monitor_threads_run \ function including the contents from nbgen_queue :param total_failed_flows: The number of failed flows after an add or \ delete function :returns: Returns a dictionary, including all the results :rtype: dict :type controller_time: float :type results_thread: dict :type total_failed_flows: int """ # Remove controller time: Time for all delete REST requests to be sent # and their response to be received results = self.system_results() results['remove_controller_time'] = controller_time results['remove_controller_rate'] = \ float(self.nbgen.total_flows) / controller_time # end_to_end_remove_time: The time period started after the last flow # was configured, until we receive confirmation all flows were removed. results['end_to_end_remove_time'] = \ results_thread['end_to_end_flows_operation_time'] results['end_to_end_remove_rate'] = \ float(self.nbgen.total_flows) / \ results_thread['end_to_end_flows_operation_time'] # Remove switch time: Time from the first delete REST request until all # flows are removed from the network. results['remove_switch_time'] = \ results_thread['switch_operation_time'] results['remove_switch_rate'] = \ float(self.nbgen.total_flows) / \ results_thread['switch_operation_time'] # Remove confirm time: Time period started after the last # flow was unconfigured until we receive confirmation all flows are # removed. results['remove_confirm_time'] = results_thread['confirm_time'] results['remove_confirm_rate'] = \ float(self.nbgen.total_flows) / results_thread['confirm_time'] results['total_failed_flows_operations_del'] = total_failed_flows results['flow_delete_flag'] = 'True' return results