doitintl/bigquery-grafana

View on GitHub
src/datasource.ts

Summary

Maintainability
D
1 day
Test Coverage
import _ from 'lodash';
import moment from 'moment';
import BigQueryQuery from './bigquery_query';
import ResponseParser, { IResultFormat } from './response_parser';
import SqlParser from './sql_parser';
import { v4 as generateID } from 'uuid';

// Marker appended (followed by `_<shift>`) to the refId of the time-shifted copy of a query.
// The datasource later searches refIds and series names for it to recognise shifted results.
const Shifted = '_shifted';
/**
 * Promise-based delay.
 *
 * @param ms Number of milliseconds handed to `setTimeout`.
 * @returns A promise that resolves (with no value) once the timer fires.
 */
function sleep(ms) {
  return new Promise((done) => setTimeout(done, ms));
}

/**
 * Grafana datasource that talks to the BigQuery REST API through the Grafana
 * backend proxy (`backendSrv.datasourceRequest`).
 *
 * It renders panel targets to SQL, optionally duplicates them as time-shifted
 * queries, submits them, polls for job completion, pages through the results
 * and hands the raw rows/schema to `ResponseParser`.
 */
export class BigQueryDatasource {
  /**
   * Convert a BigQuery error payload into the `{ data, status, statusText }`
   * object this datasource throws to its callers.
   *
   * @param error Error payload; `message`, `code` and `errors[0].reason` are read when present.
   *   `undefined`/`null` yields an object with an empty status and data message.
   * @returns `statusText` is `'BigQuery: '` followed by the error message (or a
   *   generic connection message), `status` is `error.code`, and `data.message`
   *   is `'<reason>: <message>'` when a first detailed error exists, otherwise
   *   just the message.
   */
  public static formatBigqueryError(error) {
    let message = 'BigQuery: ';
    let status = '';
    let data = '';
    if (error !== undefined && error !== null) {
      message += error.message ? error.message : 'Cannot connect to BigQuery API';
      status = error.code;
      // `errors` is not always present (e.g. plain Error objects or network failures),
      // so only read the first detailed error when there is one.
      const firstError = Array.isArray(error.errors) && error.errors.length > 0 ? error.errors[0] : undefined;
      if (firstError) {
        data = firstError.reason + ': ' + error.message;
      } else if (error.message) {
        data = String(error.message);
      }
    }
    return {
      data: {
        message: data,
      },
      status,
      statusText: message,
    };
  }

  /**
   * Split a shift expression such as `1h` or `15min` into its unit and amount.
   *
   * @param strInterval Shift expression; the first run of digits is the amount and
   *   the text after the first `amount.length` characters is taken as the unit.
   * @returns `[unit, amount]`, both strings. The unit `min` is normalised to `m`.
   *   When the expression contains no digits the amount is `'0'` and the
   *   expression itself is returned as the unit.
   */
  public static _getShiftPeriod(strInterval) {
    const digits = strInterval.match(/\d+/);
    let shift = '0';
    if (digits !== null) {
      shift = digits[0];
      strInterval = strInterval.substr(shift.length, strInterval.length);
    }

    if (strInterval.trim() === 'min') {
      strInterval = 'm';
    }
    return [strInterval, shift];
  }

  /**
   * Extract the project, dataset and table referenced by a SQL statement.
   * Thin wrapper around `SqlParser.getProjectDatasetTableFromSql`.
   */
  public static _extractFromClause(sql: string) {
    return SqlParser.getProjectDatasetTableFromSql(sql);
  }

