messages/src/jt_msg_toptalk.c
#define _GNU_SOURCE
#include <net/ethernet.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <stdint.h>
#include <string.h>
#include <assert.h>
#include <jansson.h>
#include <inttypes.h>
#include "jt_message_types.h"
#include "jt_messages.h"
#include "jt_msg_toptalk.h"
static const char *tt_test_msg =
"{\"msg\":\"toptalk\","
" \"p\":{\"tflows\":5, \"tbytes\": 9999, \"tpackets\": 888,"
" \"interval_ns\": 123,"
" \"timestamp\": {\"tv_sec\": 123, \"tv_nsec\": 456},"
" \"flows\": ["
"{\"src\":\"192.168.0.1\", \"dst\": \"192.168.0.2\", \"sport\":32000, \"dport\":32000, \"proto\": \"udp\", \"bytes\":100, \"packets\":10, \"tclass\":\"af11\" },"
"{\"src\":\"192.168.0.1\", \"dst\": \"192.168.0.2\", \"sport\":32001, \"dport\":32001, \"proto\": \"udp\", \"bytes\":100, \"packets\":10, \"tclass\":\"BE\"},"
"{\"src\":\"192.168.0.1\", \"dst\": \"192.168.0.2\", \"sport\":32002, \"dport\":32002, \"proto\": \"udp\", \"tclass\":\"EF\", \"bytes\":100, \"packets\":10},"
"{\"src\":\"192.168.0.1\", \"dst\": \"192.168.0.2\", \"sport\":32003, \"dport\":32003, \"tclass\":\"cs1\", \"proto\": \"udp\", \"bytes\":100, \"packets\":10},"
"{\"src\":\"192.168.0.1\", \"dst\": \"192.168.0.2\", \"sport\":32004, \"dport\":32004, \"proto\": \"udp\", \"bytes\":100, \"packets\":10, \"tclass\":\"AF41\"}"
"]}}";
const char* jt_toptalk_test_msg_get(void) { return tt_test_msg; }
int jt_toptalk_printer(void *data, char *out, int len)
{
struct jt_msg_toptalk *t = (struct jt_msg_toptalk*)data;
snprintf(out, len,
"t:%ld.%09ld fc:%"PRId32", b: %"PRId64", p:%"PRId64"",
t->timestamp.tv_sec, t->timestamp.tv_nsec, t->tflows, t->tbytes,
t->tpackets);
return 0;
}
int jt_toptalk_unpacker(json_t *root, void **data)
{
json_t *params;
json_t *t, *flows, *timestamp;
struct jt_msg_toptalk *tt;
params = json_object_get(root, "p");
assert(params);
assert(JSON_OBJECT == json_typeof(params));
assert(0 < json_object_size(params));
tt = malloc(sizeof(struct jt_msg_toptalk));
t = json_object_get(params, "tflows");
if (!json_is_integer(t)) {
goto unpack_fail;
}
tt->tflows = json_integer_value(t);
t = json_object_get(params, "tbytes");
if (!json_is_integer(t)) {
goto unpack_fail;
}
tt->tbytes = json_integer_value(t);
t = json_object_get(params, "tpackets");
if (!json_is_integer(t)) {
goto unpack_fail;
}
tt->tpackets = json_integer_value(t);
t = json_object_get(params, "interval_ns");
if (!json_is_integer(t)) {
goto unpack_fail;
}
tt->interval_ns = json_integer_value(t);
timestamp = json_object_get(params, "timestamp");
if ((JSON_OBJECT != json_typeof(timestamp))
|| (0 == json_object_size(timestamp)))
{
goto unpack_fail;
}
t = json_object_get(timestamp, "tv_sec");
if (!json_is_integer(t)) {
goto unpack_fail;
}
tt->timestamp.tv_sec = json_integer_value(t);
t = json_object_get(timestamp, "tv_nsec");
if (!json_is_integer(t)) {
goto unpack_fail;
}
tt->timestamp.tv_nsec = json_integer_value(t);
flows = json_object_get(params, "flows");
if (!json_is_array(flows)) {
goto unpack_fail;
}
int l = json_array_size(flows);
/* tt->tflows may be more than the number of flows we send! */
int i;
for (i = 0; i < l; i++) {
json_t *f = json_array_get(flows, i);
t = json_object_get(f, "bytes");
if (!json_is_integer(t)) {
goto unpack_fail;
}
tt->flows[i].bytes = json_integer_value(t);
t = json_object_get(f, "packets");
if (!json_is_integer(t)) {
goto unpack_fail;
}
tt->flows[i].packets = json_integer_value(t);
t = json_object_get(f, "sport");
if (!json_is_integer(t)) {
goto unpack_fail;
}
tt->flows[i].sport = json_integer_value(t);
t = json_object_get(f, "dport");
if (!json_is_integer(t)) {
goto unpack_fail;
}
tt->flows[i].dport = json_integer_value(t);
t = json_object_get(f, "src");
if (!json_is_string(t)) {
goto unpack_fail;
}
snprintf(tt->flows[i].src, ADDR_LEN, "%s",
json_string_value(t));
t = json_object_get(f, "dst");
if (!json_is_string(t)) {
goto unpack_fail;
}
snprintf(tt->flows[i].dst, ADDR_LEN, "%s",
json_string_value(t));
t = json_object_get(f, "proto");
if (!json_is_string(t)) {
goto unpack_fail;
}
snprintf(tt->flows[i].proto, PROTO_LEN, "%s",
json_string_value(t));
t = json_object_get(f, "tclass");
if (!json_is_string(t)) {
goto unpack_fail;
}
snprintf(tt->flows[i].tclass, TCLASS_LEN, "%s",
json_string_value(t));
}
*data = tt;
json_object_clear(params);
return 0;
unpack_fail:
free(tt);
return -1;
}
int jt_toptalk_packer(void *data, char **out)
{
struct jt_msg_toptalk *tt_msg = data;
json_t *t = json_object();
json_t *timestamp = json_object();
json_t *params = json_object();
json_t *flows_arr = json_array();
json_t *flows[MAX_FLOWS];
assert(tt_msg);
json_object_set_new(params, "tflows", json_integer(tt_msg->tflows));
json_object_set_new(params, "tbytes", json_integer(tt_msg->tbytes));
json_object_set_new(params, "tpackets", json_integer(tt_msg->tpackets));
json_object_set_new(params, "interval_ns",
json_integer(tt_msg->interval_ns));
json_object_set(timestamp, "tv_sec", json_integer(tt_msg->timestamp.tv_sec));
json_object_set(timestamp, "tv_nsec", json_integer(tt_msg->timestamp.tv_nsec));
json_object_set(params, "timestamp", timestamp);
/* tt_msg->tflows is the Total flows recorded, not the number of flows
* listed in the message, so it will be more than MAX_FLOWS...
* So this is wrong >>> assert(tt_msg->tflows <= MAX_FLOWS);
*/
const int stop = (tt_msg->tflows < MAX_FLOWS) ?
tt_msg->tflows : MAX_FLOWS;
for (int i = 0; i < stop; i++) {
flows[i] = json_object();
json_object_set_new(flows[i], "bytes",
json_integer(tt_msg->flows[i].bytes));
json_object_set_new(flows[i], "packets",
json_integer(tt_msg->flows[i].packets));
json_object_set_new(flows[i], "sport",
json_integer(tt_msg->flows[i].sport));
json_object_set_new(flows[i], "dport",
json_integer(tt_msg->flows[i].dport));
json_object_set_new(flows[i], "src",
json_string(tt_msg->flows[i].src));
json_object_set_new(flows[i], "dst",
json_string(tt_msg->flows[i].dst));
json_object_set_new(flows[i], "proto",
json_string(tt_msg->flows[i].proto));
json_object_set_new(flows[i], "tclass",
json_string(tt_msg->flows[i].tclass));
json_array_append(flows_arr, flows[i]);
}
json_object_set_new(t, "msg",
json_string(jt_messages[JT_MSG_TOPTALK_V1].key));
json_object_set(params, "flows", flows_arr);
json_object_set(t, "p", params);
*out = json_dumps(t, 0);
for (int i = 0; i < stop; i++) {
json_decref(flows[i]);
}
json_array_clear(flows_arr);
json_decref(flows_arr);
json_object_clear(params);
json_decref(params);
json_object_clear(timestamp);
json_decref(timestamp);
json_object_clear(t);
json_decref(t);
return 0;
}
int jt_toptalk_free(void *data)
{
struct jt_msg_toptalk *t = (struct jt_msg_toptalk *)data;
free(t);
return 0;
}