# sdds/writer.py
"""
Writer
------
This module contains the writing functionality of ``sdds``.
It provides a high-level function to write SDDS files in different formats, and a series of helpers.
"""
import pathlib
import struct
from typing import IO, Any, Iterable, List, Tuple, Union
import numpy as np
from sdds.classes import ENCODING, Array, Column, Data, Definition, Description, Parameter, SddsFile, get_dtype_str
def write_sdds(sdds_file: SddsFile, output_path: Union[pathlib.Path, str]) -> None:
    """
    Serialize an ``SddsFile`` object into the file at ``output_path``.

    The target is opened in binary write mode, the header is written first and
    the data section follows. The header marks the file as big-endian,
    independent of the byte order of the current machine.

    Args:
        sdds_file: `SddsFile` object to write.
        output_path (Union[pathlib.Path, str]): location of the output SDDS file.
            A `string` is accepted and converted to a `Path` object.
    """
    destination = pathlib.Path(output_path)
    with destination.open("wb") as stream:
        # The header pass returns the definition names in the order they were
        # written; the data pass consumes them in that same order.
        definition_names = _write_header(sdds_file, stream)
        _write_data(definition_names, sdds_file, stream)
def _write_header(sdds_file: SddsFile, outbytes: IO[bytes]) -> List[str]:
    """
    Write the SDDS header to ``outbytes``.

    The header consists of the version line, the big-endian marker, the optional
    description entry, one entry per definition and a fixed binary-mode
    ``&data`` entry.

    Args:
        sdds_file: `SddsFile` object whose description and definitions are written.
        outbytes: binary stream receiving the encoded header.

    Returns:
        The definition names, in the order their entries were written.
    """
    for magic_line in ("SDDS1\n", "!# big-endian\n"):
        outbytes.write(magic_line.encode(ENCODING))

    description = sdds_file.description
    if description is not None:
        # TODO: For Description and Data, this is not implemented yet
        outbytes.write(_sdds_def_as_str(description).encode(ENCODING))

    definitions = sdds_file.definitions
    names = list(definitions)
    for def_name in names:
        entry_text = _sdds_def_as_str(definitions[def_name])
        outbytes.write(entry_text.encode(ENCODING))

    # The data entry is hard-coded rather than derived from ``sdds_file``.
    outbytes.write("&data mode=binary, &end\n".encode(ENCODING))
    return names
def _sdds_def_as_str(definition: Union[Description, Definition, Data]) -> str:
    """
    Render a header entry as a single text line.

    Args:
        definition: object exposing a ``TAG`` attribute and a
            ``get_key_value_string()`` method.

    Returns:
        ``"<TAG> <key-value string> &end"`` terminated by a newline.
    """
    tag = definition.TAG
    key_values = definition.get_key_value_string()
    return f"{tag} {key_values} &end\n"
def _write_data(names: List[str], sdds_file: SddsFile, outbytes: IO[bytes]) -> None:
    """
    Write the binary data section to ``outbytes``.

    A row count of zero is written first. The entries named in ``names`` are
    then sorted by the kind of their definition and written as all parameters,
    then all arrays, then all columns. Entries whose definition is none of these
    kinds are skipped.

    Args:
        names: definition names, in the order they appear in the header.
        sdds_file: `SddsFile` object; ``sdds_file[name]`` is unpacked as a
            ``(definition, value)`` pair.
        outbytes: binary stream receiving the data.
    """
    # row_count:
    row_count = np.array(0, dtype=get_dtype_str("long"))
    outbytes.write(row_count.tobytes())

    parameters: List[Tuple[Parameter, Any]] = []
    arrays: List[Tuple[Array, Any]] = []
    columns: List[Tuple[Column, Any]] = []
    # Checked in this order; the first matching kind wins.
    buckets = ((Parameter, parameters), (Array, arrays), (Column, columns))

    for name in names:
        entry = sdds_file[name]
        for kind, bucket in buckets:
            if isinstance(entry[0], kind):
                bucket.append(entry)  # type: ignore
                break

    _write_parameters(parameters, outbytes)
    _write_arrays(arrays, outbytes)
    _write_columns(columns, outbytes)
