CMSgov/dpc-app

View on GitHub
lambda/opt-out-import/main.go

Summary

Maintainability
B
4 hrs
Test Coverage
package main

import (
    "bytes"
    "context"
    "fmt"
    "os"
    "time"

    log "github.com/sirupsen/logrus"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    awsv2 "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    stscredsv2 "github.com/aws/aws-sdk-go-v2/credentials/stscreds"
    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    s3v2 "github.com/aws/aws-sdk-go-v2/service/s3"
    stsv2 "github.com/aws/aws-sdk-go-v2/service/sts"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/credentials/stscreds"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
    "github.com/aws/aws-sdk-go/service/s3/s3manager"

    "github.com/ianlopshire/go-fixedwidth"
)

var isTesting = os.Getenv("IS_TESTING") == "true"

var createConnectionVar = createConnection

// Used for dependency injection.  Allows us to easily mock these function in unit tests.
type (
    // s3manager.NewUploader.upload
    S3Uploader func(input *s3manager.UploadInput, options ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error)

    // fixedWidth.Marshal
    FileMarshaler func(v interface{}) ([]byte, error)
    // fixedWidth.Unmarshal
    FileUnmarshaler func(data []byte, v interface{}) error
)

func main() {
    if isTesting {
        filename := "bfdeft01/dpc/in/T.NGD.DPC.RSP.D240123.T1122001.IN"
        createdOptOutCount, createdOptInCount, confirmationFileName, err := importResponseFile("demo-bucket", filename)
        if err != nil {
            log.Error(err)
        } else {
            log.Infof("Created %d opt outs, %d opt ins, and generated confirmation %s", createdOptOutCount, createdOptInCount, confirmationFileName)
        }
    } else {
        lambda.Start(handler)
    }
}

func handler(ctx context.Context, sqsEvent events.SQSEvent) (string, error) {
    log.SetFormatter(&log.JSONFormatter{
        DisableHTMLEscape: true,
        TimestampFormat:   time.RFC3339Nano,
    })

    s3Event, err := ParseSQSEvent(sqsEvent)

    if err != nil {
        log.Errorf("Failed to parse S3 event: %v", err)
        return "", err
    } else if s3Event == nil {
        log.Info("No S3 event found, skipping safely.")
        return "", nil
    }

    for _, e := range s3Event.Records {
        if e.EventName == "ObjectCreated:Put" {
            createdOptOutCount, createdOptInCount, confirmationFileName, err := importResponseFile(e.S3.Bucket.Name, e.S3.Object.Key)
            logger := log.WithFields(log.Fields{
                "response_filename":      e.S3.Object.Key,
                "created_opt_outs_count": createdOptOutCount,
                "created_opt_ins_count":  createdOptInCount,
                "confirmation_filename":  confirmationFileName,
            })

            if err != nil {
                logger.Errorf("Failed to import response file: %s", err)
                return e.S3.Object.Key, err
            }

            logger.Info("Successfully imported response file and uploaded confirmation file")

            err = deleteS3File(e.S3.Bucket.Name, e.S3.Object.Key)
            if err != nil {
                logger.Errorf("Failed to delete response file after import: %s", err)
            }
            return e.S3.Object.Key, err
        }
    }

    log.Info("No ObjectCreated:Put events found, skipping safely.")
    return "", nil
}

