netdata/netdata

View on GitHub
src/web/server/h2o/streaming.c

Summary

Maintainability
Test Coverage
// SPDX-License-Identifier: GPL-3.0-or-later

#include "daemon/common.h"
#include "streaming.h"
#include "connlist.h"
#include "h2o_utils.h"
#include "streaming/common.h"

// Incremented (atomically) by every h2o_stream_write() call and reset to 0
// by h2o_stream_check_pending_write_reqs(), which uses it to decide whether
// the connection list has to be walked at all.
static int pending_write_reqs = 0;

// Size passed to rbuf_create() for both the rx and the tx buffer of a connection.
#define H2O2STREAM_BUF_SIZE (1024 * 1024)

// h2o_stream_conn_t related functions
/*
 * Bring a connection object into its initial state.
 *
 * The whole structure is zeroed, the rx and tx ring buffers are created
 * with H2O2STREAM_BUF_SIZE each, and the locks and the condition variable
 * that guard those buffers are initialised with default attributes.
 */
void h2o_stream_conn_t_init(h2o_stream_conn_t *conn)
{
    memset(conn, 0, sizeof *conn);

    // rbuf_create uses mallocz internally, so there is no NULL result to handle
    conn->rx = rbuf_create(H2O2STREAM_BUF_SIZE);
    conn->tx = rbuf_create(H2O2STREAM_BUF_SIZE);

    // receive side: lock plus the condition readers wait on
    pthread_mutex_init(&conn->rx_buf_lock, NULL);
    pthread_cond_init(&conn->rx_buf_cond, NULL);

    // transmit side
    pthread_mutex_init(&conn->tx_buf_lock, NULL);
}

/*
 * Release everything owned by a connection object: the parsed url and
 * user agent strings, both ring buffers, and the synchronisation
 * primitives. The structure itself is not freed here.
 */
void h2o_stream_conn_t_destroy(h2o_stream_conn_t *conn)
{
    // strings filled in while parsing the stream hello
    freez(conn->user_agent);
    freez(conn->url);

    // ring buffers
    rbuf_free(conn->tx);
    rbuf_free(conn->rx);

    // locks and condition variable
    pthread_cond_destroy(&conn->rx_buf_cond);
    pthread_mutex_destroy(&conn->tx_buf_lock);
    pthread_mutex_destroy(&conn->rx_buf_lock);
}

// streaming upgrade related functions
/*
 * Decide whether an HTTP request is a streaming upgrade handshake.
 *
 * The request qualifies when its method is GET, its normalized path is
 * exactly NETDATA_STREAM_URL and it carries an upgrade header equal to
 * NETDATA_STREAM_PROTO_NAME (compared through h2o_lcstris).
 *
 * Returns 0 when the request IS a streaming handshake and 1 otherwise.
 */
int is_streaming_handshake(h2o_req_t *req)
{
    // checks are evaluated left to right and stop at the first mismatch
    const int handshake =
        /* method */
        h2o_memis(req->input.method.base, req->input.method.len, H2O_STRLIT("GET")) &&
        /* path */
        h2o_memis(req->path_normalized.base, req->path_normalized.len, H2O_STRLIT(NETDATA_STREAM_URL)) &&
        /* upgrade header */
        req->upgrade.base != NULL &&
        h2o_lcstris(req->upgrade.base, req->upgrade.len, H2O_STRLIT(NETDATA_STREAM_PROTO_NAME));

    // TODO consider adding some key in form of random number
    // to prevent caching on route especially if TLS is not used
    // e.g. client sends random number
    // server replies with it xored

    return handshake ? 0 : 1;
}

// Forward declarations: stream_on_complete() below calls both of these
// before their definitions appear in this file.
static void stream_on_close(h2o_stream_conn_t *conn);
void stream_process(h2o_stream_conn_t *conn, int initial);

/*
 * Callback receiving the socket of a connection that is being switched to
 * streaming.
 *
 * user_data - the h2o_stream_conn_t of this connection
 * sock      - the socket to use from now on, or NULL on error
 * reqsize   - number of bytes at the start of sock->input to discard
 *
 * On success the socket and the connection are linked to each other, the
 * connection is added to conn_list and processing starts with initial = 1.
 * On error the connection is closed and destroyed.
 */
