python/src/odin_data/meta_writer/meta_writer.py
"""Implementation of odin_data Meta Writer
This module is passed meta data messages for a single acquisition, which it
writes to disk. It will need to be subclassed by a detector-specific
implementation.
Matt Taylor, Diamond Light Source
"""
import os
from functools import wraps
from time import time
import logging
import h5py
from odin_data import __version__
from odin_data.meta_writer.hdf5dataset import HDF5Dataset, Int64HDF5Dataset
from odin_data.util import construct_version_dict
# Data message parameters
FRAME = "frame"
OFFSET = "offset"
RANK = "rank"
ENDPOINT = "_endpoint"
CREATE_DURATION = "create_duration"
WRITE_DURATION = "write_duration"
FLUSH_DURATION = "flush_duration"
CLOSE_DURATION = "close_duration"
MESSAGE_TYPE_ID = "parameter"
# Configuration items
DIRECTORY = "directory"
FILE_PREFIX = "file_prefix"
FLUSH_FRAME_FREQUENCY = "flush_frame_frequency"
FLUSH_TIMEOUT = "flush_timeout"
class MetaWriterConfig(object):
def __init__(self, sensor_shape):
"""
Args:
sensor_shape(tuple): Detector sensor size (y, x)
"""
self.sensor_shape = sensor_shape
def require_open_hdf5_file(func):
"""A decorator to verify the HDF5 file is open before calling the wrapped method
If the HDF5 file is currently open for writing, call the method, else log the reason
the file is not open
NOTE: This should only be used on MetaWriter methods (that take self as the first
argument)
"""
    @wraps(func)
    def wrapper(*args, **kwargs):
writer = args[0] # Extract class instance (self) from args
if writer.file_open:
# It is safe to call the wrapped method
return func(*args, **kwargs)
# It is not safe to call the wrapped method - log the reason why
if writer.finished:
reason = "Already finished writing"
else:
reason = "Have not received startacquisition yet"
writer._logger.error(
"%s | Cannot call %s - File not open - %s",
writer._name,
func.__name__,
reason,
)
return wrapper
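# Example (illustrative): applying the decorator to a MetaWriter method. The
# method name here is an assumption - any instance method that touches the
# file can be guarded in the same way.
#
#   @require_open_hdf5_file
#   def _write_something(self, value):
#       ...  # self._hdf5_file is guaranteed to be open here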
class MetaWriter(object):
"""This class handles meta data messages and writes parameters to disk"""
FILE_SUFFIX = "_meta.h5"
CONFIGURE_PARAMETERS = [
DIRECTORY,
FILE_PREFIX,
FLUSH_FRAME_FREQUENCY,
FLUSH_TIMEOUT,
]
# Detector-specific parameters received on per-frame meta message
DETECTOR_WRITE_FRAME_PARAMETERS = []
def __init__(self, name, directory, endpoints, config):
"""
Args:
name(str): Unique name to construct file path and to include in
log messages
directory(str): Directory to create the meta file in
            endpoints(list): Endpoints the parent MetaListener will receive data on
config(MetaWriterConfig): Configuration options
"""
self._logger = logging.getLogger(self.__class__.__name__)
# Config
self.directory = directory
self.file_prefix = None
self.flush_frame_frequency = 100
self.flush_timeout = 1
# Status
self.full_file_path = None
self.write_count = 0
self.finished = False
self.write_timeout_count = 0
# Internal parameters
self._name = name
self._processes_running = [False] * len(endpoints)
self._endpoints = endpoints
self._config = config
self._last_flushed = time() # Seconds since epoch
self._frames_since_flush = 0
self._hdf5_file = None
self._datasets = dict(
(dataset.name, dataset)
for dataset in self._define_datasets() + self._define_detector_datasets()
)
# Child class parameters
self._frame_data_map = dict() # Map of frame number to detector data
self._frame_offset_map = dict() # Map of frame number to offset in dataset
self._writers_finished = False
self._detector_finished = True # See stop_when_detector_finished
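    # Example (illustrative): constructing a writer directly. The endpoints and
    # sensor shape below are assumptions - in practice the parent MetaListener
    # provides them.
    #
    #   writer = MetaWriter(
    #       "test_1",
    #       "/tmp",
    #       ["tcp://127.0.0.1:5558", "tcp://127.0.0.1:5559"],
    #       MetaWriterConfig(sensor_shape=(512, 512)),
    #   )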
@staticmethod
def _define_datasets():
return [
Int64HDF5Dataset(FRAME),
Int64HDF5Dataset(OFFSET),
Int64HDF5Dataset(CREATE_DURATION, cache=False),
Int64HDF5Dataset(WRITE_DURATION),
Int64HDF5Dataset(FLUSH_DURATION),
Int64HDF5Dataset(CLOSE_DURATION, cache=False),
]
def _define_detector_datasets(self):
return []
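    # Example (illustrative sketch): a child class adding a detector-specific
    # per-frame dataset. The "gain_mode" parameter is an assumption.
    #
    #   class MyDetectorMetaWriter(MetaWriter):
    #       DETECTOR_WRITE_FRAME_PARAMETERS = ["gain_mode"]
    #
    #       def _define_detector_datasets(self):
    #           return [Int64HDF5Dataset("gain_mode")]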
@property
def file_open(self):
return self._hdf5_file is not None
@property
def active_process_count(self):
return self._processes_running.count(True)
def _generate_full_file_path(self):
prefix = self.file_prefix if self.file_prefix is not None else self._name
self.full_file_path = os.path.join(
self.directory, "{}{}".format(prefix, self.FILE_SUFFIX)
)
return self.full_file_path
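    # For example, with directory "/tmp" and file_prefix "scan_1" this yields
    # "/tmp/scan_1_meta.h5"; if no prefix is configured, the writer name is used.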
def _create_file(self, file_path, dataset_size):
self._logger.debug(
"%s | Opening file %s - Expecting %d frames",
self._name,
file_path,
dataset_size,
)
try:
self._hdf5_file = h5py.File(file_path, "w", libver="latest")
except IOError as error:
self._logger.error(
"%s | Failed to create file:\n%s: %s",
self._name,
error.__class__.__name__,
                error,
)
return
self._create_datasets(dataset_size)
# Datasets created after this point will not be SWMR-readable
self._hdf5_file.swmr_mode = True
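    # Example (illustrative): a separate process can read the file while it is
    # still being written, relying on the SWMR mode enabled above. The file
    # path is an assumption.
    #
    #   with h5py.File("/tmp/detector_meta.h5", "r", libver="latest", swmr=True) as f:
    #       frame = f["frame"]
    #       frame.refresh()  # Pick up rows flushed since the file was opened
    #       print(frame[...])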
@require_open_hdf5_file
def _close_file(self):
self._logger.info("%s | Closing file", self._name)
self._flush_datasets()
self._hdf5_file.close()
self._hdf5_file = None
@require_open_hdf5_file
def _create_datasets(self, dataset_size):
"""Add predefined datasets to HDF5 file and store handles
Args:
            dataset_size(int): Number of frames expected, used to size the datasets
"""
self._logger.debug("%s | Creating datasets", self._name)
        for dataset in self._datasets.values():
            # Use the dataset's explicit chunking if given, else fall back to
            # its maxshape
            chunks = dataset.chunks
            if chunks is None:
                chunks = dataset.maxshape
            if isinstance(chunks, int):
                chunks = (chunks,)
            # A chunk shape cannot contain an unlimited (None) dimension, so
            # let h5py choose the chunking in that case
            if None in chunks:
                chunks = None
            self._logger.debug(
                "%s | Dataset %s chunking: %s", self._name, dataset.name, chunks
            )
dataset_handle = self._hdf5_file.create_dataset(
name=dataset.name,
shape=dataset.shape,
maxshape=dataset.maxshape,
chunks=chunks,
dtype=dataset.dtype,
fillvalue=dataset.fillvalue,
)
dataset.initialise(dataset_handle, dataset_size)
@require_open_hdf5_file
def _add_dataset(self, dataset_name, data, dataset_size=None):
"""Add a new dataset with the given data
Args:
dataset_name(str): Name of dataset
data(np.ndarray): Data to initialise HDF5 dataset with
dataset_size(int): Dataset size - required if more data will be added
"""
self._logger.debug("%s | Adding dataset %s", self._name, dataset_name)
if dataset_name in self._datasets:
self._logger.debug(
"%s | Dataset %s already created", self._name, dataset_name
)
return
self._logger.debug(
"%s | Creating dataset %s with data:\n%s", self._name, dataset_name, data
)
dataset = HDF5Dataset(dataset_name, dtype=None, fillvalue=None, cache=False)
dataset_handle = self._hdf5_file.create_dataset(name=dataset_name, data=data)
dataset.initialise(dataset_handle, dataset_size)
self._datasets[dataset_name] = dataset
@require_open_hdf5_file
def _add_value(self, dataset_name, value, offset=0):
"""Append a value to the named dataset
Args:
dataset_name(str): Name of dataset
value(): The value to append
index(int): The offset to add the value to
"""
self._logger.debug("%s | Adding value to %s", self._name, dataset_name)
if dataset_name not in self._datasets:
self._logger.error("%s | No such dataset %s", self._name, dataset_name)
return
self._datasets[dataset_name].add_value(value, offset)
@require_open_hdf5_file
def _add_values(self, expected_parameters, data, offset):
"""Take values of parameters from data and write to datasets at offset
Args:
expected_parameters(list(str)): Parameters to write
data(dict): Set of parameter values
offset(int): Offset to write parameters to in datasets
"""
self._logger.debug("%s | Adding values to datasets", self._name)
for parameter in expected_parameters:
if parameter not in data:
self._logger.error(
"%s | Expected parameter %s not found in %s",
self._name,
parameter,
data,
)
continue
self._add_value(parameter, data[parameter], offset)
@require_open_hdf5_file
def _write_dataset(self, dataset_name, data):
"""Write an entire dataset with the given data
Args:
dataset_name(str): Name of dataset
data(np.ndarray): Data to set HDF5 dataset with
"""
self._logger.debug("%s | Writing entire dataset %s", self._name, dataset_name)
if dataset_name not in self._datasets:
self._logger.error("%s | No such dataset %s", self._name, dataset_name)
return
self._datasets[dataset_name].write(data)
@require_open_hdf5_file
def _write_datasets(self, expected_parameters, data):
"""Take values of parameters from data and write datasets
Args:
expected_parameters(list(str)): Parameters to write
data(dict): Set of parameter values
"""
self._logger.debug("%s | Writing datasets", self._name)
for parameter in expected_parameters:
if parameter not in data:
self._logger.error(
"%s | Expected parameter %s not found in %s",
self._name,
parameter,
data,
)
continue
self._write_dataset(parameter, data[parameter])
@require_open_hdf5_file
def _flush_datasets(self):
self._logger.debug("%s | Flushing datasets", self._name)
for dataset in self._datasets.values():
dataset.flush()
def stop_when_detector_finished(self):
"""Register that it is OK to stop when all detector-specific logic is complete
        By default, _detector_finished is initialised to True so that this check
        always passes. Child classes that need to do their own checks can set it to
        False in __init__ and call stop_when_writers_finished when ready to stop.
"""
self._writers_finished = True
if self._detector_finished:
self._logger.debug("%s | Detector already finished", self._name)
self.stop()
else:
self._logger.debug("%s | Detector not finished", self._name)
def stop_when_writers_finished(self):
"""Register that it is OK to stop when all monitored writers have finished
Child classes can call this when all detector-specific logic is complete.
"""
self._detector_finished = True
if self._writers_finished:
self._logger.debug("%s | Writers already finished", self._name)
self.stop()
else:
self._logger.debug("%s | Writers not finished", self._name)
def stop(self):
if self.file_open:
self._close_file()
self.finished = True
self._logger.info("%s | Finished", self._name)
def status(self):
"""Return current status parameters"""
return dict(
full_file_path=self.full_file_path,
num_processors=self.active_process_count,
written=self.write_count,
writing=self.file_open and not self.finished,
)
def configure(self, configuration):
"""Configure the writer with a set of one or more parameters
Args:
configuration(dict): Configuration parameters
Returns:
error(None/str): None if successful else an error message
"""
error = None
for parameter, value in configuration.items():
if parameter in self.CONFIGURE_PARAMETERS:
self._logger.debug(
"%s | Setting %s to %s", self._name, parameter, value
)
setattr(self, parameter, value)
else:
error = "Invalid parameter {}".format(parameter)
self._logger.error("%s | %s", self._name, error)
return error
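    # Example (illustrative): reconfiguring a writer before an acquisition. The
    # parameter values are assumptions.
    #
    #   error = writer.configure({"directory": "/data", "file_prefix": "scan_1"})
    #   assert error is None, error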
def request_configuration(self):
"""Return the current configuration
Returns:
configuration(dict): Dictionary of current configuration parameters
"""
configuration = dict(
(parameter, getattr(self, parameter))
for parameter in self.CONFIGURE_PARAMETERS
)
return configuration
# Methods for handling various message types
@property
def message_handlers(self):
"""Dictionary of message type to handler method
        Child classes should override detector_message_handlers to add additional
        handlers
Returns:
dict: message type handler methods
"""
message_handlers = {
"startacquisition": self.handle_start_acquisition,
"createfile": self.handle_create_file,
"writeframe": self.handle_write_frame,
"closefile": self.handle_close_file,
"stopacquisition": self.handle_stop_acquisition,
}
message_handlers.update(self.detector_message_handlers)
return message_handlers
@property
def detector_message_handlers(self):
return {}
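    # Example (illustrative sketch): a child class routing an extra message
    # type to its own handler. The message type and handler name are
    # assumptions.
    #
    #   @property
    #   def detector_message_handlers(self):
    #       return {"detectorframe": self.handle_detector_frame}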
def process_message(self, header, data):
"""Process a message from a data socket
        This is the main entry point for handling any type of message. The
        appropriate handler is looked up based on the message type and called.
        A leading underscore on a handler parameter indicates that the handler
        does not use that argument.
        This should be overridden by child classes to handle any additional messages.
        Args:
            header(dict): The header message part
            data(dict/str): The data message part (a decoded json message or a data blob)
"""
handler = self.message_handlers.get(header[MESSAGE_TYPE_ID], None)
if handler is not None:
handler(header["header"], data)
else:
self._logger.error(
"%s | Unknown message type: %s", self._name, header[MESSAGE_TYPE_ID]
)
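    # Example (illustrative): the shape of a writeframe message, based on the
    # fields the handlers below access. The values are made up.
    #
    #   header = {"parameter": "writeframe", "header": {...}}
    #   data = {"frame": 0, "offset": 0, "write_duration": 12, "flush_duration": 3}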
def handle_start_acquisition(self, header, _data):
"""Prepare the data file with the number of frames to write"""
self._logger.debug("%s | Handling start acquisition message", self._name)
if self._processes_running[self._endpoints.index(header[ENDPOINT])]:
self._logger.error(
"%s | Received additional startacquisition from process endpoint %s - ignoring",
self._name,
header[ENDPOINT],
)
return
self._processes_running[self._endpoints.index(header[ENDPOINT])] = True
self._logger.debug(
"%s | Received startacquisition message from endpoint %s - %d processes running",
self._name,
header[ENDPOINT],
self.active_process_count,
)
if not self.file_open:
self._create_file(self._generate_full_file_path(), header["totalFrames"])
def handle_create_file(self, _header, data):
self._logger.debug("%s | Handling create file message", self._name)
self._add_value(CREATE_DURATION, data[CREATE_DURATION])
def handle_write_frame(self, _header, data):
self._logger.debug("%s | Handling write frame message", self._name)
# TODO: Handle getting more frames than expected because of rewinding?
write_frame_parameters = [FRAME, OFFSET, WRITE_DURATION, FLUSH_DURATION]
self._add_values(write_frame_parameters, data, data[OFFSET])
        # Track whether the datasets need to be flushed to disk based on:
        #   - Time since the last flush
        #   - Number of write frame messages since the last flush
self.write_timeout_count = 0
self.write_count += 1
self._frames_since_flush += 1
# Write detector meta data for this frame, now that we know the offset
self.write_detector_frame_data(data[FRAME], data[OFFSET])
flush_required = (
time() - self._last_flushed >= self.flush_timeout
or self._frames_since_flush >= self.flush_frame_frequency
)
if flush_required:
self._flush_datasets()
self._last_flushed = time()
self._frames_since_flush = 0
def write_detector_frame_data(self, frame, offset):
"""Write the frame data to at the given offset
Args:
frame(int): Frame to write
offset(int): Offset in datasets to write the frame data to
"""
if not self.DETECTOR_WRITE_FRAME_PARAMETERS:
# No detector specific data to write
return
self._logger.debug("%s | Writing detector data for frame %d", self._name, frame)
if frame not in self._frame_data_map:
self._logger.warning(
"%s | No detector meta data stored for frame %d", self._name, frame
)
self._frame_offset_map[frame] = offset
return
data = self._frame_data_map.pop(frame)
self._add_values(self.DETECTOR_WRITE_FRAME_PARAMETERS, data, offset)
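    # Example (illustrative sketch): the complementary path a child class would
    # implement for detector metadata that can arrive before the corresponding
    # writeframe message. The handler name is an assumption.
    #
    #   def handle_detector_frame(self, _header, data):
    #       frame = data["frame"]
    #       if frame in self._frame_offset_map:
    #           # writeframe arrived first, so the offset is already known
    #           offset = self._frame_offset_map.pop(frame)
    #           self._add_values(self.DETECTOR_WRITE_FRAME_PARAMETERS, data, offset)
    #       else:
    #           # Stash the data until a writeframe message provides the offset
    #           self._frame_data_map[frame] = data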
def handle_close_file(self, _header, data):
self._logger.debug("%s | Handling close file message", self._name)
self._add_value(CLOSE_DURATION, data[CLOSE_DURATION])
def handle_stop_acquisition(self, header, _data):
"""Register that a process has finished and stop if it is the last one"""
if not self._processes_running[self._endpoints.index(header[ENDPOINT])]:
self._logger.error(
"%s | Received stopacquisition from process endpoint %s before start - ignoring",
self._name,
header[ENDPOINT],
)
return
self._logger.debug(
"%s | Received stopacquisition from endpoint %s", self._name, header[ENDPOINT]
)
self._processes_running[self._endpoints.index(header[ENDPOINT])] = False
if not any(self._processes_running):
self._logger.info("%s | Last processor stopped", self._name)
self.stop_when_detector_finished()
@staticmethod
def get_version():
return "odin-data", construct_version_dict(__version__)