symplely/coroutine

View on GitHub
Coroutine/Misc/Network/Sockets.php

Summary

Maintainability
D
2 days
Test Coverage
<?php

declare(strict_types=1);

namespace Async\Network;

use Async\Kernel;
use Async\Coroutine;
use Async\Misc\Context;
use Async\Datatype\Tuple;
use Async\Datatype\TupleIterator;
use Async\Network\SocketsInterface;

/**
 * **Non-blocking** wrapper around a `socket` object.
 * A `Sockets` _object_ may also be used as an asynchronous `context manager` where as the underlying `socket` will automatically be closed when done.
 *
 * @see https://curio.readthedocs.io/en/latest/reference.html?highlight=TaskError#socket
 * @source https://github.com/dabeaz/curio/blob/27ccf4d130dd8c048e28bd15a22015bce3f55d53/curio/io.py#L89
 */
class Sockets extends Context implements SocketsInterface
{
  /**
   * Hints for **AddressInfo**, a `addrinfo` structure used by the `socket_addrinfo_lookup()` function to hold host address information.
   *
   *```md
   * - ai_flags:
   * AI_PASSIVE - The socket address will be used in a call to the bind function.
   * AI_CANONNAME - The canonical name is returned in the first ai_canonname member.
   * AI_NUMERICHOST - The address parameter passed to the getaddrinfo function must be a numeric string.
   * AI_ADDRCONFIG - The getaddrinfo will resolve only if a global address is configured. The IPv6 and IPv4 loopback address is not considered a valid global address.
   *
   * - ai_family:
   * AF_INET - The Internet Protocol version 4 (IPv4) address family.
   * AF_UNIX - Local communication protocol family. High efficiency and low overhead make it a great form of IPC (Interprocess Communication).
   * AF_INET6 - The Internet Protocol version 6 (IPv6) address family.
   *
   * - ai_socktype:
   * SOCK_STREAM - Provides sequenced, reliable, two-way, connection-based byte streams with an OOB data transmission mechanism. Uses the Transmission Control Protocol (TCP) for the Internet address family (AF_INET or AF_INET6).
   *
   * SOCK_DGRAM - Supports datagrams, which are connectionless, unreliable buffers of a fixed (typically small) maximum length. Uses the User Datagram Protocol (UDP) for the Internet address family (AF_INET or AF_INET6).
   *
   * SOCK_RAW - Provides a raw socket that allows an application to manipulate the next upper-layer protocol header. To manipulate the IPv4 header, the IP_HDRINCL socket option must be set on the socket. To manipulate the IPv6 header, the IPV6_HDRINCL socket option must be set on the socket.
   *
   * SOCK_SEQPACKET - Provides a pseudo-stream packet based on datagrams.
   *
   * - ai_protocol:
   * IPPROTO_TCP - The Transmission Control Protocol (TCP). This is a possible value when the ai_family member is AF_INET or AF_INET6 and the ai_socktype member is SOCK_STREAM.
   *
   * IPPROTO_UDP - The User Datagram Protocol (UDP). This is a possible value when the ai_family member is AF_INET or AF_INET6 and the type parameter is SOCK_DGRAM.
   *
   * - ai_canonname: The canonical name for the host.
   * - ai_addr: socket address array
   * -        ['sin_port' => 0, 'sin_addr' => -1]
   * -        ['sin6_port' => 0, 'sin6_addr' => -1]
   *```
   * @var array[]
   */
  const ADDR_INFO = [
    'ai_flags' => \AI_ADDRCONFIG | \AI_PASSIVE | \AI_CANONNAME,
    'ai_family' => \AF_INET,
    'ai_socktype' => \SOCK_STREAM,
    'ai_protocol' => 0
  ];

  /**
   * @var \Socket|resource
   */
  protected $socket;

  /**
   * Stream-based interface to the `socket`.
   *
   * @var resource
   */
  protected $stream;

  /**
   * The `fd` file descriptor of the underlying `socket`.
   *
   * @var resource
   */
  protected $fileno;

