seregazhuk/php-react-memcached

View on GitHub
src/Client.php

Summary

Maintainability
A
25 mins
Test Coverage
<?php

namespace seregazhuk\React\Memcached;

use Evenement\EventEmitter;
use React\Promise\Promise;
use React\Promise\PromiseInterface;
use seregazhuk\React\Memcached\Connection\Connection;
use seregazhuk\React\Memcached\Exception\ConnectionClosedException;
use seregazhuk\React\Memcached\Exception\Exception;
use seregazhuk\React\Memcached\Exception\WrongCommandException;
use seregazhuk\React\Memcached\Protocol\Parser;
use seregazhuk\React\Memcached\Request\Request;
use seregazhuk\React\Memcached\Request\RequestsPool;

/**
 * @method PromiseInterface set(string $key, mixed $value, int $flag = 0, int $exp = 0)
 * @method PromiseInterface version()
 * @method PromiseInterface verbosity(int $level)
 * @method PromiseInterface flushAll()
 * @method PromiseInterface get($key)
 * @method PromiseInterface delete($key)
 * @method PromiseInterface replace($key, $value)
 * @method PromiseInterface incr($key, $value)
 * @method PromiseInterface decr($key, $value)
 * @method PromiseInterface stats()
 * @method PromiseInterface touch($key, $exp)
 * @method PromiseInterface add($key, $value)
 */
class Client extends EventEmitter
{
    /**
     * @var Parser
     */
    private $parser;

    /**
     * @var RequestsPool
     */
    private $pool;

    /**
     * Indicates that the connection is closed.
     *
     * @var bool
     */
    private $isClosed = false;

    /**
     * Indicates that we don't accept new requests but we are still waiting for
     * pending requests to be resolved.
     *
     * @var bool
     */
    private $isEnding = false;

    /**
     * @var Connection
     */
    private $connection;

    /**
     * @param Connection $connection
     * @param Parser $parser
     */
    public function __construct(Connection $connection, Parser $parser)
    {
        $this->parser = $parser;
        $this->connection = $connection;
        $this->pool = new RequestsPool();

        $this->setConnectionHandlers();
    }

    protected function setConnectionHandlers(): void
    {
        $this->connection->on('data', function ($chunk) {
            $parsed = $this->parser->parseRawResponse($chunk);
            $this->resolveRequests($parsed);
        });

        $this->connection->on('failed', function () {
            $this->pool->rejectAll(new ConnectionClosedException());
        });

        $this->connection->on('close', function () {
            if (!$this->isEnding) {
                $this->emit('error', [new ConnectionClosedException()]);
            }
        });
    }

    /**
     * @param string $name
     * @param array $args
     * @return Promise|PromiseInterface
     */
    public function __call(string $name, array $args)
    {
        $request = new Request($name);

        if ($this->isEnding) {
            $request->reject(new ConnectionClosedException());
        } else {
            try {
                $command = $this->parser->makeCommand($name, $args);
                $this->connection->write($command);
                $this->pool->add($request);
            } catch (WrongCommandException $e) {
                $request->reject($e);
            }
        }

        return $request->promise();
    }

    /**
     * @param string[] $responses
     * @throws Exception
     */
    public function resolveRequests(array $responses): void
    {
        if ($this->pool->isEmpty()) {
            throw new Exception('Received unexpected response, no matching request found');
        }

        foreach ($responses as $response) {
            $request = $this->pool->shift();

            try {
                $parsedResponse = $this->parser->parseResponse($request->command(), $response);
                $request->resolve($parsedResponse);
            } catch (WrongCommandException $exception) {
                $request->reject($exception);
            }
        }

        if ($this->isEnding && $this->pool->isEmpty()) {
            $this->close();
        }
    }

    /**
     * Closes the connection when all requests are resolved
     */
    public function end(): void
    {
        $this->isEnding = true;

        if ($this->pool->isEmpty()) {
            $this->close();
        }
    }

    /**
     * Forces closing the connection and rejects all pending requests
     */
    public function close(): void
    {
        if ($this->isClosed) {
            return;
        }

        $this->isEnding = true;
        $this->isClosed = true;

        $this->connection->close();
        $this->emit('close');

        $this->pool->rejectAll(new ConnectionClosedException());
    }
}