symplely/coroutine

View on GitHub
Coroutine/Networks.php

Summary

Maintainability
F
5 days
Test Coverage
<?php

declare(strict_types=1);

namespace Async;

use Async\Kernel;
use Async\TaskInterface;
use Async\CoroutineInterface;
use Async\Network\OptionsInterface;
use Async\Network\Sockets;
use Async\Network\SSLContext;
use Async\Network\SocketsInterface;

use function Async\Socket\net_getaddrinfo;
use function Async\Socket\create_ssl_context;

/**
 * A general purpose networking class: the `uri` _scheme_ is detected and used directly to pick the transport.
 * - This allows either a **libuv** object or built-in **native** resource/object usage.
 */
final class Networks
{
  /**
   * Run-state registry for listener tasks.
   *
   * - `[taskId => bool]`: set to `true` when a listener task starts (`listenerTask()`)
   *   and to `false` by `stop()`.
   * - `['stopped' => taskId]`: the listener ID most recently passed to `stop()`.
   *
   * @var array
   */
  protected static $isRunning = [];

  /**
   * Tell whether network operations should go through `libuv`.
   *
   * True only when the object returned by `\coroutine()` implements `CoroutineInterface`,
   * its `isUv()` is truthy, and `Co::uvNative()` is truthy as well.
   *
   * @return bool
   */
  protected static function isUv(): bool
  {
    $scheduler = \coroutine();
    if (!($scheduler instanceof CoroutineInterface)) {
      return false;
    }

    return $scheduler->isUv() && Co::uvNative();
  }

  /**
   * Connect to *address* and return the connected `Sockets` object.
   *
   * - *address* is resolved with `net_getaddrinfo()`; for `file`/`unix` schemes the parsed
   * host is used as the socket host, otherwise the resolved IP.
   * - *timeout* is handed to the `Sockets` constructor before the connection is attempted.
   * - If *source_address* is non-empty the socket is bound to it (together with the parsed
   * port) before connecting.
   * - This function needs to be prefixed with `yield`
   *
   * @param string $address
   * @param int $timeout defaults to `1`
   * @param string|null $source_address optional address to `bind` before connecting
   * @return Sockets
   * @source https://github.com/dabeaz/curio/blob/27ccf4d130dd8c048e28bd15a22015bce3f55d53/curio/socket.py#L41
   */
  public static function create_connection(string $address, int $timeout = 1, ?string $source_address = null)
  {
    // The second tuple member (the normalised uri) is not needed here.
    [$parts, , $ip, $addrInfo] = yield net_getaddrinfo($address);

    $isPipe = ($parts['scheme'] == 'file') || ($parts['scheme'] == 'unix');
    $host = $isPipe ? $parts['host'] : $ip;
    $port = $parts['port'];

    // `null` replaces the Python-style `None`, which is not a PHP keyword.
    $sock = new Sockets(null, $timeout, $host, $port);
    $sock->create($addrInfo['ai_family'], $addrInfo['ai_socktype'], $addrInfo['ai_protocol']);
    if ($source_address) {
      $sock->bind($source_address, $port);
    }

    yield $sock->connect($addrInfo['ai_addr']['sin_addr']);
    return $sock;
  }

  /**
   * Resolve an Internet host name to its IP `address`.
   *
   * - An invalid host name short-circuits to `\value(false)`.
   * - With `libuv` active and `$useUv` set, resolution goes through `uv_getaddrinfo()`
   * and the task receives the first result, or `false` on a negative status.
   * - Otherwise the call is delegated to `\await('gethostbyname', ...)`.
   * - This function needs to be prefixed with `yield`
   *
   * @param string $hostname
   * @param bool $useUv
   */
  public static function gethostbyname(string $hostname, bool $useUv = true)
  {
    $validated = \filter_var($hostname, \FILTER_VALIDATE_DOMAIN, \FILTER_FLAG_HOSTNAME);
    if (!$validated) {
      return \value(false);
    }

    $scheduler = \coroutine();
    $viaLibuv = $scheduler instanceof CoroutineInterface && $scheduler->isUv() && $useUv;
    if (!$viaLibuv) {
      return \await('gethostbyname', $hostname);
    }

    $resolve = function (TaskInterface $task, CoroutineInterface $coroutine) use ($hostname) {
      $onResolved = function (int $status, $result) use ($task, $coroutine) {
        $coroutine->ioRemove();
        $address = $status < 0 ? false : $result[0];
        $task->sendValue($address);
        $coroutine->schedule($task);
      };

      $hints = ["ai_family" => \UV::AF_UNSPEC];

      $coroutine->ioAdd();
      \uv_getaddrinfo($coroutine->getUV(), $onResolved, $hostname, '', $hints);
    };

    return new Kernel($resolve);
  }

