netdata/netdata

View on GitHub
src/aclk/mqtt_websockets/ws_client.c

Summary

Maintainability
Test Coverage
// Copyright (C) 2020 Timotej Šiškovič
// SPDX-License-Identifier: GPL-3.0-only
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free Software Foundation, version 3.
//
// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License along with this program.
// If not, see <https://www.gnu.org/licenses/>.

#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <ctype.h>

#include <openssl/evp.h>

#include "ws_client.h"
#include "common_internal.h"

#ifdef MQTT_WEBSOCKETS_DEBUG
#include "../c-rbuf/src/ringbuffer_internal.h"
#endif

#define UNIT_LOG_PREFIX "ws_client: "
#define FATAL(fmt, ...) mws_fatal(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
#define ERROR(fmt, ...) mws_error(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
#define WARN(fmt, ...)  mws_warn (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
#define INFO(fmt, ...)  mws_info (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
#define DEBUG(fmt, ...) mws_debug(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)

const char *websocket_upgrage_hdr = "GET /mqtt HTTP/1.1\x0D\x0A"
                              "Host: %s\x0D\x0A"
                              "Upgrade: websocket\x0D\x0A"
                              "Connection: Upgrade\x0D\x0A"
                              "Sec-WebSocket-Key: %s\x0D\x0A"
                              "Origin: http://example.com\x0D\x0A"
                              "Sec-WebSocket-Protocol: mqtt\x0D\x0A"
                              "Sec-WebSocket-Version: 13\x0D\x0A\x0D\x0A";

const char *mqtt_protoid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

#define DEFAULT_RINGBUFFER_SIZE (1024*128)
#define ENTROPY_SOURCE "/dev/urandom"
ws_client *ws_client_new(size_t buf_size, char **host, mqtt_wss_log_ctx_t log)
{
    ws_client *client;

    if(!host)
        return NULL;

    client = mw_calloc(1, sizeof(ws_client));
    if (!client)
        return NULL;

    client->host = host;
    client->log = log;

    client->buf_read = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE);
    if (!client->buf_read)
        goto cleanup;

    client->buf_write = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE);
    if (!client->buf_write)
        goto cleanup_1;

    client->buf_to_mqtt = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE);
    if (!client->buf_to_mqtt)
        goto cleanup_2;

    client->entropy_fd = open(ENTROPY_SOURCE, O_RDONLY | O_CLOEXEC);
    if (client->entropy_fd < 1) {
        ERROR("Error opening entropy source \"" ENTROPY_SOURCE "\". Reason: \"%s\"", strerror(errno));
        goto cleanup_3;
    }

    return client;

cleanup_3:
    rbuf_free(client->buf_to_mqtt);
cleanup_2:
    rbuf_free(client->buf_write);
cleanup_1:
    rbuf_free(client->buf_read);
cleanup:
    mw_free(client);
    return NULL;
}

void ws_client_free_headers(ws_client *client)
{
    struct http_header *ptr = client->hs.headers;
    struct http_header *tmp;

    while (ptr) {
        tmp = ptr;
        ptr = ptr->next;
        mw_free(tmp);
    }

    client->hs.headers = NULL;
    client->hs.headers_tail = NULL;
    client->hs.hdr_count = 0;
}

void ws_client_destroy(ws_client *client)
{
    ws_client_free_headers(client);
    mw_free(client->hs.nonce_reply);
    mw_free(client->hs.http_reply_msg);
    close(client->entropy_fd);
    rbuf_free(client->buf_read);
    rbuf_free(client->buf_write);
    rbuf_free(client->buf_to_mqtt);
    mw_free(client);
}

void ws_client_reset(ws_client *client)
{
    ws_client_free_headers(client);
    mw_free(client->hs.nonce_reply);
    client->hs.nonce_reply = NULL;
    mw_free(client->hs.http_reply_msg);
    client->hs.http_reply_msg = NULL;
    rbuf_flush(client->buf_read);
    rbuf_flush(client->buf_write);
    rbuf_flush(client->buf_to_mqtt);
    client->state = WS_RAW;
    client->hs.hdr_state = WS_HDR_HTTP;
    client->rx.parse_state = WS_FIRST_2BYTES;
}

