integreat-io/integreat-transporter-mongodb

View on GitHub
src/getDocs.ts

Summary

Maintainability
C
1 day
Test Coverage
import assert from 'node:assert/strict'
import debug from 'debug'
import mapAny from 'map-any'
import { getProperty, setProperty } from 'dot-prop'
import prepareFilter from './utils/prepareFilter.js'
import prepareAggregation, {
  extractLookupPaths,
} from './utils/prepareAggregation.js'
import createPaging from './utils/createPaging.js'
import { normalizeItem } from './utils/serialize.js'
import { getCollection } from './send.js'
import { decodePageId } from './utils/pageId.js'
import { isObject } from './utils/is.js'
import type { FindCursor, AggregationCursor, MongoClient } from 'mongodb'
import type { Action, Response } from 'integreat'
import type {
  ServiceOptions,
  Payload,
  AggregationObject,
  QueryObject,
  ParsedPageId,
} from './types.js'

const debugMongo = debug('integreat:transporter:mongodb')

type Cursor = FindCursor | AggregationCursor

interface ItemWithIdObject extends Record<string, unknown> {
  _id: Record<string, unknown>
}

const resolveInternalId = (_id: unknown, id: unknown) =>
  isObject(_id) && id !== undefined
    ? id // When `_id` is an object (i.e. a compund id), and `id` is set, use `id` to not override intentional mapping
    : _id ?? id // Fall back to `id` if `_id` is not present

const useInternalId = ({
  _id,
  id,
  ...item
}: Record<string, unknown>): Record<string, unknown> => ({
  ...item,
  id: resolveInternalId(_id, id),
})

export const useInternalIdIfObject = (item: unknown) =>
  isObject(item) ? useInternalId(item) : item

const normalizeIdInItem = (lookupPaths: string[]) =>
  function normalizeIdInItem(item: unknown) {
    if (!isObject(item)) {
      return item
    }
    // Don't normalize the id if it's an compound id
    const normalized = isObject(item._id) ? item : useInternalId(item)

    if (lookupPaths.length > 0) {
      // Look for lookup paths and normalize the ids in the lookup result
      // We do this even for aggregations
      lookupPaths.forEach((lookupPath) => {
        const value = getProperty(normalized, lookupPath)
        setProperty(
          normalized,
          lookupPath,
          mapAny(useInternalIdIfObject, value),
        )
      })
    }

    return normalized
  }

function normalizeId(
  data: unknown[],
  useIdAsInternalId: boolean,
  lookupPaths: string[],
) {
  if (!useIdAsInternalId) {
    return data
  }
  return data.map(normalizeIdInItem(lookupPaths))
}

const getId = (data: Record<string, unknown>, useIdAsInternalId: boolean) =>
  useIdAsInternalId ? data._id : data.id

function compareIds(a: unknown, b: string | Record<string, unknown>) {
  if (isObject(a) && isObject(b)) {
    try {
      assert.deepEqual(a, b)
      return true
    } catch (err) {
      return false
    }
  }
  return a === b
}

// Move the cursor to the first doc after the `pageAfter`
// When no `pageAfter`, just start from the beginning
const moveToData = async (
  cursor: Cursor,
  useIdAsInternalId: boolean,
  isAggregation = false,
  pageAfter?: string | Record<string, unknown>,
) => {
  if (!pageAfter) {
    // Start from the beginning
    return true
  }

  let doc
  do {
    doc = await cursor.next()
  } while (
    doc &&
    !compareIds(
      normalizeItem(getId(doc, useIdAsInternalId || isAggregation)),
      pageAfter,
    )
  )

  return !!doc // false if the doc to start after is not found
}

const explodeId = ({ _id, ...item }: ItemWithIdObject) => ({
  _id,
  ...item,
  ..._id,
})

function mutateItem(item: unknown) {
  if (isObject(item) && isObject(item._id)) {
    if (item._id._bsontype === 'ObjectID') {
      // MongoDb id object
      return { ...item, _id: item._id.toString() }
    } else {
      return explodeId(item as ItemWithIdObject)
    }
  }
  return item
}

// Get one page of docs from where the cursor is
const getData = async (cursor: Cursor, pageSize: number) => {
  const data = []

  while (data.length < pageSize) {
    const doc = await cursor.next()
    if (!doc) {
      break
    }
    data.push(mutateItem(doc))
  }

  return data
}