  /**
   * Resolve an IP address to its Internet host `name`.
   *
   * - An address that fails `FILTER_VALIDATE_IP` short-circuits to `\value(false)`.
   * - This function needs to be prefixed with `yield`
   *
   * @param string $ipAddress
   *
   * @return string|bool
   */
  public static function gethostbyaddr(string $ipAddress)
  {
    $validated = \filter_var($ipAddress, \FILTER_VALIDATE_IP);

    return $validated
      ? \await('gethostbyaddr', $ipAddress)
      : \value(false);
  }

  /**
   * Fetch the DNS Resource Records associated with a hostname.
   *
   * - A hostname that fails validation short-circuits to `\value(false)`.
   * - This function needs to be prefixed with `yield`
   *
   * @param string $hostname
   * @param int $options One of the `DNS_*` record type constants:
   * - `DNS_A` IPv4 Address Resource
   * - `DNS_CAA` Certification Authority Authorization Resource (available as of PHP 7.0.16 and 7.1.2)
   * - `DNS_MX` Mail Exchanger Resource
   * - `DNS_CNAME` Alias (Canonical Name) Resource
   * - `DNS_NS` Authoritative Name Server Resource
   * - `DNS_PTR` Pointer Resource
   * - `DNS_HINFO` Host Info Resource (See IANA Operating System Names for the meaning of these values)
   * - `DNS_SOA` Start of Authority Resource
   * - `DNS_TXT` Text Resource
   * - `DNS_ANY` Any Resource Record. On most systems this returns all resource records,
   * however it should not be counted upon for critical uses. Try DNS_ALL instead.
   * - `DNS_AAAA` IPv6 Address Resource
   * - `DNS_ALL` Iteratively query the name server for each available record type.
   *
   * @return array|bool
   */
  public static function dns_get_record(string $hostname, int $options = \DNS_A)
  {
    $validated = \filter_var($hostname, \FILTER_VALIDATE_DOMAIN, \FILTER_FLAG_HOSTNAME);

    return $validated
      ? \await('dns_get_record', $hostname, $options)
      : \value(false);
  }

  /**
   * Stop/close the `net_listen` server __listening__ for new connections on the given `listen_task` **ID**.
   *
   * The server handle is taken from the listener task's custom data, the listener is
   * flagged as not running and recorded as the last one stopped, then the handle is
   * closed and the task cancelled. Any failure along the way is ignored.
   * - This function needs to be prefixed with `yield`
   *
   * @param int $listener Task ID
   */
  public static function stop(int $listener)
  {
    try {
      $listenerTask = \coroutine()->getTask($listener);
      $serverHandle = $listenerTask->getCustomData();

      self::$isRunning[$listener] = false;
      self::$isRunning['stopped'] = $listener;

      yield self::close($serverHandle);
      yield \cancel_task($listener);
    } catch (\Throwable $ignored) {
      // Best effort: stopping a missing or already stopped listener is a no-op.
    }
  }

