// teableio/teable — apps/nestjs-backend/src/features/import/open-api/import.class.ts
// (Review metadata: Maintainability C, est. 1 day; view on GitHub. Test Coverage: n/a)
import { existsSync } from 'fs';
import { join } from 'path';
import { BadRequestException } from '@nestjs/common';
import { getUniqName, FieldType } from '@teable/core';
import type { IValidateTypes, IAnalyzeVo } from '@teable/openapi';
import { SUPPORTEDTYPE, importTypeMap } from '@teable/openapi';
import { zip, toString, intersection, chunk as chunkArray } from 'lodash';
import fetch from 'node-fetch';
import sizeof from 'object-sizeof';
import Papa from 'papaparse';
import * as XLSX from 'xlsx';
import type { ZodType } from 'zod';
import z from 'zod';
import { exceptionParse } from '../../../utils/exception-parse';
import { toLineDelimitedStream } from './delimiter-stream';

/**
 * Zod validators used to sniff a column's field type from sampled cell values.
 * A value "matches" a candidate type when the corresponding schema's safeParse
 * succeeds.
 */
const validateZodSchemaMap: Record<IValidateTypes, ZodType> = {
  // Accept real booleans or the (case-insensitive) strings 'true' / 'false'.
  [FieldType.Checkbox]: z.union([z.string(), z.boolean()]).refine((value: unknown) => {
    if (typeof value === 'boolean') {
      return true;
    }
    if (
      typeof value === 'string' &&
      (value.toLowerCase() === 'false' || value.toLowerCase() === 'true')
    ) {
      return true;
    }
    return false;
  }),
  [FieldType.Date]: z.coerce.date(),
  [FieldType.Number]: z.coerce.number(),
  // Long text is any string containing a line break. The previous refine also
  // evaluated `z.string().safeParse(value)`, but safeParse returns an
  // always-truthy result object, so that conjunct was a no-op — the outer
  // z.string() already guarantees `value` is a string.
  [FieldType.LongText]: z.string().refine((value) => /\n/.test(value)),
  [FieldType.SingleLineText]: z.string(),
};

/** Constructor options shared by every importer implementation. */
export interface IImportConstructorParams {
  // URL of the remote file to download and parse.
  url: string;
  // Source format (CSV / Excel), used to select the parsing strategy.
  type: SUPPORTEDTYPE;
  // Optional hard cap on imported rows (plan-limit enforcement).
  maxRowCount?: number;
  // Display name; populated from response headers when available (see getFile).
  fileName?: string;
}

/** Parse output: sheet name -> rows, each row being an array of cell values. */
interface IParseResult {
  [x: string]: unknown[][];
}

/**
 * Base class for file importers: downloads a remote file, derives a table
 * name from response headers, and infers per-column field types from sampled
 * rows. Format-specific parsing is delegated to the abstract `parse`.
 */
export abstract class Importer {
  public static DEFAULT_ERROR_MESSAGE = 'unknown error';

  // Max in-memory buffer size (bytes) before a chunk is flushed to the callback.
  public static CHUNK_SIZE = 1024 * 1024 * 0.2;

  // Max number of buffered rows before a chunk is flushed.
  public static MAX_CHUNK_LENGTH = 500;

  public static DEFAULT_COLUMN_TYPE: IValidateTypes = FieldType.SingleLineText;

  // Order matters: candidate types are tried in this order during type sniffing.
  public static readonly SUPPORTEDTYPE: IValidateTypes[] = [
    FieldType.Checkbox,
    FieldType.Number,
    FieldType.Date,
    FieldType.LongText,
    FieldType.SingleLineText,
  ];

  constructor(public config: IImportConstructorParams) {}

  /**
   * Parse the remote file. Called with no arguments it resolves the fully
   * parsed sheets; with options + chunk callback it streams rows in chunks.
   */
  abstract parse(
    ...args: [
      options?: unknown,
      chunk?: (
        chunk: Record<string, unknown[][]>,
        onFinished?: () => void,
        onError?: (errorMsg: string) => void
      ) => Promise<void>,
    ]
  ): Promise<IParseResult>;

  private setFileNameFromHeader(fileName: string) {
    this.config.fileName = fileName;
  }

  getConfig() {
    return this.config;
  }

