huridocs/uwazi

View on GitHub
app/api/csv/csv.ts

Summary

Maintainability
A
45 mins
Test Coverage
A
100%
import csvtojson from 'csvtojson';

import { Readable } from 'stream';
import importFile, { ImportFile } from './importFile';

/** One CSV row as handed to row callbacks: an object with string keys and string values. */
type CSVRow = { [k: string]: string };

/** Column separators accepted by the CSV parsers created in this module. */
const DELIMITERS = [',', ';'];

/**
 * Reads only the header row of a CSV source and returns its column names.
 *
 * @param readSource - A readable stream of CSV text, or a string that is handed to
 *   `importFile` to open one.
 * @returns The header cells as emitted by the parser; an empty array when parsing
 *   finishes without a `header` event.
 * @throws Rejects with the parser's error when `done` is emitted with one.
 */
const peekHeaders = async (readSource: Readable | string): Promise<string[]> => {
  const readStream =
    typeof readSource === 'string' ? await importFile(readSource).readStream() : readSource;
  // Use the same delimiter list as `csv().read()`, so header checks and row parsing
  // agree on the columns of a semicolon-separated file.
  const parser = csvtojson({ delimiter: DELIMITERS }).fromStream(readStream);

  return new Promise<string[]>((resolve, reject) => {
    let headers: string[] = [];

    parser.once('header', (found: string[]) => {
      headers = found;
      // Only the first row is needed: detach the source so nothing more is written to
      // the parser once it has been ended, then let it finish.
      readStream.unpipe(parser);
      parser.end();
    });

    // `done` is emitted when parsing stops, with an error argument if it stopped on one.
    parser.once('done', (error?: Error) => {
      if (error) {
        reject(error);
      } else {
        resolve(headers);
      }
    });
  });
};

/**
 * Error thrown by the CSV format validators in this module.
 * Its `name` is set to 'ValidateFormatError' so it can be told apart from other errors.
 */
class ValidateFormatError extends Error {
  /**
   * @param message - Human-readable description of the failed check.
   */
  constructor(message: string) {
    super(message);
    const errorName = 'ValidateFormatError';
    this.name = errorName;
  }
}

/** Header checks applied by `validateHeader`. */
type ValidateHeaderOptions = {
  // Exact number of header columns required; the count check is skipped when unset or 0.
  column_number?: number;
  // Header names that must all be present among the file's headers.
  required_headers?: string[];
};

/** Row checks applied by `validateCSVBody`. */
type ValidateCSVBodyOptions = {
  // When truthy, a row containing a falsy (e.g. empty) value fails validation.
  no_empty_values?: boolean;
};

/** All options accepted by `validateFormat`: header checks plus row checks. */
type ValidateFormatOptions = ValidateHeaderOptions & ValidateCSVBodyOptions;

/**
 * Builds a chainable row-by-row reader over a CSV stream.
 *
 * @param readStream - Source of CSV text; it is piped into the parser by `read()`.
 * @param stopOnError - When true, the first row whose handler throws stops the read:
 *   later rows are skipped and `readStream` is unpiped and destroyed.
 * @returns An object exposing `onRow`, `onError` and `read`.
 */