  /**
   * Find which of the known time fields is selected by a SQL statement.
   *
   * The text between the first `select` and the first `from` is split on commas;
   * for each piece the alias (` AS ...`), up to two backticks and the opening part
   * of the time macros are stripped before comparing with `timeFields[i].text`.
   *
   * @param sql SQL text to inspect.
   * @param timeFields Candidate fields, each with a `text` property.
   * @returns The first matching entry of `timeFields`, or `null` when none matches.
   */
  public static _FindTimeField(sql, timeFields) {
    const select = sql.search(/select/i);
    const from = sql.search(/from/i);
    const fields = sql.substring(select + 6, from);
    const splitFrom = fields.split(',');
    const candidates = timeFields || [];
    let col;
    for (let i = 0; i < splitFrom.length; i++) {
      let field = splitFrom[i].search(/ AS /i);
      if (field === -1) {
        field = splitFrom[i].length;
      }
      col = splitFrom[i].substring(0, field).trim().replace('`', '').replace('`', '');
      col = col.replace(/\$__timeGroupAlias\(/g, '');
      col = col.replace(/\$__timeGroup\(/g, '');
      col = col.replace(/\$__timeFilter\(/g, '');
      col = col.replace(/\$__timeFrom\(/g, '');
      col = col.replace(/\$__timeTo\(/g, '');
      col = col.replace(/\$__millisTimeTo\(/g, '');
      col = col.replace(/\$__millisTimeFrom\(/g, '');
      for (const fl of candidates) {
        if (fl.text === col) {
          return fl;
        }
      }
    }
    return null;
  }

  /**
   * Final error handler for HTTP requests.
   *
   * @returns `[]` when the request was cancelled.
   * @throws The result of `formatBigqueryError` for `error.data.error` when the
   *   error carries `data`, otherwise for the error itself.
   */
  private static _handleError(error) {
    if (error && error.cancelled === true) {
      return [];
    }
    let msg = error;
    if (error && error.data !== undefined) {
      msg = error.data.error;
    }
    throw BigQueryDatasource.formatBigqueryError(msg);
  }

  /**
   * Build the time-shifted twin of a query, if its SQL asks for one.
   *
   * @param query Rendered query object (see `query()`).
   * @returns A shallow copy whose SQL has the shift expression replaced, whose
   *   `format` carries `#<shift>` and whose `refId` carries `_shifted_<shift>`;
   *   or the falsy value returned by `BigQueryQuery.getTimeShift` when the SQL
   *   has no time shift.
   */
  private static _createTimeShiftQuery(query) {
    const res = BigQueryQuery.getTimeShift(query.rawSql);
    if (!res) {
      return res;
    }
    const copy = Object.assign({}, query);
    copy.rawSql = BigQueryQuery.replaceTimeShift(copy.rawSql);
    copy.format += '#' + res;
    copy.refId += Shifted + '_' + res;
    return copy;
  }

  /**
   * Produce the request options to use for a (possibly time-shifted) query.
   *
   * When `query.format` carries a `#<shift>` suffix with a supported unit, the
   * suffix is removed from `query.format` (the query object is modified) and the
   * returned options get a new `range` whose `from`/`to` are moved back by the shift.
   * The caller's `options.range` and its moments are left untouched.
   *
   * @param query Rendered query object; only `format` is read/written.
   * @param options Panel request options; `range.from`/`range.to` must be moment-like
   *   (`clone()` and `subtract()` are called on them).
   * @returns A shallow copy of `options`, with a shifted `range` when applicable.
   */
  private static _setupTimeShiftQuery(query, options) {
    const index = query.format.indexOf('#');
    const copy = Object.assign({}, options);
    if (index === -1) {
      return copy;
    }
    const [unit, shift] = BigQueryDatasource._getShiftPeriod(query.format.slice(index + 1));
    if (!['s', 'min', 'h', 'd', 'w', 'm', 'y', 'M'].includes(unit)) {
      return copy;
    }
    query.format = query.format.substr(0, index);
    // `_getShiftPeriod` already maps 'min' to 'm'; kept as a safeguard.
    const strInterval = unit === 'min' ? 'm' : unit;
    const amount = parseInt(shift, 10);

    // moment's subtract() mutates its receiver and `copy.range` would otherwise be the very
    // same object as `options.range`, so clone both the range and the moments before shifting.
    copy.range = Object.assign({}, options.range, {
      from: options.range.from.clone().subtract(amount, strInterval),
      to: options.range.to.clone().subtract(amount, strInterval),
    });
    return copy;
  }

  /**
   * Rewrite the `_PARTITIONTIME` bounds of a query to the given time range.
   *
   * Only acts when the query contains both `AND _PARTITIONTIME >= ` and
   * `AND _PARTITIONTIME <` (not at position 0); the 21 characters following each
   * marker are replaced by the quoted, formatted range boundary.
   *
   * @returns The rewritten query (with `'\n '` appended) or the query unchanged.
   */
  private static _updatePartition(q, options, convertToUTC = false) {
    if (q.indexOf('AND _PARTITIONTIME >= ') < 1) {
      return q;
    }
    if (q.indexOf('AND _PARTITIONTIME <') < 1) {
      return q;
    }
    const from = q.substr(q.indexOf('AND _PARTITIONTIME >= ') + 22, 21);

    const newFrom = "'" + BigQueryQuery.formatDateToString(options.range.from._d, convertToUTC, '-', true) + "'";
    q = q.replace(from, newFrom);
    const to = q.substr(q.indexOf('AND _PARTITIONTIME < ') + 21, 21);
    const newTo = "'" + BigQueryQuery.formatDateToString(options.range.to._d, convertToUTC, '-', true) + "'";

    q = q.replace(to, newTo) + '\n ';
    return q;
  }

  /**
   * Rewrite the `_TABLE_SUFFIX BETWEEN` bounds of a sharded-table query to the given time range.
   *
   * Only acts when the query contains `AND  _TABLE_SUFFIX BETWEEN ` (not at position 0);
   * the two 8-character values at fixed offsets after the marker are replaced by the
   * formatted range boundaries.
   *
   * @returns The rewritten query (with `'\n '` appended) or the query unchanged.
   */
  private static _updateTableSuffix(q, options, convertToUTC = false) {
    const ind = q.indexOf('AND  _TABLE_SUFFIX BETWEEN ');
    if (ind < 1) {
      return q;
    }
    const from = q.substr(ind + 28, 8);

    const newFrom = BigQueryQuery.formatDateToString(options.range.from._d, convertToUTC);
    q = q.replace(from, newFrom);
    const to = q.substr(ind + 43, 8);
    const newTo = BigQueryQuery.formatDateToString(options.range.to._d, convertToUTC);
    q = q.replace(to, newTo) + '\n ';
    return q;
  }
  public authenticationType: string;
  public projectName: string;
  public readonly name: string;
  public readonly id: number;
  public readonly type: string;
  public readonly uid: string;
  private readonly url: string;
  private readonly baseUrl: string;
  private jsonData: any;
  private responseParser: ResponseParser;
  private queryModel: BigQueryQuery;
  private runInProject: string;
  private processingLocation: string;
  private queryPriority: string;

  /** @ngInject */
  constructor(instanceSettings, private backendSrv, private $q, private templateSrv) {
    this.name = instanceSettings.name;
    this.id = instanceSettings.id;
    this.type = instanceSettings.type;
    this.uid = instanceSettings.uid;
    this.url = instanceSettings.url;
    this.jsonData = instanceSettings.jsonData;
    this.baseUrl = `/bigquery/`;

    this.responseParser = new ResponseParser(this.$q);
    this.queryModel = new BigQueryQuery({});
    this.authenticationType = instanceSettings.jsonData.authenticationType || 'jwt';
    // When a default project is configured this assignment completes synchronously;
    // otherwise the project is discovered asynchronously and `getDefaultProject`
    // fills in `projectName` (and `runInProject` if still unset) later.
    (async () => {
      this.projectName = instanceSettings.jsonData.defaultProject || (await this.getDefaultProject());
    })();
    this.runInProject =
      this.jsonData.flatRateProject && this.jsonData.flatRateProject.length
        ? this.jsonData.flatRateProject
        : this.projectName;
    this.processingLocation =
      this.jsonData.processingLocation && this.jsonData.processingLocation.length
        ? this.jsonData.processingLocation
        : undefined;
    this.queryPriority = this.jsonData.queryPriority;
  }

  /**
   * Run all visible targets of a panel request.
   *
   * Each target is rendered to SQL; targets whose SQL requests a time shift get an
   * additional shifted query. All queries are executed and their parsed results are
   * merged into a single `{ data }` object. Series produced by shifted queries have
   * their datapoint timestamps adjusted by the shift.
   *
   * NOTE(review): `this.queryModel` is shared state and ends up being the model of the
   * last visible target, so the `rawQuery` check and macro expansion below use that
   * model for every query — confirm whether per-target models are intended.
   *
   * @param options Grafana query options (`targets`, `range`, `scopedVars`, `panelId`, ...).
   * @returns Promise of `{ data: [...] }`.
   */
  public async query(options) {
    const queries = _.filter(options.targets, (target) => {
      return target.hide !== true;
    }).map((target) => {
      const queryModel = new BigQueryQuery(target, this.templateSrv, options.scopedVars);
      this.queryModel = queryModel;
      return {
        queryPriority: this.queryPriority,
        datasourceId: this.id,
        format: target.format,
        intervalMs: options.intervalMs,
        maxDataPoints: options.maxDataPoints,
        metricColumn: target.metricColumn,
        partitioned: target.partitioned,
        partitionedField: target.partitionedField,
        rawSql: queryModel.render(this.interpolateVariable),
        refId: target.refId,
        sharded: target.sharded,
        table: target.table,
        timeColumn: target.timeColumn,
        timeColumnType: target.timeColumnType,
      };
    });

    if (queries.length === 0) {
      return this.$q.when({ data: [] });
    }
    // Collect the shifted twins first, then append them, instead of pushing into the
    // array while iterating over it.
    const shiftedQueries = [];
    for (const query of queries) {
      const newQuery = BigQueryDatasource._createTimeShiftQuery(query);
      if (newQuery) {
        shiftedQueries.push(newQuery);
      }
    }
    for (const shiftedQuery of shiftedQueries) {
      queries.push(shiftedQuery);
    }
    let modOptions;
    const allQueryPromise = _.map(queries, (query) => {
      const tmpQ = this.queryModel.target.rawSql;
      if (this.queryModel.target.rawQuery === false) {
        this.queryModel.target.metricColumn = query.metricColumn;
        this.queryModel.target.partitioned = query.partitioned;
        this.queryModel.target.partitionedField = query.partitionedField;
        this.queryModel.target.rawSql = query.rawSql;
        this.queryModel.target.sharded = query.sharded;
        this.queryModel.target.table = query.table;
        this.queryModel.target.timeColumn = query.timeColumn;
        this.queryModel.target.timeColumnType = query.timeColumnType;
        modOptions = BigQueryDatasource._setupTimeShiftQuery(query, options);
        const q = this.setUpQ(modOptions, options, query);
        console.log(q);
        this.queryModel.target.rawSql = q;
        return this.doQuery(q, options.panelId + query.refId, query.queryPriority).then((response) => {
          return ResponseParser.parseDataQuery(response, query.format);
        });
      } else {
        // Raw SQL: try to work out the time column and table from the statement itself.
        // This lookup is best-effort and is not awaited before the query is sent.
        const sqlWithNoVariables = this.templateSrv.replace(tmpQ, options.scopedVars, this.interpolateVariable);
        const [project, dataset, table] = BigQueryDatasource._extractFromClause(sqlWithNoVariables);
        this.getDateFields(project, dataset, table)
          .then((dateFields) => {
            const tm = BigQueryDatasource._FindTimeField(tmpQ, dateFields);
            // `_FindTimeField` returns null when no selected column is a date field.
            if (tm) {
              this.queryModel.target.timeColumn = tm.text;
              this.queryModel.target.timeColumnType = tm.value;
            }
            this.queryModel.target.table = table;
          })
          .catch((err) => {
            console.log(err);
          });
        this.queryModel.target.rawSql = query.rawSql;
        modOptions = BigQueryDatasource._setupTimeShiftQuery(query, options);
        const q = this.setUpQ(modOptions, options, query);
        console.log(q);
        return this.doQuery(q, options.panelId + query.refId, query.queryPriority).then((response) => {
          return ResponseParser.parseDataQuery(response, query.format);
        });
      }
    });
    return this.$q.all(allQueryPromise).then((responses): any => {
      const data = [];
      if (responses) {
        for (const response of responses) {
          if (response.type && response.type === 'table') {
            data.push(response);
          } else {
            for (const dp of response) {
              data.push(dp);
            }
          }
        }
      }
      for (const d of data) {
        if (typeof d.target !== 'undefined' && d.target.search(Shifted) > -1) {
          // The shift expression is whatever follows the last underscore of the series name.
          const res = BigQueryDatasource._getShiftPeriod(
            d.target.substring(d.target.lastIndexOf('_') + 1, d.target.length)
          );
          const shiftPeriod = res[0];
          const shiftVal = res[1];
          // NOTE(review): timestamps are moved further back by the shift here — confirm
          // this direction is what the panel expects for shifted series.
          for (let i = 0; i < d.datapoints.length; i++) {
            d.datapoints[i][1] = moment(d.datapoints[i][1]).subtract(shiftVal, shiftPeriod).valueOf();
          }
        }
      }
      return { data };
    });
  }

  /**
   * Run a template-variable query.
   *
   * @param query SQL text of the variable query (template variables are interpolated).
   * @param optionalOptions When it carries `variable.name`, that name is used as the request id.
   * @returns Promise of the result parsed with format `"var"`.
   */
  public metricFindQuery(query, optionalOptions) {
    let refId = 'tempvar';
    if (optionalOptions && optionalOptions.variable && optionalOptions.variable.name) {
      refId = optionalOptions.variable.name;
    }
    const interpolatedQuery = {
      datasourceId: this.id,
      format: 'table',
      rawSql: this.templateSrv.replace(query, {}, this.interpolateVariable),
      refId,
    };
    // NOTE(review): `query` is passed to templateSrv.replace as text, so `query.queryPriority`
    // is presumably undefined and `doQuery` falls back to its default priority — confirm
    // whether `this.queryPriority` was intended.
    return this.doQuery(interpolatedQuery.rawSql, refId, query.queryPriority).then(metricData =>
      ResponseParser.parseDataQuery(metricData, "var")
    );
  }

  /**
   * Connectivity check used by the datasource settings page.
   *
   * Lists the datasets of the current project and then requests a job that does not
   * exist (a 404 for that request is the expected outcome and is not treated as a failure).
   *
   * @returns `{ status, message }`; `status` is `'error'` as soon as any check fails.
   */
  public async testDatasource() {
    let status = 'success';
    let message = 'Successfully queried the BigQuery API.';
    const defaultErrorMessage = 'Cannot connect to BigQuery API';
    if (!this.projectName) {
      try {
        await this.getDefaultProject();
      } catch (error) {
        status = 'error';
        message = error.statusText ? error.statusText : defaultErrorMessage;
      }
    }
    try {
      const path = `v2/projects/${this.projectName}/datasets`;
      const response = await this.doRequest(`${this.baseUrl}${path}`);
      if (response.status !== 200) {
        status = 'error';
        message = response.statusText ? response.statusText : defaultErrorMessage;
      }
    } catch (error) {
      // A failed request must be reported as a failure, not only change the message.
      status = 'error';
      message = error.statusText ? error.statusText : defaultErrorMessage;
    }
    try {
      const path = `v2/projects/${this.projectName}/jobs/no-such-jobs`;
      const response = await this.doRequest(`${this.baseUrl}${path}`);
      if (response.status !== 200) {
        status = 'error';
        message = response.statusText ? response.statusText : defaultErrorMessage;
      }
    } catch (error) {
      if (error.status !== 404) {
        status = 'error';
        message = error.statusText ? error.statusText : defaultErrorMessage;
      }
    }
    return {
      message,
      status,
    };
  }

  /** List the projects visible to the configured credentials. */
  public async getProjects(): Promise<IResultFormat[]> {
    const path = `v2/projects`;
    const data = await this.paginatedResults(path, 'projects');
    return ResponseParser.parseProjects(data);
  }

  /** List the datasets of a project. */
  public async getDatasets(projectName): Promise<IResultFormat[]> {
    const path = `v2/projects/${projectName}/datasets`;
    const data = await this.paginatedResults(path, 'datasets');
    return ResponseParser.parseDatasets(data);
  }

  /** List the tables of a dataset. */
  public async getTables(projectName: string, datasetName: string): Promise<IResultFormat[]> {
    const path = `v2/projects/${projectName}/datasets/${datasetName}/tables`;
    const data = await this.paginatedResults(path, 'tables');
    return new ResponseParser(this.$q).parseTabels(data);
  }

  /**
   * List the fields of a table.
   *
   * @param filter Passed through to `ResponseParser.parseTableFields` together with the
   *   table's `schema.fields`.
   */
  public async getTableFields(
    projectName: string,
    datasetName: string,
    tableName: string,
    filter
  ): Promise<IResultFormat[]> {
    const path = `v2/projects/${projectName}/datasets/${datasetName}/tables/${tableName}`;
    const data = await this.paginatedResults(path, 'schema.fields');
    return ResponseParser.parseTableFields(data, filter);
  }

  /** List the fields of a table using the `DATE`, `TIMESTAMP` and `DATETIME` filter. */
  public async getDateFields(projectName: string, datasetName: string, tableName: string) {
    return this.getTableFields(projectName, datasetName, tableName, ['DATE', 'TIMESTAMP', 'DATETIME']);
  }

  /**
   * Resolve the project to use by default.
   *
   * With GCE authentication, or when no project is known yet, the first project
   * returned by `getProjects` is stored in `projectName` (and in `runInProject`
   * if that is still unset). Any failure resets `projectName` to `''`.
   *
   * @returns The project name, or `''` on failure.
   */
  public async getDefaultProject() {
    try {
      if (this.authenticationType === 'gce' || !this.projectName) {
        const data = await this.getProjects();
        this.projectName = data[0].value;
        if (!this.runInProject) {
          this.runInProject = this.projectName;
        }
        return data[0].value;
      } else {
        return this.projectName;
      }
    } catch (error) {
      return (this.projectName = '');
    }
  }

  /**
   * Run an annotation query.
   *
   * @param options Annotation request; `options.annotation.rawQuery` holds the SQL.
   * @returns Promise of the annotations built by `transformAnnotationResponse`;
   *   rejected when the annotation has no query.
   */
  public annotationQuery(options) {
    const path = `v2/projects/${this.runInProject}/queries`;
    const url = this.url + `${this.baseUrl}${path}`;
    if (!options.annotation.rawQuery) {
      return this.$q.reject({
        message: 'Query missing in annotation definition',
      });
    }
    const rawSql = this.templateSrv.replace(options.annotation.rawQuery, options.scopedVars, this.interpolateVariable);

    const query = {
      datasourceId: this.id,
      format: 'table',
      rawSql,
      refId: options.annotation.name,
    };
    this.queryModel.target.rawSql = query.rawSql;
    [query.rawSql,,] = this.queryModel.expend_macros(options);
    return this.backendSrv
      .datasourceRequest({
        data: {
          priority: this.queryPriority,
          from: options.range.from.valueOf().toString(),
          query: query.rawSql,
          to: options.range.to.valueOf().toString(),
          useLegacySql: false,
          useQueryCache: true,
        },
        method: 'POST',
        requestId: options.annotation.name,
        url,
      })
      .then((data) => this.responseParser.transformAnnotationResponse(options, data));
  }

  /**
   * Turn the current query model into the final SQL to send.
   *
   * Expands macros, adds/refreshes partition and table-suffix filters, renames the
   * value alias for shifted queries and makes sure the statement has a LIMIT
   * (`options.maxDataPoints`, placed at `$__limitPosition` when that marker exists).
   *
   * @param modOptions Options (possibly time-shifted) used for macros and date filters.
   * @param options Original request options; only `maxDataPoints` is read.
   * @param query Rendered query object (`partitioned`, `partitionedField`, `refId`).
   * @returns The SQL text, or the falsy value returned by macro expansion.
   */
  private setUpQ(modOptions, options, query) {
    let [q, hasMacro, convertToUTC] = this.queryModel.expend_macros(modOptions);
    if (q) {
      q = this.setUpPartition(q, query.partitioned, query.partitionedField, modOptions, hasMacro, convertToUTC);
      q = BigQueryDatasource._updatePartition(q, modOptions, convertToUTC);
      q = BigQueryDatasource._updateTableSuffix(q, modOptions, convertToUTC);
      if (query.refId.search(Shifted) > -1) {
        q = this._updateAlias(q, modOptions, query.refId);
      }
      const limit = q.match(/[^]+(\bLIMIT\b)/gi);
      if (limit == null) {
        const limitStatement = ' LIMIT ' + options.maxDataPoints;
        const limitPosition = q.match(/\$__limitPosition/g);
        if (limitPosition !== null) {
          q = q.replace(/\$__limitPosition/g, limitStatement);
        } else {
          q += limitStatement;
        }
      }
    }
    return q;
  }
  /**
   * Add partition to query unless it has one OR already being ranged by other condition
   *
   * When the query already has a WHERE clause the partition range is prepended to its
   * conditions; otherwise a new WHERE clause is inserted right after the FROM clause.
   *
   * @param query SQL text
   * @param isPartitioned whether the target table is partitioned
   * @param partitionedField partition column; defaults to `_PARTITIONTIME`
   * @param options request options providing `range.from` / `range.to`
   * @param hasMacro true when a time macro already restricts the range
   * @param convertToUTC forwarded to `BigQueryQuery.formatDateToString`
   * @returns the (possibly rewritten) SQL text
   */
  private setUpPartition(query, isPartitioned, partitionedField, options, hasMacro = false, convertToUTC = false) {
    partitionedField = partitionedField ? partitionedField : '_PARTITIONTIME';
    const hasTimeFilter = !!(BigQueryQuery.hasDateFilter(query.split(/where/gi)[1] || "") || hasMacro);
    if (isPartitioned && !hasTimeFilter) {
      const { from: { _d: fromD }, to: { _d: toD } } = options.range;
      const from = `${partitionedField} >= '${BigQueryQuery.formatDateToString(fromD, convertToUTC, '-', true)}'`;
      const to = `${partitionedField} < '${BigQueryQuery.formatDateToString(toD, convertToUTC, '-', true)}'`;
      if (query.match(/where/i)) {
        // Function replacer so the inserted text is never interpreted as a `$` pattern.
        query = query.replace(/where/i, () => `where ${from} AND ${to} AND `);
      } else {
        const reg = /from ('|`|"|){1}(.*?)('|`|"|){1} as ('|`|"|)(\S*)('|`|"|){1}|from ('|`|"|){1}(\S*)('|`|"|){1}/i;
        // No WHERE clause yet: keep the matched FROM clause and append the partition filter to it.
        query = query.replace(reg, (fromClause) => `${fromClause} where ${from} AND ${to}`);
      }
    }
    return query;
  }

  /**
   * Issue a GET request through the Grafana backend proxy.
   *
   * Rejected requests are retried up to `maxRetries` times; resolved responses are
   * retried only for status >= 500. Every attempt uses a freshly generated request id.
   *
   * @param url Path appended to the datasource URL.
   * @param requestId Kept for signature compatibility; only passed on to retries.
   * @param maxRetries Remaining retry attempts.
   * @returns The response, or `[]` when the request was cancelled.
   * @throws A formatted BigQuery error (see `formatBigqueryError`) when the request finally fails.
   */
  private async doRequest(url, requestId = 'requestId', maxRetries = 3) {
    let result;
    try {
      result = await this.backendSrv.datasourceRequest({
        method: 'GET',
        requestId: generateID(),
        url: this.url + url,
      });
    } catch (error) {
      // A cancelled request must not be retried.
      if (error && error.cancelled === true) {
        return [];
      }
      if (maxRetries > 0) {
        return this.doRequest(url, requestId, maxRetries - 1);
      }
      return BigQueryDatasource._handleError(error);
    }
    if (result.status !== 200) {
      if (result.status >= 500 && maxRetries > 0) {
        return this.doRequest(url, requestId, maxRetries - 1);
      }
      // Thrown outside the try block so the already formatted error is not caught,
      // retried and wrapped a second time.
      throw BigQueryDatasource.formatBigqueryError(result.data ? result.data.error : undefined);
    }
    return result;
  }

  /**
   * Submit a SQL statement.
   *
   * Posts to the `queries` endpoint of `runInProject`, or to the `jobs` endpoint when the
   * priority is `BATCH`. Retry behaviour matches `doRequest`.
   *
   * NOTE(review): the BATCH payload carries neither `useLegacySql` nor a location —
   * confirm against the BigQuery jobs API that its defaults are the intended ones.
   *
   * @param query SQL text.
   * @param requestId Request id handed to Grafana.
   * @param priority Query priority; compared case-insensitively with `BATCH`.
   * @param maxRetries Remaining retry attempts.
   * @returns The response, or `[]` when the request was cancelled.
   * @throws A formatted BigQuery error when the request finally fails.
   */
  private async doQueryRequest(query, requestId, priority, maxRetries = 3) {
    const location = this.queryModel.target.location || this.processingLocation || 'US';
    let data,
      queryiesOrJobs = 'queries';
    data = { priority: priority, location, query, useLegacySql: false, useQueryCache: true }; //ExternalDataConfiguration
    if (String(priority || '').toUpperCase() === 'BATCH') {
      queryiesOrJobs = 'jobs';
      data = { configuration: { query: { query, priority } } };
    }
    const path = `v2/projects/${this.runInProject}/${queryiesOrJobs}`;
    const url = this.url + `${this.baseUrl}${path}`;
    let result;
    try {
      result = await this.backendSrv.datasourceRequest({
        data: data,
        method: 'POST',
        requestId,
        url,
      });
    } catch (error) {
      // A cancelled request must not be retried.
      if (error && error.cancelled === true) {
        return [];
      }
      if (maxRetries > 0) {
        return this.doQueryRequest(query, requestId, priority, maxRetries - 1);
      }
      return BigQueryDatasource._handleError(error);
    }
    if (result.status !== 200) {
      if (result.status >= 500 && maxRetries > 0) {
        return this.doQueryRequest(query, requestId, priority, maxRetries - 1);
      }
      throw BigQueryDatasource.formatBigqueryError(result.data ? result.data.error : undefined);
    }
    return result;
  }

  /**
   * Poll the query-results endpoint until the job reports `jobComplete`.
   *
   * The delay between polls starts at 100 ms and doubles after every attempt.
   *
   * @returns The last response; this is `[]` when a poll was cancelled.
   */
  private async _waitForJobComplete(queryResults, requestId, jobId) {
    let sleepTimeMs = 100;
    console.log('New job id: ', jobId);
    const location = this.queryModel.target.location || this.processingLocation || 'US';
    const path = `v2/projects/${this.runInProject}/queries/` + jobId + '?location=' + location;
    // `doRequest` yields `[]` (no `data`) for a cancelled request; stop polling in that case.
    while (queryResults.data && !queryResults.data.jobComplete) {
      await sleep(sleepTimeMs);
      sleepTimeMs *= 2;
      queryResults = await this.doRequest(`${this.baseUrl}${path}`, requestId);
      console.log('wating for job to complete ', jobId);
    }
    console.log('Job Done ', jobId);
    return queryResults;
  }

  /**
   * Fetch the remaining result pages of a job and append their rows.
   *
   * @param queryResults First page of results (its `data.pageToken` drives the loop).
   * @param rows Rows collected so far.
   * @returns All rows collected; stops early when a page request was cancelled.
   */
  private async _getQueryResults(queryResults, rows, requestId, jobId) {
    const location = this.queryModel.target.location || this.processingLocation || 'US';
    while (queryResults.data.pageToken) {
      const path =
        `v2/projects/${this.runInProject}/queries/` +
        jobId +
        '?pageToken=' +
        queryResults.data.pageToken +
        '&location=' +
        location;
      queryResults = await this.doRequest(`${this.baseUrl}${path}`, requestId);
      if (queryResults.length === 0) {
        return rows;
      }
      rows = rows.concat(queryResults.data.rows);
      console.log('getting results for: ', jobId);
    }
    return rows;
  }

  /**
   * Execute a SQL statement end to end: submit, wait for completion, page through results.
   *
   * @param query SQL text. Empty queries and queries still containing the
   *   `-- time --` / `-- value --` placeholders are not sent.
   * @param requestId Request id handed to Grafana.
   * @param priority Query priority, `INTERACTIVE` by default.
   * @returns `{ rows, schema }`; both are `null` when nothing was executed or the
   *   request was cancelled.
   */
  private async doQuery(query, requestId, priority = 'INTERACTIVE') {
    if (!query) {
      return {
        rows: null,
        schema: null,
      };
    }
    let notReady = false;
    ['-- time --', '-- value --'].forEach((element) => {
      if (query.indexOf(element) !== -1) {
        notReady = true;
      }
    });
    if (notReady) {
      return {
        rows: null,
        schema: null,
      };
    }
    let queryResults = await this.doQueryRequest(
      //"tableDefinitions": {
      //   string: {
      //     object (ExternalDataConfiguration)
      //   },
      //   ...
      // },
      query,
      requestId,
      priority
    );
    if (queryResults.length === 0) {
      return {
        rows: null,
        schema: null,
      };
    }
    const jobId = queryResults.data.jobReference.jobId;
    queryResults = await this._waitForJobComplete(queryResults, requestId, jobId);
    if (queryResults.length === 0) {
      return {
        rows: null,
        schema: null,
      };
    }
    let rows = queryResults.data.rows;
    const schema = queryResults.data.schema;
    rows = await this._getQueryResults(queryResults, rows, requestId, jobId);
    return {
      rows,
      schema,
    };
  }

  /**
   * Formatter handed to `templateSrv.replace`.
   *
   * Strings are quoted only for multi-value / include-all variables, numbers are
   * returned as is, and any other value is treated as a collection whose items are
   * quoted and joined with commas.
   */
  private interpolateVariable = (value, variable) => {
    if (typeof value === 'string') {
      if (variable.multi || variable.includeAll) {
        return BigQueryQuery.quoteLiteral(value);
      } else {
        return value;
      }
    }

    if (typeof value === 'number') {
      return value;
    }

    const quotedValues = _.map(value, (v) => {
      return BigQueryQuery.quoteLiteral(v);
    });
    return quotedValues.join(',');
  };

  /**
   * GET a listing endpoint and follow `nextPageToken` until all pages are read.
   *
   * @param path API path relative to `baseUrl`.
   * @param dataName Dot-separated property path of the list inside the response
   *   body (for example `datasets` or `schema.fields`).
   * @returns The accumulated data; the response body is returned unchanged when it
   *   is falsy, and a missing property along `dataName` leaves the current value in place.
   */
  private async paginatedResults(path, dataName) {
    let queryResults = await this.doRequest(`${this.baseUrl}${path}`);
    let data = queryResults.data;
    if (!data) return data;
    const dataList = dataName.split('.');
    dataList.forEach(element => {
      if (data && data[element]) data = data[element];
    });
    while (queryResults && queryResults.data && queryResults.data.nextPageToken) {
      queryResults = await this.doRequest(`${this.baseUrl}${path}` + '?pageToken=' + queryResults.data.nextPageToken);
      dataList.forEach(element => {
        data = data.concat(queryResults.data[element]);
      });
    }
    return data;
  }

  /**
   * Give the value column of a shifted query a distinct alias.
   *
   * The part of `shiftstr` starting at `_shifted` is appended to the existing alias of
   * the first select of the first target; when that select has no alias, one is built
   * from the column name plus the suffix.
   *
   * @param q SQL text.
   * @param options Request options; `options.targets[0].select[0]` is inspected.
   * @param shiftstr refId of the shifted query.
   * @returns The rewritten SQL text.
   */
  private _updateAlias(q, options, shiftstr) {
    if (shiftstr !== undefined) {
      const index = shiftstr.search(Shifted);
      const shifted = shiftstr.substr(index, shiftstr.length);
      for (const al of options.targets[0].select[0]) {
        if (al.type === 'alias') {
          q = q.replace('AS ' + al.params[0], 'AS ' + al.params[0] + shifted);
          return q;
        }
      }
      const aliasshiftted = [options.targets[0].select[0][0].params[0] + shifted];
      const oldSelect = this.queryModel.buildValueColumn(options.targets[0].select[0]);
      const newSelect = this.queryModel.buildValueColumn([
        options.targets[0].select[0][0],
        options.targets[0].select[0][1],
        { type: 'alias', params: [aliasshiftted] },
      ]);
      q = q.replace(oldSelect, newSelect);
    }
    return q;
  }
}