integreat-io/integreat

View on GitHub
src/jobs/Step.ts

Summary

Maintainability
A
0 mins
Test Coverage
import mapTransform from 'map-transform'
import pLimit from 'p-limit'
import { ensureArray } from '../utils/array.js'
import { isObject, isOkResponse } from '../utils/is.js'
import xor from '../utils/xor.js'
import {
  setDataOnActionPayload,
  setResponseOnAction,
  setOriginOnAction,
  setErrorOnAction,
  setMetaOnAction,
} from '../utils/action.js'
import { combineResponses, setOrigin } from '../utils/response.js'
import validateFilters from '../utils/validateFilters.js'
import prepareValidator from '../utils/validation.js'
import { populateActionAfterMutation } from '../utils/mutationHelpers.js'
import type {
  TransformObject,
  Pipeline,
  DataMapper,
  InitialState,
} from 'map-transform/types.js'
import type {
  Action,
  Response,
  Meta,
  HandlerDispatch,
  Condition,
  MapOptions,
  ValidateObject,
} from '../types.js'
import type { JobStepDef } from './types.js'

export const breakSymbol = Symbol('break')

type ArrayElement<ArrayType extends readonly unknown[]> = ArrayType[number]

interface Validator {
  (actionResponses: Record<string, Action>): Promise<[Response | null, boolean]>
}

export interface ResponsesObject extends Record<string, Action> {
  [breakSymbol]?: boolean
}

const adjustPrevalidationResponse = ({
  break: _,
  message,
  ...response
}: Response & { message?: string; break?: boolean }) =>
  message
    ? isOkResponse(response)
      ? { ...response, warning: message }
      : { ...response, error: message }
    : response

const setOkStatusOnErrorResponse = ({
  error,
  origin,
  ...response
}: Response = {}) => ({
  ...response,
  status: 'ok',
  ...(error ? { warning: error } : {}), // Turn an error into a warning
})

const ensureOkResponse = (response?: Response): Response =>
  isOkResponse(response) ? response! : setOkStatusOnErrorResponse(response)

const adjustValidationResponse = (
  responses: Response[],
  response: Response | undefined,
) =>
  responses.length > 0
    ? combineResponses(responses) // One or more conditions failed, so we'll return the combined response
    : response
      ? ensureOkResponse(response) // Always return an ok response when we have a response (only happens for postconditions)
      : null

const addModify = (mutation: ArrayElement<Pipeline>) =>
  isObject(mutation) ? { $modify: true, ...mutation } : mutation

// Insert `'$action'` as the first step in a pipeline to get the action we're
// mutating from.
const putMutationInPipeline = (
  mutation: TransformObject | Pipeline,
  useMagic: boolean,
) =>
  Array.isArray(mutation)
    ? ['$action', ...mutation.map(addModify)]
    : useMagic
      ? ['$action', { '.': '.', ...mutation }]
      : ['$action', addModify(mutation)]

const putMutationInPipelineForCondition = (
  conditionObject: ValidateObject,
): ValidateObject => ({
  ...conditionObject,
  condition: Array.isArray(conditionObject.condition)
    ? ['$action', ...conditionObject.condition]
    : conditionObject.condition
      ? ['$action', conditionObject.condition]
      : null,
})

const getThisResponse = (actionResponses: Record<string, Action>) =>
  actionResponses.$action?.response

const getPreviousResponse = (
  actionResponses: Record<string, Action>,
  prevStepId?: string,
) => (prevStepId ? actionResponses[prevStepId]?.response : undefined) // eslint-disable-line security/detect-object-injection

