e-ucm/rage-analytics-backend

View on GitHub
bin/upgrade/transformers/elastic/transformToVersion2.js

Summary

Maintainability
F
1 wk
Test Coverage
/*
 * Copyright 2016 e-UCM (http://www.e-ucm.es/)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * This project has received funding from the European Union’s Horizon
 * 2020 research and innovation programme under grant agreement No 644187.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0 (link is external)
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

'use strict';

// For ES specific naming convention we need to do this
// jscs:disable requireCamelCaseOrUpperCaseIdentifiers
let ObjectID = require('mongodb').ObjectID;

let co = require('co');
let promiseRetry = require('promise-retry');

// Used for attempting
let defaultRetryOptions = {
    retries: 10,
    factor: 2,
    minTimeout: 1000,
    maxTimeout: 60000,
    randomize: true
};

let defaultTraceAttributes = [
    'name', 'timestamp', 'event',
    'target', 'type',
    'gameplayId', 'versionId', 'session',
    'firstSessionStarted', 'currentSessionStarted',
    'score', 'success', 'completion', 'response',
    'stored', 'gameplayId_hashCode', 'event_hashCode',
    'type_hashCode', 'target_hashCode'
];

let extensions = [];

let defaultTimeout = '8m';

let defaultSize = 500;

let defaultBaseMapping = {
    mappings: {
        traces: {
            properties: {
                event: {
                    type: 'text',
                    fields: {
                        keyword: {
                            type: 'keyword',
                            ignore_above: 256
                        }
                    }
                },
                event_hashCode: {
                    type: 'long'
                },
                name: {
                    type: 'text',
                    fields: {
                        keyword: {
                            type: 'keyword',
                            ignore_above: 256
                        }
                    }
                },
                gameplayId: {
                    type: 'text',
                    fields: {
                        keyword: {
                            type: 'keyword',
                            ignore_above: 256
                        }
                    }
                },
                gameplayId_hashCode: {
                    type: 'long'
                },
                versionId: {
                    type: 'text',
                    fields: {
                        keyword: {
                            type: 'keyword',
                            ignore_above: 256
                        }
                    }
                },
                target: {
                    type: 'text',
                    fields: {
                        keyword: {
                            type: 'keyword',
                            ignore_above: 256
                        }
                    }
                },
                target_hashCode: {
                    type: 'long'
                },
                timestamp: {
                    type: 'date'
                },
                stored: {
                    type: 'date'
                },
                firstSessionStarted: {
                    type: 'date'
                },
                currentSessionStarted: {
                    type: 'date'
                },
                type: {
                    type: 'text',
                    fields: {
                        keyword: {
                            type: 'keyword',
                            ignore_above: 256
                        }
                    }
                },
                type_hashCode: {
                    type: 'long'
                },
                score: {
                    type: 'float'
                },
                session: {
                    type: 'float'
                },
                success: {
                    type: 'boolean'
                },
                completion: {
                    type: 'boolean'
                },
                response: {
                    type: 'text',
                    fields: {
                        keyword: {
                            type: 'keyword',
                            ignore_above: 256
                        }
                    }
                }
            }
        }
    }
};

String.prototype.replaceAll = function (search, replacement) {
    let target = this;
    return target.replace(new RegExp(search, 'g'), replacement);
};

/**
 *      {
 *           "health": "yellow",
 *           "status": "open",
 *           "index": "index_name",
 *           "uuid": "L_P5yUBnRC-fzXVrtKkmpQ",
 *           "pri": "5",
 *           "rep": "1",
 *           "docs.count": "30",
 *           "docs.deleted": "0",
 *           "store.size": "206.1kb",
 *           "pri.store.size": "206.1kb"
 *       }
 * @type {{traces: Array, versions: Array, results: Array, opaqueValues: Array, games: Array, configs: {template: null, kibana: null, defaultKibanaIndex: null}, others: Array}}
 */
let indices = {
    traces: [],
    versions: [],
    results: [],
    opaqueValues: [],
    games: [],
    configs: {
        template: null,
        kibana: null,
        defaultKibanaIndex: null
    },
    others: [],
    backup: [],
    upgrade: [],
    deleted: {}
};

