nikkatalnikov/apeiron

View on GitHub
streamProviders.js

Summary

Maintainability
C
1 day
Test Coverage
import { Subscriber, Observable, Subject } from 'rxjs'
import axios from 'axios'
import { StreamAPI } from './streamAPI'

/**
 * Base class for the concrete providers: records the provider type, merges
 * option objects onto the instance and creates the error channel.
 */
class StreamProvider {
  /**
   * @param {*} type - Provider type tag, stored as `this.type`.
   * @param {...*} args - Option sources. Every non-array object is merged onto
   *   the instance in order (later sources win); any other value is ignored.
   */
  constructor(type, ...args) {
    // Object.assign copies a string source character by character ('0', '1',
    // ...) and an array source index by index. Subclasses pass raw endpoint
    // strings here, so only genuine option objects are merged.
    const sources = args.filter(
      arg => arg !== null && typeof arg === 'object' && !Array.isArray(arg))

    Object.assign(this, { type }, ...sources)

    // Subject on which errors are published, separately from the data stream.
    this.errorStream = new Subject()
  }
}

/**
 * Provider that issues HTTP requests through an axios instance and exposes
 * the responses on a shared stream (`dataStream`). Failures are published on
 * `errorStream`.
 */
class HTTPProvider extends StreamProvider {
  /**
   * @param {*} type - Provider type tag, forwarded to the base class.
   * @param {Object} options
   * @param {Object} options.endpoints - Map of alias -> `{ method, url }`.
   * @param {Object} [options.config] - Configuration passed to `axios.create`.
   */
  constructor(type, { endpoints, config }) {
    super(type, { endpoints, config })

    this.service = axios.create(config)
    // Every call to send() pushes one request descriptor into this subject.
    this.requestStream = new Subject()

    this.dataStream = this.requestStream
      // concatMap runs the requests one after another, in the order received.
      .concatMap(data => Observable.fromPromise(HTTPProvider.callXHR(this.service, data)))
      // Values pass through untouched; errors are mirrored onto errorStream.
      .do(() => void 0, err => this.errorStream.next(err))
      // NOTE(review): retry() with no count presumably resubscribes after a
      // failed request so later requests are still served - confirm.
      .retry()
      .share()

    // NOTE(review): reaches into the observable returned by share() to connect
    // it eagerly, presumably so requests run even without subscribers. This
    // relies on RxJS internals - confirm against the RxJS version in use.
    this.dataStream.source.connect()
  }

  /**
   * Performs a single request on the given axios-like service.
   *
   * @param {Object} service - Object exposing one function per HTTP method.
   * @param {Object} request
   * @param {Object} request.endpoint - `{ method, url }`; a `:id` placeholder
   *   in `url` is replaced by `request.config.id`.
   * @param {*} [request.data] - Body, only passed for methods that take one.
   * @param {Object} [request.config] - Per-request config, passed through to
   *   the service unchanged.
   * @returns {Promise} Whatever the service method returns, or a rejected
   *   promise when the url needs an id and none was supplied.
   */
  static callXHR(service, { endpoint, data, config }) {
    const bodylessMethods = ['get', 'delete', 'head']
    const method = endpoint.method
    const handler = service[method].bind(service)
    let url = endpoint.url

    if (url.includes(':id')) {
      // config is optional (send() defaults the payload to {}), so it must not
      // be dereferenced blindly; 0 is accepted as a legitimate id.
      const id = config ? config.id : undefined
      const hasId = id === 0 || Boolean(id)
      if (!hasId) {
        return Promise.reject(new Error(`${url} request must have id`))
      }
      // A replacer function inserts the id literally; a plain string would
      // have `$&`-style patterns interpreted by String.prototype.replace.
      url = url.replace(':id', () => String(id))
    }
    return bodylessMethods.includes(method) ?
      handler(url, config) : handler(url, data, config)
  }

  /**
   * Queues a request for the endpoint registered under `alias`.
   * An unknown alias is reported on `errorStream` instead of throwing.
   *
   * @param {string} alias - Key into `this.endpoints`.
   * @param {Object} [payload] - `{ data, config }` for the request.
   * @returns {Observable} The shared data stream (all responses, not only the
   *   one for this request).
   */
  send(alias, payload = {}) {
    const endpoint = this.endpoints[alias]
    if (endpoint) {
      this.requestStream.next({ endpoint, data: payload.data, config: payload.config })
    } else {
      this.errorStream.next(new Error(`${alias} endpoint doesn't exist`))
    }

    return this.dataStream.asObservable()
  }

  /**
   * Sends several requests in sequence, each one delayed by `delay`.
   *
   * @param {Iterable} list - `[alias, payload]` pairs, as accepted by send().
   * @param {number} [delay=0] - Integer delay applied before each send.
   * @returns {Observable} The shared data stream.
   * @throws {Error} When `delay` is not an integer (also reported on
   *   `errorStream`).
   */
  sendMany(list, delay = 0) {
    if (!Number.isInteger(delay)) {
      const error = new Error(`${delay} is not integer`)
      this.errorStream.next(error)
      throw error
    }

    Observable
      .from(list)
      .concatMap(x => Observable.of(x).delay(delay))
      .subscribe(([alias, data]) => this.send(alias, data))

    return this.dataStream.asObservable()
  }

