Chocobozzz/PeerTube

View on GitHub
packages/server-commands/src/runners/runner-jobs-command.ts

Summary

Maintainability
D
1 day
Test Coverage
import { omit, pick, wait } from '@peertube/peertube-core-utils'
import {
  AbortRunnerJobBody,
  AcceptRunnerJobBody,
  AcceptRunnerJobResult,
  ErrorRunnerJobBody,
  HttpStatusCode,
  ListRunnerJobsQuery,
  RequestRunnerJobBody,
  RequestRunnerJobResult,
  ResultList,
  RunnerJobAdmin,
  RunnerJobLiveRTMPHLSTranscodingPayload,
  RunnerJobPayload,
  RunnerJobState,
  RunnerJobStateType,
  RunnerJobSuccessBody,
  RunnerJobSuccessPayload,
  RunnerJobType,
  RunnerJobUpdateBody,
  RunnerJobVODPayload,
  TranscriptionSuccess,
  VODHLSTranscodingSuccess,
  VODWebVideoTranscodingSuccess,
  isHLSTranscodingPayloadSuccess,
  isLiveRTMPHLSTranscodingUpdatePayload,
  isTranscriptionPayloadSuccess,
  isWebVideoOrAudioMergeTranscodingPayloadSuccess
} from '@peertube/peertube-models'
import { unwrapBody } from '../requests/index.js'
import { waitJobs } from '../server/jobs.js'
import { AbstractCommand, OverrideCommandOptions } from '../shared/index.js'

export class RunnerJobsCommand extends AbstractCommand {

  list (options: OverrideCommandOptions & ListRunnerJobsQuery = {}) {
    const path = '/api/v1/runners/jobs'

    return this.getRequestBody<ResultList<RunnerJobAdmin>>({
      ...options,

      path,
      query: pick(options, [ 'start', 'count', 'sort', 'search', 'stateOneOf' ]),
      implicitToken: true,
      defaultExpectedStatus: HttpStatusCode.OK_200
    })
  }

  cancelByAdmin (options: OverrideCommandOptions & { jobUUID: string }) {
    const path = '/api/v1/runners/jobs/' + options.jobUUID + '/cancel'

    return this.postBodyRequest({
      ...options,

      path,
      implicitToken: true,
      defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
    })
  }

  deleteByAdmin (options: OverrideCommandOptions & { jobUUID: string }) {
    const path = '/api/v1/runners/jobs/' + options.jobUUID

    return this.deleteRequest({
      ...options,

      path,
      implicitToken: true,
      defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
    })
  }

  // ---------------------------------------------------------------------------

  request (options: OverrideCommandOptions & RequestRunnerJobBody) {
    const path = '/api/v1/runners/jobs/request'

    return unwrapBody<RequestRunnerJobResult>(this.postBodyRequest({
      ...options,

      path,
      fields: pick(options, [ 'runnerToken', 'jobTypes' ]),
      implicitToken: false,
      defaultExpectedStatus: HttpStatusCode.OK_200
    }))
  }

  async requestVOD (options: OverrideCommandOptions & RequestRunnerJobBody) {
    const { availableJobs } = await this.request({
      ...options,

      jobTypes: [ 'vod-audio-merge-transcoding', 'vod-hls-transcoding', 'vod-web-video-transcoding' ]
    })

    return { availableJobs } as RequestRunnerJobResult<RunnerJobVODPayload>
  }

  async requestLive (options: OverrideCommandOptions & RequestRunnerJobBody) {
    const { availableJobs } = await this.request({
      ...options,

      jobTypes: [ 'live-rtmp-hls-transcoding' ]
    })

    return { availableJobs } as RequestRunnerJobResult<RunnerJobLiveRTMPHLSTranscodingPayload>
  }

  // ---------------------------------------------------------------------------

  accept <T extends RunnerJobPayload = RunnerJobPayload> (options: OverrideCommandOptions & AcceptRunnerJobBody & { jobUUID: string }) {
    const path = '/api/v1/runners/jobs/' + options.jobUUID + '/accept'

    return unwrapBody<AcceptRunnerJobResult<T>>(this.postBodyRequest({
      ...options,

      path,
      fields: pick(options, [ 'runnerToken' ]),
      implicitToken: false,
      defaultExpectedStatus: HttpStatusCode.OK_200
    }))
  }

  abort (options: OverrideCommandOptions & AbortRunnerJobBody & { jobUUID: string }) {
    const path = '/api/v1/runners/jobs/' + options.jobUUID + '/abort'

    return this.postBodyRequest({
      ...options,

      path,
      fields: pick(options, [ 'reason', 'jobToken', 'runnerToken' ]),
      implicitToken: false,
      defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
    })
  }

