gfw-api/gfw-subscription-api

View on GitHub
app/src/services/datasetService.js

Summary

Maintainability
F
5 days
Test Coverage
F
30%
/* eslint-disable no-continue */
const Subscription = require('models/subscription');
const ctRegisterMicroservice = require('ct-register-microservice-node');
const MailService = require('services/mailService');
const JSONAPIDeserializer = require('jsonapi-serializer').Deserializer;
const julian = require('julian');
const logger = require('logger');
const request = require('request');

/**
 * Runs the dataset queries attached to subscriptions and, when a query
 * reports a positive value, delivers an alert by e-mail or web-hook.
 *
 * All remote calls go through `ctRegisterMicroservice.requestToMicroservice`.
 * The lookup helpers log failures and return `null` instead of throwing.
 */
class DatasetService {

    /**
     * Executes every entry of `subscription.datasetsQuery`.
     *
     * With `queryType === 'dataQuery'` the raw query results are collected and
     * returned; nothing is sent and the subscription is not modified.
     * With any other query type, a notification is sent for each entry whose
     * result is a single row with a value greater than zero, after which the
     * entry's `lastSentDate` / `historical` are updated and the subscription is
     * saved.
     *
     * A failure while handling one entry is logged and does not stop the
     * remaining entries from being processed.
     *
     * @param {Object} subscription - Subscription document (reads `datasetsQuery`,
     *   `params`, `resource`, `name`, `env`, `application`, `language`; calls `save()`).
     * @param {string} queryType - Key of the query to run inside
     *   `dataset.subscribable[<type>]`, e.g. 'dataQuery' or 'subscriptionQuery'.
     * @returns {Promise<Array<Object>|undefined>} For 'dataQuery', an array of
     *   `{ [datasetId]: { type, timestamp, data } }`; otherwise `undefined`.
     */
    // eslint-disable-next-line consistent-return
    static async runSubscriptionQuery(subscription, queryType) {
        logger.info('Iterate over datasetsQuery of each subs');
        const isDataQuery = queryType === 'dataQuery';
        const queryData = [];
        for (let j = 0; j < subscription.datasetsQuery.length; j++) {
            try {
                const datasetQuery = subscription.datasetsQuery[j];
                // for each dataset query -> get dataset, get geostore id and finally exec the desired query
                const dataset = await DatasetService.getDataset(datasetQuery.id);
                if (!dataset) {
                    logger.error('Error getting dataset of subs');
                    continue;
                }

                const geostoreId = await DatasetService.resolveGeostoreId(subscription.params);
                if (!geostoreId) {
                    logger.error('Error getting geostore of area');
                    continue;
                }

                const result = await DatasetService.executeQuery(
                    dataset.subscribable[datasetQuery.type][queryType],
                    datasetQuery.lastSentDate,
                    new Date(),
                    geostoreId,
                    dataset.tableName,
                    datasetQuery.threshold
                );
                // executeQuery returns null on failure, so this check must come
                // before anything reads result.data (including the data endpoint).
                if (!result) {
                    logger.error('Error processing subs query');
                    continue;
                }

                // for data endpoint: collect the result, never notify
                if (isDataQuery) {
                    queryData.push({
                        [datasetQuery.id]: {
                            type: datasetQuery.type,
                            timestamp: datasetQuery.lastSentDate,
                            data: result.data
                        }
                    });
                    continue;
                }

                logger.debug('Result: ', result);
                const rows = result.data;
                // Only a single-row result with a strictly positive value triggers an alert.
                if (!rows || rows.length !== 1 || !(rows[0].value > 0)) {
                    continue;
                }
                const alertValue = rows[0].value;

                const data = await DatasetService.buildAlertData(subscription, datasetQuery, dataset, alertValue);
                DatasetService.sendNotification(subscription, data);

                // update subs
                let lastSentDate = null;
                if (dataset.mainDateField) {
                    lastSentDate = await DatasetService.getLastDateFromDataset(dataset.slug, dataset.mainDateField);
                }
                // getLastDateFromDataset returns null on failure; never persist a null
                // date, because the next run calls Date methods on lastSentDate.
                subscription.datasetsQuery[j].lastSentDate = lastSentDate || new Date();
                subscription.datasetsQuery[j].historical = subscription.datasetsQuery[j].historical.concat([{
                    value: alertValue,
                    date: new Date()
                }]);
                await subscription.save();
                logger.debug('Finished subscription');
            } catch (e) {
                logger.error(e);
            }
        }
        if (isDataQuery) {
            return queryData;
        }
    }