function createConditionsValidator(
  conditions:
    | ValidateObject[]
    | Record<string, Condition | undefined>
    | undefined,
  mapOptions: MapOptions,
  isPreconditions: boolean,
  breakByDefault: boolean,
  prevStepId?: string,
): Validator {
  if (Array.isArray(conditions)) {
    // Validate condition pipelines
    const defaultFailStatus = isPreconditions ? 'noaction' : 'error'
    const validator = prepareValidator(
      conditions,
      mapOptions,
      defaultFailStatus,
      breakByDefault,
    )
    return async function validate(actionResponses) {
      const [responses, doBreak] = await validator(actionResponses)
      const stepResponse = getThisResponse(actionResponses)
      const response = adjustValidationResponse(responses, stepResponse)
      return [response, doBreak]
    }
  } else if (isObject(conditions)) {
    // Validate through filters. Only used for prevalidations. Will be deprecated
    const validator = validateFilters(conditions, true)
    return async function validate(actionResponses) {
      const responses = validator(actionResponses)
      const doBreak = responses.some(
        ({ break: doBreak = breakByDefault }) => doBreak,
      )

      return responses.length > 0
        ? [adjustPrevalidationResponse(combineResponses(responses)), doBreak] // We only return the first error here
        : [null, false]
    }
  } else if (xor(isPreconditions, breakByDefault)) {
    // We're using xor to check for not `isPreconditions` when `breakByDefault` is `true`
    // We have no conditions, so we'll just check if this or the previous step was ok (the former when `breakByDefault` is `true`)
    return async function validatePrevWasOk(actionResponses) {
      const response = breakByDefault
        ? getThisResponse(actionResponses)
        : getPreviousResponse(actionResponses, prevStepId)
      return response && !isOkResponse(response)
        ? [response, true]
        : [null, false]
    }
  } else {
    // Is postconditions and no conditions, so we'll always return ok
    return async function validateAlwaysOk(_actionResponses) {
      return [null, false]
    }
  }
}

function createPreconditionsValidator(
  preconditions: ValidateObject[] | undefined,
  validationFilters: Record<string, Condition | undefined> | undefined,
  mapOptions: MapOptions,
  breakByDefault: boolean,
  prevStepId?: string,
): Validator {
  const conditions = preconditions || validationFilters
  return createConditionsValidator(
    conditions,
    mapOptions,
    true,
    breakByDefault,
    prevStepId,
  )
}

function createPostconditionsValidator(
  conditions: ValidateObject[] | undefined,
  mapOptions: MapOptions,
  breakByDefault: boolean,
): Validator {
  if (Array.isArray(conditions)) {
    conditions = conditions.map(putMutationInPipelineForCondition)
  }
  return createConditionsValidator(
    conditions,
    mapOptions,
    false,
    breakByDefault,
  )
}

export function getLastJobWithResponse(
  steps: Step[],
  actionResponses: Record<string, Action>,
) {
  for (let i = steps.length - 1; i > -1; i--) {
    const step = steps.at(i) as Step // TS: We know this is a step
    // const ids = Array.isArray(step) ? step.map((f) => f.id) : [step.id]
    const actionResponse = actionResponses[step.id] || {}
    if (actionResponse.response?.status) {
      return actionResponse.response
    }
  }
  return undefined
}

const setResponseStatus = (response: Response = {}): Response => ({
  ...response,
  status: response.status ? response.status : response.error ? 'error' : 'ok',
})

async function mutateAction(
  action: Action,
  mutator: DataMapper<InitialState> | undefined,
  actionsWithResponses: Record<string, Action>,
): Promise<Action> {
  if (mutator) {
    const responsesIncludingAction = {
      ...actionsWithResponses,
      $action: action,
    }
    return (await mutator(responsesIncludingAction)) as Action
  } else {
    return action
  }
}

export async function mutateResponse(
  action: Action,
  responses: Record<string, Action>,
  origin: string,
  postmutator?: DataMapper<InitialState>,
): Promise<Action> {
  const { response: mutatedResponse } = await mutateAction(
    action,
    postmutator,
    responses,
  )
  return setOriginOnAction(
    populateActionAfterMutation(
      action,
      setResponseOnAction(action, setResponseStatus(mutatedResponse)),
    ),
    origin,
    true, // Prefix any existing origin
  )
}

function responseFromSteps(
  actionResponses: Record<string, Action>,
): Action | undefined {
  const errorResponses = Object.values(actionResponses)
    .map(({ response }) => response)
    .filter((response): response is Response => !isOkResponse(response))
  if (errorResponses.length === 0) {
    return { response: { status: 'ok' } } as Action // Allow an action with only a response here
  } else {
    return {
      response: {
        status: 'error',
        responses: errorResponses,
      },
    } as Action // Allow an action with only a response here
  }
}

