taskrabbit/elasticsearch-dump
lib/splitters/csvStreamSplitter.js

/**
 * Created by ferron on 10/3/18 1:04 PM
 * adapted from split-file-stream library
 * URL : https://github.com/dannycho7/split-file-stream/blob/master/index.js
 * credit to : @dannycho7
 */

const fs = require('fs')
const StreamSplitter = require('./streamSplitter')
const { PassThrough } = require('stream')
const zlib = require('zlib')
const jsonParser = require('../jsonparser.js')

class csvStreamSplitter extends StreamSplitter {
  constructor (file, options, context) {
    super(options)
    this._ctx = context // caller-supplied context that knows how to build a CSV-formatting stream
    this.rootFilePath = file // base path used to derive each partition's file name
  }

  _outStreamCreate (partitionNum) {
    const _csvStream = this._ctx.createCsvStream()

    // gzip the output when compression is enabled; otherwise pass bytes through untouched
    let _throughStream = new PassThrough()
    if (this.compress) {
      _throughStream = zlib.createGzip()
    }
    _csvStream.pipe(_throughStream)
      .pipe(fs.createWriteStream(StreamSplitter.generateFilePath(this.rootFilePath, partitionNum, this.compress)))
    return _csvStream
  }

  sizeOf (chunk) {
    // approximate the chunk's size by the length of its JSON serialization
    const s = jsonParser.stringify(chunk)
    return s ? s.length : 0
  }

  write (chunk) {
    // lazily open a new partition stream on the first write after a rollover
    if (!this.openStream) {
      this.currentOutStream = this._outStreamCreate(this.partitionNum)
      this.partitionNum++
      this.openStream = true
    }

    this.currentOutStream.write(chunk)

    if (this.splitBySize) {
      // poor man's sizeOf: track bytes via the JSON-serialized length
      this.currentFileSize += this.sizeOf(chunk)
    } else {
      // otherwise partition by record count
      this.currentFileSize++
    }

    // roll over to the next partition once the size/count threshold is hit,
    // or when a falsy chunk signals the end of input
    if (this.currentFileSize >= this.partitionStreamSize || !chunk) {
      this._endCurrentWriteStream()
    }
  }
}

module.exports = csvStreamSplitter
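
For context, a minimal usage sketch follows. The shape of the context object is inferred from the calls above (it only needs a createCsvStream() method returning a writable CSV-formatting stream), and the splitter options are left abstract because their names are defined in the StreamSplitter base class, not here. Treat everything below as illustrative, not as the library's documented API.

const CsvStreamSplitter = require('./lib/splitters/csvStreamSplitter')
// Stand-in CSV stream built with the csv-stringify package; in elasticsearch-dump
// the csv transport supplies the real context.
const { stringify } = require('csv-stringify')

const context = {
  createCsvStream () {
    return stringify({ header: true })
  }
}

// Options are passed through to the StreamSplitter base class (partition size,
// compression, etc.); see streamSplitter.js for the actual option names.
const splitter = new CsvStreamSplitter('/tmp/export.csv', { /* splitter options */ }, context)

splitter.write({ id: 1, name: 'first' })
splitter.write({ id: 2, name: 'second' })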