firehol/netdata

View on GitHub
src/spawn/spawn_client.c

Summary

Maintainability
Test Coverage
// SPDX-License-Identifier: GPL-3.0-or-later

#include "spawn.h"

/* Process handle handed to create_spawn_server() for the spawn server child. */
static uv_process_t process;
/* Pipe (initialised with ipc=1) on which commands are written and results are read. */
static uv_pipe_t spawn_channel;
/* Event loop allocated, initialised and run by spawn_client(). */
static uv_loop_t *loop;
/* Async handle registered on the loop; its callback stops the loop. Not static, so visible to other files. */
uv_async_t spawn_async;

/* Reassembly buffer for a protocol message that may arrive split across several reads. */
static char prot_buffer[MAX_COMMAND_LENGTH];
/* Number of bytes of the current, still incomplete message held in prot_buffer. */
static unsigned prot_buffer_len = 0;

/*
 * Callback of the async handle: stops the loop the handle belongs to, which
 * makes the uv_run() call in spawn_client() return.
 */
static void async_cb(uv_async_t *handle)
{
    uv_loop_t *owner = handle->loop;

    uv_stop(owner);
}

/*
 * Completion callback for the uv_write() issued by spawn_process_cmd().
 *
 * req->data points to a two-slot array: slot 0 is the write context (which
 * embeds req itself) and slot 1 is the uv_buf_t array. All three allocations
 * are released here; the write status is only reported in debug builds.
 */
static void after_pipe_write(uv_write_t* req, int status)
{
    (void)status;
#ifdef SPAWN_DEBUG
    netdata_log_info("CLIENT %s called status=%d", __func__, status);
#endif
    /* Fetch everything out of req before freeing, since req lives inside slot 0. */
    void **owned = req->data;
    void *write_context = owned[0];
    void *buffers = owned[1];

    freez(write_context);
    freez(buffers);
    freez(owned);
}

/*
 * Consume bytes read from the spawn pipe and act on every complete protocol
 * message found in them.
 *
 * Partial messages are accumulated in the file-level prot_buffer across calls;
 * prot_buffer_len is reset to 0 each time a message has been fully handled.
 *
 * source_len: number of bytes available at source.
 * source:     bytes just read from the pipe.
 *
 * Aborts via fatal_assert() on a NULL handle or an unknown opcode.
 */
static void client_parse_spawn_protocol(unsigned source_len, char *source)
{
    unsigned required_len;
    struct spawn_prot_header *header;
    struct spawn_prot_spawn_result *spawn_result;
    struct spawn_prot_cmd_exit_status *exit_status;
    struct spawn_cmd_info *cmdinfo;

    while (source_len) {
        /* Stage 1: make sure a complete header is buffered. */
        required_len = sizeof(*header);
        /* NOTE(review): copy_to_prot_buffer() is defined elsewhere; presumably it moves up to
         * the requested number of bytes from source into prot_buffer and advances
         * source/source_len/prot_buffer_len accordingly - confirm against its definition. */
        if (prot_buffer_len < required_len)
            copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
        if (prot_buffer_len < required_len)
            return; /* Source buffer ran out */

        header = (struct spawn_prot_header *)prot_buffer;
        /* The handle is the cmdinfo pointer that spawn_process_cmd() placed in the request header. */
        cmdinfo = (struct spawn_cmd_info *)header->handle;
        fatal_assert(NULL != cmdinfo);

        /* Stage 2: the opcode decides which payload follows the header. */
        switch(header->opcode) {
        case SPAWN_PROT_SPAWN_RESULT:
            required_len += sizeof(*spawn_result);
            if (prot_buffer_len < required_len)
                copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
            if (prot_buffer_len < required_len)
                return; /* Source buffer ran out */

            /* The payload sits immediately after the header in prot_buffer. */
            spawn_result = (struct spawn_prot_spawn_result *)(header + 1);
            uv_mutex_lock(&cmdinfo->mutex);
            cmdinfo->pid = spawn_result->exec_pid;
            if (0 == cmdinfo->pid) { /* Failed to spawn */
#ifdef SPAWN_DEBUG
                netdata_log_info("CLIENT %s SPAWN_PROT_SPAWN_RESULT failed to spawn.", __func__);
#endif
                /* No exit status will follow, so the command is marked done and the waiter is signalled now. */
                cmdinfo->flags |= SPAWN_CMD_FAILED_TO_SPAWN | SPAWN_CMD_DONE;
                uv_cond_signal(&cmdinfo->cond);
            } else {
                cmdinfo->exec_run_timestamp = spawn_result->exec_run_timestamp;
                cmdinfo->flags |= SPAWN_CMD_IN_PROGRESS;
#ifdef SPAWN_DEBUG
                netdata_log_info("CLIENT %s SPAWN_PROT_SPAWN_RESULT in progress.", __func__);
#endif
            }
            uv_mutex_unlock(&cmdinfo->mutex);
            /* Message fully handled: start the next one from an empty buffer. */
            prot_buffer_len = 0;
            break;
        case SPAWN_PROT_CMD_EXIT_STATUS:
            required_len += sizeof(*exit_status);
            if (prot_buffer_len < required_len)
                copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
            if (prot_buffer_len < required_len)
                return; /* Source buffer ran out */

            /* The payload sits immediately after the header in prot_buffer. */
            exit_status = (struct spawn_prot_cmd_exit_status *)(header + 1);
            uv_mutex_lock(&cmdinfo->mutex);
            cmdinfo->exit_status = exit_status->exec_exit_status;
#ifdef SPAWN_DEBUG
            netdata_log_info("CLIENT %s SPAWN_PROT_CMD_EXIT_STATUS %d.", __func__, exit_status->exec_exit_status);
#endif
            /* Record completion and signal the condition variable while still holding the mutex. */
            cmdinfo->flags |= SPAWN_CMD_DONE;
            uv_cond_signal(&cmdinfo->cond);
            uv_mutex_unlock(&cmdinfo->mutex);
            /* Message fully handled: start the next one from an empty buffer. */
            prot_buffer_len = 0;
            break;
        default:
            /* Unknown opcode: treated as a fatal protocol error. */
            fatal_assert(0);
            break;
        }

    }
}

