fedspendingtransparency/usaspending-api
usaspending_api/common/helpers/s3_helpers.py
import boto3
import io
import logging
import math
import time

from boto3.s3.transfer import S3Transfer, TransferConfig
from botocore.client import BaseClient
from botocore.exceptions import ClientError
from django.conf import settings
from pathlib import Path
from typing import List

from usaspending_api.config import CONFIG

logger = logging.getLogger("script")


def _get_boto3_s3_client(region_name=CONFIG.AWS_REGION) -> BaseClient:
    """Returns the correct boto3 client based on the
    environment.

    Returns:
        BaseClient: Boto3 client implementatoin
    """
    if not CONFIG.USE_AWS:
        boto3_session = boto3.session.Session(
            region_name=region_name,
            aws_access_key_id=CONFIG.AWS_ACCESS_KEY.get_secret_value(),
            aws_secret_access_key=CONFIG.AWS_SECRET_KEY.get_secret_value(),
        )
        s3_client = boto3_session.client(
            service_name="s3",
            region_name=region_name,
            endpoint_url=f"http://{CONFIG.AWS_S3_ENDPOINT}",
        )
    else:
        s3_client = boto3.client(
            service_name="s3",
            region_name=region_name,
            endpoint_url=f"https://{CONFIG.AWS_S3_ENDPOINT}",
        )
    return s3_client
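

# Illustrative sketch (not part of the original module): exercising the client
# factory. The bucket name below is hypothetical. With CONFIG.USE_AWS unset
# this talks to the local HTTP endpoint (e.g. MinIO); otherwise it talks to
# AWS over HTTPS.
def _example_check_bucket(bucket_name: str = "example-bucket") -> bool:
    client = _get_boto3_s3_client()
    try:
        client.head_bucket(Bucket=bucket_name)
        return True
    except ClientError:
        return False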


def retrieve_s3_bucket_object_list(bucket_name: str) -> List["boto3.resources.factory.s3.ObjectSummary"]:
    """Return a list of summaries for all objects in the given S3 bucket."""
    try:
        bucket = get_s3_bucket(bucket_name=bucket_name)
        bucket_objects = list(bucket.objects.all())
    except Exception as e:
        message = (
            f"Problem accessing S3 bucket '{bucket_name}' for deleted records. Most likely the "
            f"AWS region or bucket name is configured incorrectly."
        )
        logger.exception(message)
        raise RuntimeError(message) from e
    return bucket_objects
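

# Illustrative sketch (not part of the original module): iterating the object
# summaries returned above. The bucket name is hypothetical; each
# ObjectSummary exposes `key` and `size`.
def _example_log_bucket_contents(bucket_name: str = "example-bucket") -> None:
    for obj in retrieve_s3_bucket_object_list(bucket_name):
        logger.info(f"{obj.key}: {obj.size:,} bytes")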


def get_s3_bucket(
    bucket_name: str, region_name: str = settings.USASPENDING_AWS_REGION
) -> "boto3.resources.factory.s3.Instance":
    s3 = boto3.resource("s3", region_name=region_name)
    return s3.Bucket(bucket_name)


def access_s3_object(bucket_name: str, obj: "boto3.resources.factory.s3.ObjectSummary") -> io.BytesIO:
    """Return the Bytes of an S3 object"""
    bucket = get_s3_bucket(bucket_name=bucket_name)
    data = io.BytesIO()
    bucket.download_fileobj(obj.key, data)
    data.seek(0)  # Like rewinding a VCR cassette
    return data
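

# Illustrative sketch (not part of the original module): reading a small CSV
# object fully into memory via access_s3_object. The bucket name and prefix
# are hypothetical.
def _example_read_csv_rows(bucket_name: str = "example-bucket") -> List[List[str]]:
    import csv

    bucket = get_s3_bucket(bucket_name=bucket_name)
    obj = next(iter(bucket.objects.filter(Prefix="exports/")))  # first matching object
    data = access_s3_object(bucket_name, obj)
    return list(csv.reader(io.TextIOWrapper(data, encoding="utf-8")))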


def upload_download_file_to_s3(file_path: Path, sub_dir: str = None):
    """Upload a generated download file to the bulk-download S3 bucket."""
    bucket = settings.BULK_DOWNLOAD_S3_BUCKET_NAME
    region = settings.USASPENDING_AWS_REGION
    keyname = file_path.name
    multipart_upload(bucket, region, str(file_path), keyname, sub_dir)
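

# Illustrative sketch (not part of the original module): uploading a generated
# archive into a dated sub-directory of the bulk-download bucket. The local
# path and sub-directory are hypothetical.
def _example_upload_award_archive() -> None:
    upload_download_file_to_s3(Path("/tmp/award_data.zip"), sub_dir="2024-01-01")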


def multipart_upload(bucketname, regionname, source_path, keyname, sub_dir=None):
    s3client = boto3.client("s3", region_name=regionname)
    source_size = Path(source_path).stat().st_size
    # Set the chunk size to sqrt(5 MiB) * sqrt(source size), with a floor of
    # 5,242,880 bytes (5 MiB, the S3 multipart minimum part size)
    bytes_per_chunk = max(int(math.sqrt(5242880) * math.sqrt(source_size)), 5242880)
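    # Worked example (illustrative): a 20 GiB source yields
    # sqrt(5 MiB * 20 GiB) = 320 MiB chunks, i.e. 64 parts. In general the
    # part count grows as sqrt(source_size / 5 MiB), which stays well under
    # S3's 10,000-part limit.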
    config = TransferConfig(multipart_chunksize=bytes_per_chunk)
    transfer = S3Transfer(s3client, config)
    file_name = Path(keyname).name
    if sub_dir is not None:
        file_name = f"{sub_dir}/{file_name}"
    transfer.upload_file(source_path, bucketname, file_name, extra_args={"ACL": "bucket-owner-full-control"})


def download_s3_object(
    bucket_name: str,
    key: str,
    file_path: str,
    s3_client: BaseClient = None,
    retry_count: int = 3,
    retry_cooldown: int = 30,
    region_name: str = settings.USASPENDING_AWS_REGION,
):
    """Download an S3 object to a file.
    Args:
        bucket_name: The name of the bucket where the key is located.
        key: The name of the key to download from.
        file_path: The path to the file to download to.
        max_retries: The number of times to retry the download.
        retry_delay: The amount of time in seconds to wait after a failure before retrying.
        region_name: AWS region
    """
    if not s3_client:
        s3_client = _get_boto3_s3_client(region_name)
    for attempt in range(retry_count + 1):
        try:
            s3_client.download_file(bucket_name, key, file_path)
            return
        except ClientError as e:
            logger.info(
                f"Attempt {attempt + 1} of {retry_count + 1} failed to download {key} from bucket {bucket_name}. Error: {e}"
            )
            if attempt < retry_count:
                time.sleep(retry_cooldown)
            else:
                logger.error(f"Failed to download {key} from bucket {bucket_name} after {retry_count + 1} attempts.")
                raise
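

# Illustrative sketch (not part of the original module): downloading with the
# default retry policy. The bucket, key, and destination path are hypothetical.
def _example_fetch_export(destination_dir: str = "/tmp") -> str:
    destination = str(Path(destination_dir) / "example.csv")
    download_s3_object("example-bucket", "exports/example.csv", destination)
    return destination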


def delete_s3_object(bucket_name: str, key: str, region_name: str = settings.USASPENDING_AWS_REGION):
    """Delete an S3 object
    Args:
        bucket_name: The name of the bucket where the key is located.
        key: The name of the key to delete
    """
    s3 = _get_boto3_s3_client(region_name)
    s3.delete_object(Bucket=bucket_name, Key=key)
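

# Illustrative sketch (not part of the original module): a fetch-then-clean-up
# flow combining the helpers above. All names are hypothetical.
def _example_consume_and_delete(bucket_name: str, key: str, file_path: str) -> None:
    download_s3_object(bucket_name, key, file_path)
    # Only remove the object once the download has safely completed
    delete_s3_object(bucket_name, key)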