  /**
   * Last error on the socket
   *
   * @var integer
   */
  protected $last_error = 0;

  /**
   * `getaddrinfo`
   *
   * @var array
   */
  protected $info = [];

  protected $closed = false;
  protected $secured = false;

  /**
   * @var SSLContext
   */
  protected $instance;
  protected $address;
  protected $port;

  /**
   * @var float
   */
  protected $timeout = 1.0;

  /**
   * @param resource|\Socket $socket
   * @param int $timeout
   * @param int $port
   */
  public function __construct($socket = None, $timeout = 1)
  {
    $isSocketOrStream = (!\IS_PHP8)
      ? \get_resource_type($socket)
      : ($socket instanceof \Socket ? 'Socket' : \get_resource_type($socket));

    if ($isSocketOrStream === 'stream') {
      $stream = \socket_import_stream($socket);
      if ($stream === false)
        $this->error();

      \socket_set_nonblock($stream);
      $this->socket = $stream;
      \stream_set_read_buffer($socket, 0);
      \stream_set_write_buffer($socket, 0);
      \stream_set_timeout($socket, $timeout);
      $this->stream = $socket;
      \stream_set_blocking($this->stream, false);
      if (\stream_is_local($socket))
        $this->fileno = $socket;
    } elseif ($isSocketOrStream === 'Socket') {
      \socket_set_nonblock($socket);
      $this->socket = $socket;
      $stream = \socket_export_stream($socket);
      if ($stream === false)
        $this->error();

      \stream_set_read_buffer($stream, 0);
      \stream_set_write_buffer($stream, 0);
      \stream_set_timeout($stream, $timeout);
      $this->stream = $stream;
      \stream_set_blocking($this->stream, false);
      if (\stream_is_local($stream))
        $this->fileno = $stream;
    }

    $this->timeout = $timeout;
  }

  public function create(int $domain = \AF_INET, int $type = \SOCK_STREAM, int $protocol = \SOL_TCP): SocketsInterface
  {
    $this->close();
    $socket = \socket_create($domain, $type, $protocol);
    if ($socket === false)
      $this->error();

    $this->info = self::ADDR_INFO;
    $this->info['ai_family'] = $domain;
    $this->info['ai_socktype'] = $type;
    $this->info['ai_protocol'] = $protocol;
    \socket_set_nonblock($socket);
    $this->socket = $socket;
    $this->stream = \socket_export_stream($socket);
    \stream_set_read_buffer($this->stream, 0);
    \stream_set_write_buffer($this->stream, 0);
    \stream_set_blocking($this->stream, false);
    $this->closed = false;

    return $this;
  }

  /**
   * @param \Socket|resource|null $socket
   * @return void
   */
  protected function error($socket = null)
  {
    $code = \socket_last_error($socket);
    $error = $this->getError($code);
    if (!empty($error)) {
      if (empty($this->socket))
        \panic('Failed: ' . $error . \EOL);
      else
        \debugging_info('Failed: ' . $error . \EOL);

      $this->last_error = $code;
    }
  }

  public function getError(int $code = 0): ?string
  {
    $code = empty($code) ? $this->last_error : $code;
    if ($code > 0)
      return \socket_strerror($code);

    return null;
  }

  public function clearError(): void
  {
    $this->last_error = 0;
    if ($this->socket !== false)
      \socket_clear_error($this->socket);
  }

  public function setopt(int $level = \SOL_SOCKET, int $option = \SO_REUSEADDR, $value = 1): bool
  {
    return \socket_set_option($this->socket, $level, $option, $value);
  }

  public function recv($maxBytes, $flags = \MSG_DONTWAIT)
  {
    yield;
    $buffer = '';
    if (\is_resource($this->stream)) {
      yield Kernel::readWait($this->stream);
      if (\socket_recv($this->socket, $buffer, $maxBytes, $flags) === false)
        $this->error($this->socket);
    }

    return $buffer;
  }

  public function recv_into(&$buffer, $nBytes = 0, $flags = \PHP_BINARY_READ)
  {
    yield;
    if (\is_resource($this->stream)) {
      yield Kernel::readWait($this->stream);
      $buffer = \socket_read($this->socket, $nBytes, $flags);
      if ($buffer === false)
        $this->error($this->socket);
    }
  }

