uktrade/iterable-subprocess

View on GitHub
test_iterable_subprocess.py

Summary

Maintainability
A
0 mins
Test Coverage
import io
import sys
import subprocess
import threading
import time
import unittest
import zipfile

import psutil
import pytest
from threading import Thread

from iterable_subprocess import IterableSubprocessError, iterable_subprocess


def test_cat_not_necessarily_streamed():
    def yield_small_input():
        yield b'first'
        yield b'second'
        yield b'third'

    with iterable_subprocess(['cat'], yield_small_input()) as output:
        assert b''.join(output) == b'firstsecondthird'


def test_cat_streamed():
    latest_input = None

    def yield_input():
        nonlocal latest_input

        for i in range(0, 10000000):
            yield b'*' * 10
            latest_input = i

    with iterable_subprocess(['cat'], yield_input()) as output:
        latest_input_during_output = [latest_input for _ in output]

        # Make sure the input is progressing during the output. In test, there
        # are about 915 steps, so checking that it's greater than 50 shouldm't
        # make this test too flakey
        num_steps = 0
        prev_i = 0
        for i in latest_input_during_output:
            if i != prev_i:
                num_steps += 1
            prev_i = i

        assert num_steps > 50


def test_process_closed_after():
    assert len(psutil.Process().children(recursive=True)) == 0
    with iterable_subprocess(['cat'], ()) as output:
        assert len(psutil.Process().children(recursive=True)) == 1
    assert len(psutil.Process().children(recursive=True)) == 0


def test_exception_from_input_before_yield_propagated():
    def yield_input():
        raise Exception('Something went wrong')

    with pytest.raises(Exception, match='Something went wrong'):
        with iterable_subprocess(['cat'], yield_input()) as output:
            pass


def test_exception_from_input_after_yield_propagated():
    def yield_input():
        yield b'*'
        raise Exception('Something went wrong')

    with pytest.raises(Exception, match='Something went wrong'):
        with iterable_subprocess(['cat'], yield_input()) as output:
            pass


def test_exception_from_input_incorrect_type_propagated():
    def yield_input():
        yield 'this-should-be-bytes'


    with pytest.raises(TypeError):
        with iterable_subprocess(['cat'], yield_input()) as output:
            pass


@pytest.mark.parametrize("size", [
    1, 100, 10000, 1000000,
])
def test_exception_from_output_during_input_iterating_propagates_and_does_not_hang(size):
    event = threading.Event()

    def yield_input():
        while True:
            event.set()
            yield b'*' * size

    with pytest.raises(Exception, match='My error'):
        with iterable_subprocess(['cat'], yield_input()) as output:
            event.wait()
            raise Exception('My error')


@pytest.mark.parametrize("chunk_size", [
    1, 100, 10000, 1000000,
])
@pytest.mark.parametrize("at_iteration", [
    0, 1, 100,
])
def test_exception_from_output_iterating_propagates_and_does_not_hang(at_iteration, chunk_size):
    def yield_input():
        while True:
            yield b'*' * chunk_size

    with pytest.raises(Exception, match='My error'):
        with iterable_subprocess(['cat'], yield_input(), chunk_size=chunk_size) as output:
            for i, chunk in enumerate(output):
                if i == at_iteration:
                    raise Exception('My error')


def test_exception_from_not_found_process_propagated():
    with pytest.raises(FileNotFoundError):
        with iterable_subprocess(['does-not-exist'], ()) as output:
            b''.join(output)


def test_exception_from_return_code():
    with pytest.raises(IterableSubprocessError, match='No such file or directory') as excinfo:
        with iterable_subprocess(['ls', 'does-not-exist'], ()) as output:
            a = b''.join(output)

    assert excinfo.value.returncode > 0
    assert b'No such file or directory' in excinfo.value.stderr


def test_exception_from_context_even_though_return_code_with_long_standard_error():
    with pytest.raises(Exception, match="Another exception"):
        with iterable_subprocess([sys.executable, '-c', 'import sys; print("Out"); print("Error message" * 100000, file=sys.stderr); sys.exit(1)'], ()) as output:
            for _ in output:
                pass
            raise Exception('Another exception')


def test_exception_from_return_code_with_long_standard_error():
    with pytest.raises(IterableSubprocessError) as excinfo:
        with iterable_subprocess([sys.executable, '-c', 'import sys; print("Out"); print("Error message" * 100000, file=sys.stderr); sys.exit(2)'], ()) as output:
            for _ in output:
                pass

    assert excinfo.value.returncode == 2
    assert len(excinfo.value.stderr) == 65536


def test_if_process_exits_with_non_zero_error_code_and_inner_exception_it_propagates():
    def yield_input():
        while True:
            yield b'*' * 10

    with pytest.raises(Exception, match='Another exception'):
        with iterable_subprocess([
            sys.executable, '-c', 'import sys; print("The error", file=sys.stderr); print("After output"); sys.exit(1)',
        ], yield_input()) as output:
            all_output = b''.join(output)
            raise Exception('Another exception')

    assert all_output == b'After output\n'