// Attempt
let at = function (promise) {
    return promiseRetry(defaultRetryOptions, function (retry, number) {
        return promise.catch(function (error) {
            if (error.statusCode === 404 ||
                error.status === 404) {
                throw error;
            }
            console.error('Error retrying ' + number +
                ' for promise ' + promise.name + ' logs: ' +
                JSON.stringify(error, null, 4));
            retry(error);
        }).then(function (resp) {
            if (resp && resp.errors) {
                console.error('Error on BULK errores, retrying ' + number +
                    ' for promise ' + promise.name + ' logs: ' +
                    JSON.stringify(resp, null, 4));
                return retry(resp.errors);
            }
            return resp;
        });
    });
};

function* scrollIndex(esClient, index, callback) {

    // First we do a search, and specify a scroll timeout
    let result = yield at(esClient.search({
        index: index,
        sort: ['_doc'],
        size: defaultSize,
        scroll: defaultTimeout
    }));

    let total = result.hits.hits.length;

    yield callback(result.hits);

    while (total < result.hits.total) {

        result = yield at(esClient.scroll({
            scrollId: result._scroll_id,
            scroll: defaultTimeout
        }));
        total += result.hits.hits.length;

        yield callback(result.hits);
    }
}

let reindexManually = function* (esClient, from, to) {
    let mapping = yield at(esClient.indices.getMapping({index: from}));

    if (!(yield at(esClient.indices.exists({index: to})))) {
        yield at(esClient.indices.create({index: to}));
    }

    for (let key in mapping) {
        let indexMapping = mapping[key].mappings;
        for (let type in indexMapping) {
            yield at(esClient.indices.putMapping({
                index: to,
                type: type,
                body: indexMapping[type]
            }));

        }

    }

    function* windowed(hits) {

        let bulkUpgradedTraces = [];
        for (let i = 0; i < hits.hits.length; ++i) {
            let hit = hits.hits[i];
            bulkUpgradedTraces.push({update: {_index: to, _type: hit._type, _id: hit._id}});
            bulkUpgradedTraces.push({doc: hit._source, doc_as_upsert: true});
        }

        if (bulkUpgradedTraces.length > 0) {
            yield at(esClient.bulk({body: bulkUpgradedTraces}));
        }
    }

    yield scrollIndex(esClient, from, windowed);
};

let copyIndexObjectWithPrefixTo = function (whereArray, whatIndex, withPrefixName) {

    let found = false;
    for (let k = 0; k < whereArray.length; ++k) {
        let hitIndex = whereArray[k];
        if (hitIndex.index === withPrefixName) {
            found = true;
            break;
        }
    }
    if (!found) {
        let upgradedIndex = Object.assign({}, whatIndex);
        upgradedIndex.index = withPrefixName;
        whereArray.push(upgradedIndex);
    }
};

let backUpIndex = function* (esClient, index) {
    let backedUpIndex = 'backup_' + index.index;

    yield reindexManually(esClient, index.index, backedUpIndex);

    copyIndexObjectWithPrefixTo(indices.backup, index, backedUpIndex);
};

function* belongsToCollection(mongodb, index, collection) {
    let objectIdIndex;
    try {
        objectIdIndex = new ObjectID(index);
    } catch (ex) {
        return false;
    }

    return (yield at(mongodb.collection(collection)
        .find({_id: objectIdIndex})
        .limit(1)
        .count())) > 0;
}

