XYOracleNetwork/sdk-xyo-client-js

View on GitHub
packages/modules/packages/bridge/packages/pub-sub/src/AsyncQueryBus/AsyncQueryBusBase.ts

Summary

Maintainability
D
1 day
Test Coverage
import { assertEx } from '@xylabs/assert'
import { Address } from '@xylabs/hex'
import { Base, TypeCheck } from '@xylabs/object'
import { ArchivistInstance, isArchivistInstance } from '@xyo-network/archivist-model'
import { BoundWitness, QueryBoundWitness } from '@xyo-network/boundwitness-model'
import { BoundWitnessDivinerParams, BoundWitnessDivinerQueryPayload } from '@xyo-network/diviner-boundwitness-model'
import { DivinerInstance, isDivinerInstance } from '@xyo-network/diviner-model'
import { ModuleConfig, ModuleIdentifier, ModuleInstance, ResolveHelper } from '@xyo-network/module-model'
import { Mutex } from 'async-mutex'
import { LRUCache } from 'lru-cache'

import { AsyncQueryBusParams } from './model'

const POLLING_FREQUENCY_MIN = 100 as const
const POLLING_FREQUENCY_MAX = 60_000 as const
const POLLING_FREQUENCY_DEFAULT = 1000 as const

export class AsyncQueryBusBase<TParams extends AsyncQueryBusParams = AsyncQueryBusParams> extends Base<TParams> {
  protected _lastState?: LRUCache<Address, number>
  protected _targetConfigs: Record<Address, ModuleConfig> = {}
  protected _targetQueries: Record<Address, string[]> = {}

  private _lastResolveFailure: Record<ModuleIdentifier, number> = {}
  private _queriesArchivist?: ArchivistInstance
  private _queriesDiviner?: DivinerInstance<BoundWitnessDivinerParams, BoundWitnessDivinerQueryPayload, QueryBoundWitness>
  private _reResolveDelay = 1000 * 5 //5 seconds
  private _resolveMutex = new Mutex()
  private _responsesArchivist?: ArchivistInstance
  private _responsesDiviner?: DivinerInstance<BoundWitnessDivinerParams, BoundWitnessDivinerQueryPayload, BoundWitness>

  constructor(params: TParams) {
    super(params)
  }

  get config(): TParams['config'] {
    return this.params.config
  }

  get pollFrequency(): number {
    const frequency = this.config?.pollFrequency ?? POLLING_FREQUENCY_DEFAULT
    if (frequency < POLLING_FREQUENCY_MIN || frequency > POLLING_FREQUENCY_MAX) {
      return POLLING_FREQUENCY_DEFAULT
    }
    return frequency
  }

  get rootModule() {
    return this.params.rootModule
  }

  /**
   * A cache of the last offset of the Diviner process per address
   */
  protected get lastState(): LRUCache<Address, number> {
    const requiredConfig = { max: 1000, ttl: 0 }
    this._lastState = this._lastState ?? new LRUCache<Address, number>(requiredConfig)
    return this._lastState
  }

  async queriesArchivist() {
    return await this._resolveMutex.runExclusive(async () => {
      this._queriesArchivist =
        this._queriesArchivist ??
        (await this.resolve(
          assertEx(this.config?.intersect?.queries?.archivist, () => 'No queries Archivist defined'),
          isArchivistInstance,
        ))
      return this._queriesArchivist
    })
  }

  async queriesDiviner() {
    return await this._resolveMutex.runExclusive(async () => {
      this._queriesDiviner =
        this._queriesDiviner ??
        ((await this.resolve(
          assertEx(this.config?.intersect?.queries?.boundWitnessDiviner, () => 'No queries Diviner defined'),
          isDivinerInstance,
        )) as DivinerInstance<BoundWitnessDivinerParams, BoundWitnessDivinerQueryPayload, QueryBoundWitness>)
      return this._queriesDiviner
    })
  }

  async responsesArchivist() {
    return await this._resolveMutex.runExclusive(async () => {
      this._responsesArchivist =
        this._responsesArchivist ??
        (await this.resolve(
          assertEx(this.config?.intersect?.responses?.archivist, () => 'No responses Archivist defined'),
          isArchivistInstance,
        ))
      return this._responsesArchivist
    })
  }

  async responsesDiviner() {
    return await this._resolveMutex.runExclusive(async () => {
      this._responsesDiviner =
        this._responsesDiviner ??
        ((await this.resolve(
          assertEx(this.config?.intersect?.responses?.boundWitnessDiviner, () => 'No responses Diviner defined'),
          isDivinerInstance,
        )) as DivinerInstance<BoundWitnessDivinerParams, BoundWitnessDivinerQueryPayload, BoundWitness>)
      return this._responsesDiviner
    })
  }

  /**
   * Commit the internal state of the process. This is similar
   * to a transaction completion in a database and should only be called
   * when results have been successfully persisted to the appropriate
   * external stores.
   * @param address The module address to commit the state for
   * @param nextState The state to commit
   */
  protected async commitState(address: Address, nextState: number) {
    await Promise.resolve()
    // TODO: Offload to Archivist/Diviner instead of in-memory
    const lastState = this.lastState.get(address)
    if (lastState && lastState >= nextState) return
    this.lastState.set(address, nextState)
  }

  /**
   * Retrieves the last state of the process. Used to recover state after
   * preemptions, reboots, etc.
   */
  protected async retrieveState(address: Address): Promise<number> {
    await Promise.resolve()
    const state = this.lastState.get(address)
    if (state === undefined) {
      // If this is a boot we can go back a bit in time
      // and begin processing recent commands
      const newState = Date.now() - 1000
      this.lastState.set(address, newState)
      return newState
    } else {
      return state
    }
  }

  private async resolve<T extends ModuleInstance>(id: ModuleIdentifier, typeCheck: TypeCheck<T>): Promise<T | undefined> {
    if (Date.now() - (this._lastResolveFailure[id] ?? 0) < this._reResolveDelay) {
      return
    }
    this._lastResolveFailure[id] = Date.now()
    const resolved = await ResolveHelper.resolveModuleIdentifier(this.rootModule, id)
    if (resolved) {
      if (typeCheck(resolved)) {
        delete this._lastResolveFailure[id]
        return resolved
      } else {
        this.logger?.warn(`Unable to resolve responsesDiviner as correct type [${id}][${resolved?.constructor?.name}]: ${resolved.id}`)
      }
    } else {
      this.logger?.debug(`Unable to resolve queriesArchivist [${id}] [${await ResolveHelper.traceModuleIdentifier(this.rootModule, id)}]`)
    }
  }
}