makeomatic/mservice

View on GitHub
packages/plugin-aws-elasticsearch/src/plugin.ts

Summary

Maintainability
B
4 hrs
Test Coverage
import type * as _ from '@microfleet/plugin-logger'
import type * as __ from '@microfleet/plugin-validator'
import { strict as assert } from 'node:assert'
import { resolve } from 'path'
import retry from 'bluebird-retry'
import { Client, ClientOptions } from '@opensearch-project/opensearch'
import { NotFoundError } from 'common-errors'
import { PluginTypes } from '@microfleet/utils'
import AWS from 'aws-sdk'
import { createAwsElasticsearchConnector } from './utils/createAwsElasticsearchConnector'
import { PluginInterface, Microfleet } from '@microfleet/core-types'
declare module '@microfleet/core-types' {
  interface Microfleet {
    awsElasticsearch: Client
  }

  interface ConfigurationOptional {
    awsElasticsearch: Config
  }
}

export type Config = ClientOptions & {
  accessKeyId: string;
  secretAccessKey: string;
  region?: string;
}

/**
 * Relative priority inside the same plugin group type
 */
export const priority = 0
export const name = 'awsElasticsearch'
export const type = PluginTypes.database
export async function attach(
  this: Microfleet,
  opts: Partial<Config> = {}
): Promise<PluginInterface> {
  assert(this.hasPlugin('logger'), new NotFoundError('logger module must be included'))
  assert(this.hasPlugin('validator'), new NotFoundError('validator module must be included'))

  await this.validator.addLocation(resolve(__dirname, '../schemas'))
  const{ accessKeyId, secretAccessKey, region, ...conf } = this.validator
    .ifError<Config>('awsElasticsearch', opts)

  const awsConfig = new AWS.Config({
    credentials: {
      accessKeyId,
      secretAccessKey,
    },
    region,
  })

  // instead of Constructor for Transport/Connection it says to pass on instances
  this.awsElasticsearch = new Client({
    ...conf,
    ...createAwsElasticsearchConnector(awsConfig),
  })

  return {
    /**
     * @returns aws-elasticsearch connection.
     */
    async connect(this: Microfleet) {
      const reportError = (connectFn: () => Promise<void>) => async () => {
        try {
          await connectFn()
        } catch (e: any) {
          this.log.warn({ err: e }, 'Failed to connect to aws elastic')
          throw e
        }
      }

      const reconnectOpts = {
        interval: 500,
        backoff: 2,
        max_interval: 5000,
        timeout: 60000,
        max_tries: 100,
      }

      await retry(reportError(async () => {
        await this.awsElasticsearch.nodes.info({ human: true })
      }), reconnectOpts)

      this.emit('plugin:connect:awsElasticsearch', this.awsElasticsearch)
      return this.awsElasticsearch
    },

    /**
     * @returns Closes aws-elasticsearch connection.
     */
    async close(this: Microfleet) {
      await this.awsElasticsearch.close()
      this.emit('plugin:close:awsElasticsearch')
    },
  }
}