zotoio/github-task-manager

View on GitHub
src/agent/AgentMetrics.js

Summary

Maintainability
C
1 day
Test Coverage
import { default as ExpressSSE } from 'express-sse';
import { default as AWS } from 'aws-sdk';
import { default as AgentLogger } from './AgentLogger';
import { default as json } from 'format-json';
import { default as DynamoDBStream } from 'dynamodb-stream';
import { default as elasticsearch } from 'elasticsearch';
import { schedule } from 'tempus-fugit';
import { AgentUtils } from './AgentUtils';
import { default as rp } from 'request-promise-native';
import { version as agentVersion } from '../../../package.json';
import { default as proxy } from 'proxy-agent';
import { default as os } from 'os';

const agentGroup = process.env.GTM_AGENT_GROUP || 'default';

AWS.config.update({ region: process.env.GTM_AWS_REGION });

let log = AgentLogger.log();
const EVENTS_TABLE = process.env.GTM_DYNAMO_TABLE_EVENTS;
const AGENTS_TABLE = process.env.GTM_DYNAMO_TABLE_AGENTS;
let INITIAL_DATA = [];
INITIAL_DATA[EVENTS_TABLE] = [];
INITIAL_DATA[AGENTS_TABLE] = [];

let elastic;
if (process.env.GTM_ELASTIC_HOST && process.env.GTM_ELASTIC_PORT) {
    elastic = new elasticsearch.Client({
        host: `${process.env.GTM_ELASTIC_HOST}:${process.env.GTM_ELASTIC_PORT}`,
        log: 'info'
    });
}

if (process.env.IAM_ENABLED) {
    AWS.config.update({
        httpOptions: {
            agent: proxy(process.env.HTTP_PROXY)
        }
    });
}

let sns = new AWS.SNS({ apiVersion: '2010-03-31' });

export class AgentMetrics {
    static async configureRoutes(app) {
        let ddb = AgentUtils.getDynamoDB();

        await this.bridgeStreams(app, ddb, EVENTS_TABLE, '/metrics/stream');
        await this.bridgeStreams(app, ddb, AGENTS_TABLE, '/metrics/agents/stream');

        app.get('/metrics', (req, res) => {
            res.render('metrics.html');
        });

        app.get('/metrics/log/gtm-:ghEventId.txt', async (req, res) => {
            if (!elastic) {
                res.write('elasticsearch is not configured');
                res.end();
                return;
            }

            let ghEventId = req.params.ghEventId;
            let logs = await this.getEventLogs(ghEventId);

            logs.forEach(log => {
                res.write(`${log._source['@timestamp']} ${log._source.message} \n`);
            });

            res.end();
        });

        app.get('/metrics/log/gtm-:ghEventId.json', async (req, res) => {
            if (!elastic) {
                res.json({ error: 'elasticsearch is not configured' });
                res.end();
                return;
            }

            let ghEventId = req.params.ghEventId;
            let logs = await this.getEventLogs(ghEventId);
            res.json(logs);
        });

        app.get('/metrics/health', async (req, res) => {
            let includeDetail = false;
            let result = await this.getHealth(ddb, includeDetail);
            res.json(result);
            res.end();
        });

        app.get('/metrics/health/detail', async (req, res) => {
            let includeDetail = true;
            let result = await this.getHealth(ddb, includeDetail);
            res.json(result);
            res.end();
        });

        app.get('/metrics/agent/kill/:agentId', async (req, res) => {
            let agentId = req.params.agentId;
            log.info(`sending kill code for agent: ${agentId}`);
            let result = await this.broadcastKill(agentId);
            res.json(result);
            res.end();
        });

        app.get('/metrics/agent/info/:agentId', async (req, res) => {
            let agentId = req.params.agentId || '*';
            log.info(`sending info request to all agents`);
            let result = await this.broadcastInfoRequest(agentId);
            res.json(result);
            res.end();
        });
    }

    static async bridgeStreams(app, ddb, tableName, uri) {
        let ddbStream = await this.getDDBStream(ddb, tableName);

        // fetch stream state initially
        ddbStream.fetchStreamState(async err => {
            if (err) {
                log.error(err);
                return;
            }

            let ddbDocClient = new AWS.DynamoDB.DocumentClient({
                convertEmptyValues: true,
                service: ddb
            });

            // fetch initial data
            let data = await ddbDocClient.scan({ TableName: tableName }).promise();
            INITIAL_DATA[tableName] = data.Items;
            log.debug(`Initial data for ${tableName}: ${json.plain(INITIAL_DATA[tableName])}`);

            // poll
            schedule({ second: 10 }, function(job) {
                ddbStream.fetchStreamState(job.callback());
            });

            let sseStream = new ExpressSSE(INITIAL_DATA[tableName]);
            app.get(uri, sseStream.init);

            ddbStream.on('insert record', object => {
                log.debug(`inserted ${json.plain(object)}`);
                INITIAL_DATA[tableName].push(object);
                sseStream.updateInit(INITIAL_DATA[tableName]);
                sseStream.send(object);
            });

            ddbStream.on('modify record', object => {
                log.debug(`updated ${json.plain(object)}`);
                INITIAL_DATA[tableName].push(object);
                sseStream.updateInit(INITIAL_DATA[tableName]);
                sseStream.send(object);
            });
        });
    }

