borgbackup/borg

View on GitHub
src/borg/testsuite/chunker_test.py

Summary

Maintainability
D
2 days
Test Coverage
# Note: these tests are part of the self test, do not use or import pytest functionality here.
#       See borg.selftest for details. If you add/remove test methods, update SELFTEST_COUNT

from io import BytesIO

from ..chunker import ChunkerFixed, Chunker, get_chunker, buzhash, buzhash_update
from ..constants import *  # NOQA
from . import BaseTestCase


def cf(chunks):
    """chunk filter"""

    # this is to simplify testing: either return the data piece (bytes) or the hole length (int).
    def _cf(chunk):
        if chunk.meta["allocation"] == CH_DATA:
            assert len(chunk.data) == chunk.meta["size"]
            return bytes(chunk.data)  # make sure we have bytes, not memoryview
        if chunk.meta["allocation"] in (CH_HOLE, CH_ALLOC):
            assert chunk.data is None
            return chunk.meta["size"]
        assert False, "unexpected allocation value"

    return [_cf(chunk) for chunk in chunks]


class ChunkerFixedTestCase(BaseTestCase):
    def test_chunkify_just_blocks(self):
        data = b"foobar" * 1500
        chunker = ChunkerFixed(4096)
        parts = cf(chunker.chunkify(BytesIO(data)))
        self.assert_equal(parts, [data[0:4096], data[4096:8192], data[8192:]])

    def test_chunkify_header_and_blocks(self):
        data = b"foobar" * 1500
        chunker = ChunkerFixed(4096, 123)
        parts = cf(chunker.chunkify(BytesIO(data)))
        self.assert_equal(
            parts, [data[0:123], data[123 : 123 + 4096], data[123 + 4096 : 123 + 8192], data[123 + 8192 :]]
        )

    def test_chunkify_just_blocks_fmap_complete(self):
        data = b"foobar" * 1500
        chunker = ChunkerFixed(4096)
        fmap = [(0, 4096, True), (4096, 8192, True), (8192, 99999999, True)]
        parts = cf(chunker.chunkify(BytesIO(data), fmap=fmap))
        self.assert_equal(parts, [data[0:4096], data[4096:8192], data[8192:]])

    def test_chunkify_header_and_blocks_fmap_complete(self):
        data = b"foobar" * 1500
        chunker = ChunkerFixed(4096, 123)
        fmap = [(0, 123, True), (123, 4096, True), (123 + 4096, 4096, True), (123 + 8192, 4096, True)]
        parts = cf(chunker.chunkify(BytesIO(data), fmap=fmap))
        self.assert_equal(
            parts, [data[0:123], data[123 : 123 + 4096], data[123 + 4096 : 123 + 8192], data[123 + 8192 :]]
        )

    def test_chunkify_header_and_blocks_fmap_zeros(self):
        data = b"H" * 123 + b"_" * 4096 + b"X" * 4096 + b"_" * 4096
        chunker = ChunkerFixed(4096, 123)
        fmap = [(0, 123, True), (123, 4096, False), (123 + 4096, 4096, True), (123 + 8192, 4096, False)]
        parts = cf(chunker.chunkify(BytesIO(data), fmap=fmap))
        # because we marked the '_' ranges as holes, we will get hole ranges instead!
        self.assert_equal(parts, [data[0:123], 4096, data[123 + 4096 : 123 + 8192], 4096])

    def test_chunkify_header_and_blocks_fmap_partial(self):
        data = b"H" * 123 + b"_" * 4096 + b"X" * 4096 + b"_" * 4096
        chunker = ChunkerFixed(4096, 123)
        fmap = [
            (0, 123, True),
            # (123, 4096, False),
            (123 + 4096, 4096, True),
            # (123+8192, 4096, False),
        ]
        parts = cf(chunker.chunkify(BytesIO(data), fmap=fmap))
        # because we left out the '_' ranges from the fmap, we will not get them at all!
        self.assert_equal(parts, [data[0:123], data[123 + 4096 : 123 + 8192]])


class ChunkerTestCase(BaseTestCase):
    def test_chunkify(self):
        data = b"0" * int(1.5 * (1 << CHUNK_MAX_EXP)) + b"Y"
        parts = cf(Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(data)))
        self.assert_equal(len(parts), 2)
        self.assert_equal(b"".join(parts), data)
        self.assert_equal(cf(Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b""))), [])
        self.assert_equal(
            cf(Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b"foobarboobaz" * 3))),
            [b"fooba", b"rboobaz", b"fooba", b"rboobaz", b"fooba", b"rboobaz"],
        )
        self.assert_equal(
            cf(Chunker(1, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b"foobarboobaz" * 3))),
            [b"fo", b"obarb", b"oob", b"azf", b"oobarb", b"oob", b"azf", b"oobarb", b"oobaz"],
        )
        self.assert_equal(
            cf(Chunker(2, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b"foobarboobaz" * 3))),
            [b"foob", b"ar", b"boobazfoob", b"ar", b"boobazfoob", b"ar", b"boobaz"],
        )
        self.assert_equal(
            cf(Chunker(0, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), [b"foobarboobaz" * 3]
        )
        self.assert_equal(
            cf(Chunker(1, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))),
            [b"foobar", b"boobazfo", b"obar", b"boobazfo", b"obar", b"boobaz"],
        )
        self.assert_equal(
            cf(Chunker(2, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))),
            [b"foob", b"arboobaz", b"foob", b"arboobaz", b"foob", b"arboobaz"],
        )
        self.assert_equal(
            cf(Chunker(0, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), [b"foobarboobaz" * 3]
        )
        self.assert_equal(
            cf(Chunker(1, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))),
            [b"foobarbo", b"obazfoobar", b"boobazfo", b"obarboobaz"],
        )
        self.assert_equal(
            cf(Chunker(2, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))),
            [b"foobarboobaz", b"foobarboobaz", b"foobarboobaz"],
        )

    def test_buzhash(self):
        self.assert_equal(buzhash(b"abcdefghijklmnop", 0), 3795437769)
        self.assert_equal(buzhash(b"abcdefghijklmnop", 1), 3795400502)
        self.assert_equal(
            buzhash(b"abcdefghijklmnop", 1), buzhash_update(buzhash(b"Xabcdefghijklmno", 1), ord("X"), ord("p"), 16, 1)
        )
        # Test with more than 31 bytes to make sure our barrel_shift macro works correctly
        self.assert_equal(buzhash(b"abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz", 0), 566521248)

    def test_small_reads(self):
        class SmallReadFile:
            input = b"a" * (20 + 1)

            def read(self, nbytes):
                self.input = self.input[:-1]
                return self.input[:1]

        chunker = get_chunker(*CHUNKER_PARAMS, seed=0)
        reconstructed = b"".join(cf(chunker.chunkify(SmallReadFile())))
        assert reconstructed == b"a" * 20