function backup(config, callback) {

    co(function* () {
        let esClient = config.elasticsearch.esClient;
        yield at(esClient.ping({requestTimeout: 2000}));
        yield at(esClient.indices.refresh({index: '_all'}));
        const responseIndices = yield at(esClient.cat.indices({format: 'json'}));

        for (let i = 0; i < responseIndices.length; i++) {
            let index = responseIndices[i];
            if (index.index) {
                if (index.index.indexOf('backup_') === 0) {
                    indices.backup.push(index);
                    continue;
                }
                if (index.index.indexOf('upgrade_') === 0) {
                    indices.upgrade.push(index);
                    continue;
                }
            }


            let indexName = index.index;
            if (indexName === '.kibana') {
                yield backUpIndex(esClient, index);
                indices.configs.kibana = index;
            } else if (indexName === '.template') {
                yield backUpIndex(esClient, index);
                indices.configs.template = index;
            } else if (indexName === 'default-kibana-index') {
                yield backUpIndex(esClient, index);
                indices.configs.defaultKibanaIndex = index;
            } else if (indexName.indexOf('.games') === 0) {
                yield backUpIndex(esClient, index);
                indices.games.push(index);
            } else if (indexName.indexOf('opaque-values-') === 0) {
                yield backUpIndex(esClient, index);
                indices.opaqueValues.push(index);
            } else if (indexName.indexOf('results-') === 0) {
                indices.results.push(index);
            } else if (yield belongsToCollection(config.mongodb.db,
                    index.index, 'sessions')) {
                yield backUpIndex(esClient, index);
                indices.traces.push(index);
            } else if (yield belongsToCollection(config.mongodb.db,
                    index.index, 'versions')) {
                yield backUpIndex(esClient, index);
                indices.versions.push(index);
            } else {
                indices.others.push(index);
            }
        }

    }).catch(function (err) {
        // Log any uncaught errors
        // co will not throw any errors you do not handle!!!
        // HANDLE ALL YOUR ERRORS!!!
        console.log('Error while backing up!');
        console.error(err.stack);
        callback(err, config);
    }).then(function () {
        callback(null, config);
    });
}

function* upgradeGameIndex(esClient, gameIndex) {

    let totalSessionsVis = yield at(esClient.get({
        index: gameIndex.index,
        type: 'visualization',
        id: 'TotalSessionPlayers-Cmn'
    })).catch(function (notFound) {
        console.log('No TotalSessionPlayers-Cmn ' +
            'visualization found for game index', gameIndex.index);
        console.error(notFound);
    });

    if (totalSessionsVis && totalSessionsVis._source) {
        yield at(esClient.index({
            index: gameIndex.index,
            type: 'visualization',
            id: 'TotalSessionPlayers-Cmn',
            body: {
                title: 'TotalSessionPlayers-Cmn',
                visState: '{"title":"Total Session Players",' +
                '"type":"metric","params":{"handleNoResults":true,' +
                '"fontSize":60},"aggs":[{"id":"1","type":"cardinality",' +
                '"schema":"metric","params":{"field":"name.keyword",' +
                '"customLabel":"SessionPlayers"}}],"listeners":{}}',
                uiStateJSON: '{}',
                description: '',
                version: 1,
                kibanaSavedObjectMeta: {
                    searchSourceJSON: '{"index":"57604f53f552624300d9caa6",' +
                    '"query":{"query_string":{"query":"*","analyze_wildcard":true}},' +
                    '"filter":[]}'
                },
                author: '_default_',
                isTeacher: true,
                isDeveloper: true
            }
        }));
    }

    let xAPIVerbsActivity = yield at(esClient.get({
        index: gameIndex.index,
        type: 'visualization',
        id: 'xAPIVerbsActivity'
    })).catch(function (notFound) {
        console.log('No xAPIVerbsActivity visualization' +
            ' found for game index', gameIndex.index);
        console.error(notFound);
    });

    if (!xAPIVerbsActivity || !xAPIVerbsActivity._source) {
        return;
    }

    yield at(esClient.index({
        index: gameIndex.index,
        type: 'visualization',
        id: 'xAPIVerbsActivity',
        body: {
            title: 'xAPIVerbsActivity',
            visState: '{"title":"xAPI Verbs Activity",' +
            '"type":"histogram","params":{"shareYAxis":true,' +
            '"addTooltip":true,"addLegend":true,"scale":"linear",' +
            '"mode":"stacked","times":[],"addTimeMarker":false,' +
            '"defaultYExtents":false,"setYExtents":false,"yAxis":{}},' +
            '"aggs":[{"id":"1","type":"count","schema":"metric",' +
            '"params":{"customLabel":"Activity Count"}},{"id":"2",' +
            '"type":"terms","schema":"segment","params":{' +
            '"field":"event.keyword","size":15,"order":"desc",' +
            '"orderBy":"1","customLabel":"xAPI Verb"}}],"listeners":{}}',
            uiStateJSON: '{}',
            description: '',
            version: 1,
            kibanaSavedObjectMeta: {
                searchSourceJSON: '{"index":"57604f53f552624300d9caa6",' +
                '"query":{"query_string":{"query":"*","analyze_wildcard":true}},' +
                '"filter":[]}'
            },
            author: '_default_',
            isTeacher: false,
            isDeveloper: true
        }
    }));

}