void stream_on_complete(void *user_data, h2o_socket_t *sock, size_t reqsize)
{
    h2o_stream_conn_t *conn = user_data;

    if (sock != NULL) {
        // cross-link socket and connection
        sock->data = conn;
        conn->sock = sock;

        conn_list_insert(&conn_list, conn);

        // drop the first reqsize bytes of the input buffer before streaming starts
        h2o_buffer_consume(&sock->input, reqsize);
        stream_process(conn, 1);
        return;
    }

    /* close the connection on error */
    stream_on_close(conn);
}

// handling of active streams
// Tear down a streaming connection: close its socket (if any), remove it
// from conn_list, flag shutdown for readers, then destroy and free it.
// conn must not be used by the caller after this returns.
static void stream_on_close(h2o_stream_conn_t *conn)
{
    // conn->sock is still NULL when stream_on_complete() reports an error
    if (conn->sock != NULL)
        h2o_socket_close(conn->sock);

    conn_list_remove_conn(&conn_list, conn);

    // set the shutdown flag under the rx lock and wake every thread
    // blocked in h2o_stream_read() on rx_buf_cond
    pthread_mutex_lock(&conn->rx_buf_lock);
    conn->shutdown = 1;
    pthread_cond_broadcast(&conn->rx_buf_cond);
    pthread_mutex_unlock(&conn->rx_buf_lock);

    // NOTE(review): a reader woken by the broadcast above still has to
    // re-acquire rx_buf_lock and read conn->shutdown, yet the lock, the
    // condition variable and conn itself are destroyed/freed right here.
    // This looks like a possible use-after-free - confirm how reader
    // threads are prevented from touching conn after this point.
    h2o_stream_conn_t_destroy(conn);
    freez(conn);
}

/*
 * Completion callback for the h2o_socket_write() issued by stream_process().
 *
 * On success the bytes that were just written are released from the tx
 * ring buffer, the in-flight descriptor (conn->tx_buf) is cleared and
 * processing continues. On error the connection is closed and the error
 * is reported.
 */
static void on_write_complete(h2o_socket_t *sock, const char *err)
{
    h2o_stream_conn_t *conn = sock->data;

    if (err == NULL) {
        pthread_mutex_lock(&conn->tx_buf_lock);

        // the chunk described by tx_buf has been sent, drop it from the ring buffer
        rbuf_bump_tail(conn->tx, conn->tx_buf.len);

        // no write in flight anymore
        conn->tx_buf.len = 0;
        conn->tx_buf.base = NULL;

        pthread_mutex_unlock(&conn->tx_buf_lock);

        stream_process(conn, 0);
        return;
    }

    stream_on_close(conn);
    error_report("Streaming connection error \"%s\"", err);
}

/*
 * Read callback registered through h2o_socket_read_start() in
 * stream_process(). Continues processing when the read succeeded;
 * otherwise closes the connection and reports the error.
 */
static void stream_on_recv(h2o_socket_t *sock, const char *err)
{
    h2o_stream_conn_t *conn = sock->data;

    if (err == NULL) {
        stream_process(conn, 0);
        return;
    }

    stream_on_close(conn);
    error_report("Streaming connection error \"%s\"", err);
}

// Return values of process_STREAM_X_HTTP_1_1()
#define PARSE_DONE 1
#define PARSE_ERROR -1
// not enough data buffered yet to make further progress; call again later
#define GIMME_MORE_OF_DEM_SWEET_BYTEZ 0

// Literal tokens the stream hello parser looks for
#define STREAM_METHOD "STREAM "
#define USER_AGENT "User-Agent: "

// Makes the enclosing function return GIMME_MORE_OF_DEM_SWEET_BYTEZ
// when fewer than `bytes` bytes are available in `buf`.
#define NEED_MIN_BYTES(buf, bytes) do {      \
    if(rbuf_bytes_available(buf) < bytes)    \
        return GIMME_MORE_OF_DEM_SWEET_BYTEZ;\
} while(0)

// Limit on the buffered hello while its URL part is still incomplete.
// TODO: check whether the streaming code already defines an equivalent limit
#define MAX_LEN_STREAM_HELLO (1024*2)

/*
 * Incrementally parse the textual stream hello
 * ("STREAM [URL]" followed by HTTP_1_1 and the HTTP headers) out of buf.
 *
 * The function is resumable: *parser_state records how far parsing got,
 * so it can be called again with the same arguments once more data has
 * been appended to buf.
 *
 * parser_state - in/out, current position in the state machine
 * buf          - ring buffer holding the received bytes; consumed as
 *                parsing advances
 * url          - out, set to a mallocz()-allocated, NUL-terminated copy of
 *                the URL once it has been parsed (caller frees)
 * user_agent   - out, set to a mallocz()-allocated, NUL-terminated copy of
 *                the User-Agent header value once parsed (caller frees)
 *
 * Returns PARSE_DONE when the whole hello has been consumed,
 * GIMME_MORE_OF_DEM_SWEET_BYTEZ when more input is needed, and
 * PARSE_ERROR on malformed or oversized input.
 */
