odin-detector/odin-data

View on GitHub
cpp/common/src/IpcReactor.cpp

Summary

Maintainability
Test Coverage
/*!
 * IpcReactor.cpp - implementation of the IpcReactor and associated classes
 *
 *  Created on: Feb 16, 2015
 *      Author: Tim Nicholls, STFC Application Engineering Group
 */

#include "IpcReactor.h"
#include "gettime.h"

using namespace OdinData;

//! Constructor - instantiates an IpcReactorTimer object
//!
//! This instantiates and IpcReactorTimer object for use in an IpcReactor. The timer is
//! not responsible for firing itself; rather it is a tracking object with the real work
//! being performed by the IpcReactor event loop. The timer runs periodically, either
//! forever or for a fixed number of times.
//!
//! \param delay_ms Timer delay in milliseconds for
//! \param times    Number of times the timer should fire, a value of 0 indicates forever
//! \param callback Callback function signature to be called when the timer fires

IpcReactorTimer::IpcReactorTimer(size_t delay_ms, size_t times, TimerCallback callback) :
    timer_id_(last_timer_id_++),
    delay_ms_(delay_ms),
    times_(times),
    callback_(callback),
    when_(clock_mono_ms() + delay_ms),
    expired_(false)
{
}

//! Destructor - destroys an IpcReactorTimer object
IpcReactorTimer::~IpcReactorTimer()
{
}

//! Returns the unique ID of the timer instance
//!
//! This method returns the unique ID of the timer, which can be used to track and remove
//! the timer from the reactor as necessary
//!
//! \return int unique timer id

int IpcReactorTimer::get_id(void)
{
  return timer_id_;
}

//! Executes the registered callback method of the timer
//!
//! This method executes the registered callback method of the timer and then
//! evaluates if the timer should fire again. If so, the time of the next firing
//! is updated according to the specified delay

void IpcReactorTimer::do_callback(void)
{
  // Call the registered callback function
  this->callback_();

  // If the timer has a finite numnber of times to run, decrement and set
  // the expired flag if necessary, otherwise reset the when variable to
  // indicate the next time it should fire
  if (times_ > 0 && --times_ == 0)
  {
    expired_ = true;
  }
  else
  {
    when_ += delay_ms_;
  }
}

//! Indicates if the timer has fired (e.g. is due for handling)
//!
//! This method indicates if the timer has fired, i.e. is due for handling by the
//! controlling object.
//!
//! \return boolean value, true if timer has fired

bool IpcReactorTimer::has_fired(void)
{
  return (clock_mono_ms() >= when_);
}

//! Indicates if the timer has expired, i.e. reached its maximum times fired
//!
//! This method indicates if the timer has expired, i.e. reached its maximum
//! number of times fired and is no longer active. It is up to the controlling
//! object to delete the timer as appropriate.
//!
//! \return boolean value, true if the timer has fired
bool IpcReactorTimer::has_expired(void)
{
  return expired_;
}

//! Indicates when (in absolute monotonic time) the timer is due to fire
//!
//! this method indicates when a timer is next due to fire, in absolute
//! monotonic time in milliseconds
//!
//! \return TimeMs value indicating when the timer is next due to fire

TimeMs IpcReactorTimer::when(void)
{
  return when_;
}

//! Returns the current monotonic clock time in milliseconds (static method)
//!
//! This static method returns the current monotonic system clock time to
//! millisecond precision. The monotonic clock is used rather than the local
//! real time, since the latter can be reset by clock operations on the host
//! system, which would disrupt the periodic operation of any timers
//!
//! \return TimeMs value of the current monotonic time in milliseconds
TimeMs IpcReactorTimer::clock_mono_ms(void)
{
  struct timespec ts;
  gettime(&ts, true);

  return (TimeMs)((TimeMs) ts.tv_sec * 1000 + (TimeMs) ts.tv_nsec / 1000000);
}

// Initialise static class variable holding last unique timer ID assigned
int IpcReactorTimer::last_timer_id_ = 0;

//! Constructor
//!
//! This constructs an IpcReactor object ready for use

IpcReactor::IpcReactor() :
    terminate_reactor_(false),
    pollitems_(0),
    callbacks_(0),
    pollsize_(0),
    needs_rebuild_(true)
{
}

