feathersjs/feathers

View on GitHub
packages/mongodb/src/adapter.ts

Summary

Maintainability
B
4 hrs
Test Coverage
import {
  ObjectId,
  Collection,
  FindOptions,
  BulkWriteOptions,
  InsertOneOptions,
  DeleteOptions,
  CountDocumentsOptions,
  ReplaceOptions,
  Document
} from 'mongodb'
import { BadRequest, MethodNotAllowed, NotFound } from '@feathersjs/errors'
import { _ } from '@feathersjs/commons'
import {
  AdapterBase,
  select,
  AdapterParams,
  AdapterServiceOptions,
  PaginationOptions,
  AdapterQuery,
  getLimit
} from '@feathersjs/adapter-commons'
import { Id, Paginated } from '@feathersjs/feathers'
import { errorHandler } from './error-handler'

export interface MongoDBAdapterOptions extends AdapterServiceOptions {
  Model: Collection | Promise<Collection>
  disableObjectify?: boolean
  useEstimatedDocumentCount?: boolean
}

export interface MongoDBAdapterParams<Q = AdapterQuery>
  extends AdapterParams<Q, Partial<MongoDBAdapterOptions>> {
  pipeline?: Document[]
  mongodb?:
    | BulkWriteOptions
    | FindOptions
    | InsertOneOptions
    | DeleteOptions
    | CountDocumentsOptions
    | ReplaceOptions
}

export type AdapterId = Id | ObjectId

export type NullableAdapterId = AdapterId | null

// Create the service.
export class MongoDbAdapter<
  Result,
  Data = Partial<Result>,
  ServiceParams extends MongoDBAdapterParams<any> = MongoDBAdapterParams,
  PatchData = Partial<Data>