  public function recvfrom($maxsize, $flags = \MSG_DONTWAIT, &$address = null, &$port = null)
  {
    yield;
    $buffer = '';
    if (\is_resource($this->stream)) {
      yield Kernel::readWait($this->stream);
      if (\socket_recvfrom($this->socket, $buffer, $maxsize, $flags, $address, $port) === false)
        $this->error($this->socket);

      return new Tuple($buffer, $address);
    }
  }

  public function recvfrom_into(&$buffer, $nBytes, $flags = \MSG_DONTWAIT, &$address = null, &$port = null)
  {
    yield;
    if (\is_resource($this->stream)) {
      yield Kernel::readWait($this->stream);
      if (\socket_recvfrom($this->socket, $buffer, $nBytes, $flags, $address, $port) === false)
        $this->error($this->socket);
    }
  }

  public function recvmsg($bufSize, $flags = \MSG_DONTWAIT)
  {
    yield;
    $buffer = [
      "name" => ["family" => \AF_INET6, "addr" => "::1"],
      "buffer_size" => $bufSize,
      "controllen" => \socket_cmsg_space(IPPROTO_IPV6, IPV6_PKTINFO)
    ];

    if (\is_resource($this->stream)) {
      yield Kernel::readWait($this->stream);
      if (\socket_recvmsg($this->socket, $buffer, $flags) === false)
        $this->error($this->socket);

      return $buffer;
    }
  }

  public function recvmsg_into(array &$buffers, $flags = \MSG_DONTWAIT)
  {
    yield;
    if (\is_resource($this->stream)) {
      yield Kernel::readWait($this->stream);
      if (\socket_recvmsg($this->socket, $buffers, $flags) === false)
        $this->error($this->socket);
    }
  }

  public function send($data, int $length, int $flags = 0)
  {
    \socket_send($this->socket, $data, $length, $flags);
  }

  public function sendAll($data, $flags = 0)
  {
  }

  public function sendto($data, $address)
  {
  }

  public function sendto_alt($data, $flags, $address)
  {
  }

  public function sendmsg($buffers, TupleIterator $ancData = null, $flags = 0, $address = None)
  {
  }

  public function read(int $length = -1, int $mode = \PHP_BINARY_READ)
  {
    $data = false;
    if ($this->closed) {
    } elseif ($this->secured) {
      if (\is_resource($this->stream)) {
        yield Kernel::readWait($this->stream);
        $data = \stream_get_contents($this->stream, $length);
      }
    } elseif ($this->socket) {
      $length = ($length === -1) ? 1024 * 16 : $length;
      yield Kernel::readWait($this->stream);
      $data = \socket_read($this->socket, $length, $mode);
      if (false === $data)
        $this->error($this->socket);

      $data = (false === $data && \socket_last_error($this->socket) === 0) ? '' : $data;
    }

    return $data;
  }

  public function write(string $data, ?int $length = 0)
  {
    $count = false;
    $length = (empty($length) ? \strlen($data) : $length);
    if ($this->closed) {
    } elseif ($this->secured) {
      if (\is_resource($this->stream)) {
        yield Kernel::writeWait($this->stream);
        $count = \fwrite($this->stream, $data,  $length);
      }
    } elseif ($this->socket) {
      yield Kernel::writeWait($this->stream);
      $count = \socket_write($this->socket, $data, $length);
      if (false === $count)
        $this->error($this->socket);
    }

    return $count;
  }

  public function accept()
  {
    yield \stateless_task();
    if (!\is_resource($this->stream))
      return false;

    yield Kernel::readWait($this->stream);
    if ($this->secured)
      return $this->do_handshake();
    else
      return new Sockets($this->accepting());
  }

  protected function accepting()
  {
    if ($this->secured) {
      $client = \stream_socket_accept($this->stream, 0);
      if (false === $client)
        throw new \RuntimeException('Error accepting new connection');
    } else {
      $client = \socket_accept($this->socket);
      if (false === $client)
        $this->error($this->socket);
    }

    return $client;
  }