    static async getDDBStream(ddb, tableName) {
        let tableDetails = await ddb.describeTable({ TableName: tableName }).promise();
        log.debug(json.plain(tableDetails));
        let tableStreamArn = tableDetails.Table.LatestStreamArn;
        let ddbStream = new DynamoDBStream(new AWS.DynamoDBStreams(), tableStreamArn);
        ddbStream.on('error', err => {
            log.error(err);
        });

        ddbStream.on('end', () => {
            log.info(`stream closed`);
        });
        return ddbStream;
    }

    static async getEventLogs(ghEventId) {
        let results = await elastic.search({
            index: 'logstash*',
            size: 10000,
            body: {
                query: {
                    match: {
                        ghEventId: ghEventId
                    }
                }
            },
            sort: '@timestamp:asc'
        });

        return results.hits.hits;
    }

    static async getHealth(ddb, includeDetails) {
        return {
            agent: await this.getAgentInfo(includeDetails),
            node: await this.getProcessInfo(includeDetails),
            elastic: await this.getElasticInfo(includeDetails),
            dynamodb: await this.getDynamoInfo(ddb, includeDetails),
            sqs: await this.getSQSInfo(includeDetails),
            os: await this.getOSInfo(includeDetails)
        };
    }

    static async getOSInfo(includeDetails) {
        if (!includeDetails) {
            return {
                hostname: os.hostname()
            };
        } else {
            return {
                hostname: os.hostname(),
                type: os.type(),
                platform: os.platform(),
                arch: os.arch(),
                release: os.release(),
                uptime: os.uptime(),
                loadavg: os.loadavg(),
                totalmem: os.totalmem(),
                freemem: os.freemem(),
                cpus: os.cpus(),
                networkInterfaces: os.networkInterfaces()
            };
        }
    }

    static async getAgentInfo(includeDetails) {
        let result = {
            id: AgentUtils.agentId(),
            version: agentVersion,
            group: agentGroup
        };
        if (includeDetails) {
            result.env = this.getEnvParams();
        }
        return result;
    }

    static async getElasticInfo(includeDetails) {
        let result = 'not configured';
        if (process.env.GTM_ELASTIC_HOST && process.env.GTM_ELASTIC_PORT) {
            result = await rp({
                json: true,
                uri: `http://${process.env.GTM_ELASTIC_HOST}:${process.env.GTM_ELASTIC_PORT}`
            });
            if (!includeDetails) {
                result = 'found';
            }
        }
        return result;
    }

    static async getSQSInfo(includeDetails) {
        let sqsPendingStats = await this.describeQueue(process.env.GTM_SQS_PENDING_QUEUE, ['All'], true);
        let sqsResultsStats = await this.describeQueue(process.env.GTM_SQS_RESULTS_QUEUE, ['All'], true);
        if (!includeDetails) {
            sqsPendingStats = 'found';
            sqsResultsStats = 'found';
        }
        return {
            pending: sqsPendingStats,
            results: sqsResultsStats
        };
    }

    static async getDynamoInfo(ddb, includeDetails) {
        let eventsTableStatus = await ddb.describeTable({ TableName: EVENTS_TABLE }).promise();
        let agentsTableStatus = await ddb.describeTable({ TableName: AGENTS_TABLE }).promise();

        if (!includeDetails) {
            eventsTableStatus = 'found';
            agentsTableStatus = 'found';
        }

        return {
            events: eventsTableStatus,
            agents: agentsTableStatus
        };
    }

    static getProcessInfo() {
        return {
            version: process.version,
            pid: process.pid,
            uptime: process.uptime(),
            cpuUsage: process.cpuUsage(),
            memoryUsage: process.memoryUsage()
        };
    }

    static getEnvParams() {
        let env = {};
        Object.keys(process.env)
            .sort()
            .forEach(key => {
                if (key.startsWith('GTM')) {
                    env[key] = AgentUtils.varMask(key, process.env[key]);
                }
            });
        return env;
    }

    static async describeQueue(queueName, attributeNameArray, includeDetails) {
        let sqs = new AWS.SQS();
        if (!includeDetails) {
            return 'found';
        }
        let queueUrl = await sqs.getQueueUrl({ QueueName: queueName }).promise();
        log.debug(`sqs queue url ${queueName}: ${json.plain(queueUrl)}`);
        if (!attributeNameArray) attributeNameArray = ['All'];
        let sqsQueueParams = {
            QueueUrl: queueUrl.QueueUrl,
            AttributeNames: attributeNameArray
        };
        let queueDetails = await sqs.getQueueAttributes(sqsQueueParams).promise();

        let result = {};
        result.name = queueName;
        result.url = queueUrl.QueueUrl;
        result.attributes = queueDetails.Attributes;
        log.debug(`sqs queue details: ${result}`);
        return result;
    }

    static async broadcast(message) {
        return sns
            .createTopic({
                Name: process.env.GTM_SNS_AGENTS_TOPIC
            })
            .promise()
            .then(data => {
                let topicArn = data.TopicArn;
                let params = {
                    Message: JSON.stringify(message),
                    TopicArn: topicArn
                };
                return Promise.resolve(params);
            })
            .then(params => {
                return sns.publish(params).promise();
            });
    }

    static async broadcastKill(agentId) {
        let msg = {
            action: 'KILL',
            agentId: agentId
        };
        return await this.broadcast(msg);
    }

    static async broadcastInfoRequest(agentId) {
        let msg = {
            action: 'INFO',
            agentId: agentId
        };
        return await this.broadcast(msg);
    }
}