function* upgradeGamesIndices(esClient) {
    if (indices.games.length === 0) {
        return;
    }
    for (let i = 0; i < indices.games.length; i++) {
        let gameIndex = indices.games[i];
        yield upgradeGameIndex(esClient, gameIndex);
    }
}

function checkTraceExtensions(trace) {
    let newTrace = {};
    Object.keys(trace).forEach(function (property) {
            if (defaultTraceAttributes.indexOf(property) === -1) {
                if (!newTrace.ext) {
                    newTrace.ext = {};
                }
                newTrace.ext[property] = String(trace[property]);

                if (extensions.indexOf(property) === -1) {
                    extensions.push(property);
                }
            } else {
                newTrace[property] = trace[property];
            }
        }
    );
    return newTrace;
}

function* identifyExtensionsFromIndex(esClient, index) {

    let oldMapping = yield at(esClient.indices.getMapping({index: index.index}));
    let upgradeIndex = 'upgrade_' + index.index;
    let mapping = {};
    mapping[upgradeIndex] = Object.assign({}, defaultBaseMapping);

    for (let key in oldMapping) {
        let currentType = oldMapping[key].mappings;
        for (let type in currentType) {
            let currentProperties = currentType[type];
            if (currentProperties && currentProperties.properties) {
                if (!mapping[upgradeIndex].mappings[type]) {
                    mapping[upgradeIndex].mappings[type] = {
                        properties: {}
                    };
                }

                let props = currentProperties.properties;
                let newProperties = mapping[upgradeIndex].mappings[type].properties;
                for (let property in props) {
                    if (defaultTraceAttributes.indexOf(property) === -1) {
                        if (!newProperties.ext) {
                            newProperties.ext = {
                                properties: {}
                            };
                        }
                        newProperties.ext.properties[property] = {
                            type: 'text',
                            fields: {
                                keyword: {
                                    type: 'keyword',
                                    ignore_above: 256
                                }
                            }
                        };
                    } else if (!newProperties[property]) {
                        newProperties.properties[property] = {
                            type: 'text',
                            fields: {
                                keyword: {
                                    type: 'keyword',
                                    ignore_above: 256
                                }
                            }
                        };
                    }
                }
            }
        }
    }

    let upgraded = false;

    if (!(yield at(esClient.indices.exists({index: upgradeIndex})))) {
        yield at(esClient.indices.create({index: upgradeIndex}));
    }

    let indexMapping = mapping.mappings;
    yield at(esClient.indices.close({index: upgradeIndex}));
    for (let type in indexMapping) {
        yield at(esClient.indices.putMapping({
            index: upgradeIndex,
            type: type,
            body: indexMapping[type]
        }));
    }
    yield at(esClient.indices.open({index: upgradeIndex}));

    function* windowed(hits) {
        let bulkUpgradedTraces = [];
        hits.hits.forEach(function (hit) {
            let newTrace = checkTraceExtensions(hit._source);
            bulkUpgradedTraces.push({update: {_index: upgradeIndex, _type: hit._type, _id: hit._id}});
            bulkUpgradedTraces.push({doc: newTrace, doc_as_upsert: true});
        });

        if (bulkUpgradedTraces.length > 0) {
            yield at(esClient.bulk({body: bulkUpgradedTraces}));
            upgraded = true;
        }
    }

    yield scrollIndex(esClient, index.index, windowed);
    if (upgraded) {
        copyIndexObjectWithPrefixTo(indices.upgrade, index, upgradeIndex);
    }
}

function* identifyExtensions(esClient, indexArray) {
    if (indexArray.length === 0) {
        return;
    }
    for (let i = 0; i < indexArray.length; i++) {
        yield identifyExtensionsFromIndex(esClient, indexArray[i]);
    }
}

