master-of-zen/Av1an

View on GitHub
av1an-core/src/context.rs

Summary

Maintainability
Test Coverage
use std::borrow::Cow;
use std::cmp::Reverse;
use std::collections::BTreeSet;
use std::convert::TryInto;
use std::ffi::OsString;
use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::process::{exit, Command, Stdio};
use std::sync::atomic::{self, AtomicBool, AtomicUsize};
use std::sync::{mpsc, Arc};
use std::{cmp, fs, iter, thread};

use ansi_term::{Color, Style};
use anyhow::{bail, Context};
use av1_grain::TransferFunction;
use crossbeam_utils;
use itertools::Itertools;
use rand::prelude::SliceRandom;
use rand::thread_rng;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::ChildStderr;

use crate::broker::{Broker, EncoderCrash};
use crate::chunk::Chunk;
use crate::concat::{self, ConcatMethod};
use crate::ffmpeg::{compose_ffmpeg_pipe, num_frames};
use crate::progress_bar::{
  finish_progress_bar, inc_bar, inc_mp_bar, init_multi_progress_bar, init_progress_bar,
  reset_bar_at, reset_mp_bar_at, set_audio_size, update_mp_chunk, update_mp_msg,
  update_progress_bar_estimates,
};
use crate::scene_detect::av_scenechange_detect;
use crate::scenes::{Scene, ZoneOptions};
use crate::settings::{EncodeArgs, InputPixelFormat};
use crate::split::{extra_splits, segment, write_scenes_to_file};
use crate::vapoursynth::create_vs_file;
use crate::{
  create_dir, determine_workers, get_done, init_done, into_vec, read_chunk_queue, save_chunk_queue,
  vmaf, ChunkMethod, ChunkOrdering, DashMap, DoneJson, Input, SplitMethod, Verbosity,
};

pub struct Av1anContext {
  pub frames: usize,
  pub vs_script: Option<PathBuf>,
  pub args: EncodeArgs,
}

impl Av1anContext {
  pub fn new(mut args: EncodeArgs) -> anyhow::Result<Self> {
    args.validate()?;
    let mut this = Self {
      frames: 0,
      vs_script: None,
      args,
    };
    this.initialize()?;
    Ok(this)
  }

  /// Initialize logging routines and create temporary directories
  fn initialize(&mut self) -> anyhow::Result<()> {
    ffmpeg::init()?;
    ffmpeg::util::log::set_level(ffmpeg::util::log::level::Level::Fatal);

    if !self.args.resume && Path::new(&self.args.temp).is_dir() {
      fs::remove_dir_all(&self.args.temp)
        .with_context(|| format!("Failed to remove temporary directory {:?}", &self.args.temp))?;
    }

    create_dir!(Path::new(&self.args.temp))?;
    create_dir!(Path::new(&self.args.temp).join("split"))?;
    create_dir!(Path::new(&self.args.temp).join("encode"))?;

    debug!("temporary directory: {}", &self.args.temp);

    let done_path = Path::new(&self.args.temp).join("done.json");
    let done_json_exists = done_path.exists();
    let chunks_json_exists = Path::new(&self.args.temp).join("chunks.json").exists();

    if self.args.resume {
      match (done_json_exists, chunks_json_exists) {
        // both files exist, so there is no problem
        (true, true) => {}
        (false, true) => {
          info!(
            "resume was set but done.json does not exist in temporary directory {:?}",
            &self.args.temp
          );
          self.args.resume = false;
        }
        (true, false) => {
          info!(
            "resume was set but chunks.json does not exist in temporary directory {:?}",
            &self.args.temp
          );
          self.args.resume = false;
        }
        (false, false) => {
          info!(
            "resume was set but neither chunks.json nor done.json exist in temporary directory {:?}",
            &self.args.temp
          );
          self.args.resume = false;
        }
      }
    }

    if self.args.resume && done_json_exists {
      let done =
        fs::read_to_string(done_path).with_context(|| "Failed to read contents of done.json")?;
      let done: DoneJson =
        serde_json::from_str(&done).with_context(|| "Failed to parse done.json")?;
      self.frames = done.frames.load(atomic::Ordering::Relaxed);

      // frames need to be recalculated in this case
      if self.frames == 0 {
        self.frames = self.args.input.frames()?;
        done.frames.store(self.frames, atomic::Ordering::Relaxed);
      }

      init_done(done);
    } else {
      init_done(DoneJson {
        frames: AtomicUsize::new(0),
        done: DashMap::new(),
        audio_done: AtomicBool::new(false),
      });

      let mut done_file = File::create(&done_path).unwrap();
      done_file.write_all(serde_json::to_string(get_done())?.as_bytes())?;
    };

    Ok(())
  }