/*
 * Read callback for the spawn pipe.
 *
 *  nread > 0  : the received bytes are fed to the protocol parser.
 *  nread == 0 : nothing was read; only logged.
 *  nread < 0  : EOF or error; reading is stopped and the pipe is closed.
 *
 * In every case the buffer handed out by on_read_alloc() is released when it
 * has a non-zero length.
 */
static void on_pipe_read(uv_stream_t* pipe, ssize_t nread, const uv_buf_t* buf)
{
    const int stream_failed = (nread < 0); /* EOF or error */

    if (0 == nread) {
        netdata_log_info("%s: Zero bytes read from spawn pipe.", __func__);
    } else if (nread > 0) {
#ifdef SPAWN_DEBUG
        netdata_log_info("CLIENT %s read %u", __func__, (unsigned)nread);
#endif
        client_parse_spawn_protocol(nread, buf->base);
    } else {
        /* UV_EOF is a negative libuv status code, so it is handled on this branch. */
        if (UV_EOF == nread) {
            netdata_log_info("EOF found in spawn pipe.");
        } else {
            netdata_log_error("%s: %s", __func__, uv_strerror(nread));
        }
        (void)uv_read_stop((uv_stream_t *)pipe);
    }

    /* The read buffer is no longer needed, whatever the outcome was. */
    if (buf && buf->len) {
        freez(buf->base);
    }

    if (stream_failed) {
        uv_close((uv_handle_t *)pipe, NULL);
    }
}

/*
 * Allocation callback for uv_read_start(): provides a freshly allocated buffer
 * of exactly the size libuv suggests. The buffer is freed in on_pipe_read().
 */
static void on_read_alloc(uv_handle_t* handle,
                          size_t suggested_size,
                          uv_buf_t* buf)
{
    (void)handle;

    void *storage = mallocz(suggested_size);

    buf->len = suggested_size;
    buf->base = storage;
}

/*
 * Send one queued command to the spawn server as a SPAWN_PROT_EXEC_CMD message.
 *
 * The message is written as three buffers: protocol header, payload (command
 * length) and the command string itself. The cmdinfo pointer travels in the
 * header as the handle, and client_parse_spawn_protocol() reads it back from
 * the headers of incoming messages.
 *
 * The write context, the buffer array and the two-slot bookkeeping array are
 * freed by after_pipe_write(). A failing uv_write() is fatal.
 */
static void spawn_process_cmd(struct spawn_cmd_info *cmdinfo)
{
    /* Allocations handed over to after_pipe_write() through write_req.data. */
    void **owned = callocz(2, sizeof(void *));
    uv_buf_t *iov = callocz(3, sizeof(uv_buf_t));
    struct write_context *ctx = callocz(1, sizeof(*ctx));

    owned[0] = ctx;
    owned[1] = iov;
    ctx->write_req.data = owned;

    /* Flag the command as picked up by the client thread. */
    uv_mutex_lock(&cmdinfo->mutex);
    cmdinfo->flags |= SPAWN_CMD_PROCESSED;
    uv_mutex_unlock(&cmdinfo->mutex);

    /* Fill in the message header and payload. */
    ctx->header.opcode = SPAWN_PROT_EXEC_CMD;
    ctx->header.handle = cmdinfo;
    ctx->payload.command_length = strlen(cmdinfo->command_to_run);

    /* Header, payload and command text go out in a single gathered write. */
    iov[0] = uv_buf_init((char *)&ctx->header, sizeof(ctx->header));
    iov[1] = uv_buf_init((char *)&ctx->payload, sizeof(ctx->payload));
    iov[2] = uv_buf_init((char *)cmdinfo->command_to_run, ctx->payload.command_length);

#ifdef SPAWN_DEBUG
    netdata_log_info("CLIENT %s SPAWN_PROT_EXEC_CMD %u", __func__, (unsigned)cmdinfo->serial);
#endif
    int rc = uv_write(&ctx->write_req, (uv_stream_t *)&spawn_channel, iov, 3, after_pipe_write);
    fatal_assert(rc == 0);
}

