teableio/teable

View on GitHub
apps/nestjs-backend/src/features/import/open-api/import-open-api.service.ts

Summary

Maintainability
A
3 hrs
Test Coverage
import { Worker } from 'worker_threads';
import { Injectable, Logger, BadRequestException } from '@nestjs/common';
import type { IFieldRo } from '@teable/core';
import {
  FieldType,
  FieldKeyType,
  getRandomString,
  getActionTriggerChannel,
  getTableImportChannel,
} from '@teable/core';
import { PrismaService } from '@teable/db-main-prisma';
import type {
  IAnalyzeRo,
  IImportOptionRo,
  IInplaceImportOptionRo,
  IImportColumn,
} from '@teable/openapi';
import { toString } from 'lodash';
import { ClsService } from 'nestjs-cls';
import type { CreateOp } from 'sharedb';
import type { LocalPresence } from 'sharedb/lib/client';

import { ShareDbService } from '../../../share-db/share-db.service';
import type { IClsStore } from '../../../types/cls';
import { NotificationService } from '../../notification/notification.service';
import { RecordOpenApiService } from '../../record/open-api/record-open-api.service';
import { DEFAULT_VIEWS, DEFAULT_FIELDS } from '../../table/constant';
import { TableOpenApiService } from '../../table/open-api/table-open-api.service';
import { importerFactory, getWorkerPath } from './import.class';
import type { CsvImporter, ExcelImporter } from './import.class';

@Injectable()
export class ImportOpenApiService {
  private readonly logger = new Logger(ImportOpenApiService.name);
  constructor(
    private readonly tableOpenApiService: TableOpenApiService,
    private readonly cls: ClsService<IClsStore>,
    private readonly prismaService: PrismaService,
    private readonly recordOpenApiService: RecordOpenApiService,
    private readonly notificationService: NotificationService,
    private readonly shareDbService: ShareDbService
  ) {}

  /**
   * Inspects an uploaded file and proposes import columns for it.
   *
   * @param analyzeRo attachment URL and file type of the upload
   * @returns whatever the importer's `genColumns()` resolves to
   */
  async analyze(analyzeRo: IAnalyzeRo) {
    const { attachmentUrl, fileType } = analyzeRo;

    const importer = importerFactory(fileType, {
      url: attachmentUrl,
      type: fileType,
    });

    return await importer.genColumns();
  }

  /**
   * Creates one table per entry of `importRo.worksheets`. For each sheet that has
   * `importData` set and at least one configured column, it also starts a background
   * worker that streams the sheet's rows into the new table.
   *
   * Only table creation is awaited. Record import continues after this method
   * resolves and reports its outcome through notifications when
   * `importRo.notification` is true.
   *
   * @param baseId base that receives the new tables
   * @param importRo attachment, per-sheet column configuration and time zone
   * @param maxRowCount optional row cap forwarded to the importer
   * @returns the created tables (table info only, records are imported asynchronously)
   */
  async createTableFromImport(baseId: string, importRo: IImportOptionRo, maxRowCount?: number) {
    const userId = this.cls.get('user.id');
    const { attachmentUrl, fileType, worksheets, notification = false, tz } = importRo;

    const importer = importerFactory(fileType, {
      url: attachmentUrl,
      type: fileType,
      maxRowCount,
    });

    // only record base table info, not include records
    const tableResult = [];

    for (const [sheetKey, value] of Object.entries(worksheets)) {
      const { importData, useFirstRowAsHeader, columns, name } = value;

      // Without configured columns the table is created with the default fields.
      const columnInfo = columns.length ? columns : [...DEFAULT_FIELDS];
      const fieldsRo = columnInfo.map((col, index) => {
        const result: IFieldRo & {
          isPrimary?: boolean;
        } = {
          ...col,
        };

        // The first column becomes the primary field.
        if (index === 0) {
          result.isPrimary = true;
        }

        // Date Field should have default tz
        if (col.type === FieldType.Date) {
          result.options = {
            formatting: {
              timeZone: tz,
              date: 'YYYY-MM-DD',
              time: 'None',
            },
          };
        }

        return result;
      });

      // create table with column
      const table = await this.tableOpenApiService.createTable(baseId, {
        name: name,
        fields: fieldsRo,
        views: DEFAULT_VIEWS,
        records: [],
      });

      tableResult.push(table);

      // if columns is empty, then skip import data
      if (importData && columns.length) {
        this.importRecords(
          baseId,
          table,
          userId,
          importer,
          { skipFirstNLines: useFirstRowAsHeader ? 1 : 0, sheetKey, notification },
          {
            columnInfo: columns,
            fields: table.fields.map((f) => ({ id: f.id, type: f.type })),
          }
        );
      }
    }
    return tableResult;
  }

