simple_history/management/commands/clean_duplicate_history.py
from django.db import transaction
from django.utils import timezone

from ... import utils
from . import populate_history


class Command(populate_history.Command):
    """Management command that removes duplicate sequential historical records.

    Walks each requested model's history table and deletes entries whose
    tracked fields are identical to the immediately preceding entry
    (as determined by ``diff_against``), optionally restricted to a recent
    time window and optionally as a dry run.
    """

    args = "<app.model app.model ...>"
    help = (
        "Scans HistoricalRecords for identical sequential entries "
        "(duplicates) in a model and deletes them."
    )

    # Summary line printed after each model is cleaned.
    DONE_CLEANING_FOR_MODEL = "Removed {count} historical records for {model}\n"

    def add_arguments(self, parser):
        """Register CLI options on the argparse parser Django provides."""
        parser.add_argument("models", nargs="*", type=str)
        parser.add_argument(
            "--auto",
            action="store_true",
            dest="auto",
            default=False,
            help="Automatically search for models with the HistoricalRecords field "
            "type",
        )
        parser.add_argument(
            "-d", "--dry", action="store_true", help="Dry (test) run only, no changes"
        )
        parser.add_argument(
            "-m", "--minutes", type=int, help="Only search the last MINUTES of history"
        )
        parser.add_argument(
            "--excluded_fields",
            nargs="+",
            help="List of fields to be excluded from the diff_against check",
        )
        parser.add_argument(
            "--base-manager",
            action="store_true",
            default=False,
            help="Use Django's base manager to handle all records stored in the"
            " database, including those that would otherwise be filtered or modified"
            " by a custom manager.",
        )

    def handle(self, *args, **options):
        """Entry point: resolve the target models, then clean each of them."""
        self.verbosity = options["verbosity"]
        self.excluded_fields = options.get("excluded_fields")
        self.base_manager = options.get("base_manager")

        to_process = set()
        # Positional "app.model" strings may arrive via options or *args
        # (older Django call styles); either source is accepted.
        model_strings = options.get("models", []) or args

        if model_strings:
            for model_pair in self._handle_model_list(*model_strings):
                to_process.add(model_pair)
        elif options["auto"]:
            to_process = self._auto_models()
        else:
            self.log(self.COMMAND_HINT)

        self._process(to_process, date_back=options["minutes"], dry_run=options["dry"])

    def _process(self, to_process, date_back=None, dry_run=True):
        """Clean every (model, history_model) pair in *to_process*.

        :param to_process: iterable of ``(model, history_model)`` pairs.
        :param date_back: if given, only consider history newer than this
            many minutes.
        :param dry_run: when True, report what would be deleted but delete
            nothing.
        """
        if date_back:
            stop_date = timezone.now() - timezone.timedelta(minutes=date_back)
        else:
            stop_date = None

        for model, history_model in to_process:
            m_qs = history_model.objects
            if stop_date:
                m_qs = m_qs.filter(history_date__gte=stop_date)
            if self.verbosity >= 2:
                found = m_qs.count()
                self.log(f"{model} has {found} historical entries", 2)
            if not m_qs.exists():
                continue

            # Query construction is factored out so this loop stays simple
            # (the old inline version tripped complexity linters).
            model_query = self._model_query(model, m_qs, stop_date)
            for o in model_query.iterator():
                self._process_instance(o, model, stop_date=stop_date, dry_run=dry_run)

    def _model_query(self, model, m_qs, stop_date):
        """Build the queryset of live instances whose history will be scanned.

        Uses the base manager when --base-manager was passed (so records a
        custom default manager would hide are still cleaned), and narrows to
        instances that actually have history in the window when a stop date
        is in effect.
        """
        if self.base_manager:
            model_query = model._base_manager.all()
        else:
            model_query = model._default_manager.all()

        if stop_date:
            # Take the up-front hit of resolving which pks have recent
            # history, so the per-instance loop only visits relevant rows.
            model_query = model_query.filter(
                pk__in=(m_qs.values_list(model._meta.pk.name).distinct())
            )
        return model_query

    def _process_instance(self, instance, model, stop_date=None, dry_run=True):
        """Delete duplicate sequential history rows for a single instance."""
        entries_deleted = 0
        history = utils.get_history_manager_for_model(instance)
        o_qs = history.all()
        if stop_date:
            # Keep one record from just before the window so the first
            # in-window record can still be compared against its predecessor.
            extra_one = o_qs.filter(history_date__lte=stop_date).first()
            o_qs = o_qs.filter(history_date__gte=stop_date)
        else:
            extra_one = None

        with transaction.atomic():
            # Ordering is ('-history_date', '-history_id'), so walking the
            # queryset pairwise compares each record with the one before it.
            f1 = o_qs.first()
            if not f1:
                return

            for f2 in o_qs[1:]:
                entries_deleted += self._check_and_delete(f1, f2, dry_run)
                f1 = f2
            if extra_one:
                entries_deleted += self._check_and_delete(f1, extra_one, dry_run)

        self.log(
            self.DONE_CLEANING_FOR_MODEL.format(model=model, count=entries_deleted)
        )

    def log(self, message, verbosity_level=1):
        """Write *message* to stdout if verbosity is at least *verbosity_level*."""
        if self.verbosity >= verbosity_level:
            self.stdout.write(message)

    def _check_and_delete(self, entry1, entry2, dry_run=True):
        """Delete *entry1* if it differs in no tracked field from *entry2*.

        Returns 1 when the record is a duplicate (deleted unless dry_run),
        otherwise 0 — callers accumulate this into a deletion count.
        """
        delta = entry1.diff_against(entry2, excluded_fields=self.excluded_fields)
        if not delta.changed_fields:
            if not dry_run:
                entry1.delete()
            return 1
        return 0