resource-watch/adapter-bigquery

View on GitHub
app/src/routes/api/v1/bigquery.router.js

Summary

Maintainability
C
1 day
Test Coverage
D
66%
const Router = require('koa-router');
const logger = require('logger');
const { RWAPIMicroservice } = require('rw-api-microservice-node');
const BigQueryService = require('services/bigquery.service');
const QueryService = require('services/query.service');
const FieldSerializer = require('serializers/field.serializer');
const passThrough = require('stream').PassThrough;
const ErrorSerializer = require('serializers/error.serializer');
const DatasetMiddleware = require('middleware/dataset.middleware');

// Every route registered on this router is served under the /bigquery prefix.
const routerOptions = { prefix: '/bigquery' };
const router = new Router(routerOptions);

/**
 * Serializes a flat object into a URL query string (no leading `?`).
 * Values go through encodeURIComponent; keys are emitted unchanged.
 *
 * @param {Object} obj - Object whose own enumerable keys are serialized.
 * @returns {string} `key=value` pairs joined by `&`; '' for an empty object.
 */
const serializeObjToQuery = (obj) => Object.keys(obj)
    .map((key) => `${key}=${encodeURIComponent(obj[key])}`)
    .join('&');

/**
 * Route handlers for the BigQuery adapter endpoints.
 *
 * All handlers are static and are registered on the router as unbound
 * functions, so they refer to each other through the class name rather
 * than through `this`.
 */
class BigQueryRouter {

    /**
     * Builds the request descriptor (method, url and body) used to clone a dataset.
     *
     * @param {string} url - Incoming request URL; its first '/bigquery' occurrence is removed.
     * @param {string} idDataset - Id of the dataset to clone.
     * @returns {{http_method: string, url: string, body: Object}} Clone request descriptor.
     */
    static getCloneUrl(url, idDataset) {
        return {
            http_method: 'POST',
            url: `/dataset/${idDataset}/clone`,
            body: {
                dataset: {
                    datasetUrl: url.replace('/bigquery', ''),
                    application: ['your', 'apps']
                }
            }
        };
    }

    /**
     * Replaces the response with a serialized error and a 500 status.
     *
     * The serialized payload carries `err.statusCode` (or 500) and the first
     * upstream error message when present, otherwise `err.message`.
     * NOTE(review): the HTTP status is always 500 even when the payload carries
     * another code; kept as-is to preserve the existing contract.
     *
     * @param {Object} ctx - Koa context.
     * @param {Error} err - Error raised while preparing the query.
     */
    static setErrorResponse(ctx, err) {
        const detail = err.error && err.error.error ? err.error.error[0] : err.message;
        // The serialized error replaces whatever body (e.g. a stream) was set before.
        // It is not assumed to expose stream methods, so end() is never called on it.
        ctx.body = ErrorSerializer.serializeError(err.statusCode || 500, detail);
        ctx.status = 500;
    }

    /**
     * Sends a PATCH for the dataset referenced by `ctx.request.body.connector.id`,
     * forwarding the caller's x-api-key header.
     *
     * @param {Object} ctx - Koa context.
     * @param {Object} dataset - Fields to send under the `dataset` key of the body.
     * @returns {Promise<*>} Result of the microservice request.
     */
    static patchDataset(ctx, dataset) {
        return RWAPIMicroservice.requestToMicroservice({
            method: 'PATCH',
            uri: `/v1/dataset/${ctx.request.body.connector.id}`,
            body: {
                dataset
            },
            json: true,
            headers: {
                'x-api-key': ctx.request.headers['x-api-key'],
            }
        });
    }

