senecajs/seneca-mongo-store

View on GitHub
mongo-store.js

Summary

Maintainability
D
1 day
Test Coverage
/* Copyright (c) 2010-2020 Richard Rodger and other contributors, MIT License */
'use strict'

const Mongo = require('mongodb')
const MongoClient = Mongo.MongoClient

const { intern } = require('./lib/intern')

const name = 'mongo-store'

/*
native$ = object => use object as query, no meta settings
native$ = array => use first elem as query, second elem as meta settings
*/

const mongo_store = function mongo_store(options) {
  const seneca = this
  let desc

  let dbinst = null
  let dbclient = null
  let collmap = {}

  function error(args, err, cb) {
    if (err) {
      seneca.log.error('entity', err, args, { store: name })

      cb(err)
      return true
    } else return false
  }

  function configure(conf, cb) {
    // defer connection
    // TODO: expose connection action
    //if (!_.isUndefined(conf.connect) && !conf.connect) {
    if (false === conf.connect) {
      return cb()
    }

    // Turn the hash into a mongo uri
    if (!conf.uri) {
      conf.uri = 'mongodb://'
      conf.uri += conf.username ? conf.username : ''
      conf.uri += conf.password ? ':' + conf.password + '@' : ''
      conf.uri += conf.host || conf.server
      conf.uri += conf.port ? ':' + conf.port : ':27017'
    }

    conf.db = conf.db || conf.name

    // Connect using the URI
    return MongoClient.connect(
      conf.uri,

      { useUnifiedTopology: true },

      function (err, client) {
        if (err) {
          return seneca.die('connect', err, conf)
        }
        dbclient = client
        // Set the instance to use throughout the plugin
        dbinst = client.db(conf.db)
        seneca.log.debug('init', 'db open', conf.db)
        cb(null)
      }
    )
  }

  function getcoll(args, ent, cb) {
    const canon = ent.canon$({ object: true })
    const collname = (canon.base ? canon.base + '_' : '') + canon.name

    if (!collmap[collname]) {
      dbinst.collection(collname, function (err, coll) {
        if (!error(args, err, cb)) {
          collmap[collname] = coll
          cb(null, coll)
        }
      })
    } else {
      cb(null, collmap[collname])
    }
  }

  const store = {
    name: name,

    close: function (args, cb) {
      if (dbclient) {
        dbclient.close(cb)
      } else return cb()
    },

    save: function (msg, done) {
      return getcoll(msg, msg.ent, function (err, coll) {
        if (error(msg, err, done)) {
          return
        }

        const is_update = null != msg.ent.id

        if (is_update) {
          return update(msg, coll, done)
        }

        return create(msg, coll, done)
      })

      function create(msg, coll, done) {
        const upsert_fields = isUpsert(msg)

        if (null == upsert_fields) {
          return createNew(msg, coll, done)
        }

        return doUpsert(upsert_fields, msg, coll, done)

        function isUpsert(msg) {
          if (!Array.isArray(msg.q.upsert$)) {
            return null
          }

          const upsert_fields = msg.q.upsert$.filter((p) => !p.includes('$'))
          const public_entdata = msg.ent.data$(false)

          const is_upsert =
            upsert_fields.length > 0 &&
            upsert_fields.every((p) => p in public_entdata)

          return is_upsert ? upsert_fields : null
        }

        function doUpsert(upsert_fields, msg, coll, done) {
          const filter_by = upsert_fields.reduce((acc, field) => {
            acc[field] = msg.ent[field]
            return acc
          }, {})

          const public_entdata = msg.ent.data$(false)
          const replacement = { $set: public_entdata }
          const new_id = intern.ensure_id(msg.ent, options)

          if (null != new_id) {
            replacement.$setOnInsert = { _id: new_id }
          }

          return intern.attempt_upsert(
            coll,
            filter_by,
            replacement,
            (err, update) => {
              if (error(msg, err, done)) {
                return
              }

              const doc = update.value
              const { ent } = msg

              return finalizeSave('save/upsert', doc, ent, seneca, done)
            }
          )
        }

        function createNew(msg, coll, done) {
          const new_doc = (function () {
            const public_entdata = msg.ent.data$(false)
            const id = intern.ensure_id(msg.ent, options)

            const new_doc = Object.assign({}, public_entdata)

            if (null != id) {
              new_doc._id = id
            }

            return new_doc
          })()

          return coll.insertOne(new_doc, function (err, inserts) {
            if (error(msg, err, done)) {
              return
            }

            const doc = inserts.ops[0]
            const { ent } = msg

            return finalizeSave('save/insert', doc, ent, seneca, done)
          })
        }
      }

      function update(msg, coll, done) {
        const ent = msg.ent
        const entp = ent.data$(false)

        const q = { _id: intern.makeid(ent.id) }
        delete entp.id

        let set = entp
        let func = 'findOneAndReplace'

        if (intern.should_merge(ent, options)) {
          set = { $set: entp }
          func = 'findOneAndUpdate'
        }

        coll[func](
          q,
          set,
          { upsert: true, returnOriginal: false },
          function (err, update) {
            if (error(msg, err, done)) {
              return
            }

            const doc = update.value
            const { ent } = msg

            return finalizeSave('save/update', doc, ent, seneca, done)
          }
        )
      }

      function finalizeSave(operation_name, doc, ent, seneca, done) {
        const fent = intern.makeent(doc, ent, seneca)

        seneca.log.debug(operation_name, ent, desc)

        return done(null, fent)
      }
    },

    load: function (args, cb) {
      const qent = args.qent
      const q = args.q

      return getcoll(args, qent, function (err, coll) {
        if (!error(args, err, cb)) {
          const mq = intern.metaquery(q)
          const qq = intern.fixquery(q, seneca, options)

          return coll.findOne(qq, mq, function (err, entp) {
            if (!error(args, err, cb)) {
              let fent = null
              if (entp) {
                entp.id = intern.idstr(entp._id)
                delete entp._id
                fent = qent.make$(entp)
              }
              seneca.log.debug('load', q, fent, desc)
              cb(null, fent)
            }
          })
        }
      })
    },

    list: function (args, cb) {
      const qent = args.qent
      const q = args.q

      return getcoll(args, qent, function (err, coll) {
        if (!error(args, err, cb)) {
          const mq = intern.metaquery(q)
          const qq = intern.fixquery(q, seneca, options)

          return coll.find(qq, mq, function (err, cur) {
            if (!error(args, err, cb)) {
              const list = []

              cur.each(function (err, entp) {
                if (!error(args, err, cb)) {
                  if (entp) {
                    let fent = null
                    entp.id = intern.idstr(entp._id)
                    delete entp._id
                    fent = qent.make$(entp)
                    list.push(fent)
                  } else {
                    seneca.log.debug('list', q, list.length, list[0], desc)
                    cb(null, list)
                  }
                }
              })
            }
          })
        }
      })
    },

    remove: function (args, cb) {
      const qent = args.qent
      const q = args.q

      const all = q.all$ // default false
      const load = null == q.load$ ? false : q.load$ // default false

      getcoll(args, qent, function (err, coll) {
        if (!error(args, err, cb)) {
          const mq = intern.metaquery(q)
          const qq = intern.fixquery(q, seneca, options)

          if (all) {
            return coll.find(qq, mq, function (err, cur) {
              if (!error(args, err, cb)) {
                const list = []
                const toDelete = []

                cur.each(function (err, entp) {
                  if (!error(args, err, cb)) {
                    if (entp) {
                      let fent = null
                      if (entp) {
                        toDelete.push(entp._id)
                        entp.id = intern.idstr(entp._id)
                        delete entp._id
                        fent = qent.make$(entp)
                      }
                      list.push(fent)
                    } else {
                      coll.deleteMany(
                        { _id: { $in: toDelete } },
                        function (err) {
                          seneca.log.debug('remove/all', q, desc)
                          cb(err, null)
                        }
                      )
                    }
                  }
                })
              }
            })
          } else {
            return coll.findOne(qq, mq, function (err, entp) {
              if (!error(args, err, cb)) {
                if (entp) {
                  return coll.deleteOne({ _id: entp._id }, {}, function (err) {
                    seneca.log.debug('remove/one', q, entp, desc)
                    const ent = load ? entp : null
                    cb(err, ent)
                  })
                } else cb(null)
              }
            })
          }
        }
      })
    },

    native: function (args, done) {
      dbinst.collection('seneca', function (err, coll) {
        if (!error(args, err, done)) {
          coll.findOne({}, {}, function (err) {
            if (!error(args, err, done)) {
              done(null, dbinst)
            } else {
              done(err)
            }
          })
        } else {
          done(err)
        }
      })
    },
  }

  const meta = seneca.store.init(seneca, options, store)
  desc = meta.desc

  seneca.add({ init: store.name, tag: meta.tag }, function (args, done) {
    configure(options, function (err) {
      if (err) {
        return seneca.die('store', err, { store: store.name, desc: desc })
      }

      return done()
    })
  })

  return {
    name: store.name,
    tag: meta.tag,
    export: {
      mongo: () => dbinst,
    },
  }
}

mongo_store.intern = intern

module.exports = mongo_store