server/jt_server_message_handler.c
#define _POSIX_C_SOURCE 200809L
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <stdbool.h>
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <inttypes.h>
#include <string.h>
#include <stdio.h>
#include <math.h>
#include <assert.h>
#include <syslog.h>
#include <jansson.h>
#include "jittertrap.h"
#include "jt_server_message_handler.h"
#include "iface_stats.h"
#include "sampling_thread.h"
#include "compute_thread.h"
#include "tt_thread.h"
#include "netem.h"
#include "mq_msg_stats.h"
#include "mq_msg_ws.h"
#include "mq_msg_tt.h"
#include "jt_message_types.h"
#include "jt_messages.h"
#define QUOTE(str) #str
#define EXPAND_AND_QUOTE(str) QUOTE(str)
enum {
JT_STATE_STOPPING,
JT_STATE_STOPPED,
JT_STATE_STARTING,
JT_STATE_RUNNING,
JT_STATE_PAUSED
};
int g_jt_state = JT_STATE_STARTING;
char g_selected_iface[MAX_IFACE_LEN];
unsigned long stats_consumer_id;
unsigned long tt_consumer_id;
static int set_netem(void *data)
{
struct jt_msg_netem_params *p1 = data;
struct netem_params p2 = {
.delay = p1->delay, .jitter = p1->jitter, .loss = p1->loss,
};
netem_set_params(p1->iface, &p2);
jt_srv_send_netem_params();
return 0;
}
static int select_iface(void *data)
{
char(*iface)[MAX_IFACE_LEN] = data;
if (!is_iface_allowed(*iface)) {
syslog(LOG_WARNING,
"ignoring request to switch to iface: [%s] - "
"iface not in allowed list: [%s]\n",
*iface, EXPAND_AND_QUOTE(ALLOWED_IFACES));
return -1;
}
snprintf(g_selected_iface, MAX_IFACE_LEN, "%s", *iface);
syslog(LOG_INFO, "switching to iface: [%s]\n", *iface);
sample_iface(*iface);
tt_thread_restart(*iface);
jt_srv_send_select_iface();
jt_srv_send_netem_params();
jt_srv_send_sample_period();
return 0;
}
static void get_first_iface(char *iface)
{
char **ifaces = netem_list_ifaces();
char **i = ifaces;
assert(NULL != i);
if (NULL == *i) {
syslog(LOG_WARNING, "No interfaces available. "
"Allowed interfaces (compile-time): %s\n",
EXPAND_AND_QUOTE(ALLOWED_IFACES));
}
snprintf(iface, MAX_IFACE_LEN, "%s", *i);
while (*i) {
free(*i);
i++;
}
free(ifaces);
}
/* FIXME: this is fugly. */
static void mq_stats_msg_to_jt_msg_stats(struct mq_stats_msg *mq_s,
struct jt_msg_stats *msg_s)
{
snprintf(msg_s->iface, MAX_IFACE_LEN, "%s", mq_s->iface);
msg_s->mean_rx_bytes = mq_s->mean_rx_bytes;
msg_s->mean_tx_bytes = mq_s->mean_tx_bytes;
msg_s->mean_rx_packets = mq_s->mean_rx_packets;
msg_s->mean_tx_packets = mq_s->mean_tx_packets;
msg_s->mean_whoosh = mq_s->mean_whoosh;
msg_s->max_whoosh = mq_s->max_whoosh;
msg_s->sd_whoosh = mq_s->sd_whoosh;
msg_s->min_rx_packet_gap = mq_s->min_rx_packet_gap;
msg_s->max_rx_packet_gap = mq_s->max_rx_packet_gap;
msg_s->mean_rx_packet_gap = mq_s->mean_rx_packet_gap;
msg_s->min_tx_packet_gap = mq_s->min_tx_packet_gap;
msg_s->max_tx_packet_gap = mq_s->max_tx_packet_gap;
msg_s->mean_tx_packet_gap = mq_s->mean_tx_packet_gap;
msg_s->interval_ns = mq_s->interval_ns;
}
inline static int message_producer(struct mq_ws_msg *m, void *data)
{
char *s = (char *)data;
snprintf(m->m, MAX_JSON_MSG_LEN, "%s", s);
return 0;
}
int jt_srv_send(int msg_type, void *msg_data)
{
char *tmpstr;
int cb_err, err = 0;
/* convert from jt_msg_* to string */
err = jt_messages[msg_type].to_json_string(msg_data, &tmpstr);
if (err) {
return -1;
}
/* write the json string to a websocket message */
err = mq_ws_produce(message_producer, tmpstr, &cb_err);
free(tmpstr);
return err;
}
int jt_srv_send_netem_params(void)
{
struct netem_params p;
struct jt_msg_netem_params *m =
malloc(sizeof(struct jt_msg_netem_params));
assert(m);
memcpy(p.iface, g_selected_iface, MAX_IFACE_LEN);
if (0 != netem_get_params(p.iface, &p)) {
/* There need not be a netem qdisc on the interface */
p.delay = -1;
p.jitter = -1;
p.loss = -1;
}
m->delay = p.delay;
m->jitter = p.jitter;
m->loss = p.loss;
snprintf(m->iface, MAX_IFACE_LEN, "%s", p.iface);
int err = jt_srv_send(JT_MSG_NETEM_PARAMS_V1, m);
free(m);
return err;
}
int jt_srv_send_select_iface(void)
{
char iface[MAX_IFACE_LEN];
memcpy(&iface, g_selected_iface, MAX_IFACE_LEN);
return jt_srv_send(JT_MSG_SELECT_IFACE_V1, &iface);
}
int jt_srv_send_iface_list(void)
{
struct jt_iface_list *il;
char **iface;
int idx;
char **ifaces = netem_list_ifaces();
il = malloc(sizeof(struct jt_iface_list));
assert(il);
il->count = 0;
iface = ifaces;
assert(NULL != iface);
if (NULL == *iface) {
fprintf(stderr, "No interfaces available. "
"Allowed interfaces (compile-time): %s\n",
EXPAND_AND_QUOTE(ALLOWED_IFACES));
free(ifaces);
free(il);
return -1;
} else {
char buff[128] = {0};
char *offset = buff;
int blen = sizeof(buff);
do {
snprintf(offset, blen, "%s ", *iface);
blen -= strlen(offset);
offset += strlen(offset);
(il->count)++;
iface++;
} while (*iface);
syslog(LOG_INFO, "available ifaces: %s", buff);
}
il->ifaces = malloc(il->count * MAX_IFACE_LEN);
assert(il->ifaces);
for (iface = ifaces, idx = 0; NULL != *iface && idx < il->count;
idx++) {
strncpy(il->ifaces[idx], *iface, MAX_IFACE_LEN);
free(*iface);
iface++;
}
free(ifaces);
int err = jt_srv_send(JT_MSG_IFACE_LIST_V1, il);
jt_messages[JT_MSG_IFACE_LIST_V1].free(il);
return err;
}
int jt_srv_send_sample_period(void)
{
int sp;
sp = get_sample_period();
return jt_srv_send(JT_MSG_SAMPLE_PERIOD_V1, &sp);
}
static int stats_consumer(struct mq_stats_msg *m, void *data)
{
struct jt_msg_stats *s = (struct jt_msg_stats *)data;
mq_stats_msg_to_jt_msg_stats(m, s);
if (0 == jt_srv_send(JT_MSG_STATS_V1, s)) {
return 0;
}
return 1;
}
int jt_srv_send_stats(void)
{
struct jt_msg_stats *msg_stats;
int err, cb_err;
do {
/* convert from struct iface_stats to struct jt_msg_stats */
msg_stats = malloc(sizeof(struct jt_msg_stats));
assert(msg_stats);
err = mq_stats_consume(stats_consumer_id, stats_consumer,
msg_stats, &cb_err);
/* cleanup */
jt_messages[JT_MSG_STATS_V1].free(msg_stats);
} while (JT_WS_MQ_OK == err);
return 0;
}
static int tt_consumer(struct mq_tt_msg *m, void *data)
{
(void)data;
//jt_messages[JT_MSG_TOPTALK_V1].print(m);
if (0 == jt_srv_send(JT_MSG_TOPTALK_V1, (struct jt_msg_toptalk *)m)) {
return 0;
}
return 1;
}
int jt_srv_send_tt(void)
{
int ret, cb_err;
do {
ret = mq_tt_consume(tt_consumer_id, tt_consumer, NULL, &cb_err);
} while (JT_WS_MQ_OK == ret);
return 0;
}
static int jt_init(void)
{
int err;
char *iface;
if (netem_init() < 0) {
fprintf(stderr,
"Couldn't initialise netlink for netem module.\n");
return -1;
}
err = mq_ws_init("ws");
if (err) {
return -1;
}
iface = malloc(MAX_IFACE_LEN);
get_first_iface(iface);
select_iface(iface);
free(iface);
mq_tt_init("tt");
mq_stats_init("stats");
compute_thread_init();
intervals_thread_init();
err = mq_stats_consumer_subscribe(&stats_consumer_id);
assert(!err);
err = mq_tt_consumer_subscribe(&tt_consumer_id);
assert(!err);
g_jt_state = JT_STATE_RUNNING;
return 0;
}
int jt_srv_pause(void)
{
int err;
assert(JT_STATE_RUNNING == g_jt_state);
err = mq_stats_consumer_unsubscribe(stats_consumer_id);
assert(!err);
err = mq_tt_consumer_unsubscribe(tt_consumer_id);
assert(!err);
g_jt_state = JT_STATE_PAUSED;
return 0;
}
int jt_srv_resume(void)
{
int err;
if (JT_STATE_PAUSED == g_jt_state) {
err = mq_stats_consumer_subscribe(&stats_consumer_id);
assert(!err);
err = mq_tt_consumer_subscribe(&tt_consumer_id);
assert(!err);
g_jt_state = JT_STATE_RUNNING;
}
assert(JT_STATE_RUNNING == g_jt_state);
return 0;
}
int jt_server_tick(void)
{
switch (g_jt_state) {
case JT_STATE_STARTING:
jt_init();
jt_srv_send_iface_list();
jt_srv_send_select_iface();
jt_srv_send_netem_params();
jt_srv_send_sample_period();
break;
case JT_STATE_RUNNING:
/* queue a stats msg (if there is one) */
jt_srv_send_stats();
jt_srv_send_tt();
break;
case JT_STATE_PAUSED:
break;
}
return 0;
}
static int jt_msg_handler(char *in_unsafe, int len, const int *msg_type_arr)
{
json_t *root;
json_error_t error;
void *data;
const int *msg_type;
char in_safe[1024];
if (len <= 0) {
syslog(LOG_ERR, "error: message cannot have negative length");
return -1;
}
/* in_unsafe is not null-terminated and len doesn't include \0 */
if ((long unsigned int)len >= sizeof(in_safe)) {
snprintf(in_safe, sizeof(in_safe), "%s", in_unsafe);
syslog(LOG_DEBUG, "invalid message truncated and ignored: %s\n",
in_safe);
return -1;
}
snprintf(in_safe, len+1, "%s", in_unsafe);
root = json_loadb(in_safe, len, 0, &error);
if (!root) {
syslog(LOG_ERR, "error: %s loading message:%s\n",
error.text, in_safe);
return -1;
}
// iterate over array of msg types using pointer arithmetic.
for (msg_type = msg_type_arr; *msg_type != JT_MSG_END; msg_type++) {
char printable[1024];
int err;
// check if the message type matches.
err = jt_msg_match_type(root, *msg_type);
if (err) {
// type doesn't match, try the next.
continue;
}
// type matches, try to unpack it.
err = jt_messages[*msg_type].to_struct(root, &data);
json_decref(root);
if (err) {
// type matched, but unpack failed.
syslog(LOG_ERR, "[%s] type match, unpack failed.\n",
jt_messages[*msg_type].key);
break;
}
jt_messages[*msg_type].print(data, printable, sizeof(printable));
syslog(LOG_DEBUG, "MSG received: %s", printable);
switch (*msg_type) {
case JT_MSG_SELECT_IFACE_V1:
err = select_iface(data);
break;
case JT_MSG_SET_NETEM_V1:
err = set_netem(data);
break;
case JT_MSG_HELLO_V1:
syslog(LOG_INFO, "new session");
break;
default:
/* no way to get here, right? */
assert(0);
}
jt_messages[*msg_type].free(data);
return err;
}
syslog(LOG_WARNING, "message received and ignored: %s\n", in_safe);
json_decref(root);
return -1;
}
/* handle messages received from client in server */
int jt_server_msg_receive(char *in, int len)
{
return jt_msg_handler(in, len, &jt_msg_types_c2s[0]);
}