CMSgov/dpc-app

View on GitHub
lambda/opt-out-import/db.go

Summary

Maintainability
A
1 hr
Test Coverage
package main

import (
    "database/sql"
    "fmt"
    "os"
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/ssm"
    "github.com/google/uuid"
    _ "github.com/lib/pq"
    log "github.com/sirupsen/logrus"
)

// Processing outcomes stored in an opt-out record's Status field.
const (
    // Accepted is assigned to records once the batch insert into the consent
    // table has succeeded.
    Accepted = "Accepted"
    // Rejected is not referenced in this part of the file; presumably it marks
    // records that could not be imported — confirm against callers.
    Rejected = "Rejected"
)

// getConsentDbSecrets resolves the consent database credentials.
//
// dbuser and dbpassword are the lookup keys (SSM parameter names); the
// returned map is keyed by those same names. When isTesting is set the values
// are read from the DB_USER_DPC_CONSENT and DB_PASS_DPC_CONSENT environment
// variables instead of the parameter store.
//
// An error is returned when the AWS session cannot be created, when the
// parameter store call fails, or when the store reports any requested name as
// invalid.
func getConsentDbSecrets(dbuser string, dbpassword string) (map[string]string, error) {
    secretsInfo := map[string]string{}

    // Test runs never touch AWS; credentials come straight from the environment.
    if isTesting {
        secretsInfo[dbuser] = os.Getenv("DB_USER_DPC_CONSENT")
        secretsInfo[dbpassword] = os.Getenv("DB_PASS_DPC_CONSENT")
        return secretsInfo, nil
    }

    awsSession, err := session.NewSession(&aws.Config{Region: aws.String("us-east-1")})
    if err != nil {
        return nil, fmt.Errorf("getConsentDbSecrets: Error creating AWS session: %w", err)
    }

    output, err := ssm.New(awsSession).GetParameters(&ssm.GetParametersInput{
        Names:          []*string{&dbuser, &dbpassword},
        WithDecryption: aws.Bool(true),
    })
    if err != nil {
        return nil, fmt.Errorf("getConsentDbSecrets: Error connecting to parameter store: %w", err)
    }

    // Names the store does not know are reported as invalid rather than as an
    // error, so they have to be rejected explicitly.
    if len(output.InvalidParameters) > 0 {
        var invalidNames string
        for _, name := range output.InvalidParameters {
            invalidNames += fmt.Sprintf("%s,\n", *name)
        }
        return nil, fmt.Errorf("invalid parameters error: %s", invalidNames)
    }

    for _, param := range output.Parameters {
        secretsInfo[*param.Name] = *param.Value
    }
    return secretsInfo, nil
}

// getAssumeRoleArn returns the role ARN stored under the
// /opt-out-import/dpc/<ENV>/bfd-bucket-role-arn SSM parameter, where <ENV> is
// the value of the ENV environment variable.
//
// When isTesting is set the ARN is read from the AWS_ASSUME_ROLE_ARN
// environment variable instead, and it is an error for that variable to be
// empty.
//
// An error is returned when the AWS session cannot be created, when the
// parameter store call fails, or when the parameter has no (or an empty) value.
func getAssumeRoleArn() (string, error) {
    if isTesting {
        val := os.Getenv("AWS_ASSUME_ROLE_ARN")
        if val == "" {
            return "", fmt.Errorf("AWS_ASSUME_ROLE_ARN must be set during testing")
        }

        return val, nil
    }

    parameterName := fmt.Sprintf("/opt-out-import/dpc/%s/bfd-bucket-role-arn", os.Getenv("ENV"))

    sess, err := session.NewSession(&aws.Config{
        Region: aws.String("us-east-1"),
    })
    if err != nil {
        return "", fmt.Errorf("getAssumeRoleArn: Error creating AWS session: %w", err)
    }

    result, err := ssm.New(sess).GetParameter(&ssm.GetParameterInput{
        Name:           aws.String(parameterName),
        WithDecryption: aws.Bool(true),
    })
    if err != nil {
        return "", fmt.Errorf("getAssumeRoleArn: Error connecting to parameter store: %w", err)
    }

    // The SDK models Parameter and Value as pointers; check them before
    // dereferencing so a response without a value becomes an error, not a panic.
    if result.Parameter == nil || result.Parameter.Value == nil || *result.Parameter.Value == "" {
        return "", fmt.Errorf("getAssumeRoleArn: No value found for bfd-bucket-role-arn")
    }

    return *result.Parameter.Value, nil
}

