christoph2/objutils

View on GitHub
objutils/hexfile.py

Summary

Maintainability
B
4 hrs
Test Coverage
#!/usr/bin/env python
# -*- coding: utf-8 -*-

__version__ = "0.1.1"

__copyright__ = """
    objutils - Object file library for Python.

   (C) 2010-2019 by Christoph Schueler <cpu12.gems@googlemail.com>

   All Rights Reserved

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; either version 2 of the License, or
  (at your option) any later version.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License along
  with this program; if not, write to the Free Software Foundation, Inc.,
  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""

from collections import defaultdict, namedtuple
from functools import partial
import math
import os
import re

from objutils.section import Section, join_sections
from objutils.image import Image
from operator import itemgetter
from objutils.utils import slicer, create_string_buffer, PYTHON_VERSION
from objutils.logger import Logger


"""
MemoryBlocks
------------
Segment types: generic, code, const, bss, ... (cf. ELF?)
"""

"""
TODO:   Create a base class for Reader/Writer common functionality.
        FactoryMethods ???
        Parameter ['exceptionsOnErrors'].
"""

# Address-space size classes, as returned by ``Reader._address_space()``.
SIXTEEN_BITS = 0
TWENTY_BITS = 1
TWENTYFOUR_BITS = 2
THIRTYTWO_BITS = 3

# Identifiers of the field groups a record format can be composed of.
START = 0
LENGTH = 1
TYPE = 2
ADDRESS = 3
DATA = 4
UNPARSED = 5
CHECKSUM = 6
ADDR_CHECKSUM = 7

# Regular-expression template per field group.  Templates containing ``%d``
# are completed with the field width by ``FormatParser.translateFormat()``.
# Raw strings keep regex escapes such as ``\d`` from being interpreted as
# (invalid) Python string escapes.
MAP_GROUP_TO_REGEX = {
    LENGTH: r"(?P<length>[0-9a-zA-Z]{%d})",
    TYPE: r"(?P<type>\d{%d})",
    ADDRESS: r"(?P<address>[0-9a-zA-Z]{%d})",
    DATA: r"(?P<chunk>[0-9a-zA-Z]+)",
    UNPARSED: r"(?P<chunk>.*)",
    CHECKSUM: r"(?P<checksum>[0-9a-zA-Z]{%d})",
    ADDR_CHECKSUM: r"(?P<addrChecksum>[0-9a-zA-Z]{%d})",
}

# Format-specification character -> field group.
MAP_CHAR_TO_GROUP = {
    "L": LENGTH,
    "T": TYPE,
    "A": ADDRESS,
    "D": DATA,
    "U": UNPARSED,
    "C": CHECKSUM,
    "B": ADDR_CHECKSUM,
}

TYPE_FROM_RECORD = 0

# Convert a hexadecimal string to an integer.
atoi = partial(int, base=16)


class Invalidrecord_typeError(Exception):
    """Exception type for a record with an invalid record type."""


class InvalidRecordLengthError(Exception):
    """Exception type for a record with an invalid length."""


class InvalidRecordChecksumError(Exception):
    """Exception type for a record with an invalid checksum."""


class AddressRangeToLargeError(Exception):
    """Raised by ``Writer.dumps()`` when an image needs more address bits
    than the writer's ``MAX_ADDRESS_BITS`` allows."""


# Non-data record as collected by ``Reader.read()``; ``address`` and ``chunk``
# are ``None`` when the record carries no such field.
MetaRecord = namedtuple("MetaRecord", ["format_type", "address", "chunk"])


