faasjs/faasjs

View on GitHub
packages/knex/src/index.ts

Summary

Maintainability
A
0 mins
Test Coverage
C
70%
/**
 * FaasJS's sql plugin, base on [Knex](https://knexjs.org/).
 *
 * [![License: MIT](https://img.shields.io/npm/l/@faasjs/knex.svg)](https://github.com/faasjs/faasjs/blob/main/packages/faasjs/knex/LICENSE)
 * [![NPM Version](https://img.shields.io/npm/v/@faasjs/knex.svg)](https://www.npmjs.com/package/@faasjs/knex)
 *
 * ## Install
 *
 * ```sh
 * npm install @faasjs/knex
 * ```
 * @packageDocumentation
 */
import {
  type Plugin,
  type Next,
  type DeployData,
  type MountData,
  usePlugin,
  type UseifyPlugin,
  type InvokeData,
} from '@faasjs/func'
import { Logger } from '@faasjs/logger'
import { deepMerge } from '@faasjs/deep_merge'
import knex, { type Knex as OriginKnex } from 'knex'
import { randomUUID } from 'node:crypto'

/**
 * Origin [knex](https://knexjs.org/) instance.
 */
export const originKnex = knex

/**
 * Origin [knex](https://knexjs.org/) type.
 */
export type { OriginKnex }

export type KnexConfig = {
  name?: string
  config?: OriginKnex.Config
}

const Name = 'knex'

declare let global: {
  FaasJS_Knex?: Record<string, Knex>
}

if (!global.FaasJS_Knex) {
  global.FaasJS_Knex = {}
}

export class Knex implements Plugin {
  public readonly type: string = Name
  public readonly name: string = Name
  public config: OriginKnex.Config
  public adapter: OriginKnex
  public query: OriginKnex
  public logger: Logger

  constructor(config?: KnexConfig) {
    if (config) {
      this.name = config.name || this.type
      this.config = config.config || Object.create(null)
    } else {
      this.name = this.type
      this.config = Object.create(null)
    }
  }

  public async onDeploy(data: DeployData, next: Next): Promise<void> {
    const client = (data.config.plugins[this.name].config as OriginKnex.Config)
      .client as string
    if (!client) throw Error('[Knex] client required.')

    data.dependencies['@faasjs/knex'] = '*'
    if (client === 'sqlite3') data.dependencies['better-sqlite3'] = '*'
    else data.dependencies[client] = '*'
    new Logger(this.name).debug(`add dependencies: ${client}`)

    await next()
  }

  public async onMount(data: MountData, next: Next): Promise<void> {
    this.logger = data.logger

    if (global.FaasJS_Knex[this.name]) {
      this.config = global.FaasJS_Knex[this.name].config
      this.adapter = global.FaasJS_Knex[this.name].adapter
      this.query = this.adapter
      this.logger.debug('[%s] use exists adapter', this.name)
      await next()
      return
    }

    const prefix = `SECRET_${this.name.toUpperCase()}_`

    for (let key in process.env)
      if (key.startsWith(prefix)) {
        const value = process.env[key]
        key = key.replace(prefix, '').toLowerCase()
        if (typeof (this.config as any)[key] === 'undefined')
          if (key.startsWith('connection_')) {
            if (!this.config.connection) {
              this.config.connection = Object.create(null)
            }
            ;(this.config as any).connection[key.replace('connection_', '')] =
              value
          } else (this.config as any)[key] = value
      }

    if (data.config.plugins?.[this.name]?.config)
      this.config = deepMerge(
        data.config.plugins[this.name].config,
        this.config
      )

    if (this.config.client === 'sqlite3') {
      this.config.client = 'better-sqlite3'
      this.config.useNullAsDefault = true
    }

    if (this.config.client === 'pg') {
      if (!this.config.pool) this.config.pool = Object.create(null)

      this.config.pool = Object.assign(
        {
          propagateCreateError: false,
          min: 0,
          max: 10,
          acquireTimeoutMillis: 5000,
          idleTimeoutMillis: 30000,
        },
        this.config.pool
      )

      if (
        typeof this.config.connection === 'string' &&
        !this.config.connection.includes('json=true')
      )
        this.config.connection = `${this.config.connection}?json=true`
    }

    this.adapter = knex(this.config)

    if (this.config.client === 'pg') {
      const pg = require('pg')

      for (const t of ['INT2', 'INT4', 'INT8'])
        pg.types.setTypeParser(pg.types.builtins[t], (v: string) =>
          Number.parseInt(v)
        )

      for (const t of ['FLOAT4', 'FLOAT8', 'NUMERIC'])
        pg.types.setTypeParser(pg.types.builtins[t], (v: string) =>
          Number.parseFloat(v)
        )
    }

    this.query = this.adapter

    this.query
      .on('query', ({ sql, __knexQueryUid, bindings }) => {
        if (!__knexQueryUid) return

        this.logger.time(`Knex${this.name}${__knexQueryUid}`)
        this.logger.debug(
          '[%s] [%s] query begin: %s %j',
          this.name,
          __knexQueryUid,
          sql,
          bindings
        )
      })
      .on('query-response', (response, { sql, __knexQueryUid, bindings }) => {
        if (!__knexQueryUid) return

        this.logger.timeEnd(
          `Knex${this.name}${__knexQueryUid}`,
          '[%s] [%s] query done: %s %j %j',
          this.name,
          __knexQueryUid,
          sql,
          bindings,
          response
        )
      })
      .on('query-error', (_, { __knexQueryUid, sql, bindings }) => {
        if (!__knexQueryUid) return

        this.logger.timeEnd(
          `Knex${this.name}${__knexQueryUid}`,
          '[%s] [%s] query failed: %s %j',
          this.name,
          __knexQueryUid,
          sql,
          bindings
        )
      })

    data.logger.debug('[%s] connected', this.name)

    global.FaasJS_Knex[this.name] = this

    await next()
  }

