integreat-io/integreat-transporter-mongodb

View on GitHub
src/setDocs.ts

Summary

Maintainability
D
2 days
Test Coverage
/* eslint-disable security/detect-object-injection */
import debug from 'debug'
import prepareFilter from './utils/prepareFilter.js'
import { serializeItem } from './utils/serialize.js'
import { isObject, isObjectWithId, ObjectWithId } from './utils/is.js'
import { ensureArray } from './utils/array.js'
import { getCollection } from './send.js'
import type { Action, Response } from 'integreat'
import type {
  Collection,
  MongoClient,
  MongoBulkWriteError,
  WriteError,
  InsertOneResult,
  UpdateResult,
  DeleteResult,
  BulkWriteResult,
  AnyBulkWriteOperation,
} from 'mongodb'
import type { ServiceOptions } from './types.js'

interface ItemResponse {
  id?: string | string[]
  modifiedCount: number
  insertedCount: number
  deletedCount: number
  status: string
  error?: string
}

interface Operation {
  id: string
  filter: Record<string, unknown> | null
  update: Record<string, unknown>
  updateMany?: boolean
}

interface OperationError {
  error: string
}

const debugMongo = debug('integreat:transporter:mongodb')

const isDelete = (action: Action) => action.type === 'DELETE'
const isUpdate = (action: Action) => action.type === 'UPDATE'
const isUpdateMany = (operations: Operation[]) =>
  operations.some((operation) => operation.updateMany === true)

const extractQuery = (action: Action) =>
  Array.isArray(action.meta?.options?.query)
    ? action.meta.options.query
    : undefined

const summarizeResponses = (responses: ItemResponse[]) =>
  responses.reduce(
    (response, { modifiedCount, insertedCount, deletedCount }) => ({
      modifiedCount: response.modifiedCount + modifiedCount,
      insertedCount: response.insertedCount + insertedCount,
      deletedCount: response.deletedCount + deletedCount,
    }),
    { modifiedCount: 0, insertedCount: 0, deletedCount: 0 },
  )

const createErrorFromIds = (
  errors: (readonly [string | string[] | undefined, string | undefined])[],
  actionName: string,
) =>
  `Error ${actionName} item${errors.length === 1 ? '' : 's'} ${errors
    .flatMap(([id]) =>
      id ? ensureArray(id).map((id) => `'${id}'`) : "'<no id>'",
    )
    .join(', ')} in mongodb: ${errors.map(([_id, error]) => error).join(' | ')}`

const createResponse = (
  oneOrMoreResponses: ItemResponse | ItemResponse[],
  action: Action,
): Response => {
  const responses = ensureArray(oneOrMoreResponses)
  const errors = responses
    .filter((reponse) => reponse.status !== 'ok')
    .map((response) => [response.id, response.error] as const)
  return {
    ...action.response,
    status:
      errors.length === 0
        ? 'ok'
        : responses.length === 1
          ? responses[0].status
          : 'error',
    ...(responses && { data: summarizeResponses(responses) }),
    ...(errors.length > 0
      ? {
          error: createErrorFromIds(
            errors,
            isDelete(action) ? 'deleting' : 'updating',
          ),
        }
      : {}),
  }
}

const createOkResponse = (
  result: BulkWriteResult | UpdateResult | InsertOneResult | DeleteResult,
  id?: string,
) => ({
  id,
  modifiedCount: (result as UpdateResult | BulkWriteResult).modifiedCount ?? 0,
  insertedCount:
    ((result as BulkWriteResult).insertedCount ||
      (result as UpdateResult | BulkWriteResult).upsertedCount) ??
    ((result as InsertOneResult).insertedId ? 1 : 0),
  deletedCount: (result as DeleteResult | BulkWriteResult).deletedCount ?? 0,
  status: 'ok',
})

const createErrorResponse = (
  status: string,
  error: string,
  id?: string | string[],
) => ({
  id,
  modifiedCount: 0,
  insertedCount: 0,
  deletedCount: 0,
  status,
  error,
})

const removeId = ({ id, ...item }: ObjectWithId) => item

function createUpdateByQueryOperation(
  item: Record<string, unknown>,
  action: Action,
  useIdAsInternalId: boolean,
) {
  const {
    payload: { data, ...params },
    meta: { options: { keepUndefined = false, appendOnly = false } = {} } = {},
  } = action
  const filter = prepareFilter(
    extractQuery(action), // We know we have a query here, or else we wouldn't be here
    params,
    undefined,
    useIdAsInternalId,
    !!appendOnly,
  )
  const update = {
    $set: {
      ...(serializeItem(item, keepUndefined === true) as Record<
        string,
        unknown
      >),
    },
  }

  return { filter, update, updateMany: true }
}

const createOperation = (action: Action, useIdAsInternalId: boolean) =>
  function createOperation(item: unknown): Operation | OperationError {
    if (!isObjectWithId(item)) {
      return { error: 'Only object data with an id may be sent to MongoDB' }
    }

    const {
      payload: { data, ...params },
      meta: {
        options: { keepUndefined = false, appendOnly = false } = {},
      } = {},
    } = action
    const options = action.meta?.options as ServiceOptions | undefined
    const id = String(item.id)
    const idKey = useIdAsInternalId ? '_id' : 'id'

    const fields = {
      ...(serializeItem(removeId(item), keepUndefined === true) as Record<
        string,
        unknown
      >),
      [idKey]: id,
    }

    const filter = prepareFilter(
      options?.query,
      { ...params, id: item.id },
      undefined,
      useIdAsInternalId,
      !!appendOnly,
    )
    const update = filter ? { $set: fields } : fields

    return { filter, update, id }
  }