function checkNeedsUpdateVisualization(visualization) {

    let visState = JSON.parse(visualization._source.visState.replaceAll('\\\"', '"'));

    /*
     * Example of visState:
     * {
     *   \"title\":\"Alternative Selected Correct Incorrect Per QuestionId\",
     *   \"type\":\"histogram\",
     *   \"params\":{
     *       \"addLegend\":true,
     *       \"addTimeMarker\":false,
     *       \"addTooltip\":true,
     *       \"defaultYExtents\":false,
     *       \"mode\":\"stacked\",
     *       \"scale\":\"linear\",
     *       \"setYExtents\":false,
     *       \"shareYAxis\":true,
     *       \"times\":[],
     *       \"yAxis\":{}
     *   },
     *   \"aggs\":[
     *       {\"id\":\"1\",\"type\":\"count\",\"schema\":\"metric\",
     *           \"params\":{}},
     *       {\"id\":\"2\",\"type\":\"terms\",\"schema\":\"segment\",
     *           \"params\":{
     *               \"field\":\"target.keyword\",
     *               \"include\":{\"pattern\":\"\"},
     *               \"size\":100,
     *               \"order\":\"asc\",
     *               \"orderBy\":\"_term\",
     *               \"customLabel\":\"Alternative\"
     *               }
     *           },
     *       {\"id\":\"4\",\"type\":\"filters\",\"schema\":\"group\",
     *           \"params\":{
     *               \"filters\":[
     *                   {
     *                       \"input\":{
     *                           \"query\":{
     *                               \"query_string\":{
     *                                   \"query\":\"success:true\",
     *                                   \"analyze_wildcard\":true
     *                                   }
     *                               }
     *                           },
     *                       \"label\":\"\"
     *                   },
     *                   {
     *                       \"input\":{
     *                           \"query\":{
     *                               \"query_string\":{
     *                                   \"query\":\"success:false\",
     *                                   \"analyze_wildcard\":true
     *                               }
     *                           }
     *                       }
     *                   }
     *               ]
     *           }
     *       }
     *   ],
     *   \"listeners\":{}
     * }
     * */
    let needsUpdate = false;
    for (let i = 0; i < visState.aggs.length; ++i) {
        let agg = visState.aggs[i];
        if (!agg) {
            continue;
        }

        if (!agg.params) {
            continue;
        }

        if (!agg.params.field) {
            continue;
        }

        let field = agg.params.field;

        let isDefaultAttribute = false;
        for (let j = 0; j < defaultTraceAttributes.length; ++j) {
            let defaultAttribute = defaultTraceAttributes[j];

            if (field === defaultAttribute ||
                field === defaultAttribute + '.keyword') {
                isDefaultAttribute = true;
                break;
            }
        }

        if (!isDefaultAttribute) {
            agg.params.field = 'ext.' + field;
            needsUpdate = true;
        }
    }

    if (needsUpdate) {
        visualization._source.visState = JSON.stringify(visState);
    }
}

function* setUpKibanaIndex(esClient) {
    if (!indices.configs.kibana) {
        return;
    }

    let upgraded = false;
    let to = 'upgrade_' + indices.configs.kibana.index;

    function* windowed(hits) {
        let bulkUpgradedTraces = [];

        for (let i = 0; i < hits.hits.length; ++i) {
            let hit = hits.hits[i];
            if (hit._type === 'visualization') {
                checkNeedsUpdateVisualization(hit);
            } else if (hit._type === 'index-pattern') {
                checkNeedsUpdateIndexPattern(hit);
            }

            bulkUpgradedTraces.push({update: {_index: to, _type: hit._type, _id: hit._id}});
            bulkUpgradedTraces.push({doc: hit._source, doc_as_upsert: true});
        }

        if (bulkUpgradedTraces.length > 0) {
            yield at(esClient.bulk({body: bulkUpgradedTraces}));
            upgraded = true;
        }
    }

    yield scrollIndex(esClient, indices.configs.kibana.index, windowed);

    if (upgraded) {
        copyIndexObjectWithPrefixTo(indices.upgrade, indices.configs.kibana, to);
    }
}

