avegao/aws-athena-node-client

View on GitHub
src/index.ts

Summary

Maintainability
A
3 hrs
Test Coverage
import {AthenaClientConfig} from './AthenaClientConfig.js';
import {Queue} from './Queue.js';
import {Query} from './Query.js';
import {Column, ColumnParse} from './Column.js';
import {
    AthenaClient as AwsAthenaClient,
    GetQueryExecutionCommand,
    GetQueryExecutionInput,
    GetQueryResultsCommand,
    GetQueryResultsInput,
    ResultSet,
    Row,
    StartQueryExecutionCommand,
    StartQueryExecutionCommandInput,
    StopQueryExecutionCommand,
    StopQueryExecutionInput,
} from '@aws-sdk/client-athena';
import {setInterval} from 'timers/promises';
import {AthenaClientException} from './exception/AthenaClientException.js';
import {QueryCanceledException} from './exception/QueryCanceledException.js';
import {GetObjectCommand} from '@aws-sdk/client-s3';
import {getSignedUrl} from '@aws-sdk/s3-request-presigner';

const expiration1Day = 60 * 60 * 24;

enum AthenaDataTypeEnum {
    Integer = 'integer',
    Float = 'float',
    Double = 'double',
    Decimal = 'decimal',
    Char = 'char',
    Varchar = 'varchar',
    Boolean = 'boolean',
    Binary = 'binary',
    Date = 'date',
    Timestamp = 'timestamp',
    TimestampWithTz = 'timestamp with time zone',
    Array = 'array',
    Json = 'json',
    Map = 'map',
    String = 'string',
    Struct = 'struct',
    TinyInt = 'tinyint',
    SmallInt = 'smallint',
    BigInt = 'bigint',
}

export interface QueryConfig {
    readonly id?: string;
    readonly parameters?: Record<string, unknown>;
    readonly cacheInMinutes?: number;
}

export interface QueryWithResultsInS3Config extends QueryConfig {
    s3LinkExpirationInSeconds?: number;
}

export class AthenaNodeClient {
    private readonly _client: AwsAthenaClient;
    private readonly _config: AthenaClientConfig;

    private _queue: Queue;

    public constructor(client: AwsAthenaClient, config: AthenaClientConfig) {
        this._client = client;
        this._config = config;
        this._queue = new Queue();
    }

    public async executeQuery<T extends object>(sql: string, config?: QueryConfig): Promise<T[]> {
        const query = await this.executeQueryCommon<T>(sql, config);

        return this.getQueryResults<T>(query);
    }

    /**
     * Execute query in Athena and get S3 URL with CSV file
     */
    public async executeQueryAndGetS3Key(sql: string, config?: QueryConfig): Promise<{ bucket: string; key: string }> {
        const query = await this.executeQueryCommon(sql, config);

        if (query.s3Location == null) {
            throw new Error('Athena no returns results S3 url');
        }

        const [bucket, key] = query.s3Location.replace('s3://', '').split('/', 1);

        return {
            bucket,
            key: `${key ?? ''}${query.athenaId}.csv`,
        };
    }

    public async executeQueryAndGetDownloadSignedUrl(sql: string, config?: QueryWithResultsInS3Config): Promise<string> {
        if (this._config.s3Client == null) {
            throw new Error('[AthenaNodeClient] S3 Client is missing, you must install @aws-sdk/client-s3 and @aws-sdk/s3-request-presigner dependencies and fill s3Client field in configuration');
        }

        const {bucket, key} = await this.executeQueryAndGetS3Key(sql, config);
        const command = new GetObjectCommand({
            Bucket: bucket,
            Key: key,
        });

        return getSignedUrl(this._config.s3Client, command, {
            expiresIn: config?.s3LinkExpirationInSeconds ?? expiration1Day,
        });
    }

    /**
     * Cancel a AWS Athena query
     *
     * @param {string} id Execution query ID
     *
     * @returns {Promise<void>}
     *
     * @memberof AthenaNodeClient
     */
    public async cancelQuery(id: string): Promise<void> {
        const query = this._queue.getQueryById(id);
        const input: StopQueryExecutionInput = {
            QueryExecutionId: query.athenaId,
        };

        await this._client.send(new StopQueryExecutionCommand(input));
    }

    private async executeQueryCommon<T>(sql: string, config?: QueryConfig): Promise<Query<T>> {
        const query = new Query<T>(sql, this._config.waitTimeInSeconds, config?.parameters, config?.id);

        this._queue.addQuery(query);

        query.athenaId = await this.startQueryExecution(query);

        try {
            await this.waitUntilSucceedQuery(query);
        } catch (exception) {
            this._queue.removeQuery(query);

            throw exception;
        }

        return query;
    }

    /**
     * Starts query execution and gets an ID for the operation
     *
     * @private
     * @param {Query} query - Athena request params
     * @returns {Promise<string>} - query execution id
     * @memberof AthenaNodeClient
     */
    private async startQueryExecution<T>(query: Query<T>): Promise<string> {
        const input: StartQueryExecutionCommandInput = {
            QueryExecutionContext: {
                Database: this._config.database,
            },
            QueryString: query.sql,
            ResultConfiguration: {
                OutputLocation: this._config.bucketUri,
            },
        };

        if (this._config.workGroup != null) {
            input.WorkGroup = this._config.workGroup;
        }

        if (query.cacheInMinutes ?? 0 > 0) {
            input.ResultReuseConfiguration = {
                ResultReuseByAgeConfiguration: {
                    Enabled: true,
                    MaxAgeInMinutes: query.cacheInMinutes,
                },
            };
        }

        const response = await this._client.send(new StartQueryExecutionCommand(input));

        if (response.QueryExecutionId != null) {
            return response.QueryExecutionId;
        } else {
            throw new Error('[AthenaNodeClient] Athena no returns query execution id. This may be a problem in AWS side.');
        }
    }