  /**
   * Builds a StreamAPI restricted to the endpoints using the given method.
   *
   * @param {string} method - Compared strictly against `endpoint.method`.
   * @returns {StreamAPI}
   * @throws {Error} When no endpoint matches (see groupByName).
   */
  groupByMethod(method) {
    const endpoints = Object
      .keys(this.endpoints)
      .filter(x => this.endpoints[x].method === method)

    return this.groupByName(...endpoints)
  }

  /**
   * Builds a StreamAPI restricted to the endpoints with the given url.
   *
   * @param {string} url - Compared strictly against `endpoint.url`.
   * @returns {StreamAPI}
   * @throws {Error} When no endpoint matches (see groupByName).
   */
  groupByUrl(url) {
    const endpoints = Object
      .keys(this.endpoints)
      .filter(x => this.endpoints[x].url === url)

    return this.groupByName(...endpoints)
  }

  /**
   * Builds a StreamAPI exposing only the named endpoints, with this
   * provider's type and config.
   *
   * @param {...string} newEndpoints - Aliases present in `this.endpoints`.
   * @returns {StreamAPI}
   * @throws {Error} When no alias is given or an alias is unknown; the same
   *   error is also reported on `errorStream`.
   */
  groupByName(...newEndpoints) {
    if (!newEndpoints.length) {
      const error = new Error('endpoints must be provided')
      this.errorStream.next(error)
      throw error
    }

    const endpoints = newEndpoints.reduce((selected, name) => {
      const endpoint = this.endpoints[name]
      if (!endpoint) {
        const error = new Error(`endpoint ${name} is not valid`)
        this.errorStream.next(error)
        throw error
      }
      selected[name] = endpoint
      return selected
    }, {})

    return new StreamAPI(this.type, {
      endpoints,
      config: this.config,
    })
  }

  /**
   * Sets a default header on `axios.defaults.headers[method]`, i.e. on the
   * axios module defaults rather than on one provider's `service`.
   *
   * @param {string} method - Key of `axios.defaults.headers`.
   * @param {string} header - Header name.
   * @param {*} value - Header value.
   */
  static setHeader(method, header, value) {
    Object.assign(axios.defaults.headers[method], {
      [header]: value,
    })
  }

  /**
   * Removes a default header from `axios.defaults.headers[method]`.
   *
   * @param {string} method - Key of `axios.defaults.headers`.
   * @param {string} header - Header name to delete.
   */
  static removeHeader(method, header) {
    delete axios.defaults.headers[method][header]
  }
}

/**
 * Provider backed by a WebSocket. Incoming socket events are exposed on
 * `dataStream`; outgoing messages are JSON-encoded and queued in
 * `this.buffer` until the socket is open.
 */
class WSProvider extends StreamProvider {
  /**
   * @param {*} type - Provider type tag, forwarded to the base class.
   * @param {string} endpoint - URL passed to the WebSocket constructor.
   * @param {string|string[]} [protocol] - Passed to the WebSocket constructor.
   */
  constructor(type, endpoint, protocol) {
    // Wrapped in an object: handing the raw endpoint string to the base class
    // makes Object.assign copy its characters onto the instance as '0', '1'...
    super(type, { endpoint, protocol })

    // Messages sent while the socket is not open, flushed on 'open'.
    this.buffer = []
    this.service = this.fromWebSocket(endpoint, protocol)

    this.dataStream = this.service
      .concatMap(data => Observable.of(data))
      // Values pass through untouched; errors are mirrored onto errorStream.
      .do(() => void 0, err => this.errorStream.next(err))
      .retry()
      .share()

    // NOTE(review): connects the shared stream eagerly via RxJS internals,
    // presumably so the socket handlers are installed without subscribers -
    // confirm against the RxJS version in use.
    this.dataStream.source.connect()
  }

  /**
   * Opens a WebSocket and wraps it in a Subject: the observable side emits
   * the socket's open/message/close events (and errors on `onerror`), the
   * observer side sends data to the socket.
   *
   * A message with a truthy `code` property is treated as a close request
   * (`ws.close(code, reason)`); anything else is sent as JSON.
   *
   * @param {string} endpoint - URL passed to the WebSocket constructor.
   * @param {string|string[]} [protocol] - Passed to the WebSocket constructor.
   * @returns {Subject} Subject created from the observer and the observable.
   */
  fromWebSocket(endpoint, protocol) {
    const ws = new WebSocket(endpoint, protocol)
    const close = ws.close.bind(ws)

    const observable = Observable.create((wsObservable) => {
      ws.onerror = (err) => wsObservable.error(err)
      ws.onmessage = (data) => wsObservable.next(data)
      ws.onopen = (state) => wsObservable.next(state)
      ws.onclose = (state) => wsObservable.next(state)
    })

    // Delivers one outgoing message to the socket.
    const dispatch = (message) => {
      if (message.code) {
        close(message.code, message.reason)
      } else {
        ws.send(JSON.stringify(message))
      }
    }

    // Sends everything queued so far, in order, and empties the queue.
    const flush = () => {
      const pending = this.buffer
      this.buffer = []
      pending.forEach(dispatch)
    }

    // Registered with addEventListener so it does not depend on (or get
    // overwritten by) the onopen handler installed on subscription above.
    ws.addEventListener('open', flush)

    const observer = Subscriber.create(
      (data) => {
        if (ws.readyState === WebSocket.OPEN) {
          dispatch(data)
        } else {
          // Calling ws.send before the socket is open throws, so queue it.
          this.buffer = [...this.buffer, data]
        }
      },
      (err) => console.error(`Error: ${err}`),
      () => close())

    return Subject.create(observer, observable)
  }