const getPage = async (
  cursor: Cursor,
  useIdAsInternalId: boolean,
  { pageSize = Infinity, pageOffset, pageAfter }: Payload,
  pageId?: ParsedPageId,
) => {
  if (typeof pageOffset === 'number') {
    cursor.skip(pageOffset)
  } else {
    const after = pageAfter || pageId?.id

    // When pageAfter is set – loop until we find the doc with that `id`
    debugMongo('Moving to cursor %s', after)
    const foundFirst = await moveToData(
      cursor,
      useIdAsInternalId,
      pageId?.isAgg,
      after,
    )

    if (!foundFirst) {
      return []
    }
  }

  // Get the number of docs specified with pageSize - or the rest of the docs
  debugMongo('Getting %s items', pageSize)
  return getData(cursor, pageSize)
}

const appendToAggregation = (
  aggregation: AggregationObject[],
  query?: QueryObject[],
  sort?: Record<string, 1 | -1>,
) =>
  [
    query ? { type: 'query', query } : undefined,
    sort ? { type: 'sort', sortBy: sort } : undefined,
    ...aggregation,
  ].filter(Boolean) as AggregationObject[]

const paramsFromPayload = ({ data, ...payload }: Payload) => payload

const prepareSort = (
  sort: Record<string, 1 | -1> | undefined,
  useIdAsInternalId: boolean,
) =>
  !useIdAsInternalId || !sort
    ? sort
    : Object.fromEntries(
        Object.entries(sort).map(([key, value]) => [
          key === 'id' ? '_id' : key,
          value,
        ]),
      )

export default async function getDocs(
  action: Action,
  client: MongoClient,
  useIdAsInternalId = false,
): Promise<Response> {
  const collection = getCollection(action, client)
  if (!collection) {
    debugMongo('Trying to get docs from unknown collection')
    return {
      ...action.response,
      status: 'error',
      error: 'Could not get the collection specified in the request',
    }
  }

  const { payload, meta: { options } = {} } = action
  const params = paramsFromPayload(payload)
  const pageId = decodePageId(payload.pageId)

  debugMongo('Incoming options %o', options)
  debugMongo('Incoming params %o', params)

  const {
    query,
    allowDiskUse = false,
    aggregation: aggregationObjects,
  } = options as ServiceOptions
  const sort = prepareSort((options as ServiceOptions).sort, useIdAsInternalId)

  // Prepare aggregation
  const aggregation = aggregationObjects
    ? prepareAggregation(
        appendToAggregation(aggregationObjects, query, sort),
        params,
        true, // addDefaultSorting,
        useIdAsInternalId,
      )
    : undefined
  const lookupPaths = extractLookupPaths(aggregationObjects)

  let cursor
  if (aggregation) {
    // Run aggregation
    debugMongo('Starting query with aggregation %o', aggregation)
    cursor = collection.aggregate(aggregation, { allowDiskUse })
  } else {
    // Prepare filter and run as query when not an aggregation
    const filter = prepareFilter(query, params, pageId, useIdAsInternalId)
    debugMongo('Starting query with filter %o', filter)
    cursor = collection.find(filter!, { allowDiskUse }) // TODO: We should never get null as filter here, but should we still handle it?
    debugMongo('Sorting with %o', sort)
    cursor = cursor.sort(sort ?? { _id: 1 })
  }

  debugMongo('Getting page')
  const data = await getPage(cursor, useIdAsInternalId, payload, pageId)
  debugMongo('Got result page with %s items', data.length)

  if (data.length === 0 && payload.id) {
    return {
      ...action.response,
      status: 'notfound',
      error: `Could not find '${payload.id}' of type '${payload.type}'`,
    }
  }

  let totalCount = data.length
  if (typeof payload.pageSize === 'number') {
    debugMongo('Counting documents')
    if (aggregation) {
      totalCount =
        isObject(data[0]) && typeof data[0].__totalCount === 'number'
          ? data[0].__totalCount // This is a special prop added in the aggregation
          : 0
    } else {
      const countFilter = prepareFilter(
        query,
        params,
        undefined,
        useIdAsInternalId,
      )
      totalCount = await collection.countDocuments(countFilter!) // TODO: We should never get null as filter here, but should we still handle it?
    }
  }

  debugMongo('Normalizing data')
  const normalizedData = normalizeId(
    data.map(normalizeItem),
    useIdAsInternalId,
    lookupPaths,
  )

  const response = {
    ...action.response,
    status: 'ok',
    data: normalizedData,
    params: { totalCount },
  }

  if (payload.pageSize) {
    debugMongo('Creating paging')
    response.paging = createPaging(
      normalizedData,
      payload,
      sort,
      aggregationObjects,
    )
  }

  return response
}