  pub fn encode_file(&mut self) -> anyhow::Result<()> {
    let initial_frames = get_done()
      .done
      .iter()
      .map(|ref_multi| ref_multi.frames)
      .sum::<usize>();

    let vspipe_cache =
        // Technically we should check if the vapoursynth cache file exists rather than !self.resume,
        // but the code still works if we are resuming and the cache file doesn't exist (as it gets
        // generated when vspipe is first called), so it's not worth adding all the extra complexity.
        if (self.args.input.is_vapoursynth()
            || (self.args.input.is_video()
            && matches!(self.args.chunk_method, ChunkMethod::LSMASH | ChunkMethod::FFMS2 | ChunkMethod::DGDECNV | ChunkMethod::BESTSOURCE)))
            && !self.args.resume
        {
          self.vs_script = Some(match &self.args.input {
            Input::VapourSynth(path) => path.clone(),
            Input::Video(path) => create_vs_file(&self.args.temp, path, self.args.chunk_method)?,
          });

          let vs_script = self.vs_script.clone().unwrap();
          Some({
            thread::spawn(move || {
              Command::new("vspipe")
                  .arg("-i")
                  .arg(vs_script)
                  .args(["-i", "-"])
                  .stdout(Stdio::piped())
                  .stderr(Stdio::piped())
                  .spawn()
                  .unwrap()
                  .wait()
                  .unwrap()
            })
          })
        } else {
          None
        };

    let res = self.args.input.resolution()?;
    let fps = self.args.input.frame_rate()?;
    let format = self.args.input.pixel_format()?;
    let tfc = self
      .args
      .input
      .transfer_function_params_adjusted(&self.args.video_params)?;
    info!(
      "Input: {}x{} @ {:.3} fps, {}, {}",
      res.0,
      res.1,
      fps,
      format,
      match tfc {
        TransferFunction::SMPTE2084 => "HDR",
        TransferFunction::BT1886 => "SDR",
      }
    );

    let splits = self.split_routine()?;

    if self.args.sc_only {
      debug!("scene detection only");

      if let Err(e) = fs::remove_dir_all(&self.args.temp) {
        warn!("Failed to delete temp directory: {}", e);
      }

      exit(0);
    }

    let (chunk_queue, total_chunks) = self.load_or_gen_chunk_queue(&splits)?;

    if self.args.resume {
      let chunks_done = get_done().done.len();
      info!(
        "encoding resumed with {}/{} chunks completed ({} remaining)",
        chunks_done,
        chunk_queue.len() + chunks_done,
        chunk_queue.len()
      );
    }

    if let Some(vspipe_cache) = vspipe_cache {
      vspipe_cache.join().unwrap();
    }

    crossbeam_utils::thread::scope(|s| -> anyhow::Result<()> {
      // vapoursynth audio is currently unsupported
      let audio_thread = if self.args.input.is_video()
        && (!self.args.resume || !get_done().audio_done.load(atomic::Ordering::SeqCst))
      {
        let input = self.args.input.as_video_path();
        let temp = self.args.temp.as_str();
        let audio_params = self.args.audio_params.as_slice();
        Some(s.spawn(move |_| {
          let audio_output = crate::ffmpeg::encode_audio(input, temp, audio_params);
          get_done().audio_done.store(true, atomic::Ordering::SeqCst);

          let progress_file = Path::new(temp).join("done.json");
          let mut progress_file = File::create(progress_file).unwrap();
          progress_file
            .write_all(serde_json::to_string(get_done()).unwrap().as_bytes())
            .unwrap();

          if let Some(ref audio_output) = audio_output {
            let audio_size = audio_output.metadata().unwrap().len();
            set_audio_size(audio_size);
          }

          audio_output.is_some()
        }))
      } else {
        None
      };

      if self.args.workers == 0 {
        self.args.workers = determine_workers(self.args.encoder) as usize;
      }
      self.args.workers = cmp::min(self.args.workers, chunk_queue.len());

      if atty::is(atty::Stream::Stderr) {
        eprintln!(
          "{}{} {} {}{} {} {}{} {}\n{}: {}",
          Color::Green.bold().paint("Q"),
          Color::Green.paint("ueue"),
          Color::Green.bold().paint(format!("{}", chunk_queue.len())),
          Color::Blue.bold().paint("W"),
          Color::Blue.paint("orkers"),
          Color::Blue.bold().paint(format!("{}", self.args.workers)),
          Color::Purple.bold().paint("P"),
          Color::Purple.paint("asses"),
          Color::Purple.bold().paint(format!("{}", self.args.passes)),
          Style::default().bold().paint("Params"),
          Style::default()
            .dimmed()
            .paint(self.args.video_params.join(" "))
        );
      } else {
        eprintln!(
          "Queue {} Workers {} Passes {}\nParams: {}",
          chunk_queue.len(),
          self.args.workers,
          self.args.passes,
          self.args.video_params.join(" ")
        );
      }

      if self.args.verbosity == Verbosity::Normal {
        init_progress_bar(self.frames as u64, initial_frames as u64);
        reset_bar_at(initial_frames as u64);
      } else if self.args.verbosity == Verbosity::Verbose {
        init_multi_progress_bar(
          self.frames as u64,
          self.args.workers,
          total_chunks,
          initial_frames as u64,
        );
        reset_mp_bar_at(initial_frames as u64);
      }

      if !get_done().done.is_empty() {
        let frame_rate = self.args.input.frame_rate()?;
        update_progress_bar_estimates(frame_rate, self.frames, self.args.verbosity);
      }

      let broker = Broker {
        chunk_queue,
        project: self,
      };

      let (tx, rx) = mpsc::channel();
      let handle = s.spawn(|_| {
        broker.encoding_loop(tx, self.args.set_thread_affinity);
      });

      // Queue::encoding_loop only sends a message if there was an error (meaning a chunk crashed)
      // more than MAX_TRIES. So, we have to explicitly exit the program if that happens.
      if rx.recv().is_ok() {
        exit(1);
      }

      handle.join().unwrap();

      finish_progress_bar();

      // TODO add explicit parameter to concatenation functions to control whether audio is also muxed in
      let _audio_output_exists =
        audio_thread.map_or(false, |audio_thread| audio_thread.join().unwrap());

      debug!("encoding finished, concatenating with {}", self.args.concat);

      match self.args.concat {
        ConcatMethod::Ivf => {
          concat::ivf(
            &Path::new(&self.args.temp).join("encode"),
            self.args.output_file.as_ref(),
          )?;
        }
        ConcatMethod::MKVMerge => {
          concat::mkvmerge(
            self.args.temp.as_ref(),
            self.args.output_file.as_ref(),
            self.args.encoder,
            total_chunks,
          )?;
        }
        ConcatMethod::FFmpeg => {
          concat::ffmpeg(self.args.temp.as_ref(), self.args.output_file.as_ref())?;
        }
      }

      if let Some(ref tq) = self.args.target_quality {
        let mut temp_res = tq.vmaf_res.to_string();
        if tq.vmaf_res == "inputres" {
          let inputres = self.args.input.resolution()?;
          temp_res.push_str(&format!(
            "{}x{}",
            &inputres.0.to_string(),
            &inputres.1.to_string()
          ));
          temp_res.to_string();
        } else {
          temp_res = tq.vmaf_res.to_string();
        }

        if self.args.vmaf {
          if let Err(e) = vmaf::plot(
            self.args.output_file.as_ref(),
            &self.args.input,
            tq.model.as_deref(),
            temp_res.as_str(),
            tq.vmaf_scaler.as_str(),
            1,
            tq.vmaf_filter.as_deref(),
            tq.vmaf_threads,
          ) {
            error!("VMAF calculation failed with error: {}", e);
          }
        }
      }

      if !Path::new(&self.args.output_file).exists() {
        warn!(
          "Concatenation failed for unknown reasons! Temp folder will not be deleted: {}",
          &self.args.temp
        );
      } else if !self.args.keep {
        if let Err(e) = fs::remove_dir_all(&self.args.temp) {
          warn!("Failed to delete temp directory: {}", e);
        }
      }

      Ok(())
    })
    .unwrap()?;

    Ok(())
  }