//! Destructor
//!
//! This destroys an IpcReactor object

IpcReactor::~IpcReactor()
{
  delete[] pollitems_;
  delete[] callbacks_;
}

//! Adds an IPC channel and associated callback to the reactor
//!
//! This method adds an IPC channel and associated callback method with the appropriate
//! signature to the reactor.
//!
//! \param channel IpcChannel to add to reactor
//! \param callback function reference to callback method

void IpcReactor::register_channel(IpcChannel& channel, ReactorCallback callback)
{
  // Add channel to channel map
  channels_[&(channel.socket_)] = callback;

  // Signal a rebuild is required
  needs_rebuild_ = true;
}

//! Removes an IPC chanel from the reactor
//!
//! This method removes an IPC channel and its associated callback method from the
//! reactor.
//!
//! \param channel IpcChannel to remove from the reactor

void IpcReactor::remove_channel(IpcChannel& channel)
{
  // Erase the channel from the map
  channels_.erase(&(channel.socket_));

  // Signal a rebuild is required
  needs_rebuild_ = true;
}

void IpcReactor::register_socket(int socket_fd, ReactorCallback callback)
{
  sockets_[socket_fd] = callback;

  needs_rebuild_ = true;
}

void IpcReactor::remove_socket(int socket_fd)
{
  sockets_.erase(socket_fd);

  needs_rebuild_ = true;
}

//! Adds a timer to the reactor
//!
//! This method adds a timer to the reactor. The periodic delay of the timer, the
//! number of times it should fire and the callback method to be called on firing
//! are specified.
//!
//! \param delay_ms periodic timer delay in milliseconds
//! \param times number of times the timer should fire (0=indefinite)
//! \param callback function reference to callback method
//! \return integer unique timer ID, which can be used by the caller to delete it subsequently

int IpcReactor::register_timer(size_t delay_ms, size_t times, TimerCallback callback)
{
  // Take lock while modifying timers_
  boost::lock_guard<boost::mutex> lock(mutex_);

  // Create a smart pointer to a new timer object
  boost::shared_ptr<IpcReactorTimer> timer(new IpcReactorTimer(delay_ms, times, callback));

  // Add the timer to the timer map
  timers_[timer->get_id()] = timer;

  // Return the unique ID
  return timer->get_id();
}

//! Removes a timer from the reactor
//!
//! This method removes a timer from the reactor, based on the timer ID
//
//! \param timer_id integer unique timer ID that was returned by the add_timer() method
void IpcReactor::remove_timer(int timer_id)
{
  // Take lock while modifying timers_
  boost::lock_guard<boost::mutex> lock(mutex_);
  timers_.erase(timer_id);
}

//! Runs the reactor polling loop
//!
//! This method runs the reactor polling loop, handling any callbacks to
//! registered channels and timers. The loop runs indefinitely until an
//! error occurs or the stop() method is called. The loop uses a tickless
//! timeout based on the currently registered timers.
//!
//! \return integer return code, 0 = OK, -1 = error

