uktrade/aioftps3

View on GitHub
tests.py

Summary

Maintainability
D
2 days
Test Coverage
A
99%
import asyncio
from collections import (
    namedtuple,
)
from ftplib import (
    FTP_TLS,
    error_perm,
    error_temp,
)
from functools import (
    wraps,
)
import logging
import os
import random
import re
import ssl
import sys
import unittest

from aiohttp import (
    web,
)

from aioftps3.server_main import (
    async_main,
)


def async_test(func):
    """Decorate a coroutine function so that it can be called synchronously.

    The returned wrapper calls ``func`` with the given arguments and runs the
    resulting awaitable to completion on the loop returned by
    ``asyncio.get_event_loop()``. Any exception raised by the coroutine
    propagates to the caller.

    The wrapper deliberately returns ``None`` rather than the coroutine's
    result, since it is used to decorate unittest test methods.
    """
    # wraps keeps __name__, __doc__ and __wrapped__ of the decorated function
    # so that tooling that introspects the test method sees the real test
    @wraps(func)
    def wrapper(*args, **kwargs):
        future = func(*args, **kwargs)
        loop = asyncio.get_event_loop()
        loop.run_until_complete(future)
    return wrapper


# Minimal stand-in for a file object: a record whose only field, `read`, holds
# the callable that is invoked to obtain the next block of data
Readable = namedtuple('Readable', ('read',))

# Pattern for a single line of a directory listing. The capture groups are, in
# order: the entry type ("p" or "d"), a run of digits, a three-letter word,
# another run of digits, a HH:MM time, and the remainder of the line, which
# the tests treat as the entry name
LIST_REGEX = (
    r'^(p|d)rw(?:-|x)rw(?:-|x)rw(?:-|x) 1 none none +(\d+) ([a-zA-Z]{3}) +'
    r'(\d+) (\d\d:\d\d) (.*)'
)