  fn read_queue_files(source_path: &Path) -> anyhow::Result<Vec<PathBuf>> {
    let mut queue_files = fs::read_dir(source_path)
      .with_context(|| format!("Failed to read queue files from source path {source_path:?}"))?
      .map(|res| res.map(|e| e.path()))
      .collect::<Result<Vec<_>, _>>()?;

    queue_files.retain(|file| {
      file.is_file() && matches!(file.extension().map(|ext| ext == "mkv"), Some(true))
    });
    concat::sort_files_by_filename(&mut queue_files);

    Ok(queue_files)
  }

  /// Returns the number of frames encoded if crashed, to reset the progress bar.
  pub fn create_pipes(
    &self,
    chunk: &Chunk,
    current_pass: u8,
    worker_id: usize,
    padding: usize,
  ) -> Result<(), (Box<EncoderCrash>, u64)> {
    update_mp_chunk(worker_id, chunk.index, padding);

    let fpf_file = Path::new(&chunk.temp)
      .join("split")
      .join(format!("{}_fpf", chunk.name()));

    let video_params = chunk.video_params.clone();

    let mut enc_cmd = if chunk.passes == 1 {
      chunk
        .encoder
        .compose_1_1_pass(video_params, chunk.output(), chunk.frames())
    } else if current_pass == 1 {
      chunk
        .encoder
        .compose_1_2_pass(video_params, fpf_file.to_str().unwrap(), chunk.frames())
    } else {
      chunk.encoder.compose_2_2_pass(
        video_params,
        fpf_file.to_str().unwrap(),
        chunk.output(),
        chunk.frames(),
      )
    };

    if let Some(per_shot_target_quality_cq) = chunk.tq_cq {
      enc_cmd = chunk
        .encoder
        .man_command(enc_cmd, per_shot_target_quality_cq as usize);
    }

    let rt = tokio::runtime::Builder::new_current_thread()
      .enable_io()
      .build()
      .unwrap();

    let (source_pipe_stderr, ffmpeg_pipe_stderr, enc_output, enc_stderr, frame) =
      rt.block_on(async {
        let mut source_pipe = if let [source, args @ ..] = &*chunk.source_cmd {
          tokio::process::Command::new(source)
            .args(args)
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()
            .unwrap()
        } else {
          unreachable!()
        };

        let source_pipe_stdout: Stdio = source_pipe.stdout.take().unwrap().try_into().unwrap();

        let source_pipe_stderr = source_pipe.stderr.take().unwrap();

        // converts the pixel format
        let create_ffmpeg_pipe = |pipe_from: Stdio, source_pipe_stderr: ChildStderr| {
          let ffmpeg_pipe = compose_ffmpeg_pipe(
            self.args.ffmpeg_filter_args.as_slice(),
            self.args.output_pix_format.format,
          );

          let mut ffmpeg_pipe = if let [ffmpeg, args @ ..] = &*ffmpeg_pipe {
            tokio::process::Command::new(ffmpeg)
              .args(args)
              .stdin(pipe_from)
              .stdout(Stdio::piped())
              .stderr(Stdio::piped())
              .spawn()
              .unwrap()
          } else {
            unreachable!()
          };

          let ffmpeg_pipe_stdout: Stdio = ffmpeg_pipe.stdout.take().unwrap().try_into().unwrap();
          let ffmpeg_pipe_stderr = ffmpeg_pipe.stderr.take().unwrap();
          (
            ffmpeg_pipe_stdout,
            source_pipe_stderr,
            Some(ffmpeg_pipe_stderr),
          )
        };

        let (y4m_pipe, source_pipe_stderr, mut ffmpeg_pipe_stderr) =
          if self.args.ffmpeg_filter_args.is_empty() {
            match &self.args.input_pix_format {
              InputPixelFormat::FFmpeg { format } => {
                if self.args.output_pix_format.format == *format {
                  (source_pipe_stdout, source_pipe_stderr, None)
                } else {
                  create_ffmpeg_pipe(source_pipe_stdout, source_pipe_stderr)
                }
              }
              InputPixelFormat::VapourSynth { bit_depth } => {
                if self.args.output_pix_format.bit_depth == *bit_depth {
                  (source_pipe_stdout, source_pipe_stderr, None)
                } else {
                  create_ffmpeg_pipe(source_pipe_stdout, source_pipe_stderr)
                }
              }
            }
          } else {
            create_ffmpeg_pipe(source_pipe_stdout, source_pipe_stderr)
          };

        let mut source_reader = BufReader::new(source_pipe_stderr).lines();
        let ffmpeg_reader = ffmpeg_pipe_stderr
          .take()
          .map(|stderr| BufReader::new(stderr).lines());

        let pipe_stderr = Arc::new(parking_lot::Mutex::new(String::with_capacity(128)));
        let p_stdr2 = Arc::clone(&pipe_stderr);

        let ffmpeg_stderr = if ffmpeg_reader.is_some() {
          Some(Arc::new(parking_lot::Mutex::new(String::with_capacity(
            128,
          ))))
        } else {
          None
        };

        let f_stdr2 = ffmpeg_stderr.clone();

        tokio::spawn(async move {
          while let Some(line) = source_reader.next_line().await.unwrap() {
            p_stdr2.lock().push_str(&line);
            p_stdr2.lock().push('\n');
          }
        });
        if let Some(mut ffmpeg_reader) = ffmpeg_reader {
          let f_stdr2 = f_stdr2.unwrap();
          tokio::spawn(async move {
            while let Some(line) = ffmpeg_reader.next_line().await.unwrap() {
              f_stdr2.lock().push_str(&line);
              f_stdr2.lock().push('\n');
            }
          });
        }

        let mut enc_pipe = if let [encoder, args @ ..] = &*enc_cmd {
          tokio::process::Command::new(encoder)
            .args(args)
            .stdin(y4m_pipe)
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()
            .unwrap()
        } else {
          unreachable!()
        };

        let mut frame = 0;

        let mut reader = BufReader::new(enc_pipe.stderr.take().unwrap());

        let mut buf = Vec::with_capacity(128);
        let mut enc_stderr = String::with_capacity(128);

        while let Ok(read) = reader.read_until(b'\r', &mut buf).await {
          if read == 0 {
            break;
          }

          if let Ok(line) = simdutf8::basic::from_utf8_mut(&mut buf) {
            if self.args.verbosity == Verbosity::Verbose && !line.contains('\n') {
              update_mp_msg(worker_id, line.trim().to_string());
            }
            // This needs to be done before parse_encoded_frames, as it potentially
            // mutates the string
            enc_stderr.push_str(line);
            enc_stderr.push('\n');

            if current_pass == chunk.passes {
              if let Some(new) = chunk.encoder.parse_encoded_frames(line) {
                if new > frame {
                  if self.args.verbosity == Verbosity::Normal {
                    inc_bar(new - frame);
                  } else if self.args.verbosity == Verbosity::Verbose {
                    inc_mp_bar(new - frame);
                  }
                  frame = new;
                }
              }
            }
          }

          buf.clear();
        }

        let enc_output = enc_pipe.wait_with_output().await.unwrap();

        let source_pipe_stderr = pipe_stderr.lock().clone();
        let ffmpeg_pipe_stderr = ffmpeg_stderr.map(|x| x.lock().clone());
        (
          source_pipe_stderr,
          ffmpeg_pipe_stderr,
          enc_output,
          enc_stderr,
          frame,
        )
      });

    if !enc_output.status.success() {
      return Err((
        Box::new(EncoderCrash {
          exit_status: enc_output.status,
          source_pipe_stderr: source_pipe_stderr.into(),
          ffmpeg_pipe_stderr: ffmpeg_pipe_stderr.map(Into::into),
          stderr: enc_stderr.into(),
          stdout: enc_output.stdout.into(),
        }),
        frame,
      ));
    }

    if current_pass == chunk.passes {
      let encoded_frames = num_frames(chunk.output().as_ref());

      let err_str = match encoded_frames {
        Ok(encoded_frames) if !chunk.ignore_frame_mismatch && encoded_frames != chunk.frames() => {
          Some(format!(
            "FRAME MISMATCH: chunk {}: {encoded_frames}/{} (actual/expected frames)",
            chunk.index,
            chunk.frames()
          ))
        }
        Err(error) => Some(format!(
          "FAILED TO COUNT FRAMES: chunk {}: {error}",
          chunk.index
        )),
        _ => None,
      };

      if let Some(err_str) = err_str {
        return Err((
          Box::new(EncoderCrash {
            exit_status: enc_output.status,
            source_pipe_stderr: source_pipe_stderr.into(),
            ffmpeg_pipe_stderr: ffmpeg_pipe_stderr.map(Into::into),
            stderr: enc_stderr.into(),
            stdout: err_str.into(),
          }),
          frame,
        ));
      }
    }

    Ok(())
  }

