/* server/sampling_thread.c */
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <sched.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <syslog.h>
#include <unistd.h>

#include <linux/types.h>
#include <netlink/netlink.h>
#include <netlink/socket.h>
#include <netlink/utils.h>
#include <netlink/route/link.h>

#include "jittertrap.h"
#include "iface_stats.h"
#include "sampling_thread.h"
#include "sample_buf.h"
#include "timeywimey.h"
/* globals */
/* Descriptor for the single sampling thread.
 * thread_id == 0 means the thread has not been created yet
 * (checked in sample_thread_init() before pthread_create). */
struct {
pthread_t thread_id;
pthread_attr_t thread_attr;
const char * const thread_name;
const int thread_prio;
} thread_info = {
0,
.thread_name = "jt-sample",
.thread_prio = 2
};
/* Signal disposition storage; not referenced in this file's visible
 * code — presumably used elsewhere. TODO(review): confirm. */
struct sigaction sa;
/* Netlink socket used by read_counters(); access serialized by
 * nl_sock_mutex. Created in init_nl(). */
struct nl_sock *nl_sock;
pthread_mutex_t nl_sock_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Protects g_iface (heap-allocated name of the monitored interface). */
static pthread_mutex_t g_iface_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Protects g_stats_o and reset_stats. */
static pthread_mutex_t g_stats_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Interface currently being sampled; set via sample_iface(). */
char *g_iface;
/* Previous absolute counter sample ("old"), baseline for deltas. */
struct sample g_stats_o;
/* FIXME: Treat with extreme prejudice!
 * There's some kind of bug (maybe in libnl?) where the rtnl_link_get_stat()
 * will return stale data after a rtnl_link_get_kernel() call that changes
 * the iface (and therefore also the link object).
 *
 * This hack effectively ignores the values of the first 50 results from
 * rtnl_link_get_kernel();
 */
#define DISCARD_FIRST_N_READINGS 50
/* Countdown of readings still to discard; consumed in calc_deltas(). */
int reset_stats = DISCARD_FIRST_N_READINGS;
/* Sampling period in microseconds; see set_sample_period(). */
int sample_period_us;
/* Callback invoked with each completed frame of samples. */
void (*stats_handler)(struct iface_stats *counts);
/* local prototypes */
static void *run(void *data);
/* Return the current sampling period in microseconds. */
int get_sample_period(void)
{
return sample_period_us;
}
/* Start the sampling thread (idempotent: a second call while the thread
 * is running only updates the stats handler).
 *
 * _stats_handler: callback invoked once per completed sample frame.
 * Returns 0 on success, -1 if no interface has been set, the handler is
 * NULL, or thread creation fails.
 *
 * Fix: pthread_create failure was handled with assert(), which is a
 * no-op under NDEBUG (leaving `err` unused and the failure silent).
 * Report the error and return -1 instead. */
int sample_thread_init(void (*_stats_handler)(struct iface_stats *counts))
{
	int err;

	if (!g_iface || !_stats_handler) {
		return -1;
	}
	stats_handler = _stats_handler;

	if (!thread_info.thread_id) {
		err = pthread_create(&thread_info.thread_id, NULL, run, NULL);
		if (err) {
			syslog(LOG_ERR, "pthread_create failed: %s\n",
			       strerror(err));
			thread_info.thread_id = 0;
			return -1;
		}
		/* Best-effort; the name is only a debugging aid. */
		pthread_setname_np(thread_info.thread_id,
		                   thread_info.thread_name);
	}
	return 0;
}
/* update g_iface and reset the old stats. */
/* Switch sampling to interface _iface and reset the delta baseline.
 * Takes g_iface_mutex and g_stats_mutex; safe to call while the
 * sampling thread is running.
 *
 * Fix: strdup() failure was unchecked, leaving g_iface NULL without
 * any diagnostic; also dropped the redundant NULL guard around free()
 * (free(NULL) is a no-op). */
void sample_iface(const char *_iface)
{
	pthread_mutex_lock(&g_iface_mutex);
	pthread_mutex_lock(&g_stats_mutex);
	/* Zero the baseline so deltas restart from the new iface. */
	g_stats_o.rx_bytes = 0;
	g_stats_o.tx_bytes = 0;
	g_stats_o.rx_packets = 0;
	g_stats_o.tx_packets = 0;
	/* Re-arm the stale-data workaround (see FIXME above). */
	reset_stats = DISCARD_FIRST_N_READINGS;
	pthread_mutex_unlock(&g_stats_mutex);

	free(g_iface);
	g_iface = strdup(_iface);
	if (!g_iface) {
		syslog(LOG_ERR, "sample_iface: strdup failed (out of memory)\n");
	}
	pthread_mutex_unlock(&g_iface_mutex);
}
static int init_nl(void)
{
/* Allocate and initialize a new netlink handle */
if (!(nl_sock = nl_socket_alloc())) {
syslog(LOG_ERR, "Failed to alloc netlink socket\n");
return -EOPNOTSUPP;
}
/* Bind and connect socket to protocol, NETLINK_ROUTE in our case. */
if (nl_connect(nl_sock, NETLINK_ROUTE) < 0) {
syslog(LOG_ERR, "Failed to connect to kernel\n");
return -EOPNOTSUPP;
}
return 0;
}
/* Fetch the current absolute byte/packet counters for `iface` from the
 * kernel via netlink and store them in *stats. Compressed packets are
 * folded into the packet totals. Returns 0 on success, -1 if the link
 * name is unknown. Serialized on nl_sock_mutex. */
