firehol/netdata

View on GitHub
src/go/plugin/go.d/modules/pgbouncer/collect.go

Summary

Maintainability
A
1 hr
Test Coverage
// SPDX-License-Identifier: GPL-3.0-or-later

package pgbouncer

import (
    "context"
    "database/sql"
    "fmt"
    "regexp"
    "strconv"
    "strings"
    "time"

    "github.com/blang/semver/v4"
    "github.com/jackc/pgx/v4"
    "github.com/jackc/pgx/v4/stdlib"
)

// 'SHOW STATS;' response was changed significantly in v1.8.0
// v1.8.0 was released in 2015 - no need to complicate the code to support the old version.
var minSupportedVersion = semver.Version{Major: 1, Minor: 8, Patch: 0}

const (
    queryShowVersion   = "SHOW VERSION;"
    queryShowConfig    = "SHOW CONFIG;"
    queryShowDatabases = "SHOW DATABASES;"
    queryShowStats     = "SHOW STATS;"
    queryShowPools     = "SHOW POOLS;"
)

func (p *PgBouncer) collect() (map[string]int64, error) {
    if p.db == nil {
        if err := p.openConnection(); err != nil {
            return nil, err
        }
    }
    if p.version == nil {
        ver, err := p.queryVersion()
        if err != nil {
            return nil, err
        }
        p.Debugf("connected to PgBouncer v%s", ver)
        if ver.LE(minSupportedVersion) {
            return nil, fmt.Errorf("unsupported version: v%s, required v%s+", ver, minSupportedVersion)
        }
        p.version = ver
    }

    now := time.Now()
    if now.Sub(p.recheckSettingsTime) > p.recheckSettingsEvery {
        v, err := p.queryMaxClientConn()
        if err != nil {
            return nil, err
        }
        p.maxClientConn = v
    }

    // http://www.pgbouncer.org/usage.html

    p.resetMetrics()

    if err := p.collectDatabases(); err != nil {
        return nil, err
    }
    if err := p.collectStats(); err != nil {
        return nil, err
    }
    if err := p.collectPools(); err != nil {
        return nil, err
    }

    mx := make(map[string]int64)
    p.collectMetrics(mx)

    return mx, nil
}

func (p *PgBouncer) collectMetrics(mx map[string]int64) {
    var clientConns int64
    for name, db := range p.metrics.dbs {
        if !db.updated {
            delete(p.metrics.dbs, name)
            p.removeDatabaseCharts(name)
            continue
        }
        if !db.hasCharts {
            db.hasCharts = true
            p.addNewDatabaseCharts(name, db.pgDBName)
        }

        mx["db_"+name+"_total_xact_count"] = db.totalXactCount
        mx["db_"+name+"_total_xact_time"] = db.totalXactTime
        mx["db_"+name+"_avg_xact_time"] = db.avgXactTime

        mx["db_"+name+"_total_query_count"] = db.totalQueryCount
        mx["db_"+name+"_total_query_time"] = db.totalQueryTime
        mx["db_"+name+"_avg_query_time"] = db.avgQueryTime

        mx["db_"+name+"_total_wait_time"] = db.totalWaitTime
        mx["db_"+name+"_maxwait"] = db.maxWait*1e6 + db.maxWaitUS

        mx["db_"+name+"_cl_active"] = db.clActive
        mx["db_"+name+"_cl_waiting"] = db.clWaiting
        mx["db_"+name+"_cl_cancel_req"] = db.clCancelReq
        clientConns += db.clActive + db.clWaiting + db.clCancelReq

        mx["db_"+name+"_sv_active"] = db.svActive
        mx["db_"+name+"_sv_idle"] = db.svIdle
        mx["db_"+name+"_sv_used"] = db.svUsed
        mx["db_"+name+"_sv_tested"] = db.svTested
        mx["db_"+name+"_sv_login"] = db.svLogin

        mx["db_"+name+"_total_received"] = db.totalReceived
        mx["db_"+name+"_total_sent"] = db.totalSent

        mx["db_"+name+"_sv_conns_utilization"] = calcPercentage(db.currentConnections, db.maxConnections)
    }

    mx["cl_conns_utilization"] = calcPercentage(clientConns, p.maxClientConn)
}

func (p *PgBouncer) collectDatabases() error {
    q := queryShowDatabases
    p.Debugf("executing query: %v", q)

    var db string
    return p.collectQuery(q, func(column, value string) {
        switch column {
        case "name":
            db = value
            p.getDBMetrics(db).updated = true
        case "database":
            p.getDBMetrics(db).pgDBName = value
        case "max_connections":
            p.getDBMetrics(db).maxConnections = parseInt(value)
        case "current_connections":
            p.getDBMetrics(db).currentConnections = parseInt(value)
        case "paused":
            p.getDBMetrics(db).paused = parseInt(value)
        case "disabled":
            p.getDBMetrics(db).disabled = parseInt(value)
        }
    })
}

func (p *PgBouncer) collectStats() error {
    q := queryShowStats
    p.Debugf("executing query: %v", q)

    var db string
    return p.collectQuery(q, func(column, value string) {
        switch column {
        case "database":
            db = value
            p.getDBMetrics(db).updated = true
        case "total_xact_count":
            p.getDBMetrics(db).totalXactCount = parseInt(value)
        case "total_query_count":
            p.getDBMetrics(db).totalQueryCount = parseInt(value)
        case "total_received":
            p.getDBMetrics(db).totalReceived = parseInt(value)
        case "total_sent":
            p.getDBMetrics(db).totalSent = parseInt(value)
        case "total_xact_time":
            p.getDBMetrics(db).totalXactTime = parseInt(value)
        case "total_query_time":
            p.getDBMetrics(db).totalQueryTime = parseInt(value)
        case "total_wait_time":
            p.getDBMetrics(db).totalWaitTime = parseInt(value)
        case "avg_xact_time":
            p.getDBMetrics(db).avgXactTime = parseInt(value)
        case "avg_query_time":
            p.getDBMetrics(db).avgQueryTime = parseInt(value)
        }
    })
}

