XYOracleNetwork/sdk-xyo-client-js

View on GitHub
packages/modules/packages/bridge/packages/pub-sub/src/AsyncQueryBus/AsyncQueryBusHost.ts

Summary

Maintainability
C
1 day
Test Coverage
import { containsAll } from '@xylabs/array'
import { assertEx } from '@xylabs/assert'
import { Address } from '@xylabs/hex'
import { clearTimeoutEx, setTimeoutEx } from '@xylabs/timer'
import { isQueryBoundWitnessWithMeta, QueryBoundWitness } from '@xyo-network/boundwitness-model'
import { isBridgeInstance } from '@xyo-network/bridge-model'
import { BoundWitnessDivinerQueryPayload, BoundWitnessDivinerQuerySchema } from '@xyo-network/diviner-boundwitness-model'
import {
  asModuleInstance,
  ModuleConfigSchema,
  ModuleIdentifier,
  ModuleInstance,
  resolveAddressToInstance,
  ResolveHelper,
} from '@xyo-network/module-model'
import { PayloadBuilder } from '@xyo-network/payload-builder'
import { Schema, WithMeta } from '@xyo-network/payload-model'

import { AsyncQueryBusBase } from './AsyncQueryBusBase'
import { AsyncQueryBusHostParams } from './model'

export interface ExposeOptions {
  allowedQueries?: Schema[]
  failOnAlreadyExposed?: boolean
}

const IDLE_POLLING_FREQUENCY_RATIO_MIN = 4 as const
const IDLE_POLLING_FREQUENCY_RATIO_MAX = 64 as const
const IDLE_POLLING_FREQUENCY_RATIO_DEFAULT = 16 as const

const IDLE_THRESHOLD_RATIO_MIN = 4 as const
const IDLE_THRESHOLD_RATIO_MAX = 64 as const
const IDLE_THRESHOLD_RATIO_DEFAULT = 16 as const

export class AsyncQueryBusHost<TParams extends AsyncQueryBusHostParams = AsyncQueryBusHostParams> extends AsyncQueryBusBase<TParams> {
  protected _exposedAddresses = new Set<Address>()
  private _exposeOptions: Record<Address, ExposeOptions> = {}
  private _idle = false
  private _lastQueryTime?: number
  private _pollId?: string

  constructor(params: TParams) {
    super(params)
  }

  get exposedAddresses() {
    return this._exposedAddresses
  }

  get idlePollFrequency() {
    const frequency = this.config?.idlePollFrequency ?? IDLE_POLLING_FREQUENCY_RATIO_DEFAULT * this.pollFrequency
    if (frequency < this.pollFrequency * IDLE_POLLING_FREQUENCY_RATIO_MIN) {
      return IDLE_POLLING_FREQUENCY_RATIO_MIN * this.pollFrequency
    }
    if (frequency > this.pollFrequency * IDLE_POLLING_FREQUENCY_RATIO_MAX) {
      return IDLE_POLLING_FREQUENCY_RATIO_MAX * this.pollFrequency
    }
    return frequency
  }

  get idleThreshold() {
    const threshold = this.config?.idleThreshold ?? IDLE_THRESHOLD_RATIO_DEFAULT * this.idlePollFrequency
    if (threshold < this.idlePollFrequency * IDLE_THRESHOLD_RATIO_MIN) {
      return IDLE_POLLING_FREQUENCY_RATIO_MIN * this.pollFrequency
    }
    if (threshold > this.idlePollFrequency * IDLE_THRESHOLD_RATIO_MAX) {
      return IDLE_POLLING_FREQUENCY_RATIO_MAX * this.pollFrequency
    }
    return threshold
  }

  get perAddressBatchQueryLimit(): number {
    return this.config?.perAddressBatchQueryLimit ?? 10
  }

  get started() {
    return !!this._pollId
  }