#define MAX_HTTP_HDR_COUNT 128
int ws_client_add_http_header(ws_client *client, struct http_header *hdr)
{
    if (client->hs.hdr_count > MAX_HTTP_HDR_COUNT) {
        ERROR("Too many HTTP response header fields");
        return -1;
    }

    if (client->hs.headers)
        client->hs.headers_tail->next = hdr;
    else
        client->hs.headers = hdr;

    client->hs.headers_tail = hdr;
    client->hs.hdr_count++;

    return 0;
}

int ws_client_want_write(ws_client *client)
{
    return rbuf_bytes_available(client->buf_write);
}

#define RAND_SRC "/dev/urandom"
static int ws_client_get_nonce(ws_client *client, char *dest, unsigned int size)
{
    // we do not need crypto secure random here
    // it's just used for protocol negotiation
    int rd;
    int f = open(RAND_SRC, O_RDONLY | O_CLOEXEC);
    if (f < 0) {
        ERROR("Error opening \"%s\". Err: \"%s\"", RAND_SRC, strerror(errno));
        return -2;
    }

    if ((rd = read(f, dest, size)) > 0) {
        close(f);
        return rd;
    }
    close(f);
    return -1;
}

#define WEBSOCKET_NONCE_SIZE 16
#define TEMP_BUF_SIZE 4096
int ws_client_start_handshake(ws_client *client)
{
    char nonce[WEBSOCKET_NONCE_SIZE];
    char nonce_b64[256];
    char second[TEMP_BUF_SIZE];
    unsigned int md_len;
    unsigned char *digest;
    EVP_MD_CTX *md_ctx;
    const EVP_MD *md;

    if(!*client->host) {
        ERROR("Hostname has not been set. We should not be able to come here!");
        return 1;
    }

    ws_client_get_nonce(client, nonce, WEBSOCKET_NONCE_SIZE);
    EVP_EncodeBlock((unsigned char *)nonce_b64, (const unsigned char *)nonce, WEBSOCKET_NONCE_SIZE);
    snprintf(second, TEMP_BUF_SIZE, websocket_upgrage_hdr,
        *client->host,
        nonce_b64);
    if(rbuf_bytes_free(client->buf_write) < strlen(second)) {
        ERROR("Write buffer capacity too low.");
        return 1;
    }

    rbuf_push(client->buf_write, second, strlen(second));
    client->state = WS_HANDSHAKE;

    //Calculating expected Sec-WebSocket-Accept reply
    snprintf(second, TEMP_BUF_SIZE, "%s%s", nonce_b64, mqtt_protoid);

#if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110)
    md_ctx = EVP_MD_CTX_create();
#else
    md_ctx = EVP_MD_CTX_new();
#endif
    if (md_ctx == NULL) {
        ERROR("Cant create EVP_MD Context");
        return 1;
    }

    md = EVP_get_digestbyname("sha1");
    if (!md) {
        ERROR("Unknown message digest");
        return 1;
    }

    if ((digest = (unsigned char *)OPENSSL_malloc(EVP_MD_size(EVP_sha256()))) == NULL) {
        ERROR("Cant alloc digest");
        return 1;
    }

    EVP_DigestInit_ex(md_ctx, md, NULL);
    EVP_DigestUpdate(md_ctx, second, strlen(second));
    EVP_DigestFinal_ex(md_ctx, digest, &md_len);

    EVP_EncodeBlock((unsigned char *)nonce_b64, digest, md_len);

    mw_free(client->hs.nonce_reply);
    client->hs.nonce_reply = mw_strdup(nonce_b64);

    OPENSSL_free(digest);

#if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110)
    EVP_MD_CTX_destroy(md_ctx);
#else
    EVP_MD_CTX_free(md_ctx);
#endif

    return 0;
}

#define BUF_READ_MEMCMP_CONST(const, err)                                                                              \
    if (rbuf_memcmp_n(client->buf_read, const, strlen(const))) {                                                       \
        ERROR(err);                                                                                                    \
        rbuf_flush(client->buf_read);                                                                                  \
        return WS_CLIENT_PROTOCOL_ERROR;                                                                               \
    }

#define BUF_READ_CHECK_AT_LEAST(x)                                                                                     \
    if (rbuf_bytes_available(client->buf_read) < x)                                                                    \
        return WS_CLIENT_NEED_MORE_BYTES;

