acooks/jittertrap

View on GitHub
server/test_mq_mt.c

Summary

Maintainability
Test Coverage
D
66%
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>
#include <time.h>
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#include "mq_msg_ws.h"


#ifdef SPEED_TEST
#define TEST_ITERATIONS 10000000
#define DO_PRODUCER_SLEEP 0
#define DO_CONSUMER_SLEEP 0
#else
#define TEST_ITERATIONS 100000
#define DO_PRODUCER_SLEEP 1
#define DO_CONSUMER_SLEEP 1
#endif

#define PRINT_TAIL 1

int message_producer(struct mq_ws_msg *m, void *data)
{
    char *s = (char *)data;
    snprintf(m->m, MAX_JSON_MSG_LEN, "%s", s);
    return 0;
}

/* a callback for consuming messages. */
int message_printer(struct mq_ws_msg *m, void *data __attribute__((unused)))
{
    assert(m);
    printf("m: %s\n", m->m);
    return 0;
}

int string_copier(struct mq_ws_msg *m, void *data)
{
    char *s;

    assert(m);
    assert(data);

    s = (char *)data;
    sprintf(s, "%s", m->m);
    return 0;
}

/* dont ask, just condemn it. */
void nsleep(unsigned long delay)
{
    struct timespec deadline;

    clock_gettime(CLOCK_MONOTONIC, &deadline);
    deadline.tv_nsec += delay;

    /* account for the second boundary */
    if (deadline.tv_nsec >= 1E9) {
        deadline.tv_nsec -= 1E9;
        deadline.tv_sec++;
    }

    clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &deadline, NULL);
}

/* Calculate the absolute difference between t1 and t2. */
struct timespec ts_absdiff(struct timespec t1, struct timespec t2)
{
    struct timespec t;
    if ((t1.tv_sec < t2.tv_sec) ||
        ((t1.tv_sec == t2.tv_sec) && (t1.tv_nsec <= t2.tv_nsec))) {
        /* t1 <= t2 */
        t.tv_sec = t2.tv_sec - t1.tv_sec;
        if (t2.tv_nsec < t1.tv_nsec) {
            t.tv_nsec = t2.tv_nsec + 1E9 - t1.tv_nsec;
            t.tv_sec--;
        } else {
            t.tv_nsec = t2.tv_nsec - t1.tv_nsec;
        }
    } else {
        /* t1 > t2 */
        t.tv_sec = t1.tv_sec - t2.tv_sec;
        if (t1.tv_nsec < t2.tv_nsec) {
            t.tv_nsec = t1.tv_nsec + 1E9 - t2.tv_nsec;
            t.tv_sec--; /* Borrow a second. */
        } else {
            t.tv_nsec = t1.tv_nsec - t2.tv_nsec;
        }
    }
    return t;
}

double ts_to_seconds(struct timespec t)
{
    return t.tv_sec + t.tv_nsec / 1E9;
}

void *produce(void *d __attribute__((unused)))
{
    char msg[MAX_JSON_MSG_LEN];
    int err, cb_err;
    struct timespec deadline;

    printf("new producer for %d messages.\n", TEST_ITERATIONS);

    clock_gettime(CLOCK_MONOTONIC, &deadline);

    int i = TEST_ITERATIONS;
    while (i) {
#if DO_PRODUCER_SLEEP
        int r = rand() % 100;
        nsleep(100 + r);
#endif

        sprintf(msg, "%d", i);

        err = mq_ws_produce(message_producer, msg, &cb_err);
        if (!err) {
#if PRINT_TAIL
            if (i < PRINT_TAIL) {
                printf("p%03d\n", i);
            }
#endif
            i--;
        }

        if (0 == i % 5000) {
            printf("%2d%%\r",
            (int)(TEST_ITERATIONS - i) * 100 / TEST_ITERATIONS);
            fflush(stdout);
        }
    }
    assert(!err);
    printf("producer done.\n");
    return NULL;
}

void *consume(void *_tid)
{
    char msg[MAX_JSON_MSG_LEN];
    int err, cb_err;
    unsigned long id;
    struct timespec deadline;
    int tid = *(int *)_tid;
    char space[MAX_JSON_MSG_LEN];
    memset(space, ' ', 20 * tid);

    err = mq_ws_consumer_subscribe(&id);
    assert(!err);

    printf("consumer: %d mq id: %lu for %d messages\n", tid, id,
           TEST_ITERATIONS);

    clock_gettime(CLOCK_MONOTONIC, &deadline);

    int i = TEST_ITERATIONS;
    while (i) {
#if DO_CONSUMER_SLEEP
        int r = rand() % 100;
        nsleep(100 + r);
#endif

        err = mq_ws_consume(id, string_copier, msg, &cb_err);
        if (!err) {
#if PRINT_TAIL
            if (i < PRINT_TAIL) {
                printf("%sc%d-%03d-%s\n", space, tid, i, msg);
            }
#endif
            i--;
        }
    }

    assert(!err);

    err = mq_ws_consumer_unsubscribe(id);
    assert(!err);

    return NULL;
}

int main()
{
    int x, y, err;
    struct timespec start;
    struct timespec end;

    pthread_t producer_thread;
    pthread_t consumer_thread1;
    pthread_t consumer_thread2;

    err = mq_ws_init("test");
    assert(!err);

    x = 1;
    if (pthread_create(&consumer_thread1, NULL, consume, &x)) {
        fprintf(stderr, "Error creating consumer thread 1\n");
        return 1;
    }

    y = 2;
    if (pthread_create(&consumer_thread2, NULL, consume, &y)) {
        fprintf(stderr, "Error creating consumer thread 2\n");
        return 1;
    }

    /* always let the producer wait for the consumers to be ready, or
     * the second consumer can miss messages at the start and wait
     * forever. */
    nsleep(1000000);

    clock_gettime(CLOCK_MONOTONIC, &start);

    /* start the producer after the consumers, or they will miss the
     * messages that were qeued before they subscribed and you will
     * waste a lot of time looking for a bug that doesn't exist.
     */
    if (pthread_create(&producer_thread, NULL, produce, &x)) {
        fprintf(stderr, "Error creating producer thread\n");
        return 1;
    }

    /* wait for threads */
    if (pthread_join(producer_thread, NULL)) {
        fprintf(stderr, "Error joining thread\n");
        return 2;
    }

    if (pthread_join(consumer_thread1, NULL)) {
        fprintf(stderr, "Error joining consumer thread 1\n");
        return 2;
    }

    if (pthread_join(consumer_thread2, NULL)) {
        fprintf(stderr, "Error joining consumer thread 2\n");
        return 2;
    }

    clock_gettime(CLOCK_MONOTONIC, &end);

#ifdef SPEED_TEST
    struct timespec diff;
    double rate;
    diff = ts_absdiff(start, end);
    rate = TEST_ITERATIONS / ts_to_seconds(diff);

    printf("message queue OK. %.2f msgs/s\n", rate);
#else
    printf("message queue OK.\n");
#endif

    err = mq_ws_destroy();
    assert(!err);

    return 0;
}