CenterForOpenScience/waterbutler

View on GitHub
waterbutler/providers/dropbox/provider.py

Summary

Maintainability
C
1 day
Test Coverage
import json
import typing
import logging
from http import HTTPStatus

from waterbutler.core import provider, streams
from waterbutler.core.path import WaterButlerPath
from waterbutler.core import exceptions as core_exceptions

from waterbutler.providers.dropbox import settings as pd_settings
from waterbutler.providers.dropbox import exceptions as pd_exceptions
from waterbutler.providers.dropbox.metadata import (DropboxRevision,
                                                    BaseDropboxMetadata,
                                                    DropboxFileMetadata,
                                                    DropboxFolderMetadata, )

logger = logging.getLogger(__name__)


class DropboxProvider(provider.BaseProvider):
    """Provider for the Dropbox.com cloud storage service.

    This provider uses the v2 Dropbox API. The v2 API assigns IDs to files and folders, but not all
    endpoints currently support IDs. Dropbox WaterButlerPath objects will continue to use string
    paths until they do. As of Jan. 2, 2018, endpoint ID support is classified as follows.

    Can use ID as path::

        /files/get_metadata
        /files/copy_reference/get
        /files/download
        /files/list_revisions
        /files/copy_v2
        /files/move_v2
        /files/delete_v2
        /files/list_folder

    Cannot use ID as path::

        /files/upload
        /files/copy_reference/save
        /files/create_folder_v2

    Does not use path::

        /files/list_folder/continue

    Deprecated API Update (as of Dec. 26th, 2017)::

        /files/copy             --->    /files/copy_v2
        /files/move             --->    /files/move_v2
        /files/delete           --->    /files/delete_v2
        /files/create_folder    --->    /files/create_folder_v2

    API docs: https://www.dropbox.com/developers/documentation/http/documentation

    Quirks: Dropbox paths are case-insensitive.
    """
    NAME = 'dropbox'
    BASE_URL = pd_settings.BASE_URL
    CONTIGUOUS_UPLOAD_SIZE_LIMIT = pd_settings.CONTIGUOUS_UPLOAD_SIZE_LIMIT
    CHUNK_SIZE = pd_settings.CHUNK_SIZE

    def __init__(self, auth, credentials, settings, **kwargs):
        super().__init__(auth, credentials, settings, **kwargs)
        self.token = self.credentials['token']
        self.folder = self.settings['folder']
        self.metrics.add('folder_is_root', self.folder == '/')

    async def dropbox_request(self,
                              url: str,
                              body: dict,
                              expects: typing.Tuple=(200, 409,),
                              *args,
                              **kwargs) -> dict:
        r"""Convenience wrapper around ``BaseProvider.make_request`` for simple Dropbox API calls.
        Sets the method to ``POST``, jsonifies the ``body`` param, and provides default error
        handling for Dropbox's standard 409 error response structure.

        Note: This ``.dropbox_request()`` doesn't fit all requests. For example, download doesn't
              expect a JSON response.  In addition, this wrapper shouldn't be applied to requests
              that clearly don't have the 409 problem even if they have a JSON response.

        TODO: review all ``.dropbox_request()`` and ``.make_request()`` usages in the provider and
              make sure each uses either one properly.  Make changes if necessary.

        :param str url: the url of the endpoint to POST to
        :param dict body: the data to send in the request body, will be jsonified
        :param tuple expects: expected error codes, defaults to 200 (success) and 409 (error)
        :param tuple \*args: passed through to BaseProvider.make_request()
        :param dict \*\*kwargs: passed through to BaseProvider.make_request()
        """
        resp = await self.make_request(
            'POST',
            url,
            data=json.dumps(body),
            expects=expects,
            *args,
            **kwargs,
        )
        data = await resp.json()
        if resp.status == 409:
            self.dropbox_conflict_error_handler(data, body.get('path', ''))
        return data

    def dropbox_conflict_error_handler(self, data: dict, error_path: str='') -> None:
        """Takes a standard Dropbox error response and an optional path and tries to throw a
        meaningful error based on it.

        :param dict data: the error received from Dropbox
        :param str error_path: the path where the error occurred. Base folder will be stripped.
        """

        if error_path.startswith(self.folder):
            error_path = error_path[len(self.folder):]
        if not error_path.startswith('/'):
            error_path = '/{}'.format(error_path)

        if 'error' in data:
            error_class = data['error']['.tag']
            if error_class in data['error']:
                error_type = data['error'][error_class]
                if error_type['.tag'] == 'not_found':
                    raise core_exceptions.NotFoundError(error_path)
                if 'conflict' in error_type:
                    raise pd_exceptions.DropboxNamingConflictError(error_path)
            if data['error'].get('reason', False) and 'conflict' in data['error']['reason']['.tag']:
                raise pd_exceptions.DropboxNamingConflictError(error_path)
        raise pd_exceptions.DropboxUnhandledConflictError(str(data))

    async def validate_v1_path(self, path: str, **kwargs) -> WaterButlerPath:
        if path == '/':
            return WaterButlerPath(path, prepend=self.folder)
        implicit_folder = path.endswith('/')
        data = await self.dropbox_request(
            self.build_url('files', 'get_metadata'),
            {'path': self.folder.rstrip('/') + path.rstrip('/')},
            throws=core_exceptions.MetadataError,
        )
        explicit_folder = data['.tag'] == 'folder'
        if explicit_folder != implicit_folder:
            raise core_exceptions.NotFoundError(str(path))
        return WaterButlerPath(path, prepend=self.folder)

    async def validate_path(self, path: str, **kwargs) -> WaterButlerPath:
        return WaterButlerPath(path, prepend=self.folder)

    def can_duplicate_names(self) -> bool:
        return False

    def shares_storage_root(self, other: provider.BaseProvider) -> bool:
        """Dropbox settings only include the root folder. If a cross-resource move occurs
        between two dropbox providers that are on different accounts but have the same folder
        base name, the parent method could incorrectly think the action is a self-overwrite.
        Comparing credentials means that this is unique per connected account."""
        return super().shares_storage_root(other) and self.credentials == other.credentials

    @property
    def default_headers(self) -> dict:
        return {'Authorization': 'Bearer {}'.format(self.token),
                'Content-Type': 'application/json'}

    async def intra_copy(self,  # type: ignore
                         dest_provider: 'DropboxProvider',
                         src_path: WaterButlerPath,
                         dest_path: WaterButlerPath) \
            -> typing.Tuple[typing.Union[DropboxFileMetadata, DropboxFolderMetadata], bool]:
        dest_folder = dest_provider.folder
        try:
            if self == dest_provider:
                data = await self.dropbox_request(
                    self.build_url('files', 'copy_v2'),
                    {
                        'from_path': src_path.full_path.rstrip('/'),
                        'to_path': dest_path.full_path.rstrip('/'),
                    },
                    expects=(200, 201, 409),
                    throws=core_exceptions.IntraCopyError,
                )
            else:
                from_ref_data = await self.dropbox_request(
                    self.build_url('files', 'copy_reference', 'get'),
                    {'path': src_path.full_path.rstrip('/')},
                    throws=core_exceptions.IntraCopyError,
                )
                from_ref = from_ref_data['copy_reference']

                data = await dest_provider.dropbox_request(
                    self.build_url('files', 'copy_reference', 'save'),
                    {'copy_reference': from_ref, 'path': dest_path.full_path.rstrip('/')},
                    expects=(200, 201, 409),
                    throws=core_exceptions.IntraCopyError,
                )
            data = data['metadata']
        except pd_exceptions.DropboxNamingConflictError:
            await dest_provider.delete(dest_path)
            resp, _ = await self.intra_copy(dest_provider, src_path, dest_path)
            return resp, False

        if data['.tag'] == 'file':
            return DropboxFileMetadata(data, dest_folder), True
        folder = DropboxFolderMetadata(data, dest_folder)
        folder.children = [item for item in await dest_provider.metadata(dest_path)]  # type: ignore
        return folder, True

    async def intra_move(self,  # type: ignore
                         dest_provider: 'DropboxProvider',
                         src_path: WaterButlerPath,
                         dest_path: WaterButlerPath) -> typing.Tuple[BaseDropboxMetadata, bool]:
        if dest_path.full_path.lower() == src_path.full_path.lower():
            # Dropbox does not support changing the casing in a file name
            raise core_exceptions.InvalidPathError(
                'In Dropbox to change case, add or subtract other characters.')

        try:
            data = await self.dropbox_request(
                self.build_url('files', 'move_v2'),
                {
                    'from_path': src_path.full_path.rstrip('/'),
                    'to_path': dest_path.full_path.rstrip('/'),
                },
                expects=(200, 201, 409),
                throws=core_exceptions.IntraMoveError,
            )
            data = data['metadata']
        except pd_exceptions.DropboxNamingConflictError:
            await dest_provider.delete(dest_path)
            resp, _ = await self.intra_move(dest_provider, src_path, dest_path)
            return resp, False

        dest_folder = dest_provider.folder
        if data['.tag'] == 'file':
            return DropboxFileMetadata(data, dest_folder), True
        folder = DropboxFolderMetadata(data, dest_folder)
        folder.children = [item for item in await dest_provider.metadata(dest_path)]  # type: ignore
        return folder, True

    async def download(self,  # type: ignore
                       path: WaterButlerPath,
                       revision: str=None,
                       range: typing.Tuple[int, int]=None,
                       **kwargs) -> streams.ResponseStreamReader:
        """
        Dropbox V2 API Files Download
        https://www.dropbox.com/developers/documentation/http/documentation#files-download

        Request and Response Format: Content-download endpoints
        https://www.dropbox.com/developers/documentation/http/documentation#formats

        According to Dropbox's API docs for files download and content-download endpoints, the file
        content is contained in the response body and the result (metadata about the file) appears
        as JSON in the "Dropbox-API-Result" response header.  As far as the WB Dropbox provider is
        concerned, the header contains the size (in bytes) of the file that ``ResponseStreamReader``
        needs if the "Content-Length" header is not provided.
        """
        path_arg = {"path": ("rev:" + revision if revision else path.full_path)}
        resp = await self.make_request(
            'POST',
            self._build_content_url('files', 'download'),
            headers={'Dropbox-API-Arg': json.dumps(path_arg), 'Content-Type': ''},
            range=range,
            expects=(200, 206, 409,),
            throws=core_exceptions.DownloadError,
        )
        if resp.status == 409:
            data = await resp.json()
            self.dropbox_conflict_error_handler(data)
        if 'Content-Length' not in resp.headers:
            size = json.loads(resp.headers['dropbox-api-result'])['size']
        else:
            size = None  # ResponseStreamReader will extract it from the resp
        return streams.ResponseStreamReader(resp, size=size)

    async def upload(self,  # type: ignore
                     stream: streams.BaseStream,
                     path: WaterButlerPath,
                     conflict: str='replace',
                     **kwargs) -> typing.Tuple[DropboxFileMetadata, bool]:
        """Upload file stream to Dropbox.  If file exceeds `CONTIGUOUS_UPLOAD_SIZE_LIMIT`, Dropbox's
        multipart upload endpoints will be used.
        """
        path, exists = await self.handle_name_conflict(path, conflict=conflict)

        if stream.size > self.CONTIGUOUS_UPLOAD_SIZE_LIMIT:
            data = await self._chunked_upload(stream, path, conflict=conflict)
        else:
            data = await self._contiguous_upload(stream, path, conflict=conflict)

        return DropboxFileMetadata(data, self.folder), not exists

    async def _contiguous_upload(self,
                                 stream: streams.BaseStream,
                                 path: WaterButlerPath,
                                 conflict: str='replace') -> dict:
        """Upload file in a single request.

        API Docs: https://www.dropbox.com/developers/documentation/http/documentation#files-upload

        :param stream: the stream to upload
        :param path: the WB path of the file
        :param conflict: whether to replace upon conflict
        :rtype: `dict`
        :return: A dictionary of the metadata about the file just uploaded
        """

        path_arg = {"path": path.full_path}
        if conflict == 'replace':
            path_arg['mode'] = 'overwrite'

        resp = await self.make_request(
            'POST',
            self._build_content_url('files', 'upload'),
            headers={
                'Content-Type': 'application/octet-stream',
                'Dropbox-API-Arg': json.dumps(path_arg),
                'Content-Length': str(stream.size),
            },
            data=stream,
            expects=(200, 409,),
            throws=core_exceptions.UploadError,
        )

        data = await resp.json()
        if resp.status == 409:
            self.dropbox_conflict_error_handler(data, path.path)
        return data

    async def _chunked_upload(self, stream: streams.BaseStream, path: WaterButlerPath,
                              conflict: str='replace') -> dict:
        """Chunked uploading is a 3-step process using Dropbox's "Upload Session".

        First, start a new upload session and receive an upload session ID.
        API Docs: https://www.dropbox.com/developers/documentation/http/documentation#files-upload_session-start

        Then, split the file into multiple chunks and upload them across multiple requests.
        API Docs: https://www.dropbox.com/developers/documentation/http/documentation#files-upload_session-append

        Finally, when all of the parts have finished uploading, send a complete session request to
        let Dropbox combine the uploaded data and save it to the given file path.
        API Docs: https://www.dropbox.com/developers/documentation/http/documentation#files-upload_session-finish

        Quirks:
        1. A single request should not upload more than 150 MB.
        2. The maximum size of a file one can upload to an upload session is 350 GB.
        3. An upload session can be used for a maximum of 48 hours.
        """

        # 1. Create an upload session and retrieves the session id to upload parts.
        session_id = await self._create_upload_session()

        # 2. Upload all parts in the session
        await self._upload_parts(stream, session_id)

        # 3. Complete the session and return the uploaded file's metadata.
        return await self._complete_session(stream, session_id, path, conflict=conflict)

    async def _create_upload_session(self) -> str:
        """Create an upload session for chunked upload.

        "Upload sessions allow you to upload a single file in one or more requests, for example
        where the size of the file is greater than 150 MB. This call starts a new upload session
        with the given data."

        API Docs: https://www.dropbox.com/developers/documentation/http/documentation#files-upload_session-start

        :rtype: str
        :return: session identifier
        """

        resp = await self.make_request(
            'POST',
            self._build_content_url('files', 'upload_session', 'start'),
            headers={
                'Content-Type': 'application/octet-stream',
                'Dropbox-API-Arg': json.dumps({'close': False}),
            },
            expects=(200, ),
            throws=core_exceptions.UploadError
        )
        data = await resp.json()
        return data['session_id']

    async def _upload_parts(self, stream: streams.BaseStream, session_id: str) -> None:
        """Determines the necessary partitioning of the stream (based on max chunk size), and
        calls `_upload_part` for each partition.
        """

        upload_args = {
            'close': False,
            'cursor': {'session_id': session_id, 'offset': 0, }
        }

        parts = [self.CHUNK_SIZE for _ in range(0, stream.size // self.CHUNK_SIZE)]
        if stream.size % self.CHUNK_SIZE:
            parts.append(stream.size - (len(parts) * self.CHUNK_SIZE))
        logger.debug('Chunked upload segment sizes: {}'.format(parts))

        last_chunk_size = 0
        for chunk_id, chunk_size in enumerate(parts):
            # Calculates the the ``offset`` that is required for ``/chunked_upload`` and that
            # represents the number of bytes transferred so far. If the offset does not match the
            # expected offset on the server, the server will ignore the request and respond with a
            # 400 error that includes the current offset.
            upload_args['cursor']['offset'] += last_chunk_size  # type: ignore
            logger.debug('  uploading part {} with size {} starting at offset '
                         '{}'.format(chunk_id + 1, chunk_size,
                                     upload_args['cursor']['offset']))  # type: ignore
            await self._upload_part(stream, chunk_size, upload_args)
            last_chunk_size = chunk_size

    async def _upload_part(self, stream: streams.BaseStream,
                           chunk_size: int, upload_args: dict) -> None:
        """Upload one part/chunk of the given stream to Dropbox

        "Append more data to an upload session. When the parameter close is set, this call will
        close the session. A single request should not upload more than 150 MB. ..."

        API Docs: https://www.dropbox.com/developers/documentation/http/documentation#files-upload_session-append
        """

        cutoff_stream = streams.CutoffStream(stream, cutoff=chunk_size)

        resp = await self.make_request(
            'POST',
            self._build_content_url('files', 'upload_session', 'append_v2'),
            headers={
                # ``Content-Length`` is required for ``asyncio`` to use inner chunked stream read
                'Content-Length': str(chunk_size),
                'Content-Type': 'application/octet-stream',
                'Dropbox-API-Arg': json.dumps(upload_args),
            },
            data=cutoff_stream,
            expects=(200, ),
            throws=core_exceptions.UploadError
        )

        await resp.release()

    async def _complete_session(self, stream: streams.BaseStream, session_id: str,
                                path: WaterButlerPath, conflict: str='replace') -> dict:
        """Complete the chunked upload session.

        "Finish an upload session and save the uploaded data to the given file path. ... The maximum
        size of a file one can upload to an upload session is 350 GB."

        API Docs: https://www.dropbox.com/developers/documentation/http/documentation#files-upload_session-finish

        :param stream: the stream to upload
        :param session_id: the ID of the chunked upload session
        :param path: the WB path of the file
        :param conflict: whether to replace upon conflict
        :rtype: `dict`
        :return: A dictionary of the metadata about the file just uploaded
        """

        upload_args = {
            'cursor': {'session_id': session_id, 'offset': stream.size, },
            'commit': {"path": path.full_path, },
        }
        if conflict == 'replace':
            upload_args['commit']['mode'] = 'overwrite'

        resp = await self.make_request(
            'POST',
            self._build_content_url('files', 'upload_session', 'finish'),
            headers={
                'Content-Type': 'application/octet-stream',
                'Dropbox-API-Arg': json.dumps(upload_args),
            },
            expects=(200, ),
            throws=core_exceptions.UploadError,
        )

        return await resp.json()

    async def delete(self, path: WaterButlerPath, confirm_delete: int=0,  # type: ignore
                     **kwargs) -> None:  # type: ignore
        """Delete file, folder, or provider root contents

        :param WaterButlerPath path: WaterButlerPath path object for folder
        :param int confirm_delete: Must be 1 to confirm root folder delete
        """
        if path.is_root:
            if confirm_delete == 1:
                return await self._delete_folder_contents(path)
            else:
                raise core_exceptions.DeleteError(
                    'confirm_delete=1 is required for deleting root provider folder',
                    code=400
                )
        await self.dropbox_request(
            self.build_url('files', 'delete_v2'),
            {'path': self.folder.rstrip('/') + '/' + path.path.rstrip('/')},
            throws=core_exceptions.DeleteError,
        )

    async def metadata(self,  # type: ignore
                       path: WaterButlerPath,
                       revision: str=None,
                       **kwargs) \
                       -> typing.Union[BaseDropboxMetadata, typing.List[BaseDropboxMetadata]]:
        full_path = path.full_path.rstrip('/')
        url = self.build_url('files', 'get_metadata')
        body = {'path': full_path}
        if revision:
            body = {'path': 'rev:' + revision}
        elif path.is_folder:
            url = self.build_url('files', 'list_folder')

        if path.is_folder:
            ret = []  # type: typing.List[BaseDropboxMetadata]
            has_more = True
            page_count = 0
            while has_more:
                page_count += 1
                data = await self.dropbox_request(url, body, throws=core_exceptions.MetadataError)
                for entry in data['entries']:
                    if entry['.tag'] == 'folder':
                        ret.append(DropboxFolderMetadata(entry, self.folder))
                    else:
                        ret.append(DropboxFileMetadata(entry, self.folder))
                if not data['has_more']:
                    has_more = False
                else:
                    url = self.build_url('files', 'list_folder', 'continue')
                    body = {'cursor': data['cursor']}
            self.metrics.add('metadata.folder.pages', page_count)
            return ret

        data = await self.dropbox_request(url, body, throws=core_exceptions.MetadataError)
        # Dropbox v2 API will not indicate file/folder if path "deleted"
        if data['.tag'] == 'deleted':
            raise core_exceptions.MetadataError(
                "Could not retrieve '{}'".format(path),
                code=HTTPStatus.NOT_FOUND,
            )

        # Dropbox will match a file or folder by name within the requested path
        if path.is_file and data['.tag'] == 'folder':
            raise core_exceptions.MetadataError(
                "Could not retrieve file '{}'".format(path),
                code=HTTPStatus.NOT_FOUND,
            )

        return DropboxFileMetadata(data, self.folder)

    async def revisions(self, path: WaterButlerPath, **kwargs) -> typing.List[DropboxRevision]:
        # Dropbox v2 API limits the number of revisions returned to a maximum
        # of 100, default 10. Previously we had set the limit to 250.

        data = await self.dropbox_request(
            self.build_url('files', 'list_revisions'),
            {'path': path.full_path.rstrip('/'), 'limit': 100},
            throws=core_exceptions.RevisionsError,
        )
        if data['is_deleted'] is True:
            raise core_exceptions.RevisionsError(
                "Could not retrieve '{}'".format(path),
                code=HTTPStatus.NOT_FOUND,
            )
        if data['is_deleted']:
            return []
        return [DropboxRevision(item) for item in data['entries']]

    async def create_folder(self, path: WaterButlerPath, **kwargs) -> DropboxFolderMetadata:
        """
        :param str path: The path to create a folder at
        """
        WaterButlerPath.validate_folder(path)
        data = await self.dropbox_request(
            self.build_url('files', 'create_folder_v2'),
            {'path': path.full_path.rstrip('/')},
            throws=core_exceptions.CreateFolderError,
        )
        return DropboxFolderMetadata(data['metadata'], self.folder)

    def can_intra_copy(self, dest_provider: provider.BaseProvider,
                       path: WaterButlerPath=None) -> bool:
        return type(self) == type(dest_provider)

    def can_intra_move(self, dest_provider: provider.BaseProvider,
                       path: WaterButlerPath=None) -> bool:
        return self == dest_provider  # dropbox can only intra move on same account

    def _build_content_url(self, *segments, **query):
        return provider.build_url(pd_settings.BASE_CONTENT_URL, *segments, **query)

    async def _delete_folder_contents(self, path: WaterButlerPath, **kwargs) -> None:
        """Delete the contents of a folder. For use against provider root.

        :param WaterButlerPath path: WaterButlerPath path object for folder
        """
        meta = (await self.metadata(path))
        for child in meta:  # type: ignore
            dropbox_path = await self.validate_path(child.path)
            await self.delete(dropbox_path)