#define MAX_HTTP_LINE_LENGTH 1024*4
#define HTTP_SC_LENGTH 4 // "XXX " http status code as C string
#define WS_CLIENT_HTTP_HDR "HTTP/1.1 "
#define WS_CONN_ACCEPT "sec-websocket-accept"
#define HTTP_HDR_SEPARATOR ": "
#define WS_NONCE_STRLEN_B64 28
#define WS_HTTP_NEWLINE "\r\n"
#define HTTP_HEADER_NAME_MAX_LEN 256
#if HTTP_HEADER_NAME_MAX_LEN > MAX_HTTP_LINE_LENGTH
#error "Buffer too small"
#endif
#if WS_NONCE_STRLEN_B64 > MAX_HTTP_LINE_LENGTH
#error "Buffer too small"
#endif

#define HTTP_HDR_LINE_CHECK_LIMIT(x) if ((x) >= MAX_HTTP_LINE_LENGTH) \
{ \
    ERROR("HTTP line received is too long. Maximum is %d", MAX_HTTP_LINE_LENGTH); \
    return WS_CLIENT_PROTOCOL_ERROR; \
}

int ws_client_parse_handshake_resp(ws_client *client)
{
    char buf[HTTP_SC_LENGTH];
    int idx_crlf, idx_sep;
    char *ptr;
    size_t bytes;
    switch (client->hs.hdr_state) {
        case WS_HDR_HTTP:
            BUF_READ_CHECK_AT_LEAST(strlen(WS_CLIENT_HTTP_HDR))
            BUF_READ_MEMCMP_CONST(WS_CLIENT_HTTP_HDR, "Expected \"HTTP1.1\" header");
            rbuf_bump_tail(client->buf_read, strlen(WS_CLIENT_HTTP_HDR));
            client->hs.hdr_state = WS_HDR_RC;
            break;
        case WS_HDR_RC:
            BUF_READ_CHECK_AT_LEAST(HTTP_SC_LENGTH); // "XXX " http return code
            rbuf_pop(client->buf_read, buf, HTTP_SC_LENGTH);
            if (buf[HTTP_SC_LENGTH - 1] != 0x20) {
                ERROR("HTTP status code received is not terminated by space (0x20)");
                return WS_CLIENT_PROTOCOL_ERROR;
            }
            buf[HTTP_SC_LENGTH - 1] = 0;
            client->hs.http_code = atoi(buf);
            if (client->hs.http_code < 100 || client->hs.http_code >= 600) {
                ERROR("HTTP status code received not in valid range 100-600");
                return WS_CLIENT_PROTOCOL_ERROR;
            }
            client->hs.hdr_state = WS_HDR_ENDLINE;
            break;
        case WS_HDR_ENDLINE:
            ptr = rbuf_find_bytes(client->buf_read, WS_HTTP_NEWLINE, strlen(WS_HTTP_NEWLINE), &idx_crlf);
            if (!ptr) {
                bytes = rbuf_bytes_available(client->buf_read);
                HTTP_HDR_LINE_CHECK_LIMIT(bytes);
                return WS_CLIENT_NEED_MORE_BYTES;
            }
            HTTP_HDR_LINE_CHECK_LIMIT(idx_crlf);

            client->hs.http_reply_msg = mw_malloc(idx_crlf+1);
            rbuf_pop(client->buf_read, client->hs.http_reply_msg, idx_crlf);
            client->hs.http_reply_msg[idx_crlf] = 0;
            rbuf_bump_tail(client->buf_read, strlen(WS_HTTP_NEWLINE));
            client->hs.hdr_state = WS_HDR_PARSE_HEADERS;
            break;
        case WS_HDR_PARSE_HEADERS:
            ptr = rbuf_find_bytes(client->buf_read, WS_HTTP_NEWLINE, strlen(WS_HTTP_NEWLINE), &idx_crlf);
            if (!ptr) {
                bytes = rbuf_bytes_available(client->buf_read);
                HTTP_HDR_LINE_CHECK_LIMIT(bytes);
                return WS_CLIENT_NEED_MORE_BYTES;
            }
            HTTP_HDR_LINE_CHECK_LIMIT(idx_crlf);

            if (!idx_crlf) { // empty line, header end
                rbuf_bump_tail(client->buf_read, strlen(WS_HTTP_NEWLINE));
                client->hs.hdr_state = WS_HDR_PARSE_DONE;
                return 0;
            }

            ptr = rbuf_find_bytes(client->buf_read, HTTP_HDR_SEPARATOR, strlen(HTTP_HDR_SEPARATOR), &idx_sep);
            if (!ptr || idx_sep > idx_crlf) {
                ERROR("Expected HTTP hdr field key/value separator \": \" before endline in non empty HTTP header line");
                return WS_CLIENT_PROTOCOL_ERROR;
            }
            if (idx_crlf == idx_sep + (int)strlen(HTTP_HDR_SEPARATOR)) {
                ERROR("HTTP Header value cannot be empty");
                return WS_CLIENT_PROTOCOL_ERROR;
            }

            if (idx_sep > HTTP_HEADER_NAME_MAX_LEN) {
                ERROR("HTTP header too long (%d)", idx_sep);
                return WS_CLIENT_PROTOCOL_ERROR;
            }

            struct http_header *hdr = mw_calloc(1, sizeof(struct http_header) + idx_crlf); //idx_crlf includes ": " that will be used as 2 \0 bytes
            hdr->key = ((char*)hdr) + sizeof(struct http_header);
            hdr->value = hdr->key + idx_sep + 1;

            bytes = rbuf_pop(client->buf_read, hdr->key, idx_sep);
            rbuf_bump_tail(client->buf_read, strlen(HTTP_HDR_SEPARATOR));

            bytes = rbuf_pop(client->buf_read, hdr->value, idx_crlf - idx_sep - strlen(HTTP_HDR_SEPARATOR));
            rbuf_bump_tail(client->buf_read, strlen(WS_HTTP_NEWLINE));

            for (int i = 0; hdr->key[i]; i++)
                hdr->key[i] = tolower(hdr->key[i]);

//            DEBUG("HTTP header \"%s\" received. Value \"%s\"", hdr->key, hdr->value);

            if (ws_client_add_http_header(client, hdr))
                return WS_CLIENT_PROTOCOL_ERROR;

            if (!strcmp(hdr->key, WS_CONN_ACCEPT)) {
                if (strcmp(client->hs.nonce_reply, hdr->value)) {
                    ERROR("Received NONCE \"%s\" does not match expected nonce of \"%s\"", hdr->value, client->hs.nonce_reply);
                    return WS_CLIENT_PROTOCOL_ERROR;
                }
                client->hs.nonce_matched = 1;
            }

            break;
        case WS_HDR_PARSE_DONE:
            if (!client->hs.nonce_matched) {
                ERROR("Missing " WS_CONN_ACCEPT " header");
                return WS_CLIENT_PROTOCOL_ERROR;
            }
            if (client->hs.http_code != 101) {
                ERROR("HTTP return code not 101. Received %d with msg \"%s\".", client->hs.http_code, client->hs.http_reply_msg);
                return WS_CLIENT_PROTOCOL_ERROR;
            }

            client->state = WS_ESTABLISHED;
            client->hs.hdr_state = WS_HDR_ALL_DONE;
            INFO("Websocket Connection Accepted By Server");
            return WS_CLIENT_PARSING_DONE;
        case WS_HDR_ALL_DONE:
            FATAL("This is error we should never come here!");
            return WS_CLIENT_PROTOCOL_ERROR;
    }
    return 0;
}