    /**
     * Picks the geostore id for a subscription from its params: a saved area,
     * an explicit geostore id, or use/wdpa/iso selectors, in that order.
     *
     * @private
     * @param {Object} params - `subscription.params`.
     * @returns {Promise<*>} The geostore id, or a falsy value when none was found.
     */
    static async resolveGeostoreId(params) {
        if (params.area) {
            return DatasetService.getGeostoreIdByArea(params.area);
        }
        if (params.geostore) {
            return params.geostore;
        }
        return DatasetService.getGeostoreIdByParams(params);
    }

    /**
     * Builds the payload used for both the e-mail template and the web-hook POST.
     *
     * Missing metadata does not prevent the alert: the dataset name falls back
     * to the dataset's own name and `datasetSummary` is left undefined.
     *
     * @private
     * @param {Object} subscription - Subscription being processed.
     * @param {Object} datasetQuery - Entry of `subscription.datasetsQuery`.
     * @param {Object} dataset - Value returned by `getDataset`.
     * @param {*} alertValue - `value` of the single result row.
     * @returns {Promise<Object>} Notification payload.
     */
    static async buildAlertData(subscription, datasetQuery, dataset, alertValue) {
        const areaId = subscription.params.area;
        // getting metadata first
        const metadata = await DatasetService.getMetadata(datasetQuery.id, subscription.application, subscription.language);
        const areaName = areaId ? await DatasetService.getAreaName(areaId) : '';

        // getMetadata returns null on failure and the list may be empty.
        const firstEntry = metadata && metadata[0];
        const info = (firstEntry && firstEntry.attributes && firstEntry.attributes.info) || {};
        // getDataset already returns the attributes object, so `dataset.attributes`
        // is normally absent and has to be guarded before reading `.name`.
        const datasetName = info.name || (dataset.attributes && dataset.attributes.name) || dataset.name;
        const alertName = subscription.name
            ? subscription.name
            : `${datasetQuery.type} in ${areaName} above ${datasetQuery.threshold}`;

        return {
            subject: alertName,
            datasetName,
            datasetId: datasetQuery.id,
            datasetSummary: info.functions,
            areaId: areaId || '',
            areaName,
            alertName,
            alertType: datasetQuery.type,
            alertBeginDate: datasetQuery.lastSentDate.toISOString().slice(0, 10),
            alertEndDate: new Date().toISOString().slice(0, 10),
            alertResult: alertValue
        };
    }

    /**
     * Delivers an alert according to `subscription.resource.type`:
     * 'EMAIL' sends a mail through MailService, 'URL' POSTs the payload as JSON
     * to `subscription.resource.content`. Neither delivery is awaited.
     *
     * @private
     * @param {Object} subscription - Subscription being processed.
     * @param {Object} data - Payload built by `buildAlertData`.
     * @returns {void}
     */
    static sendNotification(subscription, data) {
        // Execute EMAIL notification - sending an email
        if (subscription.resource.type === 'EMAIL') {
            logger.debug('Sending mail with data', data);
            let template = 'dataset-rw';
            if (subscription.env && subscription.env !== 'production') {
                template += `-${subscription.env}`;
            }
            MailService.sendMail(template, data, [{ address: subscription.resource.content }], 'rw'); // sender='rw'
        }

        // Execute URL notification - POSTing to webhook
        if (subscription.resource.type === 'URL') {
            // POST to URL configured in subscription.resource.content
            request.post(subscription.resource.content, { json: data }, (error, res, body) => {
                if (res && res.statusCode < 400) {
                    logger.info(`Successfully POSTed to subscription web-hook with status code ${res.statusCode}:`, subscription, body);
                    return;
                }

                logger.warn(`Error in call to subscription webhook. Subscription id: ${subscription.id} || POST url: ${subscription.resource.content} || POST body: ${JSON.stringify(data)} || Response code: ${res && res.statusCode} || Response body: ${body && JSON.stringify(body)} || error: ${error && JSON.stringify(error)}`);
            });
        }
    }

    /**
     * Data endpoint: loads one subscription and returns its 'dataQuery' results.
     *
     * NOTE(review): a missing subscription is not handled here; `findById`
     * presumably resolves to null in that case, which makes
     * `runSubscriptionQuery` throw - confirm callers expect that.
     *
     * @param {*} subscriptionId - Id passed to `Subscription.findById`.
     * @returns {Promise<Array<Object>>} Result of `runSubscriptionQuery(..., 'dataQuery')`.
     */
    static async processSubscriptionData(subscriptionId) {
        const subscription = await Subscription.findById(subscriptionId).exec();
        return DatasetService.runSubscriptionQuery(subscription, 'dataQuery');
    }


