dataplug-io/sql-dataplug

View on GitHub
lib/commands/serialize.js

Summary

Maintainability
C
1 day
Test Coverage
const _ = require('lodash')
const chalk = require('chalk')
const Promise = require('bluebird')
const moment = require('moment')
const logger = require('winston')
const { FlattenedTransformStream } = require('@dataplug/dataplug-flatters')
const { SqlSerializerStream, SqlCommandsWriter } = require('@dataplug/dataplug-sql')
const { JsonStreamReader } = require('@dataplug/dataplug-json')
const { Progress } = require('@dataplug/dataplug-cli')

/**
 * yargs command module for the `serialize` command.
 *
 * `builder` registers the command-line options; the handler assigned to
 * this object further down the file consumes the parsed values.
 */
const declaration = {
  command: 'serialize',
  description: 'Transforms input stream to SQL-insert commands'
}

/**
 * Expands escape sequences that were typed literally on the command line.
 *
 * A shell passes `\n` as two characters (backslash + `n`). The value is
 * JSON-encoded (which doubles every backslash), the doubled backslashes are
 * collapsed back to single ones, and the result is JSON-decoded so that
 * sequences such as `\n` or `\t` become the real control characters.
 *
 * @param {string} value - Raw delimiter as received from the command line.
 * @returns {string} Delimiter with JSON-style escape sequences expanded.
 * @throws {SyntaxError} If the value contains a backslash sequence that is
 *   not a valid JSON escape (for example `\q`).
 */
const unescapeDelimiter = (value) => JSON.parse(JSON.stringify(value).replace(/\\\\/g, '\\'))

/**
 * Registers the options accepted by the `serialize` command.
 *
 * @param {Object} yargs - yargs instance (anything exposing chainable
 *   `option` and `coerce` methods).
 * @returns {Object} The value returned by the last chained call.
 */
declaration.builder = (yargs) => yargs
  .option('progress', {
    alias: 'p',
    describe: 'Show progress in console'
  })
  .option('dialect', {
    alias: 'l',
    describe: 'SQL dialect to use',
    enum: [
      'postgres', 'postgresql', 'pg'
    ],
    required: true
  })
  .option('truncate', {
    describe: 'Truncate entity table prior to first insert',
    type: 'boolean',
    default: false
  })
  .option('missing', {
    alias: 'm',
    describe: 'Column name to use as "missing" marker',
    type: 'string'
  })
  .option('timestamp', {
    alias: 't',
    describe: 'Column name to use as "timestamp" marker',
    type: 'string'
  })
  // The option used to be registered under the misspelled name "delele",
  // while the handler reads `argv.delete`, so `-d` never had any effect.
  // The misspelling is kept as an alias so existing invocations keep parsing.
  .option('delete', {
    alias: ['d', 'delele'],
    describe: 'Delete entries by identity prior to inserting',
    type: 'boolean',
    default: false
  })
  .option('on-conflict', {
    describe: 'Behavior on conflict',
    enum: [
      'update',
      'skip',
      'fail'
    ],
    default: 'fail'
  })
  .option('chunks-delimiter', {
    describe: 'String used as a chunks delimiter',
    type: 'string',
    default: '\\n'
  })
  .option('queries-delimiter', {
    describe: 'String used as a queries delimiter',
    type: 'string',
    default: '\\n'
  })
  .coerce(['chunks-delimiter', 'queries-delimiter'], unescapeDelimiter)
  .option('abort', {
    alias: 'a',
    describe: 'Abort on any error',
    type: 'boolean',
    default: false
  })
/**
 * Runs the `serialize` command: reads JSON from stdin, flattens it,
 * serializes it as SQL commands and writes them to stdout.
 *
 * On any stream error the error is logged and the process exits with
 * code 70.
 *
 * @param {Object} argv - Parsed command-line arguments. Reads `progress`,
 *   `missing`, `timestamp`, `truncate`, `delete`, `onConflict`, `dialect`,
 *   `queriesDelimiter`, `chunksDelimiter` and `abort`.
 * @param {Object} collection - Collection being serialized; only
 *   `collection.name` is read here.
 */
