cpp/frameProcessor/src/FileWriterPlugin.cpp
/*
* FileWriterPlugin.cpp
*
*/
#include <assert.h>
#include <boost/filesystem.hpp>
#include <hdf5_hl.h>
#include "Frame.h"
#include "FileWriterPlugin.h"
#include "FrameProcessorDefinitions.h"
#include "logging.h"
#include "DebugLevelLogger.h"
#include "version.h"
#ifdef BOOST_HAS_PLACEHOLDERS
using namespace boost::placeholders;
#endif
namespace FrameProcessor
{
// Configuration parameter names understood by the FileWriterPlugin. These are the
// keys looked up in the incoming IpcMessage by configure() and its helpers, and the
// keys used to build the reply in requestConfiguration().

// "process" group and its sub-parameters (read by configure_process()).
const std::string FileWriterPlugin::CONFIG_PROCESS = "process";
const std::string FileWriterPlugin::CONFIG_PROCESS_NUMBER = "number";
const std::string FileWriterPlugin::CONFIG_PROCESS_RANK = "rank";
const std::string FileWriterPlugin::CONFIG_PROCESS_BLOCKSIZE = "frames_per_block";
const std::string FileWriterPlugin::CONFIG_PROCESS_BLOCKS_PER_FILE = "blocks_per_file";
const std::string FileWriterPlugin::CONFIG_PROCESS_EARLIEST_VERSION = "earliest_version";
const std::string FileWriterPlugin::CONFIG_PROCESS_ALIGNMENT_THRESHOLD = "alignment_threshold";
const std::string FileWriterPlugin::CONFIG_PROCESS_ALIGNMENT_VALUE = "alignment_value";
// "file" group and its sub-parameters (read by configure_file()).
const std::string FileWriterPlugin::CONFIG_FILE = "file";
const std::string FileWriterPlugin::CONFIG_FILE_NAME = "name";
const std::string FileWriterPlugin::CONFIG_FILE_USE_NUMBERS = "use_numbers";
const std::string FileWriterPlugin::CONFIG_FILE_NUMBER_START = "first_number";
const std::string FileWriterPlugin::CONFIG_FILE_POSTFIX = "postfix";
const std::string FileWriterPlugin::CONFIG_FILE_PATH = "path";
const std::string FileWriterPlugin::CONFIG_FILE_EXTENSION = "extension";
// "dataset" group and its per-dataset sub-parameters (read by configure_dataset()).
const std::string FileWriterPlugin::CONFIG_DATASET = "dataset";
const std::string FileWriterPlugin::CONFIG_DATASET_TYPE = "datatype";
const std::string FileWriterPlugin::CONFIG_DATASET_DIMS = "dims";
const std::string FileWriterPlugin::CONFIG_DATASET_CHUNKS = "chunks";
const std::string FileWriterPlugin::CONFIG_DATASET_COMPRESSION = "compression";
const std::string FileWriterPlugin::CONFIG_DATASET_INDEXES = "indexes";
const std::string FileWriterPlugin::CONFIG_DATASET_BLOSC_COMPRESSOR = "blosc_compressor";
const std::string FileWriterPlugin::CONFIG_DATASET_BLOSC_LEVEL = "blosc_level";
const std::string FileWriterPlugin::CONFIG_DATASET_BLOSC_SHUFFLE = "blosc_shuffle";
// Top-level parameters handled directly by configure().
const std::string FileWriterPlugin::CONFIG_DELETE_DATASETS = "delete_datasets";
const std::string FileWriterPlugin::CONFIG_FRAMES = "frames";
const std::string FileWriterPlugin::CONFIG_MASTER_DATASET = "master";
const std::string FileWriterPlugin::CONFIG_WRITE = "write";
const std::string FileWriterPlugin::ACQUISITION_ID = "acquisition_id";
const std::string FileWriterPlugin::CLOSE_TIMEOUT_PERIOD = "timeout_timer_period";
const std::string FileWriterPlugin::START_CLOSE_TIMEOUT = "start_timeout_timer";
// HDF5 call error durations; these are looked up inside the "file" group by configure_file().
const std::string FileWriterPlugin::CREATE_ERROR_DURATION = "create_error_duration";
const std::string FileWriterPlugin::WRITE_ERROR_DURATION = "write_error_duration";
const std::string FileWriterPlugin::FLUSH_ERROR_DURATION = "flush_error_duration";
const std::string FileWriterPlugin::CLOSE_ERROR_DURATION = "close_error_duration";
/**
 * Create a FileWriterPlugin with default values.
 *
 * The plugin starts idle (not writing), configured as a single process writer
 * (one process, rank 0) with one frame per block, file numbering enabled and an
 * "h5" file extension. All HDF5 call error durations default to 0 and the
 * HDF5 error callback is bound to set_warning().
 *
 * The close file timeout thread (run_close_file_timeout) is launched from the
 * member initialiser list, so it is already running while the constructor body
 * executes.
 */
FileWriterPlugin::FileWriterPlugin() :
    writing_(false),
    concurrent_processes_(1),
    concurrent_rank_(0),
    frames_per_block_(1),
    blocks_per_file_(0),
    first_file_index_(0),
    use_file_numbering_(true),
    file_postfix_(""),
    file_extension_("h5"),
    use_earliest_hdf5_(false),
    alignment_threshold_(1),
    alignment_value_(1),
    timeout_period_(0),
    timeout_thread_running_(true),
    timeout_thread_(boost::bind(&FileWriterPlugin::run_close_file_timeout, this))
{
  // status() reports timeout_active_, so it must hold a defined value before the
  // first status request; it was previously left uninitialised.
  // NOTE(review): ideally this becomes a member initialiser positioned to match the
  // declaration order in the header (not visible here) so that it is set before
  // timeout_thread_ starts - confirm against FileWriterPlugin.h.
  timeout_active_ = false;

  this->logger_ = Logger::getLogger("FP.FileWriterPlugin");
  LOG4CXX_INFO(logger_, "FileWriterPlugin version " << this->get_version_long() << " loaded");

  // Populate the HDF5 error definition before it is handed to any Acquisition, so
  // the acquisitions never observe unset durations or an unset callback, whether
  // Acquisition copies the definition or keeps a reference to it.
  hdf5_error_definition_.create_duration = 0;
  hdf5_error_definition_.write_duration = 0;
  hdf5_error_definition_.flush_duration = 0;
  hdf5_error_definition_.close_duration = 0;
  hdf5_error_definition_.callback = boost::bind(&FileWriterPlugin::set_warning, this, _1);

  this->current_acquisition_ = boost::shared_ptr<Acquisition>(new Acquisition(hdf5_error_definition_));
  this->next_acquisition_ = boost::shared_ptr<Acquisition>(new Acquisition(hdf5_error_definition_));
}
/**
 * Destructor.
 *
 * Clears the timeout flags, wakes the close file timeout thread on both of its
 * condition variables, waits for that thread to exit and finally stops any
 * acquisition that is still being written.
 */
FileWriterPlugin::~FileWriterPlugin()
{
  // Clear the flags first so the timeout thread sees them once it is woken
  timeout_thread_running_ = false;
  timeout_active_ = false;

  // Wake the thread if it is waiting on the close file condition
  {
    boost::mutex::scoped_lock close_guard(close_file_mutex_);
    timeout_condition_.notify_all();
  }

  // Wake the thread if it is waiting on the start condition
  {
    boost::mutex::scoped_lock start_guard(start_timeout_mutex_);
    start_condition_.notify_all();
  }

  // The thread must have finished before the plugin members are destroyed
  timeout_thread_.join();

  // stop_writing() is a no-op unless writing_ is set, so no extra check is needed
  stop_writing();
}
/** Process an incoming frame.
 *
 * The frame is first checked against the current (or next) acquisition; frames
 * that do not belong are dropped. If the plugin is writing, the frame is handed
 * to the current acquisition and the returned status is acted upon:
 *   status_complete                - the acquisition is stopped
 *   status_complete_missing_frames - the close file timeout is started
 *   status_invalid                 - errors are recorded on the plugin
 * The frame is then pushed to any registered callbacks and the close file
 * timeout condition is notified.
 *
 * \param[in] frame - Pointer to the Frame object.
 */
void FileWriterPlugin::process_frame(boost::shared_ptr<Frame> frame)
{
  // Protect this method; the close file mutex is taken before the plugin mutex
  boost::mutex::scoped_lock cflock(close_file_mutex_);
  boost::lock_guard<boost::recursive_mutex> lock(mutex_);

  // frame_in_acquisition() must be evaluated before writing_ is inspected,
  // exactly as in the nested form. Frames that don't match are ignored.
  if (!frame_in_acquisition(frame) || !writing_) {
    return;
  }

  const ProcessFrameStatus outcome = current_acquisition_->process_frame(frame, hdf5_call_durations_);
  switch (outcome) {
    case status_complete:
      // The acquisition has everything it expected
      stop_acquisition();
      break;
    case status_complete_missing_frames:
      LOG4CXX_INFO(logger_, "Starting close file timeout as received last frame but missing some frames");
      start_close_file_timeout();
      break;
    case status_invalid:
      this->set_error("Frame invalid");
      this->set_error(current_acquisition_->get_last_error());
      break;
    default:
      // Any other status requires no extra action
      break;
  }

  // Push frame to any registered callbacks
  this->push(frame);

  // Notify the close timeout thread that a frame has been processed
  timeout_condition_.notify_all();
}
/** Process an EndOfAcquisitionFrame.
 *
 * Does nothing unless the plugin is currently writing, in which case the event
 * is logged and stop_acquisition() is called.
 */
void FileWriterPlugin::process_end_of_acquisition()
{
  if (!writing_) {
    return;
  }
  LOG4CXX_INFO(logger_, "End of acquisition frame received, stopping writer");
  stop_acquisition();
}
/** Start writing frames to file.
*
* This method checks that the writer is not already writing. Then it creates
* the datasets required (from their definitions) and creates the HDF5 file
* ready to write frames. The framesWritten counter is reset to 0.
*/
void FileWriterPlugin::start_writing()
{
// Set the current acquisition details to the ones held for the next acquisition and reset the next ones
if (!writing_) {
// Re-calculate the number of frames to write in case the process and
// rank has been changed since the frame count was set
next_acquisition_->frames_to_write_ = calc_num_frames(this->next_acquisition_->total_frames_);
this->current_acquisition_ = next_acquisition_;
this->next_acquisition_ = boost::shared_ptr<Acquisition>(new Acquisition(hdf5_error_definition_));
// Set up datasets within the current acquisition
std::map<std::string, DatasetDefinition>::iterator iter;
for (iter = this->dataset_defs_.begin(); iter != this->dataset_defs_.end(); ++iter){
this->current_acquisition_->dataset_defs_[iter->first] = iter->second;
}
// Start the acquisition and set writing flag to true if it started successfully
writing_ = this->current_acquisition_->start_acquisition(
concurrent_rank_,
concurrent_processes_,
frames_per_block_,
blocks_per_file_,
first_file_index_,
use_file_numbering_,
file_postfix_,
file_extension_,
use_earliest_hdf5_,
alignment_threshold_,
alignment_value_,
master_frame_,
hdf5_call_durations_);
}
}
/** Stop writing frames to file.
 *
 * Does nothing unless the plugin is currently writing. Otherwise the writing
 * flag is cleared and the current acquisition is asked to stop.
 */
void FileWriterPlugin::stop_writing()
{
  if (!writing_) {
    return;
  }
  // Clear the flag before stopping the acquisition, as the original ordering did
  writing_ = false;
  this->current_acquisition_->stop_acquisition(hdf5_call_durations_);
}
/**
 * Set configuration options for the file writer.
 *
 * This sets up the file writer plugin according to the configuration IpcMessage
 * objects that are received. The options are searched for, in this order:
 * CONFIG_PROCESS         - Calls the method configure_process
 * CONFIG_FILE            - Calls the method configure_file
 * CONFIG_DATASET         - Either a single dataset name (string) to create, or an
 *                          object keyed by dataset name whose values are passed to
 *                          configure_dataset
 * CONFIG_DELETE_DATASETS - Removes all dataset definitions
 * CONFIG_FRAMES          - Total number of frames for the next acquisition
 * CONFIG_MASTER_DATASET  - Name of the master dataset
 * ACQUISITION_ID         - Acquisition ID of the next acquisition
 * CLOSE_TIMEOUT_PERIOD   - Close file timeout period
 * START_CLOSE_TIMEOUT    - Starts the close file timeout (only whilst writing)
 * CONFIG_WRITE           - Starts or stops writing
 *
 * Any std::runtime_error raised whilst applying the configuration is recorded
 * with set_error() and then re-thrown to the caller.
 *
 * \param[in] config - IpcMessage containing configuration data.
 * \param[out] reply - Response IpcMessage.
 */
void FileWriterPlugin::configure(OdinData::IpcMessage& config, OdinData::IpcMessage& reply)
{
  // Protect this method
  boost::lock_guard<boost::recursive_mutex> lock(mutex_);

  LOG4CXX_INFO(logger_, config.encode());

  try {
    // Check to see if we are configuring the process number and rank
    if (config.has_param(FileWriterPlugin::CONFIG_PROCESS)) {
      OdinData::IpcMessage processConfig(config.get_param<const rapidjson::Value &>(FileWriterPlugin::CONFIG_PROCESS));
      this->configure_process(processConfig, reply);
    }

    // Check to see if we are configuring the file path and name
    if (config.has_param(FileWriterPlugin::CONFIG_FILE)) {
      OdinData::IpcMessage fileConfig(config.get_param<const rapidjson::Value &>(FileWriterPlugin::CONFIG_FILE));
      this->configure_file(fileConfig, reply);
    }

    // Check to see if we are configuring a dataset
    if (config.has_param(FileWriterPlugin::CONFIG_DATASET)) {
      // Attempt to retrieve the value as a string parameter
      try {
        LOG4CXX_INFO(logger_, "Checking for string name of dataset");
        std::string dataset_name = config.get_param<std::string>(FileWriterPlugin::CONFIG_DATASET);
        LOG4CXX_INFO(logger_, "Dataset name " << dataset_name << " found, creating...");
        // If we can retrieve a single string parameter then we are being asked to create
        // a new dataset. Only create it if it doesn't already exist.
        create_new_dataset(dataset_name);
      } catch (const OdinData::IpcMessageException&) {
        // The object passed to us is a dataset description so pass to the configure_dataset method.
        OdinData::IpcMessage dataset_config(
            config.get_param<const rapidjson::Value &>(FileWriterPlugin::CONFIG_DATASET));
        std::vector<std::string> dataset_names = dataset_config.get_param_names();
        for (std::vector<std::string>::iterator iter = dataset_names.begin();
             iter != dataset_names.end(); ++iter) {
          const std::string& dataset_name = *iter;
          LOG4CXX_INFO(logger_, "Dataset name " << dataset_name << " found, creating...");
          create_new_dataset(dataset_name);
          OdinData::IpcMessage dsetConfig(dataset_config.get_param<const rapidjson::Value &>(dataset_name));
          this->configure_dataset(dataset_name, dsetConfig, reply);
        }
      }
    }

    // Check to see if we are deleting all datasets
    if (config.has_param(FileWriterPlugin::CONFIG_DELETE_DATASETS)) {
      this->delete_datasets();
    }

    // Check to see if we are being told how many frames to write.
    // The value is unsigned, so there is no lower bound to test; read it once only.
    if (config.has_param(FileWriterPlugin::CONFIG_FRAMES)) {
      const size_t totalFrames = config.get_param<size_t>(FileWriterPlugin::CONFIG_FRAMES);
      next_acquisition_->total_frames_ = totalFrames;
      next_acquisition_->frames_to_write_ = calc_num_frames(totalFrames);

      LOG4CXX_INFO(logger_,
          "Expecting " << next_acquisition_->frames_to_write_ << " frames (total " << totalFrames << ")");
    }

    // Check to see if the master dataset is being set
    if (config.has_param(FileWriterPlugin::CONFIG_MASTER_DATASET)) {
      master_frame_ = config.get_param<std::string>(FileWriterPlugin::CONFIG_MASTER_DATASET);
      LOG4CXX_INFO(logger_, "Setting master frame dataset to " << master_frame_);
    }

    // Check to see if the acquisition id is being set
    if (config.has_param(FileWriterPlugin::ACQUISITION_ID)) {
      next_acquisition_->acquisition_id_ = config.get_param<std::string>(FileWriterPlugin::ACQUISITION_ID);
      LOG4CXX_INFO(logger_, "Setting next Acquisition ID to " << next_acquisition_->acquisition_id_);
    }

    // Check to see if the close file timeout period is being set
    if (config.has_param(FileWriterPlugin::CLOSE_TIMEOUT_PERIOD)) {
      timeout_period_ = config.get_param<size_t>(FileWriterPlugin::CLOSE_TIMEOUT_PERIOD);
      LOG4CXX_INFO(logger_, "Setting close file timeout to " << timeout_period_);
    }

    // Check to see if the close file timeout is being started
    if (config.has_param(FileWriterPlugin::START_CLOSE_TIMEOUT)) {
      if (config.get_param<bool>(FileWriterPlugin::START_CLOSE_TIMEOUT)) {
        LOG4CXX_INFO(logger_, "Configure call to start close file timeout");
        if (writing_) {
          start_close_file_timeout();
        } else {
          LOG4CXX_INFO(logger_, "Not starting timeout as not currently writing");
        }
      }
    }

    // Final check is to start or stop writing
    if (config.has_param(FileWriterPlugin::CONFIG_WRITE)) {
      if (config.get_param<bool>(FileWriterPlugin::CONFIG_WRITE)) {
        // Only start writing if we have frames to write, or if the total number of frames is 0 (free running mode)
        if (next_acquisition_->total_frames_ > 0 && next_acquisition_->frames_to_write_ == 0) {
          // We're not expecting any frames, so just clear out the nextAcquisition for the next one and don't start writing
          this->next_acquisition_ = boost::shared_ptr<Acquisition>(new Acquisition(hdf5_error_definition_));
          if (!writing_) {
            this->current_acquisition_ = boost::shared_ptr<Acquisition>(new Acquisition(hdf5_error_definition_));
          }
          LOG4CXX_INFO(logger_,
              "FrameProcessor will not receive any frames from this acquisition and so no output file will be created");
        } else {
          this->start_writing();
        }
      } else {
        this->stop_writing();
      }
    }
  }
  catch (const std::runtime_error& e)
  {
    // Record the failure on the plugin, then let the caller see the original exception
    this->set_error(e.what());
    throw;
  }
}
/**
 * Report the current configuration of the file writer plugin.
 *
 * Every parameter is added to the reply under a path prefixed with the plugin
 * name (get_name()). The process and file settings, the HDF5 call error
 * durations, the frame count, master dataset, acquisition ID and close file
 * timeout period are reported, followed by the type, compression, blosc
 * settings, dimensions and chunking of every defined dataset.
 *
 * File path, file name, frame count and acquisition ID are taken from the next
 * (pending) acquisition.
 *
 * \param[out] reply - Response IpcMessage to which the parameters are added.
 */
void FileWriterPlugin::requestConfiguration(OdinData::IpcMessage& reply)
{
  // Common prefix for every parameter reported by this plugin
  const std::string plugin_str = get_name() + "/";

  // Process configuration
  const std::string process_str = plugin_str + FileWriterPlugin::CONFIG_PROCESS + "/";
  reply.set_param(process_str + FileWriterPlugin::CONFIG_PROCESS_NUMBER, concurrent_processes_);
  reply.set_param(process_str + FileWriterPlugin::CONFIG_PROCESS_RANK, concurrent_rank_);
  reply.set_param(process_str + FileWriterPlugin::CONFIG_PROCESS_BLOCKSIZE, frames_per_block_);
  reply.set_param(process_str + FileWriterPlugin::CONFIG_PROCESS_BLOCKS_PER_FILE, blocks_per_file_);
  reply.set_param(process_str + FileWriterPlugin::CONFIG_PROCESS_EARLIEST_VERSION, use_earliest_hdf5_);
  reply.set_param(process_str + FileWriterPlugin::CONFIG_PROCESS_ALIGNMENT_THRESHOLD, alignment_threshold_);
  reply.set_param(process_str + FileWriterPlugin::CONFIG_PROCESS_ALIGNMENT_VALUE, alignment_value_);

  // File configuration
  const std::string file_str = plugin_str + FileWriterPlugin::CONFIG_FILE + "/";
  reply.set_param(file_str + FileWriterPlugin::CONFIG_FILE_PATH, next_acquisition_->file_path_);
  reply.set_param(file_str + FileWriterPlugin::CONFIG_FILE_NAME, next_acquisition_->configured_filename_);
  reply.set_param(file_str + FileWriterPlugin::CONFIG_FILE_USE_NUMBERS, use_file_numbering_);
  reply.set_param(file_str + FileWriterPlugin::CONFIG_FILE_NUMBER_START, first_file_index_);
  reply.set_param(file_str + FileWriterPlugin::CONFIG_FILE_POSTFIX, file_postfix_);
  reply.set_param(file_str + FileWriterPlugin::CONFIG_FILE_EXTENSION, file_extension_);

  // HDF5 call error durations (reported within the file group)
  reply.set_param(file_str + FileWriterPlugin::CREATE_ERROR_DURATION, hdf5_error_definition_.create_duration);
  reply.set_param(file_str + FileWriterPlugin::WRITE_ERROR_DURATION, hdf5_error_definition_.write_duration);
  reply.set_param(file_str + FileWriterPlugin::FLUSH_ERROR_DURATION, hdf5_error_definition_.flush_duration);
  reply.set_param(file_str + FileWriterPlugin::CLOSE_ERROR_DURATION, hdf5_error_definition_.close_duration);

  // Top level parameters
  reply.set_param(plugin_str + FileWriterPlugin::CONFIG_FRAMES, next_acquisition_->total_frames_);
  reply.set_param(plugin_str + FileWriterPlugin::CONFIG_MASTER_DATASET, master_frame_);
  reply.set_param(plugin_str + FileWriterPlugin::ACQUISITION_ID, next_acquisition_->acquisition_id_);
  reply.set_param(plugin_str + FileWriterPlugin::CLOSE_TIMEOUT_PERIOD, timeout_period_);

  // Check for datasets
  std::map<std::string, DatasetDefinition>::iterator iter;
  for (iter = this->dataset_defs_.begin(); iter != this->dataset_defs_.end(); ++iter) {
    const DatasetDefinition& def = iter->second;
    // The prefix is identical for every parameter of this dataset, so build it once
    const std::string dset_str = plugin_str + "dataset/" + iter->first + "/";

    // Add the dataset type
    reply.set_param(dset_str + FileWriterPlugin::CONFIG_DATASET_TYPE, get_type_from_enum(def.data_type));

    // Add the dataset compression
    reply.set_param(dset_str + FileWriterPlugin::CONFIG_DATASET_COMPRESSION, get_compress_from_enum(def.compression));
    reply.set_param(dset_str + FileWriterPlugin::CONFIG_DATASET_BLOSC_COMPRESSOR, static_cast<int>(def.blosc_compressor));
    reply.set_param(dset_str + FileWriterPlugin::CONFIG_DATASET_BLOSC_LEVEL, static_cast<int>(def.blosc_level));
    reply.set_param(dset_str + FileWriterPlugin::CONFIG_DATASET_BLOSC_SHUFFLE, static_cast<int>(def.blosc_shuffle));

    // Add dimensions, one set_param call per element; the parameter name ends in
    // "[]". Nothing is added when there are no dimensions.
    const std::string dimParamName = dset_str + FileWriterPlugin::CONFIG_DATASET_DIMS + "[]";
    for (size_t index = 0; index < def.frame_dimensions.size(); index++) {
      reply.set_param(dimParamName, static_cast<int>(def.frame_dimensions[index]));
    }

    // Add chunking dimensions in the same way
    const std::string chunkParamName = dset_str + FileWriterPlugin::CONFIG_DATASET_CHUNKS + "[]";
    for (size_t index = 0; index < def.chunks.size(); index++) {
      reply.set_param(chunkParamName, static_cast<int>(def.chunks[index]));
    }
  }
}
/**
 * Set process configuration options for the file writer.
 *
 * The following options are searched for in the configuration IpcMessage:
 * CONFIG_PROCESS_NUMBER              - Sets the number of writer processes executing
 * CONFIG_PROCESS_RANK                - Sets the rank of this process
 * CONFIG_PROCESS_BLOCKSIZE           - Sets the number of frames per block (at least 1)
 * CONFIG_PROCESS_BLOCKS_PER_FILE     - Sets the number of blocks per file
 * CONFIG_PROCESS_EARLIEST_VERSION    - Sets whether the earliest HDF5 version is used
 * CONFIG_PROCESS_ALIGNMENT_THRESHOLD - Sets the chunk alignment threshold
 * CONFIG_PROCESS_ALIGNMENT_VALUE     - Sets the chunk alignment value
 *
 * Attempting to change the process number, rank, frames per block or blocks per
 * file whilst writing records an error and throws std::runtime_error. Requesting
 * the value that is already set is accepted and only logged.
 *
 * \param[in] config - IpcMessage containing configuration data.
 * \param[out] reply - Response IpcMessage.
 */
void FileWriterPlugin::configure_process(OdinData::IpcMessage& config, OdinData::IpcMessage& reply)
{
  // Number of concurrent processes
  if (config.has_param(FileWriterPlugin::CONFIG_PROCESS_NUMBER)) {
    const size_t requested_processes = config.get_param<size_t>(FileWriterPlugin::CONFIG_PROCESS_NUMBER);
    if (this->concurrent_processes_ == requested_processes) {
      LOG4CXX_DEBUG_LEVEL(1, logger_, "Concurrent processes is already " << this->concurrent_processes_);
    }
    else {
      // A change is not permitted whilst a file is being written
      if (this->writing_) {
        const std::string message = "Cannot change concurrent processes whilst writing";
        set_error(message);
        throw std::runtime_error(message);
      }
      this->concurrent_processes_ = requested_processes;
      LOG4CXX_DEBUG_LEVEL(1, logger_, "Concurrent processes changed to " << this->concurrent_processes_);
    }
  }

  // Rank of this process
  if (config.has_param(FileWriterPlugin::CONFIG_PROCESS_RANK)) {
    const size_t requested_rank = config.get_param<size_t>(FileWriterPlugin::CONFIG_PROCESS_RANK);
    if (this->concurrent_rank_ == requested_rank) {
      LOG4CXX_DEBUG_LEVEL(1, logger_, "Process rank is already " << this->concurrent_rank_);
    }
    else {
      // A change is not permitted whilst a file is being written
      if (this->writing_) {
        const std::string message = "Cannot change process rank whilst writing";
        set_error(message);
        throw std::runtime_error(message);
      }
      this->concurrent_rank_ = requested_rank;
      LOG4CXX_DEBUG_LEVEL(1, logger_, "Process rank changed to " << this->concurrent_rank_);
    }
  }

  // Frames per block
  if (config.has_param(FileWriterPlugin::CONFIG_PROCESS_BLOCKSIZE)) {
    const size_t requested_block_size = config.get_param<size_t>(FileWriterPlugin::CONFIG_PROCESS_BLOCKSIZE);
    if (this->frames_per_block_ == requested_block_size) {
      LOG4CXX_DEBUG_LEVEL(1, logger_, "Block size is already " << this->frames_per_block_);
    }
    else {
      // The size is validated before the writing check
      if (requested_block_size < 1) {
        const std::string message = "Must have at least one frame per block";
        set_error(message);
        throw std::runtime_error(message);
      }
      // A change is not permitted whilst a file is being written
      if (this->writing_) {
        const std::string message = "Cannot change block size whilst writing";
        set_error(message);
        throw std::runtime_error(message);
      }
      this->frames_per_block_ = requested_block_size;
      LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting number of frames per block to " << frames_per_block_);
    }
  }

  // Blocks per file
  if (config.has_param(FileWriterPlugin::CONFIG_PROCESS_BLOCKS_PER_FILE)) {
    const size_t requested_blocks = config.get_param<size_t>(FileWriterPlugin::CONFIG_PROCESS_BLOCKS_PER_FILE);
    if (this->blocks_per_file_ == requested_blocks) {
      LOG4CXX_DEBUG_LEVEL(1, logger_, "Blocks per file is already " << this->blocks_per_file_);
    }
    else {
      // A change is not permitted whilst a file is being written
      if (this->writing_) {
        const std::string message = "Cannot change blocks per file whilst writing";
        set_error(message);
        throw std::runtime_error(message);
      }
      this->blocks_per_file_ = requested_blocks;
      LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting number of blocks per file to " << blocks_per_file_);
    }
  }

  // HDF5 library version selection; applied unconditionally
  if (config.has_param(FileWriterPlugin::CONFIG_PROCESS_EARLIEST_VERSION)) {
    this->use_earliest_hdf5_ = config.get_param<bool>(FileWriterPlugin::CONFIG_PROCESS_EARLIEST_VERSION);
    LOG4CXX_DEBUG_LEVEL(1, logger_, "Use earliest version of HDF5 library to write file set to " << this->use_earliest_hdf5_);
  }

  // Chunk alignment threshold and value; applied unconditionally
  if (config.has_param(FileWriterPlugin::CONFIG_PROCESS_ALIGNMENT_THRESHOLD)) {
    this->alignment_threshold_ = config.get_param<size_t>(FileWriterPlugin::CONFIG_PROCESS_ALIGNMENT_THRESHOLD);
    LOG4CXX_DEBUG_LEVEL(1, logger_, "Chunk alignment threshold set to " << this->alignment_threshold_);
  }
  if (config.has_param(FileWriterPlugin::CONFIG_PROCESS_ALIGNMENT_VALUE)) {
    this->alignment_value_ = config.get_param<size_t>(FileWriterPlugin::CONFIG_PROCESS_ALIGNMENT_VALUE);
    LOG4CXX_DEBUG_LEVEL(1, logger_, "Chunk alignment value set to " << this->alignment_value_);
  }
}
/**
 * Set file configuration options for the file writer.
 *
 * The following options are searched for in the configuration IpcMessage:
 * CONFIG_FILE_PATH         - Sets the path of the next file; the path must exist,
 *                            be a directory and be writable, otherwise the reply
 *                            is NACKed and the path is left unchanged
 * CONFIG_FILE_NAME         - Sets the filename of the next file
 * CONFIG_FILE_USE_NUMBERS  - Sets whether file numbering is used
 * CONFIG_FILE_NUMBER_START - Sets the first file index number
 * CONFIG_FILE_POSTFIX      - Sets the file name postfix
 * CONFIG_FILE_EXTENSION    - Sets the file extension
 * CREATE/WRITE/FLUSH/CLOSE_ERROR_DURATION - Sets the HDF5 call error durations
 *
 * The path and file name are applied to the next (pending) acquisition.
 *
 * \param[in] config - IpcMessage containing configuration data.
 * \param[out] reply - Response IpcMessage.
 */
void FileWriterPlugin::configure_file(OdinData::IpcMessage& config, OdinData::IpcMessage& reply)
{
  LOG4CXX_DEBUG_LEVEL(1, logger_, "Configure file name and path");

  // Check for file path and file name
  if (config.has_param(FileWriterPlugin::CONFIG_FILE_PATH)) {
    const std::string file_path = config.get_param<std::string>(FileWriterPlugin::CONFIG_FILE_PATH);
    boost::filesystem::path p(file_path);
    boost::system::error_code ec;

    // Run the checks in order (exists, is a directory, is writable) and keep the
    // message for the first one that fails. An empty message means all passed.
    std::string rejection;
    if (!boost::filesystem::exists(p, ec)) {
      rejection = "Invalid path requested: " + file_path;
    } else if (!boost::filesystem::is_directory(p, ec)) {
      rejection = "Path is not a directory: " + file_path;
    } else if (eaccess(file_path.c_str(), W_OK) != 0) {
      // Return code other than zero is a failure
      rejection = "User does not have write permissions for directory: " + file_path;
    }

    if (rejection.empty()) {
      // All checks passed, we can write to this location
      this->next_acquisition_->file_path_ = file_path;
      LOG4CXX_DEBUG_LEVEL(1, logger_, "Next file path changed to " << this->next_acquisition_->file_path_);
    } else {
      LOG4CXX_ERROR(logger_, rejection);
      reply.set_nack(rejection);
    }
  }

  if (config.has_param(FileWriterPlugin::CONFIG_FILE_NAME)) {
    this->next_acquisition_->configured_filename_ = config.get_param<std::string>(FileWriterPlugin::CONFIG_FILE_NAME);
    LOG4CXX_DEBUG_LEVEL(1, logger_, "Next file name changed to " << this->next_acquisition_->configured_filename_);
  }

  // File naming options
  if (config.has_param(FileWriterPlugin::CONFIG_FILE_USE_NUMBERS)) {
    this->use_file_numbering_ = config.get_param<bool>(FileWriterPlugin::CONFIG_FILE_USE_NUMBERS);
    LOG4CXX_DEBUG_LEVEL(1, logger_, "File name 'use file numbers' changed to " << this->use_file_numbering_);
  }
  if (config.has_param(FileWriterPlugin::CONFIG_FILE_NUMBER_START)) {
    this->first_file_index_ = config.get_param<int>(FileWriterPlugin::CONFIG_FILE_NUMBER_START);
    LOG4CXX_DEBUG_LEVEL(1, logger_, "File name first index number changed to " << this->first_file_index_);
  }
  if (config.has_param(FileWriterPlugin::CONFIG_FILE_POSTFIX)) {
    this->file_postfix_ = config.get_param<std::string>(FileWriterPlugin::CONFIG_FILE_POSTFIX);
    LOG4CXX_DEBUG_LEVEL(1, logger_, "File name postfix changed to " << this->file_postfix_);
  }
  if (config.has_param(FileWriterPlugin::CONFIG_FILE_EXTENSION)) {
    this->file_extension_ = config.get_param<std::string>(FileWriterPlugin::CONFIG_FILE_EXTENSION);
    LOG4CXX_DEBUG_LEVEL(1, logger_, "File extension changed to " << this->file_extension_);
  }

  // Check for HDF5 call error durations
  if (config.has_param(FileWriterPlugin::CREATE_ERROR_DURATION)) {
    this->hdf5_error_definition_.create_duration = config.get_param<unsigned int>(FileWriterPlugin::CREATE_ERROR_DURATION);
    // NOTE(review): the message says "Open" although the create duration is being set
    LOG4CXX_DEBUG_LEVEL(1, logger_, "Open error duration changed to " << this->hdf5_error_definition_.create_duration);
  }
  if (config.has_param(FileWriterPlugin::WRITE_ERROR_DURATION)) {
    this->hdf5_error_definition_.write_duration = config.get_param<unsigned int>(FileWriterPlugin::WRITE_ERROR_DURATION);
    LOG4CXX_DEBUG_LEVEL(1, logger_, "Write error duration changed to " << this->hdf5_error_definition_.write_duration);
  }
  if (config.has_param(FileWriterPlugin::FLUSH_ERROR_DURATION)) {
    this->hdf5_error_definition_.flush_duration = config.get_param<unsigned int>(FileWriterPlugin::FLUSH_ERROR_DURATION);
    LOG4CXX_DEBUG_LEVEL(1, logger_, "Flush error duration changed to " << this->hdf5_error_definition_.flush_duration);
  }
  if (config.has_param(FileWriterPlugin::CLOSE_ERROR_DURATION)) {
    this->hdf5_error_definition_.close_duration = config.get_param<unsigned int>(FileWriterPlugin::CLOSE_ERROR_DURATION);
    LOG4CXX_DEBUG_LEVEL(1, logger_, "Close error duration changed to " << this->hdf5_error_definition_.close_duration);
  }
}
/**
* Set dataset configuration options for the file writer.
*
* This sets up the file writer plugin according to the configuration IpcMessage
* objects that are received. The options are searched for:
* CONFIG_DATASET_CMD - Should we create/delete a dataset definition
* CONFIG_DATASET_NAME - Name of the dataset
* CONFIG_DATASET_TYPE - Datatype of the dataset
* CONFIG_DATASET_DIMS - Dimensions of the dataset
* CONFIG_DATASET_CHUNKS - Chunking parameters of the dataset
* CONFIG_DATASET_COMPRESSION - Compression of raw data
*
* The configuration is not applied if the writer is currently writing.
*
* \param[in] config - IpcMessage containing configuration data.
* \param[out] reply - Response IpcMessage.
*/
void FileWriterPlugin::configure_dataset(const std::string& dataset_name, OdinData::IpcMessage& config, OdinData::IpcMessage& reply)
{
LOG4CXX_DEBUG_LEVEL(1, logger_, "Configuring dataset [" << dataset_name << "]");
DatasetDefinition dset = dataset_defs_[dataset_name];
// If there is a type present then set it
if (config.has_param(FileWriterPlugin::CONFIG_DATASET_TYPE)) {
dset.data_type = get_type_from_string(config.get_param<std::string>(FileWriterPlugin::CONFIG_DATASET_TYPE));
}
// If there are dimensions present for the dataset then set them
if (config.has_param(FileWriterPlugin::CONFIG_DATASET_DIMS)) {
const rapidjson::Value& val = config.get_param<const rapidjson::Value&>(FileWriterPlugin::CONFIG_DATASET_DIMS);
// Loop over the dimension values
dimensions_t dims(val.Size());
for (rapidjson::SizeType i = 0; i < val.Size(); i++) {
const rapidjson::Value& dim = val[i];
dims[i] = dim.GetUint64();
}
dset.frame_dimensions = dims;
// Create default chunking for the dataset (to include n dimension)
dimensions_t chunks(dset.frame_dimensions.size()+1);
// Set first chunk dimension (n dimension) to a single frame or item
chunks[0] = 1;
// Set the remaining chunk dimensions to the same as the dataset dimensions
for (int index = 0; index < dset.frame_dimensions.size(); index++){
chunks[index+1] = dset.frame_dimensions[index];
}
dset.chunks = chunks;
}
// There might be chunking dimensions present for the dataset, this is not required
if (config.has_param(FileWriterPlugin::CONFIG_DATASET_CHUNKS)) {
const rapidjson::Value& val = config.get_param<const rapidjson::Value&>(FileWriterPlugin::CONFIG_DATASET_CHUNKS);
// Loop over the dimension values
dimensions_t chunks(val.Size());
for (rapidjson::SizeType i = 0; i < val.Size(); i++) {
const rapidjson::Value& dim = val[i];
chunks[i] = dim.GetUint64();
}
dset.chunks = chunks;
}
// Check if compression has been specified for the raw data
if (config.has_param(FileWriterPlugin::CONFIG_DATASET_COMPRESSION)) {
dset.compression = get_compression_from_string(config.get_param<std::string>(FileWriterPlugin::CONFIG_DATASET_COMPRESSION));
LOG4CXX_INFO(logger_, "Enabling compression: " << dset.compression);
}
// Blosc compression require a set of parameters to be defined
if (config.has_param(FileWriterPlugin::CONFIG_DATASET_BLOSC_COMPRESSOR)) {
dset.blosc_compressor = config.get_param<unsigned int>(FileWriterPlugin::CONFIG_DATASET_BLOSC_COMPRESSOR);
if (dset.blosc_compressor>5) {
LOG4CXX_ERROR(logger_, "Invalid blosc compression setting " << dset.blosc_compressor);
}
}
if (config.has_param(FileWriterPlugin::CONFIG_DATASET_BLOSC_LEVEL)) {
dset.blosc_level = config.get_param<unsigned int>(FileWriterPlugin::CONFIG_DATASET_BLOSC_LEVEL);
if (dset.blosc_level>9) {
LOG4CXX_ERROR(logger_, "Invalid blosc level setting " << dset.blosc_level);
}
}
if (config.has_param(FileWriterPlugin::CONFIG_DATASET_BLOSC_SHUFFLE)) {
dset.blosc_shuffle = config.get_param<unsigned int>(FileWriterPlugin::CONFIG_DATASET_BLOSC_SHUFFLE);
if (dset.blosc_shuffle>2) {
LOG4CXX_ERROR(logger_, "Invalid blosc shuffle setting " << dset.blosc_shuffle);
}
}
if (dset.compression == blosc) {
LOG4CXX_INFO(logger_, "Blosc compression settings: compressor=" << dset.blosc_compressor
<< " level=" << dset.blosc_level << " shuffle=" << dset.blosc_shuffle);
}
// Check if creating the high/low indexes has been specified
if (config.has_param(FileWriterPlugin::CONFIG_DATASET_INDEXES)) {
dset.create_low_high_indexes = config.get_param<bool>(FileWriterPlugin::CONFIG_DATASET_INDEXES);
}
// Add the dataset definition to the store
dataset_defs_[dataset_name] = dset;
}
/**
* Checks to see if a dataset with the supplied name already exists. If it doesn't then
* the dataset definition is created and then added to the store.
*
* \param[in] dset_name - Name of the dataset to create.
*/
void FileWriterPlugin::create_new_dataset(const std::string& dset_name)
{
if (dataset_defs_.count(dset_name) < 1){
DatasetDefinition dset_def;
// Provide default values for the dataset
dset_def.name = dset_name;
dset_def.data_type = raw_8bit;
dset_def.compression = no_compression;
dset_def.blosc_compressor = 0;
dset_def.blosc_level = 0;
dset_def.blosc_shuffle = 0;
dset_def.num_frames = 1;
std::vector<long long unsigned int> dims(0);
dset_def.frame_dimensions = dims;
dset_def.chunks = dims;
dset_def.create_low_high_indexes = false;
// Record the dataset in the definitions
dataset_defs_[dset_def.name] = dset_def;
}
}
/**
 * Deletes all dataset definitions from the plugin.
 *
 * Logs an informational message and then empties the dataset definition store
 * (dataset_defs_).
 */
void FileWriterPlugin::delete_datasets()
{
LOG4CXX_INFO(logger_, "Deleting all datasets from FileWriter plugin");
// Remove every stored definition in one go
dataset_defs_.clear();
}
/**
 * Report the current state of the plugin.
 *
 * Each item is written into the supplied IpcMessage under a key of the form
 * "<plugin name>/<item>". The items cover the writing flag, the frame counters, file
 * path, file name and acquisition ID of the current acquisition, the process count and
 * rank, and the close file timeout flag. The file writing timing statistics are
 * appended last via add_file_writing_stats().
 *
 * \param[out] status - Reference to an IpcMessage value to store the status.
 */
void FileWriterPlugin::status(OdinData::IpcMessage& status)
{
  // Every status key is prefixed with the plugin name
  const std::string prefix = get_name() + "/";

  // Writing state and the counters/identifiers of the current acquisition
  status.set_param(prefix + "writing", this->writing_);
  status.set_param(prefix + "frames_max", static_cast<int>(this->current_acquisition_->frames_to_write_));
  status.set_param(prefix + "frames_written", static_cast<int>(this->current_acquisition_->frames_written_));
  status.set_param(prefix + "frames_processed", static_cast<int>(this->current_acquisition_->frames_processed_));
  status.set_param(prefix + "file_path", this->current_acquisition_->file_path_);
  status.set_param(prefix + "file_name", this->current_acquisition_->filename_);
  status.set_param(prefix + "acquisition_id", this->current_acquisition_->acquisition_id_);

  // Process layout and timeout state
  status.set_param(prefix + "processes", static_cast<int>(this->concurrent_processes_));
  status.set_param(prefix + "rank", static_cast<int>(this->concurrent_rank_));
  status.set_param(prefix + "timeout_active", this->timeout_active_);

  // Timing metrics for the HDF5 calls
  add_file_writing_stats(status);
}
/**
 * Report the file writing timing statistics for the plugin.
 *
 * For each of the create, write, flush and close entries of hdf5_call_durations_ the
 * last, max and mean values are converted to int and stored in the supplied IpcMessage
 * under "<plugin name>/timing/<last|max|mean>_<call>".
 *
 * \param[out] status - Reference to an IpcMessage value to store the file writing stats.
 */
void FileWriterPlugin::add_file_writing_stats(OdinData::IpcMessage& status)
{
  // Common prefix shared by all timing keys
  const std::string timing = get_name() + "/timing/";

  // Create timings
  status.set_param(timing + "last_create", static_cast<int>(hdf5_call_durations_.create.last_));
  status.set_param(timing + "max_create", static_cast<int>(hdf5_call_durations_.create.max_));
  status.set_param(timing + "mean_create", static_cast<int>(hdf5_call_durations_.create.mean_));

  // Write timings
  status.set_param(timing + "last_write", static_cast<int>(hdf5_call_durations_.write.last_));
  status.set_param(timing + "max_write", static_cast<int>(hdf5_call_durations_.write.max_));
  status.set_param(timing + "mean_write", static_cast<int>(hdf5_call_durations_.write.mean_));

  // Flush timings
  status.set_param(timing + "last_flush", static_cast<int>(hdf5_call_durations_.flush.last_));
  status.set_param(timing + "max_flush", static_cast<int>(hdf5_call_durations_.flush.max_));
  status.set_param(timing + "mean_flush", static_cast<int>(hdf5_call_durations_.flush.mean_));

  // Close timings
  status.set_param(timing + "last_close", static_cast<int>(hdf5_call_durations_.close.last_));
  status.set_param(timing + "max_close", static_cast<int>(hdf5_call_durations_.close.max_));
  status.set_param(timing + "mean_close", static_cast<int>(hdf5_call_durations_.close.mean_));
}
/**
 * Reset file writing statistics
 *
 * Calls reset() on the create, write, flush and close entries of hdf5_call_durations_,
 * i.e. the same entries that are reported by add_file_writing_stats().
 *
 * \return - Always true.
 */
bool FileWriterPlugin::reset_statistics()
{
hdf5_call_durations_.create.reset();
hdf5_call_durations_.write.reset();
hdf5_call_durations_.flush.reset();
hdf5_call_durations_.close.reset();
return true;
}
/**
 * Decide whether a frame belongs to an acquisition that this plugin is writing.
 *
 * The acquisition ID carried in the frame meta data is compared as follows:
 *  - an empty ID matches anything and the frame is accepted;
 *  - if the plugin is writing and the ID equals that of the current acquisition the
 *    frame is accepted with no further action;
 *  - if the ID equals that of the next acquisition, the current file is closed and the
 *    next acquisition is started (side effect), and the frame is accepted;
 *  - otherwise the frame is rejected. If the plugin is writing an error is set and a
 *    warning is logged, otherwise only a debug message is logged.
 *
 * \param[in] frame - Pointer to the Frame object.
 * \return - true if the frame is accepted, false if its acquisition ID is unexpected.
 */
bool FileWriterPlugin::frame_in_acquisition(boost::shared_ptr<Frame> frame) {
  const std::string frame_id = frame->get_meta_data().get_acquisition_ID();

  // Frames without an acquisition ID match anything
  if (frame_id.empty()) {
    return true;
  }

  // Frame belongs to the file currently being written, take no action
  if (writing_ && frame_id == current_acquisition_->acquisition_id_) {
    return true;
  }

  // Frame belongs to the queued acquisition, so roll over to it
  if (frame_id == next_acquisition_->acquisition_id_) {
    LOG4CXX_DEBUG_LEVEL(1, logger_, "Acquisition ID sent in frame matches next acquisition ID. "
                                    "Closing current file and starting next");
    stop_writing();
    start_writing();
    return true;
  }

  // The ID matches neither acquisition
  std::stringstream message;
  message << "Unexpected acquisition ID on acquisition (" << frame_id << ")";
  if (writing_) {
    // The error is recorded before the frame number is appended, so the stored error
    // text does not contain the frame number; only the log message does
    set_error(message.str());
    message << " for frame " << frame->get_frame_number();
    LOG4CXX_WARN(logger_, message.str());
  } else {
    message << " for frame " << frame->get_frame_number();
    LOG4CXX_DEBUG(logger_, message.str());
  }
  return false;
}
/**
 * Stops the current acquisition and starts the next one if it is configured.
 *
 * The next acquisition is not started automatically when the plugin is writing and the
 * next acquisition has a non-empty file path identical to the current one together with
 * either an identical (non-empty) configured filename or an identical (non-empty)
 * acquisition ID. It is also not started when it has neither a configured filename nor
 * an acquisition ID. If it expects frames overall but none for this process, it is
 * replaced with a fresh Acquisition and no file is created.
 *
 * The close file timeout flag is always cleared on exit.
 */
void FileWriterPlugin::stop_acquisition() {
  // Compare current and next acquisitions before the current one is stopped
  bool auto_restart = true;
  if (writing_
      && !next_acquisition_->file_path_.empty()
      && next_acquisition_->file_path_ == current_acquisition_->file_path_) {
    const bool same_filename =
        !next_acquisition_->configured_filename_.empty()
        && next_acquisition_->configured_filename_ == current_acquisition_->configured_filename_;
    if (same_filename) {
      // Identical path and filenames so do not re-start
      auto_restart = false;
      LOG4CXX_INFO(logger_, "FrameProcessor will not auto-restart acquisition due to identical filename and path");
    }

    const bool same_acquisition_id =
        !next_acquisition_->acquisition_id_.empty()
        && next_acquisition_->acquisition_id_ == current_acquisition_->acquisition_id_;
    if (same_acquisition_id) {
      // Identical path and acquisition IDs so do not re-start
      auto_restart = false;
      LOG4CXX_INFO(logger_, "FrameProcessor will not auto-restart acquisition due to identical file path and acquisition ID");
    }
  }

  this->stop_writing();

  // Only start the next acquisition if it has a filename or acquisition ID to use
  if (auto_restart
      && (!next_acquisition_->configured_filename_.empty() || !next_acquisition_->acquisition_id_.empty())) {
    if (next_acquisition_->total_frames_ > 0 && next_acquisition_->frames_to_write_ == 0) {
      // No frames are expected by this process: discard the next acquisition so a new
      // one can be configured, and do not start writing
      this->next_acquisition_ = boost::shared_ptr<Acquisition>(new Acquisition(hdf5_error_definition_));
      LOG4CXX_INFO(logger_, "FrameProcessor will not receive any frames from this acquisition and so no output file will be created");
    } else {
      this->start_writing();
    }
  }

  // The file has just been closed, so the timeout must not close it again. Clearing the
  // flag also stops the timer if it is running
  timeout_active_ = false;
}
/**
 * Starts the close file timeout.
 *
 * If the timeout is already active this only logs a message. Otherwise all waiters on
 * the start condition are notified while start_timeout_mutex_ is held.
 */
void FileWriterPlugin::start_close_file_timeout()
{
  // Nothing to start if the timeout is already running
  if (timeout_active_) {
    LOG4CXX_INFO(logger_, "Close file timeout already active");
    return;
  }

  LOG4CXX_INFO(logger_, "Starting close file timeout");
  boost::mutex::scoped_lock lock(start_timeout_mutex_);
  start_condition_.notify_all();
}
/**
 * Function that is run by the close file timeout thread
 *
 * Loops for as long as timeout_thread_running_ is set. On each pass it blocks on
 * start_condition_ until notified (see start_close_file_timeout()), then sets
 * timeout_active_ and repeatedly waits on timeout_condition_ for timeout_period_
 * milliseconds. If a wait expires without a notification and the plugin is still
 * writing, an error is set and the current acquisition is stopped. If the wait is
 * notified before expiring (presumably by a frame being written) no action is taken
 * here: the loop either waits for another period or goes back to waiting for a start
 * notification, depending on the value of timeout_active_.
 *
 */
void FileWriterPlugin::run_close_file_timeout()
{
// Configure the logging MDC for this thread using the application path
OdinData::configure_logging_mdc(OdinData::app_path.c_str());
// startLock is held for the lifetime of this function, except while the thread is
// blocked inside start_condition_.wait()
boost::mutex::scoped_lock startLock(start_timeout_mutex_);
while (timeout_thread_running_) {
// Sleep until the timeout is requested to start
start_condition_.wait(startLock);
// Re-check the running flag after waking
// NOTE(review): presumably shutdown notifies start_condition_ so the thread can exit - confirm
if (timeout_thread_running_) {
timeout_active_ = true;
boost::mutex::scoped_lock lock(close_file_mutex_);
while (timeout_active_) {
// timed_wait() returning false is treated as the period elapsing without a notification
if (!timeout_condition_.timed_wait(lock, boost::posix_time::milliseconds(timeout_period_))) {
// Timeout
LOG4CXX_DEBUG_LEVEL(1, logger_, "Close file Timeout timed out");
// NOTE: this lock_guard shadows the close_file_mutex_ 'lock' declared above; it holds
// the recursive mutex_ until the end of this branch
boost::lock_guard<boost::recursive_mutex> lock(mutex_);
// Only stop if the plugin is still writing and the timeout is still active
if (writing_ && timeout_active_) {
set_error("Timed out waiting for frames, stopping writing");
stop_acquisition();
}
// Leave the inner loop and go back to waiting for the next start notification
timeout_active_ = false;
} else {
// Event - No need to do anything. The timeout will either wait for another period if active or wait until it is restarted
}
}
}
}
}
/**
 * Work out how many frames this FileWriter should expect to write out of the total.
 *
 * Frames are dealt out in blocks of frames_per_block_, one block per process per round.
 * Every process gets one full block for each complete round. The frames left over after
 * the complete rounds are then handed out in rank order, at most one block per process,
 * so this process gets whatever part of the leftover falls beyond the blocks taken by
 * the lower ranks.
 *
 * \param[in] totalFrames - The total number of frames in the acquisition
 * \return - The number of frames that this FileWriter is expected to write
 */
size_t FileWriterPlugin::calc_num_frames(size_t totalFrames)
{
  // Number of leftover frames consumed by the processes ranked below this one
  const size_t rank_offset = this->concurrent_rank_ * frames_per_block_;

  // Complete rounds, in which every process writes one whole block
  const size_t whole_blocks = totalFrames / frames_per_block_;
  const size_t complete_rounds = whole_blocks / this->concurrent_processes_;
  size_t frame_count = complete_rounds * frames_per_block_;

  // Frames not covered by the complete rounds
  const size_t leftover = totalFrames - (frame_count * this->concurrent_processes_);

  // Take this process's share of the leftover, capped at one block. The strict
  // comparison guarantees the difference is positive
  if (leftover > rank_offset) {
    frame_count += std::min(leftover - rank_offset, frames_per_block_);
  }

  return frame_count;
}
/**
 * Get the major version number of the plugin.
 *
 * \return - The value of ODIN_DATA_VERSION_MAJOR.
 */
int FileWriterPlugin::get_version_major()
{
return ODIN_DATA_VERSION_MAJOR;
}
/**
 * Get the minor version number of the plugin.
 *
 * \return - The value of ODIN_DATA_VERSION_MINOR.
 */
int FileWriterPlugin::get_version_minor()
{
return ODIN_DATA_VERSION_MINOR;
}
/**
 * Get the patch version number of the plugin.
 *
 * \return - The value of ODIN_DATA_VERSION_PATCH.
 */
int FileWriterPlugin::get_version_patch()
{
return ODIN_DATA_VERSION_PATCH;
}
/**
 * Get the short version string of the plugin.
 *
 * \return - The value of ODIN_DATA_VERSION_STR_SHORT as a std::string.
 */
std::string FileWriterPlugin::get_version_short()
{
return ODIN_DATA_VERSION_STR_SHORT;
}
/**
 * Get the long version string of the plugin.
 *
 * \return - The value of ODIN_DATA_VERSION_STR as a std::string.
 */
std::string FileWriterPlugin::get_version_long()
{
return ODIN_DATA_VERSION_STR;
}
} /* namespace FrameProcessor */