    /**
     * Runs the 'subscriptionQuery' of every confirmed subscription that has a
     * non-empty `datasetsQuery`, one subscription at a time.
     *
     * @returns {Promise<void>}
     */
    static async processSubscriptions() {
        logger.info('Processing dataset subs');
        logger.info('Getting datasetsQuery subscriptions');
        const subscriptions = await Subscription.find({
            confirmed: true,
            datasetsQuery: { $exists: true, $not: { $size: 0 } }
        }).exec();
        logger.info('Iterate over subs');
        for (const subscription of subscriptions) {
            await DatasetService.runSubscriptionQuery(subscription, 'subscriptionQuery');
        }
    }

    /**
     * Fetches `/dataset/<datasetId>`.
     *
     * @param {string} datasetId - Dataset id.
     * @returns {Promise<Object|null>} `data.attributes` of the response, or
     *   `null` when the request fails or the response has no `data`.
     */
    static async getDataset(datasetId) {
        try {
            const result = await ctRegisterMicroservice.requestToMicroservice({
                uri: `/dataset/${datasetId}`,
                method: 'GET',
                json: true
            });
            return result.data.attributes;
        } catch (error) {
            logger.error(error);
            return null;
        }
    }

    /**
     * Fetches `/dataset/<datasetId>/metadata` for an application and language.
     *
     * @param {string} datasetId - Dataset id.
     * @param {string} application - Value of the `application` query parameter.
     * @param {string} language - Value of the `language` query parameter.
     * @returns {Promise<*|null>} `data` of the response, or `null` on failure.
     */
    static async getMetadata(datasetId, application, language) {
        try {
            const result = await ctRegisterMicroservice.requestToMicroservice({
                uri: `/dataset/${datasetId}/metadata?application=${application}&language=${language}`,
                method: 'GET',
                json: true
            });
            return result.data;
        } catch (error) {
            logger.error(error);
            return null;
        }
    }

    /**
     * Fetches `/area/<idArea>` and deserializes the JSON:API response with
     * camelCase attribute keys. Errors propagate to the caller.
     *
     * @private
     * @param {string} idArea - Area id.
     * @returns {Promise<Object>} Deserialized area.
     */
    static async fetchArea(idArea) {
        logger.info('Obtaining area with id: ', idArea);
        const response = await ctRegisterMicroservice.requestToMicroservice({
            uri: `/area/${idArea}`,
            method: 'GET',
            json: true
        });
        return new JSONAPIDeserializer({
            keyForAttribute: 'camelCase'
        }).deserialize(response);
    }

    /**
     * Builds the geostore lookup URI from the first available selector:
     * land use, then WDPA id, then ISO country (optionally with region and
     * subregion).
     *
     * @private
     * @param {Object} selectors
     * @param {{name: *, id: *}|null} selectors.use - Land-use selector, or null.
     * @param {*} selectors.wdpaid - WDPA id.
     * @param {Object} selectors.iso - `{ country, region, subregion }`.
     * @returns {string|null} The URI, or `null` when no selector is usable.
     */
    static buildGeostoreUri({ use, wdpaid, iso }) {
        if (use) {
            return `/geostore/use/${use.name}/${use.id}`;
        }
        if (wdpaid) {
            return `/geostore/wdpa/${wdpaid}`;
        }
        if (iso && iso.country) {
            let uri = `/geostore/admin/${iso.country}`;
            // A subregion is only used together with its region.
            if (iso.region) {
                uri += `/${iso.region}`;
                if (iso.subregion) {
                    uri += `/${iso.subregion}`;
                }
            }
            return uri;
        }
        return null;
    }

    /**
     * Requests a geostore URI and returns the id of the deserialized geostore.
     *
     * @private
     * @param {string|null} uri - URI produced by `buildGeostoreUri`.
     * @returns {Promise<*|null>} Geostore id, or `null` when there is no URI or
     *   the request fails.
     */
    static async fetchGeostoreId(uri) {
        if (!uri) {
            // Without a selector there is nothing to look up; do not hit the bare /geostore endpoint.
            logger.error('No use, wdpaid or iso.country available to look up a geostore');
            return null;
        }
        try {
            logger.info('Uri', uri);
            const result = await ctRegisterMicroservice.requestToMicroservice({
                uri,
                method: 'GET',
                json: true
            });
            const geostore = await new JSONAPIDeserializer({
                keyForAttribute: 'camelCase'
            }).deserialize(result);
            return geostore.id;
        } catch (error) {
            logger.error(error);
            return null;
        }
    }

