/* acooks/jittertrap - server/tt_thread.c */
/* _GNU_SOURCE is needed for pthread_setname_np() and the CPU affinity
 * calls below; the build system may define it already. */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <sched.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <syslog.h>
#include <time.h>

#include <net/ethernet.h>
#include <netinet/ip.h>
#include <arpa/inet.h>

#include <jansson.h>

#include "jt_message_types.h"
#include "jt_messages.h"

#include "mq_msg_tt.h"

#include "flow.h"
#include "intervals.h"

#include "tt_thread.h"

struct tt_thread_info ti = {
    .thread_name = "jt-toptalk",
    .thread_prio = 3
};

struct {
    pthread_t thread_id;
    pthread_attr_t thread_attr;
    const char * const thread_name;
    const int thread_prio;
} iti = {
    .thread_name = "jt-intervals",
    .thread_prio = 2
};

static char const *const protos[IPPROTO_MAX] = {[IPPROTO_TCP] = "TCP",
                                                [IPPROTO_UDP] = "UDP",
                                                [IPPROTO_ICMP] = "ICMP",
                                                [IPPROTO_ICMPV6] = "ICMP6",
                                                [IPPROTO_IP] = "IP",
                                                [IPPROTO_IGMP] = "IGMP" };

static char const * const dscpvalues[] = {
        [IPTOS_DSCP_AF11] = "AF11",
        [IPTOS_DSCP_AF12] = "AF12",
        [IPTOS_DSCP_AF13] = "AF13",
        [IPTOS_DSCP_AF21] = "AF21",
        [IPTOS_DSCP_AF22] = "AF22",
        [IPTOS_DSCP_AF23] = "AF23",
        [IPTOS_DSCP_AF31] = "AF31",
        [IPTOS_DSCP_AF32] = "AF32",
        [IPTOS_DSCP_AF33] = "AF33",
        [IPTOS_DSCP_AF41] = "AF41",
        [IPTOS_DSCP_AF42] = "AF42",
        [IPTOS_DSCP_AF43] = "AF43",
        [IPTOS_DSCP_EF]   = "EF",
        [IPTOS_CLASS_CS0] = "CS0",
        [IPTOS_CLASS_CS1] = "CS1",
        [IPTOS_CLASS_CS2] = "CS2",
        [IPTOS_CLASS_CS3] = "CS3",
        [IPTOS_CLASS_CS4] = "CS4",
        [IPTOS_CLASS_CS5] = "CS5",
        [IPTOS_CLASS_CS6] = "CS6",
        [IPTOS_CLASS_CS7] = "CS7"
};
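
/*
 * Both tables above are sparse: indices without a designated
 * initializer are implicitly NULL, so lookups must tolerate NULL
 * entries (see the fallbacks in m2m() below). dscpvalues[] is indexed
 * by the raw TOS byte and only extends to IPTOS_CLASS_CS7, so it also
 * needs a bounds check.
 */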

/* Stop any running toptalk capture thread and (re)start it on iface,
 * e.g. when the capture interface changes. */
int tt_thread_restart(char *iface)
{
    int err;
    void *res;

    if (ti.thread_id) {
        pthread_cancel(ti.thread_id);
        pthread_join(ti.thread_id, &res);
        free(ti.t5);
        free(ti.dev);
    }

    ti.dev = malloc(MAX_IFACE_LEN);
    assert(ti.dev);
    snprintf(ti.dev, MAX_IFACE_LEN, "%s", iface);

    /* start & run thread for capture and interval processing */
    tt_intervals_init(&ti);

    err = pthread_attr_init(&ti.attr);
    assert(!err);

    err = pthread_create(&ti.thread_id, &ti.attr, tt_intervals_run, &ti);
    assert(!err);
    pthread_setname_np(ti.thread_id, ti.thread_name);

    tt_update_ref_window_size(&ti, tt_intervals[0]);
    tt_update_ref_window_size(&ti, tt_intervals[INTERVAL_COUNT - 1]);

    return 0;
}

/* Convert from a struct tt_top_flows to a struct mq_tt_msg */
static int
m2m(struct tt_top_flows *ttf, struct mq_tt_msg *msg, int interval)
{
    struct jt_msg_toptalk *m = &msg->m;

    m->timestamp.tv_sec = ttf->timestamp.tv_sec;
    m->timestamp.tv_nsec = ttf->timestamp.tv_usec * 1000;

    m->interval_ns = tt_intervals[interval].tv_sec * 1000000000ULL
                     + tt_intervals[interval].tv_usec * 1000ULL;

    m->tflows = ttf->flow_count;
    m->tbytes = ttf->total_bytes;
    m->tpackets = ttf->total_packets;

    for (int f = 0; f < MAX_FLOWS; f++) {
        uint8_t proto = ttf->flow[f][interval].flow.proto;
        uint8_t tclass = ttf->flow[f][interval].flow.tclass;

        m->flows[f].bytes = ttf->flow[f][interval].bytes;
        m->flows[f].packets = ttf->flow[f][interval].packets;
        m->flows[f].sport = ttf->flow[f][interval].flow.sport;
        m->flows[f].dport = ttf->flow[f][interval].flow.dport;
        /* the lookup tables are sparse; fall back to a fixed string
         * rather than passing a NULL entry to snprintf() */
        snprintf(m->flows[f].proto, PROTO_LEN, "%s",
                 protos[proto] ? protos[proto] : "UNKNOWN");
        snprintf(m->flows[f].src, ADDR_LEN, "%s",
                 inet_ntoa(ttf->flow[f][interval].flow.src_ip));
        snprintf(m->flows[f].dst, ADDR_LEN, "%s",
                 inet_ntoa(ttf->flow[f][interval].flow.dst_ip));
        snprintf(m->flows[f].tclass, TCLASS_LEN, "%s",
                 (tclass < sizeof(dscpvalues) / sizeof(dscpvalues[0])
                  && dscpvalues[tclass])
                     ? dscpvalues[tclass] : "UNKNOWN");
    }
    return 0;
}

/* Producer callback for mq_tt_produce(): copies the caller's prepared
 * message (data) into the destination message (m) supplied by the
 * message queue. */
inline static int message_producer(struct mq_tt_msg *m, void *data)
{
    memcpy(m, data, sizeof(struct mq_tt_msg));
    return 0;
}

/* Snapshot the shared top-flows table into a message and enqueue it.
 * t5_mutex is held so the capture thread cannot update the table
 * mid-copy. */
int queue_tt_msg(int interval)
{
    struct mq_tt_msg msg;
    struct tt_top_flows *t5 = ti.t5;
    int cb_err;

    pthread_mutex_lock(&ti.t5_mutex);
    {
        m2m(t5, &msg, interval);
        mq_tt_produce(message_producer, &msg, &cb_err);
    }
    pthread_mutex_unlock(&ti.t5_mutex);
    return 0;
}


/* Express each entry of tt_intervals as an integer multiple of the
 * shortest interval, written to the output array. Returns the
 * shortest interval in nanoseconds.
 * TODO: calculate the true GCD of tt_intervals; for now the shortest
 * interval must divide all the others (enforced by the assert below). */
static uint32_t calc_intervals(uint32_t intervals[INTERVAL_COUNT])
{
    uint64_t t0_us = tt_intervals[0].tv_sec * 1000000ULL
                     + tt_intervals[0].tv_usec;

    for (int i = INTERVAL_COUNT - 1; i >= 0; i--) {
        uint64_t t_us = tt_intervals[i].tv_sec * 1000000ULL
                        + tt_intervals[i].tv_usec;
        intervals[i] = t_us / t0_us;

        /* FIXME: for now, t0_us is the GCD of tt_intervals */
        assert(0 == t_us % t0_us);
    }
    return 1000UL * tt_intervals[0].tv_usec
           + 1000000000UL * tt_intervals[0].tv_sec;
}
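
/*
 * calc_intervals() worked example (hypothetical values): if
 * tt_intervals holds 100ms, 200ms and 1s, the output array becomes
 * {1, 2, 10} and the return value is 100000000, the 100ms base tick
 * in nanoseconds.
 */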

static void set_affinity(void)
{
    int s, j;
    cpu_set_t cpuset;
    pthread_t thread;
    thread = pthread_self();
    CPU_ZERO(&cpuset);
    CPU_SET(RT_CPU, &cpuset);
    s = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
    if (s != 0) {
        handle_error_en(s, "pthread_setaffinity_np");
    }

    /* Check the actual affinity mask assigned to the thread */
    s = pthread_getaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
    if (s != 0) {
        handle_error_en(s, "pthread_getaffinity_np");
    }

    char buff[64] = {0};
    char *offset = buff;
    int blen = sizeof(buff);
    /* Build a human-readable list of the CPUs in the mask. snprintf()
     * truncates within blen, so blen never drops below 1 and the
     * buffer cannot overflow. */
    for (j = 0; j < CPU_SETSIZE; j++) {
        if (CPU_ISSET(j, &cpuset)) {
            snprintf(offset, blen, "CPU%d ", j);
            blen -= strlen(offset);
            offset += strlen(offset);
        }
    }

    syslog(LOG_DEBUG, "[RT thread %s] priority [%d] CPU affinity: %s",
        iti.thread_name, iti.thread_prio, buff);
}

static int init_realtime(void)
{
    struct sched_param schedparm;
    memset(&schedparm, 0, sizeof(schedparm));
    schedparm.sched_priority = iti.thread_prio;
    if (sched_setscheduler(0, SCHED_FIFO, &schedparm) != 0) {
        syslog(LOG_WARNING, "sched_setscheduler(SCHED_FIFO) failed: %s",
               strerror(errno));
    }
    set_affinity();
    return 0;
}
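
/*
 * init_realtime() note: SCHED_FIFO normally requires CAP_SYS_NICE (or
 * root); if sched_setscheduler() fails, the thread keeps the default
 * policy and the syslog warning above is the only indication.
 */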

static void *intervals_run(void *data)
{
    (void)data; /* unused */
    struct timespec deadline;

    uint32_t tick = 0;
    /* integer multiple of gcd in interval */
    uint32_t imuls[INTERVAL_COUNT];
    uint32_t sleep_time_ns = calc_intervals(imuls);

    init_realtime();

    clock_gettime(CLOCK_MONOTONIC, &deadline);
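
    /*
     * Tick bookkeeping, illustrated with hypothetical imuls = {1, 2, 10}:
     * every interval fires at tick 0; thereafter tick cycles through
     * 1..10, so interval 1 fires on even ticks and interval 2 on every
     * 10th tick. Wrapping back to 1 rather than 0 stops the longest
     * interval from firing on two consecutive ticks.
     */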

    for (;;) {

        for (int i = 0; i < INTERVAL_COUNT; i++) {
            assert(imuls[i]);
            if (0 == (tick % imuls[i])) {
                queue_tt_msg(i);
            }
        }

        /* increment / wrap tick */
        tick = (imuls[INTERVAL_COUNT-1] == tick) ? 1 : tick + 1;

        deadline.tv_nsec += sleep_time_ns;

        /* Normalise on the second boundary; a loop, in case the base
         * interval ever exceeds one second */
        while (deadline.tv_nsec >= 1000000000L) {
            deadline.tv_nsec -= 1000000000L;
            deadline.tv_sec++;
        }

        clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &deadline,
                        NULL);
    }
    return NULL;
}

/* (Re)start the interval timer thread, cancelling and joining any
 * previous instance first. */
int intervals_thread_init(void)
{
    int err;
    void *res;

    if (iti.thread_id) {
        pthread_cancel(iti.thread_id);
        pthread_join(iti.thread_id, &res);
    }

    err = pthread_attr_init(&iti.thread_attr);
    assert(!err);

    err = pthread_create(&iti.thread_id, &iti.thread_attr, intervals_run,
                         NULL);
    assert(!err);
    pthread_setname_np(iti.thread_id, iti.thread_name);

    return 0;
}
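
/*
 * Usage sketch (hypothetical caller, not part of this file): the
 * server would typically start the intervals thread once at startup
 * and restart the capture thread whenever the monitored interface
 * changes, e.g.:
 *
 *     intervals_thread_init();
 *     tt_thread_restart("eth0");
 */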