> extends AdapterBase<Result, Data, PatchData, ServiceParams, MongoDBAdapterOptions, AdapterId> {
  constructor(options: MongoDBAdapterOptions) {
    if (!options) {
      throw new Error('MongoDB options have to be provided')
    }

    super({
      id: '_id',
      ...options
    })
  }

  getObjectId(id: AdapterId) {
    if (this.options.disableObjectify) {
      return id
    }

    if (this.id === '_id' && ObjectId.isValid(id)) {
      id = new ObjectId(id.toString())
    }

    return id
  }

  filterQuery(id: NullableAdapterId, params: ServiceParams) {
    const options = this.getOptions(params)
    const { $select, $sort, $limit: _limit, $skip = 0, ...query } = (params.query || {}) as AdapterQuery
    const $limit = getLimit(_limit, options.paginate)
    if (id !== null) {
      query.$and = (query.$and || []).concat({
        [this.id]: this.getObjectId(id)
      })
    }

    if (query[this.id]) {
      query[this.id] = this.getObjectId(query[this.id])
    }

    return {
      filters: { $select, $sort, $limit, $skip },
      query
    }
  }

  getModel(params: ServiceParams = {} as ServiceParams) {
    const { Model } = this.getOptions(params)
    return Promise.resolve(Model)
  }

  async findRaw(params: ServiceParams) {
    const { filters, query } = this.filterQuery(null, params)
    const model = await this.getModel(params)
    const q = model.find(query, { ...params.mongodb })

    if (filters.$select !== undefined) {
      q.project(this.getSelect(filters.$select))
    }

    if (filters.$sort !== undefined) {
      q.sort(filters.$sort)
    }

    if (filters.$skip !== undefined) {
      q.skip(filters.$skip)
    }

    if (filters.$limit !== undefined) {
      q.limit(filters.$limit)
    }

    return q
  }

  async aggregateRaw(params: ServiceParams) {
    const model = await this.getModel(params)
    const pipeline = params.pipeline || []
    const index = pipeline.findIndex((stage: Document) => stage.$feathers)
    const before = index >= 0 ? pipeline.slice(0, index) : []
    const feathersPipeline = this.makeFeathersPipeline(params)
    const after = index >= 0 ? pipeline.slice(index + 1) : pipeline

    return model.aggregate([...before, ...feathersPipeline, ...after])
  }

  makeFeathersPipeline(params: ServiceParams) {
    const { filters, query } = this.filterQuery(null, params)
    const pipeline: Document[] = [{ $match: query }]

    if (filters.$select !== undefined) {
      pipeline.push({ $project: this.getSelect(filters.$select) })
    }

    if (filters.$sort !== undefined) {
      pipeline.push({ $sort: filters.$sort })
    }

    if (filters.$skip !== undefined) {
      pipeline.push({ $skip: filters.$skip })
    }

    if (filters.$limit !== undefined) {
      pipeline.push({ $limit: filters.$limit })
    }
    return pipeline
  }

  getSelect(select: string[] | { [key: string]: number }) {
    if (Array.isArray(select)) {
      if (!select.includes(this.id)) {
        select = [this.id, ...select]
      }
      return select.reduce<{ [key: string]: number }>(
        (value, name) => ({
          ...value,
          [name]: 1
        }),
        {}
      )
    }

    if (!select[this.id]) {
      return {
        ...select,
        [this.id]: 1
      }
    }

    return select
  }

  async _findOrGet(id: NullableAdapterId, params: ServiceParams) {
    return id === null ? await this._find(params) : await this._get(id, params)
  }

  normalizeId<D>(id: NullableAdapterId, data: D): D {
    if (this.id === '_id') {
      // Default Mongo IDs cannot be updated. The Mongo library handles
      // this automatically.
      return _.omit(data, this.id)
    } else if (id !== null) {
      // If not using the default Mongo _id field set the ID to its
      // previous value. This prevents orphaned documents.
      return {
        ...data,
        [this.id]: id
      }
    }
    return data
  }

  async _get(id: AdapterId, params: ServiceParams = {} as ServiceParams): Promise<Result> {
    const {
      query,
      filters: { $select }
    } = this.filterQuery(id, params)
    const projection = $select
      ? {
          projection: {
            ...this.getSelect($select),
            [this.id]: 1
          }
        }
      : {}
    const findOptions: FindOptions = {
      ...params.mongodb,
      ...projection
    }

    return this.getModel(params)
      .then((model) => model.findOne(query, findOptions))
      .then((data) => {
        if (data == null) {
          throw new NotFound(`No record found for id '${id}'`)
        }

        return data
      })
      .catch(errorHandler)
  }

  async _find(params?: ServiceParams & { paginate?: PaginationOptions }): Promise<Paginated<Result>>
  async _find(params?: ServiceParams & { paginate: false }): Promise<Result[]>
  async _find(params?: ServiceParams): Promise<Paginated<Result> | Result[]>
  async _find(params: ServiceParams = {} as ServiceParams): Promise<Paginated<Result> | Result[]> {
    const { paginate, useEstimatedDocumentCount } = this.getOptions(params)
    const { filters, query } = this.filterQuery(null, params)
    const useAggregation = !params.mongodb && filters.$limit !== 0
    const countDocuments = async () => {
      if (paginate && paginate.default) {
        const model = await this.getModel(params)
        if (useEstimatedDocumentCount && typeof model.estimatedDocumentCount === 'function') {
          return model.estimatedDocumentCount()
        } else {
          return model.countDocuments(query, { ...params.mongodb })
        }
      }
      return Promise.resolve(0)
    }

    const [request, total] = await Promise.all([
      useAggregation ? this.aggregateRaw(params) : this.findRaw(params),
      countDocuments()
    ])

    const page = {
      total,
      limit: filters.$limit,
      skip: filters.$skip || 0,
      data: filters.$limit === 0 ? [] : ((await request.toArray()) as any as Result[])
    }

    return paginate && paginate.default ? page : page.data
  }

  async _create(data: Data, params?: ServiceParams): Promise<Result>
  async _create(data: Data[], params?: ServiceParams): Promise<Result[]>
  async _create(data: Data | Data[], _params?: ServiceParams): Promise<Result | Result[]>
  async _create(
    data: Data | Data[],
    params: ServiceParams = {} as ServiceParams
  ): Promise<Result | Result[]> {
    const writeOptions = params.mongodb
    const model = await this.getModel(params)
    const setId = (item: any) => {
      const entry = Object.assign({}, item)

      // Generate a MongoId if we use a custom id
      if (this.id !== '_id' && typeof entry[this.id] === 'undefined') {
        return {
          [this.id]: new ObjectId().toHexString(),
          ...entry
        }
      }

      return entry
    }

    const promise = Array.isArray(data)
      ? model
          .insertMany(data.map(setId), writeOptions)
          .then(async (result) =>
            model.find({ _id: { $in: Object.values(result.insertedIds) } }, params.mongodb).toArray()
          )
      : model
          .insertOne(setId(data), writeOptions)
          .then(async (result) => model.findOne({ _id: result.insertedId }, params.mongodb))

    return promise.then(select(params, this.id)).catch(errorHandler)
  }

  async _patch(id: null, data: PatchData | Partial<Result>, params?: ServiceParams): Promise<Result[]>
  async _patch(id: AdapterId, data: PatchData | Partial<Result>, params?: ServiceParams): Promise<Result>
  async _patch(
    id: NullableAdapterId,
    data: PatchData | Partial<Result>,
    _params?: ServiceParams
  ): Promise<Result | Result[]>
  async _patch(
    id: NullableAdapterId,
    _data: PatchData | Partial<Result>,
    params: ServiceParams = {} as ServiceParams
  ): Promise<Result | Result[]> {
    if (id === null && !this.allowsMulti('patch', params)) {
      throw new MethodNotAllowed('Can not patch multiple entries')
    }

    const data = this.normalizeId(id, _data)
    const model = await this.getModel(params)
    const {
      query,
      filters: { $select }
    } = this.filterQuery(id, params)
    const updateOptions = { ...params.mongodb }
    const modifier = Object.keys(data).reduce((current, key) => {
      const value = (data as any)[key]

      if (key.charAt(0) !== '$') {
        current.$set = {
          ...current.$set,
          [key]: value
        }
      } else {
        current[key] = value
      }

      return current
    }, {} as any)
    const originalIds = await this._findOrGet(id, {
      ...params,
      query: {
        ...query,
        $select: [this.id]
      },
      paginate: false
    })
    const items = Array.isArray(originalIds) ? originalIds : [originalIds]
    const idList = items.map((item: any) => item[this.id])
    const findParams = {
      ...params,
      paginate: false,
      query: {
        [this.id]: { $in: idList },
        $select
      }
    }

    await model.updateMany(query, modifier, updateOptions)

    return this._findOrGet(id, findParams).catch(errorHandler)
  }

  async _update(id: AdapterId, data: Data, params: ServiceParams = {} as ServiceParams): Promise<Result> {
    if (id === null || Array.isArray(data)) {
      throw new BadRequest("You can not replace multiple instances. Did you mean 'patch'?")
    }

    const model = await this.getModel(params)
    const { query } = this.filterQuery(id, params)
    const replaceOptions = { ...params.mongodb }

    await model.replaceOne(query, this.normalizeId(id, data), replaceOptions)

    return this._findOrGet(id, params).catch(errorHandler)
  }

  async _remove(id: null, params?: ServiceParams): Promise<Result[]>
  async _remove(id: AdapterId, params?: ServiceParams): Promise<Result>
  async _remove(id: NullableAdapterId, _params?: ServiceParams): Promise<Result | Result[]>
  async _remove(
    id: NullableAdapterId | ObjectId,
    params: ServiceParams = {} as ServiceParams
  ): Promise<Result | Result[]> {
    if (id === null && !this.allowsMulti('remove', params)) {
      throw new MethodNotAllowed('Can not remove multiple entries')
    }

    const model = await this.getModel(params)
    const {
      query,
      filters: { $select }
    } = this.filterQuery(id, params)
    const deleteOptions = { ...params.mongodb }
    const findParams = {
      ...params,
      paginate: false,
      query: {
        ...query,
        $select
      }
    }

    return this._findOrGet(id, findParams)
      .then(async (items) => {
        await model.deleteMany(query, deleteOptions)
        return items
      })
      .catch(errorHandler)
  }
}