function generateIterateResponse(action: Action, responses: ResponsesObject) {
  const actionsWithResponses = Object.values(responses)
  const errorResponses = actionsWithResponses
    .map(({ response }) => response)
    .filter(
      (response): response is Response => !!response && !isOkResponse(response),
    )
  const status = errorResponses.length === 0 ? 'ok' : 'error'
  const error = errorResponses
    .map((response) =>
      response ? `[${response.status}] ${response.error}` : undefined,
    )
    .filter(Boolean)
    .join(' | ')
  const data = actionsWithResponses.flatMap(({ response }) => response?.data)
  return setResponseOnAction(action, {
    status,
    data,
    ...(error && { error, responses: errorResponses }),
  })
}

export const prepareMutation = (
  pipeline: TransformObject | Pipeline,
  mapOptions: MapOptions,
  useMagic = false,
) => mapTransform(putMutationInPipeline(pipeline, useMagic), mapOptions)

function getIterateMutator(step: JobStepDef, mapOptions: MapOptions) {
  const pipeline = step.iterate || step.iteratePath
  if (pipeline) {
    return mapTransform(pipeline, mapOptions)
  } else {
    return undefined
  }
}

export function getPrevStepId(
  index: number,
  steps: (JobStepDef | JobStepDef[])[],
) {
  const prevStep = index > 0 ? steps[index - 1] : undefined
  return Array.isArray(prevStep)
    ? prevStep
        .map((step) => step?.id)
        .filter(Boolean)
        .join(':')
    : prevStep?.id
}

async function runOneAction(action: Action, dispatch: HandlerDispatch) {
  try {
    return setResponseOnAction(action, await dispatch(action))
  } catch (error) {
    return setErrorOnAction(action, error)
  }
}

export default class Step {
  id: string
  #action?: Action
  #subSteps?: Step[]
  #validatePreconditions: Validator = async () => [null, false]
  #validatePostconditions: Validator = async () => [null, false]
  #premutator?: DataMapper<InitialState>
  #postmutator?: DataMapper<InitialState>
  #iterateMutator?: DataMapper<InitialState>
  #iterateConcurrency?: number