// insertResponseFileMetadata records a new response file in the opt_out_file
// table with import_status 'In-Progress'.
//
// A fresh UUID is generated for the row and written back to
// optOutMetadata.FileID before the insert is attempted. The file timestamp is
// stored as a date only. The returned entity holds the id, name, timestamp and
// import_status reported back by the database; on failure the wrapped query
// error is returned alongside whatever was scanned so far.
func insertResponseFileMetadata(db *sql.DB, optOutMetadata *ResponseFileMetadata) (OptOutFileEntity, error) {
    const insertStmt = `INSERT INTO opt_out_file (id, name, timestamp, import_status, created_at, updated_at) 
        VALUES ($1, $2, $3, 'In-Progress', NOW(), NOW())
        RETURNING id, name, timestamp, import_status`

    fileID := uuid.New().String()
    optOutMetadata.FileID = fileID

    var created OptOutFileEntity
    fileDate := optOutMetadata.Timestamp.Format(time.DateOnly)
    err := db.QueryRow(insertStmt, fileID, optOutMetadata.Name, fileDate).
        Scan(&created.id, &created.name, &created.timestamp, &created.import_status)
    if err != nil {
        return created, fmt.Errorf("insertOptOutMetadata: Query error: %w", err)
    }

    log.Info("Successfully imported opt out file metadata.")
    return created, nil
}

// consentInsertBatchSize caps how many records go into a single INSERT.
// PostgreSQL accepts at most 65535 bind parameters per statement and every
// record uses three (plus one shared parameter for the file id), so this
// leaves comfortable headroom.
const consentInsertBatchSize = 10000

// insertConsentRecords writes one consent row per record and then updates the
// import status of the opt_out_file row identified by optOutFileId.
//
// All rows are inserted inside a single transaction, so either every record is
// stored or none is. On success each element of records has its Status set to
// Accepted, the file is marked ImportComplete, and the rows as returned by the
// database are handed back. When records is empty nothing is inserted and the
// file is still marked ImportComplete.
//
// If the insert fails the transaction is rolled back, the file is marked
// ImportFail and an empty (non-nil) slice is returned with the error. Should
// the status update fail as well, the returned error wraps the status error
// and mentions the original import error.
func insertConsentRecords(db *sql.DB, optOutFileId string, records []*OptOutRecord) ([]*OptOutRecord, error) {
    createdRecords := []*OptOutRecord{}

    // If there aren't any rows, skip this and update the import_status of the file
    if len(records) > 0 {
        inserted, err := insertConsentBatches(db, optOutFileId, records)
        if err != nil {
            if statusErr := updateResponseFileImportStatus(db, optOutFileId, ImportFail); statusErr != nil {
                return createdRecords, fmt.Errorf(
                    "insertConsentRecords: failed to update opt_out_file status to Failed: %w (after import error: %v)",
                    statusErr, err)
            }
            return createdRecords, err
        }
        createdRecords = inserted

        // The transaction committed, so every input record was stored.
        for _, record := range records {
            record.Status = Accepted
        }

        log.Info("Successfully inserted consent records.")
    } else {
        log.Info("No consent records to insert.")
    }

    if err := updateResponseFileImportStatus(db, optOutFileId, ImportComplete); err != nil {
        return createdRecords, fmt.Errorf(
            "insertConsentRecords: failed to update opt_out_file status to Complete: %w", err)
    }
    return createdRecords, nil
}

// insertConsentBatches inserts records in chunks of consentInsertBatchSize
// within one transaction and returns the created rows once it has committed.
func insertConsentBatches(db *sql.DB, optOutFileId string, records []*OptOutRecord) ([]*OptOutRecord, error) {
    tx, err := db.Begin()
    if err != nil {
        return nil, fmt.Errorf("insertConsentRecords: failed to begin transaction: %w", err)
    }
    // After a successful Commit this is a no-op that reports sql.ErrTxDone,
    // which is safe to ignore; on every other path it undoes partial work.
    defer func() { _ = tx.Rollback() }()

    created := make([]*OptOutRecord, 0, len(records))
    for start := 0; start < len(records); start += consentInsertBatchSize {
        end := start + consentInsertBatchSize
        if end > len(records) {
            end = len(records)
        }

        batch, err := insertConsentBatch(tx, optOutFileId, records[start:end])
        if err != nil {
            return nil, err
        }
        created = append(created, batch...)
    }

    if err := tx.Commit(); err != nil {
        return nil, fmt.Errorf("insertConsentRecords: failed to insert to consent table: %w", err)
    }
    return created, nil
}

