viddo/atom-textual-velocity

View on GitHub
lib/epics/fileReadsEpic.js

Summary

Maintainability
B
4 hrs
Test Coverage
/* @flow */

import path from "path";
import { bindNodeCallback, empty, from, merge, of } from "rxjs";
import {
  catchError,
  debounceTime,
  filter,
  map,
  mergeMap,
  take
} from "rxjs/operators";
import { ofType } from "redux-observable";
import * as A from "../actions";
import fileReaders from "../FileReaders";
import takeUntilDispose from "../takeUntilDispose";
import { showWarningNotification } from "../showWarningNotification";

import type { Action } from "../actions";
import type { FileReadResult, RawFile } from "../../flow-types/File";
import type { IFileReader } from "../../flow-types/IFileReader";
import type { State } from "../../flow-types/State";

// Concurrency limit passed to mergeMap in fileReadsEpic: max number of raw files
// from a READ_DIR_DONE action that are processed at the same time.
const CONCURRENT_FILE_PARSINGS = 3;
// Concurrency limit passed to mergeMap in readFilesAsObservable: max number of
// file readers that are run at the same time for a single file.
const CONCURRENT_FILE_READS = 3;

export default function fileReadsEpic(
  action$: rxjs$Observable<Action>,
  state$: reduxRxjs$StateObservable<State>
) {
  return merge(
    action$.pipe(
      ofType(A.READ_DIR_DONE),
      mergeMap(action => {
        // not really necessary other than to make flowtype happy (since already filtered by ofType)
        if (action.type !== A.READ_DIR_DONE) return empty();

        return from(action.rawFiles).pipe(
          mergeMap(rawFile => {
            const state = state$.value;

            let fileReadersArray = fileReaders.slice(0);
            const note = state.notes[rawFile.filename];
            if (
              note &&
              note.stats.mtime.getTime() === rawFile.stats.mtime.getTime()
            ) {
              fileReadersArray = fileReadersArray.filter(
                fileReader => note[fileReader.notePropName] === undefined
              );
              if (fileReadersArray.length === 0) return empty(); // e.g. a cached note that have all read values already
            }

            return readFilesAsObservable(fileReadersArray, rawFile, state.dir);
          }, CONCURRENT_FILE_PARSINGS)
        );
      })
    ),

    action$.pipe(
      filter(action => {
        switch (action.type) {
          case A.FILE_ADDED:
          case A.FILE_CHANGED:
            return true;
          default:
            return false;
        }
      }),
      mergeMap(action => {
        switch (action.type) {
          case A.FILE_ADDED:
          case A.FILE_CHANGED: {
            return readFilesAsObservable(
              fileReaders,
              action.rawFile,
              state$.value.dir
            );
          }

          default:
            return empty();
        }
      })
    ),

    action$.pipe(
      filter(action => {
        switch (action.type) {
          case A.READ_DIR_DONE:
          case A.FILE_READ:
            return true;
          default:
            return false;
        }
      }),
      debounceTime(200),
      map(() => {
        const state = state$.value;
        if (state.loading.status === "readingFiles") {
          const { readyCount, totalCount } = state.loading;
          if (readyCount === totalCount) {
            return A.readFilesDone();
          }
        }
      }),
      filter(Boolean),
      take(1)
    )
  ).pipe(filter(Boolean), takeUntilDispose(action$));
}

function readFilesAsObservable(
  fileReadersArray: IFileReader[],
  rawFile: RawFile,
  dir: string
) {
  return from(fileReadersArray).pipe(
    mergeMap(fileReader => {
      const result: FileReadResult = {
        filename: rawFile.filename,
        notePropName: fileReader.notePropName,
        value: null
      };

      const filePath = path.join(dir, rawFile.filename);

      const readAsObservable = bindNodeCallback(
        fileReader.read.bind(fileReader)
      );
      return readAsObservable(filePath, rawFile.stats).pipe(
        map(readFileValue => {
          result.value = readFileValue !== undefined ? readFileValue : null; // make sure value cannot be undefined
          return A.fileRead(result);
        }),
        catchError(error => {
          showWarningNotification(`Failed to read file ${filePath}`, error);
          return of(
            A.fileReadFailed({
              filename: rawFile.filename,
              notePropName: fileReader.notePropName
            })
          );
        })
      );
    }, CONCURRENT_FILE_READS)
  );
}