coast-team/netflux

View on GitHub
src/WebChannel.ts

Summary

Maintainability
D
2 days
Test Coverage
import { Observable, Subject } from 'rxjs'

import { Channel } from './Channel'
import {
  extractHostnameAndPort,
  generateId,
  generateKey,
  isBrowser,
  isOnline,
  IStream,
  isVisible,
  log,
  validateKey,
  validateWebSocketURL,
} from './misc/util'
import { IMessage, Message } from './proto/index'
import { ChannelBuilder } from './service/channelBuilder/ChannelBuilder'
import { FullMesh } from './service/topology/FullMesh'
import { ITopology, TopologyEnum, TopologyState } from './service/topology/Topology'
import { UserDataType, UserMessage } from './service/UserMessage'
import { Signaling, SignalingState } from './Signaling'
import { WebChannelState } from './WebChannelState'
import { WebSocketBuilder } from './WebSocketBuilder'

export interface IWebChannelOptions {
  topology?: TopologyEnum
  signalingServer?: string
  rtcConfiguration?: RTCConfiguration
  autoRejoin?: boolean
}

export const webChannelDefaultOptions = {
  topology: TopologyEnum.FULL_MESH,
  signalingServer: 'wss://signaling.netflux.coedit.re',
  rtcConfiguration: {
    iceServers: [{ urls: 'stun:stun3.l.google.com:19302' }],
  },
  autoRejoin: true,
}

export interface InWcMsg extends Message {
  channel: Channel
}

export type OutWcMessage = IMessage

const REJOIN_TIMEOUT = 3000

/**
 * This class is an API starting point. It represents a group of collaborators
 * also called peers. Each peer can send/receive broadcast as well as personal
 * messages. Every peer in the `WebChannel` can invite another person to join
 * the `WebChannel` and he also possess enough information to be able to add it
 * preserving the current `WebChannel` structure (network topology).
 * [[include:installation.md]]
 */
export class WebChannel implements IStream<OutWcMessage, InWcMsg> {
  public readonly STREAM_ID = 2
  public members: number[]
  public topologyEnum: TopologyEnum
  public myId: number
  public key: string
  public autoRejoin: boolean
  public rtcConfiguration: RTCConfiguration
  public state: WebChannelState

  public onSignalingStateChange: (state: SignalingState) => void
  public onStateChange: (state: WebChannelState) => void
  public onMemberJoin: (id: number) => void
  public onMemberLeave: (id: number) => void
  public onMessage: (id: number, msg: UserDataType) => void
  public onMyId: (id: number) => void

  public webSocketBuilder: WebSocketBuilder
  public channelBuilder: ChannelBuilder
  public topology: ITopology
  public signaling: Signaling
  public userMsg: UserMessage
  public streamSubject: Subject<InWcMsg>

  private _id: number
  private idSubject: Subject<number>
  private _onAlone: () => void
  private rejoinEnabled: boolean
  private rejoinTimer: any

  constructor(options: IWebChannelOptions) {
    const { topology, autoRejoin, rtcConfiguration, signalingServer } = {
      ...webChannelDefaultOptions,
      ...options,
    }
    this.streamSubject = new Subject()
    this.idSubject = new Subject()
    this.topologyEnum = topology
    this.autoRejoin = autoRejoin
    this.rtcConfiguration = rtcConfiguration
    this.members = []
    this._id = 0
    this.key = ''
    this.myId = 0
    this.state = WebChannelState.LEFT
    this.rejoinEnabled = false
    this.rejoinTimer = undefined
    this.topology = {} as ITopology
    this._onAlone = () => {}
    this.onMemberJoin = function none() {}
    this.onMemberLeave = function none() {}
    this.onMessage = function none() {}
    this.onMyId = function none() {}
    this.onStateChange = function none() {}
    this.onSignalingStateChange = function none() {}

    // Initialize services
    this.userMsg = new UserMessage()
    this.signaling = new Signaling(this, signalingServer)
    this.subscribeToSignalingState()
    this.webSocketBuilder = new WebSocketBuilder(this)
    this.channelBuilder = new ChannelBuilder(this)
    this.setTopology(topology)

    // Listen to browser events
    if (isBrowser) {
      this.subscribeToBrowserEvents()
    }
  }

  get onIdChange(): Observable<number> {
    return this.idSubject.asObservable()
  }

  get id(): number {
    return this._id
  }

  set id(value: number) {
    this._id = value
    this.idSubject.next(value)
  }