#define BYTE_MSB          0x80
#define WS_FINAL_FRAG     BYTE_MSB
#define WS_PAYLOAD_MASKED BYTE_MSB

static inline size_t get_ws_hdr_size(size_t payload_size)
{
    size_t hdr_len = 2 + 4 /*mask*/;
    if(payload_size > 125)
        hdr_len += 2;
    if(payload_size > 65535)
        hdr_len += 6;
    return hdr_len;
}

#define MAX_POSSIBLE_HDR_LEN 14
int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const char *data, size_t size)
{
    // TODO maybe? implement fragmenting, it is not necessary though
    // as both tested MQTT brokers have no reuirement of one MQTT envelope
    // be equal to one WebSockets envelope. Therefore there is no need to send
    // one big MQTT message as single fragmented WebSocket envelope
    char hdr[MAX_POSSIBLE_HDR_LEN];
    char *ptr = hdr;
    char *mask;
    int size_written = 0;
    size_t j = 0;

    size_t w_buff_free = rbuf_bytes_free(client->buf_write);
    size_t hdr_len = get_ws_hdr_size(size);

    if (w_buff_free < hdr_len * 2) {
#ifdef DEBUG_ULTRA_VERBOSE
        DEBUG("Write buffer full. Can't write requested %d size.", size);
#endif
        return 0;
    }

    if (w_buff_free < (hdr_len + size)) {
#ifdef DEBUG_ULTRA_VERBOSE
        DEBUG("Can't write whole MQTT packet of %d bytes into the buffer. Will do partial send of %d.", size, w_buff_free - hdr_len);
#endif
        size = w_buff_free - hdr_len;
        hdr_len = get_ws_hdr_size(size);
        // the actual needed header size might decrease if we cut number of bytes
        // if decrease of size crosses 65535 or 125 boundary
        // but I can live with that at least for now
        // worst case is we have 6 more bytes we could have written
        // no bigus dealus
    }

    *ptr++ = frame_type | WS_FINAL_FRAG;

    //generate length
    *ptr = WS_PAYLOAD_MASKED;
    if (size > 65535) {
        *ptr++ |= 0x7f;
        uint64_t be = htobe64(size);
        memcpy(ptr, (void *)&be, sizeof(be));
        ptr += sizeof(be);
    } else if (size > 125) {
        *ptr++ |= 0x7e;
        uint16_t be = htobe16(size);
        memcpy(ptr, (void *)&be, sizeof(be));
        ptr += sizeof(be);
    } else
        *ptr++ |= size;
    
    mask = ptr;
    if (read(client->entropy_fd, mask, sizeof(uint32_t)) < (ssize_t)sizeof(uint32_t)) {
        ERROR("Unable to get mask from \"" ENTROPY_SOURCE "\"");
        return -2;
    }

    rbuf_push(client->buf_write, hdr, hdr_len);

    if (!size)
        return 0;

    // copy and mask data in the write ringbuffer
    while (size - size_written) {
        size_t writable_bytes;
        char *w_ptr = rbuf_get_linear_insert_range(client->buf_write, &writable_bytes);
        if(!writable_bytes)
            break;

        writable_bytes = (writable_bytes > size) ? (size - size_written) : writable_bytes;

        memcpy(w_ptr, &data[size_written], writable_bytes);
        rbuf_bump_head(client->buf_write, writable_bytes);

        for (size_t i = 0; i < writable_bytes; i++, j++)
            w_ptr[i] ^= mask[j % 4];
        size_written += writable_bytes;
    }
    return size_written;
}