class FormatParser(object):
    """Translate a record format specification into a compiled regex.

    The specification is a string in which every run of identical characters
    forms one field.  Characters listed in ``MAP_CHAR_TO_GROUP`` become named
    groups (the run length supplies the field width where the template needs
    one), blanks become whitespace of the same length, and every other
    character is copied into the pattern as-is.
    """

    def __init__(self, fmt, data_separator=None):
        """
        :param fmt: format specification string.
        :param data_separator: extra character(s) permitted inside the data
            field, or ``None`` to use the default data template.
        """
        self.fmt = fmt
        self.translated_format = []
        self.data_separator = data_separator

    def parse(self):
        """Compile ``self.fmt`` and return the resulting pattern object.

        ``self.translated_format`` is rebuilt from scratch on every call, so
        calling ``parse()`` repeatedly always yields the same pattern.
        """
        self.translated_format = []
        group = ""
        for ch in self.fmt:
            # A change of character terminates the current run.
            if group and ch != group[-1]:
                self.translateFormat(group)
                group = ""
            group += ch
        if group:  # Nothing to flush for an empty format.
            self.translateFormat(group)
        self.translated_format.append((0, 0, r"(?P<junk>(.*?))$"))  # ??
        ft = "^{0}".format("".join(map(itemgetter(2), self.translated_format)))
        return re.compile(ft, re.DOTALL | re.MULTILINE)

    def translateFormat(self, group):
        """Append the ``(group_number, length, regex)`` entry for one run.

        :param group: run of identical format characters; an empty run is
            ignored.
        """
        if not group:
            return
        group_number = MAP_CHAR_TO_GROUP.get(group[0])
        length = len(group)
        if group_number is None:  # Handle invariants (i.e. fixed chars).
            if group[0] == " ":
                expr = r"\s{{{0!s}}}".format(length)
            else:
                # NOTE(review): fixed characters are inserted verbatim, i.e.
                # regex metacharacters are not escaped here.
                expr = group[0] * length
        else:
            expr = MAP_GROUP_TO_REGEX.get(group_number)
            if group_number == DATA:
                if self.data_separator is not None:
                    expr = "(?P<chunk>[0-9a-zA-Z{0!s}]*)".format(self.data_separator)
            elif group_number != UNPARSED:
                # Remaining templates take the field width.
                expr = expr % (length,)
        self.translated_format.append((group_number, length, expr))


class Container(object):
    """Attribute bag for one parsed record.

    Instances start with an empty list of processing instructions;
    ``Reader.read()`` attaches the line number and parsed fields as further
    attributes.
    """

    def __init__(self):
        self.processing_instructions = list()

    def add_processing_instruction(self, pi):
        """Append *pi* to ``processing_instructions``."""
        self.processing_instructions += [pi]


class BaseType(object):
    """Logging helpers shared by readers and writers.

    The concrete class is expected to provide ``self.logger``.
    """

    def error(self, msg):
        """Log *msg* as an error and mark the object as invalid."""
        log = self.logger
        log.error(msg)
        self.valid = False

    def warn(self, msg):
        """Log *msg* as a warning."""
        log = self.logger
        log.warn(msg)

    def info(self, msg):
        """Log *msg* as an informational message."""
        log = self.logger
        log.info(msg)

    def debug(self, msg):
        """Log *msg* as a debug message."""
        log = self.logger
        log.debug(msg)