/*
 * Entry point of the spawn client thread.
 *
 * Sets up the event loop, the async wake-up handle and the IPC pipe, starts
 * the spawn server process, and then alternates between running the loop and
 * sending every unprocessed command to the server until
 * spawn_thread_shutdown becomes non-zero.
 *
 * arg: pointer to a struct completion that is marked complete once
 *      initialisation has either succeeded or failed. The outcome is stored
 *      in spawn_thread_error (0 on success, the failing call's return value
 *      otherwise).
 */
void spawn_client(void *arg)
{
    uv_thread_set_name_np("DAEMON_SPAWN");

    int ret;
    struct completion *completion = (struct completion *)arg;

    loop = mallocz(sizeof(uv_loop_t));
    ret = uv_loop_init(loop);
    if (ret) {
        netdata_log_error("uv_loop_init(): %s", uv_strerror(ret));
        spawn_thread_error = ret;
        goto error_after_loop_init;
    }
    loop->data = NULL;

    spawn_async.data = NULL;
    ret = uv_async_init(loop, &spawn_async, async_cb);
    if (ret) {
        netdata_log_error("uv_async_init(): %s", uv_strerror(ret));
        spawn_thread_error = ret;
        goto error_after_async_init;
    }

    /* Third argument 1 requests an IPC-capable pipe. */
    ret = uv_pipe_init(loop, &spawn_channel, 1);
    if (ret) {
        netdata_log_error("uv_pipe_init(): %s", uv_strerror(ret));
        spawn_thread_error = ret;
        goto error_after_pipe_init;
    }
    fatal_assert(spawn_channel.ipc);

    ret = create_spawn_server(loop, &spawn_channel, &process);
    if (ret) {
        netdata_log_error("Failed to fork spawn server process.");
        spawn_thread_error = ret;
        goto error_after_spawn_server;
    }

    spawn_thread_error = 0;
    spawn_thread_shutdown = 0;
    /* wake up initialization thread */
    completion_mark_complete(completion);

    prot_buffer_len = 0;
    ret = uv_read_start((uv_stream_t *)&spawn_channel, on_read_alloc, on_pipe_read);
    fatal_assert(ret == 0);

    while (spawn_thread_shutdown == 0) {
        struct spawn_cmd_info *cmdinfo;

        /* Returns when async_cb() stops the loop. */
        uv_run(loop, UV_RUN_DEFAULT);
        /* Drain the queue of commands that have not been sent to the server yet. */
        while (NULL != (cmdinfo = spawn_get_unprocessed_cmd())) {
            spawn_process_cmd(cmdinfo);
        }
    }
    /* cleanup operations of the event loop */
    netdata_log_info("Shutting down spawn client event loop.");
    /*
     * on_pipe_read() closes spawn_channel itself after EOF or a read error.
     * libuv does not allow uv_close() on a handle that is already closing,
     * so the pipe is closed here only if that has not happened yet.
     */
    if (!uv_is_closing((uv_handle_t *)&spawn_channel))
        uv_close((uv_handle_t *)&spawn_channel, NULL);
    uv_close((uv_handle_t *)&spawn_async, NULL);
    uv_run(loop, UV_RUN_DEFAULT); /* flush all libuv handles */

    netdata_log_info("Shutting down spawn client loop complete.");
    fatal_assert(0 == uv_loop_close(loop));
    /* NOTE(review): unlike the error path below, the loop allocation is not freed here.
     * Confirm that no other thread can still reach it through spawn_async before adding a freez(). */

    return;

    /* Error unwinding: each label undoes what was set up before the failing step. */
error_after_spawn_server:
    uv_close((uv_handle_t *)&spawn_channel, NULL);
error_after_pipe_init:
    uv_close((uv_handle_t *)&spawn_async, NULL);
error_after_async_init:
    uv_run(loop, UV_RUN_DEFAULT); /* flush all libuv handles */
    fatal_assert(0 == uv_loop_close(loop));
error_after_loop_init:
    freez(loop);

    /* wake up initialization thread */
    completion_mark_complete(completion);
}