Vizzuality/landgriffon

View on GitHub
api/src/modules/scenario-interventions/scenario-intervention.repository.ts

Summary

Maintainability
D
2 days
Test Coverage
A
100%
import { DataSource, InsertResult, QueryRunner, Repository } from 'typeorm';
import { ScenarioIntervention } from 'modules/scenario-interventions/scenario-intervention.entity';
import { SourcingLocation } from 'modules/sourcing-locations/sourcing-location.entity';
import { SourcingRecord } from 'modules/sourcing-records/sourcing-record.entity';
import { IndicatorRecord } from 'modules/indicator-records/indicator-record.entity';
import {
  Injectable,
  Logger,
  ServiceUnavailableException,
} from '@nestjs/common';
import { v4 as uuidv4 } from 'uuid';

import {
  REPLACED_ADMIN_REGIONS_TABLE_NAME,
  REPLACED_BUSINESS_UNITS_TABLE_NAME,
  REPLACED_MATERIALS_TABLE_NAME,
  REPLACED_PRODUCERS_TABLE_NAME,
  REPLACED_T1SUPPLIERS_TABLE_NAME,
  ReplacedAdminRegion,
  ReplacedBusinessUnits,
  ReplacedMaterial,
  ReplacedProducer,
  ReplacedT1Supplier,
} from 'modules/scenario-interventions/intermediate-table-names/intermediate.table.names';
import { chunk } from 'lodash';
import * as config from 'config';
import { IMPACT_VIEW_NAME } from 'modules/impact/views/impact.materialized-view.entity';
import { ImpactService } from 'modules/impact/impact.service';

@Injectable()
export class ScenarioInterventionRepository extends Repository<ScenarioIntervention> {
  constructor(
    private dataSource: DataSource,
    private impactService: ImpactService,
  ) {
    super(ScenarioIntervention, dataSource.createEntityManager());
  }

  logger: Logger = new Logger(ScenarioInterventionRepository.name);

  async getScenarioInterventionsByScenarioId(
    scenarioId: string,
  ): Promise<ScenarioIntervention[]> {
    // TODO: Join with suppliers and selecting supplier field commented out due to performance issues
    //       This needs to be restored
    return (
      this.createQueryBuilder('intervention')
        .select([
          'intervention.id',
          'intervention.title',
          'intervention.description',
          'intervention.type',
          'intervention.startYear',
          'intervention.status',
          'intervention.endYear',
          'intervention.percentage',
          'replacedAdminRegions.id',
          'replacedAdminRegions.name',
          'newAdminRegion.id',
          'newAdminRegion.name',
          'replacedMaterials.id',
          'replacedMaterials.name',
          'newMaterial.id',
          'newMaterial.name',
          'replacedBusinessUnits.id',
          'replacedBusinessUnits.name',
          'newBusinessUnit.id',
          'newBusinessUnit.name',
          // 'replacedSuppliers.id',
          // 'replacedSuppliers.name',
        ])
        .leftJoin('intervention.replacedAdminRegions', 'replacedAdminRegions')
        .leftJoin('intervention.newAdminRegion', 'newAdminRegion')
        .leftJoin('intervention.replacedMaterials', 'replacedMaterials')
        .leftJoin('intervention.newMaterial', 'newMaterial')
        .leftJoin('intervention.replacedBusinessUnits', 'replacedBusinessUnits')
        .leftJoin('intervention.newBusinessUnit', 'newBusinessUnit')
        // .leftJoin('intervention.replacedSuppliers', 'replacedSuppliers')

        .where('intervention.scenarioId = :scenarioId', { scenarioId })
        .orderBy('intervention.createdAt', 'DESC')
        .getMany()
    );
  }

  // TODO: This is a workaround to bypass TypeORM's issues when bulk inserting. This could be possibly fixed
  //       By upgrading TypeORM version