  /**
   * Download the configured URL, validate its content-type against the
   * supported formats, and derive a table name from the content-disposition
   * header (falling back to a default).
   * @returns the response body stream and the derived, extension-less, decoded file name.
   * @throws BadRequestException when a content-type is present but unsupported.
   */
  async getFile() {
    const { url, type } = this.config;
    const { body: stream, headers } = await fetch(url);

    const supportType = importTypeMap[type].accept.split(',');

    const fileFormat = headers
      .get('content-type')
      ?.split(';')
      ?.map((item: string) => item.trim());

    // NOTE(review): a missing content-type is deliberately tolerated (a
    // stricter check here was disabled); only an explicit mismatch fails.
    if (fileFormat?.length && !intersection(fileFormat, supportType).length) {
      throw new BadRequestException(
        `File format is not supported, only ${supportType.join(',')} are supported,`
      );
    }

    const contentDisposition = headers.get('content-disposition');
    let fileName = 'Import Table.csv';

    if (contentDisposition) {
      // Prefer the RFC 5987 encoded form, then the plain (optionally quoted) one.
      const fileNameMatch =
        contentDisposition.match(/filename\*=UTF-8''([^;]+)/) ||
        contentDisposition.match(/filename="?([^"]+)"?/);
      if (fileNameMatch) {
        fileName = fileNameMatch[1];
      }
    }

    // Strip only the final extension. The previous `split('.').shift()` cut
    // the name at the FIRST dot, mangling names like "report.2024.csv".
    const dotIndex = fileName.lastIndexOf('.');
    const baseName = dotIndex > 0 ? fileName.slice(0, dotIndex) : fileName;
    // Decode once and return the same decoded value stored into config
    // (previously only config.fileName was decoded).
    const finalFileName = decodeURIComponent(baseName);

    this.setFileNameFromHeader(finalFileName);

    return { stream, fileName: finalFileName };
  }

  /**
   * Infer a column schema for every sheet by sampling parsed rows.
   * Row 0 is treated as the header; empty cells are skipped; the first
   * candidate type matching every sampled value wins.
   */
  async genColumns() {
    const supportTypes = Importer.SUPPORTEDTYPE;
    const parseResult = await this.parse();
    const { fileName, type } = this.config;
    const result: IAnalyzeVo['worksheets'] = {};

    for (const [sheetName, cols] of Object.entries(parseResult)) {
      // Transpose rows -> columns so each entry holds one column's values.
      const zipColumnInfo = zip(...cols);
      const existNames: string[] = [];
      const calculatedColumnHeaders = zipColumnInfo.map((column, index) => {
        let isColumnEmpty = true;
        let validatingFieldTypes = [...supportTypes];
        for (let i = 0; i < column.length; i++) {
          // Narrowed down to one (or zero) candidates — nothing more to learn.
          if (validatingFieldTypes.length <= 1) {
            break;
          }

          // Ignore empty values and row 0 (the header row).
          if (column[i] === '' || column[i] == null || i === 0) {
            continue;
          }

          // The column holds at least one real value.
          isColumnEmpty = false;

          // A single long-text value settles the column type immediately.
          if (validateZodSchemaMap[FieldType.LongText].safeParse(column[i]).success) {
            validatingFieldTypes = [FieldType.LongText];
            break;
          }

          // Keep only the candidates this value still satisfies.
          // (renamed from `type`, which shadowed the destructured config `type`)
          const matchTypes = validatingFieldTypes.filter((candidateType) => {
            const schema = validateZodSchemaMap[candidateType];
            return schema.safeParse(column[i]).success;
          });

          validatingFieldTypes = matchTypes;
        }

        // Columns with no data fall back to the default type.
        validatingFieldTypes = !isColumnEmpty
          ? validatingFieldTypes
          : [Importer.DEFAULT_COLUMN_TYPE];

        // Header cell (row 0) names the column; de-duplicate against siblings.
        const name = getUniqName(toString(column?.[0]).trim() || `Field ${index}`, existNames);

        existNames.push(name);

        return {
          type: validatingFieldTypes[0] || Importer.DEFAULT_COLUMN_TYPE,
          name,
        };
      });

      result[sheetName] = {
        // Excel keeps per-sheet names; other formats prefer the file name.
        name: type === SUPPORTEDTYPE.EXCEL ? sheetName : fileName ? fileName : sheetName,
        columns: calculatedColumnHeaders,
      };
    }

    return {
      worksheets: result,
    };
  }
}