class Reader(BaseType):
    """Base class for readers of line-oriented hex file formats.

    Subclasses describe their records through ``FORMAT_SPEC`` -- either one
    format string or a sequence of ``(format_type, format_string)`` pairs --
    and implement the ``check_line()`` / ``is_data_line()`` hooks.  Subclasses
    that set ``FORMAT_SPEC`` to ``None`` must provide ``self.formats``
    themselves.
    """

    ALIGMENT = 0  # 2**n
    DATA_SEPARATOR = None
    VALID_CHARS = re.compile(
        r"^[a-fA-F0-9 :/;,%\n\r!?S]*$"
    )  # General case, fits most formats.

    def __init__(self):
        self.logger = Logger("Reader")
        spec = self.FORMAT_SPEC
        if isinstance(spec, str):
            # read()/probe() unpack (format_type, pattern) pairs, so a single
            # format string is stored in the same shape.
            spec = [(TYPE_FROM_RECORD, spec)]
        if isinstance(spec, (list, tuple)):
            self.formats = [
                (format_type, FormatParser(fmt, self.DATA_SEPARATOR).parse())
                for format_type, fmt in spec
            ]

    def load(self, fp, **kws):
        """Read an image from *fp*.

        :param fp: file name (opened in binary mode) or file-like object.
        :return: the result of ``read()``.

        The stream is closed afterwards -- also when ``read()`` raises.
        """
        if isinstance(fp, str):
            fp = open(fp, "rb")
        try:
            return self.read(fp)
        finally:
            if hasattr(fp, "close"):
                fp.close()

    def loads(self, image, **kws):
        """Read an image from the in-memory string/bytes *image*."""
        return self.load(self._string_buffer(image))

    @staticmethod
    def _string_buffer(image):
        """Wrap *image* in a buffer, encoding Python 3 ``str`` as ASCII."""
        if PYTHON_VERSION.major == 3 and isinstance(image, str):
            image = bytes(image, "ascii")
        return create_string_buffer(image)

    @staticmethod
    def _as_text(line):
        """Return *line* decoded if it is a bytes object, else unchanged."""
        return line.decode() if isinstance(line, bytes) else line

    def _match_line(self, line):
        """Return ``(format_type, match)`` of the first format matching
        *line*, or ``(None, None)`` if no format matches."""
        text = self._as_text(line)
        for format_type, pattern in self.formats:
            match = pattern.match(text)
            if match:
                return format_type, match
        return None, None

    def _make_container(self, fields, format_type, line_number):
        """Build the ``Container`` for one matched line from its named groups."""
        container = Container()
        container.line_number = line_number
        # Handle scalar values: 'junk' stays text, everything else is hex.
        for key, value in fields.items():
            if key == "chunk":
                continue
            setattr(container, key, value if key == "junk" else atoi(value))
        if "chunk" in fields:
            chunk = fields["chunk"]
            # parseData() may veto conversion for stuff like symbols.
            if self.parseData(container, format_type):
                chunk = bytearray.fromhex(chunk)
            container.chunk = chunk
        return container

    def read(self, fp):
        """Parse all lines of *fp* and return the resulting ``Image``.

        Lines matching no format are reported with a warning.  Data lines
        become sections; all other matched lines are collected as
        ``MetaRecord`` entries keyed by format type.  Without any data
        section an error is logged and an invalid, empty image is returned.
        """
        sections = []
        self.valid = True
        meta_data = defaultdict(list)
        for line_number, line in enumerate(fp.readlines(), 1):
            format_type, match = self._match_line(line)
            if match is None:
                # Decided per line, so garbage following valid records is
                # reported as well.
                self.warn("Ignoring garbage line #{0:d}".format(line_number))
                continue
            fields = match.groupdict()
            if not fields:
                continue
            container = self._make_container(fields, format_type, line_number)
            self.check_line(container, format_type)
            # this is to handle esoteric stuff like Intel seg:offs addressing and symbols.
            self.special_processing(container, format_type)
            if self.is_data_line(container, format_type):
                sections.append(Section(container.address, container.chunk))
            else:
                meta_data[format_type].append(
                    MetaRecord(
                        format_type,
                        getattr(container, "address", None),
                        getattr(container, "chunk", None),
                    )
                )
        if sections:
            return Image(sections=join_sections(sections), meta=meta_data)
        self.error("File seems to be invalid.")
        return Image([], valid=False)

    def _address_space(self, value):
        """Classify *value* by the smallest address space that can hold it.

        :raises ValueError: if *value* does not fit into 32 bits.
        """
        if value < 2**16:
            return SIXTEEN_BITS
        elif value < 2**20:
            return TWENTY_BITS
        elif value < 2**24:
            return TWENTYFOUR_BITS
        elif value < 2**32:
            return THIRTYTWO_BITS
        else:
            raise ValueError("Unsupported Addressspace size.")

    def maybe_binary_file(self, fp):
        """Return ``True`` if the first 128 bytes/characters of *fp* contain
        anything outside ``VALID_CHARS``.

        The stream position is reset to the start before returning.
        """
        fp.seek(0, os.SEEK_SET)
        header = fp.read(128)
        fp.seek(0, os.SEEK_SET)
        if isinstance(header, bytes):
            try:
                header = header.decode()
            except UnicodeDecodeError:
                # Undecodable bytes cannot be part of a hex text file.
                return True
        return not self.VALID_CHARS.match(header)

    def probe(self, fp):
        "Determine if object is valid."
        if self.maybe_binary_file(fp):
            return False
        try:
            for line_number, line in enumerate(fp.readlines(), 1):
                _, match = self._match_line(line)
                if match:
                    return True
                if line_number > 3:  # Only the first lines are inspected.
                    break
        except UnicodeDecodeError:
            return False
        return False

    def probes(self, image):
        """Like ``probe()``, for the in-memory string/bytes *image*."""
        return self.probe(self._string_buffer(image))

    def check_line(self, line, format_type):
        """Hook: validate a parsed line; must be implemented by subclasses."""
        raise NotImplementedError()

    def is_data_line(self, line, format_type):
        """Hook: tell whether a parsed line carries section data; must be
        implemented by subclasses."""
        raise NotImplementedError()

    def classifyLine(self, line):
        raise NotImplementedError()

    def special_processing(self, line, format_type):
        """Hook: format-specific post-processing; does nothing by default."""
        pass

    def parseData(self, line, format_type):
        """Hook: return ``True`` if the line's chunk is hex data to convert."""
        return True