  expose(module: ModuleInstance, options?: ExposeOptions) {
    const { failOnAlreadyExposed } = options ?? {}
    if (isBridgeInstance(module)) {
      this.logger?.warn(`Attempted to expose a BridgeModule [${module.id}] - Not exposing`)
    } else {
      assertEx(
        !failOnAlreadyExposed || !this._exposedAddresses.has(module.address),
        () => `Address already exposed: ${module.id} [${module.address}]`,
      )
      this._exposedAddresses.add(module.address)
      this._exposeOptions[module.address] = { ...options }
      this.logger?.debug(`${module.id} exposed [${module.address}]`)
      return module
    }
  }

  async listeningModules(): Promise<ModuleInstance[]> {
    const exposedModules = [...(this.config?.listeningModules ?? []), ...this.exposedAddresses.values()]
    const mods = await Promise.all(
      exposedModules.map(async (exposedModule) =>
        assertEx(
          asModuleInstance(await resolveAddressToInstance(this.rootModule, exposedModule)),
          () => `Unable to resolve listeningModule [${exposedModule}]`,
        ),
      ),
    )
    return mods
  }

  start() {
    if (this.started) {
      console.warn('AsyncQueryBus starting when already started')
    }
    this.poll()
  }

  stop() {
    if (!this.started) {
      console.warn('AsyncQueryBus stopping when already stopped')
    }
    if (this._pollId) clearTimeoutEx(this._pollId)
    this._pollId = undefined
  }

  async unexpose(id: ModuleIdentifier, validate = true) {
    const module = asModuleInstance(await this.rootModule.resolve(id, { maxDepth: 10 }))
    if (module) {
      assertEx(!validate || this._exposedAddresses.has(module.address), () => `Address not exposed [${module.address}][${module.id}]`)
      this._exposedAddresses.delete(module.address)
      delete this._exposeOptions[module.address]
      this.logger?.debug(`${module.address} [${module.id}] unexposed`)
    }
    return module
  }

  protected callLocalModule = async (localModule: ModuleInstance, query: WithMeta<QueryBoundWitness>) => {
    this._idle = false
    this._lastQueryTime = Date.now()
    const localModuleName = localModule.id
    const queryArchivist = assertEx(
      await this.queriesArchivist(),
      () => `Unable to contact queriesArchivist [${this.config?.intersect?.queries?.archivist}]`,
    )
    const responsesArchivist = assertEx(
      await this.responsesArchivist(),
      () => `Unable to contact responsesArchivist [${this.config?.intersect?.queries?.archivist}]`,
    )
    const queryDestination = (query.$meta as { destination?: string[] })?.destination
    if (queryDestination && queryDestination?.includes(localModule.address)) {
      // Find the query
      const queryIndex = query.payload_hashes.indexOf(query.query)
      if (queryIndex !== -1) {
        const querySchema = query.payload_schemas[queryIndex]
        // If the destination can process this type of query
        if (localModule.queries.includes(querySchema)) {
          // Get the associated payloads
          const queryPayloads = await queryArchivist.get(query.payload_hashes)
          this.params.onQueryFulfillStarted?.({ payloads: queryPayloads, query })
          const queryPayloadsDict = await PayloadBuilder.toAllHashMap(queryPayloads)
          const queryHash = (await PayloadBuilder.build(query)).$hash
          // Check that we have all the arguments for the command
          if (!containsAll(Object.keys(queryPayloadsDict), query.payload_hashes)) {
            this.logger?.error(`Error processing command ${queryHash} for module ${localModuleName}, missing payloads`)
            return
          }
          try {
            // Issue the query against module
            const querySchema = queryPayloadsDict[query.query].schema
            this.logger?.debug(`Issuing query ${querySchema} (${queryHash}) addressed to module: ${localModuleName}`)
            const result = await localModule.query(query, queryPayloads, {
              allowedQueries: this._exposeOptions[localModule.address]?.allowedQueries,
              schema: ModuleConfigSchema,
            })
            const [bw, payloads, errors] = result
            this.logger?.debug(`Replying to query ${queryHash} addressed to module: ${localModuleName}`)
            const insertResult = await responsesArchivist.insert([bw, ...payloads, ...errors])
            // NOTE: If all archivists support the contract that numPayloads inserted === numPayloads returned we can
            // do some deeper assertions here like lenIn === lenOut, but for now this should be good enough since BWs
            // should always be unique causing at least one insertion
            if (insertResult.length === 0) {
              this.logger?.error(`Error replying to query ${queryHash} addressed to module: ${localModuleName}`)
            }
            if (query?.timestamp) {
              // TODO: This needs to be thought through as we can't use a distributed timestamp
              // because of collisions. We need to ensure we are using the timestamp of the store
              // so there's no chance of multiple commands at the same time
              await this.commitState(localModule.address, query.timestamp)
            }
            this.params.onQueryFulfillFinished?.({ payloads: queryPayloads, query, result, status: 'success' })
          } catch (error) {
            this.params.onQueryFulfillFinished?.({ payloads: queryPayloads, query, status: 'failure' })
            this.logger?.error(`Error processing query ${queryHash} for module ${localModuleName}: ${error}`)
          }
        }
      }
    }
  }

