lipp/node-jet

View on GitHub
src/3_jet/daemon/index.ts

Summary

Maintainability
C
1 day
Test Coverage
'use strict'

import { Logger, logger } from '../log.js'
import {
  AddRequest,
  AuthParams,
  FetchParams,
  PathParams,
  SetParams,
  UserParams
} from '../messages.js'
import { createPathMatcher } from './path_matcher.js'
import { Subscription } from './subscription.js'
import { Route } from './route.js'
import {
  ConnectionInUse,
  InvvalidCredentials,
  JsonRPCError,
  notAllowed,
  NotAuthorized,
  NotFound,
  Occupied
} from '../errors.js'
import JsonRPC from '../../2_jsonrpc/index.js'
import { JsonRPCServer } from '../../2_jsonrpc/server.js'
import { WebServerConfig } from '../../1_socket/wsserver.js'
import { TCPServerConfig } from '../../1_socket/tcpserver.js'
import { EventEmitter } from '../../1_socket/index.js'
import { UserManager } from './UserManager.js'

const version = '2.2.0'

interface Features {
  authenticate?: boolean
  batches?: boolean
  fetch?: 'full' | 'simple'
  asNotification?: boolean
}
export interface InfoOptions {
  protocolVersion?: string
  version?: string
  name?: string
  features?: Features
}

const defaultListenOptions = {
  tcpPort: 11122,
  wsPort: 11123
}

class InfoObject implements InfoOptions {
  name: string
  version: string
  protocolVersion: string
  features: Features
  constructor(options: InfoOptions, authenticate: boolean) {
    this.name = options.name || 'node-jet'
    this.version = version
    this.protocolVersion = '1.1.0'
    this.features = {
      batches: options.features?.batches || false,
      fetch: options.features?.fetch || 'full',
      asNotification: options.features?.asNotification || false,
      authenticate: authenticate
    }
  }
}

interface DaemonOptions {
  log?: logger
  username?: string
  password?: string
}
/**
 * Creates a Daemon instance
 *
 * In most cases you need one Jet Daemon instance running.
 * All Peers connect to it as in typical master(Daemon) slave(Peer)
 * architectures.
 *
 */
export class Daemon extends EventEmitter {
  infoObject: InfoObject
  log: Logger
  jsonRPCServer!: JsonRPCServer
  routes: Record<string, Route> = {}
  subscriber: Subscription[] = []
  authenticator: UserManager
  /**
   * Constructor for creating the instance
   * @param {DaemonOptions & InfoOptions} [options] Options for the daemon creation
   */
  constructor(options: DaemonOptions & InfoOptions = {}) {
    super()

    this.authenticator = new UserManager(options.username, options.password)
    this.infoObject = new InfoObject(options, this.authenticator.enabled)
    this.log = new Logger(options.log)
  }
  asNotification = () => this.infoObject.features.asNotification
  simpleFetch = () => this.infoObject.features.fetch === 'simple'
  respond = (peer: JsonRPC, id: string) => {
    if (this.asNotification()) {
      peer.respond(id, {}, true)
      this.emit('notify')
    } else {
      this.emit('notify')
      peer.respond(id, {}, true)
    }
  }

  authenticate = (peer: JsonRPC, id: string, params: AuthParams) => {
    if (this.authenticator.login(params.user, params.password)) {
      peer.user = params.user
      peer.respond(id, {}, true)
    } else {
      peer.respond(id, new InvvalidCredentials(params.user), false)
    }
  }

  addUser = (peer: JsonRPC, id: string, params: UserParams) => {
    try {
      this.authenticator.addUser(
        peer.user,
        params.user,
        params.password,
        params.groups
      )
      peer.respond(id, {}, true)
    } catch (ex) {
      peer.respond(id, ex as NotAuthorized, false)
    }
  }
  /*
  Add as Notification: The message is acknowledged,then all the peers are informed about the new state
  Add synchronous: First all Peers are informed about the new value then message is acknowledged
  */
  add = (peer: JsonRPC, id: string, params: AddRequest) => {
    const path = params.path
    if (path in this.routes) {
      peer.respond(id, new Occupied(path), false)
      return
    }
    this.routes[path] = new Route(peer, path, params.value, params.access)
    if (typeof params.value !== 'undefined') {
      this.subscriber.forEach((fetchRule) => {
        if (this.simpleFetch() || fetchRule.matchesPath(path)) {
          fetchRule.addRoute(this.routes[path])
        }
      })
    }
    this.respond(peer, id)
  }
  /*
  Change as Notification: The message is acknowledged,then all the peers are informed about the value change
  change synchronous: First all Peers are informed about the new value then the message is acknowledged
  */
  change = (peer: JsonRPC, id: string, msg: SetParams) => {
    if (msg.path in this.routes && typeof msg.value !== 'undefined') {
      this.routes[msg.path].updateValue(msg.value)
      this.respond(peer, id)
    } else {
      peer.respond(id, new NotFound(), false)
    }
  }

