dataplug-io/dataplug

View on GitHub
src/replicate.ts

Summary

Maintainability
A
1 hr
Test Coverage
// Copyright (C) 2017-2019 Brainbean Apps OU (https://brainbeanapps.com).
// License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

import { last, first, forEach, flatten, isFunction } from 'lodash'
import check from 'check-types'
import { Promise as BluebirdPromise } from 'bluebird'
import * as logger from 'winston'
import { Transform } from 'stream'

/**
 * Replicates data by piping it through the specified chain of streams.
 *
 * Every entry of `chain` is awaited first, so entries may be promises; the
 * resolved values are then flattened one level, which lets a single entry
 * contribute several consecutive streams. Each stream is piped into the next
 * one, and the returned promise settles when the last stream emits `finish`
 * or when any stream in the chain emits `error`.
 *
 * On failure the error is logged, every already-resolved stream is unpiped and
 * destroyed (when it supports those operations), and the original error is
 * rethrown to the caller.
 *
 * @param chain Array that starts from Readable, and ends with Writable with Transform(s) in-between
 * @returns Promise resolved (with no value) once the last stream has finished
 * @throws The first error emitted by any stream in the chain, or the rejection
 * reason of any promise in `chain`. Argument assertion failures are raised
 * before replication starts and are not logged by this function.
 */
export default async function replicate(
  chain: Array<
    NodeJS.ReadableStream | NodeJS.WritableStream | Transform | Promise<any>
  >,
): Promise<void> {
  check.assert.nonEmptyArray(chain)
  check.assert.greaterOrEqual(chain.length, 2)

  // Stays empty if resolving the chain itself fails, so cleanup is a no-op then.
  let streams: any[] = []

  try {
    streams = flatten(await BluebirdPromise.all(chain))

    await new Promise<void>((resolve, reject) => {
      forEach(streams, (item, index) => {
        // pipe() does not forward errors downstream, so each stream needs its
        // own handler for the failure to reach the caller.
        item.on('error', (error: Error) => {
          reject(error)
        })

        if (index > 0) {
          streams[index - 1].pipe(item)
        }
      })

      last(streams).on('finish', () => {
        resolve()
      })
      // Explicitly resume the source in case it was handed over paused.
      first(streams).resume()
    })
  } catch (error) {
    logger.log('error', 'Error while replicating:', error)

    forEach(streams, item => {
      // Cleanup is best-effort: not every stream implementation provides
      // unpipe()/destroy(), and a failure while tearing down one stream must
      // neither skip the remaining ones nor replace the original error.
      try {
        if (isFunction(item.unpipe)) {
          item.unpipe()
        }
        if (isFunction(item.destroy)) {
          item.destroy()
        }
      } catch (cleanupError) {
        logger.log('error', 'Error while cleaning up after replication:', cleanupError)
      }
    })

    throw error
  }
}