ekinhbayar/gitamp

View on GitHub
src/Websocket/Handler.php

Summary

Maintainability
A
0 mins
Test Coverage
<?php declare(strict_types=1);

namespace ekinhbayar\GitAmp\Websocket;

use Amp\Delayed;
use Amp\Http\Server\HttpServer;
use Amp\Http\Server\Request;
use Amp\Http\Server\Response;
use Amp\Http\Status;
use Amp\Promise;
use Amp\Success;
use Amp\Websocket\Client;
use Amp\Websocket\Server\ClientHandler;
use Amp\Websocket\Server\Gateway;
use Amp\Websocket\Server\WebsocketServerObserver;
use ekinhbayar\GitAmp\Configuration;
use ekinhbayar\GitAmp\Provider\Listener;
use ekinhbayar\GitAmp\Response\Results;
use Psr\Log\LoggerInterface;
use function Amp\asyncCall;
use function Amp\call;

class Handler implements ClientHandler, WebsocketServerObserver
{
    private Gateway $gateway;

    private Listener $provider;

    private Configuration $configuration;

    private Results $lastEvents;

    private LoggerInterface $logger;

    public function __construct(
        Listener $provider,
        Configuration $configuration,
        Results $results,
        LoggerInterface $logger
    ) {
        $this->provider      = $provider;
        $this->configuration = $configuration;
        $this->lastEvents    = $results;
        $this->logger        = $logger;
    }

    public function handleHandshake(Gateway $gateway, Request $request, Response $response): Promise
    {
        if (!$this->configuration->websocketAddressExists($request->getHeader('origin'))) {
            return $gateway->getErrorHandler()->handleError(Status::FORBIDDEN, 'Forbidden Origin', $request);
        }

        return new Success($response);
    }

    public function handleClient(Gateway $gateway, Client $client, Request $request, Response $response): Promise
    {
        return call(function () use ($gateway, $client, $request, $response) {
            $client->onClose(function (Client $client, int $code, string $reason) use ($gateway) {
                yield $this->processDisconnectingClient($gateway, $client, $code, $reason);
            });

            $this->logger->info(
                \sprintf('Client %d connected. Total clients: %d', $client->getId(), count($gateway->getClients())),
            );

            yield $this->sendConnectedUsersCount(\count($gateway->getClients()));

            $client->send($this->lastEvents->jsonEncode());

            yield $client->receive();
        });
    }

    private function processDisconnectingClient(Gateway $gateway, Client  $client, int $code, string $reason): Promise
    {
        return call(function () use ($gateway, $client, $code, $reason) {
            $this->logger->info(
                \sprintf(
                    'Client %d disconnected. Code: %d Reason: %s. Total clients: %d',
                    $client->getId(),
                    $code,
                    $reason,
                    count($gateway->getClients()),
                )
            );

            yield $this->sendConnectedUsersCount(count($gateway->getClients()));
        });
    }

    private function emit(Results $events): Promise
    {
        if (!$events->hasEvents()) {
            return new Success();
        }

        $this->lastEvents = $events;

        return $this->gateway->broadcast($events->jsonEncode());
    }

    private function sendConnectedUsersCount(int $count): Promise
    {
        return $this->gateway->broadcast(\json_encode(['connectedUsers' => $count]));
    }

    public function onStart(HttpServer $server, Gateway $gateway): Promise
    {
        $this->gateway = $gateway;

        asyncCall(function () {
            while (true) {
                $this->emit(yield $this->provider->listen());

                yield new Delayed(25000);
            }
        });

        return new Success();
    }

    public function onStop(HttpServer $server, Gateway $gateway): Promise
    {
        return new Success();
    }
}