class Writer(BaseType):
    """Base class for writers of line-oriented hex file formats.

    Subclasses provide ``MAX_ADDRESS_BITS`` and ``compose_row()`` and may
    override the header/footer and pre-/post-processing hooks.
    """

    def __init__(self):
        self.logger = Logger("Writer")

    def dump(self, fp, image, row_length=16, **kws):  # TODO: rename to bytesPerRow!
        """Encode *image* and write it to *fp*.

        :param fp: file name (opened in binary mode) or file-like object.
        :param image: image to encode.
        :param row_length: number of data bytes per row.
        :param kws: writer attributes to override, see ``set_parameters()``.

        The image is encoded before the target is opened, so a failing
        encode does not truncate an existing file.  The stream is closed
        after writing.
        """
        data = self.dumps(image, row_length, **kws)
        if isinstance(fp, str):
            fp = open(fp, "wb")
        try:
            fp.write(data)
        finally:
            if hasattr(fp, "close"):
                fp.close()

    def dumps(self, image, row_length=16, **kws):
        """Encode *image* and return the result as bytes.

        :raises AddressRangeToLargeError: if the image needs more address
            bits than ``MAX_ADDRESS_BITS``.
        :raises AttributeError: if *kws* names an unknown attribute.
        """
        result = []
        self.row_length = row_length
        if hasattr(image, "sections") and not image.sections:
            return b""
        if self.calculate_address_bits(image) > self.MAX_ADDRESS_BITS:
            raise AddressRangeToLargeError("could not encode image.")
        self.set_parameters(**kws)
        self.pre_processing(image)
        meta = getattr(image, "meta", {})
        header = self.compose_header(meta)
        if header:
            result.append(header)
        for section in image:
            address = section.start_address
            rows = slicer(section.data, row_length, lambda x: [int(y) for y in x])
            for row in rows:
                length = len(row)
                result.append(self.compose_row(address, length, row))
                address += length
        footer = self.compose_footer(meta)
        if footer:
            result.append(footer)
        if PYTHON_VERSION.major == 3:
            return self.post_processing(bytes("\n".join(result), "ascii"))
        else:
            return self.post_processing(bytes("\n".join(result)))

    def calculate_address_bits(self, image):
        """Return the number of bits needed for the image's end address.

        *image* is either an object with ``sections`` or a single section.
        The end address is ``start_address + length`` of the section reaching
        furthest; integer arithmetic avoids the rounding errors of a
        floating-point logarithm quotient.
        """
        # NOTE(review): 'start_address + length' is one past the last byte,
        # kept as in the original computation -- confirm this is intended.
        if hasattr(image, "sections"):
            ends = [s.start_address + s.length for s in image.sections]
            highest_address = max(ends) if ends else 0
        else:
            highest_address = image.start_address + image.length
        return int(highest_address).bit_length()

    def post_processing(self, data):
        """Make sure the encoded *data* ends with a newline."""
        if not data.endswith(b"\n"):
            data += b"\n"
        return data

    def pre_processing(self, image):
        """Hook called by ``dumps()`` before composing; no-op by default."""
        pass

    def set_parameters(self, **kws):
        """Override existing writer attributes with the values in *kws*.

        :return: dict of the previous values of the overridden attributes.
        :raises AttributeError: if a keyword names no existing attribute; in
            that case no attribute is modified.
        """
        params = {}
        # Validate everything first so a bad keyword leaves no partial update.
        for k in kws:
            try:
                params[k] = getattr(self, k)
            except AttributeError:
                raise AttributeError("Invalid keyword argument '{0!s}'.".format(k))
        for k, v in kws.items():
            setattr(self, k, v)
        return params

    def compose_row(self, address, length, row):
        """Hook: return the text line for one data row; must be implemented
        by subclasses."""
        raise NotImplementedError()

    def compose_header(self, meta):
        """Hook: return header text or ``None`` (default) for no header."""
        return None

    def compose_footer(self, meta):
        """Hook: return footer text or ``None`` (default) for no footer."""
        return None

    def word_to_bytes(self, word):
        """Split the low 16 bits of *word* into ``(high_byte, low_byte)``."""
        word = int(word)
        high = (word & 0xFF00) >> 8
        low = word & 0x00FF
        return high, low

    @staticmethod
    def hex_bytes(row, spaced=False):
        """Format *row* as two-digit uppercase hex values, optionally
        separated by single blanks."""
        spacer = " " if spaced else ""
        return spacer.join("{0:02X}".format(x) for x in row)


