fuzeman/byte

View on GitHub
byte/formats/json/tasks.py

Summary

Maintainability
D
2 days
Test Coverage
"""byte - json format tasks module."""

from __future__ import absolute_import, division, print_function

from byte.core.models import StreamTask, StreamReadTask, StreamSelectTask, StreamWriteTask, Task
from byte.formats.json.decoder import JsonDecoder
from byte.formats.json.encoder import JsonEncoder


class JsonTask(StreamTask):
    """JSON task base class."""

    pass


class JsonReadTask(StreamReadTask, JsonTask):
    """JSON read task class."""

    def __init__(self, executor, operation):
        """Create JSON read task.

        :param executor: Executor
        :type executor: byte.executors.core.base.Executor

        :param operation: Read operation
        :type operation: byte.core.models.Operation
        """
        super(JsonReadTask, self).__init__(executor, operation)

        self.decoder = None

    @property
    def state(self):
        """Retrieve task state.

        :rtype: int
        """
        state = super(JsonReadTask, self).state

        if self.decoder is None or state == Task.State.created:
            return Task.State.created

        if self.decoder.closed or state == Task.State.closed:
            return Task.State.closed

        return Task.State.started

    def open(self):
        """Open task."""
        super(JsonReadTask, self).open()

        # Create decoder
        self.decoder = JsonDecoder(self.stream)

    def close(self):
        """Close task."""
        if not super(JsonReadTask, self).close():
            return False

        # Close decoder
        if self.decoder:
            self.decoder.close()

        return True


class JsonSelectTask(StreamSelectTask, JsonReadTask):
    """JSON select task class."""

    def decode(self):
        """Decode items."""
        return self.decoder.items()


class JsonWriteTask(StreamWriteTask, JsonTask):
    """JSON write task class."""

    def __init__(self, executor, operations):
        """Create JSON write task.

        :param executor: Executor
        :type executor: byte.executors.core.base.Executor

        :param operations: Write operations
        :type operations: list of byte.core.models.Operation
        """
        super(JsonWriteTask, self).__init__(executor, operations)

        self.decoder = None

    @property
    def state(self):
        """Retrieve task state.

        :rtype: int
        """
        state = super(JsonWriteTask, self).state

        if self.decoder is None or state == Task.State.created:
            return Task.State.created

        if self.decoder.closed or state == Task.State.closed:
            return Task.State.closed

        return Task.State.started

    def decode(self):
        """Decode items."""
        return self.decoder.items()

    def encode(self, revision, items):
        """Encode items.

        :param revision: Executor revision
        :type revision: byte.executors.file.revision.FileRevision

        :param items: Items to encode
        :type items: dict or list
        """
        # Create encoder
        encoder = JsonEncoder(
            revision.stream,
            indent=4
        )

        # Encode items, and write to revision stream
        encoder.write_dict(items)

    def open(self):
        """Open task."""
        super(JsonWriteTask, self).open()

        # Create decoder
        self.decoder = JsonDecoder(self.stream)

    def close(self):
        """Close task."""
        if not super(JsonWriteTask, self).close():
            return False

        # Close decoder
        if self.decoder:
            self.decoder.close()

        return True