  /**
   * Imports rows from an uploaded file into an existing table. Source columns are
   * mapped to existing field ids via `insertConfig.sourceColumnMap`.
   *
   * The import runs in a background worker. This method resolves once the worker
   * has been started.
   *
   * @param baseId base that owns the table (used for notifications)
   * @param tableId target table
   * @param inplaceImportRo attachment, sheet key, column mapping and header handling
   * @param maxRowCount optional row cap forwarded to the importer
   * @throws BadRequestException when the table does not exist (or is deleted) or the lookup fails
   */
  async inplaceImportTable(
    baseId: string,
    tableId: string,
    inplaceImportRo: IInplaceImportOptionRo,
    maxRowCount?: number
  ) {
    const userId = this.cls.get('user.id');
    const { attachmentUrl, fileType, insertConfig, notification = false } = inplaceImportRo;

    const { sourceColumnMap, sourceWorkSheetKey, excludeFirstRow } = insertConfig;

    const tableRaw = await this.prismaService.tableMeta
      .findUnique({
        where: { id: tableId, deletedTime: null },
        select: { name: true },
      })
      .catch(() => {
        throw new BadRequestException('table is not found');
      });

    // findUnique resolves to null (it does not reject) when nothing matches, so the
    // missing-table case must be checked explicitly instead of silently returning.
    if (!tableRaw) {
      throw new BadRequestException('table is not found');
    }

    // Only fields without errors are valid import targets.
    const fieldRaws = await this.prismaService.field.findMany({
      where: { tableId, deletedTime: null, hasError: null },
      select: {
        id: true,
        type: true,
      },
    });

    const importer = importerFactory(fileType, {
      url: attachmentUrl,
      type: fileType,
      maxRowCount,
    });

    this.importRecords(
      baseId,
      { id: tableId, name: tableRaw.name },
      userId,
      importer,
      { skipFirstNLines: excludeFirstRow ? 1 : 0, sheetKey: sourceWorkSheetKey, notification },
      {
        sourceColumnMap,
        fields: fieldRaws as { id: string; type: FieldType }[],
      }
    );
  }

