dstack-js/dstack

View on GitHub
packages/transport/src/listener.js

Summary

Maintainability
A
3 hrs
Test Coverage
'use strict'

const EventEmitter = require('events')
const debug = require('debug')
const log = debug('libp2p:webrtc-star:listener')
log.error = debug('libp2p:webrtc-star:listener:error')

const multiaddr = require('multiaddr')

const io = require('socket.io-client')
const SimplePeer = require('simple-peer')
const pDefer = require('p-defer')

const toConnection = require('./socket-to-conn')
const { cleanUrlSIO } = require('./utils')
const { CODE_P2P } = require('./constants')

module.exports = ({ handler, upgrader }, WebRTCStar, options = {}) => {
  const sioOptions = {
    path: '/socket.io/',
    transports: ['websocket'],
    'force new connection': true,
    ...options.sioOptions
  }

  const listener = new EventEmitter()
  let listeningAddr

  listener.__connections = []
  listener.listen = (ma) => {
    const defer = pDefer()

    listeningAddr = ma
    if (!ma.protoCodes().includes(CODE_P2P) && upgrader.localPeer) {
      WebRTCStar._signallingAddr = ma.encapsulate(
        `/p2p/${upgrader.localPeer.toB58String()}`
      )
    } else {
      WebRTCStar._signallingAddr = ma
    }

    const sioUrl = cleanUrlSIO(ma)

    log('Dialing to Signalling Server on: ' + sioUrl)
    listener.io = io.connect(sioUrl, sioOptions)

    const incommingDial = (offer) => {
      if (offer.answer || offer.err) {
        return
      }

      const spOptions = {
        trickle: false,
        ...options
      }

      // Use custom WebRTC implementation
      if (WebRTCStar.wrtc) {
        spOptions.wrtc = WebRTCStar.wrtc
      }

      const channel = new SimplePeer(spOptions)

      const onError = (err) => {
        log.error('incoming connectioned errored', err)
      }

      channel.on('error', onError)
      channel.once('close', (...args) => {
        channel.removeListener('error', onError)
      })

      channel.once('signal', (signal) => {
        offer.signal = signal
        offer.answer = true
        listener.io.emit('ss-handshake', offer)
      })

      channel.signal(offer.signal)

      channel.once('connect', async () => {
        const maConn = toConnection(channel)
        log('new inbound connection %s', maConn.remoteAddr)

        let conn
        try {
          conn = await upgrader.upgradeInbound(maConn)
        } catch (err) {
          log.error('inbound connection failed to upgrade', err)
          return maConn.close()
        }

        if (!conn.remoteAddr) {
          try {
            conn.remoteAddr = ma
              .decapsulateCode(CODE_P2P)
              .encapsulate(`/p2p/${conn.remotePeer.toB58String()}`)
          } catch (err) {
            log.error('could not determine remote address', err)
          }
        }

        log('inbound connection %s upgraded', maConn.remoteAddr)

        trackConn(listener, maConn)

        listener.emit('connection', conn)
        handler(conn)
      })
    }

    listener.io.once('connect_error', (err) => defer.reject(err))
    listener.io.once('error', (err) => {
      listener.emit('error', err)
      listener.emit('close')
    })

    listener.io.on('ws-handshake', incommingDial)
    listener.io.on('ws-peer', WebRTCStar._peerDiscovered)

    listener.io.on('connect', () => {
      listener.io.emit('ss-join', WebRTCStar._signallingAddr.toString())
    })

    listener.io.once('connect', () => {
      listener.emit('listening')
      defer.resolve()
    })

    return defer.promise
  }

  listener.close = async () => {
    listener.io && listener.io.emit('ss-leave')
    await Promise.all(listener.__connections.map((maConn) => maConn.close()))
    listener.emit('close')
  }

  listener.getAddrs = () => {
    return [listeningAddr]
  }

  WebRTCStar.listenersRefs[multiaddr.toString()] = listener
  return listener
}

function trackConn(listener, maConn) {
  listener.__connections.push(maConn)

  const untrackConn = () => {
    listener.__connections = listener.__connections.filter((c) => c !== maConn)
  }

  maConn.conn.once('close', untrackConn)
}