odin-detector/odin-data

View on GitHub
cpp/frameProcessor/src/DummyUDPProcessPlugin.cpp

Summary

Maintainability
Test Coverage
/*
 * DummyUDPProcessPlugin.cpp
 * 
 *  Created on: 22 Jul 2019
 *      Author: Tim Nicholls, STFC Detector Systems Software Group
 */

#include "DummyUDPProcessPlugin.h"
#include "version.h"

namespace FrameProcessor
{

  // Configuration parameter name definitions
  const std::string DummyUDPProcessPlugin::CONFIG_IMAGE_WIDTH = "width";
  const std::string DummyUDPProcessPlugin::CONFIG_IMAGE_HEIGHT = "height";
  const std::string DummyUDPProcessPlugin::CONFIG_COPY_FRAME = "copy_frame";
  // Command and parameters
  const std::string DummyUDPProcessPlugin::EXECUTE_PRINT = "print";

  /**
   * The constructor sets up default configuration parameters and logging used within the class.
   */
  DummyUDPProcessPlugin::DummyUDPProcessPlugin() :
    image_width_(1400),
    image_height_(1024),
    packets_lost_(0),
    copy_frame_(true)
  {
    // Setup logging for the class
    logger_ = Logger::getLogger("FP.DummyUDPProcessPlugin");
    LOG4CXX_INFO(logger_, "DummyUDPProcessPlugin version " << this->get_version_long() << " loaded");
  }

  /**
   * Destructor.
   */
  DummyUDPProcessPlugin::~DummyUDPProcessPlugin()
  {
    LOG4CXX_TRACE(logger_, "DummyUDPProcessPlugin destructor.");
  }

  /**
   * Get the plugin major version number.
   * 
   * \return major version number as an integer
   */ 
  int DummyUDPProcessPlugin::get_version_major()
  {
    return ODIN_DATA_VERSION_MAJOR;
  }

  /**
   * Get the plugin minor version number.
   * 
   * \return minor version number as an integer
   */ 
  int DummyUDPProcessPlugin::get_version_minor()
  {
    return ODIN_DATA_VERSION_MINOR;
  }

  /**
   * Get the plugin patch version number.
   * 
   * \return patch version number as an integer
   */ 
  int DummyUDPProcessPlugin::get_version_patch()
  {
    return ODIN_DATA_VERSION_PATCH;
  }

 /**
   * Get the plugin short version (e.g. x.y.z) string.
   * 
   * \return short version as a string
   */ 
  std::string DummyUDPProcessPlugin::get_version_short()
  {
    return ODIN_DATA_VERSION_STR_SHORT;
  }

  /**
   * Get the plugin long version (e.g. x.y.z-qualifier) string.
   * 
   * \return long version as a string
   */ 
  std::string DummyUDPProcessPlugin::get_version_long()
  {
    return ODIN_DATA_VERSION_STR;
  }

  /**
   * Configure the plugin.  This receives an IpcMessage which should be processed
   * to configure the plugin, and any response can be added to the reply IpcMessage.
   *
   * \param[in] config - Reference to the configuration IpcMessage object.
   * \param[out] reply - Reference to the reply IpcMessage object.
   */
  void DummyUDPProcessPlugin::configure(OdinData::IpcMessage& config, OdinData::IpcMessage& reply)
  {

    if (config.has_param(DummyUDPProcessPlugin::CONFIG_IMAGE_WIDTH))
    {
      image_width_ = config.get_param<int>(DummyUDPProcessPlugin::CONFIG_IMAGE_WIDTH);
      LOG4CXX_DEBUG(logger_, "Setting image width to " << image_width_);
    }

    if (config.has_param(DummyUDPProcessPlugin::CONFIG_IMAGE_HEIGHT))
    {
      image_height_ = config.get_param<int>(DummyUDPProcessPlugin::CONFIG_IMAGE_HEIGHT);
      LOG4CXX_DEBUG(logger_, "Setting image height to " << image_height_);
    }

    if (config.has_param(DummyUDPProcessPlugin::CONFIG_COPY_FRAME))
    {
      copy_frame_ = config.get_param<bool>(DummyUDPProcessPlugin::CONFIG_COPY_FRAME);
      LOG4CXX_DEBUG(logger_, "Setting copy frame mode to " << (copy_frame_ ? "true" : "false"));
    }

  }

