dataplug-io/dataplug-cli

View on GitHub
lib/commands/flatten.js

Summary

Maintainability
B
4 hrs
Test Coverage
const _ = require('lodash')
const chalk = require('chalk')
const Promise = require('bluebird')
const logger = require('winston')
const { StreamFlatter } = require('@dataplug/dataplug-flatters')
const { JsonStreamReader, JsonStreamWriter } = require('@dataplug/dataplug-json')
const Progress = require('../progress')

/**
 * Declaration of the `flatten` command.
 *
 * Holds the command name and its description; the `builder`, `prerequisites`
 * and `handler` members are attached to this object further down the file.
 * The binding itself is never reassigned, hence `const`.
 */
const declaration = {
  command: 'flatten',
  description: 'Streams data from stdin to stdout, flattening it in-between'
}
/**
 * Registers the command-line options of the `flatten` command.
 *
 * @param {Object} yargs - yargs instance the options are registered on
 * @returns {Object} whatever the chained yargs calls return
 */
declaration.builder = (yargs) => yargs
  .option('metadata', {
    alias: 'm',
    describe: 'Include metadata in output stream',
    type: 'boolean',
    default: true
  })
  .option('name', {
    alias: 'n',
    describe: 'Name of collection to use instead of default',
    type: 'string'
  })
  .option('indent', {
    alias: 'i',
    describe: 'Prettify output JSON using given amount of spaces',
    // 'integer' is not a type yargs knows about; 'number' is the closest one,
    // the coerce callback below narrows the value down to an integer
    type: 'number'
  })
  .coerce('indent', value => {
    // Explicit radix, so that input is always treated as decimal
    const spaces = Number.parseInt(value, 10)
    // Anything that is not a number is treated as "no indentation requested"
    return Number.isNaN(spaces) ? undefined : spaces
  })
  .option('progress', {
    alias: 'p',
    describe: 'Show progress in console',
    // Declared as a flag, like the other switches, so `--progress=false`
    // is not seen as a truthy string
    type: 'boolean'
  })
  .option('abort', {
    alias: 'a',
    describe: 'Abort on any error',
    type: 'boolean',
    default: false
  })
/**
 * Yields the schema of the given collection.
 *
 * NOTE(review): presumably the caller treats a falsy result as "this command
 * is not available for the collection" - confirm against the caller.
 *
 * @param {Object} collection - collection the command is invoked for
 * @returns {*} value of `collection.schema`
 */
declaration.prerequisites = (collection) => collection.schema
/**
 * Executes the `flatten` command.
 *
 * Pipes stdin through a JSON reader, a flatter built from the collection
 * schema and a JSON writer into stdout. When `argv.progress` is set, the
 * amount of chunks emitted by the flatter is reported via `Progress`.
 * Any stream error is logged and terminates the process with exit code 70.
 *
 * @param {Object} argv - parsed command-line arguments
 * @param {Object} collection - collection providing `schema` and `name`
 */
declaration.handler = (argv, collection) => {
  // Progress reporting is optional, `null` means "do not report"
  let progress = null
  if (argv.progress) {
    progress = new Progress({
      flattened: (value) => `${chalk.yellow('?')} flattened: ${value}`
    })
    progress.flattened = 0
    progress.start()
  }

  // Stops progress reporting, if it was requested at all
  const stopProgress = () => {
    if (progress) {
      progress.cancel()
    }
  }

  const reader = new JsonStreamReader()
  const targetName = argv.name || collection.name
  const flatter = new StreamFlatter(collection.schema, targetName, argv.metadata, undefined, argv.abort)
  const indentation = argv.indent ? _.repeat(' ', argv.indent) : null
  const writer = new JsonStreamWriter(undefined, indentation, argv.abort)

  const pipeline = new Promise((resolve, reject) => {
    // Creates an 'error' listener that logs given message and fails the pipeline
    const failWith = (message) => (error) => {
      logger.log('error', message, error)
      reject(error)
    }

    // Each listener is attached to the stream returned by the preceding call,
    // so the order of the calls below matters
    process.stdin
      .on('error', failWith('Error while reading data from stdin:'))
      .pipe(reader)
      .on('error', failWith('Error while deserializing data as JSON:'))
      .pipe(flatter)
      .on('data', () => {
        if (progress) {
          progress.flattened += 1
        }
      })
      .on('error', failWith('Error while flattening data:'))
      .pipe(writer)
      .on('error', failWith('Error while serializing data as JSON:'))
      .pipe(process.stdout)
      .on('error', failWith('Error while writing data to stdout:'))
      // stdout gets 'unpipe' once the writer stops piping into it
      .on('unpipe', () => resolve())
  })

  pipeline
    .then(() => {
      stopProgress()
    })
    .catch((error) => {
      stopProgress()
      logger.log('error', chalk.red('!'), 'Aborted due to:', error)
      process.exit(70)
    })
}

// The command declaration object is the only export of this module
module.exports = declaration