rbnvrw/nd2reader

View on GitHub
nd2reader/common.py

Summary

Maintainability
A
25 mins
Test Coverage
import os
import struct
import array
from datetime import datetime
import six
import re
from nd2reader.exceptions import InvalidVersionError


def get_version(fh):
    """Determines what version the ND2 is.

    Args:
        fh: File handle of the .nd2 file

    Returns:
        tuple: Major and minor version

    """
    # the first 16 bytes seem to have no meaning, so we skip them
    fh.seek(16)

    # the next 38 bytes contain the string that we want to parse. Unlike most of the ND2, this is in UTF-8
    data = fh.read(38).decode("utf8")
    return parse_version(data)


def parse_version(data):
    """Parses a string with the version data in it.

    Args:
        data (unicode): the 19th through 54th byte of the ND2, representing the version

    Returns:
        tuple: Major and minor version

    """
    match = re.search(r"""^ND2 FILE SIGNATURE CHUNK NAME01!Ver(?P<major>\d)\.(?P<minor>\d)$""", data)

    if match:
        # We haven't seen a lot of ND2s but the ones we have seen conform to this
        return int(match.group('major')), int(match.group('minor'))

    raise InvalidVersionError("The version of the ND2 you specified is not supported.")


def read_chunk(fh, chunk_location):
    """Reads a piece of data given the location of its pointer.

    Args:
        fh: an open file handle to the ND2
        chunk_location (int): location to read

    Returns:
        bytes: the data at the chunk location

    """
    if chunk_location is None or fh is None:
        return None
    fh.seek(chunk_location)
    # The chunk metadata is always 16 bytes long
    chunk_metadata = fh.read(16)
    header, relative_offset, data_length = struct.unpack("IIQ", chunk_metadata)
    if header != 0xabeceda:
        raise ValueError("The ND2 file seems to be corrupted.")
    # We start at the location of the chunk metadata, skip over the metadata, and then proceed to the
    # start of the actual data field, which is at some arbitrary place after the metadata.
    fh.seek(chunk_location + 16 + relative_offset)
    return fh.read(data_length)


def read_array(fh, kind, chunk_location):
    """

    Args:
        fh: File handle of the nd2 file
        kind: data type, can be one of 'double', 'int' or 'float'
        chunk_location: the location of the array chunk in the binary nd2 file

    Returns:
        array.array: an array of the data

    """
    kinds = {'double': 'd',
             'int': 'i',
             'float': 'f'}
    if kind not in kinds:
        raise ValueError('You attempted to read an array of an unknown type.')
    raw_data = read_chunk(fh, chunk_location)
    if raw_data is None:
        return None
    return array.array(kinds[kind], raw_data)


def _parse_unsigned_char(data):
    """

    Args:
        data: binary data

    Returns:
        char: the data converted to unsigned char

    """
    return struct.unpack("B", data.read(1))[0]


def _parse_unsigned_int(data):
    """

        Args:
            data: binary data

        Returns:
            int: the data converted to unsigned int

        """
    return struct.unpack("I", data.read(4))[0]


def _parse_unsigned_long(data):
    """

        Args:
            data: binary data

        Returns:
            long: the data converted to unsigned long

        """
    return struct.unpack("Q", data.read(8))[0]


def _parse_double(data):
    """

        Args:
            data: binary data

        Returns:
            double: the data converted to double

        """
    return struct.unpack("d", data.read(8))[0]


def _parse_string(data):
    """

        Args:
            data: binary data

        Returns:
            string: the data converted to string

        """
    value = data.read(2)
    # the string ends at the first instance of \x00\x00
    while not value.endswith(six.b("\x00\x00")):
        next_data = data.read(2)
        if len(next_data) == 0:
            break
        value += next_data

    try:
        decoded = value.decode("utf16")[:-1].encode("utf8")
    except UnicodeDecodeError:
        decoded = value.decode('utf8').encode("utf8")

    return decoded


def _parse_char_array(data):
    """

        Args:
            data: binary data

        Returns:
            array.array: the data converted to an array

        """
    array_length = struct.unpack("Q", data.read(8))[0]
    return array.array("B", data.read(array_length))


