ghdna/athena-express

View on GitHub
lib/helpers.js

Summary

Maintainability
B
5 hrs
Test Coverage
"use strict";
//helpers.js

const readline = require("readline"),
  csv = require("csvtojson");

let s3Metadata = null;

function startQueryExecution(config) {
  const params = {
    QueryString: config.sql,
    WorkGroup: config.workgroup,
    ResultConfiguration: {
      OutputLocation: config.s3Bucket,
    },
    QueryExecutionContext: {
      Database: config.db,
      Catalog: config.catalog,
    },
  };
  if (config.encryption)
    params.ResultConfiguration.EncryptionConfiguration = config.encryption;

  return new Promise(function (resolve, reject) {
    const startQueryExecutionRecursively = async function () {
      try {
        let data = await config.athena.startQueryExecution(params).promise();
        resolve(data.QueryExecutionId);
      } catch (err) {
        isCommonAthenaError(err.code)
          ? setTimeout(() => {
              startQueryExecutionRecursively();
            }, 2000)
          : reject(err);
      }
    };
    startQueryExecutionRecursively();
  });
}

function checkIfExecutionCompleted(config) {
  let retry = config.retry;
  return new Promise(function (resolve, reject) {
    const keepCheckingRecursively = async function () {
      try {
        let data = await config.athena
          .getQueryExecution({
            QueryExecutionId: config.QueryExecutionId,
          })
          .promise();
        if (data.QueryExecution.Status.State === "SUCCEEDED") {
          retry = config.retry;
          s3Metadata = config.athena
            .getQueryResults({
              QueryExecutionId: config.QueryExecutionId,
              MaxResults: 1,
            })
            .promise();
          resolve(data);
        } else if (data.QueryExecution.Status.State === "FAILED") {
          reject(data.QueryExecution.Status.StateChangeReason);
        } else {
          setTimeout(() => {
            keepCheckingRecursively();
          }, retry);
        }
      } catch (err) {
        if (isCommonAthenaError(err.code)) {
          retry = 2000;
          setTimeout(() => {
            keepCheckingRecursively();
          }, retry);
        } else reject(err);
      }
    };
    keepCheckingRecursively();
  });
}

async function getQueryResultsFromS3(params) {
  const s3Params = {
    Bucket: params.s3Output.split("/")[2],
    Key: params.s3Output.split("/").slice(3).join("/"),
  };

  if (params.statementType === "UTILITY" || params.statementType === "DDL") {
    const input = params.config.s3.getObject(s3Params).createReadStream();
    return { items: await cleanUpNonDML(input) };
  } else if (Boolean(params.config.pagination)) {
    //user wants DML response paginated

    const paginationFactor = Boolean(params.config.NextToken) ? 0 : 1;

    let paginationParams = {
      QueryExecutionId: params.config.QueryExecutionId,
      MaxResults: params.config.pagination + paginationFactor,
      NextToken: params.config.NextToken,
    };

    const queryResults = await params.config.athena
      .getQueryResults(paginationParams)
      .promise();
    if (params.config.formatJson) {
      return {
        items: await cleanUpPaginatedDML(queryResults, paginationFactor),
        nextToken: queryResults.NextToken,
      };
    } else {
      return {
        items: await queryResults,
        nextToken: queryResults.NextToken,
      };
    }
  } else {
    //user doesn't want DML response paginated
    const input = params.config.s3.getObject(s3Params).createReadStream();
    if (params.config.formatJson) {
      return {
        items: await cleanUpDML(input, params.config.ignoreEmpty),
      };
    } else {
      return { items: await getRawResultsFromS3(input) };
    }
  }
}

async function cleanUpPaginatedDML(queryResults, paginationFactor) {
  const dataTypes = await getDataTypes();
  const columnNames = Object.keys(dataTypes).reverse();
  let rowObject = {};
  let unformattedS3RowArray = null;
  let formattedArray = [];

  for (let i = paginationFactor; i < queryResults.ResultSet.Rows.length; i++) {
    unformattedS3RowArray = queryResults.ResultSet.Rows[i].Data;

    for (let j = 0; j < unformattedS3RowArray.length; j++) {
      if (unformattedS3RowArray[j].hasOwnProperty("VarCharValue")) {
        [rowObject[columnNames[j]]] = [unformattedS3RowArray[j].VarCharValue];
      }
    }

    formattedArray.push(addDataType(rowObject, dataTypes));
    rowObject = {};
  }
  return formattedArray;
}

