phergie/phergie-irc-client-react

View on GitHub
src/Client.php

Summary

Maintainability
C
1 day
Test Coverage
<?php
/**
 * Phergie (http://phergie.org)
 *
 * @link http://github.com/phergie/phergie-irc-client-react for the canonical source repository
 * @copyright Copyright (c) 2008-2014 Phergie Development Team (http://phergie.org)
 * @license http://phergie.org/license Simplified BSD License
 * @package Phergie\Irc\Client\React
 */

namespace Phergie\Irc\Client\React;

use Evenement\EventEmitter;
use Monolog\Formatter\LineFormatter;
use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use Psr\Log\LoggerInterface;
use Phergie\Irc\ConnectionInterface;
use React\Dns\Resolver\Factory;
use React\Dns\Resolver\Resolver;
use React\EventLoop\LoopInterface;
use React\EventLoop\TimerInterface;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use React\Stream\DuplexResourceStream;
use React\Stream\DuplexStreamInterface;
use React\Stream\Stream;

/**
 * IRC client implementation based on the React component library.
 *
 * @category Phergie
 * @package Phergie\Irc\Client\React
 */
class Client extends EventEmitter implements
    ClientInterface,
    LoopAwareInterface,
    LoopAccessorInterface
{
    /**
     * Event loop
     *
     * @var \React\EventLoop\LoopInterface
     */
    protected $loop;

    /**
     * Logging stream
     *
     * @var \Psr\Log\LoggerInterface
     */
    protected $logger;

    /**
     * Interval in seconds between irc.tick events
     *
     * @var float
     */
    protected $tickInterval = 0.2;

    /**
     * Connector used to establish SSL connections
     *
     * @var \React\Socket\SecureConnector
     */
    protected $secureConnector;

    /**
     * @var \React\Dns\Resolver\Resolver
     */
    protected $resolver;

    /**
     * @var string
     */
    protected $dnsServer = '8.8.8.8';

    /**
     * Contains all connection instances associated with an active socket stream
     *
     * @var \SplObjectStorage
     */
    protected $activeConnections;

    /**
     * Sets the event loop dependency.
     *
     * @param \React\EventLoop\LoopInterface $loop
     */
    public function setLoop(LoopInterface $loop)
    {
        $this->loop = $loop;
    }

    /**
     * Returns the event loop dependency, initializing it if needed.
     *
     * @return \React\EventLoop\LoopInterface
     */
    public function getLoop()
    {
        if (!$this->loop) {
            $this->loop = \React\EventLoop\Factory::create();
        }
        return $this->loop;
    }

    /**
     * Sets the DNS Resolver.
     *
     * @param Resolver $resolver
     */
    public function setResolver(Resolver $resolver = null)
    {
        $this->resolver = $resolver;
    }

    /**
     * Get the DNS Resolver, if one isn't set in instance will be created.
     *
     * @return Resolver
     */
    public function getResolver()
    {
        if ($this->resolver instanceof Resolver) {
            return $this->resolver;
        }

        $factory = new Factory();
        $this->resolver = $factory->createCached($this->getDnsServer(), $this->getLoop());

        return $this->resolver;
    }

    /**
     * Set the DNS server to use when looking up IP's
     *
     * @param string $dnsServer
     */
    public function setDnsServer($dnsServer = '8.8.8.8')
    {
        $this->dnsServer = $dnsServer;
    }

    /**
     * Returns the configured DNS server
     *
     * @return string
     */
    public function getDnsServer()
    {
        return $this->dnsServer;
    }

    /**
     * Sets the interval in seconds between irc.tick events.
     *
     * @param float $tickInterval
     */
    public function setTickInterval($tickInterval)
    {
        $this->tickInterval = (float) $tickInterval;
    }

    /**
     * Returns the interval in seconds between irc.tick events.
     *
     * @return float
     */
    public function getTickInterval()
    {
        return $this->tickInterval;
    }

    /**
     * Add a connection instance to the active connection store
     *
     * @param \Phergie\Irc\ConnectionInterface
     */
    protected function addActiveConnection(ConnectionInterface $connection)
    {
        if (!$this->activeConnections) {
            $this->activeConnections = new \SplObjectStorage;
        }
        $this->activeConnections->attach($connection);
    }

    /**
     * Remove a connection instance from the active connections store
     *
     * @param \Phergie\Irc\ConnectionInterface $connection
     */
    protected function removeActiveConnection(ConnectionInterface $connection)
    {
        if ($this->activeConnections) {
            $this->activeConnections->detach($connection);
        }
    }

    /**
     * Gets an array of all connections associated with an active socket stream.
     *
     * @return \Phergie\Irc\ConnectionInterface[]
     */
    public function getActiveConnections()
    {
        return $this->activeConnections ? iterator_to_array($this->activeConnections, false) : array();
    }

    /**
     * Returns a socket configured for a specified remote.
     *
     * @param string $remote Address to connect the socket to (e.g. tcp://irc.freenode.net:6667)
     * @param array $context Context options for the socket
     * @return resource
     * @throws \Phergie\Irc\Client\React\Exception if unable to connect to the specified remote
     */
    protected function getSocket($remote, array $context)
    {
        $socket = stream_socket_client(
            $remote,
            $errno,
            $errstr,
            ini_get('default_socket_timeout'),
            STREAM_CLIENT_CONNECT,
            stream_context_create($context)
        );

        if (!$socket) {
            throw new Exception(
                'Unable to connect to remote ' . $remote .
                    ': socket error ' . $errno . ' ' . $errstr,
                Exception::ERR_CONNECTION_ATTEMPT_FAILED
            );
        }

        stream_set_blocking($socket, 0);
        return $socket;
    }

    /**
     * Derives the transport for a given connection.
     *
     * @param \Phergie\Irc\ConnectionInterface $connection
     * @return string Transport usable in the URI for a stream
     */
    protected function getTransport(ConnectionInterface $connection)
    {
        return (string) $connection->getOption('transport') ?: 'tcp';
    }

    /**
     * Extracts the value of the force-ipv4 option from a given connection.
     *
     * @param \Phergie\Irc\ConnectionInterface $connection
     * @return boolean TRUE to force use of IPv4, FALSE otherwise
     */
    protected function getForceIpv4Flag(ConnectionInterface $connection)
    {
        return (boolean) $connection->getOption('force-ipv4') ?: false;
    }

    /**
     * Extracts the value of the allow-self-signed option from a given connection.
     *
     * @param \Phergie\Irc\ConnectionInterface $connection
     * @return boolean TRUE to allow self signed certificates, FALSE otherwise
     */
    protected function getAllowSelfSignedFlag(ConnectionInterface $connection)
    {
        return (boolean) $connection->getOption('allow-self-signed') ?: false;
    }

    /**
     * Derives a remote for a given connection.
     *
     * @param \Phergie\Irc\ConnectionInterface $connection
     * @return string Remote usable to establish a socket connection
     */
    protected function getRemote(ConnectionInterface $connection)
    {
        $hostname = $connection->getServerHostname();
        $port = $connection->getServerPort();

        $deferred = new Deferred();
        $this->resolveHostname($hostname)
            ->then(
                function($ip) use($deferred, $port) {
                    $deferred->resolve('tcp://' . $ip . ':' . $port);
                },
                function($error) use ($deferred) {
                    $deferred->reject($error);
                }
            );

        return $deferred->promise();
    }

    /**
     * Resolve an IP address from a hostname
     *
     * @param string $hostname
     * @return PromiseInterface promise resolving to the hostname's IP
     */
    protected function resolveHostname($hostname)
    {
        if (false !== filter_var($hostname, FILTER_VALIDATE_IP)) {
            return \React\Promise\resolve($hostname);
        }

        return $this->getResolver()->resolve($hostname);
    }

    /**
     * Derives a set of socket context options for a given connection.
     *
     * @param \Phergie\Irc\ConnectionInterface $connection
     * @return array Associative array of context option key-value pairs
     */
    protected function getContext(ConnectionInterface $connection)
    {
        $context = array();
        if ($this->getForceIpv4Flag($connection)) {
            $context['bindto'] = '0.0.0.0:0';
        }
        $context = array('socket' => $context);
        return $context;
    }

    /**
     * Derives a set of socket context options for a given secure connection.
     *
     * @param \Phergie\Irc\ConnectionInterface $connection
     * @return array Associative array of context option key-value pairs
     */
    protected function getSecureContext(ConnectionInterface $connection)
    {
        $context = array();
        if ($this->getForceIpv4Flag($connection)) {
            $context['bindto'] = '0.0.0.0:0';
        }
        if ($this->getAllowSelfSignedFlag($connection)) {
            $context['allow_self_signed'] = true;
        }
        return $context;
    }

    /**
     * Returns a stream for a socket connection.
     *
     * @param resource $socket Socket for the connection to the server
     * @param \React\EventLoop\LoopInterface $loop Event loop
     * @return \React\Stream\DuplexStreamInterface
     */
    protected function getStream($socket)
    {
        if (class_exists(DuplexResourceStream::class)) {
            return new DuplexResourceStream($socket, $this->getLoop());
        }

        return new Stream($socket, $this->getLoop());
    }

    /**
     * Generates a closure for stream output logging.
     *
     * @param \Phergie\Irc\ConnectionInterface $connection
     * @param string $level Evenement logging level to use
     * @param string $prefix Prefix string for log lines (optional)
     * @return callable
     * @throws \DomainException if $level is not a valid logging level
     */
    protected function getOutputLogCallback(ConnectionInterface $connection, $level, $prefix = null)
    {
        $logger = $this->getLogger();
        if (!method_exists($logger, $level)) {
            throw new \DomainException("Invalid log level '$level'");
        }
        return function($message) use ($connection, $level, $prefix, $logger) {
            $output = sprintf('%s %s%s', $connection->getMask(), $prefix, trim($message));
            call_user_func(array($logger, $level), $output);
        };
    }

    /**
     * Adds an event listener to log data emitted by a stream.
     *
     * @param \Evenement\EventEmitter $emitter
     * @param \Phergie\Irc\ConnectionInterface $connection Connection
     *        corresponding to the stream
     */
    protected function addLogging(EventEmitter $emitter, ConnectionInterface $connection)
    {
        $emitter->on('data', $this->getOutputLogCallback($connection, 'debug'));
        $emitter->on('error', $this->getOutputLogCallback($connection, 'notice'));
    }

    /**
     * Returns a stream instance for parsing messages from the server and
     * emitting them as events.
     *
     * @param \Phergie\Irc\ConnectionInterface $connection Connection
     *        corresponding to the read stream
     * @return \Phergie\Irc\Client\React\ReadStream
     */
    protected function getReadStream(ConnectionInterface $connection)
    {
        $read = new ReadStream();
        $this->addLogging($read, $connection);
        $read->on('invalid', $this->getOutputLogCallback($connection, 'notice', 'Parser unable to parse line: '));
        return $read;
    }

    /**
     * Returns a stream instance for sending events to the server.
     *
     * @param \Phergie\Irc\ConnectionInterface $connection Connection
     *        corresponding to the read stream
     * @return \Phergie\Irc\Client\React\WriteStream
     */
    protected function getWriteStream(ConnectionInterface $connection)
    {
        $write = new WriteStream();
        $this->addLogging($write, $connection);
        return $write;
    }

    /**
     * Returns a stream instance for logging data on the socket connection.
     *
     * @return \Psr\Log\LoggerInterface
     */
    public function getLogger()
    {
        if (!$this->logger) {
            if (!defined('STDERR')) {
                // See testGetLoggerRunFromStdin
                // @codeCoverageIgnoreStart
                define('STDERR', fopen('php://stderr', 'w'));
            }
            // @codeCoverageIgnoreEnd
            $handler = new StreamHandler(STDERR, Logger::DEBUG);
            $handler->setFormatter(new LineFormatter("%datetime% %level_name% %message% %context%\n"));
            $this->logger = new Logger(get_class($this));
            $this->logger->pushHandler($handler);
        }
        return $this->logger;
    }

    /**
     * Sets a logger for logging data on the socket connection.
     *
     * @param \Psr\Log\LoggerInterface
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * Returns a callback for proxying IRC events from the read stream to IRC
     * listeners of the client.
     *
     * @param \Phergie\Irc\Client\React\WriteStream $write Write stream
     *        corresponding to the read stream on which the event occurred
     * @param \Phergie\Irc\ConnectionInterface $connection Connection on
     *        which the event occurred
     * @return callable
     */
    protected function getReadCallback(WriteStream $write, ConnectionInterface $connection)
    {
        $logger = $this->getLogger();
        return function($message) use ($write, $connection, $logger) {
            $this->processInput($message, $write, $connection);
            $this->emit('irc.received', array($message, $write, $connection, $logger));
        };
    }

    /**
     * Returns a callback for proxying events from the write stream to IRC
     * listeners of the client.
     *
     * @param \Phergie\Irc\Client\React\WriteStream $write Write stream
     *        corresponding to the read stream on which the event occurred
     * @param \Phergie\Irc\ConnectionInterface $connection Connection on which
     *        the event occurred
     * @return callable
     */
    protected function getWriteCallback(WriteStream $write, ConnectionInterface $connection)
    {
        $logger = $this->getLogger();
        return function($message) use ($write, $connection, $logger) {
            $this->emit('irc.sent', array($message, $write, $connection, $logger));
        };
    }

    /**
     * Returns a callback for proxying connection error events to listeners of
     * the client.
     *
     * @param \Phergie\Irc\ConnectionInterface $connection Connection on which
     *        the error occurred
     * @return callable
     */
    protected function getErrorCallback(ConnectionInterface $connection)
    {
        $logger = $this->getLogger();
        return function($exception) use ($connection, $logger) {
            $this->emit('connect.error', array($exception, $connection, $logger));
        };
    }

    /**
     * Returns a callback for when a connection is terminated, whether
     * explicitly by the bot or server or as a result of loss of connectivity.
     *
     * @param \Phergie\Irc\Client\React\ReadStream $read Read stream for this
     *        connection
     * @param \Phergie\Irc\Client\React\WriteStream $write Write stream for this
     *        connection
     * @param \Phergie\Irc\ConnectionInterface $connection Terminated connection
     * @param \React\EventLoop\Timer\TimerInterface $timer Timer used to handle
     *        asynchronously queued events on the connection, which must be
     *        cancelled
     * @return callable
     */
    protected function getEndCallback(ReadStream $read, WriteStream $write, ConnectionInterface $connection, TimerInterface $timer)
    {
        $logger = $this->getLogger();
        return function() use ($read, $write, $connection, $timer, $logger) {
            $this->removeActiveConnection($connection);
            $this->emit('connect.end', array($connection, $logger));
            $this->cancelTimer($timer);
            $connection->clearData();
            $read->close();
            $write->close();
        };
    }

    /**
     * Returns a callback executed periodically to allow events to be sent
     * asynchronously versus in response to received or sent events.
     *
     * @param \Phergie\Irc\Client\React\WriteStream $write Stream used to
     *        send events to the server
     * @param \Phergie\Irc\ConnectionInterface $connection Connection to
     *        receive the event
     */
    protected function getTickCallback(WriteStream $write, ConnectionInterface $connection)
    {
        $logger = $this->getLogger();
        return function() use ($write, $connection, $logger) {
            $this->emit('irc.tick', array($write, $connection, $logger));
        };
    }

    /**
     * Configure streams to handle messages received from and sent to the
     * server.
     *
     * @param \Phergie\Irc\ConnectionInterface $connection Metadata for the
     *        connection over which messages are being exchanged
     * @param \React\Stream\DuplexStreamInterface $stream Stream representing the
     *        connection to the server
     * @param \Phergie\Irc\Client\React\WriteStream $write Stream used to
     *        send events to the server
     */
    protected function configureStreams(ConnectionInterface $connection, DuplexStreamInterface $stream, WriteStream $write)
    {
        $timer = $this->addPeriodicTimer($this->getTickInterval(), $this->getTickCallback($write, $connection));
        $read = $this->getReadStream($connection);
        $write->pipe($stream)->pipe($read);
        $read->on('irc.received', $this->getReadCallback($write, $connection));
        $write->on('data', $this->getWriteCallback($write, $connection));
        $end = array($stream, 'end');
        $read->on('end', $end);
        $write->on('end', $end);
        $stream->on('end', $this->getEndCallback($read, $write, $connection, $timer));
        $error = $this->getErrorCallback($connection);
        $read->on('error', $error);
        $write->on('error', $error);
    }

    /**
     * Identifies the user to a server.
     *
     * @param \Phergie\Irc\ConnectionInterface $connection Connection on which
     *        to identify the user
     * @param \Phergie\Irc\Client\React\WriteStream $writeStream Stream to
     *        receive commands identifying the user
     */
    protected function identifyUser(ConnectionInterface $connection, WriteStream $writeStream)
    {
        $password = $connection->getPassword();
        if ($password) {
            $writeStream->ircPass($password);
        }

        $writeStream->ircUser(
            $connection->getUsername(),
            $connection->getHostname(),
            $connection->getServername(),
            $connection->getRealname()
        );

        $writeStream->ircNick($connection->getNickname());
    }

    /**
     * Emits a connection error event.
     *
     * @param \Exception $exception
     * @param \Phergie\Irc\ConnectionInterface $connection
     */
    protected function emitConnectionError(\Exception $exception, ConnectionInterface $connection)
    {
        $this->emit(
            'connect.error',
            array(
                $exception->getMessage(),
                $connection,
                $this->getLogger()
            )
        );
    }

    /**
     * Initializes an unsecured connection.
     *
     * @param \Phergie\Irc\ConnectionInterface $connection
     */
    protected function addUnsecuredConnection(ConnectionInterface $connection)
    {
        $this->getRemote($connection)->then(
            function($remote) use($connection) {
                $this->initializeConnection($remote, $connection);
            },
            function($error) use($connection) {
                $this->emitConnectionError($error, $connection);
            }
        );
    }

    /**
     * Returns a connector for establishing SSL connections.
     *
     * @return \React\Socket\SecureConnector
     */
    protected function getSecureConnector($connection)
    {
        if (!$this->secureConnector) {
            $loop = $this->getLoop();
            $connector = new \React\Socket\Connector(
                $loop,
                [
                    'dns' => $this->getResolver(),
                ]
            );
            $this->secureConnector = new \React\Socket\SecureConnector($connector, $loop, $this->getSecureContext($connection));
        }
        return $this->secureConnector;
    }

    /**
     * Initializes a secured connection.
     *
     * @param \Phergie\Irc\ConnectionInterface $connection
     * @throws \Phergie\Irc\Client\React\Exception if the SSL transport and
     *         forcing IPv4 usage are both enabled
     */
    protected function addSecureConnection(ConnectionInterface $connection)
    {
        $hostname = $connection->getServerHostname();
        $port = $connection->getServerPort();

        $this->getSecureConnector($connection)
            ->connect($hostname . ':' . $port)
            ->then(
                function(DuplexStreamInterface $stream) use ($connection) {
                    $this->initializeStream($stream, $connection);
                    $this->emit('connect.after.each', array($connection, $connection->getData('write')));
                },
                function($error) use($connection) {
                    $this->emitConnectionError($error, $connection);
                }
            );
    }

    /**
     * Configures an established stream for a given connection.
     *
     * @param \React\Stream\DuplexStreamInterface $stream
     * @param \Phergie\Irc\ConnectionInterface $connection
     */
    protected function initializeStream(DuplexStreamInterface $stream, ConnectionInterface $connection)
    {
        try {
            $connection->setData('stream', $stream);
            $write = $this->getWriteStream($connection);
            $connection->setData('write', $write);
            $this->configureStreams($connection, $stream, $write);
            $this->identifyUser($connection, $write);
        } catch (\Exception $e) {
            $this->emitConnectionError($e, $connection);
        }
    }

    /**
     * Initializes an added connection.
     *
     * @param string $remote
     * @param \Phergie\Irc\ConnectionInterface $connection
     */
    protected function initializeConnection($remote, $connection)
    {
        try {
            $context = $this->getContext($connection);
            $socket = $this->getSocket($remote, $context);
            $stream = $this->getStream($socket);
            $this->initializeStream($stream, $connection);
            $this->addActiveConnection($connection);
        } catch (\Exception $e) {
            $this->emitConnectionError($e, $connection);
        }

        $this->emit('connect.after.each', array($connection, $connection->getData('write')));
    }

    /**
     * Initializes an IRC connection.
     *
     * Emits connect.before.each and connect.after.each events before and
     * after connection attempts are established, respectively.
     *
     * Emits a connect.error event if a connection attempt fails.
     *
     * @param \Phergie\Irc\ConnectionInterface $connection Metadata for connection to establish
     * @throws \Phergie\Irc\Client\React\Exception if unable to establish the connection
     */
    public function addConnection(ConnectionInterface $connection)
    {
        $this->emit('connect.before.each', array($connection));

        if ($this->getTransport($connection) === 'ssl') {
            $this->addSecureConnection($connection);
        } else {
            $this->addUnsecuredConnection($connection);
        }
    }

    /**
     * Executes the event loop, which continues running until no active
     * connections remain.
     *
     * @param \Phergie\Irc\ConnectionInterface|\Phergie\Irc\ConnectionInterface[] $connections
     * @param bool $autorun
     */
    public function run($connections, $autorun = true)
    {
        if (!is_array($connections)) {
            $connections = array($connections);
        }

        $this->on('connect.error', function($message, $connection, $logger) {
            $logger->error($message);
        });

        $this->emit('connect.before.all', array($connections));

        foreach ($connections as $connection) {
            $this->addConnection($connection);
        }

        $writes = array_map(
            function($connection) {
                return $connection->getData('write');
            },
            $connections
        );
        $this->emit('connect.after.all', array($connections, $writes));

        if ($autorun) {
            $this->getLoop()->run();
        }
    }

    /**
     * Adds a one-time callback to execute after a specified amount of time has
     * passed. Proxies to addTimer() implementation of the event loop
     * implementation returned by getLoop().
     *
     * @param numeric $interval Number of seconds to wait before executing
     *        callback
     * @param callable $callback Callback to execute
     * @return \React\EventLoop\Timer\TimerInterface Added timer
     */
    public function addTimer($interval, $callback)
    {
        return $this->getLoop()->addTimer($interval, $callback);
    }

    /**
     * Adds a recurring callback to execute on a specified interval. Proxies to
     * addPeriodTimer() implementation of the event loop implementation
     * returned by getLoop().
     *
     * @param numeric $interval Number of seconds to wait between executions of callback
     * @param callable $callback Callback to execute
     * @return \React\EventLoop\Timer\TimerInterface Added timer
     */
    public function addPeriodicTimer($interval, $callback)
    {
        return $this->getLoop()->addPeriodicTimer($interval, $callback);
    }

    /**
     * Cancels a specified timer created using addTimer() or
     * addPeriodicTimer(). Proxies to the cancelTimer() implementation of the
     * event loop implementation returned by getLoop().
     *
     * @param \React\EventLoop\Timer\TimerInterface $timer Timer returned by
     *        addTimer() or addPeriodicTimer()
     */
    public function cancelTimer(TimerInterface $timer)
    {
        $this->getLoop()->cancelTimer($timer);
    }

    /**
     * There are certain incoming events that the client processes internally.
     * These functions are essential to the client-server relationship: for example,
     * updating the connection's stored nickname, responding to PING events, etc.
     *
     * Any user-level IRC functionality handled internally by the client
     * should be implemented here.
     *
     * @param array $message Parser output
     * @param \Phergie\Irc\Client\React\WriteStream $write Write stream
     *        corresponding to the read stream on which the event occurred
     * @param \Phergie\Irc\ConnectionInterface $connection Connection on
     *        which the event occurred
     */
    protected function processInput(array $message, WriteStream $write, ConnectionInterface $connection)
    {
        switch ($message['command']) {
            // Update connection's internal nickname when changed
            case 'NICK':
                if ($connection->getNickname() === $message['nick']) {
                    $connection->setNickname($message['params']['nickname']);
                }
                break;

            // Play client-server ping-pong
            case 'PING':
                $write->ircPong($message['params']['server1']);
                break;
        }
    }
}