    /**
     * Runs the SQL in `ctx.query.sql` and streams the result as JSON.
     *
     * @param {Object} ctx - Koa context; expects `ctx.request.body.dataset` and `ctx.params.dataset`.
     * @returns {Promise<void>}
     */
    static async query(ctx) {
        ctx.set('Content-type', 'application/json');
        const { format } = ctx.query;
        const cloneUrl = BigQueryRouter.getCloneUrl(ctx.request.url, ctx.params.dataset);
        try {
            ctx.body = passThrough();
            const queryService = new QueryService(ctx.query.sql, ctx.request.body.dataset, ctx.body, cloneUrl, false, format);
            // Deliberately not awaited: the service receives the response stream.
            // Presumably it writes to and ends that stream itself (verify in query.service).
            // Failures raised asynchronously by execute() are not caught here.
            queryService.execute();
        } catch (err) {
            BigQueryRouter.setErrorResponse(ctx, err);
        }
    }

    /**
     * Runs the SQL in `ctx.query.sql` and streams the result as a file attachment.
     * The format comes from `ctx.query.format` and defaults to 'csv'; any value
     * other than 'csv' is served with the JSON mimetype.
     *
     * @param {Object} ctx - Koa context; expects `ctx.request.body.dataset` and `ctx.params.dataset`.
     * @returns {Promise<void>}
     */
    static async download(ctx) {
        try {
            ctx.body = passThrough();
            const format = ctx.query.format ? ctx.query.format : 'csv';
            const mimetype = format === 'csv' ? 'text/csv' : 'application/json';

            const cloneUrl = BigQueryRouter.getCloneUrl(ctx.request.url, ctx.params.dataset);
            // The constructor is synchronous (query() uses it without await), so nothing is awaited here.
            const queryService = new QueryService(ctx.query.sql, ctx.request.body.dataset, ctx.body, cloneUrl, true, format);
            ctx.set('Content-disposition', `attachment; filename=${ctx.request.body.dataset.id}.${format}`);
            ctx.set('Content-type', mimetype);
            // Not awaited, for the same reason as in query().
            queryService.execute();
        } catch (err) {
            BigQueryRouter.setErrorResponse(ctx, err);
        }
    }

    /**
     * Responds with the serialized fields of the dataset's table.
     *
     * @param {Object} ctx - Koa context; expects `ctx.request.body.dataset` with `id` and `tableName`.
     * @returns {Promise<void>}
     */
    static async fields(ctx) {
        logger.info(`Obtaining fields of dataset ${ctx.request.body.dataset.id}`);
        const bigQueryService = new BigQueryService(ctx.request.body.dataset.tableName);
        const fields = await bigQueryService.getFields();
        ctx.body = FieldSerializer.serialize(fields, ctx.request.body.dataset.tableName);
    }

    /**
     * Checks a newly registered dataset by fetching the fields of its table,
     * then reports the outcome through a PATCH on the dataset: status 1 on
     * success, status 2 plus an error message on failure. Always responds
     * with an empty object unless the failure PATCH itself throws.
     *
     * @param {Object} ctx - Koa context; expects `ctx.request.body.connector` with `id` and `tableName`.
     * @returns {Promise<void>}
     */
    static async registerDataset(ctx) {
        logger.info('Registering dataset with data', ctx.request.body.connector);
        try {
            const bigQueryService = new BigQueryService(ctx.request.body.connector.tableName);
            await bigQueryService.getFields();
            await BigQueryRouter.patchDataset(ctx, { status: 1 });
        } catch (e) {
            // Also reached when the success PATCH above fails.
            await BigQueryRouter.patchDataset(ctx, {
                status: 2,
                errorMessage: `${e.name} - ${e.message}`
            });
        }
        ctx.body = {};
    }

}

/**
 * Koa middleware that validates the incoming query through the convert
 * endpoints before the route handler runs.
 *
 * - When `sql` is present (query string or body) it is sent to
 *   `/v1/convert/sql2SQL`; a `geojson` param switches the request to POST.
 * - Otherwise the featureService-style params are sent to `/v1/convert/fs2SQL`
 *   together with the dataset's table name.
 *
 * On a 200/204 answer `ctx.query.sql` is set and `next()` is called. A 400
 * answer is relayed to the client, and any other status is thrown via `ctx.throw`.
 *
 * @param {Object} ctx - Koa context; `ctx.request.body` is expected to be an object.
 * @param {Function} next - Next middleware in the chain.
 * @returns {Promise<void>}
 */
