senecajs/seneca-s3-store

src/s3-store.ts

/* Copyright (c) 2020-2023 Richard Rodger, MIT License */

import Path from 'path'
import Fsp from 'fs/promises'

import chokidar from 'chokidar'

import { Default, Skip, Any, Exact, Child, Empty, One } from 'gubu'

import {
  S3Client,
  PutObjectCommand,
  GetObjectCommand,
  DeleteObjectCommand,
  HeadObjectCommand,
} from '@aws-sdk/client-s3'

import { getSignedUrl } from '@aws-sdk/s3-request-presigner'

// TODO: ent fields as dot paths

s3_store.defaults = {
  debug: false,

  prefix: Empty('seneca/db01/'),
  suffix: Empty('.json'),
  folder: Any(),
  s3: {},

  // keys are canon strings
  map: Skip({}),

  shared: Skip({}),

  // Use a local folder to simulate S3 for local dev and testing.
  local: {
    active: false,
    folder: '',
    watch: false,
    // suffixMode: 'none', // TODO: FIX: Default('none', Exact('none', 'genid')),
    suffixMode: Default('none', Exact('none', 'genid')),

    // { [path-prefix]: msg }
    onObjectCreated: Skip(Child(One(String, Object))),
  },

  // keys are canon strings
  ent: Default(
    {},
    Child({
      // Save the named sub-array field as JSONL. NOTE: Other fields are LOST!
      jsonl: Skip(String),

      // Save the named sub-field as binary. NOTE: Other fields are LOST!
      bin: Skip(String),
    }),
  ),
}
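
/* Registration sketch (illustrative only; bucket, region, and canon values are
 * placeholders, not plugin defaults):
 *
 *   seneca.use('s3-store', {
 *     map: { '-/-/media': '*' },               // route this canon to the store
 *     shared: { Bucket: 'my-bucket' },         // merged into every S3 command
 *     s3: { region: 'us-east-1' },             // S3Client constructor options
 *     ent: { '-/-/media': { bin: 'content' } },
 *     // For local development without AWS:
 *     // local: { active: true, folder: '/tmp/s3-sim' },
 *   })
 */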

const PLUGIN = '@seneca/s3-store'

async function s3_store(this: any, options: any) {
  const seneca = this
  const init = seneca.export('entity/init')

  let generate_id = options.generate_id || seneca.export('entity/generate_id')
  let aws_s3: any = null
  let s3_shared_options = {
    Bucket: '!not-a-bucket!',
    ...options.shared,
  }

  let local_folder: string = ''

  // Plugin init: set up the local-folder simulation, or create the S3 client.
  seneca.init(function (reply: () => void) {
    if (options.local.active) {
      let folder: string = options.local.folder
      local_folder =
        'genid' == options.local.suffixMode
          ? folder + '-' + seneca.util.Nid()
          : folder

      if (options.local.watch) {
        const localFolder = Path.resolve(options.local.folder)

        // Watch for new local files and emit simulated S3 ObjectCreated events.
        const watcher = chokidar.watch(localFolder, {
          ignoreInitial: true,
        })

        const onObjectCreated = options.local.onObjectCreated

        if (onObjectCreated) {
          watcher.on('add', (path: string) => {
            const keyPath = path.substring(localFolder.length + 1)
            for (let prefix in onObjectCreated) {
              if (keyPath.startsWith(prefix)) {
                const event = {
                  Records: [
                    {
                      s3: {
                        object: {
                          key: keyPath,
                        },
                      },
                    },
                  ],
                }

                seneca.act(onObjectCreated[prefix], { event })
              }
            }
          })
          // .on('error', error => console.log(`WATCH error: ${error}`))
          // .on('ready', () => console.log('WATCH initial scan complete. ready for changes'));
        }
      }
    } else {
      const s3_opts = {
        // NOTE: SDK v3 names this option `forcePathStyle` (v2 used `s3ForcePathStyle`).
        forcePathStyle: true,
        ...options.s3,
      }
      aws_s3 = new S3Client(s3_opts)
    }

    reply()
  })
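
  /* Local-watch sketch (illustrative pattern and folder names): configured as
   *
   *   local: {
   *     active: true, watch: true, folder: '/tmp/s3-sim',
   *     onObjectCreated: { 'uploads/': 'sys:media,on:upload' },
   *   }
   *
   * creating <folder>/uploads/pic01.png triggers
   *
   *   seneca.act('sys:media,on:upload', { event: { Records: [
   *     { s3: { object: { key: 'uploads/pic01.png' } } } ] } })
   *
   * which mirrors the shape of a real S3 ObjectCreated notification.
   */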

  let store = {
    name: 's3-store',
    // Save an entity as a single S3 object (JSON, JSONL lines, or raw binary).
    save: function (msg: any, reply: any) {
      let canon = msg.ent.entity$
      let id = '' + (msg.ent.id || msg.ent.id$ || generate_id(msg.ent))
      let d = msg.ent.data$()
      d.id = id

      let entSpec = options.ent[canon]
      let jsonl = entSpec?.jsonl || msg.jsonl$ || msg.q?.jsonl$
      let bin = entSpec?.bin || msg.bin$ || msg.q?.bin$

      let s3id = make_s3id(id, msg.ent, options, bin)
      let Body: Buffer | undefined = undefined

      if (entSpec || jsonl || bin) {
        // JSONL files
        if ('string' === typeof jsonl && '' !== jsonl) {
          let arr = msg.ent[jsonl]
          if (!Array.isArray(arr)) {
            throw new Error(
              's3-store: option ent.jsonl array field not found: ' + jsonl,
            )
          }

          let content = arr.map((n: any) => JSON.stringify(n)).join('\n') + '\n'
          Body = Buffer.from(content)
        }

        // Binary files
        else if ('string' === typeof bin && '' !== bin) {
          let dataRef = msg.ent[bin]
          if (null == dataRef) {
            throw new Error(
              's3-store: option ent.bin data field not found: ' + bin,
            )
          }

          let data = dataRef

          // A function can be used to 'hide' very large data.
          if ('function' === typeof dataRef) {
            data = dataRef()
          }

          Body = Buffer.from(data)
        }
      }

      if (null == Body) {
        let dj = JSON.stringify(d)
        Body = Buffer.from(dj)
      }

      let ento = msg.ent.make$().data$(d)

      // Local file
      if (options.local.active) {
        let full: string = Path.join(local_folder, s3id || id)
        let path: string = Path.dirname(full)

        if (options.debug) {
          console.log(PLUGIN, 'save', path, Body.length)
        }

        Fsp.mkdir(path, { recursive: true })
          .then(() => {
            Fsp.writeFile(full, Body as any)
              .then((_res: any) => {
                reply(null, ento)
              })
              .catch((err: any) => {
                reply(err)
              })
          })
          .catch((err: any) => {
            reply(err)
          })
      }

      // AWS S3
      else {
        const s3cmd = new PutObjectCommand({
          ...s3_shared_options,
          Key: s3id,
          Body,
        })

        aws_s3
          .send(s3cmd)
          .then((_res: any) => {
            reply(null, ento)
          })
          .catch((err: any) => {
            reply(err)
          })
      }
    },

    // Load an entity by id; exists$ checks presence without fetching the body.
    load: function (msg: any, reply: any) {
      let canon = msg.ent.entity$
      let qent = msg.qent
      let id = '' + msg.q.id
      let entSpec = options.ent[canon]
      let output: 'ent' | 'jsonl' | 'bin' = 'ent'
      let jsonl = entSpec?.jsonl || msg.jsonl$ || msg.q.jsonl$
      let bin = entSpec?.bin || msg.bin$ || msg.q.bin$
      let exists = !!(msg.exists$ || msg.q.exists$)

      let s3id = make_s3id(id, msg.ent, options, bin)

      output = jsonl && '' != jsonl ? 'jsonl' : bin && '' != bin ? 'bin' : 'ent'

      function replyEnt(body: any) {
        let entdata: any = {}

        if (null != body) {
          if ('bin' !== output) {
            body = body.toString('utf-8')
          }

          if ('jsonl' === output) {
            entdata[jsonl] = body
              .split('\n')
              .filter((n: string) => '' !== n)
              .map((n: string) => JSON.parse(n))
          } else if ('bin' === output) {
            entdata[bin] = body
          } else {
            entdata = JSON.parse(body)
          }
        }

        entdata.id = id

        let ento = qent.make$().data$(entdata)
        reply(null, ento)
      }

      // Local file
      if (options.local.active) {
        let full: string = Path.join(local_folder, s3id || id)

        if (options.debug) {
          console.log(PLUGIN, 'load', full)
        }

        if (exists) {
          Fsp.stat(full)
            .then(() => {
              replyEnt(null)
            })
            .catch((err: any) => {
              if ('ENOENT' == err.code) {
                return reply()
              }
              reply(err)
            })
        } else {
          Fsp.readFile(full)
            .then((body: any) => {
              replyEnt(body)
            })
            .catch((err: any) => {
              if ('ENOENT' == err.code) {
                return reply()
              }
              reply(err)
            })
        }
      }

      // AWS S3
      else {
        if (exists) {
          const s3cmd = new HeadObjectCommand({
            ...s3_shared_options,
            Key: s3id,
          })

          aws_s3
            .send(s3cmd)
            .then(() => {
              replyEnt(null)
            })
            .catch((err: any) => {
              if ('NotFound' === err.name) {
                return reply()
              }

              reply(err)
            })
        } else {
          const s3cmd = new GetObjectCommand({
            ...s3_shared_options,
            Key: s3id,
          })

          aws_s3
            .send(s3cmd)
            .then((res: any) => {
              destream(output, res.Body)
                .then((body: any) => {
                  replyEnt(body)
                })
                .catch((err) => reply(err))
            })
            .catch((err: any) => {
              if ('NoSuchKey' === err.Code) {
                return reply()
              }

              reply(err)
            })
        }
      }
    },

    // NOTE: S3 folder listing not supported yet.
    list: function (_msg: any, reply: any) {
      reply([])
    },

    // Remove the entity object by id; a missing object is not an error.
    remove: function (msg: any, reply: any) {
      let canon = (msg.ent || msg.qent).entity$
      let id = '' + msg.q.id
      let entSpec = options.ent[canon]
      let bin = entSpec?.bin || msg.bin$ || msg.q.bin$

      let s3id = make_s3id(id, msg.ent, options, bin)

      // Local file
      if (options.local.active) {
        let full: string = Path.join(local_folder, s3id || id)

        Fsp.unlink(full)
          .then((_res: any) => {
            reply()
          })
          .catch((err: any) => {
            if ('ENOENT' == err.code) {
              return reply()
            }
            reply(err)
          })
      } else {
        const s3cmd = new DeleteObjectCommand({
          ...s3_shared_options,
          Key: s3id,
        })

        aws_s3
          .send(s3cmd)
          .then((_res: any) => {
            reply()
          })
          .catch((err: any) => {
            if ('NoSuchKey' === err.Code) {
              return reply()
            }

            reply(err)
          })
      }
    },

    close: function (_msg: any, reply: () => void) {
      reply()
    },

    native: function (_msg: any, reply: any) {
      reply({ client: aws_s3, local: { ...options.local } })
    },
  }
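
  /* Entity usage sketch (assumes seneca-entity and seneca-promisify are loaded,
   * the canons below are mapped to this store, and it is registered with
   * `ent: { '-/-/media': { bin: 'content' } }`; `pngBuffer` is a placeholder):
   *
   *   // Plain entity: the whole data$() object is saved as JSON under
   *   // <prefix><canon>/<id><suffix>
   *   let note = await seneca.entity('note').data$({ text: 'n01' }).save$()
   *   let back = await seneca.entity('note').load$(note.id)
   *
   *   // bin entity: only the `content` field is stored, as raw bytes;
   *   // load$ returns it as a Buffer on `content`
   *   let m = await seneca.entity('media').data$({ content: pngBuffer }).save$()
   *   let img = await seneca.entity('media').load$(m.id)
   */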

  let meta = init(seneca, options, store)

  seneca.message(
    'cloud:aws,service:store,get:url,kind:upload',
    {
      bucket: String,
      filepath: String,
      expire: Number,
    },
    get_upload_url,
  )

  seneca.message(
    'cloud:aws,service:store,get:url,kind:download',
    {
      bucket: String,
      filepath: String,
      expire: Number,
    },
    get_download_url,
  )

  // Generate a presigned S3 PUT URL so clients can upload directly.
  async function get_upload_url(msg: any) {
    const bucket = msg.bucket
    const filepath = msg.filepath
    const expire = msg.expire

    const command = new PutObjectCommand({
      Bucket: bucket,
      Key: filepath,
    })
    const url: string = await getSignedUrl(aws_s3, command, {
      expiresIn: expire,
    })

    return {
      url,
      bucket,
      filepath,
      expire,
    }
  }

  // Generate a presigned S3 GET URL so clients can download directly.
  async function get_download_url(msg: any) {
    const bucket = msg.bucket
    const filepath = msg.filepath
    const expire = msg.expire

    const command = new GetObjectCommand({
      Bucket: bucket,
      Key: filepath,
    })
    const url: string = await getSignedUrl(aws_s3, command, {
      expiresIn: expire,
    })
    return {
      url,
      bucket,
      filepath,
      expire,
    }
  }
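
  /* Presigned URL sketch (assumes seneca-promisify so `post` returns a promise;
   * bucket and filepath values are placeholders):
   *
   *   const { url } = await seneca.post(
   *     'cloud:aws,service:store,get:url,kind:upload',
   *     { bucket: 'my-bucket', filepath: 'uploads/pic01.png', expire: 600 },
   *   )
   *   // HTTP PUT the file body to `url` within 600 seconds.
   */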

  const makeGatewayHandler = (msgin: object | string) => {
    const msg = seneca.util.Jsonic(msgin)
    const gatewayHandler = {
      name: 's3',
      match: (trigger: { record: any }) => {
        let matched = 'aws:s3' === trigger.record.eventSource
        return matched
      },
      process: async function (
        this: typeof seneca,
        trigger: { record: any; event: any },
        gateway: Function,
      ) {
        let { record, event } = trigger
        return gateway({ ...msg, record, event }, trigger)
      },
    }
    return gatewayHandler
  }
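
  /* Gateway handler sketch (the export path follows the seneca.export convention
   * used above; the consuming gateway plugin and how it accepts custom handlers
   * are assumptions, not defined in this file):
   *
   *   const makeHandler = seneca.export('s3-store/makeGatewayHandler')
   *   const handler = makeHandler('sys:media,on:upload')
   *   // handler.match selects trigger records with eventSource 'aws:s3';
   *   // handler.process calls the gateway with { ...msg, record, event }.
   */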

  return {
    name: store.name,
    tag: meta.tag,
    exports: {
      native: aws_s3,
      makeGatewayHandler,
    },
  }
}

// Build the S3 object key (also used as the local file path) for an entity id.
function make_s3id(id: string, ent: any, options: any, bin?: string | boolean) {
  let s3id =
    null == id
      ? null
      : (null == options.folder
          ? options.prefix + ent.entity$
          : options.folder) +
        ('' == options.folder ? '' : '/') +
        id +
        (bin ? '' : options.suffix)

  return s3id
}
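
/* Key layout (worked example using the defaults above; values are illustrative):
 *
 *   make_s3id('a1', ent, { prefix: 'seneca/db01/', suffix: '.json' }, false)
 *   // with ent.entity$ === '-/-/media'  ->  'seneca/db01/-/-/media/a1.json'
 *
 *   // With options.folder = 'uploads' the canon prefix is ignored:
 *   //   -> 'uploads/a1.json'
 *   // When bin is set, the suffix is omitted.
 */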

// Collect an S3 response body stream into a Buffer ('bin') or a utf-8 string.
async function destream(output: 'ent' | 'jsonl' | 'bin', stream: any) {
  return new Promise((resolve, reject) => {
    const chunks: any = []
    stream.on('data', (chunk: any) => chunks.push(chunk))
    stream.on('error', reject)
    stream.on('end', () => {
      let buffer = Buffer.concat(chunks)
      if ('bin' === output) {
        resolve(buffer)
      } else {
        resolve(buffer.toString('utf-8'))
      }
    })
  })
}

Object.defineProperty(s3_store, 'name', { value: 's3-store' })
module.exports = s3_store