int IpcReactor::run(void)
{
  int rc = 0;
  boost::unique_lock<boost::mutex> lock(mutex_, boost::defer_lock);

  // Loop until the terminate flag is set
  while (!terminate_reactor_)
  {
    // If the poll items list needs rebuilding, do it now
    if (needs_rebuild_)
    {
      rebuild_pollitems();
    }

    // If there are no channels to poll and no timers currently active, break out of the
    // reactor loop cleanly
    if ((pollsize_ == 0) && (timers_.size() == 0))
    {
      rc = 0;
      break;
    }

    try
    {

      // Poll the registered channels, using the tickless timeout based
      // on the next pending timer
      int pollrc = zmq::poll(pollitems_, pollsize_, calculate_timeout());

      if (pollrc > 0)
      {
        // If there were any channels ready to read, execute their callbacks
        for (size_t item = 0; item < pollsize_; ++item)
        {
          // TODO handle error flag on pollitems
          if (pollitems_[item].revents & ZMQ_POLLIN)
          {
            callbacks_[item]();
          }
        }
      }
      else if (pollrc == 0)
      {
        // Poll timed out, do nothing as we handle timers firing unconditionally below
      }
      else
      {
        // An error occurred, terminate the reactor loop
        rc = -1;
        terminate_reactor_ = true;
      }

      // Take lock while accessing timers_
      lock.lock();
      // Handle any timers that have now fired, calling their callbacks. Erase any timers
      // that have now expired
      TimerMap::iterator it = timers_.begin();
      while (it != timers_.end())
      {
        if ((it->second)->has_fired())
        {
          (it->second)->do_callback();
        }
        if ((it->second)->has_expired())
        {
          timers_.erase(it++);
        }
        else
        {
          ++it;
        }
      }
      lock.unlock();
    }
    catch ( zmq::error_t& e)
    {
      // If the exception was thrown with errno EINTR, i.e. interrupted system call, this is
      // because we have installed a custom signal handler, so terminate the reactor gracefully
      if (e.num() == EINTR) {
        rc = -1;
        terminate_reactor_ = true;
      }
        // Otherwise propogate the exception upwards
      else {
        std::stringstream ss;
        ss << "IpcReactor error while polling: " << e.what();
        throw IpcReactorException(ss.str());
      }
    }
  }

  return rc;
}

//! Signals that the reactor polling loop should stop gracefully
//!
//! This method is used to signal that the reactor polling loop should stop gracefully.

void IpcReactor::stop(void)
{
  terminate_reactor_ = true;
}

//! Rebuilds the internal list of polling item
//!
//! This private method rebuilds the internal list of items to poll in the reactor
//! loop. This is done when the rebuild_flag is set, i.e. after adding or
//! deleting channels.

void IpcReactor::rebuild_pollitems(void)
{

  // If the existing pollitems array is valid, delete it
  if (pollitems_) {
    delete[] pollitems_;
    pollitems_ = 0;
  }

  // If the existing callback array is valid, delete it
  if (callbacks_) {
    delete[] callbacks_;
    callbacks_= 0;
  }

  // Set the number of items to poll to the number of channels registered
  pollsize_ = channels_.size() + sockets_.size();

  // If the numer of items to poll is non-zero, craete new pollitems and callback
  // arrays and populate them

  if (pollsize_) {

    pollitems_ = new zmq::pollitem_t[pollsize_];
    callbacks_ = new ReactorCallback[pollsize_];

    // Iterate over the channel map and build the pollitems and callback arrays. These have
    // a one-to-one correspondence, allowing the reactor loop to easy associate an active
    // channel socket with the appropriate callback
    unsigned int item = 0;
    for (ChannelMap::iterator it = channels_.begin(); it != channels_.end(); ++item, ++it)
    {
      zmq::pollitem_t pollitem = {*(it->first), 0, ZMQ_POLLIN, 0};
      pollitems_[item] = pollitem;
      callbacks_[item] = it->second;
    }

    for (SocketMap::iterator it = sockets_.begin(); it != sockets_.end(); ++item, ++it)
    {
      zmq::pollitem_t pollitem = {0, it->first, ZMQ_POLLIN, 0};
      pollitems_[item] = pollitem;
      callbacks_[item] = it->second;
    }
  }
}

//! Calculates the next poll timeout based on the tickless pattern
//!
//! This private method calculates the next reactor poll timeout based on the
//! 'tickless' idiom in the CZMQ zloop implementation. The timeout is set
//! to match the next timer due to fire.
//!
//! \return long timeout value in milliseconds

long IpcReactor::calculate_timeout(void)
{
  // Take lock while accessing timers_
  boost::lock_guard<boost::mutex> lock(mutex_);

  // Calculate shortest timeout up to one hour (!!), looping through
  // current timers to see which fires first
  TimeMs tickless = IpcReactorTimer::clock_mono_ms() + (1000 * 3600);
  for (TimerMap::iterator it = timers_.begin(); it != timers_.end(); ++it)
  {
    if (tickless > (it->second)->when())
    {
      tickless = (it->second)->when();
    }
  }

  // Calculate current timeout based on that, set to zero (don't wait) if
  // there is no timers pending
  long timeout = (long)(tickless - IpcReactorTimer::clock_mono_ms());
  if (timeout < 0)
  {
    timeout = 0;
  }

  return timeout;
}