func importResponseFile(bucket string, file string) (int, int, string, error) {
    log.Infof("Importing opt out file: %s (bucket: %s)", file, bucket)
    metadata, err := ParseMetadata(bucket, file)
    if err != nil {
        log.Warningf("Failed to parse opt out file metadata: %s", err)
        return 0, 0, "", err
    }

    dbuser := fmt.Sprintf("/dpc/%s/consent/db_user_dpc_consent", os.Getenv("ENV"))
    dbpassword := fmt.Sprintf("/dpc/%s/consent/db_pass_dpc_consent", os.Getenv("ENV"))
    secrets, err := getConsentDbSecrets(dbuser, dbpassword)
    if err != nil {
        log.Warningf("Failed to get DB secrets: %s", err)
        return 0, 0, "", err
    }

    db, err := createConnectionVar(secrets[dbuser], secrets[dbpassword])
    if err != nil {
        log.Warningf("Failed to create db connection: %s", err)
        return 0, 0, "", err
    }
    if err := db.Ping(); err != nil {
        log.Warningf("Ping error: %s", err)
        return 0, 0, "", err
    }
    defer db.Close()

    optOutFileEntity, err := insertResponseFileMetadata(db, &metadata)
    if err != nil {
        log.Warningf("Failed to insert opt out metadata: %s", err)
        return 0, 0, "", err
    }

    bytes, err := downloadS3File(bucket, file)
    if err != nil {
        log.Warningf("Failed to download opt out file from S3: %s", err)
        if updateStatusErr := updateResponseFileImportStatus(db, optOutFileEntity.id, ImportFail); updateStatusErr != nil {
            return 0, 0, "", updateStatusErr
        }
        return 0, 0, "", err
    }

    records, err := ParseConsentRecords(&metadata, bytes)
    if err != nil {
        log.Warningf("Failed to parse consent records: %s", err)
        if updateStatusErr := updateResponseFileImportStatus(db, optOutFileEntity.id, ImportFail); updateStatusErr != nil {
            return 0, 0, "", updateStatusErr
        }
        return 0, 0, "", err
    }

    createdRecords, err := insertConsentRecords(db, optOutFileEntity.id, records)

    createdOptOutCount := 0
    createdOptInCount := 0

    log.Info("Created consent records with the following ID fields:")
    for _, rec := range createdRecords {
        log.Infof("ID: %s, Opt Out Preference: %s", rec.ID, rec.PolicyCode)
        if rec.PolicyCode == "OPTIN" {
            createdOptInCount++
        } else if rec.PolicyCode == "OPTOUT" {
            createdOptOutCount++
        } else {
            log.Warningf("Unknown policy code saved to database for consent record %s", rec.ID)
        }
    }

    if err != nil {
        log.Warningf("Failed to insert consent records: %s", err)
        return createdOptOutCount, createdOptInCount, "", err
    }

    confirmationFileName := GenerateConfirmationFileName(file, time.Now())
    confirmationFile, err := generateConfirmationFile(true, createdRecords, fixedwidth.Marshal)
    if err != nil {
        log.Warningf("Failed to generate confirmation file: %s", err)
        return createdOptOutCount, createdOptInCount, confirmationFileName, err
    }

    if sess, err := createSession(); err != nil {
        log.Warning("Failed to create session for uploading confirmation file")
        return createdOptOutCount, createdOptInCount, confirmationFileName, err
    } else {
        if err = uploadConfirmationFile(bucket, confirmationFileName, s3manager.NewUploader(sess).Upload, confirmationFile); err != nil {
            log.Warning("Failed to write upload confirmation file")
            return createdOptOutCount, createdOptInCount, confirmationFileName, err
        }
    }

    return createdOptOutCount, createdOptInCount, confirmationFileName, err
}

func createV2Cfg() (*awsv2.Config, error) {
    cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1"))
    assumeRoleArn, err := getAssumeRoleArn()

    if err != nil {
        return nil, err
    }

    client := stsv2.NewFromConfig(cfg)
    creds := stscredsv2.NewAssumeRoleProvider(client, assumeRoleArn)
    cfg.Credentials = awsv2.NewCredentialsCache(creds)
    return &cfg, nil
}

func createSession() (*session.Session, error) {
    sess := session.Must(session.NewSession())
    var err error
    if isTesting {
        sess, err = session.NewSessionWithOptions(session.Options{
            Profile: "default",
            Config: aws.Config{
                Region:           aws.String("us-east-1"),
                S3ForcePathStyle: aws.Bool(true),
                Endpoint:         aws.String("http://localhost:4566"),
            },
        })
    } else {
        assumeRoleArn, err := getAssumeRoleArn()

        if err == nil {
            sess, err = session.NewSession(&aws.Config{
                Region: aws.String("us-east-1"),
                Credentials: stscreds.NewCredentials(
                    sess,
                    assumeRoleArn,
                ),
            })
        }
    }

    if err != nil {
        return nil, err
    }

    return sess, nil
}