  get messageFromStream(): Observable<InWcMsg> {
    return this.streamSubject.asObservable()
  }

  set onAlone(handler: () => void) {
    this._onAlone = handler
  }

  sendOverStream(msg: OutWcMessage) {
    this.topology.sendTo(msg)
  }

  join(key: string = generateKey()): void {
    validateKey(key)
    if (this.state === WebChannelState.LEFT) {
      this.startJoin(key)
    }
  }

  invite(url: string): void {
    validateWebSocketURL(url)
    const hostnamePort = extractHostnameAndPort(url)
    for (const ch of this.topology.neighbors) {
      if (hostnamePort === extractHostnameAndPort(ch.url)) {
        return
      }
    }
    this.webSocketBuilder
      .connect(
        url,
        Channel.WITH_JOINING,
        -1,
        -1,
        this.id
      )
      .catch((err) => log.webgroup(`Failed to invite the bot ${url}: ${err.message}`))
  }

  leave() {
    if (this.state !== WebChannelState.LEFT) {
      this.key = ''
      this.internalLeave()
    }
  }

  send(data: UserDataType): void {
    if (this.members.length !== 1) {
      for (const chunk of this.userMsg.encodeUserMessage(data)) {
        this.topology.send({
          senderId: this.myId,
          recipientId: 0,
          serviceId: UserMessage.SERVICE_ID,
          content: chunk,
        })
      }
    }
  }

  sendTo(id: number, data: UserDataType): void {
    if (this.members.length !== 1) {
      for (const chunk of this.userMsg.encodeUserMessage(data)) {
        this.topology.sendTo({
          senderId: this.myId,
          recipientId: id,
          serviceId: UserMessage.SERVICE_ID,
          content: chunk,
        })
      }
    }
  }

  onMemberJoinProxy(id: number): void {
    if (!this.members.includes(id)) {
      this.members[this.members.length] = id
      this.onMemberJoin(id)
    }
  }

  onAdjacentMembersLeaveProxy(ids: number[]): void {
    if (this.onMemberLeaveProxy(ids)) {
      if (this.members.length === 1) {
        this._onAlone()
        this.topology.leave()
      } else if (
        this.signaling.state === SignalingState.CHECKED &&
        this.topology.state === TopologyState.CONSTRUCTED
      ) {
        this.signaling.check()
      }
    }
  }

  onDistantMembersLeaveProxy(ids: number[]) {
    this.onMemberLeaveProxy(ids)
  }

  init(key: string, id: number = generateId()) {
    log.webgroup('INIT')
    this.id = id
    this.myId = generateId()
    this.members = [this.myId]
    this.key = key
    this.rejoinEnabled = this.autoRejoin
    this.onMyId(this.myId)
    if (this.rejoinTimer) {
      clearTimeout(this.rejoinTimer)
      this.rejoinTimer = undefined
    }
    this.setState(WebChannelState.JOINING)
  }

  private clean() {
    log.webgroup('CLEAN')
    if (this.rejoinTimer) {
      clearTimeout(this.rejoinTimer)
      this.rejoinTimer = undefined
    }
    this.members = []
    this.id = 0
    this.myId = 0
  }

  private setState(state: WebChannelState): void {
    if (this.state !== state) {
      log.webGroupState(WebChannelState[state], this.myId)
      this.state = state
      this.onStateChange(state)
    }
  }

  private setTopology(topologyEnum: TopologyEnum): void {
    this.topologyEnum = topologyEnum
    this.topology = new FullMesh(this)
    this.topology.onState.subscribe((state: TopologyState) => {
      log.webgroup('Topology state: ', TopologyState[state])
      switch (state) {
        case TopologyState.CONSTRUCTING:
          this.setState(WebChannelState.JOINING)
          break
        case TopologyState.CONSTRUCTED:
          if (this.signaling.state === SignalingState.OPEN) {
            this.signaling.check()
          } else if (this.signaling.state === SignalingState.CHECKED) {
            this.setState(WebChannelState.JOINED)
            this.signaling.check()
          } else if (this.signaling.state === SignalingState.CLOSED) {
            // This is for a bot who was invited to the group
            this.signaling.connect(this.key)
          }
          break
        case TopologyState.IDLE:
          this.channelBuilder.clean()
          this.userMsg.clean()
          switch (this.signaling.state) {
            case SignalingState.CLOSED:
              if (this.rejoinEnabled) {
                this.rejoin()
              } else {
                this.internalLeave()
              }
              break
            case SignalingState.CONNECTING:
              this.setState(WebChannelState.JOINING)
              break
            case SignalingState.OPEN:
              this.setState(WebChannelState.JOINING)
              this.signaling.check()
              break
            case SignalingState.CHECKING:
              this.setState(WebChannelState.JOINING)
              break
            case SignalingState.CHECKED:
              this.signaling.check()
              break
          }

          break
      }
    })
  }