    /**
     * Resolves the geostore id of a saved area. Uses the area's own `geostore`
     * when present; otherwise looks it up from the area's `use`, `wdpaid` or
     * `iso` fields.
     *
     * @param {string} idArea - Area id.
     * @returns {Promise<*|null>} Geostore id, or `null` on any failure.
     */
    static async getGeostoreIdByArea(idArea) {
        try {
            const area = await DatasetService.fetchArea(idArea);
            logger.info('Area Result', area);
            if (area.geostore) {
                return area.geostore;
            }
            const hasUse = Boolean(area.use && area.use.name);
            const uri = DatasetService.buildGeostoreUri({
                use: hasUse ? { name: area.use.name, id: area.use.id } : null,
                wdpaid: area.wdpaid,
                iso: area.iso
            });
            return await DatasetService.fetchGeostoreId(uri);
        } catch (error) {
            logger.error(error);
            return null;
        }
    }

    /**
     * Fetches a saved area and returns its name.
     *
     * @param {string} idArea - Area id.
     * @returns {Promise<*|null>} `area.name`, or `null` on failure.
     */
    static async getAreaName(idArea) {
        try {
            const area = await DatasetService.fetchArea(idArea);
            return area.name;
        } catch (error) {
            logger.error(error);
            return null;
        }
    }

    /**
     * Resolves a geostore id from subscription params (`use` + `useid`,
     * `wdpaid`, or `iso.country` with optional region/subregion).
     *
     * @param {Object} params - `subscription.params`.
     * @returns {Promise<*|null>} Geostore id, or `null` when no selector is
     *   present or the lookup fails.
     */
    static async getGeostoreIdByParams(params) {
        try {
            const hasUse = Boolean(params.use && params.useid);
            const uri = DatasetService.buildGeostoreUri({
                use: hasUse ? { name: params.use, id: params.useid } : null,
                wdpaid: params.wdpaid,
                iso: params.iso
            });
            return await DatasetService.fetchGeostoreId(uri);
        } catch (error) {
            logger.error(error);
            return null;
        }
    }

    /**
     * Fills the date placeholders of a query template and runs it through
     * `/query`.
     *
     * Supported placeholders: `{{begin}}`, `{{end}}` (first 10 characters of the
     * ISO string, i.e. YYYY-MM-DD), `{{julianDayBegin}}`, `{{julianDayEnd}}`,
     * `{{yearBegin}}`, `{{yearEnd}}`. Every occurrence is replaced.
     *
     * NOTE(review): the years come from `getFullYear()` (local time) while the
     * dates come from `toISOString()` (UTC) - confirm this is intended.
     *
     * @param {string} query - Query template.
     * @param {Date} beginDate - Start of the period.
     * @param {Date} endDate - End of the period.
     * @param {*} geostoreId - Sent as the `geostore` query-string parameter.
     * @param {string} tableName - Currently unused; kept for interface compatibility.
     * @param {*} threshold - Sent as the `threshold` query-string parameter.
     * @returns {Promise<Object|null>} Response of `/query`, or `null` when the
     *   request fails.
     */
    static async executeQuery(query, beginDate, endDate, geostoreId, tableName, threshold) {
        const replacements = {
            begin: beginDate.toISOString().slice(0, 10),
            end: endDate.toISOString().slice(0, 10),
            julianDayBegin: julian.toJulianDay(beginDate),
            yearBegin: beginDate.getFullYear(),
            julianDayEnd: julian.toJulianDay(endDate),
            yearEnd: endDate.getFullYear()
        };
        // A global regex is required: replace() with a string pattern only
        // substitutes the first occurrence of each placeholder.
        const finalQuery = query.replace(
            /\{\{(begin|end|julianDayBegin|yearBegin|julianDayEnd|yearEnd)\}\}/g,
            (placeholder, key) => String(replacements[key])
        );
        logger.debug('Doing query: ', finalQuery);
        try {
            const result = await ctRegisterMicroservice.requestToMicroservice({
                uri: '/query',
                qs: {
                    sql: finalQuery,
                    threshold,
                    geostore: geostoreId
                },
                method: 'GET',
                json: true
            });
            return result;
        } catch (error) {
            logger.error(error);
            return null;
        }
    }

    /**
     * Queries the maximum value of a dataset's main date field.
     *
     * @param {string} datasetSlug - Used as the table name in the query.
     * @param {string} datasetMainDateField - Column whose maximum is requested.
     * @returns {Promise<*|null>} `lastdate` of the first result row, or `null`
     *   when the request fails or returns no rows.
     */
    static async getLastDateFromDataset(datasetSlug, datasetMainDateField) {
        const query = `select max(${datasetMainDateField}) as lastdate from ${datasetSlug}`;
        logger.debug('Doing query: ', query);
        try {
            const result = await ctRegisterMicroservice.requestToMicroservice({
                uri: '/query',
                qs: {
                    sql: query
                },
                method: 'GET',
                json: true
            });
            return result.data[0].lastdate;
        } catch (error) {
            logger.error(error);
            return null;
        }
    }

}


module.exports = DatasetService;