api/ops/tasks/rootCauseAnalysis.py
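"""
Celery tasks for root cause analysis (RCA) of a detected anomaly. The
top-level job fans analysis out per dimension, and each dimension in turn
fans out a celery group of subtasks, one per dimension value.
"""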
import json
import time
import logging
import traceback
import datetime as dt

import pandas as pd
import html2text
from django.template import Template, Context
from celery import shared_task, group
from celery.result import allow_join_result

from app.celery import app
from anomaly.models import Anomaly, RootCauseAnalysis, RCAAnomaly
from anomaly.serializers import AnomalySerializer
from access.data import Data
from access.utils import prepareAnomalyDataframes
from anomaly.services.telemetry import rca_event_log

# ANOMALY_DAILY_TEMPLATE = "Anomaly Daily Template"
# ANOMALY_HOURLY_TEMPLATE = "Anomaly Hourly Template"

logger = logging.getLogger(__name__)


@shared_task
def _anomalyDetectionForValue(
    anomalyId: int,
    dimension: str,
    dimensionVal: str,
    contriPercent: float,
    dfDict: list,
):
    """
    Internal anomaly detection subtask, grouped by celery for each dimension value
    :param anomalyId: id of anomaly object under analysis
    :param dimension: dimension for which anomaly is being detected
    :param dimensionVal: dimension value being analyzed
    :param contriPercent: percentage contribution of the dimension value to the metric
    :param dfDict: data for the dimension value, as a list of records
    """
    from anomaly.services import RootCauseAnalyses

    try:
        anomalyServiceResult = RootCauseAnalyses.createRCAAnomaly(
            anomalyId, dimension, dimensionVal, contriPercent, pd.DataFrame(dfDict)
        )
        return anomalyServiceResult
    except Exception as ex:
        # Log instead of failing silently; the parent task treats False as failure
        logger.error(
            "RCA subtask failed for anomaly %s, %s=%s: %s",
            anomalyId,
            dimension,
            dimensionVal,
            str(ex),
        )
        return False


def _parallelizeAnomalyDetection(anomalyId: int, dimension: str, dimValsData: list):
    """
    Runs anomaly detection for each dimension value in parallel on celery
    :param anomalyId: id of anomaly object under analysis
    :param dimension: dimension for which anomaly is being detected
    :param dimValsData: data for each dimension value
    """
    detectionJobs = group(
        _anomalyDetectionForValue.s(
            anomalyId,
            dimension,
            obj["dimVal"],
            obj["contriPercent"],
            obj["df"].to_dict("records"),
        )
        for obj in dimValsData
    )
    rootCauseAnalysis = RootCauseAnalysis.objects.get(anomaly_id=anomalyId)
    _detectionJobs = detectionJobs.apply_async()

    # Track child task ids so a running analysis can be monitored or revoked
    taskIds = [task.id for task in _detectionJobs.children]
    rootCauseAnalysis.taskIds = [*rootCauseAnalysis.taskIds, *taskIds]
    rootCauseAnalysis.save()

    # Waiting on subtask results from within a task requires allow_join_result
    with allow_join_result():
        results = _detectionJobs.get()
    return results


@shared_task
def _anomalyDetectionForDimension(anomalyId: int, dimension: str, data: list):
    """
    Initiates anomaly detection for a given dimension of an anomaly
    :param anomalyId: id of anomaly object under analysis
    :param dimension: dimension for which anomaly is being detected
    :param data: data for the dimension
    """
    DEFAULT_OPERATION = "Min % Contribution"
    DEFAULT_OPERATION_VALUE = 1

    anomaly = Anomaly.objects.get(id=anomalyId)
    try:
        df = pd.DataFrame(data=data)

        # Fall back to the defaults when the definition specifies no operation;
        # only parse the value when an operation is actually set
        operation = anomaly.anomalyDefinition.operation
        if operation:
            operationValue = int(anomaly.anomalyDefinition.value)
        else:
            operation = DEFAULT_OPERATION
            operationValue = DEFAULT_OPERATION_VALUE

        dimValsData = prepareAnomalyDataframes(
            df,
            anomaly.anomalyDefinition.dataset.timestampColumn,
            anomaly.anomalyDefinition.metric,
            dimension,
            operation,
            operationValue,
        )
        anomaly.rootcauseanalysis.logs = {
            **anomaly.rootcauseanalysis.logs,
            dimension: "Analyzing..",
        }
        anomaly.rootcauseanalysis.save()

        results = _parallelizeAnomalyDetection(anomalyId, dimension, dimValsData)

        anomaly = Anomaly.objects.get(id=anomalyId)
        anomaly.rootcauseanalysis.logs = {
            **anomaly.rootcauseanalysis.logs,
            dimension: "Analyzed",
        }
        anomaly.rootcauseanalysis.save()

        # Subtasks return False on failure, otherwise a dict with a "success" flag
        if not all(isinstance(x, dict) and x.get("success") for x in results):
            return False
    except Exception as ex:
        anomaly.rootcauseanalysis.logs = {
            **anomaly.rootcauseanalysis.logs,
            dimension: "Analysis Failed",
            dimension + " Error Stack Trace": traceback.format_exc(),
            dimension + " Error Message": str(ex),
        }
        anomaly.rootcauseanalysis.save()
        return False
    return True


@shared_task
def rootCauseAnalysisJob(anomalyId: int):
    """
    Performs root cause analysis for a given anomaly
    :param anomalyId: id of anomaly object under analysis
    """
    from anomaly.services.alerts import SlackAlert

    anomaly = Anomaly.objects.get(id=anomalyId)
    rootCauseAnalysis, _ = RootCauseAnalysis.objects.get_or_create(anomaly=anomaly)
    rootCauseAnalysis.startTimestamp = dt.datetime.now()
    rootCauseAnalysis.endTimestamp = None
    rootCauseAnalysis.logs = {}
    rootCauseAnalysis.status = RootCauseAnalysis.STATUS_RUNNING
    rootCauseAnalysis.taskIds = [
        *rootCauseAnalysis.taskIds,
        rootCauseAnalysisJob.request.id,
    ]
    rootCauseAnalysis.save()

    logger.info("Removing already existing RCA data")
    anomaly.rcaanomaly_set.all().delete()

    # TODO: pre-fetch related data
    try:
        rootCauseAnalysis = RootCauseAnalysis.objects.get(anomaly=anomaly)
        rootCauseAnalysis.logs = {"Data": "Fetching..."}
        rootCauseAnalysis.save()

        datasetDf = Data.fetchDatasetDataframe(anomaly.anomalyDefinition.dataset)
        dimension = anomaly.anomalyDefinition.dimension
        dimensions = json.loads(anomaly.anomalyDefinition.dataset.dimensions)
        otherDimensions = [x for x in dimensions if x != dimension]

        # Restrict the data to the anomaly's own dimension value, if it has one
        if dimension:
            filteredDf = datasetDf[datasetDf[dimension] == anomaly.dimensionVal]
        else:
            filteredDf = datasetDf

        timestampColumn = anomaly.anomalyDefinition.dataset.timestampColumn
        metric = anomaly.anomalyDefinition.metric

        rootCauseAnalysis = RootCauseAnalysis.objects.get(anomaly=anomaly)
        rootCauseAnalysis.logs = {"Analyzing Dimensions": ", ".join(otherDimensions)}
        rootCauseAnalysis.save()

        # Dimensions are analyzed sequentially here; each one fans out its own
        # celery group over dimension values via _parallelizeAnomalyDetection
        results = [
            _anomalyDetectionForDimension(
                anomalyId,
                dim,
                filteredDf[[timestampColumn, metric, dim]].to_dict("records"),
            )
            for dim in otherDimensions
        ]

        rootCauseAnalysis = RootCauseAnalysis.objects.get(anomaly=anomaly)
        if all(results):
            rootCauseAnalysis.status = RootCauseAnalysis.STATUS_SUCCESS
        else:
            rootCauseAnalysis.status = RootCauseAnalysis.STATUS_ERROR
    except Exception as ex:
        rootCauseAnalysis = RootCauseAnalysis.objects.get(anomaly=anomaly)
        rootCauseAnalysis.logs = {
            **rootCauseAnalysis.logs,
            "Error Stack Trace": traceback.format_exc(),
            "Error Message": str(ex),
        }
        rootCauseAnalysis.status = RootCauseAnalysis.STATUS_ERROR

    rootCauseAnalysis.endTimestamp = dt.datetime.now()
    rootCauseAnalysis.save()
    rca_event_log(rootCauseAnalysis.status)
    return True