function checkNeedsUpdateIndexPattern(indexPattern) {

    let fields = JSON.parse(indexPattern._source.fields.replaceAll('\\\"', '"'));


    /*Example fields:
     * [
     *   {
     *       \"name\":\"type\",
     *       \"type\":\"string\",
     *       \"count\":0,
     *       \"scripted\":false,
     *       \"indexed\":true,
     *       \"analyzed\":true,
     *       \"doc_values\":false
     *   },
     *   {
     *       \"name\":\"gameplayId\",
     *       \"type\":\"string\",
     *       \"count\":0,
     *       \"scripted\":false,
     *       \"indexed\":true,
     *       \"analyzed\":true,
     *       \"doc_values\":false
     *   },
     *   {
     *       \"name\":\"Estimulado.keyword\",
     *       \"type\":\"string\",
     *       \"count\":0,
     *       \"scripted\":false,
     *       \"indexed\":true,
     *       \"analyzed\":false,
     *       \"doc_values\":true
     *   }
     * ]
     *
     * */
    let needsUpdate = false;

    for (let i = 0; i < fields.length; ++i) {
        let field = fields[i];
        if (!field) {
            continue;
        }

        if (!field.name) {
            continue;
        }

        let fieldName = field.name;

        let isDefaultAttribute = false;
        for (let j = 0; j < defaultTraceAttributes.length; ++j) {
            let defaultAttribute = defaultTraceAttributes[j];

            if (fieldName === defaultAttribute ||
                fieldName === defaultAttribute + '.keyword') {
                isDefaultAttribute = true;
                break;
            }
        }

        if (!isDefaultAttribute) {
            field.name = 'ext.' + fieldName;
            needsUpdate = true;
        }
    }

    if (needsUpdate) {
        indexPattern._source.fields = JSON.stringify(fields);
    }
}

function* setUpTemplateIndex(esClient) {
    if (!indices.configs.template) {
        return;
    }

    let upgraded = false;
    let to = 'upgrade_' + indices.configs.template.index;

    function* windowed(hits) {

        let bulkUpgradedTraces = [];
        hits.hits.forEach(function (hit) {
            if (hit._type === 'index' ||
                hit._type === 'index-pattern') {
                checkNeedsUpdateIndexPattern(hit);
            }

            bulkUpgradedTraces.push({update: {_index: to, _type: hit._type, _id: hit._id}});
            bulkUpgradedTraces.push({doc: hit._source, doc_as_upsert: true});
        });

        if (bulkUpgradedTraces.length > 0) {
            yield at(esClient.bulk({body: bulkUpgradedTraces}));
            upgraded = true;
        }
    }

    yield scrollIndex(esClient, indices.configs.template.index, windowed);

    if (upgraded) {
        copyIndexObjectWithPrefixTo(indices.upgrade, indices.configs.template, to);
    }
}

function* setUpGameIndex(esClient, gameIndex) {

    let upgraded = false;
    let to = 'upgrade_' + gameIndex.index;

    function* windowed(hits) {
        let bulkUpgradedTraces = [];
        hits.hits.forEach(function (hit) {
            if (hit._type === 'visualization') {
                checkNeedsUpdateVisualization(hit);
            } else if (hit._type === 'index-pattern' || hit._type === 'index') {
                checkNeedsUpdateIndexPattern(hit);
            }

            bulkUpgradedTraces.push({update: {_index: to, _type: hit._type, _id: hit._id}});
            bulkUpgradedTraces.push({doc: hit._source, doc_as_upsert: true});
        });

        if (bulkUpgradedTraces.length > 0) {
            yield at(esClient.bulk({body: bulkUpgradedTraces}));
            upgraded = true;
        }
    }

    yield scrollIndex(esClient, gameIndex.index, windowed);

    if (upgraded) {
        copyIndexObjectWithPrefixTo(indices.upgrade, gameIndex, to);
    }
}

function* setUpVisualizations(esClient) {

    yield setUpKibanaIndex(esClient);

    yield setUpTemplateIndex(esClient);

    if (indices.games.length === 0) {
        return;
    }

    for (let i = 0; i < indices.games.length; i++) {
        let gameIndex = indices.games[i];
        yield setUpGameIndex(esClient, gameIndex);
    }
}

function* processExistingUpgradeIndex(esClient, index, newIndex) {

    let exists = yield at(esClient.indices.exists({index: newIndex}));

    if (exists) {
        yield at(esClient.indices.delete({index: newIndex}));
    }

    yield reindexManually(esClient, index, newIndex);

    yield at(esClient.indices.delete({index: index}));
    indices.deleted[index] = true;
}