  public async onInvoke(data: InvokeData<any, any, any>, next: Next) {
    this.logger = data.logger
    await next()
  }

  public async raw<TResult = any>(
    sql: string,
    bindings: OriginKnex.RawBinding[] | OriginKnex.ValueDict = []
  ): Promise<OriginKnex.Raw<TResult>> {
    if (!this.adapter) throw Error('[Knex] Client not initialized.')

    return this.adapter.raw<TResult>(sql, bindings)
  }

  /**
   * Wraps a transaction, returning a promise that resolves to the return value of the callback.
   *
   * - Support 'commit' and 'rollback' event.
   */
  public async transaction<TResult = any>(
    scope: (trx: OriginKnex.Transaction<any, any>) => Promise<TResult>,
    config?: OriginKnex.TransactionConfig,
    options?: {
      trx?: OriginKnex.Transaction
    }
  ): Promise<TResult> {
    if (!this.adapter) throw Error(`[${this.name}] Client not initialized.`)

    if (options?.trx) return scope(options.trx)

    const trx = await this.adapter.transaction(config)
    const trxId = randomUUID()
    this.logger.debug('[%s] [%s] transaction begin', this.name, trxId)

    try {
      const result = await scope(trx)

      if (trx.isCompleted()) {
        this.logger.debug(
          '[%s] [%s] transaction has been finished in scope',
          this.name,
          trxId
        )
        return result
      }

      this.logger.debug('[%s] [%s] transaction begin commit', this.name, trxId)
      await trx.commit()
      this.logger.debug(
        '[%s] [%s] transaction committed: %j',
        this.name,
        trxId,
        result
      )
      trx.emit('commit')
      return result
    } catch (error) {
      await trx.rollback(error)
      this.logger.error(
        '[%s] [%s] transaction rollback: %s',
        this.name,
        trxId,
        error
      )
      trx.emit('rollback', error)
      throw error
    }
  }

  public schema(): OriginKnex.SchemaBuilder {
    if (!this.adapter) throw Error(`[${this.name}] Client not initialized.`)

    return this.adapter.schema
  }

  public async quit(): Promise<void> {
    try {
      await global.FaasJS_Knex[this.name].adapter.destroy()
      delete global.FaasJS_Knex[this.name]
    } catch (error) {
      console.error(error)
    }
  }
}

export function useKnex(config?: KnexConfig): UseifyPlugin<Knex> {
  const name = config?.name || Name

  if (global.FaasJS_Knex[name]) return usePlugin<Knex>(global.FaasJS_Knex[name])

  return usePlugin<Knex>(new Knex(config))
}

export function query<TName extends OriginKnex.TableNames>(
  table: TName
): OriginKnex.QueryBuilder<
  OriginKnex.TableType<TName>,
  {
    _base: OriginKnex.ResolveTableType<OriginKnex.TableType<TName>, 'base'>
    _hasSelection: false
    _keys: never
    // biome-ignore lint/complexity/noBannedTypes: <explanation>
    _aliases: {}
    _single: false
    // biome-ignore lint/complexity/noBannedTypes: <explanation>
    _intersectProps: {}
    _unionProps: never
  }[]
>
export function query<TName extends {} = any, TResult = any[]>(
  table: string
): OriginKnex.QueryBuilder<TName, TResult>
export function query<
  // biome-ignore lint/complexity/noBannedTypes: <explanation>
  TName extends OriginKnex.TableNames | {} = any,
  TResult = any[],
>(
  table: TName extends OriginKnex.TableNames ? TName : string
): TName extends OriginKnex.TableNames
  ? OriginKnex.QueryBuilder<
      OriginKnex.TableType<TName>,
      {
        _base: OriginKnex.ResolveTableType<OriginKnex.TableType<TName>, 'base'>
        _hasSelection: false
        _keys: never
        // biome-ignore lint/complexity/noBannedTypes: <explanation>
        _aliases: {}
        _single: false
        // biome-ignore lint/complexity/noBannedTypes: <explanation>
        _intersectProps: {}
        _unionProps: never
      }[]
    >
  : OriginKnex.QueryBuilder<TName, TResult> {
  return useKnex().query<TName, TResult>(
    table
  ) as TName extends OriginKnex.TableNames
    ? OriginKnex.QueryBuilder<
        OriginKnex.TableType<TName>,
        {
          _base: OriginKnex.ResolveTableType<
            OriginKnex.TableType<TName>,
            'base'
          >
          _hasSelection: false
          _keys: never
          // biome-ignore lint/complexity/noBannedTypes: <explanation>
          _aliases: {}
          _single: false
          // biome-ignore lint/complexity/noBannedTypes: <explanation>
          _intersectProps: {}
          _unionProps: never
        }[]
      >
    : OriginKnex.QueryBuilder<TName, TResult>
}

export async function transaction<TResult = any>(
  scope: (trx: OriginKnex.Transaction<any, any>) => Promise<TResult>,
  config?: OriginKnex.TransactionConfig,
  options?: {
    trx?: OriginKnex.Transaction
  }
): Promise<TResult> {
  return useKnex().transaction<TResult>(scope, config, options)
}

export async function raw<TResult = any>(
  sql: string,
  bindings: OriginKnex.RawBinding[] | OriginKnex.ValueDict = []
): Promise<OriginKnex.Raw<TResult>> {
  return useKnex().raw<TResult>(sql, bindings)
}