  /**
   * Respond to configuration requests from clients.
   * 
   * This method responds to configuration requests from client, populating the supplied IpcMessage
   * reply with configured parameters.
   * 
   * \param[in,out] reply - IpcMessage response to client, to be populated with plugin parameters
   */
  void DummyUDPProcessPlugin::requestConfiguration(OdinData::IpcMessage& reply)
  {

    std::string base_str = get_name() + "/";
    reply.set_param(base_str + DummyUDPProcessPlugin::CONFIG_IMAGE_WIDTH, image_width_);
    reply.set_param(base_str + DummyUDPProcessPlugin::CONFIG_IMAGE_HEIGHT, image_height_);
    reply.set_param(base_str + DummyUDPProcessPlugin::CONFIG_COPY_FRAME, copy_frame_);
  }

  /**
   * Execute a command on the plugin.  This receives an IpcMessage which should be processed
   * to execute a command within the plugin, and any response can be added to the reply IpcMessage.
   * The dummy plugin implements a single command "print" that prints the value of the parameter named.
   *
   * \param[in] config - String containing the command to execute.
   * \param[out] reply - Reference to the reply IpcMessage object.
   */
  void DummyUDPProcessPlugin::execute(const std::string& command, OdinData::IpcMessage& reply)
  {
    if (command == DummyUDPProcessPlugin::EXECUTE_PRINT){
      LOG4CXX_INFO(logger_, "Image width is " << image_width_);
      LOG4CXX_INFO(logger_, "Image height is " << image_height_);
      LOG4CXX_INFO(logger_, "Copy frame is " << copy_frame_);
    } else {
      std::stringstream is;
      is << "Submitted command not supported: " << command;
      LOG4CXX_ERROR(logger_, is.str());
      throw std::runtime_error(is.str().c_str());
    }
  }

  /**
   * Respond to command execution requests from clients.
   * 
   * This method responds to command executions requests from client, populating the supplied IpcMessage
   * reply with the commands and command parameters supported by this plugin.
   * 
   * \return - Vector containing supported command strings.
   */
  std::vector<std::string> DummyUDPProcessPlugin::requestCommands()
  {
    // Reply with a vector of supported command strings.
    std::vector<std::string> cmds = {DummyUDPProcessPlugin::EXECUTE_PRINT};
    return cmds;
  }

  /**
   * Collate status information for the plugin.  The status is added to the status IpcMessage object.
   *
   * \param[out] status - Reference to an IpcMessage value to store the status.
   */
  void DummyUDPProcessPlugin::status(OdinData::IpcMessage& status)
  {
    std::string base_str = get_name() + "/";
    status.set_param(base_str + "packets_lost", packets_lost_);
  }

  /**
   * Reset process plugin statistics, i.e. counter of packets lost
   */
  bool DummyUDPProcessPlugin::reset_statistics(void)
  {
    packets_lost_ = 0;
    
    return true;
  }