  /**
   * Add a `listen` task handler for the **socket/stream** server, that's continuously monitored.
   *
   * This function will return `int` immediately, use with `net_listen()`.
   * - The `$handler` function will be executed every time on new incoming connections or data.
   * - Expect the `$handler` to receive `(resource|\UVStream $newConnection)`.
   *
   *  Or
   * - Expect the `$handler` to receive `($data)`. If `UVUdp`
   * - This function needs to be prefixed with `yield`
   *
   * The task loops while its entry in `self::$isRunning` is truthy. Each message of the
   * form `[listenerTaskId, callerTaskId, connectionOrData]` spawns a new `stateless` task
   * running `$handler`. Once the loop ends, the last caller task (if one was ever seen)
   * is sent `true` and rescheduled.
   *
   * @param callable $handler
   *
   * @return int
   */
  public static function listenerTask(callable $handler)
  {
    return Kernel::away(function () use ($handler) {
      $coroutine = \coroutine();
      $tid = yield \stateless_task();

      // Stays null until a well-formed message arrives, so the wake-up code after the
      // loop never reads an undefined variable.
      $callerID = null;

      self::$isRunning[$tid] = true;
      while (self::$isRunning[$tid]) {
        $received = yield;
        if (\is_array($received) && (\count($received) === 3 && $received[0] === $tid)) {
          [, $callerID, $clientConnectionOrData] = $received;
          $received = null;
          $newId = yield \away(function () use ($handler, $clientConnectionOrData) {
            return yield $handler($clientConnectionOrData);
          });

          $coroutine->getTask($newId)->taskType('stateless');
        }
      }

      // No caller ever announced itself: there is nobody to wake up.
      if ($callerID === null) {
        return;
      }

      $task = $coroutine->getTask($callerID);
      if ($task instanceof TaskInterface) {
        $server = $coroutine->getTask($tid)->getCustomData();
        if ($server instanceof \UVStream || $server instanceof \UVUdp) {
          // Balance the pending-I/O counter for libuv servers.
          $coroutine->ioRemove();
        }

        // `true` tells the waiting `listen()` caller that listening has ended.
        $task->sendValue(true);
        $coroutine->schedule($task);
      }
    });
  }

  /**
   * Feed incoming connections (or UDP data) from `$server` to the listener task `$listenerTask`.
   *
   * - `libuv` servers: registers a `uv_udp_recv_start()` or `uv_listen()` callback that forwards
   * `[$listenerTask, callerTaskId, dataOrClient]` to the listener task on every event.
   * - `SocketsInterface` servers: loops accepting clients and forwarding each one, until a
   * strict `true` is sent back to this task.
   * - Anything else returns `false`.
   * - This function needs to be prefixed with `yield`
   *
   * @param SocketsInterface|\UVTcp|\UVUdp|\UVPipe $server
   * @param int $listenerTask ID of the task created by `listenerTask()`
   * @param int $backlog `libuv` stream servers only; when empty, `1024` is used for `UVTcp` and `8192` otherwise
   *
   * @return bool
   */
  public static function listen($server, int $listenerTask, int $backlog = 0)
  {
    if (self::isUv() && ($server instanceof \UVStream || $server instanceof \UVUdp)) {
      return yield new Kernel(
        function (TaskInterface $task, CoroutineInterface $coroutine) use ($server, $listenerTask, $backlog) {
          // The calling task is not rescheduled here; it stays suspended until
          // something else sends it a value and schedules it.
          $task->taskType('stateless');
          $coroutine->ioAdd();
          if ($server instanceof \UVUdp) {
            // NOTE(review): `$status` is not inspected before forwarding `$data`.
            \uv_udp_recv_start($server, function ($stream, $status, $data) use ($task, $coroutine, $listenerTask, $server) {
              $listen = $coroutine->getTask($listenerTask);
              // Keep the server reachable from the listener task.
              $listen->customData($server);
              $listen->sendValue([$listenerTask, $task->taskId(), $data]);
              $coroutine->schedule($listen);
            });
          } else {
            $backlog = empty($backlog) ? ($server instanceof \UVTcp ? 1024 : 8192) : $backlog;
            \uv_listen($server, $backlog, function ($server, $status) use ($task, $coroutine, $listenerTask) {
              $listen = $coroutine->getTask($listenerTask);
              $uv = $coroutine->getUV();
              // NOTE(review): `$client` stays undefined if `$server` is neither UVTcp nor UVPipe,
              // and `$status` is not checked before accepting.
              if ($server instanceof \UVTcp) {
                $client = \uv_tcp_init($uv);
              } elseif ($server instanceof \UVPipe) {
                $client = \uv_pipe_init($uv, \IS_WINDOWS);
              }

              \uv_accept($server, $client);
              $listen->customData($server);
              $listen->sendValue([$listenerTask, $task->taskId(), $client]);
              $coroutine->schedule($listen);
            });
          }
        }
      );
    }

    if (!$server instanceof SocketsInterface)
      return false;

    try {
      while (true) {
        $client = yield self::accept($server);
        yield self::listening($client, $listenerTask, $server);
        // Wait for a value sent to this task; only a strict `true` ends the loop.
        $isTrue = yield;
        if ($isTrue === true) {
          break;
        }
      }

      // Only log when `stop()` recorded this listener as the one being stopped.
      if (isset(self::$isRunning['stopped']) && self::$isRunning['stopped'] === $listenerTask)
        \debugging_info('Listening stopped at: ' . \gmdate('D, d M Y H:i:s T') . \CRLF);

      return $isTrue;
    } catch (\Throwable $error) {
      \debugging_info('Error: ' . $error->getMessage() . \CRLF);
      return false;
    }
  }