static int check_opcode(ws_client *client,enum websocket_opcode oc)
{
    switch(oc) {
        case WS_OP_BINARY_FRAME:
        case WS_OP_CONNECTION_CLOSE:
        case WS_OP_PING:
            return 0;
        case WS_OP_CONTINUATION_FRAME:
            FATAL("WS_OP_CONTINUATION_FRAME NOT IMPLEMENTED YET!!!!");
            return 0;
        case WS_OP_TEXT_FRAME:
            FATAL("WS_OP_TEXT_FRAME NOT IMPLEMENTED YET!!!!");
            return 0;
        case WS_OP_PONG:
            FATAL("WS_OP_PONG NOT IMPLEMENTED YET!!!!");
            return 0;
        default:
            return WS_CLIENT_PROTOCOL_ERROR;
    }
}

static inline void ws_client_rx_post_hdr_state(ws_client *client)
{
    switch(client->rx.opcode) {
        case WS_OP_BINARY_FRAME:
            client->rx.parse_state = WS_PAYLOAD_DATA;
            return;
        case WS_OP_CONNECTION_CLOSE:
            client->rx.parse_state = WS_PAYLOAD_CONNECTION_CLOSE;
            return;
        case WS_OP_PING:
            client->rx.parse_state = WS_PAYLOAD_PING_REQ_PAYLOAD;
            return;
        default:
            client->rx.parse_state = WS_PAYLOAD_SKIP_UNKNOWN_PAYLOAD;
            return;
    }
}

