# api/anomaly/services/rootCauseAnalyses.py
import json
import logging
import traceback
import datetime as dt

import dateutil.parser as dp

from utils.apiResponse import ApiResponse
from ops.tasks import rootCauseAnalysisJob
from app.celery import app
from anomaly.models import RootCauseAnalysis, RCAAnomaly, Anomaly
from anomaly.serializers import RootCauseAnalysisSerializer, RCAAnomalySerializer
from ops.tasks.detection.core.anomalyDetection import detect, dataFrameEmpty

logger = logging.getLogger(__name__)


class RootCauseAnalyses:
    """
    Class to deal with functionalities associated with RCA
    """

    @staticmethod
    def calculateRCA(anomalyId: int):
        """
        Trigger job for RCA calculation
        :param anomalyId: id of anomaly object needed to be analyzed
        :return: ApiResponse updated with a success message once the job is queued
        """
        res = ApiResponse("Error in triggering RCA calculation")
        rootCauseAnalysis, _ = RootCauseAnalysis.objects.get_or_create(
            anomaly_id=anomalyId
        )
        rootCauseAnalysis.status = RootCauseAnalysis.STATUS_RECEIVED
        rootCauseAnalysis.save()

        task = rootCauseAnalysisJob.delay(anomalyId)

        # Re-read the row after queuing and persist only taskIds.
        # NOTE(review): the queued job presumably updates this same row
        # (status, logs, ...); a full save() here could write stale values
        # back over whatever it has stored in the meantime.
        rootCauseAnalysis = RootCauseAnalysis.objects.get(anomaly_id=anomalyId)
        rootCauseAnalysis.taskIds = [*rootCauseAnalysis.taskIds, task.id]
        rootCauseAnalysis.save(update_fields=["taskIds"])

        res.update(True, "Successfully triggered RCA calculation")
        return res

    @staticmethod
    def getRCA(anomalyId: int):
        """
        Get data for RCA
        :param anomalyId: id of anomaly object whose RCA to be fetched
        :return: ApiResponse whose data holds the RCA status fields, the
            anomaly's own details and the serialized RCA anomalies
        """
        res = ApiResponse("Error in getting RCA")
        anomaly = Anomaly.objects.get(id=anomalyId)
        rcaAnomalies = RCAAnomaly.objects.filter(anomaly_id=anomalyId).order_by(
            "-data__anomalyLatest__value"
        )
        rcaAnomaliesData = RCAAnomalySerializer(rcaAnomalies, many=True).data

        # Defaults used when no RootCauseAnalysis row is attached to the anomaly.
        data = {
            "status": None,
            "logs": None,
            "startTimestamp": None,
            "endTimestamp": None,
        }
        if hasattr(anomaly, "rootcauseanalysis"):
            data = {
                **data,
                **RootCauseAnalysisSerializer(anomaly.rootcauseanalysis).data,
            }

        anomalyDefinition = anomaly.anomalyDefinition
        anomalyLatest = anomaly.data["anomalyLatest"]
        data = {
            **data,
            "granularity": anomalyDefinition.dataset.granularity,
            "measure": anomalyDefinition.metric,
            "dimension": anomalyDefinition.dimension,
            "dimensionValue": anomaly.dimensionVal,
            "value": anomalyLatest["value"],
            "anomalyContribution": anomaly.data["contribution"],
            "rcaAnomalies": rcaAnomaliesData,
            "anomalyTime": anomalyLatest["anomalyTimeISO"],
        }
        res.update(True, "Successfully retrieved RCA", data)
        return res

    @staticmethod
    def abortRCA(anomalyId: int):
        """
        Abort RCA
        :param anomalyId: id of anomaly object whose RCA needs to be aborted
        :return: ApiResponse; left at its initial error message when
            anything in the abort sequence raises
        """
        res = ApiResponse("Error in aborting RCA")
        try:
            rootCauseAnalysis = RootCauseAnalysis.objects.get(anomaly_id=anomalyId)
            app.control.revoke(rootCauseAnalysis.taskIds, terminate=True)
            rootCauseAnalysis.status = RootCauseAnalysis.STATUS_ABORTED
            rootCauseAnalysis.endTimestamp = dt.datetime.now()
            rootCauseAnalysis.save()
            res.update(True, "Successfully aborted RCA")
        except Exception as ex:
            # Best-effort abort: the failure is logged (with traceback) and
            # reported through the unchanged error response.
            logger.exception("Error in aborting RCA:%s", str(ex))
        return res

    @staticmethod
    def _isolateTargetAnomaly(result: dict, anomalyTimeISO) -> None:
        """
        Relabel rows of result in place so only the analyzed point stays flagged,
        then trim the tail of the band series
        :param result: detection output holding anomalyData.actual and anomalyData.band
        :param anomalyTimeISO: timestamp of the anomaly being analyzed
        """
        # NOTE(review): 15, 6 and 1 are marker codes whose meaning is defined
        # outside this file; the values are kept exactly as they were.
        for row in result["anomalyData"]["actual"]:
            isTarget = row["ds"] == anomalyTimeISO and row["anomaly"] == 15
            row["anomaly"] = 6 if isTarget else 1
        # removing prediction band
        result["anomalyData"]["band"] = result["anomalyData"]["band"][:-15]

    @staticmethod
    def createRCAAnomaly(
        anomalyId: int, dimension: str, dimensionValue: str, contriPercent: float, df
    ):
        """
        Create RCA Anomaly for given anomalyId, dimension, dimensionValue
        :param anomalyId: id of anomaly object being analyzed
        :param dimension: dimension for which anomaly is being analyzed
        :param dimensionValue: dimension value for which anomaly is being analyzed
        :param contriPercent: percent contribution of given dimension: dimensionValue is whole
        :param df: data for anomaly detection
        :return: dict with "dimVal" and "success"; "rcaAnomalyId" is added when
            an RCAAnomaly was stored, "error" (JSON string with message and
            stack trace) when detection or storage raised
        """
        anomaly = Anomaly.objects.get(id=anomalyId)
        output = {"dimVal": dimensionValue, "success": True}
        try:
            if dataFrameEmpty(df):
                return output

            granularity = anomaly.anomalyDefinition.dataset.granularity
            result = detect(
                df, granularity, "Prophet", anomaly.anomalyDefinition, limit=6
            )
            del result["anomalyData"]["predicted"]

            # removing anomalous point other than last one
            anomalyTimeISO = anomaly.data["anomalyLatest"]["anomalyTimeISO"]
            if (
                "anomalyLatest" not in result
                or not result["anomalyLatest"]
                or result["anomalyLatest"]["anomalyTimeISO"] != anomalyTimeISO
            ):
                # Nothing anomalous at the analyzed timestamp for this
                # dimension value: report success without storing anything.
                return output

            RootCauseAnalyses._isolateTargetAnomaly(result, anomalyTimeISO)

            result["contribution"] = contriPercent
            result["anomalyLatest"]["contribution"] = contriPercent

            rcaAnomaly, _ = RCAAnomaly.objects.get_or_create(
                anomaly_id=anomalyId, dimension=dimension, dimensionValue=dimensionValue
            )
            rcaAnomaly.data = result
            rcaAnomaly.save()
            output["rcaAnomalyId"] = rcaAnomaly.id
        except Exception as ex:
            # Failures are returned to the caller inside the output dict
            # rather than raised.
            output["error"] = json.dumps(
                {"message": str(ex), "stackTrace": traceback.format_exc()}
            )
            output["success"] = False
        return output