class ASCIIHexReader(Reader):
    """Reader for ASCII hex style formats driven by three caller-supplied
    patterns: address lines, data lines and an end-of-text marker."""

    FORMAT_SPEC = None  # Patterns are supplied to __init__ instead.

    def __init__(self, address_pattern, data_pattern, etx_pattern, separators=", "):
        """
        :param address_pattern: regex for address lines; must define a group
            named ``address``.
        :param data_pattern: regex template for data lines; *separators* is
            substituted via ``str.format``.
        :param etx_pattern: regex for the line that ends the data.
        :param separators: characters separating the byte values of a line.
        """
        self.separators = separators
        flags = re.DOTALL | re.MULTILINE
        self.DATA_PATTERN = re.compile(data_pattern.format(separators), flags)
        self.ADDRESS_PATTERN = re.compile(address_pattern, flags)
        self.ETX_PATTERN = re.compile(etx_pattern, flags)
        self.SPLITTER = re.compile("[{0}]".format(separators))
        # (pattern, handler) pairs, tried in this order for every line.
        self.patterns = (
            (self.ADDRESS_PATTERN, self.getAddress),
            (self.DATA_PATTERN, self.parse_line),
            (self.ETX_PATTERN, self.nop),
        )
        self.formats = [
            (0, self.ADDRESS_PATTERN),
            (1, self.DATA_PATTERN),
            (2, self.ETX_PATTERN),
        ]
        super(ASCIIHexReader, self).__init__()

    def getAddress(self, line, match):
        """Handler: set the current address from an address line."""
        self.address = int(match.group("address"), 16)
        self.previous_address = self.address
        return True

    def nop(self, line, match):
        """Handler for the end marker: returning ``False`` stops reading."""
        return False

    def parse_line(self, line, match):
        """Handler: turn a data line into a section at the current address
        and advance the address by the section's length."""
        values = [int(token, 16) for token in self.SPLITTER.split(line) if token]
        section = Section(self.address, bytearray(values))
        self.sections.append(section)
        self.address += len(section)
        return True

    def _dispatch(self, line):
        """Run the handler of the first pattern matching *line*.

        :return: the handler's result, or ``True`` if no pattern matches.
        """
        for pattern, action in self.patterns:
            match = pattern.match(line)
            if match:
                return action(line, match)
        return True

    def read(self, fp):
        """Parse *fp* line by line until the end marker and return the image."""
        lines = fp.read()
        # Streams opened in text mode already deliver str.
        if PYTHON_VERSION.major == 3 and isinstance(lines, bytes):
            lines = lines.decode()
        self.valid = True
        self.sections = []
        self.address = 0
        for line in lines.splitlines():
            if not self._dispatch(line):
                break
        return Image(join_sections(self.sections))


class ASCIIHexWriter(Writer):
    """Writer for ASCII hex style formats: rows of hex bytes, preceded by an
    address line whenever a row does not continue the previous one."""

    MAX_ADDRESS_BITS = 16
    previous_address = None  # End address of the last row written.

    def __init__(self, address_designator):
        """
        :param address_designator: text put in front of each address.
        """
        self.separator = " "
        self.address_designator = address_designator
        super(ASCIIHexWriter, self).__init__()

    def pre_processing(self, image):
        """Forget where a previous dump ended, so every dump starts with an
        address line."""
        self.previous_address = None

    def compose_row(self, address, length, row):
        """Return the text for one row, including an address line unless the
        row directly follows the previous one."""
        data = "{0}".format(self.separator).join(["{0:02X}".format(x) for x in row])
        if address != self.previous_address:
            line = "{0}{1:04X}\n{2}".format(self.address_designator, address, data)
        else:
            line = data
        self.previous_address = address + length
        self.row_callout(address, length, row)
        return line

    def row_callout(self, address, length, row):
        """Hook called after each composed row; no-op by default."""
        pass