fedspendingtransparency/usaspending-api

View on GitHub
usaspending_api/etl/transaction_loaders/data_load_helpers.py

Summary

Maintainability
A
45 mins
Test Coverage
F
50%
from datetime import datetime
import os
import re
import boto3
import csv
import logging
import dateutil

from django.conf import settings

logger = logging.getLogger("script")


def capitalize_if_string(val):
    try:
        return val.upper()
    except AttributeError:
        return val


# 10/31/2019: According to PO direction, this functionality is NOT desired, and should be phased out as soon as it's safe
def false_if_null(val):
    if val is None:
        return False
    return val


def truncate_timestamp(val):
    if isinstance(val, datetime):
        return val.date()
    elif isinstance(val, str):
        return dateutil.parser.parse(val).date()
    elif val is None:
        return None
    else:
        raise ValueError("{} is not parsable as a date!".format(val.type))


def format_value_for_sql(val, cur):
    return str(cur.mogrify("%s", (val,)), "utf-8")


def format_bulk_insert_list_column_sql(cursor, load_objects, type):
    """creates formatted sql text to put into a bulk insert statement"""
    keys = load_objects[0][type].keys()

    columns = ['"{}"'.format(key) for key in load_objects[0][type].keys()]
    values = [[format_value_for_sql(load_object[type][key], cursor) for key in keys] for load_object in load_objects]

    col_string = "({})".format(",".join(columns))
    val_string = ",".join(["({})".format(",".join(map(str, value))) for value in values])

    return col_string, val_string


def format_insert_or_update_column_sql(cursor, load_object, type):
    """creates formatted sql text to put into a single row insert or update statement"""
    columns = []
    values = []
    update_pairs = []
    for key in load_object[type].keys():
        columns.append('"{}"'.format(key))
        val = format_value_for_sql(load_object[type][key], cursor)
        values.append(val)
        if key not in ["create_date", "created_at"]:
            update_pairs.append(" {}={}".format(key, val))

    col_string = "({})".format(",".join(map(str, columns)))
    val_string = "({})".format(",".join(map(str, values)))
    pairs_string = ",".join(update_pairs)

    return col_string, val_string, pairs_string


def get_deleted_fpds_data_from_s3(date):
    ids_to_delete = []
    regex_str = ".*_delete_records_(IDV|award).*"

    if not date:
        return []

    if settings.IS_LOCAL:
        for file in os.listdir(settings.CSV_LOCAL_PATH):
            if re.search(regex_str, file) and datetime.strptime(file[: file.find("_")], "%m-%d-%Y").date() >= date:
                with open(settings.CSV_LOCAL_PATH + file, "r") as current_file:
                    # open file, split string to array, skip the header
                    reader = csv.reader(current_file.read().splitlines())
                    next(reader)
                    unique_key_list = [rows[0] for rows in reader]

                    ids_to_delete += unique_key_list
    else:
        # Connect to AWS
        aws_region = settings.USASPENDING_AWS_REGION
        DELETED_TRANSACTION_JOURNAL_FILES = settings.DELETED_TRANSACTION_JOURNAL_FILES

        if not (aws_region and DELETED_TRANSACTION_JOURNAL_FILES):
            raise Exception(
                "Missing required environment variables: USASPENDING_AWS_REGION, DELETED_TRANSACTION_JOURNAL_FILES"
            )

        s3client = boto3.client("s3", region_name=aws_region)
        s3resource = boto3.resource("s3", region_name=aws_region)
        s3_bucket = s3resource.Bucket(DELETED_TRANSACTION_JOURNAL_FILES)

        # make an array of all the keys in the bucket
        file_list = [item.key for item in s3_bucket.objects.all()]

        # Only use files that match the date we're currently checking
        for item in file_list:
            # if the date on the file is the same day as we're checking
            if (
                re.search(regex_str, item)
                and "/" not in item
                and datetime.strptime(item[: item.find("_")], "%m-%d-%Y").date() >= date
            ):
                s3_item = s3client.get_object(Bucket=DELETED_TRANSACTION_JOURNAL_FILES, Key=item)
                reader = csv.reader(s3_item["Body"].read().decode("utf-8").splitlines())

                # skip the header, the reader doesn't ignore it for some reason
                next(reader)
                # make an array of all the detached_award_procurement_ids
                unique_key_list = [rows[0] for rows in reader]

                ids_to_delete += unique_key_list

    logger.info("Number of records to delete: %s" % str(len(ids_to_delete)))
    return ids_to_delete