const toSQLMiddleware = async (ctx, next) => {
    const options = {
        method: 'GET',
        json: true,
        resolveWithFullResponse: true,
        simple: false,
        headers: {
            'x-api-key': ctx.request.headers['x-api-key'],
        }
    };
    if (!ctx.query.sql && !ctx.request.body.sql && !ctx.query.outFields && !ctx.query.outStatistics) {
        ctx.throw(400, 'sql or fs required');
        return;
    }
    // remove it in the future when join is implemented in converter
    let sql = null;
    if (ctx.query.sql || ctx.request.body.sql) {
        logger.debug('Checking sql correct');
        const params = { ...ctx.query, ...ctx.request.body };
        sql = params.sql;
        // Values are URL-encoded so characters such as & # + = inside the SQL
        // cannot break or truncate the query string.
        options.uri = `/v1/convert/sql2SQL?sql=${encodeURIComponent(params.sql)}&experimental=true`;
        if (params.geostore) {
            options.uri += `&geostore=${encodeURIComponent(params.geostore)}`;
        }
        if (params.geojson) {
            options.body = {
                geojson: params.geojson
            };
            options.method = 'POST';
        }

    } else {
        logger.debug('Obtaining sql from featureService');
        // Everything in the body except the dataset is forwarded to the converter.
        const { dataset, ...fs } = ctx.request.body;
        // serializeObjToQuery returns a string, used directly as the query string.
        const query = serializeObjToQuery(ctx.request.query);
        const tableParam = `tableName=${encodeURIComponent(dataset.tableName)}`;

        if (query) {
            options.uri = `/v1/convert/fs2SQL?${query}&${tableParam}`;
        } else {
            options.uri = `/v1/convert/fs2SQL?${tableParam}`;
        }
        options.body = fs;
        options.method = 'POST';
    }

    try {
        const result = await RWAPIMicroservice.requestToMicroservice(options);

        if (result.statusCode === 204 || result.statusCode === 200) {
            // ctx.query.sql = result.body.data.attributes.query;

            // remove it in the future when join is implemented in converter
            // NOTE(review): on the featureService path `sql` is still null here, so
            // downstream handlers receive a null query. The converted query in the
            // commented-out line above is probably what they need; confirm.
            ctx.query.sql = sql;
            logger.info('Query', ctx.query.sql);
            await next();
        } else if (result.statusCode === 400) {
            ctx.status = result.statusCode;
            ctx.body = result.body;
        } else {
            ctx.throw(result.statusCode, result.body);
        }

    } catch (e) {
        // Client errors (4xx) carried in an `errors` array are relayed; anything else propagates.
        if (e.errors && e.errors.length > 0 && e.errors[0].status >= 400 && e.errors[0].status < 500) {
            ctx.status = e.errors[0].status;
            ctx.body = e;
        } else {
            throw e;
        }
    }
};

// Query and download endpoints share the same leading middleware:
// DatasetMiddleware.getDatasetById followed by toSQLMiddleware.
const sqlPipeline = [DatasetMiddleware.getDatasetById, toSQLMiddleware];

router.get('/query/:dataset', ...sqlPipeline, BigQueryRouter.query);
router.get('/download/:dataset', ...sqlPipeline, BigQueryRouter.download);
router.post('/download/:dataset', ...sqlPipeline, BigQueryRouter.download);

// The fields endpoint needs no SQL, so it skips toSQLMiddleware.
router.get('/fields/:dataset', DatasetMiddleware.getDatasetById, BigQueryRouter.fields);

// Dataset registration takes no dataset id in the path and uses no extra middleware.
router.post('/rest-datasets/bigquery', BigQueryRouter.registerDataset);

module.exports = router;