packages/transport/src/index.js
'use strict'
const debug = require('debug')
const log = debug('libp2p:webrtc-star')
log.error = debug('libp2p:webrtc-star:error')
const { EventEmitter } = require('events')
const errcode = require('err-code')
const withIs = require('class-is')
const { AbortError } = require('abortable-iterator')
const SimplePeer = require('simple-peer')
const webrtcSupport = require('webrtcsupport')
const multiaddr = require('multiaddr')
const mafmt = require('mafmt')
const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const { CODE_CIRCUIT } = require('./constants')
const createListener = require('./listener')
const toConnection = require('./socket-to-conn')
const { cleanMultiaddr } = require('./utils')
// eslint-disable-next-line @typescript-eslint/no-empty-function
function noop() {}
class WebRTCStar {
constructor(options = {}) {
if (!options.upgrader) {
throw new Error(
'An upgrader must be provided. See https://github.com/libp2p/interface-transport#upgrader.'
)
}
this._upgrader = options.upgrader
this._signallingAddr = undefined
this.sioOptions = {
auth: {
namespace: options.namespace || 'dstack'
}
}
if (options.wrtc) {
this.wrtc = options.wrtc
}
this.listenersRefs = {}
// Discovery
this.discovery = new EventEmitter()
this.discovery.tag = 'webRTCStar'
this.discovery._isStarted = false
this.discovery.start = () => {
this.discovery._isStarted = true
}
this.discovery.stop = () => {
this.discovery._isStarted = false
}
this._peerDiscovered = this._peerDiscovered.bind(this)
}
async dial(ma, options = {}) {
const rawConn = await this._connect(ma, options)
const maConn = toConnection(rawConn, {
remoteAddr: ma,
signal: options.signal
})
log('new outbound connection %s', maConn.remoteAddr)
const conn = await this._upgrader.upgradeOutbound(maConn)
log('outbound connection %s upgraded', maConn.remoteAddr)
return conn
}
_connect(ma, options = {}) {
if (options.signal && options.signal.aborted) {
throw new AbortError()
}
const spOptions = {
initiator: true,
trickle: false
}
// Use custom WebRTC implementation
if (this.wrtc) {
spOptions.wrtc = this.wrtc
}
const cOpts = ma.toOptions()
const intentId = (~~(Math.random() * 1e9)).toString(36) + Date.now()
const sioClient = this.listenersRefs[Object.keys(this.listenersRefs)[0]].io
return new Promise((resolve, reject) => {
const start = Date.now()
let connected
log('dialing %s:%s', cOpts.host, cOpts.port)
const channel = new SimplePeer(spOptions)
const onError = (err) => {
if (!connected) {
const msg = `connection error ${cOpts.host}:${cOpts.port}: ${err.message}`
err.message = msg
log.error(msg)
done(err)
}
}
const onTimeout = () => {
log('connnection timeout %s:%s', cOpts.host, cOpts.port)
const err = errcode(
new Error(`connection timeout after ${Date.now() - start}ms`),
'ERR_CONNECT_TIMEOUT'
)
// Note: this will result in onError() being called
channel.emit('error', err)
}
const onConnect = () => {
connected = true
log('connection opened %s:%s', cOpts.host, cOpts.port)
done(null)
}
const onAbort = () => {
log.error('connection aborted %s:%s', cOpts.host, cOpts.port)
channel.destroy()
done(new AbortError())
}
const done = (err) => {
channel.removeListener('timeout', onTimeout)
channel.removeListener('connect', onConnect)
options.signal && options.signal.removeEventListener('abort', onAbort)
err ? reject(err) : resolve(channel)
}
channel.on('error', onError)
channel.once('timeout', onTimeout)
channel.once('connect', onConnect)
channel.on('close', () => {
channel.removeListener('error', onError)
})
options.signal && options.signal.addEventListener('abort', onAbort)
channel.on('signal', (signal) => {
sioClient.emit('ss-handshake', {
intentId: intentId,
srcMultiaddr: this._signallingAddr.toString(),
dstMultiaddr: ma.toString(),
signal: signal
})
})
// NOTE: aegir segfaults if we do .once on the socket.io event emitter and we
// are clueless as to why.
sioClient.on('ws-handshake', (offer) => {
if (offer.intentId === intentId && offer.err) {
reject(
errcode(
offer.err instanceof Error ? offer.err : new Error(offer.err),
'ERR_SIGNALLING_FAILED'
)
)
}
if (offer.intentId !== intentId || !offer.answer || channel.destroyed) {
return
}
channel.signal(offer.signal)
})
})
}
/**
* Creates a WebrtcStar listener. The provided `handler` function will be called
* anytime a new incoming Connection has been successfully upgraded via
* `upgrader.upgradeInbound`.
* @param {object} [options] simple-peer options for listener
* @param {function (Connection)} handler
* @returns {Listener} A WebrtcStar listener
*/
createListener(options = {}, handler) {
if (!webrtcSupport.support && !this.wrtc) {
throw errcode(new Error('no WebRTC support'), 'ERR_NO_WEBRTC_SUPPORT')
}
if (typeof options === 'function') {
handler = options
options = {}
}
handler = handler || noop
return createListener({ handler, upgrader: this._upgrader }, this, {
...options,
sioOptions: this.sioOptions
})
}
filter(multiaddrs) {
multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs]
return multiaddrs.filter((ma) => {
if (ma.protoCodes().includes(CODE_CIRCUIT)) {
return false
}
return mafmt.WebRTCStar.matches(ma)
})
}
_peerDiscovered(maStr) {
if (!this.discovery._isStarted) return
log('Peer Discovered:', maStr)
maStr = cleanMultiaddr(maStr)
const ma = multiaddr(maStr)
const peerId = PeerId.createFromB58String(ma.getPeerId())
const peerInfo = new PeerInfo(peerId)
peerInfo.multiaddrs.add(ma)
this.discovery.emit('peer', peerInfo)
}
}
module.exports = withIs(WebRTCStar, {
className: 'WebRTCStar',
symbolName: '@libp2p/js-libp2p-webrtc-star/webrtcstar'
})