  /**
   * Hand a freshly accepted `$client` over to the listener task.
   *
   * The calling task is marked `stateless`. If `$listenerTask` resolves to a task, that task
   * optionally gets `$server` as custom data, receives `[$listenerTask, callerTaskId, $client]`
   * and is scheduled. The calling task is always rescheduled afterwards.
   *
   * @param mixed $client accepted connection
   * @param int $listenerTask listener task ID
   * @param mixed $server server instance to attach to the listener task, if any
   *
   * @return Kernel
   */
  protected static function listening($client, int $listenerTask, $server = null)
  {
    $notify = function (TaskInterface $task, CoroutineInterface $coroutine) use ($client, $listenerTask, $server) {
      $task->taskType('stateless');

      $listener = $coroutine->getTask($listenerTask);
      if ($listener instanceof TaskInterface) {
        if (!empty($server)) {
          $listener->customData($server);
        }

        $message = [$listenerTask, $task->taskId(), $client];
        $listener->sendValue($message);
        $coroutine->schedule($listener);
      }

      $coroutine->schedule($task);
    };

    return new Kernel($notify);
  }

  /**
   * Read from a `libuv` handle or a `SocketsInterface` instance.
   *
   * - `libuv` path: starts reading and resumes the task with the first chunk of data,
   * or with `false` when the handle is closing or the read reports EOF/an error.
   * `$size` is not used on this path.
   * - `SocketsInterface` path: delegates to `$handle->read($size)`.
   * - Any other handle yields `false`.
   * - This function needs to be prefixed with `yield`
   *
   * @param \UVTcp|\UVTty|\UVPipe|SocketsInterface $handle
   * @param int $size
   *
   * @return string|bool
   */
  public static function read($handle, $size = -1)
  {
    if (self::isUv() && $handle instanceof \UV) {
      if (\uv_is_closing($handle))
        return false;

      return yield new Kernel(
        function (TaskInterface $task, CoroutineInterface $coroutine) use ($handle) {
          if (!\uv_is_closing($handle)) {
            $coroutine->ioAdd();
            \uv_read_start(
              $handle,
              function ($handle, $nRead, $data) use ($task, $coroutine) {
                if ($nRead > 0) {
                  $value = $data;
                } elseif ($nRead < 0) {
                  // EOF or read error: without this branch the task would never be
                  // resumed and the `ioAdd()` above would never be balanced.
                  $value = false;
                } else {
                  // Nothing available yet; keep waiting for the next callback.
                  return;
                }

                $coroutine->ioRemove();
                $task->sendValue($value);
                $coroutine->schedule($task);
                \uv_read_stop($handle);
              }
            );
          } else {
            $task->sendValue(false);
            $coroutine->schedule($task);
          }
        }
      );
    }

    if ($handle instanceof SocketsInterface)
      return yield $handle->read($size);

    return false;
  }

  /**
   * Write `$data` to a `libuv` handle or a `SocketsInterface` instance.
   *
   * - `libuv` path: resumes the task with the byte length of `$data` when the write status
   * is `0`, otherwise with the status itself; `false` when the handle is closing.
   * - `SocketsInterface` path: delegates to `$handle->write($data)`.
   * - Any other handle yields `false`.
   * - This function needs to be prefixed with `yield`
   *
   * @param \UV|SocketsInterface $handle
   * @param mixed $data
   *
   * @return int|bool
   */
  public static function write($handle, $data = '')
  {
    if (self::isUv() && $handle instanceof \UV) {
      if (\uv_is_closing($handle)) {
        return false;
      }

      $writer = function (TaskInterface $task, CoroutineInterface $coroutine) use ($handle, $data) {
        // The handle may have started closing between the check above and this callback.
        if (\uv_is_closing($handle)) {
          $task->sendValue(false);
          $coroutine->schedule($task);
          return;
        }

        $length = \strlen($data);
        $onWritten = function ($handle, $status) use ($task, $coroutine, $length) {
          $coroutine->ioRemove();
          $outcome = $status === 0 ? $length : $status;
          $task->sendValue($outcome);
          $coroutine->schedule($task);
        };

        $coroutine->ioAdd();
        \uv_write($handle, $data, $onWritten);
      };

      return yield new Kernel($writer);
    }

    if ($handle instanceof SocketsInterface) {
      return yield $handle->write($data);
    }

    return false;
  }