function getRawResultsFromS3(input) {
  let rawJson = [];
  return new Promise(function (resolve, reject) {
    readline
      .createInterface({
        input,
      })
      .on("line", (line) => {
        rawJson.push(line.trim());
      })
      .on("close", function () {
        resolve(rawJson);
      });
  });
}

function getDataTypes() {
  return new Promise(async function (resolve) {
    const columnInfoArray = (await s3Metadata).ResultSet.ResultSetMetadata
      .ColumnInfo;
    let columnInfoArrayLength = columnInfoArray.length;
    let columnInfoObject = {};
    while (columnInfoArrayLength--) {
      [columnInfoObject[columnInfoArray[columnInfoArrayLength].Name]] = [
        columnInfoArray[columnInfoArrayLength].Type,
      ];
    }
    resolve(columnInfoObject);
  });
}

async function cleanUpDML(input, ignoreEmpty) {
  let cleanJson = [];
  const dataTypes = await getDataTypes();
  return new Promise(function (resolve) {
    input.pipe(
      csv({
        ignoreEmpty,
      })
        .on("data", (data) => {
          cleanJson.push(
            addDataType(JSON.parse(data.toString("utf8")), dataTypes)
          );
        })
        .on("finish", function () {
          resolve(cleanJson);
        })
    );
  });
}

function addDataType(input, dataTypes) {
  let updatedObjectWithDataType = {};

  for (const key in input) {
    if (!input[key]) {
      updatedObjectWithDataType[key] = null;
    } else {
      switch (dataTypes[key]) {
        case "varchar":
          updatedObjectWithDataType[key] = input[key];
          break;
        case "boolean":
          updatedObjectWithDataType[key] = JSON.parse(input[key].toLowerCase());
          break;
        case "bigint":
          updatedObjectWithDataType[key] = BigInt(input[key]);
          break;
        case "integer":
        case "tinyint":
        case "smallint":
        case "int":
        case "float":
        case "double":
          updatedObjectWithDataType[key] = Number(input[key]);
          break;
        default:
          updatedObjectWithDataType[key] = input[key];
      }
    }
  }
  return updatedObjectWithDataType;
}

function cleanUpNonDML(input) {
  let cleanJson = [];
  return new Promise(function (resolve) {
    readline
      .createInterface({
        input,
      })
      .on("line", (line) => {
        switch (true) {
          case line.indexOf("\t") > 0:
            line = line.split("\t");
            cleanJson.push({
              [line[0].trim()]: line[1].trim(),
            });
            break;
          default:
            if (line.trim().length) {
              cleanJson.push({
                row: line.trim(),
              });
            }
        }
      })
      .on("close", function () {
        resolve(cleanJson);
      });
  });
}

function validateConstructor(init) {
  if (!init)
    throw new TypeError("Config object not present in the constructor");

  try {
    let aws = init.s3 ? init.s3 : init.aws.config.credentials.accessKeyId;
    let athena = new init.aws.Athena({
      apiVersion: "2017-05-18",
    });
  } catch (e) {
    throw new TypeError(
      "AWS object not present or incorrect in the constructor"
    );
  }
}

function isCommonAthenaError(err) {
  return err === "TooManyRequestsException" ||
    err === "ThrottlingException" ||
    err === "NetworkingError" ||
    err === "UnknownEndpoint"
    ? true
    : false;
}

const lowerCaseKeys = (obj) =>
  Object.keys(obj).reduce((acc, key) => {
    if (obj[key] !== undefined) {
      acc[key.toLowerCase()] = obj[key];
    }
    return acc;
  }, {});

module.exports = {
  validateConstructor,
  startQueryExecution,
  checkIfExecutionCompleted,
  getQueryResultsFromS3,
  lowerCaseKeys,
};