  constructor(
    stepDef: JobStepDef | JobStepDef[],
    mapOptions: MapOptions,
    breakByDefault = false,
    prevStepId?: string,
  ) {
    if (Array.isArray(stepDef)) {
      this.id = stepDef.map((step) => step.id).join(':')
      this.#subSteps = stepDef.map(
        (step, index, steps) =>
          new Step(
            step,
            mapOptions,
            breakByDefault,
            getPrevStepId(index, steps),
          ),
      )
    } else {
      this.id = stepDef.id
      this.#validatePreconditions = createPreconditionsValidator(
        stepDef.preconditions,
        stepDef.conditions,
        mapOptions,
        breakByDefault,
        prevStepId,
      )
      this.#validatePostconditions = createPostconditionsValidator(
        stepDef.postconditions,
        mapOptions,
        breakByDefault,
      )
      this.#action = stepDef.action
      const premutation = stepDef.premutation || stepDef.mutation
      const postmutation = stepDef.postmutation || stepDef.responseMutation
      this.#premutator = premutation
        ? prepareMutation(premutation, mapOptions, !!stepDef.mutation) // Set a flag for `mutation`, to signal that we want to use the obsolete "magic"
        : undefined
      this.#postmutator = postmutation
        ? prepareMutation(postmutation, mapOptions, !!stepDef.responseMutation) // Set a flag for `responseMutation`, to signal that we want to use the obsolete "magic"
        : undefined
      this.#iterateMutator = getIterateMutator(stepDef, mapOptions)
      this.#iterateConcurrency = stepDef.iterateConcurrency
    }
  }

  /**
   * Run the given action for every item in the items array.
   */
  async runIteration(
    actionResponses: Record<string, Action>,
    dispatch: HandlerDispatch,
    meta: Meta,
  ) {
    if (!this.#iterateMutator || !this.#action) {
      return undefined // Return undefined when there's no iterate mutator or action
    }
    const action = setMetaOnAction(this.#action, meta)

    const items = ensureArray(await this.#iterateMutator(actionResponses))
    const actions = items.map((item) => setDataOnActionPayload(action, item))
    const limit = pLimit(this.#iterateConcurrency ?? 1)

    return Object.fromEntries(
      (
        await Promise.all(
          actions.map((action) =>
            limit(async () =>
              runOneAction(
                await mutateAction(action, this.#premutator, actionResponses),
                dispatch,
              ),
            ),
          ),
        )
      ).map((response, index) => [
        `${this.id}_${index}`, // Set the id of each response as key for the object to be created
        setOriginOnAction(response, `${this.id}_${index}`),
      ]),
    )
  }

  /**
   * Run the action for this step.
   */
  async runAction(
    actionResponses: Record<string, Action>,
    dispatch: HandlerDispatch,
    meta: Meta,
  ): Promise<ResponsesObject> {
    if (!this.#action) {
      return {} // No action, return empty response object
    }
    const action = setMetaOnAction(this.#action, meta)

    // Run the action for every item in the array return by the iterate mutator.
    const responses = await this.runIteration(actionResponses, dispatch, meta)

    // If we got any responses, combine them into one response. Otherwise
    // just run the action, as no responses means there were no iterate mutator,
    // so nothing has been run yet.
    const responseAction = responses
      ? generateIterateResponse(action, responses)
      : await runOneAction(
          await mutateAction(action, this.#premutator, actionResponses),
          dispatch,
        )

    // Mutate the response and return together with any individual responses
    return {
      ...responses,
      [this.id]: await mutateResponse(
        responseAction,
        actionResponses,
        this.id,
        this.#postmutator,
      ),
    }
  }

  /**
   *  Run all sub steps in parallel.
   */
  async runSubSteps(
    actionResponses: Record<string, Action>,
    dispatch: HandlerDispatch,
    meta: Meta,
  ) {
    if (!this.#subSteps) {
      return {} // No sub steps, return empty response object
    }

    // TODO: Actually run all parallel steps, even if one fails
    const arrayOfResponses = await Promise.all(
      this.#subSteps.map((step) => step.run(meta, actionResponses, dispatch)),
    )
    const doBreak = arrayOfResponses.some(
      (response) => response[breakSymbol], // eslint-disable-line security/detect-object-injection
    )
    const responsesObj = Object.fromEntries(
      arrayOfResponses.flatMap((responses) => Object.entries(responses)),
    )
    const thisStep = responseFromSteps(responsesObj)
    return thisStep
      ? { ...responsesObj, [this.id]: thisStep, [breakSymbol]: doBreak }
      : { ...responsesObj, [breakSymbol]: doBreak }
  }

  /**
   * Run this step.
   */
  async run(
    meta: Meta,
    actionResponses: Record<string, Action>,
    dispatch: HandlerDispatch,
  ): Promise<ResponsesObject> {
    // First, check if the step preconditions are met. Break if configured to.
    const [preconditionsResponse, doPreBreak] =
      await this.#validatePreconditions(actionResponses)
    if (preconditionsResponse || doPreBreak) {
      return {
        [this.id]: {
          response: setOrigin(preconditionsResponse || {}, this.id),
        } as Action, // Allow an action with only a response here
        [breakSymbol]: doPreBreak,
      }
    }

    // Validated, so let's run the action or sub steps
    const responseObject: ResponsesObject = this.#action
      ? await this.runAction(actionResponses, dispatch, meta)
      : await this.runSubSteps(actionResponses, dispatch, meta)
    const thisActionResponse = responseObject[this.id] // This is the action with response for this step

    // Finally, check if the step postconditions are met. Break if configured to.
    const [postconditionsResponse, doPostBreak] =
      await this.#validatePostconditions({
        ...actionResponses,
        $action: thisActionResponse,
      })
    if (postconditionsResponse || doPostBreak) {
      return {
        ...responseObject,
        [this.id]: setResponseOnAction(
          thisActionResponse,
          setOrigin(postconditionsResponse || {}, this.id),
        ),
        [breakSymbol]: doPostBreak,
      }
    }

    // No post conditions, so return the action response object
    return responseObject
  }
}