  /**
   * Perform processing on the frame. For the DummyUDPProcessPlugin class this involves dealing
   * with any lost packets and then simply copying the data buffer into an appropriately 
   * dimensioned output frame.
   *
   * \param[in] frame - Pointer to a Frame object.
   */
  void DummyUDPProcessPlugin::process_frame(boost::shared_ptr<Frame> frame)
  {
    LOG4CXX_TRACE(logger_, "Received a new frame");

    DummyUDP::FrameHeader* hdr_ptr = static_cast<DummyUDP::FrameHeader*>(frame->get_data_ptr());

    LOG4CXX_DEBUG(logger_, "Raw frame number: " << hdr_ptr->frame_number);
    LOG4CXX_DEBUG(logger_, "Frame state: " << hdr_ptr->frame_state);
    LOG4CXX_DEBUG(logger_, "Packets expected: " << hdr_ptr->total_packets_expected);
    LOG4CXX_DEBUG(logger_, "Packets received: " << hdr_ptr->total_packets_received
      << " expected: " << hdr_ptr->total_packets_expected);
    LOG4CXX_DEBUG(logger_, "Packet size: " << hdr_ptr->packet_size);

    // Process any lost packets
    this->process_lost_packets(frame);

    // Obtain a pointer to the start of the data in the frame
    const void* data_ptr = static_cast<const void*>(
      static_cast<const char*>(frame->get_data_ptr()) + sizeof(DummyUDP::FrameHeader)
    );

    // Create and populate metadata for the output frame
    FrameMetaData frame_meta;

    frame_meta.set_dataset_name("dummy");
    frame_meta.set_data_type(raw_16bit);
    frame_meta.set_frame_number(static_cast<long long>(hdr_ptr->frame_number));
    dimensions_t dims(2);
    dims[0] = image_height_;
    dims[1] = image_width_;
    frame_meta.set_dimensions(dims);
    frame_meta.set_compression_type(no_compression);

    // Calculate output image size
    const std::size_t output_image_size = image_width_ * image_height_ * sizeof(uint16_t);

    // If copy frame mode is selected, create a new data block frame, copy the image data
    // in and push it. Otherwise, set metadata, image size and offset on the current incoming
    // frame, typically a shared buffer frame, and push that.

    if (copy_frame_) 
    {
      LOG4CXX_DEBUG(logger_, "Copying data for frame " << hdr_ptr->frame_number << " to output");

      // Construct a new data block object to output the processed frame 
      boost::shared_ptr<Frame> output_frame;
      output_frame = boost::shared_ptr<Frame>(new DataBlockFrame(frame_meta, output_image_size));

      // Get a pointer to the data buffer in the output frame
      void* output_ptr = output_frame->get_data_ptr();

      // Copy processed frame data into the output frame
      memcpy(output_ptr, data_ptr, output_image_size);

      // Push the output frame 
      this->push(output_frame);
    }
    else
    {
      LOG4CXX_DEBUG(logger_, "Using existing frame " << hdr_ptr->frame_number << " for output");

      // Set metadata on existing frame
      frame->set_meta_data(frame_meta);

      // Set iamge offset on exisitng frame
      frame->set_image_offset(sizeof(DummyUDP::FrameHeader));

      // Set output image size on existing frame
      frame->set_image_size(output_image_size);

      // Push the existing frame
      this->push(frame);
    }
  }

  /**
   * Process and report lost UDP packets for the frame
   *
   * \param[in] frame - Pointer to a Frame object.
   */
  void DummyUDPProcessPlugin::process_lost_packets(boost::shared_ptr<Frame>& frame)
  {
    const DummyUDP::FrameHeader* hdr_ptr = 
      static_cast<const DummyUDP::FrameHeader*>(frame->get_data_ptr());

    // Process lost packets if frame header reports any missing
    int hdr_packets_lost = (hdr_ptr->total_packets_expected - hdr_ptr->total_packets_received);
    if (hdr_packets_lost > 0)
    {

      LOG4CXX_DEBUG(logger_,  "Processing " << hdr_packets_lost 
        << " lost packets for frame " << hdr_ptr->frame_number);

      int packets_lost = 0;
      char* payload_ptr = static_cast<char*>(frame->get_data_ptr()) + sizeof(DummyUDP::FrameHeader);

      // Loop over all packets in frame
      for (int packet_idx = 0; packet_idx <  hdr_ptr->total_packets_expected; packet_idx++)
      {
        // If header reports packet missing, zero out the packet
        if (hdr_ptr->packet_state[packet_idx] == 0)
        {
          LOG4CXX_DEBUG(logger_, "Missing packet number " << packet_idx);
          char* packet_ptr = payload_ptr + (hdr_ptr->packet_size * packet_idx);
          memset(packet_ptr, 0, hdr_ptr->packet_size);
          packets_lost++;
        }
      }
      // Check if there's a mismatch between packets reported lost by the header and found
      // by scanning the packet state information.
      if (packets_lost != hdr_packets_lost)
      {
        LOG4CXX_WARN(logger_, "Packet loss mismatch between frame header ("
          << hdr_packets_lost << ") and packet state (" << packets_lost << ")");
      }
      packets_lost_ += packets_lost;
    }
    else
    {
      LOG4CXX_DEBUG(logger_, "No lost packets to process on frame " << hdr_ptr->frame_number);
    }
  }

  
} /* namespace FrameProcessor */