static int process_STREAM_X_HTTP_1_1(http_stream_parse_state_t *parser_state, rbuf_t buf, char **url, char **user_agent)
{
    int idx = 0;
    switch(*parser_state) {
        case HTTP_STREAM:
            NEED_MIN_BYTES(buf, strlen(STREAM_METHOD));
            if (rbuf_memcmp_n(buf, H2O_STRLIT(STREAM_METHOD))) {
                error_report("Expected \"%s\"", STREAM_METHOD);
                return PARSE_ERROR;
            }
            rbuf_bump_tail(buf, strlen(STREAM_METHOD));
            *parser_state = HTTP_URL;
            /* FALLTHROUGH */
        case HTTP_URL:
            // the URL ends at the next space
            if (!rbuf_find_bytes(buf, " ", 1, &idx)) {
                if (rbuf_bytes_available(buf) >= MAX_LEN_STREAM_HELLO) {
                    error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO);
                    return PARSE_ERROR;
                }
                // separator not received yet: idx holds no valid offset, so
                // we must wait for more data instead of extracting the URL
                return GIMME_MORE_OF_DEM_SWEET_BYTEZ;
            }
            *url = mallocz(idx + 1);
            rbuf_pop(buf, *url, idx);
            (*url)[idx] = 0;

            *parser_state = HTTP_PROTO;
            /* FALLTHROUGH */
        case HTTP_PROTO:
            NEED_MIN_BYTES(buf, strlen(HTTP_1_1));
            if (rbuf_memcmp_n(buf, H2O_STRLIT(HTTP_1_1))) {
                error_report("Expected \"%s\"", HTTP_1_1);
                return PARSE_ERROR;
            }
            rbuf_bump_tail(buf, strlen(HTTP_1_1));
            *parser_state = HTTP_USER_AGENT_KEY;
            /* FALLTHROUGH */
        case HTTP_USER_AGENT_KEY:
            // and OF COURSE EVERYTHING is passed in URL except
            // for user agent which we need and is passed as HTTP header
            // not worth writing a parser for this so we manually extract
            // just the single header we need and skip everything else
            if (!rbuf_find_bytes(buf, USER_AGENT, strlen(USER_AGENT), &idx)) {
                // give up once the buffer is 90% full without a match
                if (rbuf_bytes_available(buf) >= (size_t)(rbuf_get_capacity(buf) * 0.9)) {
                    error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO);
                    return PARSE_ERROR;
                }
                return GIMME_MORE_OF_DEM_SWEET_BYTEZ;
            }
            // skip everything up to and including the header name
            rbuf_bump_tail(buf, idx + strlen(USER_AGENT));
            *parser_state = HTTP_USER_AGENT_VALUE;
            /* FALLTHROUGH */
        case HTTP_USER_AGENT_VALUE:
            // the header value runs until the end of the line
            if (!rbuf_find_bytes(buf, "\r\n", 2, &idx)) {
                if (rbuf_bytes_available(buf) >= (size_t)(rbuf_get_capacity(buf) * 0.9)) {
                    error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO);
                    return PARSE_ERROR;
                }
                return GIMME_MORE_OF_DEM_SWEET_BYTEZ;
            }

            *user_agent = mallocz(idx + 1);
            rbuf_pop(buf, *user_agent, idx);
            (*user_agent)[idx] = 0;

            *parser_state = HTTP_HDR;
            /* FALLTHROUGH */
        case HTTP_HDR:
            // discard the remaining headers up to and including HTTP_HDR_END
            if (!rbuf_find_bytes(buf, HTTP_HDR_END, strlen(HTTP_HDR_END), &idx)) {
                if (rbuf_bytes_available(buf) >= (size_t)(rbuf_get_capacity(buf) * 0.9)) {
                    error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO);
                    return PARSE_ERROR;
                }
                return GIMME_MORE_OF_DEM_SWEET_BYTEZ;
            }
            rbuf_bump_tail(buf, idx + strlen(HTTP_HDR_END));

            *parser_state = HTTP_DONE;
            return PARSE_DONE;
        case HTTP_DONE:
            error_report("Parsing is done. No need to call again.");
            return PARSE_DONE;
        default:
            error_report("Unknown parser state %d", (int)*parser_state);
            return PARSE_ERROR;
    }
}