class TestAioFtpS3(unittest.TestCase):
    """End-to-end tests that drive a running aioftps3 server over FTP.

    Each test calls ``setup_manual`` to start the server together with the
    stub HTTP services it is configured to talk to, and then exercises it
    with ``ftplib`` (through ``ftp_run``) or with the ``curl`` executable.

    NOTE(review): the non-ASCII file and directory names used below look like
    mis-decoded UTF-8; confirm them against the upstream file. They are used
    consistently within each test, so the tests remain self-consistent.
    """

    def add_async_cleanup(self, loop, coroutine):
        """Register an asynchronous cleanup for the current test.

        ``coroutine`` is a zero-argument callable returning an awaitable. It is
        called immediately; the resulting awaitable is run to completion on
        ``loop`` when unittest executes the registered cleanups.
        """
        self.addCleanup(loop.run_until_complete, coroutine())

    async def setup_manual(self):
        """Start the server and its stub dependencies, and empty the root directory.

        Steps, in order:

        * route all logging at DEBUG level to stdout;
        * serve a stub ``/metadata/task`` endpoint over HTTP on port 8080;
        * serve stub Route 53 endpoints over HTTPS on port 8081, using
          ``route53.crt`` and ``route53.key`` from the working directory;
        * start ``async_main`` as a task and wait until it signals that it is
          listening;
        * register a cleanup that cancels the server and stops both stubs;
        * delete every entry listed in the FTP root directory.

        Returns the event loop that the server task runs on.
        """
        loop = asyncio.get_event_loop()

        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)
        handler = logging.StreamHandler(sys.stdout)
        handler.setLevel(logging.DEBUG)
        logger.handlers = []
        logger.addHandler(handler)

        async def metadata_task_route(_):
            return web.Response(text='{"Containers":[{"Networks":[{"IPv4Addresses":[""]}]}]}')
        metadata_app = web.Application()
        metadata_app.add_routes([web.get('/metadata/task', metadata_task_route)])

        metadata_runner = web.AppRunner(metadata_app)
        await metadata_runner.setup()
        metadata_site = web.TCPSite(metadata_runner, '0.0.0.0', 8080)
        await metadata_site.start()

        async def route53_rrset(_):
            return web.Response(text='<Id>/the-id</Id>')

        async def route53_record(_):
            return web.Response(text='<Status>Something</Status>')
        route53_app = web.Application()
        route53_app.add_routes(
            [web.post('/2013-04-01/hostedzone/the-zone-id/rrset/', route53_rrset)])
        route53_app.add_routes([web.get('/2013-04-01/the-id', route53_record)])

        route53_runner = web.AppRunner(route53_app)
        await route53_runner.setup()
        # The context is only ever used to accept connections, so ask for a
        # server-side context explicitly: constructing SSLContext without a
        # protocol is deprecated
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ssl_context.load_cert_chain('route53.crt', 'route53.key')
        route53_site = web.TCPSite(route53_runner, '0.0.0.0', 8081, ssl_context=ssl_context)
        await route53_site.start()

        listening = asyncio.Event()
        server = loop.create_task(async_main(loop, env(), logger, listening))
        await listening.wait()

        async def cancel_server():
            server.cancel()
            await metadata_runner.cleanup()
            await route53_runner.cleanup()
            # Yield to the loop once so the cancellation can be processed
            await asyncio.sleep(0)

        # Registered before the initial wipe: if the wipe fails, the server and
        # the stubs are still shut down, and their ports are released for the
        # tests that follow
        self.add_async_cleanup(loop, cancel_server)

        def delete_everything(ftp):
            lines = ftp_list(ftp)
            for line in lines:
                match = re.match(LIST_REGEX, line)
                # Entries of type "p" are removed with DELE, all others with RMD
                func_name = 'delete' if match[1] == 'p' else 'rmd'
                getattr(ftp, func_name)(match[6])

        await ftp_run(delete_everything, loop=loop, user='my-user', passwd=get_password())

        return loop

    @async_test
    async def test_if_correct_creds_login_succeeds(self):
        """Logging in with the configured user and password raises nothing."""
        loop = await self.setup_manual()

        def nothing(_):
            pass

        await ftp_run(nothing, loop=loop, user='my-user', passwd=get_password())
        # Will raise if fails

    @async_test
    async def test_if_bad_pass_login_fails(self):
        """Logging in with a wrong password raises ``error_perm``."""
        loop = await self.setup_manual()

        def nothing(_):
            pass

        with self.assertRaises(error_perm):
            await ftp_run(nothing, loop=loop, user='my-user', passwd='not-my-password')

    @async_test
    async def test_if_bad_user_login_fails(self):
        """Logging in with an unknown user raises ``error_perm``."""
        loop = await self.setup_manual()

        def nothing(_):
            pass

        with self.assertRaises(error_perm):
            await ftp_run(nothing, loop=loop, user='not-my-user', passwd=get_password())

    @async_test
    async def test_empty_list_root_directory(self):
        """Listing the freshly emptied root directory yields no lines."""
        loop = await self.setup_manual()

        lines = await ftp_run(ftp_list, loop=loop, user='my-user', passwd=get_password())
        self.assertEqual(lines, [])

    @async_test
    async def test_stor_then_list_and_retr(self):
        """A stored file is listed as type "p" and downloads with the same contents."""
        loop = await self.setup_manual()

        contents = (block for block in [b'Some contents'])

        def stor_then_list(ftp):
            ftp.storbinary('STOR my ยฃ ๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ ๐Ÿฐ.bin', file(contents))
            return ftp_list(ftp)

        lines = await ftp_run(stor_then_list, loop=loop, user='my-user', passwd=get_password())
        self.assertEqual(len(lines), 1)
        match = re.match(LIST_REGEX, lines[0])
        self.assertEqual(match[1], 'p')
        self.assertEqual(match[6], 'my ยฃ ๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ ๐Ÿฐ.bin')

        data = bytearray()

        def on_incoming(incoming_data):
            data.extend(bytearray(incoming_data))

        def get_data(ftp):
            ftp.retrbinary('RETR my ยฃ ๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ ๐Ÿฐ.bin', on_incoming)

        await ftp_run(get_data, loop=loop, user='my-user', passwd=get_password())
        self.assertEqual(data, b'Some contents')

    @async_test
    async def test_if_dir_not_exist_then_no_stor(self):
        """Storing into a directory that was never created raises ``error_temp``."""
        loop = await self.setup_manual()

        contents = (block for block in [b'Some contents'])

        def stor(ftp):
            ftp.storbinary('STOR subdirectory/file.bin', file(contents))

        with self.assertRaises(error_temp):
            await ftp_run(stor, loop=loop, user='my-user', passwd=get_password())

    @async_test
    async def test_if_rest_0_can_store(self):
        """``REST 0`` followed by ``STOR`` stores the file; the listing shows its length."""
        loop = await self.setup_manual()

        contents = (block for block in [b'Some contents'])

        def stor(ftp):
            ftp.sendcmd('REST 0')
            ftp.storbinary('STOR file.bin', file(contents))

        await ftp_run(stor, loop=loop, user='my-user', passwd=get_password())
        lines = await ftp_run(ftp_list, loop=loop, user='my-user', passwd=get_password())
        self.assertIn('file.bin', lines[0])
        self.assertIn(str(len(b'Some contents')), lines[0])

    @async_test
    async def test_if_rest_0_can_retr(self):
        """``REST 0`` followed by ``RETR`` downloads the complete file."""
        loop = await self.setup_manual()

        contents = (block for block in [b'Some contents'])

        data = bytearray()

        def on_incoming(incoming_data):
            data.extend(bytearray(incoming_data))

        def stor(ftp):
            ftp.storbinary('STOR file.bin', file(contents))
            ftp.sendcmd('REST 0')
            ftp.retrbinary('RETR file.bin', on_incoming)

        await ftp_run(stor, loop=loop, user='my-user', passwd=get_password())
        self.assertEqual(data, b'Some contents')

    @async_test
    async def test_if_rest_not_0_disconnects(self):
        """Sending ``REST 1`` makes the FTP session fail with an exception."""
        loop = await self.setup_manual()

        def stor(ftp):
            ftp.sendcmd('REST 1')

        with self.assertRaises(BaseException):
            await ftp_run(stor, loop=loop, user='my-user', passwd=get_password())

    @async_test
    async def test_create_and_delete_directories(self):
        """A created directory is listed as type "d" and is gone after ``rmd``."""
        loop = await self.setup_manual()

        def create_directory(ftp):
            ftp.mkd('my-"  ๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ ๐Ÿฐdir')

        await ftp_run(create_directory, loop=loop, user='my-user', passwd=get_password())

        lines = await ftp_run(ftp_list, loop=loop, user='my-user', passwd=get_password())
        self.assertEqual(len(lines), 1)
        match = re.match(LIST_REGEX, lines[0])
        self.assertEqual(match[1], 'd')
        self.assertEqual(match[6], 'my-"  ๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ ๐Ÿฐdir')

        def delete_directory(ftp):
            ftp.rmd('my-"  ๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ ๐Ÿฐdir')

        await ftp_run(delete_directory, loop=loop, user='my-user', passwd=get_password())
        lines_after_del = await ftp_run(ftp_list, loop=loop, user='my-user', passwd=get_password())
        self.assertEqual(len(lines_after_del), 0)

    @async_test
    async def test_delete_must_have_file_specified(self):
        """``DELE`` with an empty name fails and leaves the stored file in place."""
        loop = await self.setup_manual()

        contents = (block for block in [b'Some contents'])

        def stor_then_delete_all(ftp):
            ftp.storbinary('STOR my-file.bin', file(contents))
            ftp.delete('')

        with self.assertRaises(BaseException):
            await ftp_run(stor_then_delete_all, loop=loop, user='my-user', passwd=get_password())

        lines = await ftp_run(ftp_list, loop=loop, user='my-user', passwd=get_password())
        match = re.match(LIST_REGEX, lines[0])
        self.assertEqual(match[6], 'my-file.bin')

    @async_test
    async def test_rmdir_must_have_file_specified(self):
        """``RMD`` with an empty name fails and leaves the stored file in place."""
        loop = await self.setup_manual()

        contents = (block for block in [b'Some contents'])

        def stor_then_rmdir_all(ftp):
            ftp.storbinary('STOR my-file.bin', file(contents))
            # ftplib names this method `rmd`. There is no `rmdir`, so calling
            # it would raise AttributeError on the client, satisfy the broad
            # assertRaises below, and never send RMD to the server
            ftp.rmd('')

        with self.assertRaises(BaseException):
            await ftp_run(stor_then_rmdir_all, loop=loop, user='my-user', passwd=get_password())

        lines = await ftp_run(ftp_list, loop=loop, user='my-user', passwd=get_password())
        match = re.match(LIST_REGEX, lines[0])
        self.assertEqual(match[6], 'my-file.bin')

    @async_test
    async def test_if_parent_dir_not_exist_then_no_mkdir(self):
        """``MKD`` fails under a missing parent, but succeeds directly under the root."""
        loop = await self.setup_manual()

        def mkd(ftp, directory):
            ftp.mkd(directory)

        with self.assertRaises(BaseException):
            await ftp_run(mkd, 'subdirectory/new-dir',
                          loop=loop, user='my-user', passwd=get_password())

        await ftp_run(mkd, 'subdirectory', loop=loop, user='my-user', passwd=get_password())

    @async_test
    async def test_hierarchy_stor_and_rename(self):
        """Files in nested directories stay reachable after the top directory is renamed."""
        loop = await self.setup_manual()

        lines_1 = []
        lines_2 = []
        lines_3 = []
        cwd_1 = None
        cwd_2 = None
        cwd_3 = None

        def stor(ftp):
            # The whole scenario runs inside a single FTP session; results are
            # handed back to the test through the enclosing scope
            nonlocal lines_1
            nonlocal lines_2
            nonlocal lines_3
            nonlocal cwd_1
            nonlocal cwd_2
            nonlocal cwd_3

            ftp.storbinary('STOR file-1.bin', file((block for block in [b'Contents 1'])))
            ftp.mkd('๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ ๐Ÿฐ"dir')
            ftp.cwd('๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ ๐Ÿฐ"dir')
            cwd_1 = ftp.pwd()
            ftp.storbinary('STOR file-2.bin', file((block for block in [b'Contents 2'])))
            ftp.storbinary('STOR file-3.bin', file((block for block in [b'Contents 3'])))
            ftp.mkd('subdir')
            ftp.cwd('subdir')
            ftp.storbinary('STOR file-4.bin', file((block for block in [b'Contents 4'])))
            ftp.cwd('..')
            ftp.cwd('..')
            ftp.rename('๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ ๐Ÿฐ"dir', '"another dir"')
            lines_1 = ftp_list(ftp)
            ftp.cwd('"another dir"')
            cwd_2 = ftp.pwd()
            lines_2 = ftp_list(ftp)
            ftp.cwd('subdir')
            cwd_3 = ftp.pwd()
            lines_3 = ftp_list(ftp)

        await ftp_run(stor, loop=loop, user='my-user', passwd=get_password())

        self.assertEqual(cwd_1, '/๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ ๐Ÿฐ"dir')
        self.assertEqual(cwd_2, '/"another dir"')
        self.assertEqual(cwd_3, '/"another dir"/subdir')
        self.assertEqual(len(lines_1), 2)
        self.assertIn('"another dir"', lines_1[0])
        self.assertIn('file-1.bin', lines_1[1])
        self.assertEqual(len(lines_2), 3)
        self.assertIn('subdir', lines_2[0])
        self.assertIn('file-2.bin', lines_2[1])
        self.assertIn('file-3.bin', lines_2[2])
        self.assertEqual(len(lines_3), 1)
        self.assertIn('file-4.bin', lines_3[0])

    @async_test
    async def test_100mb_file(self):
        """A 105906176-byte (101 MiB) upload downloads again byte for byte."""
        loop = await self.setup_manual()

        def random_bytes(num_bytes):
            return bytes(random.getrandbits(8) for _ in range(num_bytes))

        def random_file():
            # Re-seeding makes every call produce the same stream, so the
            # upload can be regenerated later to check the download against.
            # 12928 blocks of 128 * 64 bytes each
            random.seed(a=1234)
            contents = (random_bytes(128) * 64 for _ in range(0, 12928))
            return file(contents)

        def stor(ftp):
            ftp.storbinary('STOR my ยฃ" ๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ ๐Ÿฐ.bin', random_file())

        await ftp_run(stor, loop=loop, user='my-user', passwd=get_password())

        correct_file = random_file()
        correct = b''
        downloaded = b''
        all_equal = True
        num_checked = 0

        def on_incoming(incoming):
            # Compares the download against the regenerated stream piece by
            # piece, so that neither has to be held in memory in full
            nonlocal correct
            nonlocal downloaded
            nonlocal all_equal
            nonlocal num_checked

            downloaded += incoming

            while len(correct) < len(downloaded):
                expected_block = correct_file.read(None)
                if not expected_block:
                    # The reference stream is exhausted, so more was received
                    # than was uploaded: record the mismatch rather than loop
                    # forever on empty reads
                    all_equal = False
                    break
                correct += expected_block

            num_to_check = min(len(downloaded), len(correct))
            all_equal = all_equal and downloaded[:num_to_check] == correct[:num_to_check]

            downloaded = downloaded[num_to_check:]
            correct = correct[num_to_check:]
            num_checked += num_to_check

        def get_data(ftp):
            ftp.retrbinary('RETR my ยฃ" ๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ ๐Ÿฐ.bin', on_incoming)

        await ftp_run(get_data, loop=loop, user='my-user', passwd=get_password())
        self.assertEqual(num_checked, 105906176)
        self.assertTrue(all_equal)

    @async_test
    async def test_curl_upload_file(self):
        """Bytes uploaded by curl from its stdin can be downloaded again with ftplib."""
        loop = await self.setup_manual()

        def get_random_bytes(num_bytes):
            return bytes(random.getrandbits(8) for _ in range(num_bytes))

        random_bytes = get_random_bytes(100)
        downloaded = b''

        def on_incoming(incoming):
            nonlocal downloaded
            downloaded += incoming

        def get_data(ftp):
            ftp.retrbinary('RETR target.bin', on_incoming)

        proc = await subprocess([
            'curl', '--ftp-ssl', '-k', '--disable-epsv',
            'ftp://my-user:' + get_password() + '@localhost:8021/target.bin',
            '-T', '-',
        ])
        await proc.communicate(random_bytes)
        await ftp_run(get_data, loop=loop, user='my-user', passwd=get_password())
        self.assertEqual(downloaded, random_bytes)

    @async_test
    async def test_curl_download_file(self):
        """A file stored with ftplib is written to curl's stdout unchanged."""
        loop = await self.setup_manual()

        def get_random_bytes(num_bytes):
            return bytes(random.getrandbits(8) for _ in range(num_bytes))

        random_bytes = get_random_bytes(100)

        def random_file():
            contents = (random_bytes for _ in range(0, 1))
            return file(contents)

        def stor(ftp):
            ftp.storbinary('STOR my-target-file.bin', random_file())

        await ftp_run(stor, loop=loop, user='my-user', passwd=get_password())

        proc = await subprocess([
            'curl', '--ftp-ssl', '-k', '--disable-epsv',
            'ftp://my-user:' + get_password() + '@localhost:8021/my-target-file.bin',
        ])
        downloaded, _ = await proc.communicate()
        self.assertEqual(downloaded, random_bytes)

    @async_test
    async def test_curl_list(self):
        """curl's listing of the root directory mentions a previously stored file."""
        loop = await self.setup_manual()

        def get_random_bytes(num_bytes):
            return bytes(random.getrandbits(8) for _ in range(num_bytes))

        random_bytes = get_random_bytes(100)

        def random_file():
            contents = (random_bytes for _ in range(0, 1))
            return file(contents)

        def stor(ftp):
            ftp.storbinary('STOR my-target-file.bin', random_file())

        await ftp_run(stor, loop=loop, user='my-user', passwd=get_password())

        proc = await subprocess([
            'curl', '--ftp-ssl', '-k', '--disable-epsv',
            'ftp://my-user:' + get_password() + '@localhost:8021/',
        ])
        listing, _ = await proc.communicate()
        self.assertIn(b'my-target-file.bin', listing)