  async saveNewIntervention(
    newIntervention: ScenarioIntervention,
  ): Promise<any> {
    const queryRunner: QueryRunner = this.dataSource.createQueryRunner();
    await queryRunner.connect();
    await queryRunner.startTransaction();
    const locations: SourcingLocation[] = [];
    const records: SourcingRecord[] = [];
    const impacts: IndicatorRecord[] = [];

    const dbConfig: any = config.get('db');
    const batchChunkSize: number = parseInt(`${dbConfig.batchChunkSize}`, 10);

    newIntervention.id = uuidv4();
    const {
      newSourcingLocations,
      replacedSourcingLocations,
      replacedAdminRegions,
      replacedMaterials,
      replacedBusinessUnits,
      replacedT1Suppliers,
      replacedProducers,
      ...remainingData
    } = newIntervention;
    const allLocations: SourcingLocation[] = replacedSourcingLocations.concat(
      newSourcingLocations ?? [],
    );
    for (const location of allLocations) {
      location.scenarioInterventionId = newIntervention.id;
      location.id = uuidv4();
      locations.push(location);
      for (const record of location.sourcingRecords) {
        record.sourcingLocationId = location.id;
        record.id = uuidv4();
        records.push(record);
        for (const impact of record.indicatorRecords) {
          impact.sourcingRecordId = record.id;
          impacts.push(impact);
        }
      }
    }

    const replacedT1SuppliersToSave: ReplacedT1Supplier[] = [];
    const replacedProducersToSave: ReplacedProducer[] = [];
    const replacedAdminRegionsToSave: ReplacedAdminRegion[] = [];
    const replacedBusinessUnitsToSave: ReplacedBusinessUnits[] = [];
    const replacedMaterialsToSave: ReplacedMaterial[] = [];

    if (replacedT1Suppliers?.length) {
      for (const supplier of replacedT1Suppliers) {
        replacedT1SuppliersToSave.push({
          t1SupplierId: supplier.id,
          scenarioInterventionId: newIntervention.id,
        });
      }
    }
    if (replacedProducers?.length) {
      for (const supplier of replacedProducers) {
        replacedProducersToSave.push({
          producerId: supplier.id,
          scenarioInterventionId: newIntervention.id,
        });
      }
    }
    if (replacedAdminRegions?.length) {
      for (const region of replacedAdminRegions) {
        replacedAdminRegionsToSave.push({
          adminRegionId: region.id,
          scenarioInterventionId: newIntervention.id,
        });
      }
    }
    if (replacedMaterials?.length) {
      for (const material of replacedMaterials) {
        replacedMaterialsToSave.push({
          materialId: material.id,
          scenarioInterventionId: newIntervention.id,
        });
      }
    }
    if (replacedBusinessUnits?.length) {
      for (const unit of replacedBusinessUnits) {
        replacedBusinessUnitsToSave.push({
          businessUnitId: unit.id,
          scenarioInterventionId: newIntervention.id,
        });
      }
    }

    try {
      this.logger.log(
        `Saving Scenario Intervention with Id: ${newIntervention.id}`,
      );
      const intervention: InsertResult = await queryRunner.manager.insert(
        ScenarioIntervention,
        remainingData,
      );
      this.logger.log(
        `Saving ${locations.length} Sourcing Locations for Intervention with Id: ${newIntervention.id}`,
      );
      const sourcingLocationInsertPromises: Array<Promise<InsertResult>> =
        this.createInsertPromises(
          locations,
          batchChunkSize,
          SourcingLocation,
          queryRunner,
        );

      this.logger.log(
        `Saving ${sourcingLocationInsertPromises.length} Sourcing Locations chunks`,
      );
      await Promise.all(sourcingLocationInsertPromises);

      this.logger.log(
        `Saving ${records.length} Sourcing Records for Intervention with Id: ${newIntervention.id}`,
      );
      const sourcingRecordInsertPromises: Array<Promise<InsertResult>> =
        this.createInsertPromises(
          records,
          batchChunkSize,
          SourcingRecord,
          queryRunner,
        );

      this.logger.log(
        `Saving ${sourcingRecordInsertPromises.length} Sourcing Records chunks`,
      );
      await Promise.all(sourcingRecordInsertPromises);
      this.logger.log(
        `Saving ${impacts.length} Indicator Records for Intervention with Id: ${newIntervention.id}`,
      );
      const indicatorRecordInsertPromises: Array<Promise<InsertResult>> =
        this.createInsertPromises(
          impacts,
          batchChunkSize,
          IndicatorRecord,
          queryRunner,
        );

      this.logger.log(
        `Saving ${indicatorRecordInsertPromises.length} Indicator Records chunks`,
      );
      await Promise.all(indicatorRecordInsertPromises);

      this.logger.log(`Replacing suppliers...`);
      await queryRunner.manager
        .createQueryBuilder()
        .insert()
        .into(REPLACED_T1SUPPLIERS_TABLE_NAME)
        .values(replacedT1SuppliersToSave)
        .execute();
      await queryRunner.manager
        .createQueryBuilder()
        .insert()
        .into(REPLACED_PRODUCERS_TABLE_NAME)
        .values(replacedProducersToSave)
        .execute();
      this.logger.log(`Replacing admin regions...`);
      await queryRunner.manager
        .createQueryBuilder()
        .insert()
        .into(REPLACED_ADMIN_REGIONS_TABLE_NAME)
        .values(replacedAdminRegionsToSave)
        .execute();
      this.logger.log(`Replacing materials...`);
      await queryRunner.manager
        .createQueryBuilder()
        .insert()
        .into(REPLACED_MATERIALS_TABLE_NAME)
        .values(replacedMaterialsToSave)
        .execute();
      this.logger.log(`Replacing business units...`);
      await queryRunner.manager
        .createQueryBuilder()
        .insert()
        .into(REPLACED_BUSINESS_UNITS_TABLE_NAME)
        .values(replacedBusinessUnitsToSave)
        .execute();
      this.logger.log('Committing transaction...');
      await queryRunner.commitTransaction();
      this.logger.warn(`REFRESHING ${IMPACT_VIEW_NAME} ON THE BACKGROUND...`);
      this.impactService.updateImpactView();
      this.logger.log('New Intervention Saving Finished');

      return intervention;
    } catch (err) {
      // rollback changes before throwing error
      await queryRunner.rollbackTransaction();
      this.logger.error('Intervention could not been saved: ' + err);
      throw new ServiceUnavailableException(
        'Intervention could not been saved: ' + err,
      );
    } finally {
      // release query runner which is manually created
      await queryRunner.release();
    }
  }

  private createInsertPromises(
    data: any,
    batchChunkSize: number,
    entity: any,
    queryRunner: QueryRunner,
  ): Array<Promise<InsertResult>> {
    const insertPromises: Array<Promise<InsertResult>> = [];

    for (const [index, dataChunk] of chunk(data, batchChunkSize).entries()) {
      this.logger.debug(
        `Inserting chunk #${index} (${dataChunk.length} items)...`,
      );

      insertPromises.push(
        queryRunner.manager
          .createQueryBuilder()
          .insert()
          .into(entity)
          .values(dataChunk)
          .execute(),
      );
    }

    return insertPromises;
  }
}