fedspendingtransparency/usaspending-api

View on GitHub
usaspending_api/awards/models/transaction_delta.py

Summary

Maintainability
A
0 mins
Test Coverage
A
94%
"""
This model was originally created to satisfy DEV-2417:

    As a data re-purposer, I need monthly delta files to also include transactions which
    were added/modified by appropriate data change operation scripts so that the delta
    files keep me in sync with USAspending.gov data.

TransactionDelta (transaction_delta) is used to track transaction records that have been
created/updated outside of the nightly pipeline.  These are most likely to be one-time
data updates/corrections which would otherwise not be captured by the monthly delta process.

To use this table, simply record the transaction_normalized.id of the offending transaction
and the current timestamp (created_at).  If said transaction already exists, update it with
a fresher timestamp.

The monthly delta process will handle the rest.
"""
from datetime import datetime, timezone
from django.db import models, transaction
from django.db.utils import IntegrityError
from usaspending_api.awards.models.transaction_normalized import TransactionNormalized


# Maximum number of transaction ids processed per chunk when upserting, to keep
# individual queries from getting too large.
CHUNK_SIZE = 5000


class TransactionDeltaManager(models.Manager):
    """
    Manager providing the bulk maintenance operations used on transaction_delta rows.
    """

    def delete_by_created_at(self, max_created_at):
        """
        Delete every row whose created_at is at or before max_created_at.

        Returns the total number of objects deleted, as reported by QuerySet.delete().
        """
        delete_count, _ = self.get_queryset().filter(created_at__lte=max_created_at).delete()
        return delete_count

    def get_max_created_at(self):
        """
        Return the largest created_at value in the table (None if there are no rows).
        """
        return self.get_queryset().aggregate(models.Max("created_at"))["created_at__max"]

    def update_or_create_transaction(self, transaction_id):
        """
        Update or create the specified transaction id.

        A falsy transaction_id (None, 0, ...) is silently ignored.
        """
        if transaction_id:
            self.update_or_create_transactions([transaction_id])

    @staticmethod
    def update_or_create_transactions(transaction_ids):
        """
        Update or create the specified transaction ids.

        Every id ends up with exactly one row stamped with the same, current UTC
        timestamp.  Nothing happens if transaction_ids is empty/falsy.

        Raises IntegrityError, before anything is written, if any of the ids does
        not exist in transaction_normalized.
        """
        if not transaction_ids:
            return

        # Duplicates will cause us problems so let's eliminate them.
        unique_ids = list(set(transaction_ids))
        id_chunks = [unique_ids[start : start + CHUNK_SIZE] for start in range(0, len(unique_ids), CHUNK_SIZE)]

        # The relation is declared with db_constraint=False, so the database will not
        # reject unknown ids for us; verify them here.  The check is chunked just like
        # the upserts so a large id list never lands in a single query.  Because the
        # ids are unique, the per-chunk counts add up to the number of ids found.
        found_count = sum(TransactionNormalized.objects.filter(id__in=chunk).count() for chunk in id_chunks)
        if found_count != len(unique_ids):
            raise IntegrityError("transaction_ids not found in transaction_normalized")

        # One timestamp for the whole batch so all rows written by this call agree.
        created_at = datetime.now(timezone.utc)

        # All chunks are written in one database transaction: either every id is
        # upserted or none is.
        with transaction.atomic():
            for chunk in id_chunks:
                # Fake an upsert by first deleting transactions then inserting.
                TransactionDelta.objects.filter(pk__in=chunk).delete()
                TransactionDelta.objects.bulk_create(
                    tuple(TransactionDelta(transaction_id=transaction_id, created_at=created_at) for transaction_id in chunk)
                )


class TransactionDelta(models.Model):
    """
    Marks a transaction_normalized record as created/updated outside of the nightly
    pipeline so that the monthly delta process can pick it up.
    """

    # The relation doubles as the primary key, so there is at most one row per
    # transaction.  db_constraint=False means no database-level foreign key constraint
    # is created for this column; existence of the referenced transaction is checked
    # in TransactionDeltaManager.update_or_create_transactions instead.
    transaction = models.OneToOneField(
        "awards.TransactionNormalized", on_delete=models.CASCADE, primary_key=True, db_constraint=False
    )
    # No auto_now/auto_now_add: the writer supplies the timestamp explicitly.
    created_at = models.DateTimeField()

    objects = TransactionDeltaManager()

    class Meta:
        db_table = "transaction_delta"