func (p *PgBouncer) collectPools() error {
    q := queryShowPools
    p.Debugf("executing query: %v", q)

    // an entry is made for each couple of (database, user).
    var db string
    return p.collectQuery(q, func(column, value string) {
        switch column {
        case "database":
            db = value
            p.getDBMetrics(db).updated = true
        case "cl_active":
            p.getDBMetrics(db).clActive += parseInt(value)
        case "cl_waiting":
            p.getDBMetrics(db).clWaiting += parseInt(value)
        case "cl_cancel_req":
            p.getDBMetrics(db).clCancelReq += parseInt(value)
        case "sv_active":
            p.getDBMetrics(db).svActive += parseInt(value)
        case "sv_idle":
            p.getDBMetrics(db).svIdle += parseInt(value)
        case "sv_used":
            p.getDBMetrics(db).svUsed += parseInt(value)
        case "sv_tested":
            p.getDBMetrics(db).svTested += parseInt(value)
        case "sv_login":
            p.getDBMetrics(db).svLogin += parseInt(value)
        case "maxwait":
            p.getDBMetrics(db).maxWait += parseInt(value)
        case "maxwait_us":
            p.getDBMetrics(db).maxWaitUS += parseInt(value)
        }
    })
}

func (p *PgBouncer) queryMaxClientConn() (int64, error) {
    q := queryShowConfig
    p.Debugf("executing query: %v", q)

    var v int64
    var key string
    err := p.collectQuery(q, func(column, value string) {
        switch column {
        case "key":
            key = value
        case "value":
            if key == "max_client_conn" {
                v = parseInt(value)
            }
        }
    })
    return v, err
}

var reVersion = regexp.MustCompile(`\d+\.\d+\.\d+`)

func (p *PgBouncer) queryVersion() (*semver.Version, error) {
    q := queryShowVersion
    p.Debugf("executing query: %v", q)

    var resp string
    ctx, cancel := context.WithTimeout(context.Background(), p.Timeout.Duration())
    defer cancel()
    if err := p.db.QueryRowContext(ctx, q).Scan(&resp); err != nil {
        return nil, err
    }

    if !strings.Contains(resp, "PgBouncer") {
        return nil, fmt.Errorf("not PgBouncer instance: version response: %s", resp)
    }

    ver := reVersion.FindString(resp)
    if ver == "" {
        return nil, fmt.Errorf("couldn't parse version string '%s' (expected pattern '%s')", resp, reVersion)
    }

    v, err := semver.New(ver)
    if err != nil {
        return nil, fmt.Errorf("couldn't parse version string '%s': %v", ver, err)
    }

    return v, nil
}

func (p *PgBouncer) openConnection() error {
    cfg, err := pgx.ParseConfig(p.DSN)
    if err != nil {
        return err
    }
    cfg.PreferSimpleProtocol = true

    db, err := sql.Open("pgx", stdlib.RegisterConnConfig(cfg))
    if err != nil {
        return fmt.Errorf("error on opening a connection with the PgBouncer database [%s]: %v", p.DSN, err)
    }

    db.SetMaxOpenConns(1)
    db.SetMaxIdleConns(1)
    db.SetConnMaxLifetime(10 * time.Minute)

    p.db = db

    return nil
}

func (p *PgBouncer) collectQuery(query string, assign func(column, value string)) error {
    ctx, cancel := context.WithTimeout(context.Background(), p.Timeout.Duration())
    defer cancel()
    rows, err := p.db.QueryContext(ctx, query)
    if err != nil {
        return err
    }
    defer func() { _ = rows.Close() }()

    columns, err := rows.Columns()
    if err != nil {
        return err
    }

    values := makeNullStrings(len(columns))
    for rows.Next() {
        if err := rows.Scan(values...); err != nil {
            return err
        }
        for i, v := range values {
            assign(columns[i], valueToString(v))
        }
    }
    return rows.Err()
}

func (p *PgBouncer) getDBMetrics(dbname string) *dbMetrics {
    db, ok := p.metrics.dbs[dbname]
    if !ok {
        db = &dbMetrics{name: dbname}
        p.metrics.dbs[dbname] = db
    }
    return db
}

func (p *PgBouncer) resetMetrics() {
    for name, db := range p.metrics.dbs {
        p.metrics.dbs[name] = &dbMetrics{
            name:      db.name,
            pgDBName:  db.pgDBName,
            hasCharts: db.hasCharts,
        }
    }
}

func valueToString(value any) string {
    v, ok := value.(*sql.NullString)
    if !ok || !v.Valid {
        return ""
    }
    return v.String
}

func makeNullStrings(size int) []any {
    vs := make([]any, size)
    for i := range vs {
        vs[i] = &sql.NullString{}
    }
    return vs
}

func parseInt(s string) int64 {
    v, _ := strconv.ParseInt(s, 10, 64)
    return v
}

func calcPercentage(value, total int64) int64 {
    if total == 0 {
        return 0
    }
    return value * 100 / total
}