apps/peertube-runner/src/server/process/shared/process-live.ts
import { wait } from '@peertube/peertube-core-utils'
import {
ffprobePromise,
getVideoStreamBitrate,
getVideoStreamDimensionsInfo,
hasAudioStream,
hasVideoStream
} from '@peertube/peertube-ffmpeg'
import {
LiveRTMPHLSTranscodingSuccess,
LiveRTMPHLSTranscodingUpdatePayload,
PeerTubeProblemDocument,
RunnerJobLiveRTMPHLSTranscodingPayload,
ServerErrorCode
} from '@peertube/peertube-models'
import { buildUUID } from '@peertube/peertube-node-utils'
import { FSWatcher, watch } from 'chokidar'
import { FfmpegCommand } from 'fluent-ffmpeg'
import { ensureDir, remove } from 'fs-extra/esm'
import { readFile } from 'fs/promises'
import { basename, join } from 'path'
import { ConfigManager } from '../../../shared/config-manager.js'
import { logger } from '../../../shared/index.js'
import { buildFFmpegLive, ProcessOptions } from './common.js'
type CustomLiveRTMPHLSTranscodingUpdatePayload =
Omit<LiveRTMPHLSTranscodingUpdatePayload, 'resolutionPlaylistFile'> & { resolutionPlaylistFile?: [ Buffer, string ] | Blob | string }
export class ProcessLiveRTMPHLSTranscoding {
private readonly outputPath: string
private readonly fsWatchers: FSWatcher[] = []
// Playlist name -> chunks
private readonly pendingChunksPerPlaylist = new Map<string, string[]>()
private readonly playlistsCreated = new Set<string>()
private allPlaylistsCreated = false
private latestFilteredPlaylistContent: { [name: string]: string } = {}
private ffmpegCommand: FfmpegCommand
private ended = false
private errored = false
constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) {
this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID())
logger.debug(`Using ${this.outputPath} to process live rtmp hls transcoding job ${options.job.uuid}`)
}
process () {
const job = this.options.job
const payload = job.payload
return new Promise<void>(async (res, rej) => {
try {
await ensureDir(this.outputPath)
logger.info(`Probing ${payload.input.rtmpUrl}`)
const probe = await ffprobePromise(payload.input.rtmpUrl)
logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`)
const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe)
const hasVideo = await hasVideoStream(payload.input.rtmpUrl, probe)
const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe)
const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe)
const m3u8Watcher = watch(this.outputPath + '/*.m3u8')
this.fsWatchers.push(m3u8Watcher)
const tsWatcher = watch(this.outputPath + '/*.ts')
this.fsWatchers.push(tsWatcher)
m3u8Watcher.on('change', p => {
logger.debug(`${p} m3u8 playlist changed`)
})
m3u8Watcher.on('add', p => {
this.playlistsCreated.add(p)
if (this.playlistsCreated.size === this.options.job.payload.output.toTranscode.length + 1) {
this.allPlaylistsCreated = true
logger.info('All m3u8 playlists are created.')
}
})
tsWatcher.on('add', async p => {
try {
await this.sendPendingChunks()
} catch (err) {
this.onUpdateError({ err, rej, res })
}
const playlistName = this.getPlaylistIdFromTS(p)
const pendingChunks = this.pendingChunksPerPlaylist.get(playlistName) || []
pendingChunks.push(p)
this.pendingChunksPerPlaylist.set(playlistName, pendingChunks)
})
tsWatcher.on('unlink', p => {
this.sendDeletedChunkUpdate(p)
.catch(err => this.onUpdateError({ err, rej, res }))
})
this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({
inputUrl: payload.input.rtmpUrl,
outPath: this.outputPath,
masterPlaylistName: 'master.m3u8',
segmentListSize: payload.output.segmentListSize,
segmentDuration: payload.output.segmentDuration,
toTranscode: payload.output.toTranscode,
splitAudioAndVideo: true,
bitrate,
ratio,
hasAudio,
hasVideo,
probe
})
logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`)
this.ffmpegCommand.on('error', (err, stdout, stderr) => {
this.onFFmpegError({ err, stdout, stderr })
res()
})
this.ffmpegCommand.on('end', () => {
this.onFFmpegEnded()
.catch(err => logger.error({ err }, 'Error in FFmpeg end handler'))
res()
})
this.ffmpegCommand.run()
} catch (err) {
rej(err)
}
})
}
// ---------------------------------------------------------------------------
private onUpdateError (options: {
err: Error
res: () => void
rej: (reason?: any) => void
}) {
const { err, res, rej } = options
if (this.errored) return
if (this.ended) return
this.errored = true
this.ffmpegCommand.kill('SIGINT')
const type = ((err as any).res?.body as PeerTubeProblemDocument)?.code
if (type === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
logger.info('Stopping transcoding as the job is not in processing state anymore')
this.sendSuccess()
.catch(err => logger.error({ err }, 'Cannot send success'))
res()
} else {
logger.error({ err }, 'Cannot send update after added/deleted chunk, stopping live transcoding')
this.sendError(err)
.catch(subErr => logger.error({ err: subErr }, 'Cannot send error'))
rej(err)
}
this.cleanup()
}
// ---------------------------------------------------------------------------
private onFFmpegError (options: {
err: any
stdout: string
stderr: string
}) {
const { err, stdout, stderr } = options
// Don't care that we killed the ffmpeg process
if (err?.message?.includes('Exiting normally')) return
if (this.errored) return
if (this.ended) return
this.errored = true
logger.error({ err, stdout, stderr }, 'FFmpeg transcoding error.')
this.sendError(err)
.catch(subErr => logger.error({ err: subErr }, 'Cannot send error'))
this.cleanup()
}
private async sendError (err: Error) {
await this.options.server.runnerJobs.error({
jobToken: this.options.job.jobToken,
jobUUID: this.options.job.uuid,
runnerToken: this.options.runnerToken,
message: err.message
})
}
// ---------------------------------------------------------------------------
private async onFFmpegEnded () {
if (this.ended) return
this.ended = true
logger.info('FFmpeg ended, sending success to server')
// Wait last ffmpeg chunks generation
await wait(1500)
this.sendSuccess()
.catch(err => logger.error({ err }, 'Cannot send success'))
this.cleanup()
}
private async sendSuccess () {
const successBody: LiveRTMPHLSTranscodingSuccess = {}
await this.options.server.runnerJobs.success({
jobToken: this.options.job.jobToken,
jobUUID: this.options.job.uuid,
runnerToken: this.options.runnerToken,
payload: successBody
})
}
// ---------------------------------------------------------------------------
private sendDeletedChunkUpdate (deletedChunk: string): Promise<any> {
if (this.ended) return Promise.resolve()
logger.debug(`Sending removed live chunk ${deletedChunk} update`)
const videoChunkFilename = basename(deletedChunk)
let payload: CustomLiveRTMPHLSTranscodingUpdatePayload = {
type: 'remove-chunk',
videoChunkFilename
}
if (this.allPlaylistsCreated) {
const playlistName = this.getPlaylistName(videoChunkFilename)
payload = {
...payload,
masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
resolutionPlaylistFilename: playlistName,
resolutionPlaylistFile: this.buildPlaylistFileParam(playlistName)
}
}
return this.updateWithRetry(payload)
}
private async sendPendingChunks (): Promise<any> {
if (this.ended) return Promise.resolve()
const parallelPromises: Promise<any>[] = []
for (const playlist of this.pendingChunksPerPlaylist.keys()) {
let sequentialPromises: Promise<any>
for (const chunk of this.pendingChunksPerPlaylist.get(playlist)) {
logger.debug(`Sending added live chunk ${chunk} update`)
const videoChunkFilename = basename(chunk)
const payloadBuilder = async () => {
let payload: CustomLiveRTMPHLSTranscodingUpdatePayload = {
type: 'add-chunk',
videoChunkFilename,
videoChunkFile: chunk
}
if (this.allPlaylistsCreated) {
const playlistName = this.getPlaylistName(videoChunkFilename)
await this.updatePlaylistContent(playlistName, videoChunkFilename)
payload = {
...payload,
masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
resolutionPlaylistFilename: playlistName,
resolutionPlaylistFile: this.buildPlaylistFileParam(playlistName)
}
}
return payload
}
const p = payloadBuilder().then(p => this.updateWithRetry(p))
if (!sequentialPromises) sequentialPromises = p
else sequentialPromises = sequentialPromises.then(() => p)
}
parallelPromises.push(sequentialPromises)
this.pendingChunksPerPlaylist.set(playlist, [])
}
await Promise.all(parallelPromises)
}
private async updateWithRetry (payload: CustomLiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> {
if (this.ended || this.errored) return
try {
await this.options.server.runnerJobs.update({
jobToken: this.options.job.jobToken,
jobUUID: this.options.job.uuid,
runnerToken: this.options.runnerToken,
payload: payload as any
})
} catch (err) {
if (currentTry >= 3) throw err
if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) throw err
logger.warn({ err }, 'Will retry update after error')
await wait(250)
return this.updateWithRetry(payload, currentTry + 1)
}
}
private getPlaylistName (videoChunkFilename: string) {
return `${videoChunkFilename.split('-')[0]}.m3u8`
}
private getPlaylistIdFromTS (segmentPath: string) {
const playlistIdMatcher = /^([\d+])-/
return basename(segmentPath).match(playlistIdMatcher)[1]
}
private async updatePlaylistContent (playlistName: string, latestChunkFilename: string) {
const m3u8Path = join(this.outputPath, playlistName)
const playlistContent = await readFile(m3u8Path, 'utf-8')
// Remove new chunk references, that will be processed later
this.latestFilteredPlaylistContent[playlistName] = playlistContent
.substring(0, playlistContent.lastIndexOf(latestChunkFilename) + latestChunkFilename.length) + '\n'
}
private buildPlaylistFileParam (playlistName: string) {
return [
Buffer.from(this.latestFilteredPlaylistContent[playlistName], 'utf-8'),
join(this.outputPath, 'master.m3u8')
] as [ Buffer, string ]
}
// ---------------------------------------------------------------------------
private cleanup () {
logger.debug(`Cleaning up job ${this.options.job.uuid}`)
for (const fsWatcher of this.fsWatchers) {
fsWatcher.close()
.catch(err => logger.error({ err }, 'Cannot close watcher'))
}
remove(this.outputPath)
.catch(err => logger.error({ err }, `Cannot remove ${this.outputPath}`))
}
}