  /**
   * Finds unprocessed commands addressed to the supplied address
   * @param address The address to find commands for
   */
  protected findQueriesToAddress = async (address: Address) => {
    const queriesDivinerId = assertEx(this.config?.intersect?.queries?.boundWitnessDiviner, () => 'No queries Diviner defined')
    const queriesBoundWitnessDiviner = await this.queriesDiviner()
    if (queriesBoundWitnessDiviner) {
      // Retrieve last offset from state store
      const prevState = await this.retrieveState(address)
      const destination = [address]
      const limit = this.perAddressBatchQueryLimit
      // Filter for commands to us by destination address
      const divinerQuery: BoundWitnessDivinerQueryPayload = {
        destination,
        limit,
        order: 'asc',
        schema: BoundWitnessDivinerQuerySchema,
        timestamp: prevState + 1,
      }
      const result = await queriesBoundWitnessDiviner.divine([divinerQuery])
      const queries = result.filter(isQueryBoundWitnessWithMeta)
      const nextState = queries.length > 0 ? Math.max(...queries.map((c) => c.timestamp ?? prevState)) + 1 : Date.now()
      // TODO: This needs to be thought through as we can't use a distributed timestamp
      // because of collisions. We need to use the timestamp of the store so there's no
      // chance of multiple commands at the same time
      await this.commitState(address, nextState)
      this.logger?.debug('findQueriesToAddress', address, prevState, nextState)
      return queries
    } else {
      this.logger?.warn(
        `Unable to resolve queriesBoundWitnessDiviner [${queriesDivinerId}] [${await ResolveHelper.traceModuleIdentifier(this.rootModule, queriesDivinerId)}]`,
      )
    }
  }

  /**
   * Runs the background divine process on a loop with a delay
   * specified by the `config.pollFrequency`
   */
  private poll() {
    this._pollId = setTimeoutEx(
      async () => {
        try {
          await this.processIncomingQueries()
        } catch (e) {
          this.logger?.error?.(`Error in main loop: ${e}`)
        } finally {
          if (this._pollId) clearTimeoutEx(this._pollId)
          this._pollId = undefined
          this.poll()
        }
        const now = Date.now()
        if (this.idleThreshold < now - (this._lastQueryTime ?? now)) {
          this._idle = true
        }
      },
      this._idle ? this.idlePollFrequency : this.pollFrequency,
    )
  }

  /**
   * Background process for checking for inbound queries
   */
  private processIncomingQueries = async () => {
    this.logger?.debug('Checking for inbound queries')
    // Check for any queries that have been issued and have not been responded to
    const localModules = await this.listeningModules()

    // TODO: Do in throttled batches
    await Promise.allSettled(
      localModules.map(async (localModule) => {
        try {
          const localModuleName = localModule.id
          this.logger?.debug(`Checking for inbound queries to ${localModuleName} [${localModule.address}]`)
          const queries = (await this.findQueriesToAddress(localModule.address)) ?? []
          if (queries.length === 0) return
          this.logger?.debug(`Found queries addressed to local module: ${localModuleName}`)
          for (const query of queries) {
            await this.callLocalModule(localModule, query)
          }
        } catch (error) {
          this.logger?.error(`Error processing queries for address ${localModule.address}: ${error}`)
        }
      }),
    )
  }
}