uktrade/aioftps3

View on GitHub
healthcheck.py

Summary

Maintainability
A
0 mins
Test Coverage
import asyncio
from collections import (
    namedtuple,
)
from ftplib import (
    FTP_TLS,
)
import logging
import os
import sys
import time

from aiohttp import (
    web,
)


Readable = namedtuple('Readable', ['read'])


async def async_main(loop, logger, healthcheck_port, ftp_host, ftp_port, ftp_user, ftp_password):

    async def handle_alb_healthcheck(_):
        return web.Response(text='OK')

    async def handle_pingdom_healthcheck(_):
        contents = b'healthcheck-test-contents'
        contents_generator = (block for block in [contents])
        file_name = '__HEALTHCHECK_PLEASE_IGNORE__'

        data = bytearray()

        def on_incoming(incoming_data):
            data.extend(bytearray(incoming_data))

        def test_ftp():
            logger.debug('Connecting to %s', ftp_host)
            with FTP_TLS() as ftp:
                ftp.encoding = 'utf-8'
                ftp.connect(host=ftp_host, port=ftp_port)
                logger.debug('Connecting as %s...', ftp_user)
                ftp.login(user=ftp_user, passwd=ftp_password)
                logger.debug('Connecting as %s... (done)', ftp_user)
                ftp.prot_p()  # pylint: disable=no-member

                # In case the previous healthcheck died before we deleted the file
                logger.debug('Fetching list...')
                original_list = ftp_list(ftp)
                logger.debug('Fetching list... (done)')
                in_original_list = [line for line in original_list if file_name in line]
                if in_original_list:
                    ftp.delete(file_name)
                    # Very rough way to deal with eventual consistency. Should be rare that
                    # we hit this case however
                    time.sleep(2)

                logger.debug('STOR %s...', file_name)
                ftp.storbinary(f'STOR {file_name}', ftp_file(contents_generator))
                logger.debug('STOR %s... (done)', file_name)

                logger.debug('Fetching list again...')
                after_store_list = ftp_list(ftp)
                logger.debug('Fetching list again...(done')

                in_after_store_list = [line for line in after_store_list if file_name in line]
                if not in_after_store_list:
                    raise Exception('File not stored')

                logger.debug('RETR %s...', file_name)
                ftp.retrbinary(f'RETR {file_name}', on_incoming)
                logger.debug('RETR %s... (done)', file_name)

                if bytes(data) != contents:
                    raise Exception('File stored incorrectly')

                logger.debug('Deleting %s...', file_name)
                ftp.delete(file_name)
                logger.debug('Deleting %s... (done)', file_name)

        await loop.run_in_executor(None, test_ftp)
        return web.Response(text='OK')

    app = web.Application()
    app.add_routes([
        web.get('/alb_healthcheck', handle_alb_healthcheck),
        web.get('/pingdom_healthcheck', handle_pingdom_healthcheck),
    ])

    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, '0.0.0.0', healthcheck_port)
    await site.start()


def ftp_file(generator):
    def read(_):
        try:
            return next(generator)
        except StopIteration:
            return b''

    return Readable(read=read)


def ftp_list(ftp):
    lines = []

    def on_line(line):
        lines.append(line)

    ftp.dir(on_line)

    return lines


def main():
    healthcheck_port = int(os.environ['HEALTHCHECK_PORT'])

    ftp_host = os.environ['FTP_HOST']
    ftp_port = int(os.environ['FTP_COMMAND_PORT'])
    ftp_user = os.environ['FTP_USER']
    ftp_password = os.environ['FTP_PASSWORD']

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.DEBUG)
    logger.addHandler(handler)

    loop = asyncio.get_event_loop()
    loop.create_task(async_main(loop, logger, healthcheck_port,
                                ftp_host, ftp_port, ftp_user, ftp_password))
    loop.run_forever()


if __name__ == '__main__':
    main()