firehol/netdata

View on GitHub
src/spawn/spawn.c

Summary

Maintainability
Test Coverage
// SPDX-License-Identifier: GPL-3.0-or-later

#include "spawn.h"

static uv_thread_t thread;
int spawn_thread_error;
int spawn_thread_shutdown;

struct spawn_queue spawn_cmd_queue;

static struct spawn_cmd_info *create_spawn_cmd(const char *command_to_run)
{
    struct spawn_cmd_info *cmdinfo;

    cmdinfo = mallocz(sizeof(*cmdinfo));
    fatal_assert(0 == uv_cond_init(&cmdinfo->cond));
    fatal_assert(0 == uv_mutex_init(&cmdinfo->mutex));
    cmdinfo->serial = 0; /* invalid */
    cmdinfo->command_to_run = strdupz(command_to_run);
    cmdinfo->exit_status = -1; /* invalid */
    cmdinfo->pid = -1; /* invalid */
    cmdinfo->flags = 0;

    return cmdinfo;
}

void destroy_spawn_cmd(struct spawn_cmd_info *cmdinfo)
{
    uv_cond_destroy(&cmdinfo->cond);
    uv_mutex_destroy(&cmdinfo->mutex);

    freez(cmdinfo->command_to_run);
    freez(cmdinfo);
}

int spawn_cmd_compare(void *a, void *b)
{
    struct spawn_cmd_info *cmda = a, *cmdb = b;

    /* No need for mutex, serial will never change and the entries cannot be deallocated yet */
    if (cmda->serial < cmdb->serial) return -1;
    if (cmda->serial > cmdb->serial) return 1;

    return 0;
}

static void init_spawn_cmd_queue(void)
{
    spawn_cmd_queue.cmd_tree.root = NULL;
    spawn_cmd_queue.cmd_tree.compar = spawn_cmd_compare;
    spawn_cmd_queue.size = 0;
    spawn_cmd_queue.latest_serial = 0;
    fatal_assert(0 == uv_cond_init(&spawn_cmd_queue.cond));
    fatal_assert(0 == uv_mutex_init(&spawn_cmd_queue.mutex));
}

/*
 * Returns serial number of the enqueued command
 */
uint64_t spawn_enq_cmd(const char *command_to_run)
{
    unsigned queue_size;
    uint64_t serial;
    avl_t *avl_ret;
    struct spawn_cmd_info *cmdinfo;

    cmdinfo = create_spawn_cmd(command_to_run);

    /* wait for free space in queue */
    uv_mutex_lock(&spawn_cmd_queue.mutex);
    while ((queue_size = spawn_cmd_queue.size) == SPAWN_MAX_OUTSTANDING) {
        uv_cond_wait(&spawn_cmd_queue.cond, &spawn_cmd_queue.mutex);
    }
    fatal_assert(queue_size < SPAWN_MAX_OUTSTANDING);
    spawn_cmd_queue.size = queue_size + 1;

    serial = ++spawn_cmd_queue.latest_serial; /* 0 is invalid */
    cmdinfo->serial = serial; /* No need to take the cmd mutex since it is unreachable at the moment */

    /* enqueue command */
    avl_ret = avl_insert(&spawn_cmd_queue.cmd_tree, (avl_t *)cmdinfo);
    fatal_assert(avl_ret == (avl_t *)cmdinfo);
    uv_mutex_unlock(&spawn_cmd_queue.mutex);

    /* wake up event loop */
    fatal_assert(0 == uv_async_send(&spawn_async));
    return serial;
}

/*
 * Blocks until command with serial finishes running. Only one thread is allowed to wait per command.
 */
void spawn_wait_cmd(uint64_t serial, int *exit_status, time_t *exec_run_timestamp)
{
    avl_t *avl_ret;
    struct spawn_cmd_info tmp, *cmdinfo;

    tmp.serial = serial;

    uv_mutex_lock(&spawn_cmd_queue.mutex);
    avl_ret = avl_search(&spawn_cmd_queue.cmd_tree, (avl_t *)&tmp);
    uv_mutex_unlock(&spawn_cmd_queue.mutex);

    fatal_assert(avl_ret); /* Could be NULL if more than 1 threads wait for the command */
    cmdinfo = (struct spawn_cmd_info *)avl_ret;

    uv_mutex_lock(&cmdinfo->mutex);
    while (!(cmdinfo->flags & SPAWN_CMD_DONE)) {
        /* Only 1 thread is allowed to wait for this command to finish */
        uv_cond_wait(&cmdinfo->cond, &cmdinfo->mutex);
    }
    uv_mutex_unlock(&cmdinfo->mutex);

    spawn_deq_cmd(cmdinfo);
    *exit_status = cmdinfo->exit_status;
    *exec_run_timestamp = cmdinfo->exec_run_timestamp;

    destroy_spawn_cmd(cmdinfo);
}

void spawn_deq_cmd(struct spawn_cmd_info *cmdinfo)
{
    unsigned queue_size;
    avl_t *avl_ret;

    uv_mutex_lock(&spawn_cmd_queue.mutex);
    queue_size = spawn_cmd_queue.size;
    fatal_assert(queue_size);
    /* dequeue command */
    avl_ret = avl_remove(&spawn_cmd_queue.cmd_tree, (avl_t *)cmdinfo);
    fatal_assert(avl_ret);

    spawn_cmd_queue.size = queue_size - 1;

    /* wake up callers */
    uv_cond_signal(&spawn_cmd_queue.cond);
    uv_mutex_unlock(&spawn_cmd_queue.mutex);
}

