voidfiles/ssshelf

View on GitHub
ssshelf/storages/s3.py

Summary

Maintainability
A
0 mins
Test Coverage
import asyncio
import aiobotocore
from retrying import retry

from ssshelf.keys import IndexKey

@retry(stop_max_attempt_number=2)
def retry_client(method, *args, **kwargs):
    return method(*args, **kwargs)


class S3Storage(object):

    def __init__(self, bucket, aws_client_kwargs=None, s3_client=None):
        self.aws_client_kwargs = aws_client_kwargs if aws_client_kwargs else {
            'region_name': 'us-east-1',
        }
        self.bucket = bucket
        self.s3_client = s3_client

    def get_s3_client(self):
        if self.s3_client:
            return self.s3_client

        session = aiobotocore.get_session()
        self.s3_client = session.create_client('s3', **self.aws_client_kwargs)

        return self.s3_client

    async def create_key(self, storage_key, data=None):
        data = data if data else bytes()

        await retry_client(
            self.get_s3_client().put_object,
            Bucket=self.bucket,
            Key=storage_key.as_url_path(),
            Body=data
        )

        return data

    async def remove_key(self, storage_key):
        await retry_client(
            self.get_s3_client().delete_object,
            Bucket=self.bucket,
            Key=storage_key.as_url_path(),
        )

    async def remove_keys(self, storage_keys):
        reqs = []
        for key in storage_keys:
            reqs += [self.remove_key(key)]

        resps = [await x for x in reqs]
        # await retry_client(
        #     self.get_s3_client().delete_objects,
        #     Bucket=self.bucket,
        #     Delete={
        #         "Objects": [{'Key': x} for x in storage_keys],
        #         "Quiet": True,
        #     },
        # )

    async def get_key(self, storage_key):
        try:
            resp = await retry_client(
                self.get_s3_client().get_object,
                Bucket=self.bucket,
                Key=storage_key.as_url_path(),
            )
        except self.get_s3_client().exceptions.NoSuchKey:
            return {
                'data': None,
                'metadata': None,
            }

        return {
            'data': await resp.get('Body').read(),
            'metadata': resp.get('Metadata', {}),
        }

    async def get_keys(self, prefix, max_keys=200, after=None):
        resp = None

        kwargs = {
            "Bucket": self.bucket,
            "MaxKeys": max_keys,
            "Prefix": prefix.as_url_path(),
        }

        if after:
            kwargs['StartAfter'] = after

        resp = await retry_client(
            self.get_s3_client().list_objects_v2,
            **kwargs,
        )

        response_keys = [IndexKey.from_url_path(x['Key']) for x in resp.get('Contents', [])]

        resp_data = {
            'keys': response_keys,
        }

        after = resp.get('IsTruncated')
        if after:
            resp_data['after'] = resp.get('Contents', [])[-1]['Key']

        return resp_data