// feathersjs/feathers — packages/mongodb/src/adapter.ts
// (Scraped from a code-quality web viewer; page summary read "Maintainability: B, 6 hrs".
// The "Test Coverage" value was not captured.)
import {
  ObjectId,
  Collection,
  FindOptions,
  BulkWriteOptions,
  InsertOneOptions,
  DeleteOptions,
  CountDocumentsOptions,
  ReplaceOptions,
  FindOneAndReplaceOptions,
  FindOneAndUpdateOptions,
  Document,
  FindOneAndDeleteOptions
} from 'mongodb'
import { BadRequest, MethodNotAllowed, NotFound } from '@feathersjs/errors'
import { _ } from '@feathersjs/commons'
import {
  AdapterBase,
  AdapterParams,
  AdapterServiceOptions,
  PaginationOptions,
  AdapterQuery,
  getLimit
} from '@feathersjs/adapter-commons'
import { Id, Paginated } from '@feathersjs/feathers'
import { errorHandler } from './error-handler'

/**
 * Options accepted by the MongoDB adapter, on top of the common adapter
 * service options it extends.
 */
export interface MongoDBAdapterOptions extends AdapterServiceOptions {
  /** The MongoDB collection to operate on, or a promise that resolves to it. */
  Model: Collection | Promise<Collection>
  /** When truthy, `getObjectId` returns ids unchanged instead of converting them to `ObjectId`. */
  disableObjectify?: boolean
  /**
   * When truthy, non-pipeline counts use `estimatedDocumentCount()`, which is
   * called without the query.
   */
  useEstimatedDocumentCount?: boolean
}

/**
 * Service call params understood by the MongoDB adapter.
 *
 * @typeParam Q Shape of `params.query`.
 */
export interface MongoDBAdapterParams<Q = AdapterQuery>
  extends AdapterParams<Q, Partial<MongoDBAdapterOptions>> {
  /**
   * Aggregation stages. When set, reads go through `aggregate()` instead of
   * `find()`. A stage with a truthy `$feathers` key marks the position where
   * the adapter's own query stages are inserted; without such a stage they
   * are placed before the given stages.
   */
  pipeline?: Document[]
  /** Driver-level options handed through to the underlying collection call. */
  mongodb?:
    | BulkWriteOptions
    | FindOptions
    | InsertOneOptions
    | DeleteOptions
    | CountDocumentsOptions
    | ReplaceOptions
    | FindOneAndReplaceOptions
    | FindOneAndDeleteOptions
}

/** An id as accepted by the adapter: a Feathers `Id` or a MongoDB `ObjectId`. */
export type AdapterId = Id | ObjectId

/** An adapter id, or `null` to address multiple records. */
export type NullableAdapterId = AdapterId | null

/**
 * Feathers database adapter backed by a MongoDB collection.
 *
 * Implements the underscore-prefixed adapter methods (`_get`, `_find`,
 * `_create`, `_patch`, `_update`, `_remove`) on top of the collection found
 * in the `Model` option. When `params.pipeline` is set, reads go through
 * `aggregate()` instead of `find()`.
 */
export class MongoDbAdapter<
  Result,
  Data = Partial<Result>,
  ServiceParams extends MongoDBAdapterParams<any> = MongoDBAdapterParams,
  PatchData = Partial<Data>
