# coforma/swift-tech-challenge
#
# View on GitHub
# utilities/data-ingestion/ingest-institutions.py
#
# Summary
#
# Maintainability
# A
# 0 mins
# Test Coverage
import boto3
import json
from decimal import Decimal
from datetime import datetime
import logging
import os

# AWS handles are built at import time and shared by every call into this
# module.
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ.get("DYNAMODB_TABLE") or "institutions")
s3_client = boto3.client("s3")
sqs = boto3.client("sqs")

# Queue URLs can be overridden through the environment. `or` is used instead
# of a get() default so that a variable set to the empty string also falls
# back to the hard-coded URL.
DESCRIPTIONS_QUEUE_URL = os.environ.get("DESCRIPTIONS_QUEUE_URL") or (
    "https://sqs.us-east-1.amazonaws.com/905418154281/get-institution-descriptions"
)
IMAGES_QUEUE_URL = os.environ.get("IMAGES_QUEUE_URL") or (
    "https://sqs.us-east-1.amazonaws.com/905418154281/get-institution-images"
)

# Root logger.
logger = logging.getLogger()

# Helper function to convert to Decimal, has to have an
# option to return original string
def convertToNumeric(inputString, isInteger=False):
    if len(inputString) == 0:
        return None
    try:
        if isInteger:
            return Decimal(int(inputString))
        return Decimal(inputString)
    finally:
        return inputString


# Helper function to map id to value
def mapControl(control_id):
    match control_id:
        case "1":
            return "Public"
        case "2":
            return "Private nonprofit"
        case _:
            return "Private for-profit"