def file(generator):
    """Wrap ``generator`` in a ``Readable`` that hands out one item per ``read`` call.

    The argument passed to ``read`` is ignored. Once ``generator`` is exhausted,
    every further ``read`` returns ``b''``.
    """
    def read(_size):
        # The default is returned in place of the StopIteration that an
        # exhausted generator would otherwise raise
        return next(generator, b'')

    return Readable(read=read)


def ftp_list(ftp):
    """Return, as a list, every line that ``ftp.dir`` passes to its callback."""
    collected = []
    # The list's own bound `append` serves as the per-line callback
    ftp.dir(collected.append)
    return collected


async def ftp_run(func, *args, loop, user, passwd):
    """Run ``func(ftp, *args)`` inside a logged-in FTP-over-TLS session.

    The session is opened against ``localhost:8021`` with UTF-8 as the
    encoding, logs in as ``user`` / ``passwd`` and then issues ``prot_p``
    before ``func`` is called. All of this happens in the default executor of
    ``loop``, so the blocking ftplib calls do not run on the event loop itself.

    Returns whatever ``func`` returns; exceptions raised during the session
    propagate to the awaiting caller.
    """
    def blocking_session():
        with FTP_TLS() as client:
            client.encoding = 'utf-8'
            client.connect(host='localhost', port=8021)
            client.login(user=user, passwd=passwd)
            client.prot_p()  # pylint: disable=no-member
            return func(client, *args)

    outcome = await loop.run_in_executor(None, blocking_session)
    return outcome