def parse_date(text_info):
    """
    The date and time when acquisition began.

    Args:
        text_info: the text that contains the date and time information

    Returns:
        datetime: the date and time of the acquisition

    """
    for line in text_info.values():
        line = line.decode("utf8")
        # ND2s seem to randomly switch between 12- and 24-hour representations.
        possible_formats = ["%m/%d/%Y  %H:%M:%S", "%m/%d/%Y  %I:%M:%S %p", "%d/%m/%Y %H:%M:%S"]
        for date_format in possible_formats:
            try:
                absolute_start = datetime.strptime(line, date_format)
            except (TypeError, ValueError):
                continue
            return absolute_start

    return None


def _parse_metadata_item(data, cursor_position):
    """Reads hierarchical data, analogous to a Python dict.

    Args:
        data: the binary data that needs to be parsed
        cursor_position: the position in the binary nd2 file

    Returns:
        dict: a dictionary containing the metadata item

    """
    new_count, length = struct.unpack("<IQ", data.read(12))
    length -= data.tell() - cursor_position
    next_data_length = data.read(length)
    value = read_metadata(next_data_length, new_count)

    # Skip some offsets
    data.read(new_count * 8)

    return value


def _get_value(data, data_type, cursor_position):
    """ND2s use various codes to indicate different data types, which we translate here.

    Args:
        data: the binary data
        data_type: the data type (unsigned char = 1, unsigned int = 2 or 3, unsigned long = 5, double = 6, string = 8,
         char array = 9, metadata item = 11)
        cursor_position: the cursor position in the binary nd2 file

    Returns:
        mixed: the parsed value

    """
    parser = {1: _parse_unsigned_char,
              2: _parse_unsigned_int,
              3: _parse_unsigned_int,
              5: _parse_unsigned_long,
              6: _parse_double,
              8: _parse_string,
              9: _parse_char_array,
              11: _parse_metadata_item}
    try:
        value = parser[data_type](data) if data_type < 11 else parser[data_type](data, cursor_position)
    except (KeyError, struct.error):
        value = None

    return value


def read_metadata(data, count):
    """
    Iterates over each element of some section of the metadata and parses it.

    Args:
        data: the metadata in binary form
        count: the number of metadata elements

    Returns:
        dict: a dictionary containing the parsed metadata

    """
    if data is None:
        return None

    data = six.BytesIO(data)
    metadata = {}

    for _ in range(count):
        cursor_position = data.tell()
        header = data.read(2)

        if not header:
            # We've reached the end of some hierarchy of data
            break

        data_type, name_length = struct.unpack('BB', header)
        name = data.read(name_length * 2).decode("utf16")[:-1].encode("utf8")
        value = _get_value(data, data_type, cursor_position)

        metadata = _add_to_metadata(metadata, name, value)

    return metadata


def _add_to_metadata(metadata, name, value):
    """
    Add the name value pair to the metadata dict

    Args:
        metadata (dict): a dictionary containing the metadata
        name (string): the dictionary key
        value: the value to add

    Returns:
        dict: the new metadata dictionary

    """
    if name not in metadata.keys():
        metadata[name] = value
    else:
        if not isinstance(metadata[name], list):
            # We have encountered this key exactly once before. Since we're seeing it again, we know we
            # need to convert it to a list before proceeding.
            metadata[name] = [metadata[name]]

        # We've encountered this key before so we're guaranteed to be dealing with a list. Thus we append
        # the value to the already-existing list.
        metadata[name].append(value)

    return metadata


def get_from_dict_if_exists(key, dictionary, convert_key_to_binary=True):
    """
    Get the entry from the dictionary if it exists
    Args:
        key: key to lookup
        dictionary: dictionary to look in
        convert_key_to_binary: convert the key from string to binary if true

    Returns:
        the value of dictionary[key] or None

    """
    if convert_key_to_binary:
        key = six.b(key)

    if key not in dictionary:
        return None
    return dictionary[key]


def check_or_make_dir(directory):
    """
    Check if a directory exists, if not, create it
    Args:
        directory: the path to the directory
    """
    if not os.path.exists(directory):
        os.makedirs(directory)