  fn create_encoding_queue(&mut self, scenes: &[Scene]) -> anyhow::Result<Vec<Chunk>> {
    let mut chunks = match &self.args.input {
      Input::Video(_) => match self.args.chunk_method {
        ChunkMethod::FFMS2
        | ChunkMethod::LSMASH
        | ChunkMethod::DGDECNV
        | ChunkMethod::BESTSOURCE => {
          let vs_script = self.vs_script.as_ref().unwrap().as_path();
          self.create_video_queue_vs(scenes, vs_script)
        }
        ChunkMethod::Hybrid => self.create_video_queue_hybrid(scenes)?,
        ChunkMethod::Select => self.create_video_queue_select(scenes),
        ChunkMethod::Segment => self.create_video_queue_segment(scenes)?,
      },
      Input::VapourSynth(vs_script) => self.create_video_queue_vs(scenes, vs_script.as_path()),
    };

    match self.args.chunk_order {
      ChunkOrdering::LongestFirst => {
        chunks.sort_unstable_by_key(|chunk| Reverse(chunk.frames()));
      }
      ChunkOrdering::ShortestFirst => {
        chunks.sort_unstable_by_key(Chunk::frames);
      }
      ChunkOrdering::Sequential => {
        // Already in order
      }
      ChunkOrdering::Random => {
        chunks.shuffle(&mut thread_rng());
      }
    }

    Ok(chunks)
  }