  /**
   * Sends one message through the socket (queued until the socket is open).
   *
   * @param {*} data - JSON-serialisable message, or `{ code, reason }` to close.
   * @returns {Observable} The shared data stream.
   */
  send(data) {
    this.service.next(data)
    return this.dataStream.asObservable()
  }

  /**
   * Sends several messages in sequence, each one delayed by `delay`.
   *
   * @param {Iterable} list - Messages, each passed to send().
   * @param {number} [delay=0] - Integer delay applied before each send.
   * @returns {Observable} The shared data stream.
   * @throws {Error} When `delay` is not an integer (also reported on
   *   `errorStream`).
   */
  sendMany(list, delay = 0) {
    if (!Number.isInteger(delay)) {
      const error = new Error(`${delay} is not integer`)
      this.errorStream.next(error)
      throw error
    }

    Observable
      .from(list)
      .concatMap(x => Observable.of(x).delay(delay))
      .subscribe(x => this.send(x))

    return this.dataStream.asObservable()
  }

  /**
   * Requests that the socket be closed by sending a `{ code, reason }`
   * message through the regular send path.
   *
   * @param {number} [code=1000] - Close code.
   * @param {string} [reason] - Close reason.
   */
  close(code = 1000, reason) {
    this.send({ code, reason })
  }
}

/**
 * Provider backed by a server-sent-events source (EventSource). Events are
 * exposed on `dataStream`; errors are mirrored onto `errorStream`.
 */
class SSEProvider extends StreamProvider {
  /**
   * @param {*} type - Provider type tag, forwarded to the base class.
   * @param {string} endpoint - URL passed to the EventSource constructor.
   * @param {Object} [options] - Passed to the EventSource constructor and
   *   merged onto the instance by the base class.
   */
  constructor(type, endpoint, options) {
    // The endpoint is wrapped in an object: handing the raw string to the base
    // class makes Object.assign copy its characters onto the instance as
    // '0', '1', ... properties. `options` is still merged as before.
    super(type, { endpoint }, options)

    this.service = SSEProvider.fromSSE(endpoint, options)

    this.dataStream = this.service
      .concatMap(data => Observable.of(data))
      // Values pass through untouched; errors are mirrored onto errorStream.
      .do(() => void 0, err => this.errorStream.next(err))
      .retry()
      .share()
  }

  /**
   * Opens an EventSource and wraps it in a Subject: the observable side emits
   * the source's open/message events (and errors on `onerror`); the observer
   * side ignores values and closes the source on completion.
   *
   * @param {string} endpoint - URL passed to the EventSource constructor.
   * @param {Object} [options] - Passed to the EventSource constructor.
   * @returns {Subject} Subject created from the observer and the observable.
   */
  static fromSSE(endpoint, options) {
    const sse = new EventSource(endpoint, options)
    const close = sse.close.bind(sse)

    const observable = Observable.create((sseObservable) => {
      sse.onerror = (err) => sseObservable.error(err)
      sse.onmessage = (data) => sseObservable.next(data)
      sse.onopen = (state) => sseObservable.next(state)
    })
    const observer = Subscriber.create(
      // The connection is receive-only here, so pushed values are dropped.
      () => void 0,
      (err) => console.error(`Error: ${err}`),
      () => close())

    return Subject.create(observer, observable)
  }

  /**
   * Completes the service subject, which closes the underlying EventSource.
   */
  close() {
    this.service.complete()
  }
}

/*
class MutationObserver extends StreamProvider {
  constructor(type, structure) {
    super(type)

    if (!Proxy) {
      throw new Error('Proxy API is not supported')
    }
    const service = new Subject()
    const proxied = new Proxy(structure, {
      set: function (target, prop, value) {
        const newStruct = JSON.parse(JSON.stringify(target));
        const oldStruct = JSON.parse(JSON.stringify(target));

        if (target[prop]) {
          oldStruct[prop] = target[prop]
        }
        newStruct[prop] = value;

        service.next([oldStruct, newStruct])
        return Reflect.set(target, prop, value)
      },
      deleteProperty: function (target, prop) {
        return Reflect.deleteProperty(target, prop)
      },
    })
    Object.assign(this, { structure: proxied }, { dataStream: service.asObservable() })
  }
}
*/
export { HTTPProvider, WSProvider, SSEProvider }