integreat-io/integreat

View on GitHub
src/dispatch.ts

Summary

Maintainability
B
6 hrs
Test Coverage
import { nanoid } from 'nanoid'
import pProgress from 'p-progress'
import debugLib from 'debug'
import { QUEUE_SYMBOL } from './handlers/index.js'
import setupGetService from './utils/getService.js'
import {
  setErrorOnAction,
  setResponseOnAction,
  setOptionsOnAction,
} from './utils/action.js'
import { createErrorResponse, setOrigin } from './utils/response.js'
import type {
  Dispatch,
  HandlerDispatch,
  Middleware,
  Action,
  Response,
  Ident,
  ActionHandler,
  ActionHandlerResources,
  GetService,
  HandlerOptions,
} from './types.js'
import type Service from './service/Service.js'
import type Schema from './schema/Schema.js'
import type Endpoint from './service/Endpoint.js'

const debug = debugLib('great')

export interface Resources {
  handlers: Record<string, ActionHandler>
  schemas: Map<string, Schema>
  services: Record<string, Service>
  middleware?: Middleware[]
  options: HandlerOptions
}

export const compose = (...fns: Middleware[]): Middleware =>
  fns.reduce(
    (f, g) =>
      (...args) =>
        f(g(...args)),
  )

// Uses `queue` flag from mutated action if it is set, otherwise uses falls
// back to `queue` flag from original action. Also requires that a queue service
// is configured.
const shouldQueue = (mutatedAction: Action, options: HandlerOptions) =>
  !!options.queueService && mutatedAction.meta?.queue

function getActionHandlerFromType(
  type: string | symbol | undefined,
  handlers: Record<string | symbol, ActionHandler>,
) {
  if (type) {
    // eslint-disable-next-line security/detect-object-injection
    const handler = handlers[type]
    if (typeof handler === 'function') {
      return handler
    }
  }
  return undefined
}

/**
 * Rename `service` to `targetService` and set id and cid if not already set.
 *
 * Note: We're also removing `meta.authorized`.This is really not needed
 * anymore, as we're using a symbol for marking actions as authorized. We're
 * still keeping it for now for good measures.
 */
function cleanUpActionAndSetIds({
  payload: { service, ...payload },
  meta: { auth, ...meta } = {},
  ...action
}: Action) {
  const id = meta?.id || nanoid()
  const cid = meta?.cid || id

  return {
    ...action,
    payload: { ...(service && { targetService: service }), ...payload },
    meta: { ...meta, id, cid, dispatchedAt: Date.now() },
  }
}

const cleanUpResponseAndSetAccessAndOrigin = (
  response: Response,
  ident?: Ident,
) => ({
  ...response,
  access: { ident, ...response.access },
  ...(response.status !== 'ok'
    ? { origin: response.origin || 'dispatch' }
    : {}),
})

const removeQueueFlag = ({
  meta: { queue, ...meta } = {},
  ...action
}: Action) => ({ ...action, meta })

const adjustActionAfterIncomingMutation = (
  { payload: { sourceService, ...payload } = {}, ...action }: Action,
  originalAction: Action,
) => ({
  ...action,
  payload,
  meta: {
    ...action.meta,
    queue: action.meta?.queue ?? originalAction.meta?.queue,
  },
})

async function mutateIncomingAction(action: Action, getService: GetService) {
  const { sourceService } = action.payload
  if (typeof sourceService !== 'string') {
    return { action }
  }

  const service = getService(undefined, sourceService)
  if (!service) {
    return {
      action: setErrorOnAction(
        action,
        `Source service '${sourceService}' not found`,
        'dispatch',
        'badrequest',
      ),
    }
  }
  const endpoint = await service.endpointFromAction(action, true)
  if (!endpoint) {
    return {
      action: setErrorOnAction(
        action,
        `No matching endpoint for incoming mapping on service '${sourceService}'`,
        'dispatch',
        'badrequest',
      ),
    }
  }

  let mutatedAction
  const validateResponse = await endpoint.validateAction(action)
  if (validateResponse) {
    mutatedAction = setResponseOnAction(action, validateResponse)
  } else {
    mutatedAction = adjustActionAfterIncomingMutation(
      await service.mutateIncomingRequest(
        setOptionsOnAction(action, endpoint),
        endpoint,
      ),
      action,
    )
  }
  return { action: mutatedAction, service, endpoint }
}

async function mutateIncomingResponse(
  action: Action,
  service?: Service,
  endpoint?: Endpoint,
): Promise<Response> {
  return service && endpoint
    ? await service.mutateIncomingResponse(
        setOptionsOnAction(action, endpoint),
        endpoint,
      ) // Mutate if this is an incoming action
    : action.response || {}
}

async function handleAction(
  handlerType: string | symbol,
  action: Action,
  resources: ActionHandlerResources,
  handlers: Record<string, ActionHandler>,
): Promise<Response> {
  // Find handler ...
  const handler = getActionHandlerFromType(handlerType, handlers)
  if (!handler) {
    return createErrorResponse(
      `No handler for ${String(handlerType)} action`,
      'dispatch',
      'badrequest',
    )
  }

  // ... and pass it the action
  return setOrigin(
    await handler(action, resources),
    typeof handlerType === 'string'
      ? `handler:${handlerType}`
      : 'handler:queue',
  )
}

/**
 * Setup and return dispatch function. The dispatch function will pass an action
 * through the middleware before sending it to the relevant action handler. When
 * an action has a specified `sourceService`, any action data will be mapped as
 * incoming from that service before the middleware, and will be mapped back to
 * that service in the response.
 */
export default function createDispatch({
  handlers = {},
  schemas = new Map(),
  services = {},
  middleware = [],
  options,
}: Resources): Dispatch {
  // Prepare resources for the dispatch function
  const getService = setupGetService(schemas, services)
  const middlewareFn =
    middleware.length > 0
      ? compose(...middleware)
      : (next: HandlerDispatch) => async (action: Action) => next(action)

  // Create dispatch function
  const dispatch = (originalAction: Action | null) =>
    pProgress(async (setProgress) => {
      debug('Dispatch: %o', originalAction)
      if (!originalAction) {
        return {
          status: 'noaction',
          error: 'Dispatched no action',
          origin: 'dispatch',
        }
      }

      let response
      const {
        action,
        service: incomingService,
        endpoint: incomingEndpoint,
      } = await mutateIncomingAction(
        cleanUpActionAndSetIds(originalAction),
        getService,
      )

      if (action.response?.status) {
        // Stop here if the mutation set a response
        response = action.response
      } else {
        const resources = { dispatch, getService, options, setProgress }

        try {
          if (shouldQueue(action, options)) {
            // Use queue handler if queue flag is set and there is a queue
            // service. Bypass middleware
            response = await handleAction(
              QUEUE_SYMBOL,
              action,
              resources,
              handlers,
            )
          } else {
            // Send action through middleware before sending to the relevant
            // handler
            const next = async (action: Action) =>
              handleAction(action.type, action, resources, handlers)
            response = setOrigin(
              await middlewareFn(next)(removeQueueFlag(action)),
              'middleware:dispatch',
            )
          }
        } catch (err) {
          response = createErrorResponse(
            `Error thrown in dispatch: ${
              err instanceof Error ? err.message : String(err)
            }`,
            'dispatch',
          )
        }
      }

      return cleanUpResponseAndSetAccessAndOrigin(
        await mutateIncomingResponse(
          setResponseOnAction(action, response),
          incomingService,
          incomingEndpoint,
        ),
        action.meta?.ident,
      )
    })

  return dispatch
}