  /**
   * Close a `libuv` handle or a `SocketsInterface` instance.
   *
   * - `\UV` handles: returns a `Kernel` that calls `uv_close()` and resumes the task with
   * the status passed to the close callback (or `null` when none is passed).
   * - `SocketsInterface`: returns the result of `$handle->close()`.
   * - Anything else returns `false`.
   *
   * @param \UV|SocketsInterface $handle
   * @return bool
   */
  public static function close($handle)
  {
    if ($handle instanceof \UV) {
      $closer = function (TaskInterface $task, CoroutineInterface $coroutine) use ($handle) {
        $onClosed = function ($handle, $status = null) use ($task, $coroutine) {
          $coroutine->ioRemove();
          $task->sendValue($status);
          $coroutine->schedule($task);
          unset($handle);
        };

        $coroutine->ioAdd();
        \uv_close($handle, $onClosed);
      };

      return new Kernel($closer);
    }

    if ($handle instanceof SocketsInterface) {
      return $handle->close();
    }

    return false;
  }

  /**
   * Open a client connection to `$uri`.
   *
   * - The connection is treated as secure when the scheme is `https`, `wss`, `tls` or `ssl`,
   * the port is `443`, or `$context` is an `SSLContext`.
   * - With `libuv` active and no SSL, the connection is made through `self::connect()`.
   * - Otherwise a native stream socket is opened with a 1 second connect timeout, set to
   * non-blocking, and (for SSL) encryption is negotiated before the stream is wrapped.
   * - A failed attempt is retried up to 3 times before giving up.
   * - This function needs to be prefixed with `yield`
   *
   * @param string|null|int $uri
   * @param OptionsInterface|null $context
   *
   * @throws \RuntimeException if the connection or the encryption handshake fails
   *
   * @return \UVTcp|\UVUdp|\UVPipe|SocketsInterface
   */
  public static function client($uri = null, ?OptionsInterface $context = null)
  {
    [$parts, $uri, $ip,] = yield net_getaddrinfo((string)$uri);
    $isSSL = \in_array($parts['scheme'], ['https', 'wss', 'tls', 'ssl']) || $parts['port'] === 443
      || ($context instanceof SSLContext);
    $retry = 0;
    while (true) {
      if (self::isUv() && !$isSSL) {
        // Pipes are addressed by the parsed host (a path), everything else by resolved IP.
        $isPipe = ($parts['scheme'] == 'file') || ($parts['scheme'] == 'unix');
        $address = $isPipe ? $parts['host'] : $ip;
        $client = yield self::connect($parts['scheme'], $address, $parts['port'], $context);
      } else {
        $ctx = \stream_context_create();
        if ($isSSL) {
          // Build an SSL context on demand when the caller did not supply one.
          $context = ($context instanceof SSLContext)
            ? $context : yield create_ssl_context(
              \CLIENT_AUTH,
              $parts['host'],
              (empty($context) ? [] : $context)
            );

          $mask = \STREAM_CLIENT_ASYNC_CONNECT | \STREAM_CLIENT_CONNECT;
        } else {
          $mask = \STREAM_CLIENT_CONNECT;
        }

        // Invoking the options object produces the stream context to use.
        $ctx = $context instanceof OptionsInterface ? $context() : $ctx;
        $client = @\stream_socket_client(
          $uri,
          $errNo,
          $errStr,
          1,
          $mask,
          $ctx
        );

        if (\is_resource($client)) {
          \stream_set_blocking($client, false);

          if ($context instanceof SSLContext) {
            $clientSocket = \socket_import_stream($client);
            // `stream_socket_enable_crypto()` returns 0 while the non-blocking handshake
            // is still in progress, so keep waiting for writability and retrying.
            while (true) {
              yield Kernel::writeWait($client);
              $enabled = @\stream_socket_enable_crypto($client, true, $context->get_crypto());
              if ($enabled === false)
                throw new \RuntimeException(\sprintf('Failed to enable socket encryption: %s', \error_get_last()['message'] ?? ''));
              if ($enabled === true)
                break;
            }
          }
        }
      }

      if (!$client) {
        if ($retry < 3) {
          $retry++;
          // Give other tasks two scheduler turns before trying again.
          yield;
          yield;
          continue;
        }

        if (isset($errStr))
          throw new \RuntimeException(\sprintf('Failed to connect to %s: %s, %d', $uri, $errStr, $errNo));
        else
          throw new \RuntimeException(\sprintf('Failed to connect to: %s', $uri));
      } else {
        break;
      }
    }

    // Native stream resources are wrapped; libuv handles are returned as they are.
    if (!$client instanceof \UV)
      $client = ($context instanceof SSLContext) ? $context->wrap_socket($client, $clientSocket) : new Sockets($client);

    return $client;
  }

