odin-detector/odin-data

View on GitHub
cpp/frameProcessor/src/LiveViewPlugin.cpp

Summary

Maintainability
Test Coverage
/*
 * LiveImageViewPlugin.cpp
 *
 *  Created on: 6 Sept 2018
 *      Author: Ashley Neaves
 */

#include "LiveViewPlugin.h"
#include "version.h"
#include <boost/algorithm/string.hpp>

namespace FrameProcessor
{
/* Default Config*/
const int32_t     LiveViewPlugin::DEFAULT_FRAME_FREQ = 1;
const int32_t     LiveViewPlugin::DEFAULT_PER_SECOND = 0;
const std::string LiveViewPlugin::DEFAULT_IMAGE_VIEW_SOCKET_ADDR = "tcp://127.0.0.1:5020";
const std::string LiveViewPlugin::DEFAULT_DATASET_NAME = "";
const std::string LiveViewPlugin::DEFAULT_TAGGED_FILTER = "";

/* Config Names*/
const std::string LiveViewPlugin::CONFIG_FRAME_FREQ =  "frame_frequency";
const std::string LiveViewPlugin::CONFIG_PER_SECOND =  "per_second";
const std::string LiveViewPlugin::CONFIG_SOCKET_ADDR = "live_view_socket_addr";
const std::string LiveViewPlugin::CONFIG_DATASET_NAME = "dataset_name";
const std::string LiveViewPlugin::CONFIG_TAGGED_FILTER_NAME = "filter_tagged";

/**
 * Constructor for this class. Sets up ZMQ pub socket and other default values for the config
 */
LiveViewPlugin::LiveViewPlugin() :
    publish_socket_(ZMQ_PUB),
    is_bound_(false),
    time_last_frame_(boost::posix_time::min_date_time)
{
  logger_ = Logger::getLogger("FP.LiveViewPlugin");
  LOG4CXX_INFO(logger_, "LiveViewPlugin version " << this->get_version_long() << " loaded");

  set_frame_freq_config(DEFAULT_FRAME_FREQ);
  set_per_second_config(DEFAULT_PER_SECOND);
  set_dataset_name_config(DEFAULT_DATASET_NAME);
  set_tagged_filter_config(DEFAULT_TAGGED_FILTER);
}

/**
 * Class Destructor. Closes the Publish socket
 */
LiveViewPlugin::~LiveViewPlugin()
{
  LOG4CXX_TRACE(logger_, "LiveViewPlugin destructor.");
  publish_socket_.close();
}

/**
 * Process recieved frame. For the live view plugin, this means checking if certain conditions are true (time elapsed, frame number, dataset name)
 * and then, if the conditions mean sending the frame, creating a json header and copying the data to send to the publisher socket.
 * 
 * \param[in] frame - pointer to a frame object.
 */
void LiveViewPlugin::process_frame(boost::shared_ptr<Frame> frame)
{
  /** Static Frame Count will increment each time this method is called, basically as a count of how many frames have been processed by the plugin*/
  static uint32_t frame_count_;
  LOG4CXX_TRACE(logger_, "LiveViewPlugin Process Frame.");
  if(is_bound_)
  {
    std::string frame_dataset = frame->get_meta_data().get_dataset_name();
    /* If datasets is empty, or contains the frame's dataset, then we can potentially send it*/
    std::map<std::string, int>::iterator dataset = datasets_.find(frame_dataset);
    if (datasets_.empty() || dataset != datasets_.end())
    {
      /* If either filtering by tag is disabled, or the frame has the tagged param */
      bool tag_filter_active = !tags_.empty();
      bool is_tagged = false;
      if (tag_filter_active)
      {
        for (int i = 0; i < tags_.size(); i++)
        {
          if (frame->get_meta_data().has_parameter(tags_[i]))
          {
            is_tagged = true;
            break;
          }
        }
      }

      // If the frame is tagged or the tag filter is inactive, test if the rate and frequency
      // conditions are set to pass the frame to clients
      if (!tag_filter_active || is_tagged)
      {
        bool pass_frame = false;

        // If the per-second rate setting is active, check time since last live view frame sent
        if (per_second_ != 0)
        {
          boost::posix_time::time_duration elapsed = (boost::posix_time::microsec_clock::local_time() - time_last_frame_);
          if (elapsed > time_between_frames_)
          {
            LOG4CXX_TRACE(logger_, "Frame " << frame->get_frame_number() << " elapsed time " << elapsed << " > " << time_between_frames_);
            pass_frame = true;
          }
        }

        // If the frame frequency setting is active, check if this frame should be sent
        int count;
        if(dataset != datasets_.end())
        {
          count = dataset->second++;
        }
        else
        {
          count = frame_count_++;
        }
        if (frame_freq_ != 0 && count % frame_freq_ == 0)
        {
          LOG4CXX_TRACE(logger_, "Frame " << frame->get_frame_number() << " count matches frequency " << frame_freq_);
          pass_frame = true;
        }

        // Pass the frame to live view clients if one of the conditions above has been met
        if (pass_frame) {
          pass_live_frame(frame);
        }
      }
      else
      {
        LOG4CXX_TRACE(logger_, "LiveViewPlugin No Tag(s) found, frame skipped.");
      }
    }
    else
    {
      LOG4CXX_TRACE(logger_,"Frame dataset: " << frame_dataset << " not desired");
    }

  }
  else
  {
    LOG4CXX_WARN(logger_, "Socket is unbound. Check if address " << image_view_socket_addr_ << " is in use.");
  }

  LOG4CXX_TRACE(logger_, "Pushing Data Frame" );
  this->push(frame);
}


/**
 * Set configuration options for this Plugin.
 *
 * This sets up the Live View Plugin according to the configuration IpcMessage
 * objects that are received.
 *
 * \param[in] config - IpcMessage containing configuration data.
 * \param[out] reply - Response IpcMessage.
 */
void LiveViewPlugin::configure(OdinData::IpcMessage& config, OdinData::IpcMessage& reply)
{
  try{
    /* Check if we're setting the frequency of frames to show*/
    if (config.has_param(CONFIG_FRAME_FREQ))
    {
      set_frame_freq_config(config.get_param<int32_t>(CONFIG_FRAME_FREQ));
    }
    /* Check if we're setting the per_second config*/
    if (config.has_param(CONFIG_PER_SECOND))
    {
      set_per_second_config(config.get_param<int32_t>(CONFIG_PER_SECOND));
    }
    /* Check if we are setting the dataset name filter*/
    if (config.has_param(CONFIG_DATASET_NAME))
    {
      set_dataset_name_config(config.get_param<std::string>(CONFIG_DATASET_NAME));
    }
    if (config.has_param(CONFIG_TAGGED_FILTER_NAME))
    {
      set_tagged_filter_config(config.get_param<std::string>(CONFIG_TAGGED_FILTER_NAME));
    }
    /* Display warning if configuration sets the plugin to do nothing*/
    if (per_second_ == 0 && frame_freq_ == 0)
    {
      LOG4CXX_WARN(logger_, "Current Live View Config results in it doing nothing.");
    }
    /* Check if we're setting the address of the socket to send the live view frames to.*/
    if (config.has_param(CONFIG_SOCKET_ADDR))
    {
      set_socket_addr_config(config.get_param<std::string>(CONFIG_SOCKET_ADDR));
    }
    else if (not is_bound_) //do not default if the socket is already bound
    {
      set_socket_addr_config(DEFAULT_IMAGE_VIEW_SOCKET_ADDR);
    }
    
  }
  catch (std::runtime_error& e)
  {
    std::stringstream ss;
    ss << "Bad ctrl msg: " << e.what();
    this->set_error(ss.str());
    throw;
  }
}

/**
 * Get the configuration values for this Plugin.
 *
 * \param[out] reply - Response IpcMessage.
 */
void LiveViewPlugin::requestConfiguration(OdinData::IpcMessage& reply)
{
  reply.set_param(get_name() + "/" + LiveViewPlugin::CONFIG_FRAME_FREQ, frame_freq_);
  reply.set_param(get_name() + "/" + LiveViewPlugin::CONFIG_SOCKET_ADDR, image_view_socket_addr_);
  reply.set_param(get_name() + "/" + LiveViewPlugin::CONFIG_PER_SECOND, per_second_);
}

/**
 * Constructs a header with information about the data frame, then sends that header and the data
 * to the ZMQ socket interface to be consumed by an external live viewer.
 * The Header contains the following:
 * - int32_t Frame number
 * - string   Acquisition ID
 * - string   Data Type
 * - size_t   Data Size
 * - string   compression type
 * - size_t[] dimensions
 * \param[in] frame - pointer to the data frame
 * \param[in] frame_num - the number of the frame
 * 
 */
void LiveViewPlugin::pass_live_frame(boost::shared_ptr<Frame> frame)
{
  void* frame_data_copy = (void*)frame->get_image_ptr();

  const FrameMetaData meta_data = frame->get_meta_data();

  uint32_t frame_num = frame->get_frame_number();
  std::string aqqID = meta_data.get_acquisition_ID();
  dimensions_t dim = meta_data.get_dimensions();
  std::string type = get_type_from_enum((DataType)meta_data.get_data_type());
  std::size_t size = frame->get_image_size();
  std::string compress = get_compress_from_enum((CompressionType)meta_data.get_compression_type());
  std::string dataset = meta_data.get_dataset_name();

  rapidjson::Document document; /* Header info*/
  document.SetObject();
  LOG4CXX_TRACE(logger_, "LiveViewPlugin Building Frame Header");
  //building image header

  add_json_member(&document, "frame_num", frame_num);
  add_json_member(&document, "acquisition_id", aqqID);
  add_json_member(&document, "dtype", type);
  add_json_member(&document, "dsize", static_cast<uint64_t>(size));
  add_json_member(&document, "dataset", dataset);
  add_json_member(&document, "compression", compress);


  //getting tags manually because it is an array
  rapidjson::Value keyTags("tags", document.GetAllocator());
  rapidjson::Value valueTags(rapidjson::kArrayType);
  if (!tags_.empty())
  {
    for(int i = 0; i < tags_.size(); i++)
    {
      if (meta_data.has_parameter(tags_[i]))
      {
        rapidjson::Value tagStringVal(tags_[i].c_str(), document.GetAllocator());
        valueTags.PushBack(tagStringVal, document.GetAllocator());
      }
    }
    document.AddMember(keyTags, valueTags, document.GetAllocator());
  }

  //getting dimensions manually because its an array
  rapidjson::Value keyDims("shape", document.GetAllocator());
  rapidjson::Value valueDims(rapidjson::kArrayType);

  size_t dim_size = dim.size();
  for(size_t i=0; i < dim_size; i++)
  {
    std::string dimString = boost::to_string(dim[i]);
    rapidjson::Value dimStringVal(dimString.c_str(), document.GetAllocator());
    valueDims.PushBack(dimStringVal, document.GetAllocator());
  }

  document.AddMember(keyDims, valueDims, document.GetAllocator());

  //convert to a json like string so that it can be passed along the socket as the image header
  rapidjson::StringBuffer buffer;
  buffer.Clear();
  rapidjson::Writer<rapidjson::StringBuffer, rapidjson::UTF8<> > writer(buffer);

  document.Accept(writer);

  LOG4CXX_TRACE(logger_, "LiveViewPlugin Header Built, sending down socket.");
  publish_socket_.send(buffer.GetString(), ZMQ_SNDMORE);
  LOG4CXX_TRACE(logger_, "LiveViewPlugin Sending frame raw data");
  publish_socket_.send(size, frame_data_copy, 0);

  time_last_frame_ = boost::posix_time::microsec_clock::local_time();
}

void LiveViewPlugin::add_json_member(rapidjson::Document* document, std::string key, std::string value)
{
  rapidjson::Value rkey(key.c_str(), document->GetAllocator());
  rapidjson::Value rvalue(value.c_str(), document->GetAllocator());
  document->AddMember(rkey, rvalue, document->GetAllocator());
}

void LiveViewPlugin::add_json_member(rapidjson::Document* document, std::string key, uint32_t value)
{
  rapidjson::Value rkey(key.c_str(), document->GetAllocator());
  rapidjson::Value rvalue(value);
  document->AddMember(rkey, rvalue, document->GetAllocator());
}
int LiveViewPlugin::get_version_major()
{
  return ODIN_DATA_VERSION_MAJOR;
}

int LiveViewPlugin::get_version_minor()
{
  return ODIN_DATA_VERSION_MINOR;
}

int LiveViewPlugin::get_version_patch()
{
  return ODIN_DATA_VERSION_PATCH;
}

std::string LiveViewPlugin::get_version_short()
{
  return ODIN_DATA_VERSION_STR_SHORT;
}

std::string LiveViewPlugin::get_version_long()
{
  return ODIN_DATA_VERSION_STR;
}

/**
 * Sets the config value "per_second" which tells the plugin the number of frames to show each second
 * Setting this value to 0 disables the "frames per second" checking
 * \param[in] value - the value to assign to per_second
 */ 
void LiveViewPlugin::set_per_second_config(int32_t value)
{
  per_second_ = value;

  if (per_second_ == 0)
  {
    LOG4CXX_INFO(logger_, "Disabling Frames Per Second Option");
  }
  else
  {
    LOG4CXX_INFO(logger_, "Displaying " << per_second_ << " frames per second");
    time_between_frames_ = boost::posix_time::milliseconds(1000 / per_second_);
  }
}
/**
 * Sets the Frame Frequency config value. This value tells the plugin to show every Nth frame.
 * Setting this value to 0 disables this check
 * \param[in] value - the value to assign to frame_freq
 */ 
void LiveViewPlugin::set_frame_freq_config(int32_t value)
{
  frame_freq_ = value;
  if (frame_freq_ == 0)
  {
    LOG4CXX_INFO(logger_, "Disabling Frame Frequency");
  }
  else
  {
    LOG4CXX_INFO(logger_, "Showing every " << frame_freq_ << " frame(s)");
  }
}
/**
 * Sets the address of the publisher socket that live view data is sent to.
 * When setting this address, it also binds the socket to the address, unbinding it from any previous address given
 * \param[in] value - the address to bind the socket to.
 */
void LiveViewPlugin::set_socket_addr_config(std::string value)
{
  //we dont want to unbind and rebind the same address, as it can cause an error if it takes time to unbind, so we check first
  if (publish_socket_.has_bound_endpoint(value))
  {
    LOG4CXX_WARN(logger_, "Socket already bound to " << value << ". Doing nothing");
    return;
  }

  try
  {
    uint32_t linger = 0;
    publish_socket_.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
    publish_socket_.unbind(image_view_socket_addr_.c_str());
    //set global variable as soon as socket is unbound in case any errors are caused by rebinding.
    is_bound_ = false;
    image_view_socket_addr_ = value;
    LOG4CXX_INFO(logger_, "Setting Live View Socket Address to " << image_view_socket_addr_);
    publish_socket_.bind(image_view_socket_addr_);
    is_bound_ = true;
    LOG4CXX_INFO(logger_, "Live View Socket bound successfully");
  }
  catch(zmq::error_t& e)
  {
    LOG4CXX_ERROR(logger_, "Error binding socket to address " << value << " Error Number: " << e.num());
  }
}

/**
 * Sets the dataset filter configuration. The live view output can be filtered by the dataset variable on the frame based off a list
 * of desired datasets.
 * \param[in] value - A comma deliminated list of dataset names, or an empty string to ignore this filtering.
 */
void LiveViewPlugin::set_dataset_name_config(std::string value)
{
  std::string delim = ",";
  datasets_.clear();
  std::vector<std::string> dataset_names;
  if (!value.empty())
  {
    //delim value string by comma
    
    boost::split(dataset_names, value, boost::is_any_of(delim));
    std::for_each(dataset_names.begin(), dataset_names.end(), [this](std::string& name)
    {
      boost::trim(name);
      datasets_.insert(std::pair<std::string, int>(name, 0));
    });
  }

  //loop to log datasets
  std::string dataset_string = "";
  for (int i = 0; i < dataset_names.size(); i++)
  {
    dataset_string += dataset_names[i] + ":";
  }
  LOG4CXX_INFO(logger_, "Setting the datasets allowed to: " << dataset_string);
}

void LiveViewPlugin::set_tagged_filter_config(std::string value)
{
  std::string delim = ",";
  tags_.clear();
  if (!value.empty())
  {
    boost::split(tags_, value, boost::is_any_of(delim));
  }
  std::string tags_string = "";
  for (int i = 0; i < tags_.size(); i++)
  {
    boost::trim(tags_[i]);
    tags_string += tags_[i] + ", ";
  }
  LOG4CXX_INFO(logger_, "Only Displaying images with the following tags: " << tags_string);
}

}/*namespace FrameProcessor*/