export class CsvImporter extends Importer {
  // Number of rows sampled for column-type detection in preview mode.
  public static readonly CHECK_LINES = 5000;
  // CSV files have a single logical sheet, stored under this key.
  public static readonly DEFAULT_SHEETKEY = 'Import Table';

  parse(): Promise<IParseResult>;
  parse(
    options: Papa.ParseConfig & { skipFirstNLines: number; key: string },
    chunk: (chunk: Record<string, unknown[][]>) => Promise<void>,
    onFinished?: () => void,
    onError?: (errorMsg: string) => void
  ): Promise<void>;
  /**
   * Parse the remote CSV. Without arguments, samples up to CHECK_LINES rows
   * and resolves them under DEFAULT_SHEETKEY. With options + chunk callback,
   * streams the whole file, flushing buffered rows to `chunkCb` whenever the
   * buffer exceeds MAX_CHUNK_LENGTH rows or CHUNK_SIZE bytes.
   */
  async parse(
    ...args: [
      options?: Papa.ParseConfig & { skipFirstNLines: number; key: string },
      chunkCb?: (chunk: Record<string, unknown[][]>) => Promise<void>,
      onFinished?: () => void,
      onError?: (errorMsg: string) => void,
    ]
  ): Promise<unknown> {
    const [options, chunkCb, onFinished, onError] = args;
    const { stream } = await this.getFile();

    // chunk parse
    if (options && chunkCb) {
      return new Promise((resolve, reject) => {
        let isFirst = true;
        let recordBuffer: unknown[][] = [];
        let isAbort = false;
        let totalRowCount = 0;

        // TODO optimize toLineDelimitedStream slow down the import speed.
        Papa.parse(toLineDelimitedStream(stream), {
          download: false,
          dynamicTyping: true,
          chunk: (chunk, parser) => {
            // Papaparse does not await this async work; pause/resume provides
            // backpressure while the chunk callback runs.
            (async () => {
              const newChunk = [...chunk.data] as unknown[][];
              // Drop the header row from the very first chunk only.
              if (isFirst && options.skipFirstNLines) {
                newChunk.splice(0, 1);
                isFirst = false;
              }

              recordBuffer.push(...newChunk);
              totalRowCount += newChunk.length;

              if (this.config.maxRowCount && totalRowCount > this.config.maxRowCount) {
                isAbort = true;
                recordBuffer = [];
                onError?.('please upgrade your plan to import more records');
                parser.abort();
                // Fix: stop here — execution previously fell through after abort.
                return;
              }

              if (
                recordBuffer.length >= Importer.MAX_CHUNK_LENGTH ||
                sizeof(recordBuffer) > Importer.CHUNK_SIZE
              ) {
                parser.pause();
                try {
                  await chunkCb({ [CsvImporter.DEFAULT_SHEETKEY]: recordBuffer });
                } catch (e) {
                  isAbort = true;
                  recordBuffer = [];
                  const error = exceptionParse(e as Error);
                  onError?.(error?.message || Importer.DEFAULT_ERROR_MESSAGE);
                  parser.abort();
                  // Fix: previously resume() was still called on the aborted parser.
                  return;
                }
                recordBuffer = [];
                parser.resume();
              }
            })();
          },
          complete: () => {
            (async () => {
              try {
                // Flush whatever remains in the buffer.
                recordBuffer.length &&
                  (await chunkCb({ [CsvImporter.DEFAULT_SHEETKEY]: recordBuffer }));
              } catch (e) {
                isAbort = true;
                recordBuffer = [];
                const error = exceptionParse(e as Error);
                onError?.(error?.message || Importer.DEFAULT_ERROR_MESSAGE);
              }
              !isAbort && onFinished?.();
              resolve({});
            })();
          },
          error: (e) => {
            onError?.(e?.message || Importer.DEFAULT_ERROR_MESSAGE);
            reject(e);
          },
        });
      });
    } else {
      // Preview mode: read only the first CHECK_LINES rows for analysis.
      return new Promise((resolve, reject) => {
        Papa.parse(stream, {
          download: false,
          dynamicTyping: true,
          preview: CsvImporter.CHECK_LINES,
          complete: (result) => {
            resolve({
              [CsvImporter.DEFAULT_SHEETKEY]: result.data,
            });
          },
          error: (err) => {
            reject(err);
          },
        });
      });
    }
  }
}