function upgrade(config, callback) {
    co(function* () {


        let esClient = config.elasticsearch.esClient;
        yield at(esClient.indices.refresh({index: '_all'}));

        yield upgradeGamesIndices(esClient);
        console.log('Finished upgrading game indices!');

        yield identifyExtensions(esClient, indices.traces);
        console.log('Finished identifying traces extensions!', extensions);

        yield identifyExtensions(esClient, indices.versions);
        console.log('Finished identifying versions extensions!', extensions);

        yield setUpVisualizations(esClient);
        console.log('Finished setting up visualizations!', extensions);

        if (indices.upgrade.length === 0) {
            return;
        }

        yield at(esClient.indices.refresh({index: '_all'}));

        for (let i = 0; i < indices.upgrade.length; i++) {
            let index = indices.upgrade[i];
            if (!index.index) {
                continue;
            }

            let newIndex = index.index.substr('upgrade_'.length);
            if (!newIndex) {
                continue;
            }
            yield processExistingUpgradeIndex(esClient, index.index, newIndex);
        }

    }).catch(function (err) {
        // Log any uncaught errors
        // co will not throw any errors you do not handle!!!
        // HANDLE ALL YOUR ERRORS!!!
        console.log('Error while backing up!');
        console.error(err.stack);
        callback(err, config);
    }).then(function () {
        callback(null, config);
    });
}

function sourcesEquals(x, y) {
    // Recursive object equality check
    let p = Object.keys(x);
    return Object.keys(y).every(function (i) {
        if (i === 'ext') {
            let exts = y.ext;
            if (exts) {
                Object.keys(exts).every(function (extKey) {
                    if (extensions.indexOf(extKey) === -1) {
                        return false;
                    }

                    return p.indexOf(extKey) !== -1 && exts[extKey] === x[extKey];
                });
            } else {
                for (let j = 0; j < extensions.length; ++i) {
                    let identifiedExt = extensions[j];
                    if (p.indexOf(identifiedExt) !== -1) {
                        return false;
                    }
                }
            }

            return true;
        }

        if (i === 'fields' || i === 'visState') {
            return true;
        }

        if (p.indexOf(i) !== -1) {
            return true;
        }

        return extensions.indexOf(i) !== -1;
    });
}

function* checkHit(esClient, hit, index) {

    let response = yield at(esClient.get({
        index: index,
        type: hit._type,
        id: hit._id
    }));

    if (!response || !response._source) {
        return false;
    }

    return sourcesEquals(hit._source, response._source);
}

function* checkIndices(esClient, backedUpIndex, index) {

    function* windowed(hits) {
        for (let i = 0; i < hits.hits.length; ++i) {
            let hit = hits.hits[i];

            let same = yield checkHit(esClient, hit, index);

            if (!same) {
                throw new Error('Failed comparing hit ' + JSON.stringify(hit, null, 4));
            }
        }
    }

    yield scrollIndex(esClient, backedUpIndex, windowed);
}

function check(config, callback) {
    co(function* () {

            let esClient = config.elasticsearch.esClient;
            yield at(esClient.indices.refresh({index: '_all'}));

            const indicesResponse = yield at(esClient.cat.indices({format: 'json'}));

            if (!indicesResponse || indicesResponse.length === 0) {
                return callback(null, config);
            }

            let backupCount = 0;
            let i;
            let index;
            for (i = 0; i < indicesResponse.length; i++) {
                index = indicesResponse[i];
                if (index.index) {
                    if (index.index.indexOf('backup_') === 0) {
                        backupCount++;
                    }
                }
            }

            for (i = 0; i < indicesResponse.length; i++) {
                index = indicesResponse[i];
                if (!index.index) {
                    continue;
                }
                if (index.index.indexOf('backup_') !== 0) {
                    continue;
                }

                let normalIndex = index.index.substr('backup_'.length);

                if (!normalIndex) {
                    return callback(new Error('Not found correct index for backed index: ' +
                        JSON.stringify(index, null, 4)));
                }

                let foundIndex = false;
                for (let j = 0; j < indicesResponse.length; ++j) {
                    let retIndex = indicesResponse[j];
                    if (!retIndex.index ||
                        retIndex.index !== normalIndex) {
                        continue;
                    }
                    foundIndex = true;

                    if (index['docs.count'] !== retIndex['docs.count']) {
                        return callback(new Error('DIFFERENT document count ' +
                            JSON.stringify(index, null, 4) + ' and ' +
                            JSON.stringify(retIndex, null, 4)), config);
                    }

                    yield checkIndices(esClient, index.index, retIndex.index);
                }

                if (!foundIndex) {
                    return callback(new Error('Not found normal index for backed index for ' +
                        '(should not happen): ' +
                        JSON.stringify(index, null, 4)));
                }
            }
        }
    ).catch(function (err) {
        // Log any uncaught errors
        // co will not throw any errors you do not handle!!!
        // HANDLE ALL YOUR ERRORS!!!
        console.log('Error while backing up!');
        console.error(err.stack);
        callback(err, config);
    }).then(function () {
        callback(null, config);
    });
}

