// pkg/plugin.go
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"regexp"
"strings"
"time"
"cloud.google.com/go/bigquery"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/data"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"golang.org/x/oauth2/jwt"
)
// QueryModel is the per-query payload decoded from backend.DataQuery.JSON.
//
// The code in this file reads Format, RawQuery, RawSQL, ProjectID, Location,
// TimeColumn and TimeColumnType; the remaining fields are decoded but not
// otherwise used here.
type QueryModel struct {
	// Identification and requested output shape.
	RefID  string `json:"refId"`
	Format string `json:"format"`

	// Query text.
	RawQuery bool   `json:"rawQuery"`
	RawSQL   string `json:"rawSql"`

	// Where the query runs and which table it targets.
	ProjectID        string `json:"project"`
	Location         string `json:"location"`
	Dataset          string `json:"dataset"`
	Table            string `json:"table"`
	Sharded          bool   `json:"sharded"`
	Partitioned      bool   `json:"partitioned"`
	PartitionedField string `json:"partitionedField"`

	// Column selection.
	TimeColumn     string `json:"timeColumn"`
	TimeColumnType string `json:"timeColumnType"`
	MetricColumn   string `json:"metricColumn"`

	// Ordering. Both are decoded as strings; the original declaration
	// carried an inline /*int32*/ marker on each.
	OrderByCol  string `json:"orderByCol"`
	OrderBySort string `json:"orderBySort"`

	// Fields that are currently not decoded:
	// Constant string   `json:"constant"`
	// Group    []Group  `json:"group"`
	// Select   []string `json:"select"`
	// Where    []string `json:"where"`
}
// JSONData mirrors the JSON document stored in
// req.PluginContext.DataSourceInstanceSettings.JSONData, i.e. the non-secret
// part of the datasource configuration.
type JSONData struct {
	// Authentication. executeQuery accepts the types "gce" and "jwt";
	// ClientEmail and TokenURI are required for "jwt".
	AuthenticationType string `json:"authenticationType"`
	ClientEmail        string `json:"clientEmail"`
	TokenURI           string `json:"tokenUri"`

	// Defaults that executeQuery prefers over the per-query values.
	DefaultProject     string `json:"defaultProject"`
	ProcessingLocation string `json:"processingLocation"`

	// QueryPriority is decoded but not read by the code in this file.
	QueryPriority string `json:"queryPriority"`
}
// BigQueryResult represents a full result set.
// Because the results are row based and the Grafana API expects columnar values, it is easier
// to first collect all the values and then map them to the format Grafana expects.
type BigQueryResult struct {
// Schema describes the columns of the result; query iterates over it to build one frame field per column.
Schema bigquery.Schema
// Rows holds every row that was read, each as a map from column name to value.
Rows []map[string]bigquery.Value
}
// instanceSettings is the per-datasource instance created by newDataSourceInstance.
type instanceSettings struct {
// httpClient is populated with an empty http.Client on creation; nothing in the visible code reads it.
httpClient *http.Client
}
// BigQueryDatasource is the backend datasource. newDatasource registers it as
// both the query data handler and the health check handler.
type BigQueryDatasource struct {
// The instance manager can help with lifecycle management
// of datasource instances in plugins. It's not a requirement,
// but a best practice that we recommend you follow.
im instancemgmt.InstanceManager
}
// newDatasource wires up the plugin and returns the datasource.ServeOpts
// used to serve it.
//
// The instance manager invokes newDataSourceInstance when an instance is
// created for the first time or when a datasource configuration changed.
// A single BigQueryDatasource handles both data queries and health checks.
func newDatasource() datasource.ServeOpts {
	handler := &BigQueryDatasource{
		im: datasource.NewInstanceManager(newDataSourceInstance),
	}

	var opts datasource.ServeOpts
	opts.QueryDataHandler = handler
	opts.CheckHealthHandler = handler
	return opts
}
// QueryData handles multiple queries and returns multiple responses.
//
// req carries the queries ([]DataQuery), each identified by a unique RefID.
// Every query is executed on its own, one after another, and its response
// (which contains Frames, []*Frame) is stored in the returned
// QueryDataResponse under that RefID. Per-query failures are reported inside
// the individual responses; the returned error is always nil.
func (td *BigQueryDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
	out := backend.NewQueryDataResponse()
	for i := range req.Queries {
		dq := req.Queries[i]
		out.Responses[dq.RefID] = td.query(ctx, dq, req)
	}
	return out, nil
}
// query runs a single data query against BigQuery and converts the row-based
// result into one columnar data frame named "response".
//
// Only the "time_series" format is supported; any other format is rejected
// before a BigQuery job is started. Columns are mapped as follows:
//
//   - a column called "time" must be of type TIMESTAMP and becomes a
//     time.Time field;
//   - INTEGER columns become int64 fields and FLOAT columns become float64
//     fields, each named after its column;
//   - any other column type is reported as an error.
//
// A value that cannot be converted to the expected Go type (for example a
// NULL) fails the whole query. Errors are returned in DataResponse.Error.
func (td *BigQueryDatasource) query(ctx context.Context, query backend.DataQuery, req *backend.QueryDataRequest) backend.DataResponse {
	response := backend.DataResponse{}

	// Unmarshal the JSON into our query model.
	var queryModel QueryModel
	if response.Error = json.Unmarshal(query.JSON, &queryModel); response.Error != nil {
		return response
	}

	// Supports only time_series initially. Check this first so that an
	// unsupported format never costs a BigQuery job.
	if queryModel.Format != "time_series" {
		response.Error = fmt.Errorf("unimplemented format '%s'. expected one of ['time_series']", queryModel.Format)
		return response
	}

	// Full result set from the query.
	bigQueryResult, err := td.executeQuery(ctx, queryModel, query, req)
	if err != nil {
		response.Error = err
		return response
	}

	frame := data.NewFrame("response")
	rowCount := len(bigQueryResult.Rows)

	for _, fieldSchema := range bigQueryResult.Schema {
		name := fieldSchema.Name

		switch {
		case name == "time":
			// The schema type is checked once per column, not per row, so a
			// mistyped time column is reported even for an empty result.
			if fieldSchema.Type != bigquery.TimestampFieldType {
				response.Error = fmt.Errorf("unexpected type for field '%s': %s", name, fieldSchema.Type)
				return response
			}
			values := make([]time.Time, 0, rowCount)
			for _, row := range bigQueryResult.Rows {
				v, ok := row[name].(time.Time)
				if !ok {
					response.Error = fmt.Errorf("could not convert field '%s' into time.Time field", name)
					return response
				}
				values = append(values, v)
			}
			frame.Fields = append(frame.Fields, data.NewField(name, nil, values))

		case fieldSchema.Type == bigquery.IntegerFieldType:
			values := make([]int64, 0, rowCount)
			for _, row := range bigQueryResult.Rows {
				v, ok := row[name].(int64)
				if !ok {
					response.Error = fmt.Errorf("could not convert field '%s' into int64 field", name)
					return response
				}
				values = append(values, v)
			}
			// Name the field after its column (as the float branch does) so
			// that several integer columns stay distinguishable.
			frame.Fields = append(frame.Fields, data.NewField(name, nil, values))

		case fieldSchema.Type == bigquery.FloatFieldType:
			values := make([]float64, 0, rowCount)
			for _, row := range bigQueryResult.Rows {
				v, ok := row[name].(float64)
				if !ok {
					response.Error = fmt.Errorf("could not convert field '%s' into float64 field", name)
					return response
				}
				values = append(values, v)
			}
			frame.Fields = append(frame.Fields, data.NewField(name, nil, values))

		default:
			response.Error = fmt.Errorf("unexpected type for field '%s': %s", name, fieldSchema.Type)
			return response
		}
	}

	// Add the frame to the response.
	response.Frames = append(response.Frames, frame)
	return response
}
// CheckHealth handles health checks sent from Grafana to the plugin.
//
// Grafana mainly triggers it from the test button on the datasource
// configuration page so that users can verify that a datasource is working
// as expected. This implementation inspects neither ctx nor req: it always
// reports backend.HealthStatusOk with a fixed message and a nil error.
func (td *BigQueryDatasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
	result := &backend.CheckHealthResult{
		Status:  backend.HealthStatusOk,
		Message: "Data source is working",
	}
	return result, nil
}
// newDataSourceInstance is the factory handed to the instance manager. It
// ignores the supplied settings and returns a fresh instanceSettings that
// holds an empty http.Client; it never fails.
func newDataSourceInstance(setting backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
	instance := new(instanceSettings)
	instance.httpClient = new(http.Client)
	return instance, nil
}
// Dispose is intentionally a no-op; it performs no cleanup.
func (s *instanceSettings) Dispose() {
	// Nothing to do.
}
// executeQuery runs the raw SQL of queryModel against BigQuery and returns
// the complete result set (schema plus every row).
//
// Only raw SQL queries are supported. The project and the processing
// location are taken from the datasource configuration and fall back to the
// values on the query when the configuration leaves them empty; both must
// resolve to a non-empty value. The BigQuery client is created per call
// according to the configured authentication type ("gce" or "jwt") and is
// closed before returning. Placeholders and macros in the SQL are expanded
// by substituteVariables before the query is submitted.
//
// All validation that does not need a client happens first so that no client
// is created for a request that cannot run.
func (td *BigQueryDatasource) executeQuery(ctx context.Context, queryModel QueryModel, originalQuery backend.DataQuery, req *backend.QueryDataRequest) (*BigQueryResult, error) {
	if !queryModel.RawQuery {
		return nil, errors.New("alerting queries only support raw sql")
	}
	// Ensure we have a query to run.
	if queryModel.RawSQL == "" {
		return nil, errors.New("expected 'query.rawSql' to be set")
	}

	settings := req.PluginContext.DataSourceInstanceSettings
	if settings == nil {
		return nil, errors.New("expected 'req.PluginContext.DataSourceInstanceSettings' to be set")
	}

	// Unmarshal the values provided by the datasource configuration.
	var jsonData JSONData
	if err := json.Unmarshal(settings.JSONData, &jsonData); err != nil {
		return nil, err
	}

	// Ensure a projectID is available.
	projectID := jsonData.DefaultProject
	if projectID == "" {
		projectID = queryModel.ProjectID
	}
	if projectID == "" {
		return nil, errors.New("expected 'query.ProjectID' or 'req.PluginContext.DataSourceInstanceSettings.JSONData.defaultProject' to be set")
	}

	// Ensure a location is available.
	location := jsonData.ProcessingLocation
	if location == "" {
		location = queryModel.Location
	}
	if location == "" {
		return nil, errors.New("expected 'query.Location' or 'req.PluginContext.DataSourceInstanceSettings.JSONData.processingLocation' to be set")
	}

	// The client is created depending on AuthenticationType.
	var (
		client *bigquery.Client
		err    error
	)
	switch jsonData.AuthenticationType {
	case "gce":
		client, err = bigquery.NewClient(ctx, projectID)
		if err != nil {
			log.DefaultLogger.Error("bigquery.NewClient", "error", err)
			return nil, err
		}
	case "jwt":
		// Test validity of required JSONData.
		if jsonData.ClientEmail == "" {
			return nil, errors.New("expected 'req.PluginContext.DataSourceInstanceSettings.JSONData.clientEmail' to be set")
		}
		if jsonData.TokenURI == "" {
			return nil, errors.New("expected 'req.PluginContext.DataSourceInstanceSettings.JSONData.tokenUri' to be set")
		}
		// Test validity of required DecryptedSecureJSONData. An empty key
		// is treated like a missing one.
		privateKey, ok := settings.DecryptedSecureJSONData["privateKey"]
		if !ok || privateKey == "" {
			return nil, errors.New("expected 'req.PluginContext.DataSourceInstanceSettings.DecryptedSecureJSONData.privateKey' to be set")
		}
		conf := &jwt.Config{
			Email:      jsonData.ClientEmail,
			PrivateKey: []byte(privateKey),
			Scopes: []string{
				"https://www.googleapis.com/auth/bigquery",
			},
			TokenURL: jsonData.TokenURI,
		}
		client, err = bigquery.NewClient(ctx, projectID, option.WithHTTPClient(conf.Client(ctx)))
		if err != nil {
			log.DefaultLogger.Error("bigquery.NewClient", "error", err)
			return nil, err
		}
	default:
		return nil, fmt.Errorf("unimplemented authenticationType '%s'", jsonData.AuthenticationType)
	}
	defer client.Close()

	// substituteVariables replaces any placeholder variables.
	sql := substituteVariables(queryModel.RawSQL, queryModel, originalQuery)
	log.DefaultLogger.Debug(fmt.Sprintf("query: %s\n", sql))

	// Create and run the query.
	query := client.Query(sql)
	query.Location = location

	job, err := query.Run(ctx)
	if err != nil {
		log.DefaultLogger.Error("*bigquery.Query.Run", "error", err)
		return nil, err
	}
	log.DefaultLogger.Debug("*bigquery.Query.Run", "jobID", job.ID())

	status, err := job.Wait(ctx)
	if err != nil {
		log.DefaultLogger.Error("*bigquery.Job.Wait", "error", err)
		return nil, err
	}
	// Wait can succeed while the job itself finished with an error.
	if err := status.Err(); err != nil {
		log.DefaultLogger.Error("*bigquery.JobStatus", "error", err)
		return nil, err
	}

	rowIterator, err := job.Read(ctx)
	if err != nil {
		log.DefaultLogger.Error("*bigquery.Job.Read", "error", err)
		return nil, err
	}

	// Collect all the rows; iterator.Done marks the end of the result.
	rows := make([]map[string]bigquery.Value, 0)
	for {
		var row map[string]bigquery.Value
		err := rowIterator.Next(&row)
		if err == iterator.Done {
			break
		}
		if err != nil {
			log.DefaultLogger.Error("*bigquery.RowIterator.Next", "error", err)
			return nil, err
		}
		rows = append(rows, row)
	}

	// The schema is read from the iterator only after iteration has finished.
	return &BigQueryResult{Schema: rowIterator.Schema, Rows: rows}, nil
}
// Macro patterns recognised by substituteVariables. They are compiled once
// at package initialisation instead of on every call.
var (
	timeFilterRegex     = regexp.MustCompile(`\$__timeFilter\((.*?)\)`)
	timeFromRegex       = regexp.MustCompile(`\$__timeFrom\(([\w_.]+)\)`)
	timeToRegex         = regexp.MustCompile(`\$__timeTo\(([\w_.]+)\)`)
	millisTimeToRegex   = regexp.MustCompile(`\$__millisTimeTo\(([\w_.]+)\)`)
	millisTimeFromRegex = regexp.MustCompile(`\$__millisTimeFrom\(([\w_.]+)\)`)
)