  update (options: OverrideCommandOptions & RunnerJobUpdateBody & { jobUUID: string }) {
    const path = '/api/v1/runners/jobs/' + options.jobUUID + '/update'

    const { payload } = options
    const attaches: { [id: string]: any } = {}
    let payloadWithoutFiles = payload

    if (isLiveRTMPHLSTranscodingUpdatePayload(payload)) {
      if (payload.masterPlaylistFile) {
        attaches[`payload[masterPlaylistFile]`] = payload.masterPlaylistFile
      }

      attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile
      attaches[`payload[videoChunkFile]`] = payload.videoChunkFile

      payloadWithoutFiles = omit(payloadWithoutFiles, [ 'masterPlaylistFile', 'resolutionPlaylistFile', 'videoChunkFile' ])
    }

    return this.postUploadRequest({
      ...options,

      path,
      fields: {
        ...pick(options, [ 'progress', 'jobToken', 'runnerToken' ]),

        payload: payloadWithoutFiles
      },
      attaches,
      implicitToken: false,
      defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
    })
  }

  error (options: OverrideCommandOptions & ErrorRunnerJobBody & { jobUUID: string }) {
    const path = '/api/v1/runners/jobs/' + options.jobUUID + '/error'

    return this.postBodyRequest({
      ...options,

      path,
      fields: pick(options, [ 'message', 'jobToken', 'runnerToken' ]),
      implicitToken: false,
      defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
    })
  }

  success (options: OverrideCommandOptions & RunnerJobSuccessBody & { jobUUID: string }) {
    const { payload } = options

    const path = '/api/v1/runners/jobs/' + options.jobUUID + '/success'
    const attaches: { [id: string]: any } = {}
    let payloadWithoutFiles = payload

    if ((isWebVideoOrAudioMergeTranscodingPayloadSuccess(payload) || isHLSTranscodingPayloadSuccess(payload)) && payload.videoFile) {
      attaches[`payload[videoFile]`] = payload.videoFile

      payloadWithoutFiles = omit(payloadWithoutFiles as VODWebVideoTranscodingSuccess, [ 'videoFile' ])
    }

    if (isHLSTranscodingPayloadSuccess(payload) && payload.resolutionPlaylistFile) {
      attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile

      payloadWithoutFiles = omit(payloadWithoutFiles as VODHLSTranscodingSuccess, [ 'resolutionPlaylistFile' ])
    }

    if (isTranscriptionPayloadSuccess(payload) && payload.vttFile) {
      attaches[`payload[vttFile]`] = payload.vttFile

      payloadWithoutFiles = omit(payloadWithoutFiles as TranscriptionSuccess, [ 'vttFile' ])
    }

    return this.postUploadRequest({
      ...options,

      path,
      attaches,
      fields: {
        ...pick(options, [ 'jobToken', 'runnerToken' ]),

        payload: payloadWithoutFiles
      },
      implicitToken: false,
      defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
    })
  }

  getJobFile (options: OverrideCommandOptions & { url: string, jobToken: string, runnerToken: string }) {
    const { host, protocol, pathname } = new URL(options.url)

    return this.postBodyRequest({
      url: `${protocol}//${host}`,
      path: pathname,

      fields: pick(options, [ 'jobToken', 'runnerToken' ]),
      implicitToken: false,
      defaultExpectedStatus: HttpStatusCode.OK_200
    })
  }

  // ---------------------------------------------------------------------------

  async autoAccept (options: OverrideCommandOptions & RequestRunnerJobBody & { type?: RunnerJobType }) {
    const { availableJobs } = await this.request(options)

    const job = options.type
      ? availableJobs.find(j => j.type === options.type)
      : availableJobs[0]

    return this.accept({ ...options, jobUUID: job.uuid })
  }

  async autoProcessWebVideoJob (runnerToken: string, jobUUIDToProcess?: string) {
    let jobUUID = jobUUIDToProcess

    if (!jobUUID) {
      const { availableJobs } = await this.request({ runnerToken })
      jobUUID = availableJobs[0].uuid
    }

    const { job } = await this.accept({ runnerToken, jobUUID })
    const jobToken = job.jobToken

    const payload: RunnerJobSuccessPayload = { videoFile: 'video_short.mp4' }
    await this.success({ runnerToken, jobUUID, jobToken, payload })

    await waitJobs([ this.server ])

    return job
  }

  async cancelAllJobs (options: { state?: RunnerJobStateType } = {}) {
    const { state } = options

    const { data } = await this.list({ count: 100 })

    const allowedStates = new Set<RunnerJobStateType>([
      RunnerJobState.PENDING,
      RunnerJobState.PROCESSING,
      RunnerJobState.WAITING_FOR_PARENT_JOB
    ])

    for (const job of data) {
      if (state && job.state.id !== state) continue
      else if (allowedStates.has(job.state.id) !== true) continue

      await this.cancelByAdmin({ jobUUID: job.uuid })
    }
  }

  async getJob (options: OverrideCommandOptions & { uuid: string }) {
    const { data } = await this.list({ ...options, count: 100, sort: '-updatedAt' })

    return data.find(j => j.uuid === options.uuid)
  }

  async requestLiveJob (runnerToken: string) {
    let availableJobs: RequestRunnerJobResult<RunnerJobLiveRTMPHLSTranscodingPayload>['availableJobs'] = []

    while (availableJobs.length === 0) {
      const result = await this.requestLive({ runnerToken })
      availableJobs = result.availableJobs

      if (availableJobs.length === 1) break

      await wait(150)
    }

    return availableJobs[0]
  }
}