dataplug-io/dataplug-cli

View on GitHub
lib/commands/source.js

Summary

Maintainability
C
1 day
Test Coverage
const _ = require('lodash')
const chalk = require('chalk')
const logger = require('winston')
const { JsonStreamWriter } = require('@dataplug/dataplug-json')
const Progress = require('../progress')
const configDeclarationToYargs = require('../configDeclarationToYargs')

let declaration = {
  command: 'source',
  description: 'Streams collection data to stdout'
}
declaration.builder = (yargs, collection) => {
  yargs = yargs
    .option('indent', {
      alias: 'i',
      describe: 'Prettify output JSON using given amount of spaces',
      type: 'integer'
    })
    .coerce('indent', value => {
      value = Number.parseInt(value)
      return _.isNaN(value) ? undefined : value
    })
    .option('progress', {
      alias: 'p',
      describe: 'Show progress in console'
    })
    .option('abort', {
      alias: 'a',
      describe: 'Abort on any error',
      type: 'boolean',
      default: false
    })

  if (collection && collection.source) {
    yargs = configDeclarationToYargs(yargs, collection.source.configDeclaration)
  }

  return yargs
}
declaration.prerequisites = (collection) => {
  return collection.source
}
declaration.handler = (argv, collection) => {
  const progress = !argv.progress ? null : new Progress({
    supplied: (value) => chalk.green('↑') + ` supplied: ${value}`
  })
  if (progress) {
    progress.start()
  }

  const writer = new JsonStreamWriter(undefined, argv.indent ? _.repeat(' ', argv.indent) : null, argv.abort)
  if (progress) {
    progress.supplied = 0
  }

  collection.source.createOutput(argv)
    .then((sourceStreams) => new Promise((resolve, reject) => {
      sourceStreams = [].concat(sourceStreams)

      process.on('SIGINT', function () {
        _.forEach(sourceStreams, (sourceStream) => {
          if (_.isFunction(sourceStream.unpipe)) {
            sourceStream.unpipe()
          }
          sourceStream.destroy()
        })
      })

      _.forEach(sourceStreams, (sourceStream, index) => {
        sourceStream
          .on('error', (error) => {
            logger.log('error', 'Error while obtaining data:', error)
            reject(error)
          })

        if (index > 0) {
          sourceStreams[index - 1]
            .pipe(sourceStream)
        }
      })

      _.last(sourceStreams)
        .on('data', () => {
          if (progress) {
            progress.supplied += 1
          }
        })
        .pipe(writer)
        .on('error', (error) => {
          logger.log('error', 'Error while transforming data:', error)
          reject(error)
        })
        .on('unpipe', () => {
          writer.unpipe()
        })
        .pipe(process.stdout)
        .on('error', (error) => {
          logger.log('error', 'Error while writing data to stdout:', error)
          reject(error)
        })
        .on('unpipe', () => {
          resolve()
        })
    }))
    .then(() => {
      if (progress) {
        progress.cancel()
      }
    })
    .catch((error) => {
      if (progress) {
        progress.cancel()
      }
      logger.log('error', chalk.red('!'), 'Aborted due to:', error)
      process.exit(70)
    })
}

module.exports = declaration