#define LONGEST_POSSIBLE_HDR_PART 8
int ws_client_process_rx_ws(ws_client *client)
{
    char buf[LONGEST_POSSIBLE_HDR_PART];
    size_t size;
    switch (client->rx.parse_state) {
        case WS_FIRST_2BYTES:
            BUF_READ_CHECK_AT_LEAST(2);
            rbuf_pop(client->buf_read, buf, 2);
            client->rx.opcode = buf[0] & (char)~BYTE_MSB;

            if (!(buf[0] & (char)~WS_FINAL_FRAG)) {
                ERROR("Not supporting fragmented messages yet!");
                return WS_CLIENT_PROTOCOL_ERROR;
            }

            if (check_opcode(client, client->rx.opcode) == WS_CLIENT_PROTOCOL_ERROR)
                return WS_CLIENT_PROTOCOL_ERROR;

            if (buf[1] & (char)WS_PAYLOAD_MASKED) {
                ERROR("Mask is not allowed in Server->Client Websocket direction.");
                return WS_CLIENT_PROTOCOL_ERROR;
            }

            switch (buf[1]) {
                case 127:
                    client->rx.parse_state = WS_PAYLOAD_EXTENDED_64;
                    break;
                case 126:
                    client->rx.parse_state = WS_PAYLOAD_EXTENDED_16;
                    break;
                default:
                    client->rx.payload_length = buf[1];
                    ws_client_rx_post_hdr_state(client);
            }
            break;
        case WS_PAYLOAD_EXTENDED_16:
            BUF_READ_CHECK_AT_LEAST(2);
            rbuf_pop(client->buf_read, buf, 2);
            client->rx.payload_length = be16toh(*((uint16_t *)buf));
            ws_client_rx_post_hdr_state(client);
            break;
        case WS_PAYLOAD_EXTENDED_64:
            BUF_READ_CHECK_AT_LEAST(LONGEST_POSSIBLE_HDR_PART);
            rbuf_pop(client->buf_read, buf, LONGEST_POSSIBLE_HDR_PART);
            client->rx.payload_length = be64toh(*((uint64_t *)buf));
            ws_client_rx_post_hdr_state(client);
            break;
        case WS_PAYLOAD_DATA:
            // TODO not pretty?
            while (client->rx.payload_processed < client->rx.payload_length) {
                size_t remaining = client->rx.payload_length - client->rx.payload_processed;
                if (!rbuf_bytes_available(client->buf_read))
                    return WS_CLIENT_NEED_MORE_BYTES;
                char *insert = rbuf_get_linear_insert_range(client->buf_to_mqtt, &size);
                if (!insert) {
#ifdef DEBUG_ULTRA_VERBOSE
                    DEBUG("BUFFER TOO FULL. Avail %d req %d", (int)size, (int)remaining);
#endif
                    return WS_CLIENT_BUFFER_FULL;
                }
                size = (size > remaining) ? remaining : size;
                size = rbuf_pop(client->buf_read, insert, size);
                rbuf_bump_head(client->buf_to_mqtt, size);
                client->rx.payload_processed += size;
            }
            client->rx.parse_state = WS_PACKET_DONE;
            break;
        case WS_PAYLOAD_CONNECTION_CLOSE:
            // for WS_OP_CONNECTION_CLOSE allowed is
            // a) empty payload
            // b) 2byte reason code
            // c) 2byte reason code followed by message
            if (client->rx.payload_length == 1) {
                ERROR("WebScoket CONNECTION_CLOSE can't have payload of size 1");
                return WS_CLIENT_PROTOCOL_ERROR;
            }
            if (!client->rx.payload_length) {
                INFO("WebSocket server closed the connection without giving reason.");
                client->rx.parse_state = WS_PACKET_DONE;
                break;
            }
            client->rx.parse_state = WS_PAYLOAD_CONNECTION_CLOSE_EC;
            break;
        case WS_PAYLOAD_CONNECTION_CLOSE_EC:
            BUF_READ_CHECK_AT_LEAST(sizeof(uint16_t));

            rbuf_pop(client->buf_read, buf, sizeof(uint16_t));
            client->rx.specific_data.op_close.ec = be16toh(*((uint16_t *)buf));
            client->rx.payload_processed += sizeof(uint16_t);

            if(client->rx.payload_processed == client->rx.payload_length) {
                INFO("WebSocket server closed the connection with EC=%d. Without message.",
                    client->rx.specific_data.op_close.ec);
                client->rx.parse_state = WS_PACKET_DONE;
                break;
            }
            client->rx.parse_state = WS_PAYLOAD_CONNECTION_CLOSE_MSG;
            break;
        case WS_PAYLOAD_CONNECTION_CLOSE_MSG:
            if (!client->rx.specific_data.op_close.reason)
                client->rx.specific_data.op_close.reason = mw_malloc(client->rx.payload_length + 1);

            while (client->rx.payload_processed < client->rx.payload_length) {
                if (!rbuf_bytes_available(client->buf_read))
                    return WS_CLIENT_NEED_MORE_BYTES;
                client->rx.payload_processed += rbuf_pop(client->buf_read,
                                                         &client->rx.specific_data.op_close.reason[client->rx.payload_processed - sizeof(uint16_t)],
                                                         client->rx.payload_length - client->rx.payload_processed);
            }
            client->rx.specific_data.op_close.reason[client->rx.payload_length] = 0;
            INFO("WebSocket server closed the connection with EC=%d and reason \"%s\"",
                client->rx.specific_data.op_close.ec,
                client->rx.specific_data.op_close.reason);
            mw_free(client->rx.specific_data.op_close.reason);
            client->rx.specific_data.op_close.reason = NULL;
            client->rx.parse_state = WS_PACKET_DONE;
            break;
        case WS_PAYLOAD_SKIP_UNKNOWN_PAYLOAD:
            BUF_READ_CHECK_AT_LEAST(client->rx.payload_length);
            WARN("Skipping Websocket Packet of unsupported/unknown type");
            if (client->rx.payload_length)
                rbuf_bump_tail(client->buf_read, client->rx.payload_length);
            client->rx.parse_state = WS_PACKET_DONE;
            return WS_CLIENT_PARSING_DONE;
        case WS_PAYLOAD_PING_REQ_PAYLOAD:
            if (client->rx.payload_length > rbuf_get_capacity(client->buf_read) / 2) {
                ERROR("Ping arrived with payload which is too big!");
                return WS_CLIENT_INTERNAL_ERROR;
            }
            BUF_READ_CHECK_AT_LEAST(client->rx.payload_length);
            client->rx.specific_data.ping_msg = mw_malloc(client->rx.payload_length);
            rbuf_pop(client->buf_read, client->rx.specific_data.ping_msg, client->rx.payload_length);
            // TODO schedule this instead of sending right away
            // then attempt to send as soon as buffer space clears up
            size = ws_client_send(client, WS_OP_PONG, client->rx.specific_data.ping_msg, client->rx.payload_length);
            if (size != client->rx.payload_length) {
                ERROR("Unable to send the PONG as one packet back. Closing connection.");
                return WS_CLIENT_PROTOCOL_ERROR;
            }
            client->rx.parse_state = WS_PACKET_DONE;
            return WS_CLIENT_PARSING_DONE;
        case WS_PACKET_DONE:
            client->rx.parse_state = WS_FIRST_2BYTES;
            client->rx.payload_processed = 0;
            if (client->rx.opcode == WS_OP_CONNECTION_CLOSE)
                return WS_CLIENT_CONNECTION_CLOSED;
            return WS_CLIENT_PARSING_DONE;
        default:
            FATAL("Unknown parse state");
            return WS_CLIENT_INTERNAL_ERROR;
    }
    return 0;
}