// insertConsentBatch runs one parameterized multi-row INSERT on tx and returns
// the rows reported back by the RETURNING clause, each marked Accepted.
func insertConsentBatch(tx *sql.Tx, optOutFileId string, records []*OptOutRecord) ([]*OptOutRecord, error) {
    // $1 is the file id shared by every row; each record then contributes its
    // own id, mbi and policy_code placeholders. Values are bound rather than
    // spliced into the SQL text so record contents can never alter the query.
    args := make([]any, 0, 1+3*len(records))
    args = append(args, optOutFileId)

    query := make([]byte, 0, 256+64*len(records))
    query = append(query, `INSERT INTO consent (id, mbi, effective_date, policy_code, loinc_code, opt_out_file_id, created_at, updated_at)
                VALUES `...)
    for i, rec := range records {
        if i > 0 {
            query = append(query, ", "...)
        }
        n := len(args) + 1
        query = fmt.Appendf(query, "($%d, $%d, NOW()::date, $%d, '64292-6', $1, 'NOW()', 'NOW()')", n, n+1, n+2)
        args = append(args, rec.ID, rec.MBI, rec.PolicyCode)
    }
    query = append(query, "\nRETURNING id, mbi, effective_date, policy_code, opt_out_file_id"...)

    rows, err := tx.Query(string(query), args...)
    if err != nil {
        return nil, fmt.Errorf("insertConsentRecords: failed to insert to consent table: %w", err)
    }
    // The result set must be released before the next statement can run on
    // this transaction, including when we bail out of the loop early.
    defer rows.Close()

    created := make([]*OptOutRecord, 0, len(records))
    for rows.Next() {
        record := OptOutRecord{}
        if err := rows.Scan(&record.ID, &record.MBI, &record.EffectiveDt, &record.PolicyCode, &record.OptOutFileID); err != nil {
            return nil, fmt.Errorf("insertConsentRecords: Failed to read newly created consent records: %w", err)
        }
        record.Status = Accepted
        created = append(created, &record)
    }
    // Next returning false can also mean the iteration broke off with an error.
    if err := rows.Err(); err != nil {
        return nil, fmt.Errorf("insertConsentRecords: Failed to read newly created consent records: %w", err)
    }
    return created, nil
}

// updateResponseFileImportStatus sets import_status (and refreshes updated_at)
// on the opt_out_file row whose id equals optOutFileId.
//
// The statement returns the updated row, which is scanned only to confirm the
// update took place. Any scan failure is returned wrapped; per database/sql
// that includes sql.ErrNoRows when no row has the given id.
func updateResponseFileImportStatus(db *sql.DB, optOutFileId string, status string) error {
    const updateStmt = `UPDATE opt_out_file
              SET import_status = $1, updated_at = NOW()
              WHERE id = $2
              RETURNING id, import_status`

    var updated OptOutFileEntity
    err := db.QueryRow(updateStmt, status, optOutFileId).Scan(&updated.id, &updated.import_status)
    if err == nil {
        return nil
    }
    return fmt.Errorf("updateOptOutFileImportStatus: %w", err)
}

// createConnection opens a database handle for the dpc_consent database on the
// host named by the DB_HOST environment variable, port 5432, using the
// "postgres" driver.
//
// dbUser and dbPassword are the credentials to connect with. They, and the
// host, are single-quoted and escaped so that values containing spaces,
// quotes or backslashes cannot corrupt the key/value connection string.
//
// sql.Open only validates its arguments; no connection is attempted until the
// handle is first used, so a nil error does not prove the database is
// reachable.
//
// NOTE(review): sslmode=disable is hard-coded — confirm that unencrypted
// connections are intended for every environment this runs in.
func createConnection(dbUser string, dbPassword string) (*sql.DB, error) {
    const (
        dbName = "dpc_consent"
        dbPort = 5432
    )

    psqlInfo := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
        quoteConnValue(os.Getenv("DB_HOST")), dbPort, quoteConnValue(dbUser), quoteConnValue(dbPassword), dbName)

    db, err := sql.Open("postgres", psqlInfo)
    if err != nil {
        return nil, fmt.Errorf("createConnection: %w", err)
    }

    return db, nil
}

// quoteConnValue wraps v in single quotes for use in a key/value connection
// string, backslash-escaping any single quote or backslash it contains.
func quoteConnValue(v string) string {
    quoted := make([]byte, 0, len(v)+2)
    quoted = append(quoted, '\'')
    // Both characters that need escaping are ASCII, so walking bytes is safe
    // for UTF-8 input.
    for i := 0; i < len(v); i++ {
        if v[i] == '\'' || v[i] == '\\' {
            quoted = append(quoted, '\\')
        }
        quoted = append(quoted, v[i])
    }
    quoted = append(quoted, '\'')
    return string(quoted)
}