  private subscribeToSignalingState() {
    this.signaling.onState.subscribe((state: SignalingState) => {
      log.signalingState(SignalingState[state], this.myId)
      this.onSignalingStateChange(state)
      switch (state) {
        case SignalingState.CLOSED:
          if (this.topology.state === TopologyState.IDLE) {
            if (this.rejoinEnabled) {
              this.rejoin()
            } else {
              this.internalLeave()
            }
          } else if (this.topology.state === TopologyState.CONSTRUCTED) {
            if (this.members.length === 1) {
              this.topology.leave()
            } else if (this.rejoinEnabled && !this.rejoinTimer) {
              this.reconnectToSignaling()
            }
          }
          break
        case SignalingState.OPEN:
          if (this.topology.state !== TopologyState.CONSTRUCTING) {
            this.signaling.check()
          }
          break
        case SignalingState.CHECKED:
          if (this.topology.state === TopologyState.IDLE) {
            if (this.signaling.connected) {
              this.topology.setJoinedState()
            }
          } else if (this.topology.state === TopologyState.CONSTRUCTED) {
            this.setState(WebChannelState.JOINED)
          }
          break
      }
    })
  }

  private subscribeToBrowserEvents() {
    window.addEventListener('online', () => this.onBrowserBack())
    window.addEventListener('visibilitychange', () => this.onBrowserBack())
    window.addEventListener('beforeunload', () => this.leave())
  }

  private startJoin(key = this.key) {
    log.webgroup('start join...')
    this.init(key)
    this.signaling.connect(key)
  }

  private rejoin() {
    this.setState(WebChannelState.JOINING)
    if (!isVisible() || !isOnline()) {
      this.internalLeave()
    } else {
      this.clean()
      log.webgroup(`rejoin in ${REJOIN_TIMEOUT}ms`)
      this.rejoinTimer = setTimeout(() => {
        if (
          this.signaling.state === SignalingState.CLOSED &&
          isVisible() &&
          isOnline() &&
          this.rejoinEnabled
        ) {
          this.startJoin()
        } else {
          log.webgroup('abandon rejoin because: ', {
            isVisible: isVisible(),
            isOnline: isOnline(),
            signalingState: SignalingState[this.signaling.state],
            rejoinEnabled: this.rejoinEnabled,
          })
          this.internalLeave()
        }
        this.rejoinTimer = undefined
      }, REJOIN_TIMEOUT)
    }
  }

  private reconnectToSignaling() {
    if (this.members.length === 1 && (!isVisible() || !isOnline())) {
      this.internalLeave()
    } else {
      log.webgroup(`reconnect to Signaling server in ${REJOIN_TIMEOUT}ms`)
      this.setState(WebChannelState.JOINING)
      this.rejoinTimer = setTimeout(() => {
        if (this.signaling.state === SignalingState.CLOSED) {
          if (isVisible() && isOnline()) {
            if (this.rejoinEnabled) {
              this.signaling.connect(this.key)
            }
          } else if (this.members.length === 1) {
            this.internalLeave()
          }
        }
        this.rejoinTimer = undefined
      }, REJOIN_TIMEOUT)
    }
  }

  private onBrowserBack() {
    if (isVisible() && isOnline()) {
      log.webgroup('onBrowserBack', { isVisible: isVisible(), isOnline: isOnline() })
      this.rejoinEnabled = this.autoRejoin
      if (this.rejoinEnabled) {
        if (this.state === WebChannelState.LEFT) {
          this.startJoin()
        } else if (this.signaling.state === SignalingState.CLOSED) {
          this.signaling.connect(this.key)
        }
      }
    }
  }

  private onMemberLeaveProxy(ids: number[]) {
    let atLeastOneLeft = false
    ids.forEach((id) => {
      if (this.members.includes(id)) {
        this.members.splice(this.members.indexOf(id), 1)
        atLeastOneLeft = true
        this.onMemberLeave(id)
      }
    })
    return atLeastOneLeft
  }

  private internalLeave() {
    log.webgroup('internal leave')
    this.rejoinEnabled = false
    this.signaling.close()
    this.topology.leave()
    this.clean()
    this.setState(WebChannelState.LEFT)
  }
}