  /**
   * Creates a `secure` or `plaintext` server and starts listening on the given address.
   * This starts accepting new incoming connections on the given address.
   *
   * - The TCP/IP socket server `type` depends on _uri_ **scheme**, `wss, https, udp, etc...` used.
   * - The server is treated as secure when the scheme is `https`, `wss`, `tls` or `ssl`,
   * the port is `443`, or `$context` is an `SSLContext`.
   * - With `libuv` active and no SSL, the handle comes from `self::bind()`; otherwise a
   * native non-blocking stream server is created and wrapped.
   * - This function needs to be prefixed with `yield`
   *
   * @param string|null|int $uri
   * @param OptionsInterface|null $context
   * @param string|null $capath `directory` to hostname certificate
   * @param string|null $cafile hostname `certificate`
   * @param array $caSelfDetails - for a self signed certificate, will be created automatically if no hostname certificate available.
   *```md
   *  [
   *      "countryName" =>  '',
   *      "stateOrProvinceName" => '',
   *      "localityName" => '',
   *      "organizationName" => '',
   *      "organizationalUnitName" => '',
   *      "commonName" => '',
   *      "emailAddress" => ''
   *  ];
   *```
   * @throws \RuntimeException if listening on this address fails (already in use etc.)
   *
   * NOTE(review): the previous doc also listed `InvalidArgumentException` for an invalid
   * address; nothing in this method throws it directly - presumably it originates from
   * `net_getaddrinfo()`, confirm.
   *
   * @return \UVTcp|\UVUdp|\UVPipe|SocketsInterface
   */
  public static function server(
    $uri,
    ?OptionsInterface $context = null,
    ?string $capath = null,
    ?string $cafile = null,
    array $caSelfDetails = []
  ) {
    [$parts, $uri, $ip,] = yield net_getaddrinfo((string)$uri);
    $isSSL = \in_array($parts['scheme'], ['https', 'wss', 'tls', 'ssl']) || $parts['port'] === 443
      || ($context instanceof SSLContext);
    if (self::isUv() && !$isSSL) {
      // Pipes are addressed by the parsed host (a path), everything else by resolved IP.
      $isPipe = ($parts['scheme'] == 'file') || ($parts['scheme'] == 'unix');
      $address = $isPipe ? $parts['host'] : $ip;
      $server = self::bind($parts['scheme'], $address, $parts['port']);
    } else {
      $ctx = \stream_context_create();
      if ($isSSL) {
        // Build an SSL context on demand when the caller did not supply one.
        $context = ($context instanceof SSLContext)
          ? $context : yield create_ssl_context(
            \SERVER_AUTH,
            $parts['host'],
            (empty($context) ? [] : $context),
            $capath,
            $cafile,
            $caSelfDetails
          );
      }

      // Invoking the options object produces the stream context to use.
      if ($context instanceof OptionsInterface)
        $ctx = $context();

      // UDP sockets are only bound; every other scheme also listens.
      $flags = $parts['scheme'] === 'udp' ? \STREAM_SERVER_BIND : \STREAM_SERVER_BIND | \STREAM_SERVER_LISTEN;
      #create a stream server on IP:Port
      $server = \stream_socket_server(
        $uri,
        $errNo,
        $errStr,
        $flags,
        $ctx
      );

      // Only touch the stream when creation succeeded; a failed bind returns `false`
      // and must reach the RuntimeException below instead of raising a TypeError here.
      if (\is_resource($server))
        \stream_set_blocking($server, false);
    }

    if (!$server) {
      if (isset($errStr))
        throw new \RuntimeException('Failed to listen on "' . $uri . '": ' . $errStr, $errNo);
      else
        throw new \RuntimeException('Failed to listen on "' . $uri . '"');
    }

    if ($context instanceof SSLContext) {
      $serverSocket = \socket_import_stream($server);
      // Crypto stays off on the listening socket itself.
      \stream_socket_enable_crypto($server, false, $context->get_crypto());
    }

    \debugging_info("Listening to {$uri} for connections started at: " . \gmdate('D, d M Y H:i:s T') . \CRLF);
    // Native stream resources are wrapped; libuv handles are returned as they are.
    if (!$server instanceof \UV)
      $server = ($context instanceof SSLContext) ? $context->wrap_socket($server, $serverSocket) : new Sockets($server);

    return $server;
  }