declaration.handler = (argv, collection) => {
  const progress = !argv.progress ? null : new Progress({
    sqlized: (value) => chalk.yellow('?') + ` sqlized: ${value}`
  })
  if (progress) {
    progress.start()
  }

  // Prologue modifier: sets the "missing" marker column to true.
  let prologueModifier = null
  if (argv.missing) {
    prologueModifier = _.assign({}, prologueModifier, _.set({}, argv.missing, true))
  }

  // Extra column values merged into every data object that passes through.
  // The timestamp is computed once here, so every row emitted by this run
  // carries the same value.
  let extraFields = null
  if (argv.missing) {
    extraFields = _.assign({}, extraFields, _.set({}, argv.missing, false))
  }
  if (argv.timestamp) {
    extraFields = _.assign({}, extraFields, _.set({}, argv.timestamp, moment.utc().format()))
  }

  let dataModifier = null
  let metadataModifier = null
  if (extraFields) {
    // The field values live in their own binding (`extraFields`). Previously
    // the closure referred to the very variable it was assigned to, so it
    // merged the function itself into the data and the marker columns were
    // never actually added.
    dataModifier = (data) => _.merge({}, data, extraFields)

    // Declares the types of the extra columns so they match the values
    // injected by `dataModifier`.
    metadataModifier = (entity) => {
      const metadata = {
        fields: {}
      }
      if (argv.missing) {
        metadata.fields[argv.missing] = {
          type: 'boolean'
        }
      }
      if (argv.timestamp) {
        metadata.fields[argv.timestamp] = {
          type: 'timestamp'
        }
      }
      return metadata
    }
  }

  const reader = new JsonStreamReader()
  const transform = new FlattenedTransformStream(dataModifier, metadataModifier, argv.abort)
  const writer = new SqlSerializerStream(argv.dialect, collection.name, undefined, {
    // `--truncate` takes precedence over the "missing" marker prologue.
    prologue: argv.truncate
      ? 'truncate'
      : (prologueModifier !== null ? prologueModifier : undefined),
    preprocessor: argv.delete ? 'delete-by-identity' : undefined,
    postprocessor: (argv.onConflict !== 'fail')
      ? `${argv.onConflict}-on-conflict`
      : undefined,
    abortOnError: argv.abort
  })
  const emitter = new SqlCommandsWriter({
    queriesDelimiter: argv.queriesDelimiter,
    chunksDelimiter: argv.chunksDelimiter,
    abortOnError: argv.abort
  })

  if (progress) {
    progress.sqlized = 0
  }

  // stdin -> reader -> transform -> writer -> emitter -> stdout.
  // Each `.on('error')` is attached to the stream returned by the preceding
  // `.pipe()` call (or to stdin for the first one); any error rejects.
  new Promise((resolve, reject) => {
    process.stdin
      .on('error', (error) => {
        logger.log('error', 'Error while reading data from stdin:', error)
        reject(error)
      })
      .pipe(reader)
      .on('error', (error) => {
        logger.log('error', 'Error while deserializing data as JSON:', error)
        reject(error)
      })
      .pipe(transform)
      .on('error', (error) => {
        logger.log('error', 'Error while transform data:', error)
        reject(error)
      })
      .on('data', () => {
        // Counts every chunk leaving the transform stream.
        if (progress) {
          progress.sqlized += 1
        }
      })
      .pipe(writer)
      .on('error', (error) => {
        logger.log('error', 'Error while serialize data as SQL:', error)
        reject(error)
      })
      .pipe(emitter)
      .on('error', (error) => {
        logger.log('error', 'Error while emitting SQL statements:', error)
        reject(error)
      })
      .pipe(process.stdout)
      .on('error', (error) => {
        logger.log('error', 'Error while writing data to stdout:', error)
        reject(error)
      })
      // stdout being unpiped from the emitter is used as the completion signal.
      .on('unpipe', () => {
        resolve()
      })
  })
  .then(() => {
    if (progress) {
      progress.cancel()
    }
  })
  .catch((error) => {
    if (progress) {
      progress.cancel()
    }
    logger.log('error', chalk.red('!'), 'Aborted due to:', error)
    process.exit(70)
  })
}

module.exports = declaration