  fn calc_split_locations(&self) -> anyhow::Result<(Vec<Scene>, usize)> {
    let zones = self.parse_zones()?;

    Ok(match self.args.split_method {
      SplitMethod::AvScenechange => av_scenechange_detect(
        &self.args.input,
        self.args.encoder,
        self.frames,
        self.args.min_scene_len,
        self.args.verbosity,
        self.args.scaler.as_str(),
        self.args.sc_pix_format,
        self.args.sc_method,
        self.args.sc_downscale_height,
        &zones,
      )?,
      SplitMethod::None => {
        let mut scenes = Vec::with_capacity(2 * zones.len() + 1);
        let mut frames_processed = 0;
        for zone in zones {
          let end_frame = zone.end_frame;

          if end_frame > frames_processed {
            scenes.push(Scene {
              start_frame: frames_processed,
              end_frame: zone.start_frame,
              zone_overrides: None,
            });
          }

          scenes.push(zone);

          frames_processed += end_frame;
        }
        if self.frames > frames_processed {
          scenes.push(Scene {
            start_frame: frames_processed,
            end_frame: self.frames,
            zone_overrides: None,
          });
        }

        (scenes, self.args.input.frames()?)
      }
    })
  }

  fn parse_zones(&self) -> anyhow::Result<Vec<Scene>> {
    let mut zones = Vec::new();
    if let Some(ref zones_file) = self.args.zones {
      let input = fs::read_to_string(zones_file)?;
      for zone_line in input.lines().map(str::trim).filter(|line| !line.is_empty()) {
        zones.push(Scene::parse_from_zone(zone_line, self)?);
      }
      zones.sort_unstable_by_key(|zone| zone.start_frame);
      let mut segments = BTreeSet::new();
      for zone in &zones {
        if segments.contains(&zone.start_frame) {
          bail!("Zones file contains overlapping zones");
        }
        segments.extend(zone.start_frame..zone.end_frame);
      }
    }
    Ok(zones)
  }