function createOperations(
  data: unknown,
  action: Action,
  useIdAsInternalId: boolean,
) {
  const query = extractQuery(action)
  if (
    isUpdate(action) &&
    Array.isArray(query) &&
    query.length > 0 &&
    isObject(data)
  ) {
    return [createUpdateByQueryOperation(data, action, useIdAsInternalId)]
  } else {
    return ensureArray(data).map(createOperation(action, useIdAsInternalId))
  }
}

async function deleteWithQuery(
  action: Action,
  collection: Collection,
): Promise<Response> {
  const {
    payload: { data, ...params },
  } = action
  const options = action.meta?.options as ServiceOptions | undefined
  const filter = prepareFilter(
    options?.query,
    params,
    undefined,
    !!options?.appendOnly,
  )

  if (!filter || Object.keys(filter).length === 0) {
    return { status: 'noaction', error: 'No query to delete with' }
  } else {
    try {
      const result = await collection.deleteMany(filter)
      return createOkResponse(result)
    } catch (error) {
      return createErrorResponse('error', (error as Error).message)
    }
  }
}

async function updateOne(
  operation: Operation,
  collection: Collection,
  action: Action,
) {
  try {
    debugMongo(
      '%s with filter %o',
      isDelete(action) ? 'Delete' : 'Update',
      operation.filter,
    )
    if (!operation.filter) {
      if (isDelete(action)) {
        // TODO: Write a test for this branch
        return createErrorResponse(
          'error',
          'No filter to delete with',
          operation.id,
        )
      } else {
        return createOkResponse(await collection.insertOne(operation.update))
      }
    }
    if (isUpdate(action)) {
      const count = await collection.countDocuments(operation.filter)
      if (count === 0) {
        return createErrorResponse(
          'notfound',
          'No documents found with the given filter',
          operation.id,
        )
      }
    }

    const result = isDelete(action)
      ? await collection.deleteOne(operation.filter)
      : await collection.updateOne(operation.filter, operation.update, {
          upsert: true,
        })
    return createOkResponse(result)
  } catch (error) {
    return createErrorResponse('error', (error as Error).message, operation.id)
  }
}

async function updateMany(
  operations: Operation[],
  collection: Collection,
  action: Action,
) {
  try {
    debugMongo(
      isDelete(action) ? 'Delete with filters %o' : 'Update with filters %o',
      operations.map((op) => op.filter),
    )

    if (isUpdate(action)) {
      const count = await collection.countDocuments({
        $or: operations.map((op) => op.filter as Record<string, unknown>), // TODO: Filter should never be null here, but should we still handle it?
      })
      if (count < operations.length) {
        return isUpdateMany(operations)
          ? createErrorResponse(
              'noaction',
              'No documents were matched by the given filter',
            )
          : createErrorResponse(
              'notfound',
              'One or more documents were not found with the given filter',
              operations.map((op) => op.id),
            )
      }
    }

    const bulkOperations = operations.map(({ filter, update }) =>
      isDelete(action)
        ? { deleteOne: { filter } }
        : filter
          ? isUpdate(action)
            ? isUpdateMany(operations)
              ? { updateMany: { filter, update } } // Use updateMany if we're updating many, typically with a query
              : { updateOne: { filter, update } } // Use updateOne if we're updating one, typically with an id
            : { updateOne: { filter, update, upsert: true } } // We're upserting, as this may be a new doc or an updated doc
          : { insertOne: { document: update } },
    )
    const result = await collection.bulkWrite(
      bulkOperations as AnyBulkWriteOperation[], // TODO: Filter should never be null here, but should we still handle it?
      { ordered: false },
    )
    return createOkResponse(result)
  } catch (error) {
    const result = (error as MongoBulkWriteError).result
    const okResponse = createOkResponse(result)
    const errorResponses = ensureArray(
      (error as MongoBulkWriteError).writeErrors as WriteError | WriteError[],
    ).map(({ index, errmsg }: WriteError) =>
      createErrorResponse(
        'error',
        errmsg || 'Unspecified MongoDB error',
        operations[index].id,
      ),
    )

    return [okResponse, ...errorResponses]
  }
}

// Update one or many (bulk)
async function update(
  operations: Operation[],
  collection: Collection,
  action: Action,
) {
  if (operations.length === 1 && !operations[0].updateMany) {
    return await updateOne(operations[0], collection, action)
  } else {
    return await updateMany(operations, collection, action)
  }
}

export default async function setDocs(
  action: Action,
  client: MongoClient,
  useIdAsInternalId = false,
): Promise<Response> {
  // Get the right collection
  const collection = getCollection(action, client)
  if (!collection) {
    return {
      ...action.response,
      status: 'error',
      error: 'Could not get the collection specified in the request',
    }
  }

  // Create operations from data
  const operations = createOperations(
    action.payload.data,
    action,
    useIdAsInternalId,
  )
  if (operations.length === 0) {
    if (action.type === 'DELETE' && !!extractQuery(action)) {
      return await deleteWithQuery(action, collection)
    } else {
      // No operations -- end right away
      return {
        ...action.response,
        status: 'noaction',
        error: 'No items to update',
        data: [],
      }
    }
  }

  // Check if there were any errors while creating operations
  const errors = operations.filter(
    (operation): operation is OperationError =>
      typeof (operation as OperationError).error === 'string',
  )
  if (errors.length > 0) {
    return createResponse(
      errors.map((operation) =>
        createErrorResponse('badrequest', operation.error!),
      ),
      action,
    )
  }

  // All good, let's update
  const responses = await update(operations as Operation[], collection, action)
  return createResponse(responses, action)
}