bin/multielasticdump
#!/usr/bin/env node
// I am a wrapper around basic Elasticdump
// I will source all indices from your Elasticserach server and dump them out to .json and .mapping.json files
// I probably only work on *nix hosts
// dump --input must be a URL and --output must be a path on this system
// index --input must be a path on this system --output must be URL
const argv = require('minimist')(process.argv)
const fs = require('fs')
const os = require('os')
const path = require('path')
const async = require('async')
const request = require('request')
const s3urls = require('s3urls')
const { fork } = require('child_process')
const url = require('url')
const _ = require('lodash')
const ArgParser = require(path.join(__dirname, '..', 'lib', 'argv.js'))
const addAuth = require(path.join(__dirname, '..', 'lib', 'add-auth.js'))
const versionCheck = require(path.join(__dirname, '..', 'lib', 'version-check.js'))
const AWS = require('aws-sdk')
require('aws-sdk/lib/maintenance_mode_message').suppress = true;
const initAws = require(path.join(__dirname, '..', 'lib', 'init-aws.js'))
const requestUtils = require(path.join(__dirname, '..', 'lib', 'request.js'))
const aws4signer = require(path.join(__dirname, '..', 'lib', 'aws4signer.js'))
const util = require('util')
// Parsed CLI options, populated by ArgParser below.
const options = {}
// Shared mutable state driving the dump/load scheduler loops.
let matchedIndexes = [] // index names selected by --match
let working = 0 // number of per-index child pipelines currently running
let complete = 0 // number of indexes fully processed
let indexCounter = 0 // next position in matchedIndexes to schedule
let workTimeout // timer handle for the polling scheduler
// Default values for every supported flag; overridden by command-line
// arguments via ArgParser below. Most are forwarded verbatim to the
// child elasticdump processes (see commonParams).
const defaults = {
  debug: true,
  parallel: os.cpus().length, // max concurrent per-index pipelines
  match: '^.*$', // case-insensitive regex filter on index/alias/stream names
  matchType: 'alias', // comma list of discovery sources: alias, datastream
  order: 'asc', // sort order applied to matched index names
  input: null, // dump: source URL; load: source directory (or s3://)
  output: null, // dump: target directory (or s3://); load: target URL
  // Scroll / paging behaviour forwarded to data transfers.
  scrollId: null,
  scrollTime: '10m',
  'scroll-with-post': false,
  timeout: null,
  limit: 100,
  offset: 0,
  size: -1,
  direction: 'dump', // default to dump
  'support-big-int': false,
  'big-int-fields': '',
  // Artifact types to skip; toggled by --ignoreType / --includeType below.
  ignoreAnalyzer: true,
  ignoreChildError: false, // when true, child process failures are non-fatal
  ignoreData: false,
  ignoreMapping: false,
  ignoreSettings: false,
  ignoreTemplate: false,
  ignoreAlias: true,
  ignoreIndex: true,
  ignoreType: [],
  includeType: null,
  interval: 1000, // ms between scheduler polls
  delete: false, // load only: delete matched indexes on the target first
  prefix: '',
  suffix: '',
  transform: null,
  headers: null, // JSON string of extra HTTP headers
  searchBody: null,
  searchWithTemplate: null,
  // TLS client material: global, plus per-side (input-/output-) variants.
  cert: null,
  key: null,
  pass: null,
  ca: null,
  tlsAuth: false,
  'input-cert': null,
  'input-key': null,
  'input-pass': null,
  'input-ca': null,
  'output-cert': null,
  'output-key': null,
  'output-pass': null,
  'output-ca': null,
  httpAuthFile: null, // credentials file injected into request URLs via addAuth
  // Child-process queue tuning (forwarded to elasticdump).
  concurrency: 1,
  carryoverConcurrencyCount: true,
  intervalCap: 5,
  concurrencyInterval: 5000,
  overwrite: false,
  fsCompress: false, // gzip dumped files (.json.gz)
  // AWS signing / S3 transport settings (forwarded to elasticdump).
  awsChain: false,
  awsAccessKeyId: null,
  awsSecretAccessKey: null,
  awsIniFileProfile: null,
  awsService: null,
  awsRegion: null,
  awsUrlRegex: null,
  fileSize: null,
  s3AccessKeyId: null,
  s3SecretAccessKey: null,
  s3Region: null,
  s3Endpoint: null,
  s3SSLEnabled: true,
  s3ForcePathStyle: false,
  s3Compress: false,
  s3ServerSideEncryption: null,
  s3SSEKMSKeyId: null,
  s3ACL: null,
  quiet: false
}
versionCheck()
// Parse argv into `options`, applying the defaults above.
const args = new ArgParser({ options })
args.parse(argv, defaults)
// --ignoreType=a,b turns on the corresponding ignoreA/ignoreB flags.
_.split(options.ignoreType, ',').forEach(field => {
  const key = `ignore${_.upperFirst(field)}`
  if (_.has(options, key)) {
    options[key] = true
  }
})
// --includeType=a,b inverts the logic: every ignore* flag is switched on
// except those whose type name is explicitly included.
if (options.includeType) {
  const ignoreKeys = _.keys(options).filter((key) => key.startsWith('ignore'))
  const includedKeys = _.split(options.includeType, ',')
  ignoreKeys.forEach(k => {
    const type = _.toLower(/^ignore(.*)$/.exec(k)[1])
    // `_.includes` replaces the original `_.indexOf(...) <= -1`, which was a
    // confusing spelling of "not found" (indexOf never returns less than -1).
    options[k] = !_.includes(includedKeys, type)
  })
}
// Option keys forwarded verbatim to every forked elasticdump child.
// Each key maps 1:1 onto its CLI flag name, in this exact order.
const forwardedOptionKeys = [
  'headers', 'cert', 'key', 'pass', 'ca', 'tlsAuth',
  'input-cert', 'input-key', 'input-pass', 'input-ca',
  'output-cert', 'output-key', 'output-pass', 'output-ca',
  'httpAuthFile', 'concurrency', 'carryoverConcurrencyCount',
  'intervalCap', 'concurrencyInterval', 'overwrite', 'fsCompress',
  'awsChain', 'awsAccessKeyId', 'awsSecretAccessKey', 'awsIniFileProfile',
  'awsService', 'awsRegion', 'awsUrlRegex', 'fileSize',
  's3AccessKeyId', 's3SecretAccessKey', 's3Region', 's3Endpoint',
  's3SSLEnabled', 's3ForcePathStyle', 's3Compress',
  's3ServerSideEncryption', 's3SSEKMSKeyId', 's3ACL',
  'quiet', 'prefix', 'suffix', 'scroll-with-post'
]
const commonParams = forwardedOptionKeys.map(key => `--${key}=${options[key]}`)
// File extension for dumped artifacts; gzip suffix when compression is on.
const fileExt = options.fsCompress ? 'json.gz' : 'json'
// Ensure that options[field] refers to an existing local directory.
// S3 URLs are exempt (handled later by the S3 client). On any failure
// an error message is printed and the process exits with status 1.
const validateDirectory = (options, field) => {
  if (options[field].startsWith('s3://')) {
    return
  }
  let isDir = false
  try {
    isDir = fs.lstatSync(options[field]).isDirectory()
  } catch (e) {
    if (e.code === 'ENOENT') {
      // no such file or directory
      console.error(`Directory --${field} : \`${options[field]}\` does not exist`)
    } else {
      // Surface unexpected lstat failures (permissions, I/O errors) instead
      // of silently ignoring them and reporting "not a directory" below.
      console.error(`Unable to access --${field} \`${options[field]}\`: ${e.message}`)
    }
    process.exit(1)
  }
  if (!isDir) {
    console.error(`--${field} \`${options[field]}\` is not a directory`)
    process.exit(1)
  }
}
// Build the set of paths used to transfer one index. `direction` names the
// filesystem side ('output' for dump, 'input' for load) and `order` names
// the Elasticsearch side ('input' for dump, 'output' for load).
const generatePath = (direction, index, order) => {
  const esUrl = `${options[order]}/${encodeURIComponent(index).toLowerCase()}`
  const filePrefix = `${options[direction]}/${index}`
  const paths = { [order]: esUrl }
  paths[`${direction}Data`] = `${filePrefix}.${fileExt}`
  // One artifact file per component, keyed e.g. outputMapping / inputMapping.
  for (const part of ['index', 'mapping', 'alias', 'analyzer', 'settings', 'template']) {
    const suffix = part.charAt(0).toUpperCase() + part.slice(1)
    paths[`${direction}${suffix}`] = `${filePrefix}.${part}.${fileExt}`
  }
  return paths
}
// Spawn a child elasticdump process with the given CLI params plus the
// globally forwarded commonParams. Returns the ChildProcess handle.
const _fork = (params = [], file = 'elasticdump') => {
  args.log('debug', `fork: ${path.join(__dirname, file)} ${params.concat(commonParams)}`)
  return fork(path.join(__dirname, file), params.concat(commonParams))
}
// Wire completion and error handlers onto a forked elasticdump child.
// `cb` fires once the child closes; a non-zero exit code becomes an error
// unless --ignoreChildError is set. Child 'error' events are only logged.
const attachListeners = (clazz, cb) => {
  clazz.on('close', code => {
    const fatal = code !== 0 && !options.ignoreChildError
    return fatal
      ? cb(new Error('CHILD PROCESS EXITED WITH ERROR. Stopping process'))
      : cb()
  }).on('error', error => args.log('error', error))
}
// Enumerate dump files at `dir`, which is either a local directory or an
// s3:// URL. Calls back with (err, names); S3 names are returned relative
// to the URL's key prefix.
const listFiles = (dir, callback) => {
  if (!s3urls.valid(dir)) {
    fs.readdir(dir, callback)
    return
  }
  initAws(options)
  const s3 = new AWS.S3()
  const { Bucket, Key: Prefix } = s3urls.fromUrl(dir)
  s3.listObjectsV2({ Bucket, Prefix }, (err, data) => {
    const names = data ? data.Contents.map(item => item.Key.slice(Prefix.length)) : data
    callback(err, names)
  })
}
// Make one HTTP call to an Elasticsearch endpoint. `params` supplies
// url/path/method/qs/ignoreErrors; defaults are filled in below. Applies
// --httpAuthFile credentials, optional TLS client material (--tlsAuth),
// and AWS request signing before sending. Fatal conditions — transport
// error, HTTP status >= 400, or an `error` field in the JSON body — exit
// the process unless params.ignoreErrors is set. On success the parsed
// body is passed to `callback`.
const elasticRequest = (params, callback) => {
  _.defaults(params, {
    method: 'GET',
    ignoreErrors: false,
    qs: {}
  })
  let baseUrl = params.url
  if (options.httpAuthFile) {
    // Inject user:pass from the auth file into the request URL.
    baseUrl = addAuth(params.url, options.httpAuthFile)
  }
  const reqUrl = new URL(baseUrl)
  reqUrl.pathname = params.path
  reqUrl.search = new URLSearchParams(params.qs).toString()
  const req = {
    url: url.format(reqUrl),
    method: params.method,
    // JSON.parse(null) parses the string "null" and yields null, so the
    // `|| {}` fallback covers the unset --headers case; a malformed
    // --headers value will throw here.
    headers: Object.assign({
      'User-Agent': 'elasticdump',
      'Content-Type': 'application/json'
    }, JSON.parse(options.headers) || {})
  }
  // Use the TLS material of the cluster-facing side: input when dumping,
  // output when loading. Global cert/key/pass/ca are applied on top.
  const type = options.direction === 'dump' ? 'input' : 'output'
  if (options.tlsAuth) {
    Object.assign(req,
      requestUtils.applySSL([`${type}-cert`, `${type}-key`, `${type}-pass`, `${type}-ca`], { parent: { options } }))
    Object.assign(req, requestUtils.applySSL(['cert', 'key', 'pass', 'ca'], { parent: { options } }))
  }
  // aws4signer is async; callbackify bridges it into this callback flow.
  // NOTE(review): presumably it signs `req` in place when AWS auth is
  // configured — confirm against lib/aws4signer.js.
  util.callbackify(
    async () => {
      return await aws4signer(req, { options })
    }
  )(
    (err, ret) => {
      if (err) {
        args.log('err', err)
        process.exit(1)
      }
      args.log('debug', `${params.method} ${params.path}`)
      request(req, (err, response) => {
        if (err) {
          args.log('err', err)
          process.exit(1)
        }
        args.log('debug', `${params.method} ${params.path} -> ${response.statusCode} ${response.statusMessage}`)
        if (response.statusCode >= 400 && !params.ignoreErrors) {
          process.exit(1)
        }
        response = JSON.parse(response.body)
        if ('error' in response && !params.ignoreErrors) {
          args.log('err', response.error.reason)
          process.exit(1)
        }
        if (callback) {
          callback(response)
        }
      })
    })
}
// Issue a single DELETE for all matched indexes on the target cluster
// (used by --delete before a load); missing indexes are skipped via
// ignore_unavailable.
const deleteIndexes = (indexes, callback) => {
  elasticRequest({
    method: 'DELETE',
    url: options.output,
    path: indexes.join(','),
    ignoreErrors: false,
    qs: { ignore_unavailable: true }
  }, callback)
}
// Both endpoints are mandatory regardless of direction.
if (!options.input) { throw new Error('--input is required') }
if (!options.output) { throw new Error('--output is required') }
args.log('info', `We are performing : ${options.direction}`)
args.log('info', `options: ${JSON.stringify(options)}`)
// Case-insensitive filter applied to index/alias/data-stream names.
const matchRegExp = new RegExp(options.match, 'i')
// DUMP MODE: discover matching index names on the source cluster for each
// requested matchType, then start the dump scheduler.
if (options.direction === 'dump') {
  validateDirectory(options, 'output')
  // Discovery endpoint for each supported matchType.
  const pathEnum = {
    alias: '/_aliases',
    datastream: '/_data_stream'
  }
  async.map(_.chain(options.matchType).split(',').uniq().value(), (type, callback) => {
    let matchedIndices = []
    if (pathEnum[type] === undefined) {
      return callback(new Error(`invalid matchType ${type}`))
    }
    elasticRequest({ url: options.input, path: pathEnum[type] }, (response) => {
      switch (type) {
        case 'alias': {
          // /_aliases responds with an object keyed by index name; keep an
          // index when the name itself or any of its aliases matches.
          let indexes = response
          if (!Array.isArray(response)) {
            indexes = Object.keys(response)
          }
          matchedIndices = indexes.filter(index => {
            const aliases = Object.keys(response[index].aliases || {})
            return matchRegExp.test(index) || aliases.some(alias => matchRegExp.test(alias))
          })
          break
        }
        case 'datastream': {
          // /_data_stream responds with { data_streams: [{ name, ... }] }.
          matchedIndices = response.data_streams.filter(stream => matchRegExp.test(stream.name)).map(stream => stream.name)
          break
        }
      }
      callback(null, matchedIndices)
    })
  }, (err, results) => {
    if (err) {
      args.log('err', err)
      process.exit(1)
    }
    // Flatten the per-type result arrays and sort names per --order.
    matchedIndexes = _.chain(results).flatten().orderBy(_.identity, [options.order]).value()
    dumpWork()
  })
}
// LOAD MODE: derive index names from the dump files found at --input,
// optionally delete them on the target cluster, then start the load
// scheduler.
if (options.direction === 'load') {
  validateDirectory(options, 'input')
  listFiles(options.input, (err, data) => {
    if (err) {
      args.log('error', err)
      throw new Error('Something went wrong reading the list of files')
    }
    // args.log('info', data);
    // Strip every known artifact suffix so each filename collapses to its
    // index name, then keep only names matching --match.
    matchedIndexes = data.map(value => value
      .replace(`.index.${fileExt}`, '')
      .replace(`.mapping.${fileExt}`, '')
      .replace(`.analyzer.${fileExt}`, '')
      .replace(`.alias.${fileExt}`, '')
      .replace(`.settings.${fileExt}`, '')
      .replace(`.template.${fileExt}`, '')
      .replace(`.${fileExt}`, '')
      .replace('/', ''))
      .filter(item => matchRegExp.test(item))
    // Multiple artifact files collapse to the same index name; dedupe.
    matchedIndexes = _.uniq(matchedIndexes)
    args.log('info', `list of indexes${JSON.stringify(matchedIndexes)}`)
    const next = () => {
      loadWork()
    }
    if (options.delete) {
      deleteIndexes(matchedIndexes, next)
    } else {
      next()
    }
  })
}
// Polling scheduler for dump mode: keep starting dump pipelines until
// every matched index is complete, capping concurrency at
// options.parallel and re-polling every options.interval ms.
const dumpWork = () => {
  clearTimeout(workTimeout)
  if (complete === matchedIndexes.length) {
    args.log('info', ' dumping all done ')
    args.log('info', ' bye ')
    process.exit()
  }
  if (working !== options.parallel) {
    dump()
  }
  workTimeout = setTimeout(dumpWork, options.interval)
}
// Polling scheduler for load mode: keep starting load pipelines until
// every matched index is complete, capping concurrency at
// options.parallel and re-polling every options.interval ms.
const loadWork = () => {
  clearTimeout(workTimeout)
  if (complete === matchedIndexes.length) {
    args.log('info', ' indexing all done ')
    args.log('info', ' bye ')
    process.exit()
  }
  if (working !== options.parallel) {
    load()
  }
  workTimeout = setTimeout(loadWork, options.interval)
}
// Run the dump pipeline for the next matched index: a sequence of child
// elasticdump runs (template, index, settings, mapping, analyzer, alias,
// data), each skippable via its --ignore* flag. Updates the shared
// working/complete counters consumed by dumpWork().
const dump = () => {
  working++
  const index = matchedIndexes[indexCounter]
  if (!index) {
    // Nothing left to schedule; release the slot and let dumpWork() finish.
    working--
    return
  }
  indexCounter++
  const {
    input,
    outputData,
    outputIndex,
    outputMapping,
    outputAnalyzer,
    outputAlias,
    outputSettings,
    outputTemplate
  } = generatePath('output', index, 'input')
  const jobs = []
  jobs.push(done => {
    if (options.ignoreTemplate) return done()
    // Templates are dumped from the root input URL, not the per-index URL.
    args.log('info', `dumping ${options.input} to ${outputTemplate}`)
    const templateChild = _fork([
      '--type=template',
      `--input=${options.input}`,
      `--output=${outputTemplate}`
    ])
    attachListeners(templateChild, done)
  })
  jobs.push(done => {
    if (options.ignoreIndex) return done()
    args.log('info', `dumping ${input} to ${outputIndex}`)
    const indexChild = _fork([
      '--type=index',
      `--input=${input}`,
      `--output=${outputIndex}`
    ])
    attachListeners(indexChild, done)
  })
  jobs.push(done => {
    if (options.ignoreSettings) return done()
    args.log('info', `dumping ${input} to ${outputSettings}`)
    const settingsChild = _fork([
      '--type=settings',
      `--input=${input}`,
      `--output=${outputSettings}`
    ])
    attachListeners(settingsChild, done)
  })
  jobs.push(done => {
    if (options.ignoreMapping) return done()
    args.log('info', `dumping ${input} to ${outputMapping}`)
    const mappingChild = _fork([
      '--type=mapping',
      `--input=${input}`,
      `--output=${outputMapping}`
    ])
    attachListeners(mappingChild, done)
  })
  jobs.push(done => {
    if (options.ignoreAnalyzer) return done()
    args.log('info', `analyzer ${input} to ${outputAnalyzer}`)
    const analyzerChild = _fork([
      '--type=analyzer',
      `--input=${input}`,
      `--output=${outputAnalyzer}`
    ])
    attachListeners(analyzerChild, done)
  })
  jobs.push(done => {
    if (options.ignoreAlias) return done()
    args.log('info', `alias ${input} to ${outputAlias}`)
    const aliasChild = _fork([
      '--type=alias',
      `--input=${input}`,
      `--output=${outputAlias}`
    ])
    attachListeners(aliasChild, done)
  })
  jobs.push(done => {
    if (options.ignoreData) return done()
    args.log('info', `dumping ${input} to ${outputData}`)
    // --transform may carry one value or many; forward each non-empty one.
    let _transform = []
    if (options.transform) {
      _transform = _.chain(options.transform)
        .castArray()
        .filter(_.negate(_.isEmpty))
        .map(t => {
          return `--transform=${t}`
        })
        .value()
    }
    const dataChild = _fork([
      '--type=data',
      `--input=${input}`,
      `--output=${outputData}`,
      `--scrollId=${options.scrollId}`,
      `--scrollTime=${options.scrollTime}`,
      `--limit=${options.limit}`,
      `--offset=${options.offset}`,
      `--size=${options.size}`,
      `--searchBody=${options.searchBody}`,
      `--searchWithTemplate=${options.searchWithTemplate}`,
      `--support-big-int=${options['support-big-int']}`,
      `--big-int-fields=${options['big-int-fields']}`
    ].concat(_transform))
    attachListeners(dataChild, done)
  })
  // Run the per-index jobs sequentially; any child failure aborts the run.
  async.series(jobs, error => {
    if (error) {
      args.log('error', error)
      process.exit(1)
    } else {
      working--
      complete++
    }
  })
}
// Run the load pipeline for the next matched index: a sequence of child
// elasticdump runs (template, index, settings, analyzer, mapping, alias,
// data) reading the per-index dump files and writing to the output
// cluster. Updates the shared working/complete counters consumed by
// loadWork().
const load = () => {
  working++
  const index = matchedIndexes[indexCounter]
  if (!index) {
    // Nothing left to schedule; release the slot and let loadWork() finish.
    working--
    return
  }
  args.log('info', `Working on ${index}`)
  indexCounter++
  const {
    output,
    inputData,
    inputIndex,
    inputMapping,
    inputAnalyzer,
    inputAlias,
    inputSettings,
    inputTemplate
  } = generatePath('input', index, 'output')
  const jobs = []
  jobs.push(done => {
    if (options.ignoreTemplate) return done()
    args.log('info', `indexing template ${inputTemplate} to ${output}`)
    const templateChild = _fork([
      '--type=template',
      `--input=${inputTemplate}`,
      `--output=${output}`
    ])
    attachListeners(templateChild, done)
  })
  jobs.push(done => {
    if (options.ignoreIndex) return done()
    args.log('info', `indexing ${inputIndex} to ${output}`)
    const indexChild = _fork([
      '--type=index',
      `--input=${inputIndex}`,
      `--output=${output}`
    ])
    attachListeners(indexChild, done)
  })
  jobs.push(done => {
    if (options.ignoreSettings) return done()
    args.log('info', `indexing settings ${inputSettings} to ${output}`)
    const settingsChild = _fork([
      '--type=settings',
      `--input=${inputSettings}`,
      `--output=${output}`
    ])
    attachListeners(settingsChild, done)
  })
  jobs.push(done => {
    if (options.ignoreAnalyzer) return done()
    args.log('info', `indexing analyzer ${inputAnalyzer} to ${output}`)
    const analyzerChild = _fork([
      '--type=analyzer',
      `--input=${inputAnalyzer}`,
      `--output=${output}`
    ])
    attachListeners(analyzerChild, done)
  })
  jobs.push(done => {
    if (options.ignoreMapping) return done()
    args.log('info', `indexing mapping ${inputMapping} to ${output}`)
    const mappingChild = _fork([
      '--type=mapping',
      `--input=${inputMapping}`,
      `--output=${output}`
    ])
    attachListeners(mappingChild, done)
  })
  jobs.push(done => {
    if (options.ignoreAlias) return done()
    args.log('info', `indexing alias ${inputAlias} to ${output}`)
    const aliasChild = _fork([
      '--type=alias',
      `--input=${inputAlias}`,
      `--output=${output}`
    ])
    attachListeners(aliasChild, done)
  })
  jobs.push(done => {
    if (options.ignoreData) return done()
    args.log('info', `indexing data ${inputData} to ${output}`)
    // --transform may carry one value or many; forward each non-empty one.
    let _transform = []
    if (options.transform) {
      _transform = _.chain(options.transform)
        .castArray()
        .filter(_.negate(_.isEmpty))
        .map(t => {
          return `--transform=${t}`
        })
        .value()
    }
    const dataChild = _fork([
      '--type=data',
      `--input=${inputData}`,
      `--output=${output}`,
      `--timeout=${options.timeout}`,
      `--limit=${options.limit}`,
      `--offset=${options.offset}`,
      `--size=${options.size}`,
      `--support-big-int=${options['support-big-int']}`,
      `--big-int-fields=${options['big-int-fields']}`
    ].concat(_transform))
    attachListeners(dataChild, done)
  })
  // Run the per-index jobs sequentially; any child failure aborts the run.
  async.series(jobs, error => {
    if (error) {
      args.log('error', error)
      process.exit(1)
    } else {
      working--
      complete++
    }
  })
}