  public function do_handshake()
  {
    \stream_set_blocking($this->stream, true);
    $secure = self::accepting();
    \stream_set_blocking($this->stream, false);

    $error = null;
    \set_error_handler(function ($_, $errstr) use (&$error) {
      $error = \str_replace(array("\r", "\n"), ' ', $errstr);
      // remove useless function name from error message
      if (($pos = \strpos($error, "): ")) !== false) {
        $error = \substr($error, $pos + 3);
      }
    });

    $socket = \socket_import_stream($secure);
    \stream_set_blocking($secure, true);
    $result = @\stream_socket_enable_crypto($secure, true, $this->instance->get_crypto());

    \restore_error_handler();

    if (false === $result) {
      if (\feof($secure) || $error === null) {
        // EOF or failed without error => connection closed during handshake
        if ($error !== 'SSL: The operation completed successfully.  ')
          \debugging_info(\sprintf("Connection lost during TLS handshake with: %s\n", \stream_socket_get_name($secure, true)));
      } else {
        // handshake failed with error message
        \debugging_info(\sprintf("Unable to complete TLS handshake: %s\n", $error));
      }
    }

    return $this->instance->wrap_socket($secure, $socket);
  }

  public function bind($address, ?int $port = 0): bool
  {
    if ($this->socket === false)
      return false;

    $status = \socket_bind($this->socket, $address, $port);
    if (false === $status)
      $this->error($this->socket);

    return $status;
  }

  public function listen(int $backlog = 0): bool
  {
    if ($this->socket === false)
      return false;

    $status = \socket_listen($this->socket, $backlog);
    if (false === $status)
      $this->error($this->socket);

    return $status;
  }

  public function connect($address, ?int $port = 0): bool
  {
    if ($this->socket === false)
      return false;

    $status = \socket_connect($this->socket, $address, $port);
    if ($status === false)
      throw new \Exception($this->getError(\socket_last_error($this->socket)));

    return $status;
  }

  public function connect_ex($address, ?int $port = None): int
  {
    if ($this->socket === false)
      return false;

    $status = \socket_connect($this->socket, $address, $port);
    if ($status === false)
      $status = \socket_last_error($this->socket);

    return $status;
  }

  public function close(): void
  {
    if (!$this->closed) {
      $this->shutdown(2);
      $this->clearError();

      if ($this->secured) {
        if (\is_resource($this->stream))
          \fclose($this->stream);
      } elseif ($this->socket)
        \socket_close($this->socket);

      $this->stream = null;
      $this->socket = null;
      $this->closed = true;

      parent::close();
    }
  }

  public function shutdown(int $how = 2)
  {
    if ($this->secured) {
      if (\is_resource($this->stream))
        \stream_socket_shutdown($this->stream, $how);
    } else {
      if (\is_resource($this->stream))
        \stream_socket_shutdown($this->stream, $how);
      elseif ($this->socket)
        \socket_shutdown($this->socket, $how);
    }
  }

  public function get_peer()
  {
    if ($this->secured) {
      if (\is_resource($this->stream))
        return \stream_socket_get_name($this->stream, true);

      return false;
    } else {
      if ($this->socket) {
        $status = \socket_getpeername($this->socket, $this->address, $this->port);
        if (false === $status)
          $this->error($this->socket);
      }

      return $this->address . ':' . (string) $this->port;
    }
  }

  public function get_local()
  {
    if ($this->secured) {
      if (\is_resource($this->stream))
        return \stream_socket_get_name($this->stream, false);

      return false;
    } else {
      if ($this->socket) {
        $status = \socket_getsockname($this->socket, $this->address, $this->port);
        if (false === $status)
          $this->error($this->socket);
      }

      return $this->address . ':' . (string) $this->port;
    }
  }

  public function makefile($mode, $buffering = 0)
  {
    return new FileStream();
  }

  public function as_stream()
  {
    return $this->stream;
  }

  public function blocking()
  {
  }
}