async def subprocess(command):
    """Start ``command`` (a sequence of program and arguments) as a child process.

    Standard input, output and error are all connected to pipes. Returns the
    process object produced by ``asyncio.create_subprocess_exec``.
    """
    pipe = asyncio.subprocess.PIPE
    process = await asyncio.create_subprocess_exec(
        *command,
        stdin=pipe,
        stdout=pipe,
        stderr=pipe,
    )
    return process


def env():
    """Return the settings mapping that the tests pass to ``async_main``.

    A new dict is built on every call. All values are fixed strings except
    ``ACME_PATH``, which is the ``PWD`` environment variable when that is set
    and the process's current working directory otherwise.
    """
    return {
        'AWS_AUTH_MECHANISM': 'secret_access_key',
        'AWS_ACCESS_KEY_ID': 'AKIAIOSFODNN7EXAMPLE',
        'AWS_SECRET_ACCESS_KEY': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
        'AWS_S3_BUCKET__REGION': 'us-east-1',
        'AWS_S3_BUCKET__HOST': 'localhost:9000',
        'AWS_S3_BUCKET__NAME': 'my-bucket',
        'AWS_S3_BUCKET__DIR_SUFFIX': '/.s3keep',
        'AWS_S3_BUCKET__VERIFY_CERTS': 'false',
        'AWS_S3_ACME_BUCKET__REGION': 'us-east-1',
        'AWS_S3_ACME_BUCKET__HOST': 'localhost:9000',
        'AWS_S3_ACME_BUCKET__NAME': 'my-bucket-acme',
        'AWS_S3_ACME_BUCKET__VERIFY_CERTS': 'false',
        # Port 8081 is where setup_manual serves its stub Route 53 endpoints
        'AWS_ROUTE_53__HOST': 'localhost:8081',
        'AWS_ROUTE_53__PRIVATE_DOMAIN': 'untested',
        'AWS_ROUTE_53__REGION': 'untested',
        'AWS_ROUTE_53__VERIFY_CERTS': 'false',
        'AWS_ROUTE_53__ZONE_ID': 'the-zone-id',
        'ACME_DIRECTORY': 'untested',
        # PWD is a shell convention and is not set in every environment, so
        # fall back to the real working directory instead of raising KeyError
        'ACME_PATH': os.environ.get('PWD', os.getcwd()),
        # Port 8080 is where setup_manual serves its stub metadata endpoint
        'ECS_CONTAINER_METADATA_URI': 'http://127.0.0.1:8080/metadata',
        'FTP_USERS__1__LOGIN': 'my-user',
        'FTP_USERS__1__PASSWORD_HASHED': 'N3HmktqTFxH6RArbScmnwQH3/S3Ow593NFdSVrftp2M=',
        'FTP_USERS__1__PASSWORD_SALT':
            'np1RamJvq2S9YwvvqC5o59fQDFgn4IcBfzmSwJWHvoPMwWCVRUzMePceRbL9FMOT',
        # Port 8021 is the one that ftp_run and the curl tests connect to
        'FTP_COMMAND_PORT': '8021',
        'FTP_DATA_PORTS_FIRST': '4001',
        'FTP_DATA_PORTS_COUNT': '2',
        'FTP_DATA_CIDR_TO_DOMAINS__1__CIDR': '0.0.0.0/0',
        'FTP_DATA_CIDR_TO_DOMAINS__1__DOMAIN': '127.0.0.1',
        'HEALTHCHECK_PORT': '8022',
    }


def get_password():
    """Return the plain-text password the tests use when logging in as ``my-user``."""
    # presumably the password behind FTP_USERS__1__PASSWORD_HASHED in env();
    # verify against the server's hashing scheme
    password = 'kOcAeOQ7Pc'
    return password