cpp/frameProcessor/src/KafkaProducerPlugin.cpp
/*
* KafkaProducerPlugin.cpp
*
* Created on: 25 Mar 2019
* Author: Emilio Perez
*/
#include <cstring>
#include <cstdlib>
#include <boost/algorithm/string.hpp>
#include <climits>
#include "rapidjson/writer.h"
#include "rapidjson/stringbuffer.h"
#include "KafkaProducerPlugin.h"
#include "version.h"
namespace FrameProcessor {
/**
* Callback function used to report status of a delivered message.
*
* This function is only used internally.
*
* \param[in] kafka_producer- Pointer to a Kafka producer handler.
* \param[in] kafka_message - Pointer to the message structure of the message being reported
* \param[in] opaque - Opaque pointer (not used)
*/
static void kafka_message_callback(rd_kafka_t *kafka_producer,
const rd_kafka_message_t *kafka_message,
void *opaque)
{
KafkaProducerPlugin *kafka_producer_plugin = static_cast<KafkaProducerPlugin *>(kafka_message->_private);
if (kafka_message->err) {
kafka_producer_plugin->on_message_error(
rd_kafka_err2str(kafka_message->err));
} else {
// count message as acknowledged
kafka_producer_plugin->on_message_ack();
}
}
const std::string KafkaProducerPlugin::CONFIG_SERVERS = "servers";
const std::string KafkaProducerPlugin::CONFIG_TOPIC = "topic";
const std::string KafkaProducerPlugin::CONFIG_PARTITION = "partition";
const std::string KafkaProducerPlugin::CONFIG_DATASET = "dataset";
const std::string KafkaProducerPlugin::CONFIG_INCLUDE_PARAMETERS = "include_parameters";
/**
* The constructor sets up logging used within the class.
*/
KafkaProducerPlugin::KafkaProducerPlugin()
: dataset_name_(KAFKA_DEFAULT_DATASET), topic_name_(KAFKA_DEFAULT_TOPIC),
kafka_producer_(NULL), kafka_topic_(NULL),
partition_(RD_KAFKA_PARTITION_UA),
include_parameters_(true)
{
// Setup logging for the class
logger_ = Logger::getLogger("FP.KafkaProducer");
LOG4CXX_TRACE(logger_, "KafkaProducer constructor.");
this->reset_statistics();
}
/**
* The destructor cleans up Kafka handlers.
*/
KafkaProducerPlugin::~KafkaProducerPlugin()
{
LOG4CXX_TRACE(logger_, "KafkaProducer destructor.");
destroy_kafka();
}
/**
* Set configuration options for this Plugin.
*
* This sets up the Kafka Producer Plugin according to the configuration IpcMessage
* objects that are received.
*
* \param[in] config - IpcMessage containing configuration data.
* \param[out] reply - Response IpcMessage.
*/
void KafkaProducerPlugin::configure(OdinData::IpcMessage &config,
OdinData::IpcMessage &reply)
{
boost::lock_guard<boost::recursive_mutex> lock(mutex_);
if (config.has_param(CONFIG_SERVERS)) {
destroy_kafka();
configure_kafka_servers(config.get_param<std::string>(CONFIG_SERVERS));
configure_kafka_topic(this->topic_name_);
}
if (config.has_param(CONFIG_TOPIC)) {
configure_kafka_topic(config.get_param<std::string>(CONFIG_TOPIC));
}
if (config.has_param(CONFIG_PARTITION)) {
configure_partition(config.get_param<uint32_t>(CONFIG_PARTITION));
}
if (config.has_param(CONFIG_DATASET)) {
configure_dataset(config.get_param<std::string>(CONFIG_DATASET));
}
if (config.has_param(CONFIG_INCLUDE_PARAMETERS)) {
this->include_parameters_ = config.get_param<bool>(CONFIG_INCLUDE_PARAMETERS);
}
}
/**
* Get the configuration values for this Plugin.
*
* \param[out] reply - Response IpcMessage.
*/
void KafkaProducerPlugin::requestConfiguration(OdinData::IpcMessage &reply)
{
reply.set_param(get_name() + "/" + KafkaProducerPlugin::CONFIG_SERVERS,
this->servers_);
reply.set_param(get_name() + "/" + KafkaProducerPlugin::CONFIG_TOPIC,
this->topic_name_);
reply.set_param(get_name() + "/" + KafkaProducerPlugin::CONFIG_PARTITION,
this->partition_);
reply.set_param(get_name() + "/" + KafkaProducerPlugin::CONFIG_DATASET,
this->dataset_name_);
reply.set_param(get_name() + "/" + KafkaProducerPlugin::CONFIG_INCLUDE_PARAMETERS,
this->include_parameters_);
}
/**
* Collate status information for the plugin. The status is added to the status IpcMessage object.
*
* \param[out] status - Reference to an IpcMessage value to store the status.
*/
void KafkaProducerPlugin::status(OdinData::IpcMessage &status)
{
// Make sure statistics are updated
poll_delivery_message_report_queue();
/* Number of sent frames */
status.set_param(get_name() + "/" + "sent", frames_sent_);
/* Number of lost frames */
status.set_param(get_name() + "/" + "lost", frames_lost_);
/* Number of acknowledged frames */
status.set_param(get_name() + "/" + "ack", frames_ack_);
}
/**
* Clear frame statistics
*/
bool KafkaProducerPlugin::reset_statistics()
{
this->frames_sent_ = 0;
this->frames_lost_ = 0;
this->frames_ack_ = 0;
return true;
}
/**
* Destroy Kafka handlers for the connection and the topic
*/
void KafkaProducerPlugin::destroy_kafka()
{
if (kafka_topic_ != NULL) {
rd_kafka_flush(kafka_producer_, KAFKA_LINGER_MS);
rd_kafka_topic_destroy(kafka_topic_);
kafka_topic_ = NULL;
}
if (kafka_producer_ != NULL) {
rd_kafka_destroy(kafka_producer_);
kafka_producer_ = NULL;
}
}
/**
* Poll the delivery message report queue, calling the
* callback function if appropriate.
*/
void KafkaProducerPlugin::poll_delivery_message_report_queue()
{
if (kafka_producer_ != NULL) {
boost::lock_guard<boost::recursive_mutex> lock(mutex_);
rd_kafka_poll(kafka_producer_, 0);
}
}
/**
* Configure Kafka connection handler for the server/s specified.
*
* \param[in] servers - string representing Kafka brokers using format: IP:PORT[,IP2:PORT2,...]
*/
void KafkaProducerPlugin::configure_kafka_servers(std::string servers)
{
rd_kafka_t *kafka_producer;
rd_kafka_conf_t *kafka_config;
char errBuf[KAFKA_ERROR_BUFFER_LEN];
kafka_config = rd_kafka_conf_new();
int status;
status = rd_kafka_conf_set(kafka_config,
"message.max.bytes",
KAFKA_MESSAGE_MAX_BYTES,
errBuf,
sizeof(errBuf));
if (status != RD_KAFKA_CONF_OK) {
LOG4CXX_ERROR(logger_, "Kafka configuration error while setting max message size: "
<< errBuf);
return;
}
status = rd_kafka_conf_set(kafka_config,
"bootstrap.servers",
servers.c_str(),
errBuf,
sizeof(errBuf));
if (status != RD_KAFKA_CONF_OK) {
LOG4CXX_ERROR(logger_, "Kafka configuration error while setting botstrap servers"
<< errBuf);
return;
}
// Configure callback to count ACKed messages
rd_kafka_conf_set_dr_msg_cb(kafka_config, kafka_message_callback);
// kafkaProducer will free kafka_config when destroyed
kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, kafka_config, errBuf,
sizeof(errBuf));
if (!kafka_producer) {
LOG4CXX_ERROR(logger_, "Kafka handler error: " << errBuf);
return;
}
this->kafka_producer_ = kafka_producer;
this->servers_ = servers;
LOG4CXX_TRACE(logger_, "Configured kafka servers: " << servers);
}
/**
* Configure Kafka topic handler for the topic specified
*
* \param[in] topic_name - string representing the topic name.
*/
void KafkaProducerPlugin::configure_kafka_topic(std::string topic_name)
{
if (this->kafka_producer_ == NULL) {
LOG4CXX_WARN(logger_, "Broker is not configured");
this->kafka_topic_ = NULL;
return;
}
if (this->kafka_topic_ != NULL) {
rd_kafka_topic_destroy(kafka_topic_);
}
this->kafka_topic_ = rd_kafka_topic_new(kafka_producer_,
topic_name.c_str(),
NULL);
if (!this->kafka_topic_) {
LOG4CXX_ERROR(logger_, "Kafka topic error");
}
this->topic_name_ = topic_name;
LOG4CXX_TRACE(logger_, "Configured kafka topic: " << topic_name);
}
/**
* Configure the dataset that will be published
*/
void KafkaProducerPlugin::configure_dataset(std::string dataset)
{
this->dataset_name_ = dataset;
LOG4CXX_TRACE(logger_, "Configured dataset " << this->dataset_name_);
}
/**
* Set Kafka partition to send messages to
*
* If it is not configured, it defaults to automatic partitioning (using
* the topic's partitioner function)
*
* \param[in] partition - partition number.
*/
void KafkaProducerPlugin::configure_partition(int32_t partition)
{
this->partition_ = partition;
}
/**
* Create a message with the following structure:
*
* [ json header length (2 bytes) ] + [ json header ] + [ frame data ]
*
* \param[in] frame - Pointer to a Frame object.
* \param[out] nbytes - Reference to the message size in bytes.
*/
void *KafkaProducerPlugin::create_message(boost::shared_ptr<Frame> frame,
size_t &nbytes)
{
// creates header information
rapidjson::StringBuffer string_buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(string_buffer);
writer.StartObject();
writer.String(MSG_HEADER_FRAME_SIZE_KEY);
writer.Uint64(frame->get_data_size());
writer.String(MSG_HEADER_DATA_TYPE_KEY);
writer.Int(frame->get_meta_data().get_data_type());
writer.String(MSG_HEADER_FRAME_NUMBER_KEY);
writer.Uint64(frame->get_frame_number());
writer.String(MSG_HEADER_ACQUISITION_ID_KEY);
writer.String(frame->get_meta_data().get_acquisition_ID().c_str());
writer.String(MSG_HEADER_COMPRESSION_KEY);
writer.Uint(frame->get_meta_data().get_compression_type());
writer.String(MSG_HEADER_FRAME_OFFSET_KEY);
writer.Uint64(frame->get_meta_data().get_frame_offset());
writer.String(MSG_HEADER_FRAME_DIMENSIONS_KEY);
writer.StartArray();
dimensions_t dims = frame->get_meta_data().get_dimensions();
for (dimensions_t::iterator it = dims.begin(); it != dims.end(); it++) {
writer.Uint64(*it);
}
writer.EndArray();
if (this->include_parameters_) {
const std::map<std::string, boost::any> ¶meters = frame->get_meta_data().get_parameters();
writer.String(MSG_HEADER_FRAME_PARAMETERS_KEY);
writer.StartObject();
for (std::map<std::string, boost::any>::const_iterator it = parameters.begin(); it != parameters.end(); it++) {
writer.String(it->first.c_str());
const std::type_info &ti = it->second.type();
if (it->second.type() == typeid(unsigned long)) {
writer.Uint64(frame->get_meta_data().get_parameter<unsigned long>(it->first));
}
else if (it->second.type() == typeid(float)) {
writer.Double(frame->get_meta_data().get_parameter<float>(it->first));
} else {
writer.Null();
}
}
writer.EndObject();
}
writer.EndObject();
if (string_buffer.GetSize() > USHRT_MAX) {
LOG4CXX_ERROR(logger_, "Header size is too big, it should be less than "
<< USHRT_MAX);
nbytes = 0;
return NULL;
}
size_t message_size = sizeof(uint16_t) + string_buffer.GetSize() + 1
+ frame->get_data_size();
char *msg = (char *) malloc(message_size);
uint16_t header_size = static_cast<uint16_t>(string_buffer.GetSize() + 1);
*(reinterpret_cast<uint16_t *>(msg)) = header_size;
// copy header data, this includes an ending null byte
memcpy(msg + sizeof(uint16_t), string_buffer.GetString(),
header_size);
// copy frame data
memcpy(msg + sizeof(uint16_t) + header_size, frame->get_data_ptr(),
frame->get_data_size());
nbytes = message_size;
return msg;
}
/**
* Create and enqueue a frame message to kafka server/s
*
* \param[in] frame - Pointer to a Frame object.
*/
void KafkaProducerPlugin::enqueue_frame(boost::shared_ptr<Frame> frame)
{
LOG4CXX_TRACE(logger_, "Sending frame to message queue ...");
if (!this->kafka_topic_) {
LOG4CXX_WARN(logger_, "Topic not configured");
return;
}
// This lock avoids configuring/destroying/enqueuing at the same time
boost::lock_guard<boost::recursive_mutex> lock(mutex_);
char *buf;
size_t len;
// This buffer is freed by kafka (when there are no errors)
buf = (char *) create_message(frame, len);
if (!buf) {
return;
}
// enqueue message
int status = rd_kafka_produce(
this->kafka_topic_,
partition_,
/* free buffer when enqueued */
RD_KAFKA_MSG_F_FREE,
/* Data */
buf, len,
/* No key */
NULL, 0,
/* Opaque pointer */
this);
if (status) {
// Dropping frame, probably the queue is full
LOG4CXX_ERROR(logger_, "Error while producing: "
<< rd_kafka_err2str(rd_kafka_last_error()));
free(buf);
frames_lost_++;
} else {
frames_sent_++;
}
rd_kafka_poll(this->kafka_producer_, 0);
}
/**
* It updates stats when a message is acknowledged
*/
void KafkaProducerPlugin::on_message_ack()
{
this->frames_ack_++;
}
/**
* It logs an error when a message delivery has failed
*/
void KafkaProducerPlugin::on_message_error(const char *error)
{
LOG4CXX_ERROR(logger_, "Error while delivering message: " << error);
}
/**
* If dataset configured matches, it sends the frame to kafka server/s
*
* \param[in] frame - Pointer to a Frame object.
*/
void KafkaProducerPlugin::process_frame(boost::shared_ptr<Frame> frame)
{
LOG4CXX_TRACE(logger_, "Received a new frame...");
if (frame->get_meta_data().get_dataset_name() == this->dataset_name_) {
this->enqueue_frame(frame);
}
this->push(frame);
}
int KafkaProducerPlugin::get_version_major()
{
return ODIN_DATA_VERSION_MAJOR;
}
int KafkaProducerPlugin::get_version_minor()
{
return ODIN_DATA_VERSION_MINOR;
}
int KafkaProducerPlugin::get_version_patch()
{
return ODIN_DATA_VERSION_PATCH;
}
std::string KafkaProducerPlugin::get_version_short()
{
return ODIN_DATA_VERSION_STR_SHORT;
}
std::string KafkaProducerPlugin::get_version_long()
{
return ODIN_DATA_VERSION_STR;
}
} /* namespace FrameProcessor */