// Upper bound applied by stream_process() to the length of each chunk it
// hands to h2o_socket_write().
#define SINGLE_WRITE_MAX (1024)

/*
 * Drive one streaming connection: flush pending tx data to the socket,
 * move received bytes from the socket into the rx ring buffer and advance
 * the connection state machine (hello parsing -> receiver spawn -> active).
 *
 * conn    - the connection to process
 * initial - non-zero on the first call for a connection; reading from the
 *           socket is started with stream_on_recv as callback
 *
 * The connection may be closed and freed by this function (hello parse
 * error, receiver thread could not be spawned), so callers must not use
 * conn after it returns.
 */
void stream_process(h2o_stream_conn_t *conn, int initial)
{
    int rc;
    struct web_client w;

    // start a socket write if there is tx data and none is in flight
    // (tx_buf.base is non-NULL while a write is pending, see on_write_complete)
    pthread_mutex_lock(&conn->tx_buf_lock);
    if (h2o_socket_is_writing(conn->sock) || rbuf_bytes_available(conn->tx)) {
        if (rbuf_bytes_available(conn->tx) && !conn->tx_buf.base) {
            conn->tx_buf.base = rbuf_get_linear_read_range(conn->tx, &conn->tx_buf.len);
            if (conn->tx_buf.base) {
                conn->tx_buf.len = MIN(conn->tx_buf.len, SINGLE_WRITE_MAX);
                h2o_socket_write(conn->sock, &conn->tx_buf, 1, on_write_complete);
            }
        }
    }
    pthread_mutex_unlock(&conn->tx_buf_lock);

    if (initial)
        h2o_socket_read_start(conn->sock, stream_on_recv);

    // copy as much of the socket input as fits into the rx ring buffer
    if (conn->sock->input->size) {
        size_t insert_max;
        pthread_mutex_lock(&conn->rx_buf_lock);
        char *insert_loc = rbuf_get_linear_insert_range(conn->rx, &insert_max);
        if (insert_loc == NULL) {
            // no room in rx right now: wake readers so they can drain it
            pthread_cond_broadcast(&conn->rx_buf_cond);
            pthread_mutex_unlock(&conn->rx_buf_lock);
            return;
        }
        insert_max = MIN(insert_max, conn->sock->input->size);
        memcpy(insert_loc, conn->sock->input->bytes, insert_max);
        rbuf_bump_head(conn->rx, insert_max);

        h2o_buffer_consume(&conn->sock->input, insert_max);

        pthread_cond_broadcast(&conn->rx_buf_cond);
        pthread_mutex_unlock(&conn->rx_buf_lock);
    }

    switch (conn->state) {
        case STREAM_X_HTTP_1_1:
            // no conn->rx lock here as at this point we are still single threaded
            // until we call rrdpush_receiver_thread_spawn() later down
            rc = process_STREAM_X_HTTP_1_1(&conn->parse_state, conn->rx, &conn->url, &conn->user_agent);
            if (rc == PARSE_ERROR) {
                error_report("error parsing the STREAM hello");
                // a malformed hello can never complete; close the connection
                // instead of keeping it (and its buffers) around and
                // re-reporting the same error on every read
                conn->state = STREAM_CLOSE;
                stream_on_close(conn);
                return;
            }
            if (rc != PARSE_DONE)
                break;
            conn->state = STREAM_X_HTTP_1_1_DONE;
            /* FALLTHROUGH */
        case STREAM_X_HTTP_1_1_DONE:
            memset(&w, 0, sizeof(w));
            w.response.data = buffer_create(1024, NULL);

            // get client ip from the conn->sock
            {
                // sockaddr_storage, because the address of an IPv6 peer does
                // not fit into a plain struct sockaddr
                struct sockaddr_storage client;
                char peername[NI_MAXHOST];

                socklen_t len = h2o_socket_getpeername(conn->sock, (struct sockaddr *)&client);
                if (len != 0) {
                    size_t peername_len = h2o_socket_getnumerichost((struct sockaddr *)&client, len, peername);
                    // anything not below NI_MAXHOST is not a valid string
                    // length here (this also rejects the failure return)
                    if (peername_len < sizeof(peername)) {
                        size_t cpy_len = MIN(peername_len, sizeof(w.client_ip) - 1);
                        memcpy(w.client_ip, peername, cpy_len);
                        w.client_ip[cpy_len] = 0;
                    }
                }
                // on any failure w.client_ip stays the empty string set by memset above
            }
            w.user_agent = conn->user_agent;

            rc = rrdpush_receiver_thread_spawn(&w, conn->url, conn);
            if (rc != HTTP_RESP_OK) {
                error_report("HTTPD Failed to spawn the receiver thread %d", rc);
                conn->state = STREAM_CLOSE;
                stream_on_close(conn);
                // conn is freed now; only the local w may be touched below
            } else {
                conn->state = STREAM_ACTIVE;
            }
            buffer_free(w.response.data);
            /* FALLTHROUGH */
        case STREAM_ACTIVE:
            break;
        default:
            error_report("Unknown conn->state");
    }
}

