coast-team/netflux

View on GitHub
src/service/Service.ts

Summary

Maintainability
B
4 hrs
Test Coverage
import { merge, Observable } from 'rxjs'
import { filter, map } from 'rxjs/operators'

import { Channel } from '../Channel'
import { IStream, log } from '../misc/util'
import { InSigMsg, OutSigMsg } from '../Signaling'
import { InWcMsg, OutWcMessage, WebChannel } from '../WebChannel'

export interface IMessageFactory<OutMsg, InMsg extends OutMsg> {
  create: (properties?: OutMsg) => InMsg
  encode: (message: OutMsg) => { finish: () => Uint8Array }
  decode: (reader: Uint8Array) => InMsg
}

export interface IWebChannelStream<OutMsg, InMsg> {
  id: number
  message: Observable<{
    senderId: number
    msg: InMsg
    channel: Channel
    recipientId: number
  }>
  send: (msg: Uint8Array | OutMsg, recipientId: number) => void
}

export interface ISignalingStream<OutMsg, InMsg> {
  id: number
  message: Observable<{ senderId: number; recipientId: number; msg: InMsg }>
  send: (msg: Uint8Array | OutMsg, recipientId: number, senderId: number) => void
}

export interface IAllStreams<OutMsg, InMsg> {
  message: Observable<{ streamId: number; senderId: number; recipientId: number; msg: InMsg }>
  sendOver: (
    streamId: number,
    msg: Uint8Array | OutMsg,
    recipientId: number,
    senderId: number
  ) => void
}

/**
 * Services are specific classes. Instance of such class communicates via
 * network with another instance of the same class. Indeed each peer in the
 * network instantiates its own service.
 * Each service has `.proto` file containing the desciption of its
 * communication protocol.
 */
export abstract class Service<OutMsg, InMsg extends OutMsg> {
  /*
   * Unique service identifier.
   */
  private serviceId: number

  /*
   * Service protobufjs object generated from `.proto` file.
   */
  private proto: IMessageFactory<OutMsg, InMsg>

  constructor(serviceId: number, proto: IMessageFactory<OutMsg, InMsg>) {
    this.serviceId = serviceId
    this.proto = proto
  }

  protected useWebChannelStream(
    wc: IStream<OutWcMessage, InWcMsg> & WebChannel
  ): IWebChannelStream<OutMsg, InMsg> {
    return {
      id: wc.STREAM_ID,
      message: wc.messageFromStream.pipe(
        filter(({ serviceId }) => serviceId === this.serviceId),
        map(({ channel, senderId, recipientId, content }) => ({
          channel,
          senderId,
          recipientId,
          msg: this.decode(content),
        })),
        filter(({ msg }: any) => msg && msg.type)
      ),
      send: (content: Uint8Array | OutMsg, recipientId: number) => {
        wc.sendOverStream({
          senderId: wc.myId,
          recipientId,
          serviceId: this.serviceId,
          content: content instanceof Uint8Array ? content : this.encode(content),
        })
      },
    }
  }

  protected useSignalingStream(sig: IStream<OutSigMsg, InSigMsg>): ISignalingStream<OutMsg, InMsg> {
    return {
      id: sig.STREAM_ID,
      message: sig.messageFromStream.pipe(
        filter(({ serviceId }) => serviceId === this.serviceId),
        map(({ senderId, recipientId, content }) => ({
          senderId,
          recipientId,
          msg: this.decode(content),
        })),
        filter(({ msg }: any) => msg && msg.type)
      ),
      send: (content: Uint8Array | OutMsg, recipientId: number, senderId: number) => {
        sig.sendOverStream({
          senderId,
          recipientId,
          serviceId: this.serviceId,
          content: content instanceof Uint8Array ? content : this.encode(content),
        })
      },
    }
  }

  protected useAllStreams(
    wc: IStream<OutWcMessage, InWcMsg> & WebChannel,
    sig: IStream<OutSigMsg, InSigMsg>
  ): IAllStreams<OutMsg, InMsg> {
    const wcStream = this.useWebChannelStream(wc)
    const sigStream = this.useSignalingStream(sig)
    return {
      message: merge(
        wcStream.message.pipe(
          map(({ senderId, recipientId, msg }) => ({
            streamId: wcStream.id,
            senderId,
            recipientId,
            msg,
          }))
        ),
        sigStream.message.pipe(
          map(({ senderId, recipientId, msg }) => ({
            streamId: sigStream.id,
            senderId,
            recipientId,
            msg,
          }))
        )
      ),
      sendOver: (
        streamId: number,
        msg: Uint8Array | OutMsg,
        recipientId: number,
        senderId: number
      ) => {
        if (streamId === wcStream.id) {
          wcStream.send(msg, recipientId)
        } else {
          sigStream.send(msg, recipientId, senderId)
        }
      },
    }
  }

  /**
   * Encode service message for sending over the network.
   *
   * @param msg Service specific message object
   */
  protected encode(msg: OutMsg): Uint8Array {
    return this.proto.encode(this.proto.create(msg)).finish()
  }

  /**
   * Decode service message received from the network.
   *
   * @return  Service specific message object
   */
  protected decode(bytes: Uint8Array): InMsg {
    try {
      return this.proto.decode(bytes)
    } catch (err) {
      log.warn('Decode service message error: ', err)
    }
    return { type: undefined } as any
  }
}