deparkes/OOMMFTools

View on GitHub
oommftools/core/oommfdecode.py

Summary

Maintainability
F
3 days
Test Coverage
import numpy as np
import struct
import pickle
import scipy.io as spio
from collections import defaultdict

def unpackFile(filename):
    """
    """
    with open(filename, 'rb') as f:
        headers = {} #I know valuemultiplier isn't always present. This is checked later.
        extraCaptures = {'SimTime':-1, 'Iteration':-1, 'Stage':-1, "MIFSource":""}
        #Parse headers
        a = ""
        while not "Begin: Data" in a:
            
            a = f.readline().strip().decode()
            #Determine if it's actually something we need as header data
            for key in ["xbase",
                        "ybase",
                        "zbase",
                        "xstepsize",
                        "ystepsize",
                        "zstepsize",
                        "xnodes",
                        "ynodes",
                        "znodes",
                        "valuemultiplier"]:
                if key in a:
                    headers[key] = float(a.split()[2]) #Known position FTW
            #All right, it may also be time data, which we should capture
            if "Total simulation time" in a:
                #Split on the colon to get the time with units;
                #strip spaces and split on the space to separate time and units
                #Finally, pluck out the time, stripping defensively
                #(which should be unnecessary).
                extraCaptures['SimTime'] = float(a.split(":")[-1].strip().split()[0].strip())
            if "Iteration:" in a:
                #Another tricky split...
                extraCaptures['Iteration'] = float(a.split(":")[2].split(",")[0].strip())
            if "Stage:" in a:
                extraCaptures['Stage'] = float(a.split(":")[2].split(",")[0].strip())
            if "MIF source file" in a:
                extraCaptures['MIFSource'] = a.split(":", 2)[2].strip()


        #Initialize array to be populated
        outArray = np.zeros((int(headers["xnodes"]),
                             int(headers["ynodes"]),
                             int(headers["znodes"]),
                             3))

        #Determine decoding mode and use that to populate the array
        print("Data indicator:", a)
        decode = a.split()
        if decode[3] == "Text":
            return _textDecode(f, outArray, headers, extraCaptures)
        elif decode[3] == "Binary" and decode[4] == "4":
            #Determine endianness
            endianflag = f.read(4)
            if struct.unpack(">f", endianflag)[0] == 1234567.0:
                print("Big-endian 4-byte detected.")
                dc = struct.Struct(">f")
            elif struct.unpack("<f", endianflag)[0] == 1234567.0:
                print("Little-endian 4-byte detected.")
                dc = struct.Struct("<f")
            else:
                raise Exception("Can't decode 4-byte byte order mark: " + hex(endianflag))
            return _binaryDecode(f, 4, dc, outArray, headers, extraCaptures)
        elif decode[3] == "Binary" and decode[4] == "8":
            #Determine endianness
            endianflag = f.read(8)
            if struct.unpack(">d", endianflag)[0] == 123456789012345.0:
                print("Big-endian 8-byte detected.")
                dc = struct.Struct(">d")
            elif struct.unpack("<d", endianflag)[0] == 123456789012345.0:
                print("Little-endian 8-byte detected.")
                dc = struct.Struct("<d")
            else:
                raise Exception("Can't decode 8-byte byte order mark: " + hex(endianflag))
            return _binaryDecode(f, 8, dc, outArray, headers, extraCaptures)
        else:
            raise Exception("Unknown OOMMF data format:" + decode[3] + " " + decode[4])



def _textDecode(filehandle, targetarray, headers, extraCaptures):
    """
    """
    valm = headers.get("valuemultiplier", 1)
    for k in range(int(headers["znodes"])):
        for j in range(int(headers["ynodes"])):
            for i in range(int(headers["xnodes"])):
                #numpy is fantastic - splice in a tuple
                text = filehandle.readline().strip().split()
                targetarray[i, j, k] = (float(text[0])*valm,
                                        float(text[1])*valm,
                                        float(text[2])*valm)
    print("Decode complete.")
    return (targetarray, headers, extraCaptures)


def _binaryDecode(filehandle, chunksize, decoder, targetarray, headers, extraCaptures):
    """
    """
    valm = headers.get("valuemultiplier", 1)
    for k in range(int(headers["znodes"])):
        for j in range(int(headers["ynodes"])):
            for i in range(int(headers["xnodes"])):
                for coord in range(3): #Slowly populate, coordinate by coordinate
                    targetarray[i, j, k, coord] = decoder.unpack(filehandle.read(chunksize))[0] * valm
    print("Decode complete.")
    return (targetarray, headers, extraCaptures)

def pickleArray(array, headers, extraCaptures, filename):
    """
    """
    temp = dict(headers)
    temp.update(extraCaptures)
    f = open(filename, 'wb')
    pickle.dump((array, temp), f)
    f.close()

def matlabifyArray(array, headers, extraCaptures, filename):
    """
    """
    GridSize = np.array([float(headers["xstepsize"]),
                         float(headers["ystepsize"]),
                         float(headers["zstepsize"])])
    OutDict = {"OOMMFData":array, "GridSize":GridSize}
    OutDict.update(extraCaptures)
    spio.savemat(filename, OutDict)

def slowlyPainfullyMaximize(filenames):
    """
    This is a special utility function used by OOMMFConvert to find the single largest-magnitude
    vector in a set of vector files
    """
    #There is no nice way to do this.
    def mag(a, b, c):
        """
        """
        return np.sqrt(a**2 + b**2 + c**2)
    maxmag = 0

    for filename in filenames:
        thisArray, headers, extraCaps = unpackFile(filename)
        for k in range(int(headers["znodes"])):
            for j in range(int(headers["ynodes"])):
                for i in range(int(headers["xnodes"])):
                    maxmag = max(maxmag, mag(*thisArray[i, j, k]))
    return maxmag


def sortBySimTime(extra, arrays):

    # We do some enumeration later on that needs 'arrays'
    # to be an iterable.
    if len(arrays) < 1:
        arrays = (arrays,)
    originalTimeIndex = list(extra["SimTime"])
    if len(set(extra["MIFSource"])) == 1:
        if not -1 in extra["SimTime"]:
            # Rather than sort / zip the arrays themselves
            # we do it with indices. This is because python
            # didn't like sorting / zipping numpy arrays.
            array_indices = [i for i,j in enumerate(arrays)]
            sorted_arrays = []
            zipped = zip(originalTimeIndex, array_indices)
            sorted_zipped = sorted(zipped) # problem here
            zipped_sorted_zipped = zip(*sorted_zipped)
            extra["SimTime"], array_order = list(zipped_sorted_zipped)
            #Sadly, the cleverness ends here - the rest must be bruteforced.
            for key in extra:
                if not key == "SimTime": #We did that one.
                    junk, extra[key] = list(zip(*sorted(zip(originalTimeIndex, extra[key]))))
            for count, array in enumerate(arrays):
                sorted_arrays.append(arrays[array_order[count]])
            arrays = tuple(sorted_arrays)
    return arrays, extra


def groupUnpack(targetList):
    decodedArrays = []
    headers = {}
    extraData = defaultdict(list)
    firstTime = True
    for target in targetList:
        collect = unpackFile(target)
        if firstTime:
            firstTime = False
            headers = collect[1]
        decodedArrays.append(collect[0])
        #Unpack extra collected data
        for key, value in list(collect[2].items()):
            extraData[key].append(value)

    return (np.array(decodedArrays), headers, extraData)