sokil/php-clickhouse

View on GitHub
src/Connection/StreamConnection.php

Summary

Maintainability
B
4 hrs
Test Coverage
<?php
declare(strict_types=1);

namespace Sokil\ClickHouse\Connection;

use Sokil\ClickHouse\Connection\Exception\ConnectError;
use Sokil\ClickHouse\Connection\Exception\QueryError;

class StreamConnection extends AbstractConnection
{
    private const READ_BUFFER_LENGTH = 2048;

    /**
     * @var resource
     */
    private $stream;

    /**
     * @return resource
     *
     * @throws ConnectError
     */
    private function getStream()
    {
        if ($this->stream) {
            return $this->stream;
        }

        // suppress warning when connection now allowed
        set_error_handler(
            function ($type, $errorMessage) {
                throw new ConnectError(
                    sprintf('Connection error: %s', $errorMessage)
                );
            }
        );

        try {
            $socket = stream_socket_client(
                sprintf('tcp://%s:%s', $this->getHost(), $this->getPort()),
                $errorCode,
                $errorMessage,
                $this->getConnectionTimeoutMs(),
                STREAM_CLIENT_CONNECT
            );

            if ($socket === false) {
                throw new ConnectError(
                    sprintf('Connection error: %s', $errorMessage),
                    $errorCode
                );
            }
        } finally {
            restore_error_handler();
        }

        // blocking mode
        stream_set_blocking($socket, true);

        // read/write timeout
        $sendReceiveTimeout = $this->getTimevalStruct($this->getRequestTimeoutMs());

        stream_set_timeout($socket, $sendReceiveTimeout->getSeconds(), $sendReceiveTimeout->getMicroSeconds());

        $this->stream = $socket;

        return $this->stream;
    }

    public function execute(string $query): string
    {
        $query .= "\r\n";

        // create socket
        $stream = $this->getStream();

        // prepare HTTP QUERY
        $request =
            'POST / HTTP/1.1' .
            "\r\n" .
            implode(
                "\r\n",
                [
                    'Content-Length: ' . strlen($query),
                    'Connection: Keep-Alive',
                    'User-Agent: SOKIL/PHP-CLICKHOUSE:0.0.1',
                    'Content-Type: text/plain; charset=UTF-8',
                ]
            ) .
            "\r\n" .
            "\r\n" .
            $query;

        // send request
        if (stream_socket_sendto($stream, $request) === false) {
            throw new ConnectError('Socket write error');
        }

        // read response
        $response = '';
        while (true) {
            $responseChunk = stream_socket_recvfrom($stream, self::READ_BUFFER_LENGTH);
            if ($responseChunk === false || $responseChunk === '') {
                break;
            }

            $response .= $responseChunk;

            if (strlen($responseChunk) < self::READ_BUFFER_LENGTH) {
                break;
            }
        }

        // get response code
        if (preg_match('~HTTP/\d\.\d (\d{3})~', substr($response, 0, 12), $matches)) {
            $responseCode = (int)$matches[1];

            if ($responseCode !== 200) {
                throw new QueryError($response, $responseCode);
            }
        } else {
            throw new ConnectError(sprintf('Invalid HTTP response: "%s"', $response));
        }

        return $response;
    }

    /**
     * Wait socket
     *
     * @param resource $stream
     */
    private function wait($stream) : void
    {
        $read = [$stream];
        $write = [$stream];
        $except = [];

        $sendReceiveTimeout = $this->getTimevalStruct($this->getRequestTimeoutMs());

        $changedSocketCount = stream_select(
            $read,
            $write,
            $except,
            $sendReceiveTimeout->getSeconds(),
            $sendReceiveTimeout->getMicroSeconds()
        );

        if ($changedSocketCount === false) {
            throw new ConnectError('Receive response error');
        }

        if ($changedSocketCount === 0) {
            throw new ConnectError('Socket select timeout');
        }
    }
}