def _write_parameters(param_gen: Iterable[Tuple[Parameter, Any]], outbytes: IO[bytes]):
    """
    Write every parameter value to ``outbytes`` in iteration order.

    Args:
        param_gen: iterable of ``(parameter definition, value)`` pairs.
        outbytes: binary stream receiving the values.
    """
    for definition, value in param_gen:
        if definition.type != "string":
            # Non-string values are converted to the dtype of the declared type
            # and dumped as raw bytes.
            numeric = np.array(value, dtype=get_dtype_str(definition.type))
            outbytes.write(numeric.tobytes())
            continue
        _write_string(value, outbytes)
def _write_arrays(array_gen: Iterable[Tuple[Array, Any]], outbytes: IO[bytes]):
    """
    Write every array to ``outbytes`` in iteration order.

    For each array, the number of items per dimension is written first, followed
    by the items themselves. String items are written one by one through
    ``_write_string``; all other types are converted to a numpy array of the
    declared type and dumped as raw bytes.

    Nested string arrays are flattened in row-major order, which is the order
    ``tobytes()`` uses for the non-string types. Tuples are treated like lists
    when the dimensions are determined.

    Args:
        array_gen: iterable of ``(array definition, value)`` pairs.
        outbytes: binary stream receiving the arrays.
    """
    sequence_types = (np.ndarray, list, tuple)

    def get_dimensions_from_array(value):
        # Return the number of items per dimension
        # For an array a[n][m], returns [n, m]
        if isinstance(value, sequence_types):
            if len(value) == 0:
                return [0]
            return [len(value)] + get_dimensions_from_array(value[0])
        return []

    def iter_leaves(item):
        # Yield the non-sequence items nested inside ``item``, depth first.
        if isinstance(item, sequence_types):
            for sub_item in item:
                yield from iter_leaves(sub_item)
        else:
            yield item

    for array_def, value in array_gen:
        # Number of items per dimensions need to be written before the data
        elements_per_dim = get_dimensions_from_array(value)
        long_array = np.array(elements_per_dim, dtype=get_dtype_str("long"))
        outbytes.write(long_array.tobytes())

        if array_def.type == "string":
            # The top level is iterated whatever its type; only the rows below
            # it are flattened, so that every item handed to ``_write_string``
            # is a single string rather than a whole row.
            for row in value:
                for string in iter_leaves(row):
                    _write_string(string, outbytes)
        else:
            outbytes.write(np.array(value, dtype=get_dtype_str(array_def.type)).tobytes())
def _write_columns(col_gen: Iterable[Tuple[Column, Any]], outbytes: IO[bytes]):
    """
    Placeholder for writing column data; currently writes nothing.

    Both arguments are ignored.

    Args:
        col_gen: iterable of ``(column definition, value)`` pairs (unused).
        outbytes: binary stream that would receive the columns (unused).
    """
    # TODO: Implement the columns thing.
    return None
def _write_string(string: str, outbytes: IO[bytes]):
    """
    Write a length-prefixed string to ``outbytes``.

    The string is encoded with ``ENCODING`` first. Its encoded length is written
    as a ``long``, followed by the encoded bytes.

    Args:
        string: text to write.
        outbytes: binary stream receiving the length prefix and the payload.
    """
    encoded = string.encode(ENCODING)
    # Use the byte length, not the character count: with a multi-byte encoding
    # the two differ, and a too-small count would both announce the wrong size
    # and make ``struct.pack`` truncate the payload.
    byte_count = len(encoded)
    outbytes.write(np.array(byte_count, dtype=get_dtype_str("long")).tobytes())
    outbytes.write(
        struct.pack(
            get_dtype_str("string", length=byte_count),
            encoded,
        )
    )