int ws_client_process(ws_client *client)
{
    int ret;
    switch(client->state) {
        case WS_RAW:
            if (ws_client_start_handshake(client))
                return WS_CLIENT_INTERNAL_ERROR;
            return WS_CLIENT_NEED_MORE_BYTES;
        case WS_HANDSHAKE:
            do {
                ret = ws_client_parse_handshake_resp(client);
                if (ret == WS_CLIENT_PROTOCOL_ERROR)
                    client->state = WS_ERROR;
                if (ret == WS_CLIENT_PARSING_DONE && client->state == WS_ESTABLISHED)
                    ret = WS_CLIENT_NEED_MORE_BYTES;
            } while (!ret);
            break;
        case WS_ESTABLISHED:
            do {
                ret = ws_client_process_rx_ws(client);
                switch(ret) {
                    case WS_CLIENT_PROTOCOL_ERROR:
                        client->state = WS_ERROR;
                        break;
                    case WS_CLIENT_CONNECTION_CLOSED:
                        client->state = WS_CONN_CLOSED_GRACEFUL;
                        break;
                }
                // if ret == 0 we can continue parsing
                // if ret == WS_CLIENT_PARSING_DONE we processed
                //    one websocket packet and attempt processing
                //    next one if data available in the buffer
            } while (!ret || ret == WS_CLIENT_PARSING_DONE);
            break;
        case WS_ERROR:
            ERROR("ws_client is in error state. Restart the connection!");
            return WS_CLIENT_PROTOCOL_ERROR;
        case WS_CONN_CLOSED_GRACEFUL:
            ERROR("Connection has been gracefully closed. Calling this is useless (and probably bug) until you reconnect again.");
            return WS_CLIENT_CONNECTION_CLOSED;
        default:
            FATAL("Unknown connection state! Probably memory corruption.");
            return WS_CLIENT_INTERNAL_ERROR;
    }
    return ret;
}