> extends AdapterBase<Result, Data, PatchData, ServiceParams, MongoDBAdapterOptions, AdapterId> {
  /**
   * @param options Adapter options. `id` defaults to `'_id'` unless the
   * options provide their own value.
   * @throws Error when no options object is given.
   */
  constructor(options: MongoDBAdapterOptions) {
    if (!options) {
      throw new Error('MongoDB options have to be provided')
    }

    super({
      id: '_id',
      ...options
    })
  }

  /**
   * Converts an id to an `ObjectId` when that is possible and wanted.
   *
   * The id is returned unchanged when `disableObjectify` is set, when the
   * adapter's id field is not `_id`, when the id is null/undefined, or when
   * its string form is not accepted by `ObjectId.isValid`.
   *
   * @param id The id (or id-like query value) to convert.
   * @returns An `ObjectId` built from the id's string form, or the id itself.
   */
  getObjectId(id: AdapterId) {
    if (this.options.disableObjectify) {
      return id
    }

    if (this.id === '_id' && id != null) {
      // Validate the exact string that is handed to the constructor so the
      // check and the conversion cannot disagree (e.g. for numeric ids).
      const idString = id.toString()

      if (ObjectId.isValid(idString)) {
        return new ObjectId(idString)
      }
    }

    return id
  }

  /**
   * Splits `params.query` into the special filters and the remaining MongoDB
   * query.
   *
   * @param id When not `null`, an id condition is appended to the query's
   * `$and` list (on a copy; `params.query` is not modified).
   * @param params Service call params.
   * @returns `filters` (`$select`, `$sort`, `$limit`, `$skip`) and `query`.
   * `$limit` is resolved through `getLimit` and the pagination options;
   * `$skip` defaults to `0`.
   */
  filterQuery(id: NullableAdapterId, params: ServiceParams) {
    const options = this.getOptions(params)
    const { $select, $sort, $limit: _limit, $skip = 0, ...query } = (params.query || {}) as AdapterQuery
    const $limit = getLimit(_limit, options.paginate)
    if (id !== null) {
      query.$and = (query.$and || []).concat({
        [this.id]: this.getObjectId(id)
      })
    }

    // A truthy id value in the query itself gets the same conversion.
    if (query[this.id]) {
      query[this.id] = this.getObjectId(query[this.id])
    }

    return {
      filters: { $select, $sort, $limit, $skip },
      query
    }
  }

  /**
   * Resolves the collection from the (per-call) options.
   *
   * @param params Service call params, used to look up the options.
   * @returns A promise for the `Model` option.
   */
  getModel(params: ServiceParams = {} as ServiceParams) {
    const { Model } = this.getOptions(params)
    return Promise.resolve(Model)
  }

  /**
   * Builds a `find()` cursor for the query in `params`, applying `$sort`,
   * `$select`, `$skip` and `$limit` when they are defined.
   *
   * @param params Service call params; `params.mongodb` is passed to `find()`.
   * @returns The cursor; nothing has been read from it yet.
   */
  async findRaw(params: ServiceParams) {
    const { filters, query } = this.filterQuery(null, params)
    const model = await this.getModel(params)
    const q = model.find(query, params.mongodb)

    if (filters.$sort !== undefined) {
      q.sort(filters.$sort)
    }

    if (filters.$select !== undefined) {
      q.project(this.getProjection(filters.$select))
    }

    if (filters.$skip !== undefined) {
      q.skip(filters.$skip)
    }

    if (filters.$limit !== undefined) {
      q.limit(filters.$limit)
    }

    return q
  }

  /**
   * Builds an aggregation cursor from `params.pipeline` combined with the
   * stages derived from the Feathers query.
   *
   * The first stage with a truthy `$feathers` key is replaced by the Feathers
   * stages. Without such a stage, the Feathers stages run first and the whole
   * given pipeline follows.
   *
   * TODO: Remove $out and $merge stages, else it returns an empty cursor. I
   * think it's safe to assume this is primarily for querying.
   *
   * @param params Service call params; `params.mongodb` is passed to `aggregate()`.
   * @returns The aggregation cursor.
   */
  async aggregateRaw(params: ServiceParams) {
    const model = await this.getModel(params)
    const pipeline = params.pipeline || []
    const index = pipeline.findIndex((stage: Document) => stage.$feathers)
    const before = index >= 0 ? pipeline.slice(0, index) : []
    const feathersPipeline = this.makeFeathersPipeline(params)
    const after = index >= 0 ? pipeline.slice(index + 1) : pipeline

    return model.aggregate([...before, ...feathersPipeline, ...after], params.mongodb)
  }

  /**
   * Translates the Feathers query into aggregation stages: `$match` first,
   * then `$sort`, `$skip`, `$limit` and `$project` for each filter that is
   * defined.
   *
   * @param params Service call params.
   * @returns The list of stages.
   */
  makeFeathersPipeline(params: ServiceParams) {
    const { filters, query } = this.filterQuery(null, params)
    const pipeline: Document[] = [{ $match: query }]

    if (filters.$sort !== undefined) {
      pipeline.push({ $sort: filters.$sort })
    }

    if (filters.$skip !== undefined) {
      pipeline.push({ $skip: filters.$skip })
    }

    if (filters.$limit !== undefined) {
      pipeline.push({ $limit: filters.$limit })
    }

    if (filters.$select !== undefined) {
      pipeline.push({ $project: this.getProjection(filters.$select) })
    }

    return pipeline
  }

  /**
   * Turns a `$select` value into a projection document that includes the id
   * field.
   *
   * @param select A list of field names, or a projection object.
   * @returns `undefined` for a falsy `select`; otherwise a projection in which
   * listed fields map to `1`. The id field is set to `1` whenever it is not
   * already listed or truthy in `select`.
   */
  getProjection(select?: string[] | { [key: string]: number }) {
    if (!select) {
      return undefined
    }

    if (Array.isArray(select)) {
      if (!select.includes(this.id)) {
        select = [this.id, ...select]
      }
      return select.reduce<{ [key: string]: number }>(
        (value, name) => ({
          ...value,
          [name]: 1
        }),
        {}
      )
    }

    if (!select[this.id]) {
      return {
        ...select,
        [this.id]: 1
      }
    }

    return select
  }

  /**
   * Prepares write data so that the id field is not changed by accident.
   *
   * @param id The id of the record being written, or `null`.
   * @param data The data to write.
   * @returns For the `_id` field, the data without that key. For a custom id
   * field and a non-null `id`, a copy of the data with the id field set to
   * `id`. Otherwise the data as given.
   */
  normalizeId<D>(id: NullableAdapterId, data: D): D {
    if (this.id === '_id') {
      // Default Mongo IDs cannot be updated. The Mongo library handles
      // this automatically.
      return _.omit(data, this.id)
    } else if (id !== null) {
      // If not using the default Mongo _id field set the ID to its
      // previous value. This prevents orphaned documents.
      return {
        ...data,
        [this.id]: id
      }
    }
    return data
  }

  /**
   * Counts the documents matching `params`.
   *
   * With `params.pipeline`, a `$count` stage is appended and the aggregation
   * is run with pagination disabled and `$sort`/`$skip`/`$limit` cleared.
   * Without a pipeline, `estimatedDocumentCount()` is used when
   * `useEstimatedDocumentCount` is enabled (it is called without the query);
   * otherwise `countDocuments(query)`.
   *
   * @param params Service call params.
   * @returns The number of documents; `0` when the aggregation yields no row.
   */
  async countDocuments(params: ServiceParams) {
    const { useEstimatedDocumentCount } = this.getOptions(params)
    const { query } = this.filterQuery(null, params)

    if (params.pipeline) {
      const aggregateParams = {
        ...params,
        paginate: false,
        pipeline: [...params.pipeline, { $count: 'total' }],
        query: {
          ...params.query,
          $select: [this.id],
          $sort: undefined,
          $skip: undefined,
          $limit: undefined
        }
      }
      const [result] = await this.aggregateRaw(aggregateParams).then((result) => result.toArray())
      if (!result) {
        return 0
      }
      return result.total
    }

    const model = await this.getModel(params)

    if (useEstimatedDocumentCount && typeof model.estimatedDocumentCount === 'function') {
      return model.estimatedDocumentCount()
    }

    return model.countDocuments(query, params.mongodb)
  }

  /**
   * Retrieves a single record by id, also honoring `params.query`.
   *
   * @param id The id of the record.
   * @param params Service call params.
   * @returns The matching record.
   * @throws NotFound when no record matches. All errors pass through
   * `errorHandler`.
   */
  async _get(id: AdapterId, params: ServiceParams = {} as ServiceParams): Promise<Result> {
    const {
      query,
      filters: { $select }
    } = this.filterQuery(id, params)

    if (params.pipeline) {
      const aggregateParams = {
        ...params,
        query: {
          ...params.query,
          $limit: 1,
          // `params.query` may be undefined, so read `$and` defensively.
          $and: (params.query?.$and || []).concat({
            [this.id]: this.getObjectId(id)
          })
        }
      }

      return this.aggregateRaw(aggregateParams)
        .then((result) => result.toArray())
        .then(([result]) => {
          if (!result) {
            throw new NotFound(`No record found for id '${id}'`)
          }

          return result
        })
        .catch(errorHandler)
    }

    const findOptions: FindOptions = {
      projection: this.getProjection($select),
      ...params.mongodb
    }

    return this.getModel(params)
      .then((model) => model.findOne(query, findOptions))
      .then((result) => {
        if (!result) {
          throw new NotFound(`No record found for id '${id}'`)
        }

        return result
      })
      .catch(errorHandler)
  }

  /**
   * Finds all records matching `params.query`.
   *
   * @param params Service call params. `paginate: false` disables pagination
   * for this call.
   * @returns A plain array when pagination is disabled or has no default;
   * otherwise a page object with `total`, `data`, `limit` and `skip`. A
   * `$limit` of `0` yields no data (the total is still counted for pages).
   */
  async _find(params?: ServiceParams & { paginate?: PaginationOptions }): Promise<Paginated<Result>>
  async _find(params?: ServiceParams & { paginate: false }): Promise<Result[]>
  async _find(params?: ServiceParams): Promise<Paginated<Result> | Result[]>
  async _find(params: ServiceParams = {} as ServiceParams): Promise<Paginated<Result> | Result[]> {
    const { paginate } = this.getOptions(params)
    const { filters } = this.filterQuery(null, params)
    const paginationDisabled = params.paginate === false || !paginate || !paginate.default

    const getData = () => {
      const result = params.pipeline ? this.aggregateRaw(params) : this.findRaw(params)
      return result.then((result) => result.toArray())
    }

    if (paginationDisabled) {
      if (filters.$limit === 0) {
        return [] as Result[]
      }
      const data = await getData()
      return data as Result[]
    }

    if (filters.$limit === 0) {
      return {
        total: await this.countDocuments(params),
        data: [] as Result[],
        limit: filters.$limit,
        skip: filters.$skip || 0
      }
    }

    // Data and total are independent queries, so run them concurrently.
    const [data, total] = await Promise.all([getData(), this.countDocuments(params)])

    return {
      total,
      data: data as Result[],
      limit: filters.$limit,
      skip: filters.$skip || 0
    }
  }

  /**
   * Inserts one record or a list of records.
   *
   * For a custom id field, an entry without that field gets a new hex
   * `ObjectId` string as its id. After inserting, the documents are read back
   * through `_find` by the `_id` values reported by the driver, so that
   * `params.query.$select` applies to the result.
   *
   * @param data A record or an array of records.
   * @param params Service call params; `params.mongodb` is passed to the insert.
   * @returns The created record(s).
   * @throws MethodNotAllowed for array data when multi `create` is not allowed.
   */
  async _create(data: Data, params?: ServiceParams): Promise<Result>
  async _create(data: Data[], params?: ServiceParams): Promise<Result[]>
  async _create(data: Data | Data[], _params?: ServiceParams): Promise<Result | Result[]>
  async _create(
    data: Data | Data[],
    params: ServiceParams = {} as ServiceParams
  ): Promise<Result | Result[]> {
    if (Array.isArray(data) && !this.allowsMulti('create', params)) {
      throw new MethodNotAllowed('Can not create multiple entries')
    }

    const model = await this.getModel(params)
    const setId = (item: any) => {
      // Work on a shallow copy so the caller's object is left alone.
      const entry = Object.assign({}, item)

      if (this.id !== '_id' && typeof entry[this.id] === 'undefined') {
        return {
          [this.id]: new ObjectId().toHexString(),
          ...entry
        }
      }

      return entry
    }

    if (Array.isArray(data)) {
      const created = await model.insertMany(data.map(setId), params.mongodb).catch(errorHandler)
      return this._find({
        ...params,
        paginate: false,
        query: {
          _id: { $in: Object.values(created.insertedIds) },
          $select: params.query?.$select
        }
      })
    }

    const created = await model.insertOne(setId(data), params.mongodb).catch(errorHandler)
    const result = await this._find({
      ...params,
      paginate: false,
      query: {
        _id: created.insertedId,
        $select: params.query?.$select,
        $limit: 1
      }
    })
    return result[0]
  }

  /**
   * Merges `data` into one record, or into all records matching the query
   * when `id` is `null`.
   *
   * Keys that do not start with `$` are collected into `$set`, a `$set` key
   * is merged into it, and any other `$` key is passed on as-is.
   *
   * @param id The record id, or `null` for a multi patch.
   * @param _data The patch data.
   * @param params Service call params.
   * @returns The patched record, or the list of patched records.
   * @throws MethodNotAllowed for `id === null` when multi `patch` is not allowed.
   * @throws NotFound when a single record does not exist.
   */
  async _patch(id: null, data: PatchData | Partial<Result>, params?: ServiceParams): Promise<Result[]>
  async _patch(id: AdapterId, data: PatchData | Partial<Result>, params?: ServiceParams): Promise<Result>
  async _patch(
    id: NullableAdapterId,
    data: PatchData | Partial<Result>,
    _params?: ServiceParams
  ): Promise<Result | Result[]>
  async _patch(
    id: NullableAdapterId,
    _data: PatchData | Partial<Result>,
    params: ServiceParams = {} as ServiceParams
  ): Promise<Result | Result[]> {
    if (id === null && !this.allowsMulti('patch', params)) {
      throw new MethodNotAllowed('Can not patch multiple entries')
    }

    const data = this.normalizeId(id, _data)
    const model = await this.getModel(params)
    const {
      query,
      filters: { $sort, $select }
    } = this.filterQuery(id, params)

    const replacement = Object.keys(data).reduce(
      (current, key) => {
        const value = (data as any)[key]

        if (key.charAt(0) !== '$') {
          current.$set[key] = value
        } else if (key === '$set') {
          current.$set = {
            ...current.$set,
            ...value
          }
        } else {
          current[key] = value
        }

        return current
      },
      { $set: {} } as any
    )

    if (id === null) {
      const findParams = {
        ...params,
        paginate: false,
        query: {
          ...params.query,
          $select: [this.id]
        }
      }

      // Collect the ids first: the patch may change fields the query matches
      // on, so the result is read back by id rather than by the query.
      return this._find(findParams)
        .then(async (result) => {
          const idList = (result as Result[]).map((item: any) => item[this.id])
          await model.updateMany({ [this.id]: { $in: idList } }, replacement, params.mongodb)
          return this._find({
            ...params,
            paginate: false,
            query: {
              [this.id]: { $in: idList },
              $sort,
              $select
            }
          })
        })
        .catch(errorHandler)
    }

    if (params.pipeline) {
      const getParams = {
        ...params,
        query: {
          ...params.query,
          $select: [this.id]
        }
      }
      // Use the same id conversion as `_get`, otherwise the write filter can
      // miss the document the preceding read has just found.
      const writeId = this.getObjectId(id)

      return this._get(id, getParams)
        .then(async () => {
          await model.updateOne({ [this.id]: writeId }, replacement, params.mongodb)
          return this._get(id, {
            ...params,
            query: { $select }
          })
        })
        .catch(errorHandler)
    }

    const updateOptions: FindOneAndUpdateOptions = {
      projection: this.getProjection($select),
      ...(params.mongodb as FindOneAndUpdateOptions),
      returnDocument: 'after'
    }

    return model
      .findOneAndUpdate(query, replacement, updateOptions)
      .then((result) => {
        if (!result) {
          throw new NotFound(`No record found for id '${id}'`)
        }
        return result as Result
      })
      .catch(errorHandler)
  }

  /**
   * Replaces a single record with `data`.
   *
   * @param id The id of the record to replace.
   * @param data The complete new record.
   * @param params Service call params.
   * @returns The record after the replacement.
   * @throws BadRequest when `id` is `null` or `data` is an array.
   * @throws NotFound when the record does not exist.
   */
  async _update(id: AdapterId, data: Data, params: ServiceParams = {} as ServiceParams): Promise<Result> {
    if (id === null || Array.isArray(data)) {
      throw new BadRequest("You can not replace multiple instances. Did you mean 'patch'?")
    }

    const {
      query,
      filters: { $select }
    } = this.filterQuery(id, params)
    const model = await this.getModel(params)
    const replacement = this.normalizeId(id, data)

    if (params.pipeline) {
      const getParams = {
        ...params,
        query: {
          ...params.query,
          $select: [this.id]
        }
      }
      // Use the same id conversion as `_get`, otherwise the write filter can
      // miss the document the preceding read has just found.
      const writeId = this.getObjectId(id)

      return this._get(id, getParams)
        .then(async () => {
          await model.replaceOne({ [this.id]: writeId }, replacement, params.mongodb)
          return this._get(id, {
            ...params,
            query: { $select }
          })
        })
        .catch(errorHandler)
    }

    const replaceOptions: FindOneAndReplaceOptions = {
      projection: this.getProjection($select),
      ...(params.mongodb as FindOneAndReplaceOptions),
      returnDocument: 'after'
    }

    return model
      .findOneAndReplace(query, replacement, replaceOptions)
      .then((result) => {
        if (!result) {
          throw new NotFound(`No record found for id '${id}'`)
        }
        return result as Result
      })
      .catch(errorHandler)
  }

  /**
   * Removes one record, or all records matching the query when `id` is `null`.
   *
   * @param id The record id, or `null` for a multi remove.
   * @param params Service call params.
   * @returns The removed record, or the list of removed records as they were
   * read before deletion.
   * @throws MethodNotAllowed for `id === null` when multi `remove` is not allowed.
   * @throws NotFound when a single record does not exist.
   */
  async _remove(id: null, params?: ServiceParams): Promise<Result[]>
  async _remove(id: AdapterId, params?: ServiceParams): Promise<Result>
  async _remove(id: NullableAdapterId, _params?: ServiceParams): Promise<Result | Result[]>
  async _remove(
    id: NullableAdapterId | ObjectId,
    params: ServiceParams = {} as ServiceParams
  ): Promise<Result | Result[]> {
    if (id === null && !this.allowsMulti('remove', params)) {
      throw new MethodNotAllowed('Can not remove multiple entries')
    }

    const model = await this.getModel(params)
    const { query } = this.filterQuery(id, params)
    const findParams = {
      ...params,
      paginate: false
    }

    if (id === null) {
      return this._find(findParams)
        .then(async (result) => {
          const idList = (result as Result[]).map((item: any) => item[this.id])
          await model.deleteMany({ [this.id]: { $in: idList } }, params.mongodb)
          return result
        })
        .catch(errorHandler)
    }

    if (params.pipeline) {
      // Use the same id conversion as `_get`, otherwise the delete filter can
      // miss the document the preceding read has just found.
      const writeId = this.getObjectId(id)

      return this._get(id, params)
        .then(async (result) => {
          await model.deleteOne({ [this.id]: writeId }, params.mongodb)
          return result
        })
        .catch(errorHandler)
    }

    const deleteOptions: FindOneAndDeleteOptions = {
      ...(params.mongodb as FindOneAndDeleteOptions),
      projection: this.getProjection(params.query?.$select)
    }

    return model
      .findOneAndDelete(query, deleteOptions)
      .then((result) => {
        if (!result) {
          throw new NotFound(`No record found for id '${id}'`)
        }
        return result as Result
      })
      .catch(errorHandler)
  }
}