static int read_counters(const char *iface, struct sample *stats)
{
	struct rtnl_link *link = NULL;
	int rc = -1;

	assert(nl_sock);
	pthread_mutex_lock(&nl_sock_mutex);

	/* An ifindex of zero tells libnl to look the link up by name. */
	if (rtnl_link_get_kernel(nl_sock, 0, iface, &link) < 0) {
		syslog(LOG_ERR, "unknown interface/link name: %s\n", iface);
		goto out;
	}

	stats->rx_bytes = rtnl_link_get_stat(link, RTNL_LINK_RX_BYTES);
	stats->tx_bytes = rtnl_link_get_stat(link, RTNL_LINK_TX_BYTES);
	stats->rx_packets = rtnl_link_get_stat(link, RTNL_LINK_RX_PACKETS)
	                  + rtnl_link_get_stat(link, RTNL_LINK_RX_COMPRESSED);
	stats->tx_packets = rtnl_link_get_stat(link, RTNL_LINK_TX_PACKETS)
	                  + rtnl_link_get_stat(link, RTNL_LINK_TX_COMPRESSED);

	rtnl_link_put(link);
	rc = 0;
out:
	pthread_mutex_unlock(&nl_sock_mutex);
	return rc;
}
/* Compute per-sample deltas in *stats_c from the previous absolute
 * counters in *stats_o. The baseline is re-seeded (deltas forced to 0)
 * while the post-iface-change discard window is active, or when a
 * counter appears to have gone backwards (counter reset/wrap).
 *
 * Fix: the original `reset_stats--` decremented on every call forever,
 * eventually underflowing a signed int (undefined behavior) on a
 * long-running sampler. Only decrement while the countdown is active;
 * observable behavior is unchanged. */
static void calc_deltas(struct sample *stats_o, struct sample *stats_c)
{
	int resetting = (reset_stats > 0);

	if (resetting) {
		reset_stats--;
	}
	if (resetting || stats_o->rx_bytes > stats_c->rx_bytes) {
		stats_o->rx_bytes = stats_c->rx_bytes;
		stats_o->tx_bytes = stats_c->tx_bytes;
		stats_o->rx_packets = stats_c->rx_packets;
		stats_o->tx_packets = stats_c->tx_packets;
	}

	stats_c->rx_bytes_delta = stats_c->rx_bytes - stats_o->rx_bytes;
	stats_c->tx_bytes_delta = stats_c->tx_bytes - stats_o->tx_bytes;
	stats_c->rx_packets_delta = stats_c->rx_packets - stats_o->rx_packets;
	stats_c->tx_packets_delta = stats_c->tx_packets - stats_o->tx_packets;
}
/* Take one sample: read counters for `iface` into *sample_c, timestamp
 * it, record how far past `deadline` the read landed (whoosh error),
 * compute deltas against g_stats_o, and advance g_stats_o.
 * Holds g_stats_mutex for the duration.
 *
 * Fix: on read_counters() failure the original still copied *sample_c
 * (which read_counters left untouched — i.e. stale frame-buffer data)
 * into g_stats_o, corrupting the delta baseline. Only commit the new
 * absolute counters when the read succeeded. */
static void
update_stats(struct sample *sample_c, char *iface, struct timespec deadline)
{
	struct sample sample_o;
	struct timespec whoosh_err; /* the sound of a missed deadline. */

	pthread_mutex_lock(&g_stats_mutex);
	/* FIXME: this smells funny */
	memcpy(&sample_o, &g_stats_o, sizeof(struct sample));

	if (0 == read_counters(iface, sample_c)) {
		clock_gettime(CLOCK_MONOTONIC, &(sample_c->timestamp));
		whoosh_err = ts_absdiff(sample_c->timestamp, deadline);
		sample_c->whoosh_error_ns = whoosh_err.tv_nsec;
		calc_deltas(&sample_o, sample_c);
		memcpy(&g_stats_o, sample_c, sizeof(struct sample));
	}
	pthread_mutex_unlock(&g_stats_mutex);
}
/* Report a pthreads-style error code `en` via perror(msg) and terminate
 * the process. Used for unrecoverable affinity-setting failures. */
#define handle_error_en(en, msg) \
do { \
errno = en; \
perror(msg); \
exit(EXIT_FAILURE); \
} while (0)
/* Pin the calling thread to the CPU designated by RT_CPU, then log the
 * affinity mask actually in effect. Exits the process (via
 * handle_error_en) if the affinity calls fail.
 *
 * Fix: the log-formatting loop decremented an int `blen` without a
 * lower bound; once it reached <= 0 it was passed to snprintf as a
 * (huge) size_t, risking a buffer overflow. Track remaining space as
 * size_t, use the snprintf return value, and stop on truncation. */
