amenezes/isilon-client

View on GitHub
isilon/api/objects.py

Summary

Maintainability
A
0 mins
Test Coverage
from typing import Optional

from isilon import utils
from isilon.api.base import BaseAPI
from isilon.api.metadata import object_metadata


class Objects(BaseAPI):
    async def get(
        self,
        container_name: str,
        object_name: str,
        **kwargs,
    ) -> bytes:
        """Get object content and metadata."""
        kwargs = await self.include_auth_header(**kwargs)
        async with self.http.get(
            f"{self.address}/{self.API_VERSION}/AUTH_{self.account}/{container_name}/{object_name}",
            **kwargs,
        ) as resp:
            content: bytes = await resp.content.read()
            return content

    async def get_large(
        self,
        container_name: str,
        object_name: str,
        filename: str,
        chunk_size: int = 50,
        **kwargs,
    ) -> int:
        """Get large object content and metadata."""
        kwargs = await self.include_auth_header(**kwargs)
        async with self.http.get(
            f"{self.address}/{self.API_VERSION}/AUTH_{self.account}/{container_name}/{object_name}",
            **kwargs,
        ) as resp:
            with open(filename, "w+b") as f:
                while True:
                    chunk = await resp.content.read(chunk_size)
                    if not chunk:
                        break
                    f.write(chunk)
            return resp.status

    async def presigned_url(self, container_name: str, object_name: str, **kwargs):
        """Build a pre-signed URI for an object.

        Fetches the key from ``self.account_primary_key()`` and hands it,
        together with the object URL and any extra keyword arguments, to
        ``utils.generate_presigned_uri``.

        Args:
            container_name: Container that holds the object.
            object_name: Name of the object inside the container.
            **kwargs: Forwarded unchanged to ``utils.generate_presigned_uri``.

        Returns:
            Whatever ``utils.generate_presigned_uri`` returns (presumably the
            signed URI string; confirm against ``isilon.utils``).
        """
        signing_key = await self.account_primary_key()
        object_url = f"{self.address}/{self.API_VERSION}/AUTH_{self.account}/{container_name}/{object_name}"
        return utils.generate_presigned_uri(signing_key, object_url, **kwargs)

    async def create(
        self,
        container_name: str,
        object_name: str,
        data,
        metadata: Optional[dict] = None,
        **kwargs,
    ) -> int:
        """Create or replace object."""
        kwargs = await self.include_auth_header(**kwargs)
        kwargs = await self._include_object_metadata(metadata, **kwargs)
        async with self.http.put(
            f"{self.address}/{self.API_VERSION}/AUTH_{self.account}/{container_name}/{object_name}",
            data=data,
            **kwargs,
        ) as resp:
            return resp.status

    async def create_large(
        self,
        container_name: str,
        object_name: str,
        filename: str,
        metadata: Optional[dict] = None,
        **kwargs,
    ) -> int:
        """Create or replace large object."""
        kwargs = await self.include_auth_header(**kwargs)
        kwargs = await self._include_object_metadata(metadata, **kwargs)
        with open(filename, "rb") as f:
            async with self.http.put(
                f"{self.address}/{self.API_VERSION}/AUTH_{self.account}/{container_name}/{object_name}",
                data=f,
                **kwargs,
            ) as resp:
                return resp.status

    async def copy(self, container_name, object_name, **kwargs):
        """Copy object."""
        raise NotImplementedError("Operation not supported")

    async def delete(self, container_name: str, object_name, **kwargs) -> int:
        """Delete object."""
        kwargs = await self.include_auth_header(**kwargs)
        async with self.http.delete(
            f"{self.address}/{self.API_VERSION}/AUTH_{self.account}/{container_name}/{object_name}",
            **kwargs,
        ) as resp:
            return resp.status

    async def show_metadata(
        self, container_name: str, object_name: str, **kwargs
    ) -> dict:
        """Show object metadata."""
        kwargs = await self.include_auth_header(**kwargs)
        async with self.http.head(
            f"{self.address}/{self.API_VERSION}/AUTH_{self.account}/{container_name}/{object_name}",
            **kwargs,
        ) as resp:
            return dict(resp.headers)

    async def update_metadata(
        self,
        container_name: str,
        object_name: str,
        metadata: Optional[dict] = None,
        **kwargs,
    ) -> int:
        """Create or update object metadata."""
        kwargs = await self.include_auth_header(**kwargs)
        kwargs = await self._include_object_metadata(metadata, **kwargs)
        async with self.http.post(
            f"{self.address}/{self.API_VERSION}/AUTH_{self.account}/{container_name}/{object_name}",
            **kwargs,
        ) as resp:
            return resp.status

    async def _include_object_metadata(self, metadata, **kwargs) -> dict:
        if metadata:
            metadata_headers = object_metadata(metadata)
            kwargs["headers"].update(metadata_headers)
        return kwargs