func downloadS3File(bucket string, file string) ([]byte, error) {
    if os.Getenv("ENV") == "local" {
        sess, err := createSession()
        if err != nil {
            return []byte{}, err
        }
        downloader := s3manager.NewDownloader(sess)
        buff := &aws.WriteAtBuffer{}
        numBytes, err := downloader.Download(buff, &s3.GetObjectInput{
            Bucket: aws.String(bucket),
            Key:    aws.String(file),
        })
        log.Printf("file downloaded: size=%d", numBytes)
        byte_arr := buff.Bytes()

        return byte_arr, err
    } else {
        cfg, err := createV2Cfg()
        if err != nil {
            return []byte{}, err
        }

        downloader := manager.NewDownloader(s3v2.NewFromConfig(*cfg))
        buff := &aws.WriteAtBuffer{}
        numBytes, err := downloader.Download(context.TODO(), buff, &s3v2.GetObjectInput{
            Bucket: aws.String(bucket),
            Key:    aws.String(file),
        })

        if err == nil {
            log.Printf("file downloaded: size=%d", numBytes)
        }

        return buff.Bytes(), err
    }
}

func generateConfirmationFile(successful bool, records []*OptOutRecord, marshaler FileMarshaler) ([]byte, error) {
    fileCreationDate := time.Now().Format("20060102")
    fileHeader := FileHeader{
        HeaderCode:       "HDR_BENECONFIRM",
        FileCreationDate: fileCreationDate,
    }

    fileTrailer := FileTrailer{
        TrailerCode:       "TRL_BENECONFIRM",
        FileCreationDate:  fileCreationDate,
        DetailRecordCount: fmt.Sprintf("%010d", len(records)),
    }

    var rows []ConfirmationFileRow
    for _, record := range records {
        sharingPreference := "N"
        if record.PolicyCode == "OPTIN" {
            sharingPreference = "Y"
        }
        recordStatus := "Accepted"
        reasonCode := "00"
        if !successful {
            recordStatus = "Rejected"
            reasonCode = "02"
        }

        row := ConfirmationFileRow{
            MBI:               record.MBI,
            EffectiveDate:     record.EffectiveDt.Format("20060102"),
            SharingPreference: sharingPreference,
            RecordStatus:      recordStatus,
            ReasonCode:        reasonCode,
        }

        rows = append(rows, row)
    }

    formattedHeader, err := marshaler(fileHeader)
    if err != nil {
        return []byte{}, err
    }
    formattedHeader = append(formattedHeader, "\n"...)
    formattedRows, err := marshaler(rows)
    if err != nil {
        return []byte{}, err
    }
    formattedRows = append(formattedRows, "\n"...)
    formattedTrailer, err := marshaler(fileTrailer)
    if err != nil {
        return []byte{}, err
    }

    output := append(formattedHeader, formattedRows...)
    return append(output, formattedTrailer...), nil
}

func uploadConfirmationFile(bucket string, file string, uploader S3Uploader, confirmationFile []byte) error {
    _, err := uploader(&s3manager.UploadInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(file),
        Body:   bytes.NewReader(confirmationFile),
    })
    return err
}

func deleteS3File(bucket string, file string) error {
    sess, err := createSession()
    if err != nil {
        return err
    }
    svc := s3.New(sess)
    _, err = svc.DeleteObject(&s3.DeleteObjectInput{Bucket: aws.String(bucket), Key: aws.String(file)})
    if err != nil {
        log.Errorf("Unable to delete object: %v", err)
        return err
    }

    err = svc.WaitUntilObjectNotExists(&s3.HeadObjectInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(file),
    })
    if err != nil {
        log.Warningf("Error occurred while waiting for object %q to be deleted, %v", file, err)
        return err
    }

    log.Printf("Object %q successfully deleted\n", file)
    return nil
}