    /**
     * Processes query results and parses them
     *
     * @private
     * @template T
     *
     * @param {Query<T>} query
     * @param {string} nextToken
     *
     * @returns {Promise<T[]>} - parsed query result rows
     * @memberof AthenaNodeClient
     */
    private async getQueryResults<T extends object>(query: Query<T>, nextToken?: string): Promise<T[]> {
        const input: GetQueryResultsInput = {
            NextToken: nextToken,
            QueryExecutionId: query.athenaId,
        };

        const response = await this._client.send(new GetQueryResultsCommand(input));
        const results = response.ResultSet;

        if (results == null) {
            throw new Error('[AthenaNodeClient] No query results. This may be a problem in AWS side.');
        }

        if (!query.hasColumns()) {
            query.columns = this.setColumnParsers(results);
        }

        const isFirstPage = !query.hasResults() && nextToken == null;

        query.results.push(...this.parseRows<T>(results.Rows ?? [], query.columns, isFirstPage));

        if (response.NextToken != null) {
            query.results = await this.getQueryResults<T>(query, response.NextToken);
        }

        return query.results;
    }

    private parseRow<T extends object>(row: Row, columns: Column[]): T {
        const result = {} as T;
        const dataLength = row.Data?.length ?? 0;

        for (let rowDataIndex = 0; rowDataIndex < dataLength; rowDataIndex++) {
            const rowData = row.Data?.[rowDataIndex];
            const column = columns[rowDataIndex];
            const columnName = column.name;

            let value: unknown | null = null;

            if (rowData?.VarCharValue != null) {
                value = column.parse(rowData.VarCharValue);
            }

            Reflect.set(result, columnName, value);
        }

        return result;
    }

    private parseRows<T extends object>(rows: Row[], columns: Column[], isFirstPage = false): T[] {
        const results: T[] = [];

        // Start with 1 when first line is column title (in first page)
        for (let rowIndex = (isFirstPage) ? 1 : 0, len = rows.length; rowIndex < len; rowIndex++) {
            const row = rows[rowIndex];

            results.push(this.parseRow(row, columns));
        }

        return results;
    }

    private setColumnParsers(results: ResultSet): Column[] {
        return results.ResultSetMetadata?.ColumnInfo?.map((columnInfo, index) => {
            const name = columnInfo.Name ?? `column_${index}`;
            let parse: ColumnParse;

            switch (columnInfo.Type as AthenaDataTypeEnum) {
                case AthenaDataTypeEnum.Integer:
                case AthenaDataTypeEnum.TinyInt:
                case AthenaDataTypeEnum.SmallInt:
                case AthenaDataTypeEnum.BigInt:
                case AthenaDataTypeEnum.Float:
                case AthenaDataTypeEnum.Double:
                case AthenaDataTypeEnum.Decimal:
                    parse = Column.parseNumber;
                    break;

                case AthenaDataTypeEnum.Char:
                case AthenaDataTypeEnum.Varchar:
                case AthenaDataTypeEnum.String:
                    parse = Column.parseString;
                    break;

                case AthenaDataTypeEnum.Boolean:
                    parse = Column.parseBoolean;
                    break;

                case AthenaDataTypeEnum.Date:
                case AthenaDataTypeEnum.Timestamp:
                case AthenaDataTypeEnum.TimestampWithTz:
                    parse = Column.parseDate;
                    break;

                case AthenaDataTypeEnum.Array:
                    parse = Column.parseArray;
                    break;
                case AthenaDataTypeEnum.Json:
                    parse = Column.parseJson;
                    break;
                case AthenaDataTypeEnum.Binary:
                case AthenaDataTypeEnum.Map:
                case AthenaDataTypeEnum.Struct:
                default:
                    throw new Error(`Column type '${columnInfo.Type}' not supported`);
            }

            return new Column(name, parse);
        }) ?? [];
    }

    /**
     * Checks the query execution status until the query sends SUCCEEDED signal
     *
     * @private
     * @param {Query} query - the query
     * @returns {Promise<void>} - promise that will resolve once the operation has finished
     * @memberof AthenaNodeClient
     */
    private async waitUntilSucceedQuery(query: Query<unknown>): Promise<void> {
        const input: GetQueryExecutionInput = {
            QueryExecutionId: query.athenaId,
        };

        const waitTime = this._config.waitTimeInSeconds * 1000;

        for await (const _ of setInterval(waitTime)) {
            const response = await this._client.send(new GetQueryExecutionCommand(input));
            query.status = response.QueryExecution?.Status?.State;

            switch (query.status) {
                case 'SUCCEEDED':
                    query.s3Location = response?.QueryExecution?.ResultConfiguration?.OutputLocation;

                    return;
                case 'QUEUED':
                case 'RUNNING':
                    break;
                case 'CANCELLED':
                    throw new QueryCanceledException();
                case 'FAILED':
                    throw new AthenaClientException('Query failed');
                default:
                    throw new AthenaClientException(`Query Status '${query.status}' not supported`);
            }
        }
    }
}