  /**
   * Create a `libuv` handle for `$scheme` and bind it to `$address`.
   *
   * - `file`/`unix`: a pipe handle bound to `$address`; `$port` is not used.
   * - `udp`: a UDP handle bound to `$address:$port`.
   * - Any other scheme: a TCP handle bound to `$address:$port`.
   *
   * An `$address` containing `:` is treated as IPv6, otherwise as IPv4.
   *
   * @param string $scheme
   * @param string $address IP address, or the pipe name/path for `file`/`unix`
   * @param int $port
   *
   * @return \UVPipe|\UVUdp|\UVTcp
   */
  public static function bind(string $scheme, string $address, int $port)
  {
    $uv = \coroutine()->getUV();

    if ($scheme === 'file' || $scheme === 'unix') {
      // Pipes are addressed by name/path, so no IP socket address is built for them.
      $handle = \uv_pipe_init($uv, \IS_WINDOWS);
      \uv_pipe_bind($handle, $address);

      return $handle;
    }

    $ip = (\strpos($address, ':') === false)
      ? \uv_ip4_addr($address, $port)
      : \uv_ip6_addr($address, $port);

    if ($scheme === 'udp') {
      $handle = \uv_udp_init($uv);
      \uv_udp_bind($handle, $ip);

      return $handle;
    }

    // tcp, tls, ftp, ftps, ssl, http, https and anything unrecognised use TCP.
    $handle = \uv_tcp_init($uv);
    \uv_tcp_bind($handle, $ip);

    return $handle;
  }

  /**
   * Wait for one incoming connection (or UDP datagram) on `$server`.
   *
   * - `libuv` path: the task is marked `stateless` and resumed with the received data
   * for `UVUdp`, or with the accepted client handle for stream servers. The backlog
   * is `1024` for `UVTcp` and `8192` otherwise.
   * - `SocketsInterface` path: delegates to `$server->accept()`.
   * - Anything else yields `false`.
   * - This function needs to be prefixed with `yield`
   *
   * @param \UV|SocketsInterface $server
   *
   * @return mixed|bool
   */
  public static function accept($server)
  {
    if (self::isUv() && $server instanceof \UV) {
      return yield new Kernel(
        function (TaskInterface $task, CoroutineInterface $coroutine) use ($server) {
          $task->customData($server);
          $task->taskType('stateless');
          $coroutine->ioAdd();
          if ($server instanceof \UVUdp) {
            \uv_udp_recv_start($server, function ($stream, $status, $data) use ($task, $coroutine) {
              $task->customData($stream);
              $task->sendValue($data);
              $coroutine->ioRemove();
              $coroutine->schedule($task);
            });
          } else {
            $backlog = $server instanceof \UVTcp ? 1024 : 8192;
            \uv_listen($server, $backlog, function ($server, $status) use ($task, $coroutine) {
              $uv = $coroutine->getUV();
              // NOTE(review): `$client` stays undefined if `$server` is neither UVTcp nor UVPipe.
              if ($server instanceof \UVTcp) {
                $client = \uv_tcp_init($uv);
              } elseif ($server instanceof \UVPipe) {
                $client = \uv_pipe_init($uv, \IS_WINDOWS);
              }

              \uv_accept($server, $client);
              $task->customData($server);
              $task->sendValue($client);
              // Exactly one `ioRemove()` per `ioAdd()` above; the callback previously
              // decremented the pending-I/O counter twice.
              $coroutine->ioRemove();
              $coroutine->schedule($task);
            });
          }
        }
      );
    }

    if ($server instanceof SocketsInterface)
      return yield $server->accept();

    return false;
  }