  /**
   * Starts a `parse` worker for `importer` and writes each chunk it emits into `table`.
   *
   * Two mapping modes, selected by `recordsCal`:
   * - `columnInfo` (new table): column `i` feeds `fields[i]`. Values are stringified and
   *   written with `createRecordsOnlySql`.
   * - `sourceColumnMap` (inplace): maps field id to source column index (`null` = skip).
   *   Link values are stringified. Records are written with `multipleCreateRecords`.
   *
   * The table's import presence is set to loading while the worker runs and cleared
   * when the worker exits. When `options.notification` is true, the user is notified
   * on success, parse error, worker error, or a failed chunk (with its row range).
   * Failures are never rethrown: everything here runs in worker event listeners,
   * which have no caller to receive the error.
   */
  private importRecords(
    baseId: string,
    table: { id: string; name: string },
    userId: string,
    importer: CsvImporter | ExcelImporter,
    options: { skipFirstNLines: number; sheetKey: string; notification: boolean },
    recordsCal: {
      columnInfo?: IImportColumn[];
      fields: { id: string; type: FieldType }[];
      sourceColumnMap?: Record<string, number | null>;
    }
  ) {
    const { sheetKey, notification } = options;
    const { columnInfo, fields, sourceColumnMap } = recordsCal;
    const localPresence = this.createImportPresence(table.id);
    this.setImportStatus(localPresence, true);

    const workerId = `worker_${getRandomString(8)}`;
    const path = getWorkerPath('parse');

    const worker = new Worker(path, {
      workerData: {
        config: importer.getConfig(),
        options: {
          key: options.sheetKey,
          notification: options.notification,
          skipFirstNLines: options.skipFirstNLines,
        },
        id: workerId,
      },
    });

    // New tables use the SQL-only insert. Inplace imports go through multipleCreateRecords.
    const createFn = columnInfo
      ? this.recordOpenApiService.createRecordsOnlySql.bind(this.recordOpenApiService)
      : this.recordOpenApiService.multipleCreateRecords.bind(this.recordOpenApiService);

    // Built once, so each cell does a Map lookup instead of a linear `fields.find`.
    const fieldTypeById = new Map(fields.map((f) => [f.id, f.type] as const));

    /** Converts one parsed source row into a record payload keyed by field id. */
    const toRecord = (row: unknown) => {
      // An empty row may arrive as a non-array value; treat it as a row with no cells.
      const cells: unknown[] = Array.isArray(row) ? row : [];
      const record: { fields: Record<string, unknown> } = { fields: {} };
      // import new table
      if (columnInfo) {
        columnInfo.forEach(({ sourceColumnIndex }, index) => {
          const value = cells[sourceColumnIndex];
          record.fields[fields[index].id] = value == null ? undefined : String(value);
        });
      }
      // inplace records
      if (sourceColumnMap) {
        for (const [fieldId, columnIndex] of Object.entries(sourceColumnMap)) {
          if (columnIndex !== null) {
            const value = cells[columnIndex];
            // link value should be string
            record.fields[fieldId] =
              fieldTypeById.get(fieldId) === FieldType.Link ? toString(value) : value;
          }
        }
      }
      return record;
    };

    // 1-based index of the next source row. Used to report the failing row range.
    let recordCount = 1;
    worker.on('message', async (result) => {
      const { type, data, chunkId, id } = result;
      switch (type) {
        case 'chunk': {
          this.setImportStatus(localPresence, true);
          const rows: unknown[] = (data as Record<string, unknown[]> | undefined)?.[sheetKey] ?? [];
          const startRow = recordCount;
          recordCount += rows.length;
          try {
            const records = rows.map(toRecord);
            if (records.length > 0) {
              workerId === id &&
                (await createFn(table.id, {
                  fieldKeyType: FieldKeyType.Id,
                  typecast: true,
                  records,
                }));
            }
            // Acknowledge every chunk, including empty ones, so the worker is never left
            // waiting on a chunkId. NOTE(review): assumes the parse worker pairs 'done'
            // with the chunkId it sent; confirm against the worker implementation.
            worker.postMessage({ type: 'done', chunkId });
            if (records.length > 0) {
              this.updateRowCount(table.id);
            }
          } catch (e) {
            const error = e as Error;
            this.logger.error(error?.message, error?.stack);
            notification &&
              this.notificationService.sendImportResultNotify({
                baseId,
                tableId: table.id,
                toUserId: userId,
                message: `❌ ${table.name} import aborted: ${error?.message} fail row range: [${startRow}, ${recordCount - 1}]. Please check the data for this range and retry.`,
              });
            // Not rethrown: this listener is async, so a rethrow would only surface as an
            // unhandled promise rejection (which can terminate the process).
            worker.terminate();
          }
          break;
        }
        case 'finished':
          workerId === id &&
            notification &&
            this.notificationService.sendImportResultNotify({
              baseId,
              tableId: table.id,
              toUserId: userId,
              message: `🎉 ${table.name}${sourceColumnMap ? ' inplace' : ''} imported successfully`,
            });
          worker.terminate();
          break;
        case 'error':
          workerId === id &&
            notification &&
            this.notificationService.sendImportResultNotify({
              baseId,
              tableId: table.id,
              toUserId: userId,
              message: `❌ ${table.name} import failed: ${data}`,
            });
          worker.terminate();
          break;
      }
    });
    worker.on('error', (e) => {
      notification &&
        this.notificationService.sendImportResultNotify({
          baseId,
          tableId: table.id,
          toUserId: userId,
          message: `❌ ${table.name} import failed: ${e.message}`,
        });
      worker.terminate();
    });
    // Whatever ends the worker (finish, error, abort), clear the loading state.
    worker.on('exit', (code) => {
      this.logger.log(`Worker stopped with exit code ${code}`);
      this.setImportStatus(localPresence, false);
    });
  }

  /**
   * Signals that records were added to `tableId`. It does two things:
   * - submits an `['addRecord']` presence on the table's action-trigger channel;
   * - publishes an empty json0 create op via `publishRecordChannel`.
   *
   * NOTE(review): a new connection/presence is obtained on every call and never destroyed;
   * confirm whether `shareDbService.connect()` reuses a connection.
   */
  private updateRowCount(tableId: string) {
    const channel = getActionTriggerChannel(tableId);
    const presence = this.shareDbService.connect().getPresence(channel);
    const localPresence = presence.create(tableId);
    localPresence.submit(['addRecord'], (error) => {
      error && this.logger.error(error);
    });

    const updateEmptyOps = {
      src: 'unknown',
      seq: 1,
      m: {
        ts: Date.now(),
      },
      create: {
        type: 'json0',
        data: undefined,
      },
      v: 0,
    } as CreateOp;
    this.shareDbService.publishRecordChannel(tableId, updateEmptyOps);
  }

  /**
   * Publishes `{ loading }` on the given import presence. Submit errors are logged, not thrown.
   */
  private setImportStatus(
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    presence: LocalPresence<any>,
    loading: boolean
  ) {
    presence.submit(
      {
        loading,
      },
      (error) => {
        error && this.logger.error(error);
      }
    );
  }

  /**
   * Creates a local presence on the table's import channel. The channel name is also
   * used as the presence id.
   */
  private createImportPresence(tableId: string) {
    const channel = getTableImportChannel(tableId);
    const presence = this.shareDbService.connect().getPresence(channel);
    return presence.create(channel);
  }
}