test_junkie/cli/cli_audit.py
import collections
from test_junkie.builder import Builder
from test_junkie.constants import Undefined
class CliAudit:
    """Aggregate and print audit statistics for the suites in the execution roster.

    ``aggregate`` walks the roster returned by ``Builder.get_execution_roster()``
    and fills ``self.aggregated_data`` with per-feature, per-owner, per-suite,
    per-tag and per-component counters. ``print_results`` renders the bucket
    selected by ``args.command``.
    """

    # Names of the audit sections; each one maps to a "context_by_<section>" bucket.
    __SECTIONS = ["owners", "features", "suites", "components", "tags"]

    def __init__(self, suites, args):
        """Store the inputs and create empty aggregation buckets.

        :param suites: kept on the instance as ``self.suites``
        :param args: parsed CLI arguments; ``aggregate`` reads its filter
                     attributes and ``print_results`` reads ``args.command``
        """
        self.aggregated_data = {
            "absolute_test_count": 0,  # parameterized tests will be treated as 1 test
            "absolute_suite_count": 0,
            "context_by_features": {},
            "context_by_owners": {},
            "context_by_suites": {},
            "context_by_tags": {},
            "context_by_components": {},
            "parameterized_test_count": 0,
            "parameterized_suite_count": 0,
        }
        self.suites = suites
        self.exe_roster = Builder.get_execution_roster()
        self.args = args

    def aggregate(self):
        """Populate ``self.aggregated_data`` from the execution roster.

        Suites and tests that do not satisfy the filters in ``self.args`` are
        skipped. Tests without an owner inherit the owner of their suite
        (their kwargs are updated in place) before the filters are applied.
        """

        def is_relevant(_suite=None, _test=None):
            """Return True if the suite (or the test, when given) passes the CLI filters.

            :raises Exception: when neither a suite nor a test is passed in
            """
            if not _suite and not _test:
                raise Exception("Must pass in either a SuiteObject or a TestObject!")
            args = self.args

            if not _test:
                # Suite level checks.
                from test_junkie.rules import Rules
                from test_junkie.listener import Listener
                # Exact class comparison on purpose: any subclass counts as "has rules/listener".
                if args.no_rules and _suite.get_rules().__class__ != Rules:
                    return False
                if args.no_listeners and _suite.get_listener().__class__ != Listener:
                    return False
                if args.no_suite_retries and _suite.get_retry_limit() > 1:
                    return False
                if args.no_suite_meta and _suite.get_meta():
                    return False
                if args.no_owners and _suite.get_owner():
                    return False
                if args.no_features and _suite.get_feature():
                    return False
                if args.features != Undefined and _suite.get_feature() not in args.features:
                    return False
                return True

            # Test level checks.
            if args.no_test_retries and _test.get_retry_limit() > 1:
                return False
            # The meta of the *test* is what matters here; in this branch ``_suite``
            # is the roster key, not the object the flag is about.
            # NOTE(review): assumes the test object exposes get_meta() callable
            # without arguments - confirm against TestObject.
            if args.no_test_meta and _test.get_meta():
                return False
            if args.no_owners and _test.get_owner():
                return False
            if args.no_components and _test.get_component():
                return False
            if args.no_tags and _test.get_tags():
                return False
            # Every supplied filter has to match. A failed filter rejects the
            # test, a passed filter falls through to the next one.
            if args.owners != Undefined and _test.get_owner() not in args.owners:
                return False
            if args.components != Undefined and _test.get_component() not in args.components:
                return False
            if args.tags != Undefined:
                test_tags = _test.get_tags()
                # One matching tag is enough.
                if not any(tag in test_tags for tag in args.tags):
                    return False
            return True

        for suite, suite_object in self.exe_roster.items():
            if not is_relevant(_suite=suite_object):
                continue
            tests = suite_object.get_test_objects()
            self.aggregated_data["absolute_suite_count"] += 1
            if suite_object.get_parameters() != [None]:
                self.aggregated_data["parameterized_suite_count"] += 1

            feature = suite_object.get_feature()
            features = self.aggregated_data["context_by_features"]
            if feature not in features:
                features[feature] = {"tags": {}, "owners": {}, "components": {}, "suites": {}, "total_tests": 0}
            feature_context = features[feature]
            feature_context["total_tests"] += len(tests)
            feature_context["suites"].update({"{}.{}".format(suite_object.get_class_module(),
                                                             suite_object.get_class_name()): len(tests)})
            suite_context = {"total_tests": len(tests),
                             "tags": {},
                             "owners": {},
                             "components": {},
                             "feature": feature}
            # Counted per suite, before the test level filters run, so this total
            # also includes tests that are skipped below.
            self.aggregated_data["absolute_test_count"] += len(tests)

            for test in tests:
                if test.get_owner() is None:
                    # Tests without an explicit owner inherit the owner of the suite.
                    test.get_kwargs().update({"owner": suite_object.get_owner()})
                if not is_relevant(_suite=suite, _test=test):
                    continue
                if test.accepts_suite_parameters() or test.accepts_test_parameters():
                    self.aggregated_data["parameterized_test_count"] += 1
                for context in (feature_context, suite_context):
                    CliAudit.process_tags(context, test)
                    CliAudit.process_property(context, "components", test.get_component())
                    CliAudit.process_property(context, "owners", test.get_owner())
                self.update_context(suite, test, feature)
            self.aggregated_data["context_by_suites"].update({suite_object.get_class_name(): suite_context})

    @staticmethod
    def process_property(data_context, prop, key):
        """Increment the counter stored at ``data_context[prop][key]``, starting from 1.

        :param data_context: dict that already contains a dict under ``prop``
        :param prop: name of the counter group, e.g. "owners"
        :param key: value being counted; any hashable, ``None`` included
        """
        counters = data_context[prop]
        counters[key] = counters.get(key, 0) + 1

    @staticmethod
    def process_tags(data_context, test):
        """Increment ``data_context["tags"]`` once for every tag returned by ``test.get_tags()``.

        :param data_context: dict that already contains a dict under "tags"
        :param test: object exposing ``get_tags()``
        """
        for tag in test.get_tags():
            CliAudit.process_property(data_context, "tags", tag)

    def update_context(self, suite, test, feature):
        """Record ``test`` in the tag, component and owner buckets of ``self.aggregated_data``.

        :param suite: object whose ``__module__`` and ``__name__`` identify the suite
        :param test: object exposing ``get_tags()``, ``get_component()`` and ``get_owner()``
        :param feature: feature value to count the test under
        """

        def process_data_context(_data_context, _context):
            # Count the test under every section except the one that owns this
            # entry; the template created below has no slot for its own section.
            if _context != "tags":
                CliAudit.process_tags(_data_context, test)
            if _context != "components":
                CliAudit.process_property(_data_context, "components", test.get_component())
            if _context != "owners":
                CliAudit.process_property(_data_context, "owners", test.get_owner())
            CliAudit.process_property(_data_context, "features", feature)
            CliAudit.process_property(_data_context, "suites", "{}.{}".format(suite.__module__, suite.__name__))

        def get_template(_context):
            # Fresh entry: a test counter plus an empty dict per *other* section.
            template = {"total_tests": 0}
            for attribute in CliAudit.__SECTIONS:
                if attribute != _context:
                    template.update({attribute: {}})
            return template

        def update_context_template(_value, _context):
            # Uses the ``_context`` parameter rather than a variable captured
            # from the loop below, so the helper does not depend on that loop.
            bucket = self.aggregated_data["context_by_{context}".format(context=_context)]
            if _value not in bucket:
                bucket.update({_value: get_template(_context)})
            data_context = bucket[_value]
            data_context["total_tests"] += 1
            process_data_context(data_context, _context)

        for context in ["tags", "components", "owners"]:
            if context == "tags":
                for tag in test.get_tags():
                    update_context_template(tag, context)
            else:
                value = test.get_component() if context == "components" else test.get_owner()
                update_context_template(value, context)

    def print_results(self):
        """Print the aggregated data for the section named by ``self.args.command``.

        Entries are listed by descending ``total_tests``. When the command is
        not one of the known sections, or nothing was aggregated for it, an
        informational "nothing matches" message is printed instead.
        """
        from test_junkie.cli.cli import CliUtils

        match_found = False
        output = []
        data_context = self.args.command
        # Section names are unique, so at most one section can match the command.
        if data_context in CliAudit.__SECTIONS:
            data = self.aggregated_data["context_by_{context}".format(context=data_context)]
            if data:
                # Imported once, and only when there is something to print.
                from test_junkie.metrics import Aggregator

                section = []
                # Sort ascending, then reverse: this keeps the original tie order.
                _sorted_data = sorted(data.items(), key=lambda x: x[1]["total_tests"])
                _sorted_data.reverse()
                for primary_key, context in _sorted_data:
                    details = []
                    if data_context == "suites":
                        details.append("\nSuite: {value} Feature: {feature}"
                                       .format(value=CliUtils.format_bold_string(primary_key),
                                               feature=CliUtils.format_bold_string(context["feature"])))
                    else:
                        parent = data_context[:-1].capitalize()  # exp: features > Feature
                        if primary_key is None:
                            primary_key = CliUtils.format_color_string(primary_key, "red")
                        else:
                            primary_key = CliUtils.format_bold_string(primary_key)
                        details.append("\n{parent}: {value}".format(parent=parent, value=primary_key))
                    details.append("\t- Tests:\t{total} of {absolute} total tests ({percentage}%)"
                                   .format(total=context["total_tests"],
                                           absolute=self.aggregated_data["absolute_test_count"],
                                           percentage=Aggregator.percentage(
                                               self.aggregated_data["absolute_test_count"],
                                               context["total_tests"])))
                    for i in CliAudit.__SECTIONS:
                        if i not in context:
                            continue
                        _sorted_context = sorted(context[i].items(), key=lambda x: x[1])
                        _sorted_context.reverse()
                        entries = []
                        for key, count in _sorted_context:
                            if key is None:
                                key = CliUtils.format_color_string(key, "red")
                            entries.append("{} ({})".format(key, count))
                        # The leading tab is always present, so the line is emitted
                        # even when the section has no entries.
                        msg = "\t" + "\n\t\t\t".join(entries)
                        details.append("\t- {i}: {msg}".format(i=i.capitalize(), msg=msg))
                    if len(details) >= 5:
                        section += details
                if len(section) > 1:
                    output.append(section)

        for section in output:
            if len(section) >= 5:
                match_found = True
                for msg in section:
                    print(msg)
        if not match_found:
            print("[{status}] Nothing matches your search criteria!"
                  .format(status=CliUtils.format_color_string("INFO", "blue")))