def test_if_process_closes_standard_input_but_exits_with_non_zero_error_code_then_broken_pipe_error():
    def yield_input():
        while True:
            yield b'*' * 10

    with pytest.raises(BrokenPipeError):
        with iterable_subprocess([
            sys.executable, '-c', 'import sys; sys.stdin.close(); print("The error", file=sys.stderr); print("After output"); sys.exit(0)',
        ], yield_input()) as output:
            all_output = b''.join(output)

    assert all_output == b'After output\n'


def test_if_process_closes_standard_input_but_exits_with_non_zero_error_code_then_iterable_subprocess_error():
    def yield_input():
        while True:
            yield b'*' * 10

    with pytest.raises(IterableSubprocessError) as excinfo:
        with iterable_subprocess([
            sys.executable, '-c', 'import sys; sys.stdin.close(); print("The error", file=sys.stderr); print("After output"); sys.exit(3)',
        ], yield_input()) as output:
            all_output = b''.join(output)

    assert all_output == b'After output\n'
    assert excinfo.value.returncode == 3
    assert excinfo.value.stderr == b'The error\n'


def test_program_that_outputs_for_a_long_time_is_interrupted_on_context_exit():
    start = time.monotonic()

    with pytest.raises(IterableSubprocessError) as excinfo:
        with iterable_subprocess([sys.executable, '-c', 'import time; start = time.monotonic()\nwhile (time.monotonic() - start) < 60:\n    print("Output" * 1000)'], ()) as output:
            pass

    end = time.monotonic()

    assert excinfo.value.returncode != 0
    assert b'BrokenPipeError' in excinfo.value.stderr
    assert end - start < 10


def test_program_that_sleeps_exits_quickly_if_exception():
    start = time.monotonic()

    with pytest.raises(Exception, match='From context'):
        with iterable_subprocess([sys.executable, '-c', 'import time; time.sleep(60)'], ()) as output:
            raise Exception('From context')

    end = time.monotonic()

    assert end - start < 10


def test_program_that_sleeps_exits_quickly_if_keyboard_interrupt():
    start = time.monotonic()

    with pytest.raises(KeyboardInterrupt, match='From context'):
        with iterable_subprocess([sys.executable, '-c', 'import time; time.sleep(60)'], ()) as output:
            raise KeyboardInterrupt('From context')

    end = time.monotonic()

    assert end - start < 10


def test_program_that_sleeps_exits_quickly_if_keyboard_interrupt_just_before_thread_starts(monkeypatch):
    start = time.monotonic()

    def start_that_raises_keyboard_interrupt(self):
        raise KeyboardInterrupt('Just before starting thread')
    monkeypatch.setattr(Thread, 'start', start_that_raises_keyboard_interrupt)

    with pytest.raises(KeyboardInterrupt, match='Just before starting thread'):
        iterable_subprocess([sys.executable, '-c', 'import time; time.sleep(60)'], ()).__enter__()

    end = time.monotonic()

    assert end - start < 10


def test_program_that_sleeps_exits_quickly_if_keyboard_interrupt_just_after_thread_starts(monkeypatch):
    start = time.monotonic()

    original_start = Thread.start
    def start_that_raises_keyboard_interrupt(self):
        original_start(self)
        raise KeyboardInterrupt('Just after starting thread')
    monkeypatch.setattr(Thread, 'start', start_that_raises_keyboard_interrupt)

    with pytest.raises(KeyboardInterrupt, match='Just after starting thread'):
        iterable_subprocess([sys.executable, '-c', 'import time; time.sleep(60)'], ()).__enter__()

    end = time.monotonic()

    assert end - start < 10


def test_program_that_sleeps_not_quickly_if_no_exception():
    start = time.monotonic()

    with iterable_subprocess([sys.executable, '-c', 'import time; time.sleep(2)'], ()) as output:
        pass

    end = time.monotonic()

    assert end - start > 2


def test_funzip_no_compression():
    contents = b'*' * 100000

    def yield_input():
        file = io.BytesIO()
        with zipfile.ZipFile(file, 'w', zipfile.ZIP_STORED) as zf:
            zf.writestr('any.txt', contents)

        yield file.getvalue()

    with iterable_subprocess(['funzip'], yield_input()) as output:
        assert b''.join(output) == contents


def test_funzip_deflate():
    contents = b'*' * 100000

    def yield_input():
        file = io.BytesIO()
        with zipfile.ZipFile(file, 'w', zipfile.ZIP_DEFLATED) as zf:
            zf.writestr('any.txt', contents)

        yield file.getvalue()

    with iterable_subprocess(['funzip'], yield_input()) as output:
        assert b''.join(output) == contents