static void set_affinity(void)
{
	int s, j;
	cpu_set_t cpuset;
	pthread_t thread;
	thread = pthread_self();

	/* Restrict the affinity mask to the single RT_CPU. */
	CPU_ZERO(&cpuset);
	CPU_SET(RT_CPU, &cpuset);
	s = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
	if (s != 0) {
		handle_error_en(s, "pthread_setaffinity_np");
	}

	/* Check the actual affinity mask assigned to the thread */
	s = pthread_getaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
	if (s != 0) {
		handle_error_en(s, "pthread_getaffinity_np");
	}

	char buff[64] = {0};
	char *offset = buff;
	size_t blen = sizeof(buff);
	for (j = 0; j < CPU_SETSIZE && blen > 1; j++) {
		if (CPU_ISSET(j, &cpuset)) {
			int w = snprintf(offset, blen, "CPU%d ", j);
			if (w < 0 || (size_t)w >= blen) {
				break; /* encoding error or truncated */
			}
			blen -= (size_t)w;
			offset += w;
		}
	}
	syslog(LOG_DEBUG, "[RT thread %s] priority [%d] CPU affinity: %s",
	       thread_info.thread_name, thread_info.thread_prio, buff);
}
/* Give the calling thread SCHED_FIFO priority and pin it to the RT CPU.
 * Always returns 0.
 *
 * Fix: the sched_setscheduler() return value was ignored; it commonly
 * fails with EPERM when run unprivileged, and the sampler silently ran
 * without RT scheduling. Log the failure and continue best-effort. */
static int init_realtime(void)
{
	struct sched_param schedparm;
	memset(&schedparm, 0, sizeof(schedparm));
	schedparm.sched_priority = thread_info.thread_prio;
	if (sched_setscheduler(0, SCHED_FIFO, &schedparm) != 0) {
		syslog(LOG_WARNING,
		       "sched_setscheduler(SCHED_FIFO) failed: %s; "
		       "continuing without realtime priority\n",
		       strerror(errno));
	}
	set_affinity();
	return 0;
}
/* microseconds */
void set_sample_period(int period)
{
if (period < 100)
period = 100;
sample_period_us = period;
}
/* Sampling thread entry point. Reads interface counters once per
 * sample period at an absolute CLOCK_MONOTONIC deadline, fills frames
 * of SAMPLES_PER_FRAME samples, and hands each completed frame to
 * stats_handler. Runs until the process exits.
 *
 * Fixes:
 *  - init_nl() result was ignored; a failure would later trip the
 *    assert(nl_sock) in read_counters(). Bail out instead.
 *  - the initial strdup(g_iface) ran without g_iface_mutex, racing
 *    with sample_iface()'s free/strdup of g_iface.
 *  - strdup() results were unchecked; a NULL would reach
 *    snprintf("%s", NULL), which is undefined behavior. */
static void *run(void *data)
{
	(void)data; /* unused parameter. silence warning. */

	if (init_nl() < 0) {
		syslog(LOG_ERR, "netlink init failed; sampling thread exiting\n");
		return NULL;
	}
	raw_sample_buf_init();
	init_realtime();
	set_sample_period(SAMPLE_PERIOD_US);

	struct timespec deadline;
	clock_gettime(CLOCK_MONOTONIC, &deadline);
	int sample_no = 0;

	pthread_mutex_lock(&g_iface_mutex);
	char *iface = strdup(g_iface);
	pthread_mutex_unlock(&g_iface_mutex);
	if (!iface) {
		syslog(LOG_ERR, "strdup failed; sampling thread exiting\n");
		return NULL;
	}

	struct iface_stats *stats_frame = raw_sample_buf_produce_next();
	snprintf(stats_frame->iface, MAX_IFACE_LEN, "%s", iface);
	stats_frame->sample_period_us = sample_period_us;

	for (;;) {
		update_stats(&(stats_frame->samples[sample_no]), iface,
		             deadline);
		sample_no++;
		sample_no %= SAMPLES_PER_FRAME;

		/* set the iface, samples per period at start of each frame */
		if (sample_no == 0) {
			pthread_mutex_lock(&g_iface_mutex);
			free(iface);
			iface = strdup(g_iface);
			if (!iface) {
				pthread_mutex_unlock(&g_iface_mutex);
				syslog(LOG_ERR,
				       "strdup failed; sampling thread exiting\n");
				break;
			}
			stats_frame = raw_sample_buf_produce_next();
			snprintf(stats_frame->iface, MAX_IFACE_LEN, "%s",
			         iface);
			pthread_mutex_unlock(&g_iface_mutex);
			stats_frame->sample_period_us = sample_period_us;
			stats_handler(raw_sample_buf_consume_next());
		}

		deadline.tv_nsec += 1000 * sample_period_us;
		/* Normalize the time to account for the second boundary */
		if (deadline.tv_nsec >= 1000000000) {
			deadline.tv_nsec -= 1000000000;
			deadline.tv_sec++;
		}
		clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &deadline,
		                NULL);
	}
	return NULL;
}