/*
 * Must be called from the spawn client event loop context. This way no mutex is needed because the event loop is the
 * only writer as far as struct spawn_cmd_info entries are concerned.
 */
static int find_unprocessed_spawn_cmd_cb(void *entry, void *data)
{
    struct spawn_cmd_info **cmdinfop = data, *cmdinfo = entry;

    if (!(cmdinfo->flags & SPAWN_CMD_PROCESSED)) {
        *cmdinfop = cmdinfo;
        return -1; /* break tree traversal */
    }
    return 0; /* continue traversing */
}

struct spawn_cmd_info *spawn_get_unprocessed_cmd(void)
{
    struct spawn_cmd_info *cmdinfo;
    unsigned queue_size;
    int ret;

    uv_mutex_lock(&spawn_cmd_queue.mutex);
    queue_size = spawn_cmd_queue.size;
    if (queue_size == 0) {
        uv_mutex_unlock(&spawn_cmd_queue.mutex);
        return NULL;
    }
    /* find command */
    cmdinfo = NULL;
    ret = avl_traverse(&spawn_cmd_queue.cmd_tree, find_unprocessed_spawn_cmd_cb, (void *)&cmdinfo);
    if (-1 != ret) { /* no commands available for processing */
        uv_mutex_unlock(&spawn_cmd_queue.mutex);
        return NULL;
    }
    uv_mutex_unlock(&spawn_cmd_queue.mutex);

    return cmdinfo;
}

/**
 * This function spawns a process that shares a libuv IPC pipe with the caller and performs spawn server duties.
 * The spawn server process will close all open file descriptors except for the pipe, UV_STDOUT_FD, and UV_STDERR_FD.
 * The caller has to be the netdata user as configured.
 *
 * @param loop the libuv loop of the caller context
 * @param spawn_channel the bidirectional libuv IPC pipe that the server and the caller will share
 * @param process the spawn server libuv process context
 * @return 0 on success or the libuv error code
 */
int create_spawn_server(uv_loop_t *loop, uv_pipe_t *spawn_channel, uv_process_t *process)
{
    uv_process_options_t options = {0};
    char *args[3];
    int ret;
#define SPAWN_SERVER_DESCRIPTORS (3)
    uv_stdio_container_t stdio[SPAWN_SERVER_DESCRIPTORS];
    struct passwd *passwd = NULL;
    char *user = NULL;

    passwd = getpwuid(getuid());
    user = (passwd && passwd->pw_name) ? passwd->pw_name : "";

    args[0] = netdata_exe_file;
    args[1] = SPAWN_SERVER_COMMAND_LINE_ARGUMENT;
    args[2] = NULL;

    memset(&options, 0, sizeof(options));
    options.file = netdata_exe_file;
    options.args = args;
    options.exit_cb = NULL; //exit_cb;
    options.stdio = stdio;
    options.stdio_count = SPAWN_SERVER_DESCRIPTORS;

    stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE;
    stdio[0].data.stream = (uv_stream_t *)spawn_channel; /* bidirectional libuv pipe */
    stdio[1].flags = UV_INHERIT_FD;
    stdio[1].data.fd = 1 /* UV_STDOUT_FD */;
    stdio[2].flags = UV_INHERIT_FD;
    stdio[2].data.fd = nd_log_health_fd() /* UV_STDERR_FD */;

    ret = uv_spawn(loop, process, &options); /* execute the netdata binary again as the netdata user */
    if (0 != ret) {
        netdata_log_error("uv_spawn (process: \"%s\") (user: %s) failed (%s).", netdata_exe_file, user, uv_strerror(ret));
        fatal("Cannot start netdata without the spawn server.");
    }

    return ret;
}

#define CONCURRENT_SPAWNS 16
#define SPAWN_ITERATIONS  10000
#undef CONCURRENT_STRESS_TEST

void spawn_init(void)
{
    struct completion completion;
    int error;

    netdata_log_info("Initializing spawn client.");

    init_spawn_cmd_queue();

    completion_init(&completion);
    error = uv_thread_create(&thread, spawn_client, &completion);
    if (error) {
        netdata_log_error("uv_thread_create(): %s", uv_strerror(error));
        goto after_error;
    }
    /* wait for spawn client thread to initialize */
    completion_wait_for(&completion);
    completion_destroy(&completion);

    if (spawn_thread_error) {
        error = uv_thread_join(&thread);
        if (error) {
            netdata_log_error("uv_thread_create(): %s", uv_strerror(error));
        }
        goto after_error;
    }
#ifdef CONCURRENT_STRESS_TEST
    signals_reset();
    signals_unblock();

    sleep(60);
    uint64_t serial[CONCURRENT_SPAWNS];
    for (int j = 0 ; j < SPAWN_ITERATIONS ; ++j) {
        for (int i = 0; i < CONCURRENT_SPAWNS; ++i) {
            char cmd[64];
            sprintf(cmd, "echo CONCURRENT_STRESS_TEST %d 1>&2", j * CONCURRENT_SPAWNS + i + 1);
            serial[i] = spawn_enq_cmd(cmd);
            netdata_log_info("Queued command %s for spawning.", cmd);
        }
        int exit_status;
        time_t exec_run_timestamp;
        for (int i = 0; i < CONCURRENT_SPAWNS; ++i) {
            netdata_log_info("Started waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status,
                 exec_run_timestamp);
            spawn_wait_cmd(serial[i], &exit_status, &exec_run_timestamp);
            netdata_log_info("Finished waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status,
                 exec_run_timestamp);
        }
    }
    exit(0);
#endif
    return;

    after_error:
        netdata_log_error("Failed to initialize spawn service. The alarms notifications will not be spawned.");
}