  // If we are not resuming, then do scene detection. Otherwise: get scenes from
  // scenes.json and return that.
  fn split_routine(&mut self) -> anyhow::Result<Vec<Scene>> {
    let scene_file = self.args.scenes.as_ref().map_or_else(
      || Cow::Owned(Path::new(&self.args.temp).join("scenes.json")),
      |path| Cow::Borrowed(path.as_path()),
    );

    let used_existing_cuts;
    let (mut scenes, frames) =
      if (self.args.scenes.is_some() && scene_file.exists()) || self.args.resume {
        used_existing_cuts = true;
        crate::split::read_scenes_from_file(scene_file.as_ref())?
      } else {
        used_existing_cuts = false;
        self.frames = self.args.input.frames()?;
        self.calc_split_locations()?
      };
    self.frames = frames;
    get_done()
      .frames
      .store(self.frames, atomic::Ordering::SeqCst);

    // Add forced keyframes
    for kf in &self.args.force_keyframes {
      if let Some((scene_pos, s)) = scenes
        .iter_mut()
        .find_position(|s| (s.start_frame..s.end_frame).contains(kf))
      {
        if *kf == s.start_frame {
          // Already a keyframe
          continue;
        }
        // Split this scene into two scenes at the requested keyframe
        let mut new = s.clone();
        s.end_frame = *kf;
        new.start_frame = *kf;
        scenes.insert(scene_pos + 1, new);
      } else {
        warn!(
          "scene {} was requested as a forced keyframe but video has {} frames, ignoring",
          *kf, frames
        );
      }
    }

    let scenes_before = scenes.len();
    if !used_existing_cuts {
      if let Some(split_len @ 1..) = self.args.extra_splits_len {
        scenes = extra_splits(&scenes, self.frames, split_len);
        let scenes_after = scenes.len();
        info!(
          "scenecut: found {} scene(s) [with extra_splits ({} frames): {} scene(s)]",
          scenes_before, split_len, scenes_after
        );
      } else {
        info!("scenecut: found {} scene(s)", scenes_before);
      }
    }

    write_scenes_to_file(&scenes, self.frames, scene_file)?;

    Ok(scenes)
  }