// substituteVariables replaces standard Grafana variables and macros in sql
// and returns the result.
//
// Supported placeholders, each for both __from and __to (taken from the time
// range of originalQuery):
//
//	${__from}               epoch milliseconds
//	${__from:date}          RFC 3339
//	${__from:date:iso}      RFC 3339
//	${__from:date:seconds}  epoch seconds
//	${__from:date:YYYY-MM}  year and month
//
// Supported macros: $__timeFilter(col), $__timeFrom(col), $__timeTo(col),
// $__millisTimeFrom(col) and $__millisTimeTo(col). The column written inside
// the parentheses is matched but not used; the filters are always built from
// queryModel.TimeColumn, and the bounds are rendered by adjustTime according
// to queryModel.TimeColumnType.
func substituteVariables(sql string, queryModel QueryModel, originalQuery backend.DataQuery) string {
	rangeFrom := originalQuery.TimeRange.From
	rangeTo := originalQuery.TimeRange.To

	// No placeholder is a prefix of another (each ends in "}"), and no
	// replacement value can contain a placeholder, so a single pass gives the
	// same result as replacing them one after another.
	sql = strings.NewReplacer(
		"${__from}", fmt.Sprintf("%d", rangeFrom.UnixMilli()),
		"${__from:date}", rangeFrom.Format(time.RFC3339),
		"${__from:date:iso}", rangeFrom.Format(time.RFC3339),
		"${__from:date:seconds}", fmt.Sprintf("%d", rangeFrom.Unix()),
		"${__from:date:YYYY-MM}", rangeFrom.Format("2006-01"),
		"${__to}", fmt.Sprintf("%d", rangeTo.UnixMilli()),
		"${__to:date}", rangeTo.Format(time.RFC3339),
		"${__to:date:iso}", rangeTo.Format(time.RFC3339),
		"${__to:date:seconds}", fmt.Sprintf("%d", rangeTo.Unix()),
		"${__to:date:YYYY-MM}", rangeTo.Format("2006-01"),
	).Replace(sql)

	// Macros.
	from := adjustTime(rangeFrom, queryModel.TimeColumnType)
	to := adjustTime(rangeTo, queryModel.TimeColumnType)
	column := quoteTimeColumn(queryModel.TimeColumn)

	wholeRange := column + " BETWEEN " + from + " AND " + to
	fromRange := column + " > " + from + " "
	toRange := column + " < " + to + " "

	// The replacement text contains the user-configured column name, so it is
	// inserted literally: a "$" in it must not be treated as a reference to a
	// capture group.
	sql = timeFilterRegex.ReplaceAllLiteralString(sql, wholeRange)
	sql = timeFromRegex.ReplaceAllLiteralString(sql, fromRange)
	sql = timeToRegex.ReplaceAllLiteralString(sql, toRange)
	sql = millisTimeToRegex.ReplaceAllLiteralString(sql, to)
	sql = millisTimeFromRegex.ReplaceAllLiteralString(sql, from)
	return sql
}
// adjustTime renders value as a SQL expression that matches the type of the
// time column it is compared against:
//
//   - "DATE" yields a quoted date, e.g. '2021-03-15';
//   - "DATETIME" yields a quoted date and time, e.g. '2021-03-15 10:20:30';
//   - anything else yields TIMESTAMP_MILLIS (<epoch milliseconds>).
//
// DATE and DATETIME are formatted in the location carried by value; no time
// zone conversion is applied here.
func adjustTime(value time.Time, timeType string) string {
	// Go layouts are written with the reference time Mon Jan 2 15:04:05 2006,
	// so the day of the month is "02" ("01" is the month).
	const (
		dateLayout     = "2006-01-02"
		dateTimeLayout = "2006-01-02 15:04:05"
	)

	switch timeType {
	case "DATE":
		return fmt.Sprintf("'%s'", value.Format(dateLayout))
	case "DATETIME":
		return fmt.Sprintf("'%s'", value.Format(dateTimeLayout))
	default:
		return fmt.Sprintf("TIMESTAMP_MILLIS (%d)", value.UnixMilli())
	}
}
// quoteTimeColumn wraps every dot-separated segment of value in backticks and
// joins the segments with dots again, e.g. "a.b" becomes "`a`.`b`". Empty
// segments are quoted too, so an empty value yields "``".
func quoteTimeColumn(value string) string {
	segments := strings.Split(value, ".")
	for idx, segment := range segments {
		segments[idx] = "`" + segment + "`"
	}
	return strings.Join(segments, ".")
}