#!/usr/bin/env node

// I am a wrapper around basic Elasticdump
// I will list all indices from your Elasticsearch server and dump them out to .json and .mapping.json files
// I probably only work on *nix hosts
// --direction=dump: --input must be a URL and --output must be a path on this system
// --direction=load: --input must be a path on this system and --output must be a URL
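//
// Example invocation (illustrative only; substitute your own host and paths):
//   multielasticdump --direction=dump --match='^logs-.*$' \
//     --input=http://localhost:9200 --output=/tmp/es_backup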

const argv = require('minimist')(process.argv)
const fs = require('fs')
const os = require('os')
const path = require('path')
const async = require('async')
const request = require('request')
const s3urls = require('s3urls')
const { fork } = require('child_process')
const url = require('url')
const _ = require('lodash')
const ArgParser = require(path.join(__dirname, '..', 'lib', 'argv.js'))
const addAuth = require(path.join(__dirname, '..', 'lib', 'add-auth.js'))
const versionCheck = require(path.join(__dirname, '..', 'lib', 'version-check.js'))
const AWS = require('aws-sdk')
require('aws-sdk/lib/maintenance_mode_message').suppress = true
const initAws = require(path.join(__dirname, '..', 'lib', 'init-aws.js'))
const requestUtils = require(path.join(__dirname, '..', 'lib', 'request.js'))
const aws4signer = require(path.join(__dirname, '..', 'lib', 'aws4signer.js'))
const util = require('util')

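// Parsed CLI options plus the mutable state shared by the dump/load schedulers:
// matchedIndexes holds whatever --match selected, while working, complete and
// indexCounter count the indices in flight, finished, and dispatched so far.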
const options = {}
let matchedIndexes = []
let working = 0
let complete = 0
let indexCounter = 0
let workTimeout

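// Defaults for every supported flag; args.parse() below overlays the
// command-line values on top of these.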
const defaults = {
  debug: true,
  parallel: os.cpus().length,
  match: '^.*$',
  matchType: 'alias',
  order: 'asc',
  input: null,
  output: null,
  scrollId: null,
  scrollTime: '10m',
  'scroll-with-post': false,
  timeout: null,
  limit: 100,
  offset: 0,
  size: -1,
  direction: 'dump', // default to dump
  'support-big-int': false,
  'big-int-fields': '',
  ignoreAnalyzer: true,
  ignoreChildError: false,
  ignoreData: false,
  ignoreMapping: false,
  ignoreSettings: false,
  ignoreTemplate: false,
  ignoreAlias: true,
  ignoreIndex: true,
  ignoreType: [],
  includeType: null,
  interval: 1000,
  delete: false,
  prefix: '',
  suffix: '',
  transform: null,
  headers: null,
  searchBody: null,
  searchWithTemplate: null,
  cert: null,
  key: null,
  pass: null,
  ca: null,
  tlsAuth: false,
  'input-cert': null,
  'input-key': null,
  'input-pass': null,
  'input-ca': null,
  'output-cert': null,
  'output-key': null,
  'output-pass': null,
  'output-ca': null,
  httpAuthFile: null,
  concurrency: 1,
  carryoverConcurrencyCount: true,
  intervalCap: 5,
  concurrencyInterval: 5000,
  overwrite: false,
  fsCompress: false,
  awsChain: false,
  awsAccessKeyId: null,
  awsSecretAccessKey: null,
  awsIniFileProfile: null,
  awsService: null,
  awsRegion: null,
  awsUrlRegex: null,
  fileSize: null,
  s3AccessKeyId: null,
  s3SecretAccessKey: null,
  s3Region: null,
  s3Endpoint: null,
  s3SSLEnabled: true,
  s3ForcePathStyle: false,
  s3Compress: false,
  s3ServerSideEncryption: null,
  s3SSEKMSKeyId: null,
  s3ACL: null,
  quiet: false
}

versionCheck()

const args = new ArgParser({ options })
args.parse(argv, defaults)

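// --ignoreType accepts a comma-separated list (e.g. "data,mapping") and flips
// the corresponding ignore<Type> flags on.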
_.split(options.ignoreType, ',').forEach(field => {
  const key = `ignore${_.upperFirst(field)}`
  if (_.has(options, key)) {
    options[key] = true
  }
})

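// --includeType is the inverse whitelist: every ignore* flag is switched on
// unless its type is named in the list.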
if (options.includeType) {
  const ignoreKeys = _.keys(options).filter((key) => key.startsWith('ignore'))
  const includedKeys = _.split(options.includeType, ',')
  ignoreKeys.forEach(k => {
    options[k] = _.indexOf(includedKeys, _.toLower(/^ignore(.*)$/.exec(k)[1])) === -1
  })
}

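// Flags forwarded verbatim to every child elasticdump process.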
const commonParams = [
  `--headers=${options.headers}`,
  `--cert=${options.cert}`,
  `--key=${options.key}`,
  `--pass=${options.pass}`,
  `--ca=${options.ca}`,
  `--tlsAuth=${options.tlsAuth}`,
  `--input-cert=${options['input-cert']}`,
  `--input-key=${options['input-key']}`,
  `--input-pass=${options['input-pass']}`,
  `--input-ca=${options['input-ca']}`,
  `--output-cert=${options['output-cert']}`,
  `--output-key=${options['output-key']}`,
  `--output-pass=${options['output-pass']}`,
  `--output-ca=${options['output-ca']}`,
  `--httpAuthFile=${options.httpAuthFile}`,
  `--concurrency=${options.concurrency}`,
  `--carryoverConcurrencyCount=${options.carryoverConcurrencyCount}`,
  `--intervalCap=${options.intervalCap}`,
  `--concurrencyInterval=${options.concurrencyInterval}`,
  `--overwrite=${options.overwrite}`,
  `--fsCompress=${options.fsCompress}`,
  `--awsChain=${options.awsChain}`,
  `--awsAccessKeyId=${options.awsAccessKeyId}`,
  `--awsSecretAccessKey=${options.awsSecretAccessKey}`,
  `--awsIniFileProfile=${options.awsIniFileProfile}`,
  `--awsService=${options.awsService}`,
  `--awsRegion=${options.awsRegion}`,
  `--awsUrlRegex=${options.awsUrlRegex}`,
  `--fileSize=${options.fileSize}`,
  `--s3AccessKeyId=${options.s3AccessKeyId}`,
  `--s3SecretAccessKey=${options.s3SecretAccessKey}`,
  `--s3Region=${options.s3Region}`,
  `--s3Endpoint=${options.s3Endpoint}`,
  `--s3SSLEnabled=${options.s3SSLEnabled}`,
  `--s3ForcePathStyle=${options.s3ForcePathStyle}`,
  `--s3Compress=${options.s3Compress}`,
  `--s3ServerSideEncryption=${options.s3ServerSideEncryption}`,
  `--s3SSEKMSKeyId=${options.s3SSEKMSKeyId}`,
  `--s3ACL=${options.s3ACL}`,
  `--quiet=${options.quiet}`,
  `--prefix=${options.prefix}`,
  `--suffix=${options.suffix}`,
  `--scroll-with-post=${options['scroll-with-post']}`
]

const fileExt = options.fsCompress ? 'json.gz' : 'json'

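// Ensure a local --input/--output actually exists and is a directory
// (s3:// locations are skipped here).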
const validateDirectory = (options, field) => {
  if (options[field].startsWith('s3://')) {
    return
  }

  let isDir
  try {
    isDir = fs.lstatSync(options[field]).isDirectory()
  } catch (e) {
    if (e.code === 'ENOENT') {
      // no such file or directory
      console.error(`Directory --${field} : \`${options[field]}\` does not exist`)
      process.exit(1)
    } else {
      // any other filesystem error is unexpected, so surface it
      throw e
    }
  }

  if (!isDir) {
    console.error(`--${field} ${options[field]} is not a directory`)
    process.exit(1)
  }
}

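// Build the per-index paths for one job. For a dump of index "logs" this
// yields the index URL under --input plus <output>/logs.json,
// <output>/logs.mapping.json, <output>/logs.settings.json, and so on.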
const generatePath = (direction, index, order) => {
  return {
    [order]: `${options[order]}/${encodeURIComponent(index).toLowerCase()}`,
    [`${direction}Data`]: `${options[direction]}/${index}.${fileExt}`,
    [`${direction}Index`]: `${options[direction]}/${index}.index.${fileExt}`,
    [`${direction}Mapping`]: `${options[direction]}/${index}.mapping.${fileExt}`,
    [`${direction}Alias`]: `${options[direction]}/${index}.alias.${fileExt}`,
    [`${direction}Analyzer`]: `${options[direction]}/${index}.analyzer.${fileExt}`,
    [`${direction}Settings`]: `${options[direction]}/${index}.settings.${fileExt}`,
    [`${direction}Template`]: `${options[direction]}/${index}.template.${fileExt}`
  }
}

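// Spawn a child elasticdump (bin/elasticdump by default) with the
// job-specific params plus the shared commonParams.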
const _fork = (params = [], file = 'elasticdump') => {
  args.log('debug', `fork: ${path.join(__dirname, file)} ${params.concat(commonParams)}`)
  return fork(path.join(__dirname, file), params.concat(commonParams))
}

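// Resolve the job callback when a child exits; a non-zero exit code aborts
// the whole run unless --ignoreChildError is set.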
const attachListeners = (child, cb) => {
  child.on('close', code => {
    if (code !== 0 && !options.ignoreChildError) {
      return cb(new Error('CHILD PROCESS EXITED WITH ERROR. Stopping process'))
    }
    return cb()
  }).on('error', error => args.log('error', error))
}

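// List the dump files to load, either from an s3:// bucket prefix or from a
// local directory.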
const listFiles = (dir, callback) => {
  if (s3urls.valid(dir)) {
    initAws(options)
    const s3 = new AWS.S3()
    const { Bucket, Key: Prefix } = s3urls.fromUrl(dir)
    s3.listObjectsV2({ Bucket, Prefix }, (err, data) => {
      if (data) {
        data = data.Contents.map(item => item.Key.slice(Prefix.length))
      }
      callback(err, data)
    })
  } else {
    fs.readdir(dir, callback)
  }
}

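// Issue a JSON request against the Elasticsearch endpoint (signed via
// aws4signer when AWS options are set); any HTTP or body-level error aborts
// the process unless params.ignoreErrors is set.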
const elasticRequest = (params, callback) => {
  _.defaults(params, {
    method: 'GET',
    ignoreErrors: false,
    qs: {}
  })

  let baseUrl = params.url
  if (options.httpAuthFile) {
    baseUrl = addAuth(params.url, options.httpAuthFile)
  }
  const reqUrl = new URL(baseUrl)
  reqUrl.pathname = params.path
  reqUrl.search = new URLSearchParams(params.qs).toString()

  const req = {
    url: url.format(reqUrl),
    method: params.method,
    headers: Object.assign({
      'User-Agent': 'elasticdump',
      'Content-Type': 'application/json'
    }, JSON.parse(options.headers) || {})
  }

  const type = options.direction === 'dump' ? 'input' : 'output'

  if (options.tlsAuth) {
    Object.assign(req,
      requestUtils.applySSL([`${type}-cert`, `${type}-key`, `${type}-pass`, `${type}-ca`], { parent: { options } }))

    Object.assign(req, requestUtils.applySSL(['cert', 'key', 'pass', 'ca'], { parent: { options } }))
  }

  util.callbackify(
    async () => {
      return await aws4signer(req, { options })
    }
  )(
    (err, ret) => {
      if (err) {
        args.log('error', err)
        process.exit(1)
      }
      args.log('debug', `${params.method} ${params.path}`)
      request(req, (err, response) => {
        if (err) {
          args.log('error', err)
          process.exit(1)
        }
        args.log('debug', `${params.method} ${params.path} -> ${response.statusCode} ${response.statusMessage}`)
        if (response.statusCode >= 400 && !params.ignoreErrors) {
          process.exit(1)
        }
        response = JSON.parse(response.body)
        if ('error' in response && !params.ignoreErrors) {
          args.log('error', response.error.reason)
          process.exit(1)
        }

        if (callback) {
          callback(response)
        }
      })
    })
}

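// DELETE the matched indexes on the target cluster before loading (--delete).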
const deleteIndexes = (indexes, callback) => {
  const req = {
    method: 'DELETE',
    url: options.output,
    path: indexes.join(','),
    ignoreErrors: false,
    qs: { ignore_unavailable: true }
  }

  elasticRequest(req, callback)
}

if (!options.input) { throw new Error('--input is required') }
if (!options.output) { throw new Error('--output is required') }
args.log('info', `We are performing: ${options.direction}`)
args.log('info', `options: ${JSON.stringify(options)}`)

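// Compile --match once; it filters index/alias/data-stream names when dumping
// and file basenames when loading.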
const matchRegExp = new RegExp(options.match, 'i')
if (options.direction === 'dump') {
  validateDirectory(options, 'output')

  const pathEnum = {
    alias: '/_aliases',
    datastream: '/_data_stream'
  }

  async.map(_.chain(options.matchType).split(',').uniq().value(), (type, callback) => {
    let matchedIndices = []
    if (pathEnum[type] === undefined) {
      return callback(new Error(`invalid matchType ${type}`))
    }

    elasticRequest({ url: options.input, path: pathEnum[type] }, (response) => {
      switch (type) {
        case 'alias': {
          let indexes = response
          if (!Array.isArray(response)) {
            indexes = Object.keys(response)
          }
          matchedIndices = indexes.filter(index => {
            const aliases = Object.keys(response[index].aliases || {})
            return matchRegExp.test(index) || aliases.some(alias => matchRegExp.test(alias))
          })
          break
        }
        case 'datastream': {
          matchedIndices = response.data_streams.filter(stream => matchRegExp.test(stream.name)).map(stream => stream.name)
          break
        }
      }
      callback(null, matchedIndices)
    })
  }, (err, results) => {
    if (err) {
      args.log('error', err)
      process.exit(1)
    }
    matchedIndexes = _.chain(results).flatten().orderBy(_.identity, [options.order]).value()
    dumpWork()
  })
}

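// Load direction: derive the index list from the dump files found in --input,
// optionally deleting the target indexes first.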
if (options.direction === 'load') {
  validateDirectory(options, 'input')
  listFiles(options.input, (err, data) => {
    if (err) {
      args.log('error', err)
      throw new Error('Something went wrong reading the list of files')
    }
    // args.log('info', data);
    matchedIndexes = data.map(value => value
      .replace(`.index.${fileExt}`, '')
      .replace(`.mapping.${fileExt}`, '')
      .replace(`.analyzer.${fileExt}`, '')
      .replace(`.alias.${fileExt}`, '')
      .replace(`.settings.${fileExt}`, '')
      .replace(`.template.${fileExt}`, '')
      .replace(`.${fileExt}`, '')
      .replace('/', ''))
      .filter(item => matchRegExp.test(item))
    matchedIndexes = _.uniq(matchedIndexes)
    args.log('info', `list of indexes: ${JSON.stringify(matchedIndexes)}`)

    const next = () => {
      loadWork()
    }

    if (options.delete) {
      deleteIndexes(matchedIndexes, next)
    } else {
      next()
    }
  })
}

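// Polling schedulers: every --interval ms, start another dump()/load() worker
// until --parallel are in flight, and exit once every matched index completes.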
const dumpWork = () => {
  clearTimeout(workTimeout)
  if (complete === matchedIndexes.length) {
    args.log('info', 'dumping all done')
    args.log('info', 'bye')
    process.exit()
  } else if (working === options.parallel) {
    workTimeout = setTimeout(dumpWork, options.interval)
  } else {
    dump()
    workTimeout = setTimeout(dumpWork, options.interval)
  }
}

const loadWork = () => {
  clearTimeout(workTimeout)
  if (complete === matchedIndexes.length) {
    args.log('info', 'indexing all done')
    args.log('info', 'bye')
    process.exit()
  } else if (working === options.parallel) {
    workTimeout = setTimeout(loadWork, options.interval)
  } else {
    load()
    workTimeout = setTimeout(loadWork, options.interval)
  }
}

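// Dump one index: fork one child elasticdump per artifact (template, index,
// settings, mapping, analyzer, alias, then data), honoring the ignore* flags.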
const dump = () => {
  working++
  const index = matchedIndexes[indexCounter]

  if (!index) {
    working--
    return
  }

  indexCounter++

  const {
    input,
    outputData,
    outputIndex,
    outputMapping,
    outputAnalyzer,
    outputAlias,
    outputSettings,
    outputTemplate
  } = generatePath('output', index, 'input')

  const jobs = []

  jobs.push(done => {
    if (options.ignoreTemplate) return done()
    args.log('info', `dumping template ${options.input} to ${outputTemplate}`)

    const templateChild = _fork([
      '--type=template',
      `--input=${options.input}`,
      `--output=${outputTemplate}`
    ])

    attachListeners(templateChild, done)
  })

  jobs.push(done => {
    if (options.ignoreIndex) return done()
    args.log('info', `dumping ${input} to ${outputIndex}`)

    const indexChild = _fork([
      '--type=index',
      `--input=${input}`,
      `--output=${outputIndex}`
    ])

    attachListeners(indexChild, done)
  })

  jobs.push(done => {
    if (options.ignoreSettings) return done()
    args.log('info', `dumping ${input} to ${outputSettings}`)

    const settingsChild = _fork([
      '--type=settings',
      `--input=${input}`,
      `--output=${outputSettings}`
    ])

    attachListeners(settingsChild, done)
  })

  jobs.push(done => {
    if (options.ignoreMapping) return done()
    args.log('info', `dumping ${input} to ${outputMapping}`)

    const mappingChild = _fork([
      '--type=mapping',
      `--input=${input}`,
      `--output=${outputMapping}`
    ])

    attachListeners(mappingChild, done)
  })

  jobs.push(done => {
    if (options.ignoreAnalyzer) return done()
    args.log('info', `dumping analyzer ${input} to ${outputAnalyzer}`)

    const analyzerChild = _fork([
      '--type=analyzer',
      `--input=${input}`,
      `--output=${outputAnalyzer}`
    ])

    attachListeners(analyzerChild, done)
  })

  jobs.push(done => {
    if (options.ignoreAlias) return done()
    args.log('info', `dumping alias ${input} to ${outputAlias}`)

    const aliasChild = _fork([
      '--type=alias',
      `--input=${input}`,
      `--output=${outputAlias}`
    ])

    attachListeners(aliasChild, done)
  })

  jobs.push(done => {
    if (options.ignoreData) return done()
    args.log('info', `dumping ${input} to ${outputData}`)

    let _transform = []

    if (options.transform) {
      _transform = _.chain(options.transform)
        .castArray()
        .filter(_.negate(_.isEmpty))
        .map(t => {
          return `--transform=${t}`
        })
        .value()
    }

    const dataChild = _fork([
      '--type=data',
      `--input=${input}`,
      `--output=${outputData}`,
      `--scrollId=${options.scrollId}`,
      `--scrollTime=${options.scrollTime}`,
      `--limit=${options.limit}`,
      `--offset=${options.offset}`,
      `--size=${options.size}`,
      `--searchBody=${options.searchBody}`,
      `--searchWithTemplate=${options.searchWithTemplate}`,
      `--support-big-int=${options['support-big-int']}`,
      `--big-int-fields=${options['big-int-fields']}`
    ].concat(_transform))

    attachListeners(dataChild, done)
  })

  async.series(jobs, error => {
    if (error) {
      args.log('error', error)
      process.exit(1)
    } else {
      working--
      complete++
    }
  })
}

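// Load one index: replay the dumped artifacts into the target cluster
// (template, index, settings, analyzer, mapping, alias, then data),
// honoring the ignore* flags.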
const load = () => {
  working++
  const index = matchedIndexes[indexCounter]

  if (!index) {
    working--
    return
  }

  args.log('info', `Working on ${index}`)

  indexCounter++

  const {
    output,
    inputData,
    inputIndex,
    inputMapping,
    inputAnalyzer,
    inputAlias,
    inputSettings,
    inputTemplate
  } = generatePath('input', index, 'output')

  const jobs = []

  jobs.push(done => {
    if (options.ignoreTemplate) return done()
    args.log('info', `indexing template ${inputTemplate} to ${output}`)

    const templateChild = _fork([
      '--type=template',
      `--input=${inputTemplate}`,
      `--output=${output}`
    ])

    attachListeners(templateChild, done)
  })

  jobs.push(done => {
    if (options.ignoreIndex) return done()
    args.log('info', `indexing ${inputIndex} to ${output}`)

    const indexChild = _fork([
      '--type=index',
      `--input=${inputIndex}`,
      `--output=${output}`
    ])

    attachListeners(indexChild, done)
  })

  jobs.push(done => {
    if (options.ignoreSettings) return done()
    args.log('info', `indexing settings ${inputSettings} to ${output}`)

    const settingsChild = _fork([
      '--type=settings',
      `--input=${inputSettings}`,
      `--output=${output}`
    ])

    attachListeners(settingsChild, done)
  })

  jobs.push(done => {
    if (options.ignoreAnalyzer) return done()
    args.log('info', `indexing analyzer ${inputAnalyzer} to ${output}`)

    const analyzerChild = _fork([
      '--type=analyzer',
      `--input=${inputAnalyzer}`,
      `--output=${output}`
    ])

    attachListeners(analyzerChild, done)
  })

  jobs.push(done => {
    if (options.ignoreMapping) return done()
    args.log('info', `indexing mapping ${inputMapping} to ${output}`)

    const mappingChild = _fork([
      '--type=mapping',
      `--input=${inputMapping}`,
      `--output=${output}`
    ])

    attachListeners(mappingChild, done)
  })

  jobs.push(done => {
    if (options.ignoreAlias) return done()
    args.log('info', `indexing alias ${inputAlias} to ${output}`)

    const aliasChild = _fork([
      '--type=alias',
      `--input=${inputAlias}`,
      `--output=${output}`
    ])

    attachListeners(aliasChild, done)
  })

  jobs.push(done => {
    if (options.ignoreData) return done()
    args.log('info', `indexing data ${inputData} to ${output}`)

    let _transform = []

    if (options.transform) {
      _transform = _.chain(options.transform)
        .castArray()
        .filter(_.negate(_.isEmpty))
        .map(t => {
          return `--transform=${t}`
        })
        .value()
    }

    const dataChild = _fork([
      '--type=data',
      `--input=${inputData}`,
      `--output=${output}`,
      `--timeout=${options.timeout}`,
      `--limit=${options.limit}`,
      `--offset=${options.offset}`,
      `--size=${options.size}`,
      `--support-big-int=${options['support-big-int']}`,
      `--big-int-fields=${options['big-int-fields']}`
    ].concat(_transform))

    attachListeners(dataChild, done)
  })

  async.series(jobs, error => {
    if (error) {
      args.log('error', error)
      process.exit(1)
    } else {
      working--
      complete++
    }
  })
}