  /**
   * Get the address of the remote connected handle, formatted as `address:port`.
   *
   * - `UVTcp` uses `uv_tcp_getpeername()`, `UVUdp` uses `uv_udp_getsockname()`.
   * - `SocketsInterface` delegates to `get_peer()`.
   * - Anything else returns `false`.
   *
   * @param \UVTcp|\UVUdp|SocketsInterface $handle
   * @return string|bool
   */
  public static function peer($handle)
  {
    $isTcp = $handle instanceof \UVTcp;
    if ($isTcp || $handle instanceof \UVUdp) {
      $info = $isTcp ? \uv_tcp_getpeername($handle) : \uv_udp_getsockname($handle);

      return $info['address'] . ':' . $info['port'];
    }

    return ($handle instanceof SocketsInterface) ? $handle->get_peer() : false;
  }

  /**
   * Get the address of the local handle, formatted as `address:port`.
   *
   * - `UVTcp` uses `uv_tcp_getsockname()`, `UVUdp` uses `uv_udp_getsockname()`.
   * - `SocketsInterface` delegates to `get_local()`.
   * - Anything else returns `false`.
   *
   * @param \UVTcp|\UVUdp|SocketsInterface $handle
   * @return string|bool
   */
  public static function local($handle)
  {
    $isTcp = $handle instanceof \UVTcp;
    if ($isTcp || $handle instanceof \UVUdp) {
      $info = $isTcp ? \uv_tcp_getsockname($handle) : \uv_udp_getsockname($handle);

      return $info['address'] . ':' . $info['port'];
    }

    return ($handle instanceof SocketsInterface) ? $handle->get_local() : false;
  }

  /**
   * Start a `libuv` client connection for `$scheme` and resume the task once it completes.
   *
   * - `file`/`unix`: pipe connect to `$address`.
   * - `udp`: sends `$data` to `$address:$port`.
   * - Any other scheme: TCP connect to `$address:$port`.
   *
   * The completion callback accepts its two arguments in either order: whichever one is
   * the integer is taken as the status, and a negative status resumes the task with `false`.
   *
   * @param string $scheme
   * @param string $address IP address, or the pipe name/path for `file`/`unix`
   * @param int $port
   * @param mixed $data payload for the `udp` send
   *
   * @return Kernel
   */
  public static function connect(string $scheme, string $address, int $port, $data = null)
  {
    $connector = function (TaskInterface $task, CoroutineInterface $coroutine) use ($scheme, $address, $port, $data) {
      $onComplete = function ($client, $status) use ($task, $coroutine) {
        $coroutine->ioRemove();
        if (\is_int($status)) {
          $outcome = $status < 0 ? false : $client;
        } else {
          $outcome = $client < 0 ? false : $status;
        }

        $task->sendValue($outcome);
        $coroutine->schedule($task);
      };

      $coroutine->ioAdd();
      $loop = $coroutine->getUV();
      $isIPv6 = \strpos($address, ':') !== false;
      $sockAddr = $isIPv6
        ? @\uv_ip6_addr($address, $port)
        : @\uv_ip4_addr($address, $port);

      if ($scheme === 'file' || $scheme === 'unix') {
        $handle = \uv_pipe_init($loop, \IS_WINDOWS);
        \uv_pipe_connect($handle, $address, $onComplete);
      } elseif ($scheme === 'udp') {
        $handle = \uv_udp_init($loop);
        \uv_udp_send($handle, $data, $sockAddr, $onComplete);
      } else {
        // tcp, tls, ftp, ftps, ssl, http, https and anything unrecognised use TCP.
        $handle = \uv_tcp_init($loop);
        \uv_tcp_connect($handle, $sockAddr, $onComplete);
      }
    };

    return new Kernel($connector);
  }
}