export class ExcelImporter extends Importer {
  // Candidate types for Excel columns (order matters during type sniffing).
  public static readonly SUPPORTEDTYPE: IValidateTypes[] = [
    FieldType.Checkbox,
    FieldType.Number,
    FieldType.Date,
    FieldType.SingleLineText,
    FieldType.LongText,
  ];

  parse(): Promise<IParseResult>;
  parse(
    options: { skipFirstNLines: number; key: string },
    chunk: (chunk: Record<string, unknown[][]>) => Promise<void>,
    onFinished?: () => void,
    onError?: (errorMsg: string) => void
  ): Promise<void>;

  /**
   * Download and parse the whole workbook in memory, then optionally replay
   * the requested sheet to `chunk` in MAX_CHUNK_LENGTH-row slices.
   */
  async parse(
    options?: { skipFirstNLines: number; key: string },
    chunk?: (chunk: Record<string, unknown[][]>) => Promise<void>,
    onFinished?: () => void,
    onError?: (errorMsg: string) => void
  ): Promise<unknown> {
    // (typo fix: was `fileSteam`)
    const { stream: fileStream } = await this.getFile();

    // Buffer the entire stream, then let XLSX parse it. Each cell is reduced
    // to its formatted text (`w`). Blank cells can appear as undefined entries
    // in dense rows, so access is guarded (previously `v.w` could throw).
    const asyncRs = async (stream: NodeJS.ReadableStream): Promise<IParseResult> =>
      new Promise((res, rej) => {
        const buffers: Buffer[] = [];
        stream.on('data', function (data) {
          buffers.push(data);
        });
        stream.on('end', function () {
          const buf = Buffer.concat(buffers);
          const workbook = XLSX.read(buf, { dense: true });
          const result: IParseResult = {};
          Object.keys(workbook.Sheets).forEach((name) => {
            // `!data` may be absent for empty sheets — fall back to [].
            result[name] = (workbook.Sheets[name]['!data'] ?? []).map((item) =>
              item.map((v) => v?.w)
            ) as unknown[][];
          });
          res(result);
        });
        stream.on('error', (e) => {
          onError?.(e?.message || Importer.DEFAULT_ERROR_MESSAGE);
          rej(e);
        });
      });

    const parseResult = await asyncRs(fileStream);

    if (options && chunk) {
      const { skipFirstNLines, key } = options;
      const rows = parseResult[key];

      // Enforce the plan row limit before doing any chunking work.
      if (this.config.maxRowCount && rows.length > this.config.maxRowCount) {
        onError?.('Please upgrade your plan to import more records');
        return;
      }

      const parseResults = chunkArray(rows, Importer.MAX_CHUNK_LENGTH);

      for (let i = 0; i < parseResults.length; i++) {
        const currentChunk = parseResults[i];
        // Drop the header row from the first slice only.
        if (i === 0 && skipFirstNLines) {
          currentChunk.splice(0, 1);
        }
        try {
          await chunk({ [key]: currentChunk });
        } catch (e) {
          // Best effort: report the failure and keep importing remaining chunks.
          onError?.((e as Error)?.message || Importer.DEFAULT_ERROR_MESSAGE);
        }
      }
      onFinished?.();
    }

    return parseResult;
  }
}

/**
 * Build the importer matching the given source type.
 * @throws Error when the type has no importer implementation.
 */
export const importerFactory = (type: SUPPORTEDTYPE, config: IImportConstructorParams) => {
  if (type === SUPPORTEDTYPE.CSV) {
    return new CsvImporter(config);
  }
  if (type === SUPPORTEDTYPE.EXCEL) {
    return new ExcelImporter(config);
  }
  throw new Error('not support');
};

/**
 * Resolve the compiled worker script path. The worker may live next to this
 * module or under dist/worker depending on how the app was built; the sibling
 * location is preferred when it exists on disk.
 */
export const getWorkerPath = (fileName: string) => {
  const script = `${fileName}.js`;
  const siblingPath = join(__dirname, 'worker', script);
  return existsSync(siblingPath) ? siblingPath : join(process.cwd(), 'dist', 'worker', script);
};