cuebook/CueObserve

View on GitHub
api/ops/tasks/detection/core/anomalyDetection.py

Summary

Maintainability
A
2 hrs
Test Coverage
C
72%
import json
import traceback
import datetime as dt
import dateutil.parser as dp
 
import pandas as pd
 
from .detectionTypes.prophet import prophetDetect
from .detectionTypes.percentageChange import percentChangeDetect
from .detectionTypes.lifetime import lifetimeDetect
from .detectionTypes.valueThreshold import valueThresholdDetect
 
def dataFrameEmpty(df):
    """
    Report whether a dataframe holds too little data to run detection on.

    Returns True when df is None, when pandas reports it as empty, or when
    it has fewer than 20 rows; returns False otherwise.
    """
    minimumRows = 20
    hasData = df is not None and not df.empty
    return not (hasData and df.shape[0] >= minimumRows)
 
 
Function `detect` has 5 arguments (exceeds 4 allowed). Consider refactoring.
def detect(df, granularity, detectionRuleType, detectionParams, limit=None):
    """
    Route a dataframe to the detector that matches the given rule type.

    The rule type is compared case-sensitively against the known names:

    - "Prophet": calls prophetDetect with df, granularity and limit.
    - "Percentage Change": calls percentChangeDetect with
      detectionParams["threshold"].
    - "Lifetime High/Low": calls lifetimeDetect.
    - "Value Threshold": calls valueThresholdDetect with
      detectionParams["operator"], ["value1"] and ["value2"].

    Returns whatever the selected detector returns, or None when the rule
    type matches none of the names above. detectionParams is only read by
    the branches that need it; limit is only used by the "Prophet" branch.
    """
    ruleType = detectionRuleType

    if ruleType == "Prophet":
        return prophetDetect(df, granularity, limit)

    if ruleType == "Percentage Change":
        threshold = detectionParams["threshold"]
        return percentChangeDetect(df, granularity, threshold)

    if ruleType == "Lifetime High/Low":
        return lifetimeDetect(df, granularity)

    if ruleType == "Value Threshold":
        comparison = detectionParams["operator"]
        firstValue = detectionParams["value1"]
        secondValue = detectionParams["value2"]
        return valueThresholdDetect(
            df, granularity, comparison, firstValue, secondValue
        )

    # Unrecognised rule types fall through without running any detector.
    return None
 
 
Function `anomalyService` has a Cognitive Complexity of 8 (exceeds 5 allowed). Consider refactoring.
Function `anomalyService` has 5 arguments (exceeds 4 allowed). Consider refactoring.
def anomalyService(dimValObj, dfDict, anomalyDefProps, detectionRuleType, detectionParams):
    """
    Run anomaly detection for one dimension value and package the outcome.

    A dataframe is built from dfDict and handed to detect() with the
    granularity taken from anomalyDefProps["granularity"].

    The returned dict always carries "dimVal", "anomalyId" and "success".

    - On success it also has "data" (the detector result, with
      "contribution" set from dimValObj["contriPercent"]) and "published".
    - When the dataframe is judged insufficient, or when any exception is
      raised during detection, "success" is False and "error" holds a JSON
      string with a "message" (plus a "stackTrace" for caught exceptions).

    "published" is True only when there is a latest anomaly whose
    "anomalyTimeISO" lies within the recency window (5 days for "day"
    granularity, 1 day otherwise) and, if anomalyDefProps["highOrLow"] is
    set, its lower-cased value equals the latest anomaly's "highOrLow".
    """
    frame = pd.DataFrame(dfDict)
    anomalyId = dimValObj["anomalyId"]
    dimVal = dimValObj["dimVal"]
    contribution = dimValObj["contriPercent"]
    output = {"dimVal": dimVal, "anomalyId": anomalyId}
    granularity = anomalyDefProps["granularity"]

    try:
        if dataFrameEmpty(frame):
            output["error"] = json.dumps({"message": "Insufficient data in dataframe."})
            output["success"] = False
            return output

        result = detect(frame, granularity, detectionRuleType, detectionParams)
        result["contribution"] = contribution

        publish = False
        latest = result["anomalyLatest"]
        if latest:
            latest["contribution"] = contribution

            # Recency window in seconds: five days for daily data, one day otherwise.
            secondsPerDay = 3600 * 24
            if granularity == "day":
                maxAgeSeconds = secondsPerDay * 5
            else:
                maxAgeSeconds = secondsPerDay

            nowTs = dt.datetime.now().timestamp()
            anomalyTs = dp.parse(latest["anomalyTimeISO"]).timestamp()
            publish = nowTs - anomalyTs <= maxAgeSeconds

            # An optional direction filter can only veto publishing, never enable it.
            wantedDirection = anomalyDefProps["highOrLow"]
            if wantedDirection:
                publish = publish and wantedDirection.lower() == latest["highOrLow"]

        output["data"] = result
        output["published"] = publish
        output["success"] = True
    except Exception as ex:
        # Any failure is reported in the output instead of propagating.
        failure = {"message": str(ex), "stackTrace": traceback.format_exc()}
        output["error"] = json.dumps(failure)
        output["success"] = False

    return output