const csv = (readStream: Readable, stopOnError = false) => ({
  // Set to true by `read()`; set back to false only when a row failure stops the read.
  reading: false,
  // No-op defaults, replaced through `onRow` / `onError`.
  onRowCallback: async (_row: CSVRow, _index: number) => {},
  onErrorCallback: async (_error: Error, _row: CSVRow, _index: number) => {},

  /** Registers the handler awaited for every row; returns the reader for chaining. */
  onRow(onRowCallback: (_row: CSVRow, _index: number) => Promise<void>) {
    this.onRowCallback = onRowCallback;
    return this;
  },

  /** Registers the handler awaited when a row handler throws; returns the reader for chaining. */
  onError(onErrorCallback: (_error: Error, _row: CSVRow, _index: number) => Promise<void>) {
    this.onErrorCallback = onErrorCallback;
    return this;
  },

  /** Parses the stream, feeding each row and its index to the registered handlers. */
  async read() {
    this.reading = true;
    const parser = csvtojson({ delimiter: DELIMITERS, flatKeys: true }).fromStream(readStream);

    return parser.subscribe(async (row: CSVRow, index) => {
      // Rows that arrive after a stop are ignored.
      if (this.reading) {
        try {
          await this.onRowCallback(row, index);
        } catch (e) {
          // If the error handler itself throws, the stop logic below is not reached
          // and the rejection goes back to the parser.
          await this.onErrorCallback(e, row, index);
          if (stopOnError) {
            this.reading = false;
            readStream.unpipe();
            readStream.destroy();
          }
        }
      }
    });
  },
});

/**
 * Checks the header row of `file` against the requested header options.
 *
 * Does nothing (and does not open the file) when neither option is set.
 *
 * @param file - Import file whose `readStream()` supplies the CSV content.
 * @param options - Expected column count and/or header names that must be present.
 * @throws ValidateFormatError when the column count differs from `column_number` or
 *   when any of `required_headers` is missing.
 */
const validateHeader = async (file: ImportFile, options: ValidateHeaderOptions) => {
  const { column_number: expectedColumns, required_headers: requiredHeaders } = options;
  if (!requiredHeaders && !expectedColumns) return;

  const header = await peekHeaders(await file.readStream());

  // The count check runs first, so its error wins when both checks would fail.
  if (expectedColumns && header.length !== expectedColumns) {
    throw new ValidateFormatError(
      `Expected ${expectedColumns} columns, but found ${header.length}.`
    );
  }

  if (requiredHeaders) {
    const present = new Set(header);
    const missingHeaders = requiredHeaders.filter(name => !present.has(name));
    if (missingHeaders.length) {
      throw new ValidateFormatError(`Missing required headers: ${missingHeaders.join(', ')}.`);
    }
  }
};

/**
 * Checks the rows of `file` against the requested body options.
 *
 * Does nothing (and does not open the file) unless `no_empty_values` is set.
 *
 * @param file - Import file whose `readStream()` supplies the CSV content.
 * @param options - Body checks to apply.
 * @throws ValidateFormatError for the first falsy value found, naming its row
 *   (row index + 1) and column.
 */
const validateCSVBody = async (file: ImportFile, options: ValidateCSVBodyOptions) => {
  if (!options.no_empty_values) return;

  // Fails on the first entry of the row, in key order, whose value is falsy.
  const rejectEmptyValues = async (row: CSVRow, index: number) => {
    if (!options.no_empty_values) return;
    const emptyCell = Object.entries(row).find(([, value]) => !value);
    if (emptyCell) {
      const [header] = emptyCell;
      throw new ValidateFormatError(`Empty value at row ${index + 1}, column "${header}".`);
    }
  };

  // Re-throw from the error handler so the failure rejects `read()`.
  const rethrow = async (e: Error) => {
    throw e;
  };

  const readStream = await file.readStream();
  await csv(readStream, true).onRow(rejectEmptyValues).onError(rethrow).read();
};

/**
 * Validates the CSV found at `filePath`: header checks first, then row checks.
 *
 * @param filePath - Path handed to `importFile` to locate the CSV.
 * @param options - Header and body checks to apply; unset checks are skipped.
 * @throws ValidateFormatError from whichever check fails first.
 */
const validateFormat = async (filePath: string, options: ValidateFormatOptions) => {
  const source = importFile(filePath);

  // Run in sequence so a header failure is reported before any row is inspected.
  await validateHeader(source, options);
  await validateCSVBody(source, options);
};

export default csv;
export type { CSVRow, ValidateFormatOptions };
export { peekHeaders, validateFormat, ValidateFormatError };