def lambda_handler(event, context):
    try:
        # Grab file from S3
        s3_Bucket_Name = event["Records"][0]["s3"]["bucket"]["name"]
        s3_File_Name = event["Records"][0]["s3"]["object"]["key"]
        object = s3_client.get_object(Bucket=s3_Bucket_Name, Key=s3_File_Name)

        # massage data
        data = object["Body"].read().decode("utf-8")
        institutions = data.split("\n")
        institutions.pop(0)  # removes header row
        institutions_to_save = []

        # transform data into Item and add to queue
        for institution in institutions:
            institution_data = institution.split(",")
            try:
                Item = {
                    "institutionId": int(institution_data[0]),
                    "recordType": "data",
                    "updatedAt": datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f"),
                    "description": "",
                    "imageLocation": "",
                    "institutionName": institution_data[1],
                    "studentPopulation": institution_data[2],
                    "city": institution_data[3],
                    "state": institution_data[4],
                    "zip": institution_data[5],
                    "latitude": convertToNumeric(institution_data[11]),
                    "longitude": convertToNumeric(institution_data[12]),
                    "url": institution_data[7],
                    "institutionType": mapControl(institution_data[6]),
                    "predominantUndergradDegree": institution_data[9],
                    "highestDegreeAwarded": institution_data[10],
                    "specialties": {
                        "humanities": institution_data[61],
                        "stem": institution_data[62],
                        "socialScience": institution_data[63],
                        "occupational": institution_data[64],
                        "interdisciplinary": institution_data[65],
                    },
                    "raceDemographics": {
                        "percentWhite": convertToNumeric(institution_data[14]),
                        "percentBlack": convertToNumeric(institution_data[15]),
                        "percentAsian": convertToNumeric(institution_data[17]),
                        "percentHispanic": convertToNumeric(institution_data[16]),
                        "percentAian": convertToNumeric(institution_data[18]),
                        "percentNhpi": convertToNumeric(institution_data[19]),
                        "percentTwoOrMore": convertToNumeric(institution_data[20]),
                        "percentNonResidentAlien": convertToNumeric(
                            institution_data[21]
                        ),
                        "percentUnknownRace": convertToNumeric(institution_data[22]),
                    },
                    "admissionRate": convertToNumeric(institution_data[13]),
                    "averageAttendanceCost": convertToNumeric(institution_data[35]),
                    "tuitionInState": convertToNumeric(institution_data[36]),
                    "tuitionOutOfState": convertToNumeric(institution_data[37]),
                    "percentUndergradWithLoan": convertToNumeric(institution_data[43]),
                    "netPriceCalculatorUrl": institution_data[8],
                    "satScores": {
                        "satAverageScore": convertToNumeric(institution_data[60], True),
                        "satReadingPercentile25": convertToNumeric(
                            institution_data[51], True
                        ),
                        "satReadingPercentile50": convertToNumeric(
                            institution_data[57], True
                        ),
                        "satReadingPercentile75": convertToNumeric(
                            institution_data[52], True
                        ),
                        "satWritingPercentile25": convertToNumeric(
                            institution_data[55], True
                        ),
                        "satWritingPercentile50": convertToNumeric(
                            institution_data[59], True
                        ),
                        "satWritingPercentile75": convertToNumeric(
                            institution_data[56], True
                        ),
                        "satMathPercentile25": convertToNumeric(
                            institution_data[53], True
                        ),
                        "satMathPercentile50": convertToNumeric(
                            institution_data[58], True
                        ),
                        "satMathPercentile75": convertToNumeric(
                            institution_data[54], True
                        ),
                    },
                    "publicNetPrice": {
                        "averagePrice": convertToNumeric(institution_data[23], True),
                        "averagePriceUnder30k": convertToNumeric(
                            institution_data[25], True
                        ),
                        "averagePriceUnder30To48k": convertToNumeric(
                            institution_data[26], True
                        ),
                        "averagePriceUnder48To75k": convertToNumeric(
                            institution_data[27], True
                        ),
                        "averagePriceUnder75To110k": convertToNumeric(
                            institution_data[28], True
                        ),
                        "averagePriceUnder110kPlus": convertToNumeric(
                            institution_data[29], True
                        ),
                    },
                    "privateNetPrice": {
                        "averagePrice": convertToNumeric(institution_data[24], True),
                        "averagePriceUnder30k": convertToNumeric(
                            institution_data[30], True
                        ),
                        "averagePriceUnder30To48k": convertToNumeric(
                            institution_data[31], True
                        ),
                        "averagePriceUnder48To75k": convertToNumeric(
                            institution_data[32], True
                        ),
                        "averagePriceUnder75To110k": convertToNumeric(
                            institution_data[33], True
                        ),
                        "averagePriceUnder110kPlus": convertToNumeric(
                            institution_data[34], True
                        ),
                    },
                    "facultyAverageSalary": convertToNumeric(institution_data[39]),
                    "facultyPercentageEmployedFull": convertToNumeric(
                        institution_data[40]
                    ),
                    "studentToFacultyRatio": convertToNumeric(institution_data[47]),
                    "instructionalExpenditurePerSt": convertToNumeric(
                        institution_data[38]
                    ),
                    "completionRates": {
                        "fourYearInstitution": convertToNumeric(institution_data[41]),
                        "underFourYearInstitution": convertToNumeric(
                            institution_data[42]
                        ),
                    },
                    "percentCompletedInFourYears": convertToNumeric(
                        institution_data[44]
                    ),
                    "percentCompletedInEightYears": convertToNumeric(
                        institution_data[50]
                    ),
                    "earnings": {
                        "meanGraduateEarnings10Years": convertToNumeric(
                            institution_data[45], True
                        ),
                        "medianGraduateEarnings10Years": convertToNumeric(
                            institution_data[46], True
                        ),
                    },
                    "retentionRate": {
                        "fourYearInstitution": convertToNumeric(institution_data[48]),
                        "underFourYearInstitution": convertToNumeric(
                            institution_data[49]
                        ),
                    },
                }
            except Exception as e:
                print(f"Skipping institution because of {e}")
            institutions_to_save.append(Item)

        logger.info("Processed: " + str(len(institutions_to_save)))
        print(type(institutions_to_save))

        # remove dupes
        unique_dictionary_collection = {}
        deduped_institutions = []

        for item in institutions_to_save:
            item_id = item["institutionId"]
            if item_id not in unique_dictionary_collection:
                unique_dictionary_collection[item_id] = item
                deduped_institutions.append(item)

        # write to Dynamo
        with table.batch_writer() as batch:
            for item in deduped_institutions:
                batch.put_item(Item=item)

        # write to get queues for AI content creation
        try:
            for item in deduped_institutions:
                # description queue
                sqs.send_message(
                    QueueUrl=DESCRIPTIONS_QUEUE_URL, MessageBody=json.dumps(item)
                )
                # images queue
                sqs.send_message(
                    QueueUrl=IMAGES_QUEUE_URL, MessageBody=json.dumps(item)
                )
        except Exception as e:
            print(e)

    except Exception as err:
        print(err)