// read and write functions to be used by streaming parser
/*
 * Queue data for transmission on a streaming connection.
 *
 * ctx      - the h2o_stream_conn_t of the connection
 * data     - bytes to send
 * data_len - number of bytes at data
 *
 * Copies as much of data as currently fits into the tx ring buffer and
 * bumps pending_write_reqs so that the next
 * h2o_stream_check_pending_write_reqs() pass picks the connection up.
 *
 * Returns the number of bytes accepted, which may be less than data_len.
 */
int h2o_stream_write(void *ctx, const char *data, size_t data_len)
{
    h2o_stream_conn_t *conn = (h2o_stream_conn_t *)ctx;

    pthread_mutex_lock(&conn->tx_buf_lock);
    size_t room = rbuf_bytes_free(conn->tx);
    size_t accepted = MIN(room, data_len);
    rbuf_push(conn->tx, data, accepted);
    pthread_mutex_unlock(&conn->tx_buf_lock);

    // counted even when nothing could be queued
    __atomic_add_fetch(&pending_write_reqs, 1, __ATOMIC_SEQ_CST);

    return accepted;
}

/*
 * Read received data of a streaming connection.
 *
 * ctx        - the h2o_stream_conn_t of the connection
 * buf        - destination buffer
 * read_bytes - maximum number of bytes to copy into buf
 *
 * When the rx ring buffer is empty the call waits once on rx_buf_cond.
 *
 * Returns the number of bytes copied, 0 when the wait ended without any
 * data being available, or (size_t)-1 when the connection is shutting down.
 */
size_t h2o_stream_read(void *ctx, char *buf, size_t read_bytes)
{
    h2o_stream_conn_t *conn = (h2o_stream_conn_t *)ctx;
    size_t result;
    int popped;

    pthread_mutex_lock(&conn->rx_buf_lock);

    size_t pending = rbuf_bytes_available(conn->rx);
    if (pending == 0) {
        if (conn->shutdown) {
            result = (size_t)-1;
            goto unlock;
        }

        // single wait; the shutdown flag and the buffer are re-checked afterwards
        pthread_cond_wait(&conn->rx_buf_cond, &conn->rx_buf_lock);

        if (conn->shutdown) {
            result = (size_t)-1;
            goto unlock;
        }

        pending = rbuf_bytes_available(conn->rx);
        if (pending == 0) {
            result = 0;
            goto unlock;
        }
    }

    popped = rbuf_pop(conn->rx, buf, MIN(pending, read_bytes));
    result = popped;

unlock:
    pthread_mutex_unlock(&conn->rx_buf_lock);
    return result;
}

// periodic check for pending write requests
/*
 * Per-connection callback used by h2o_stream_check_pending_write_reqs():
 * runs stream_process() for the connection when its tx ring buffer holds
 * data. The tx lock is only held while looking at the buffer, never
 * across the stream_process() call.
 */
void check_tx_buf(h2o_stream_conn_t *conn)
{
    pthread_mutex_lock(&conn->tx_buf_lock);
    const int has_pending = rbuf_bytes_available(conn->tx) != 0;
    pthread_mutex_unlock(&conn->tx_buf_lock);

    if (has_pending)
        stream_process(conn, 0);
}

/*
 * Atomically fetch and reset the pending_write_reqs counter; when it was
 * positive, visit every connection in conn_list with check_tx_buf().
 */
void h2o_stream_check_pending_write_reqs(void)
{
    if (__atomic_exchange_n(&pending_write_reqs, 0, __ATOMIC_SEQ_CST) <= 0)
        return;

    conn_list_iter_all(&conn_list, check_tx_buf);
}