function clean(config, callback) {
    co(function* () {

        let esClient = config.elasticsearch.esClient;
        yield at(esClient.indices.refresh({index: '_all'}));

        let toRemove = [];
        // Remove the backups
        for (let i = 0; i < indices.backup.length; i++) {
            let backupIndex = indices.backup[i];
            toRemove.push(backupIndex.index);
        }

        // Remove the upgrades that werent removed before
        for (let j = 0; j < indices.upgrade.length; j++) {
            let upgradeIndex = indices.upgrade[j];
            if (!indices.deleted[upgradeIndex.index]) {
                toRemove.push(upgradeIndex.index);
            }
        }

        indices = {
            traces: [],
            versions: [],
            results: [],
            opaqueValues: [],
            games: [],
            configs: {
                template: null,
                kibana: null,
                defaultKibanaIndex: null
            },
            others: [],
            backup: [],
            upgrade: [],
            deleted: {}
        };
        extensions = [];

        if (toRemove.length === 0) {
            return;
        }

        yield at(esClient.indices.delete({index: toRemove.join(',')}));

    }).catch(function (err) {
        // Log any uncaught errors
        // co will not throw any errors you do not handle!!!
        // HANDLE ALL YOUR ERRORS!!!
        console.log('Error while backing up!');
        console.error(err.stack);
        callback(err, config);
    }).then(function () {
        callback(null, config);
    });
}

function* restoreIndex(esClient, backedUpIndex, newIndex) {

    let exists = yield at(esClient.indices.exists({index: newIndex}));

    if (exists) {
        let result = yield at(esClient.indices.delete({index: newIndex}));
        console.log('Index ' + newIndex + ' deleted successfully, result ' +
            JSON.stringify(result, null, 4));
    }

    reindexManually(esClient, backedUpIndex, newIndex);
}

function restore(config, callback) {
    co(function* () {

        let esClient = config.elasticsearch.esClient;
        yield at(esClient.indices.refresh({index: '_all'}));

        for (let i = 0; i < indices.backup.length; i++) {
            let index = indices.backup[i];
            restoreIndex(esClient, index.index, index.index.substr('backup_'.length));
            yield at(esClient.indices.delete({index: index.index}));
        }
        let toRemove = [];
        for (let j = 0; j < indices.upgrade.length; j++) {
            let indexj = indices.upgrade[j];
            if (indexj && indexj.index && !indices.deleted[indexj.index]) {
                toRemove.push(indexj.index);
            }
        }
        if (toRemove.length === 0) {
            return;
        }
        yield at(esClient.indices.delete({index: toRemove.join(',')}));

    }).catch(function (err) {
        // Log any uncaught errors
        // co will not throw any errors you do not handle!!!
        // HANDLE ALL YOUR ERRORS!!!
        console.log('Error while backing up!');
        console.error(err.stack);
        callback(err, config);
    }).then(function () {
        callback(null, config);
    });
}

module.exports = {
    backup: backup,
    upgrade: upgrade,
    check: check,
    clean: clean,
    restore: restore,
    requires: {
        mongo: '1'
    },
    version: {
        origin: '1',
        destination: '2'
    }
};
// jscs:enable requireCamelCaseOrUpperCaseIdentifiers