  fn create_select_chunk(
    &self,
    index: usize,
    src_path: &Path,
    start_frame: usize,
    end_frame: usize,
    frame_rate: f64,
    overrides: Option<ZoneOptions>,
  ) -> anyhow::Result<Chunk> {
    assert!(
      start_frame < end_frame,
      "Can't make a chunk with <= 0 frames!"
    );

    let ffmpeg_gen_cmd: Vec<OsString> = into_vec![
      "ffmpeg",
      "-y",
      "-hide_banner",
      "-loglevel",
      "error",
      "-i",
      src_path,
      "-vf",
      format!(
        "select=between(n\\,{}\\,{}),setpts=PTS-STARTPTS",
        start_frame,
        end_frame - 1
      ),
      "-pix_fmt",
      self
        .args
        .output_pix_format
        .format
        .descriptor()
        .unwrap()
        .name(),
      "-strict",
      "-1",
      "-f",
      "yuv4mpegpipe",
      "-",
    ];

    let output_ext = self.args.encoder.output_extension();

    let mut chunk = Chunk {
      temp: self.args.temp.clone(),
      index,
      input: Input::Video(src_path.to_path_buf()),
      source_cmd: ffmpeg_gen_cmd,
      output_ext: output_ext.to_owned(),
      start_frame,
      end_frame,
      frame_rate,
      video_params: overrides.as_ref().map_or_else(
        || self.args.video_params.clone(),
        |ovr| ovr.video_params.clone(),
      ),
      passes: self.args.passes,
      encoder: self.args.encoder,
      noise_size: self.args.photon_noise_size,
      tq_cq: None,
      ignore_frame_mismatch: self.args.ignore_frame_mismatch,
    };
    chunk.apply_photon_noise_args(
      overrides.map_or(self.args.photon_noise, |ovr| ovr.photon_noise),
      self.args.chroma_noise,
    )?;
    if let Some(ref tq) = self.args.target_quality {
      tq.per_shot_target_quality_routine(&mut chunk)?;
    }
    Ok(chunk)
  }

  fn create_vs_chunk(
    &self,
    index: usize,
    vs_script: &Path,
    scene: &Scene,
    frame_rate: f64,
  ) -> anyhow::Result<Chunk> {
    // the frame end boundary is actually a frame that should be included in the next chunk
    let frame_end = scene.end_frame - 1;

    let vspipe_cmd_gen: Vec<OsString> = into_vec![
      "vspipe",
      vs_script,
      "-c",
      "y4m",
      "-",
      "-s",
      scene.start_frame.to_string(),
      "-e",
      frame_end.to_string(),
    ];

    let output_ext = self.args.encoder.output_extension();

    let mut chunk = Chunk {
      temp: self.args.temp.clone(),
      index,
      input: Input::VapourSynth(vs_script.to_path_buf()),
      source_cmd: vspipe_cmd_gen,
      output_ext: output_ext.to_owned(),
      start_frame: scene.start_frame,
      end_frame: scene.end_frame,
      frame_rate,
      video_params: scene.zone_overrides.as_ref().map_or_else(
        || self.args.video_params.clone(),
        |ovr| ovr.video_params.clone(),
      ),
      passes: self.args.passes,
      encoder: self.args.encoder,
      noise_size: self.args.photon_noise_size,
      tq_cq: None,
      ignore_frame_mismatch: self.args.ignore_frame_mismatch,
    };
    chunk.apply_photon_noise_args(
      scene
        .zone_overrides
        .as_ref()
        .map_or(self.args.photon_noise, |ovr| ovr.photon_noise),
      self.args.chroma_noise,
    )?;
    Ok(chunk)
  }

  fn create_video_queue_vs(&self, scenes: &[Scene], vs_script: &Path) -> Vec<Chunk> {
    let frame_rate = self.args.input.frame_rate().unwrap();
    let chunk_queue: Vec<Chunk> = scenes
      .iter()
      .enumerate()
      .map(|(index, scene)| {
        self
          .create_vs_chunk(index, vs_script, scene, frame_rate)
          .unwrap()
      })
      .collect();

    chunk_queue
  }

  fn create_video_queue_select(&self, scenes: &[Scene]) -> Vec<Chunk> {
    let input = self.args.input.as_video_path();
    let frame_rate = self.args.input.frame_rate().unwrap();

    let chunk_queue: Vec<Chunk> = scenes
      .iter()
      .enumerate()
      .map(|(index, scene)| {
        self
          .create_select_chunk(
            index,
            input,
            scene.start_frame,
            scene.end_frame,
            frame_rate,
            scene.zone_overrides.clone(),
          )
          .unwrap()
      })
      .collect();

    chunk_queue
  }