  /*
  Fetch as Notification: The message is acknowledged,then the peer is informed of all the states matching the fetchrule
  Fetch synchronous: First the peer is informed of all the states matching the fetchrule then the message is acknowledged
  */
  fetch = (peer: JsonRPC, id: string, msg: FetchParams) => {
    if (
      this.simpleFetch() &&
      this.subscriber.find((sub) => sub.owner === peer)
    ) {
      peer.respond(
        id,
        new ConnectionInUse('Only one fetcher per peer in simple fetch Mode'),
        false
      )
      return
    }
    if (this.subscriber.find((sub) => sub.id === msg.id)) {
      peer.respond(id, new Occupied('FetchId already in use'), false)
      return
    }
    try {
      const sub = new Subscription(msg, peer)
      this.addListener('notify', sub.send)
      this.subscriber.push(sub)

      sub.setRoutes(
        Object.values(this.routes).filter(
          (route) =>
            this.routes[route.path].value !== undefined && //check if state
            (this.simpleFetch() || sub.matchesPath(route.path)) //check if simpleFetch or pathrule matches
        )
      )
      this.respond(peer, id)
    } catch (err) {
      peer.respond(id, err as JsonRPCError, false)
    }
  }

  /*
  Unfetch synchronous: Unfetch fires and no more updates are send with the given fetch_id. Message is acknowledged
  */
  unfetch = (peer: JsonRPC, id: string, params: FetchParams) => {
    const subIdx = this.subscriber.findIndex((fetch) => fetch.id === params.id)
    if (subIdx < 0) {
      peer.respond(
        id,
        new NotFound(`No Subscription with id ${params.id} found`),
        false
      )
      return
    }
    if (this.subscriber[subIdx].owner !== peer) {
      peer.respond(
        id,
        new notAllowed(`Peer does not own subscription with id ${params.id}`),
        false
      )
      return
    }
    this.subscriber[subIdx].close()
    this.subscriber.splice(subIdx, 1)
    peer.respond(id, {}, true)
  }

  /*
  Get synchronous: Only synchronous implementation-> all the values are added to an array and send as response
  */
  get = (peer: JsonRPC, id: string, params: FetchParams) => {
    try {
      const matcher = createPathMatcher(params)
      const resp = Object.keys(this.routes)
        .filter(
          (route) =>
            matcher(route) &&
            this.authenticator.isAllowed(
              'get',
              peer.user,
              this.routes[route].access
            )
        )
        .map((route: string) => {
          return { path: route, value: this.routes[route].value }
        })
      peer.respond(id, resp, true)
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
    } catch (ex: any) {
      peer.respond(id, ex, false)
    }
  }

  /*
  remove synchronous: Only synchronous implementation-> state is removed then message is acknowledged
  */
  remove = (peer: JsonRPC, id: string, params: PathParams) => {
    const route = params.path
    if (!(route in this.routes)) {
      peer.respond(id, new NotFound(route), false)
      return
    }
    this.routes[route].remove()
    delete this.routes[route]
    this.respond(peer, id)
  }
  /*
  Call and Set requests: Call and set requests are always forwarded synchronous
  */
  forward = (method: 'set' | 'call', user: string, params: PathParams) => {
    if (!(params.path in this.routes)) {
      return Promise.reject(new NotFound(params.path))
    }
    if (
      !this.authenticator.isAllowed(
        'set',
        user,
        this.routes[params.path].access
      )
    ) {
      return Promise.reject(new NotAuthorized(params.path))
    }
    return this.routes[params.path].owner.sendRequest(method, params, true)
  }

  /*
  Info requests: Info requests are always synchronous
  */
  info = (peer: JsonRPC, id: string) => {
    peer.respond(id, this.infoObject, true)
  }

  configure = (peer: JsonRPC, id: string) => {
    peer.respond(id, {}, true)
  }

  filterRoutesByPeer = (peer: JsonRPC): string[] =>
    Object.entries(this.routes)
      .filter(([, route]) => route.owner === peer)
      .map((el) => el[0])

  /**
   * This function starts to listen on the specified port
   * @param listenOptions
   */
  listen = (
    listenOptions: TCPServerConfig & WebServerConfig = defaultListenOptions
  ) => {
    this.jsonRPCServer = new JsonRPCServer(
      this.log,
      listenOptions,
      this.infoObject.features.batches
    )
    this.jsonRPCServer.addListener('connection', (newPeer: JsonRPC) => {
      this.log.info('Peer connected')

      newPeer.addListener('info', this.info)
      newPeer.addListener('configure', this.configure)
      newPeer.addListener('authenticate', this.authenticate)
      newPeer.addListener('addUser', this.addUser)

      newPeer.addListener('add', this.add)
      newPeer.addListener('change', this.change)
      newPeer.addListener('remove', this.remove)

      newPeer.addListener('get', this.get)
      newPeer.addListener('fetch', this.fetch)
      newPeer.addListener('unfetch', this.unfetch)

      newPeer.addListener(
        'set',
        (peer: JsonRPC, id: string, params: PathParams) =>
          this.forward('set', peer.user, params)
            .then((res) => newPeer.respond(id, res, true))
            .catch((err) => newPeer.respond(id, err, false))
            .finally(() => newPeer.send())
      )
      newPeer.addListener(
        'call',
        (peer: JsonRPC, id: string, params: PathParams) =>
          this.forward('call', peer.user, params)
            .then((res) => newPeer.respond(id, res, true))
            .catch((err) => newPeer.respond(id, err, false))
            .finally(() => newPeer.send())
      )
    })
    this.jsonRPCServer.addListener('disconnect', (peer: JsonRPC) => {
      this.filterRoutesByPeer(peer).forEach((route) => {
        this.log.warn('Removing route that was owned by peer')
        this.routes[route].remove()
        delete this.routes[route]
      })

      this.subscriber = this.subscriber.filter((fetcher) => {
        if (fetcher.owner !== peer) {
          return true
        }
        fetcher.close()
        return false
      })
    })
    this.jsonRPCServer.listen()
    this.log.info('Daemon started')
  }

  close = () => {
    this.jsonRPCServer.close()
  }
}

export default Daemon