  fn create_video_queue_segment(&self, scenes: &[Scene]) -> anyhow::Result<Vec<Chunk>> {
    let input = self.args.input.as_video_path();
    let frame_rate = self.args.input.frame_rate().unwrap();

    debug!("Splitting video");
    segment(
      input,
      &self.args.temp,
      &scenes
        .iter()
        .skip(1)
        .map(|scene| scene.start_frame)
        .collect::<Vec<usize>>(),
    );
    debug!("Splitting done");

    let source_path = Path::new(&self.args.temp).join("split");
    let queue_files = Self::read_queue_files(&source_path)?;

    assert!(
      !queue_files.is_empty(),
      "Error: No files found in temp/split, probably splitting not working"
    );

    let chunk_queue: Vec<Chunk> = queue_files
      .iter()
      .enumerate()
      .map(|(index, file)| {
        self
          .create_chunk_from_segment(
            index,
            file.as_path().to_str().unwrap(),
            frame_rate,
            scenes[index].zone_overrides.clone(),
          )
          .unwrap()
      })
      .collect();

    Ok(chunk_queue)
  }

  fn create_video_queue_hybrid(&self, scenes: &[Scene]) -> anyhow::Result<Vec<Chunk>> {
    let input = self.args.input.as_video_path();
    let frame_rate = self.args.input.frame_rate().unwrap();

    let keyframes = crate::ffmpeg::get_keyframes(input).unwrap();

    let to_split: Vec<usize> = keyframes
      .iter()
      .filter(|kf| scenes.iter().any(|scene| scene.start_frame == **kf))
      .copied()
      .collect();

    debug!("Segmenting video");
    segment(input, &self.args.temp, &to_split[1..]);
    debug!("Segment done");

    let source_path = Path::new(&self.args.temp).join("split");
    let queue_files = Self::read_queue_files(&source_path)?;

    let kf_list = to_split
      .iter()
      .copied()
      .chain(iter::once(self.frames))
      .tuple_windows();

    let mut segments = Vec::with_capacity(scenes.len());
    for (file, (x, y)) in queue_files.iter().zip(kf_list) {
      for s in scenes {
        let s0 = s.start_frame;
        let s1 = s.end_frame;
        if s0 >= x && s1 <= y && s0 < s1 {
          segments.push((file.as_path(), (s0 - x, s1 - x, s)));
        }
      }
    }

    let chunk_queue: Vec<Chunk> = segments
      .iter()
      .enumerate()
      .map(|(index, &(file, (start, end, scene)))| {
        self
          .create_select_chunk(
            index,
            file,
            start,
            end,
            frame_rate,
            scene.zone_overrides.clone(),
          )
          .unwrap()
      })
      .collect();

    Ok(chunk_queue)
  }

  fn create_chunk_from_segment(
    &self,
    index: usize,
    file: &str,
    frame_rate: f64,
    overrides: Option<ZoneOptions>,
  ) -> anyhow::Result<Chunk> {
    let ffmpeg_gen_cmd: Vec<OsString> = into_vec![
      "ffmpeg",
      "-y",
      "-hide_banner",
      "-loglevel",
      "error",
      "-i",
      file.to_owned(),
      "-strict",
      "-1",
      "-pix_fmt",
      self
        .args
        .output_pix_format
        .format
        .descriptor()
        .unwrap()
        .name(),
      "-f",
      "yuv4mpegpipe",
      "-",
    ];

    let output_ext = self.args.encoder.output_extension();

    let num_frames = num_frames(Path::new(file))?;

    let mut chunk = Chunk {
      temp: self.args.temp.clone(),
      input: Input::Video(PathBuf::from(file)),
      source_cmd: ffmpeg_gen_cmd,
      output_ext: output_ext.to_owned(),
      index,
      start_frame: 0,
      end_frame: num_frames,
      frame_rate,
      video_params: overrides.as_ref().map_or_else(
        || self.args.video_params.clone(),
        |ovr| ovr.video_params.clone(),
      ),
      passes: self.args.passes,
      encoder: self.args.encoder,
      noise_size: self.args.photon_noise_size,
      tq_cq: None,
      ignore_frame_mismatch: self.args.ignore_frame_mismatch,
    };
    chunk.apply_photon_noise_args(
      overrides.map_or(self.args.photon_noise, |ovr| ovr.photon_noise),
      self.args.chroma_noise,
    )?;
    Ok(chunk)
  }

  /// Returns unfinished chunks and number of total chunks
  fn load_or_gen_chunk_queue(&mut self, splits: &[Scene]) -> anyhow::Result<(Vec<Chunk>, usize)> {
    if self.args.resume {
      let mut chunks = read_chunk_queue(self.args.temp.as_ref())?;
      let num_chunks = chunks.len();

      let done = get_done();

      // only keep the chunks that are not done
      chunks.retain(|chunk| !done.done.contains_key(&chunk.name()));

      Ok((chunks, num_chunks))
    } else {
      let chunks = self.create_encoding_queue(splits)?;
      let num_chunks = chunks.len();
      save_chunk_queue(&self.args.temp, &chunks)?;
      Ok((chunks, num_chunks))
    }
  }
}