firehol/netdata

View on GitHub
src/go/plugin/go.d/modules/mysql/collect.go

Summary

Maintainability
A
3 hrs
Test Coverage
// SPDX-License-Identifier: GPL-3.0-or-later

package mysql

import (
    "context"
    "database/sql"
    "errors"
    "fmt"
    "strconv"
    "strings"
    "time"

    "github.com/blang/semver/v4"
)

// collect performs one collection cycle and returns the gathered metrics as
// a flat name -> value map.
//
// On first use it opens the database connection and detects the server
// version. Each cycle then collects global status, re-reads global variables
// when the recheck interval has elapsed, and collects the optional slave
// status, user statistics and process list data. Charts for optional metric
// families are registered at most once, the first time their condition holds.
//
// A failure to open the connection, detect the version, or collect global
// status / global variables aborts the cycle and is returned; the underlying
// error is wrapped with %w so callers can inspect it with errors.Is/As.
// Failures of the optional collectors are only logged.
func (m *MySQL) collect() (map[string]int64, error) {
    if m.db == nil {
        if err := m.openConnection(); err != nil {
            return nil, err
        }
    }
    if m.version == nil {
        if err := m.collectVersion(); err != nil {
            return nil, fmt.Errorf("error on collecting version: %w", err)
        }
        // https://mariadb.com/kb/en/user-statistics/
        // Parentheses make the existing precedence explicit: && binds tighter than ||.
        m.doUserStatistics = m.isPercona ||
            (m.isMariaDB && m.version.GTE(semver.Version{Major: 10, Minor: 1, Patch: 1}))
    }

    m.disableSessionQueryLog()

    mx := make(map[string]int64)

    if err := m.collectGlobalStatus(mx); err != nil {
        return nil, fmt.Errorf("error on collecting global status: %w", err)
    }

    // Optional chart groups: each is added once, as soon as the corresponding
    // metric shows up in the collected global status.
    if hasInnodbOSLog(mx) {
        m.addInnoDBOSLogOnce.Do(m.addInnoDBOSLogCharts)
    }
    if hasInnodbDeadlocks(mx) {
        m.addInnodbDeadlocksOnce.Do(m.addInnodbDeadlocksChart)
    }
    if hasQCacheMetrics(mx) {
        m.addQCacheOnce.Do(m.addQCacheCharts)
    }
    if hasGaleraMetrics(mx) {
        m.addGaleraOnce.Do(m.addGaleraCharts)
    }
    if hasTableOpenCacheOverflowsMetrics(mx) {
        m.addTableOpenCacheOverflowsOnce.Do(m.addTableOpenCacheOverflowChart)
    }

    // NOTE(review): recheckGlobalVarsTime is not updated in this function;
    // presumably collectGlobalVariables refreshes it — confirm.
    if time.Since(m.recheckGlobalVarsTime) > m.recheckGlobalVarsEvery {
        if err := m.collectGlobalVariables(); err != nil {
            return nil, fmt.Errorf("error on collecting global variables: %w", err)
        }
    }
    mx["max_connections"] = m.varMaxConns
    mx["table_open_cache"] = m.varTableOpenCache

    if m.isMariaDB || !strings.Contains(m.varDisabledStorageEngine, "MyISAM") {
        m.addMyISAMOnce.Do(m.addMyISAMCharts)
    }
    if m.varLogBin != "OFF" {
        m.addBinlogOnce.Do(m.addBinlogCharts)
    }

    // TODO: perhaps make a decisions based on privileges? (SHOW GRANTS FOR CURRENT_USER();)
    if m.doSlaveStatus {
        if err := m.collectSlaveStatus(mx); err != nil {
            m.Warningf("error on collecting slave status: %v", err)
            // Keep trying only after a timeout; any other error disables
            // slave status collection for subsequent cycles.
            m.doSlaveStatus = errors.Is(err, context.DeadlineExceeded)
        }
    }

    if m.doUserStatistics {
        if err := m.collectUserStatistics(mx); err != nil {
            m.Warningf("error on collecting user statistics: %v", err)
            // Same policy as slave status: retry only after a timeout.
            m.doUserStatistics = errors.Is(err, context.DeadlineExceeded)
        }
    }

    // Process list statistics are best-effort: a failure is logged and the
    // rest of the collected metrics are still returned.
    if err := m.collectProcessListStatistics(mx); err != nil {
        m.Errorf("error on collecting process list statistics: %v", err)
    }

    calcThreadCacheMisses(mx)
    return mx, nil
}

// openConnection opens a database handle for m.DSN using the "mysql" driver,
// verifies it with a ping bounded by m.Timeout, and stores it in m.db.
//
// On a ping failure the handle is closed before returning. Returned errors
// mention m.safeDSN rather than m.DSN and wrap the underlying error with %w,
// so callers can still detect causes such as context.DeadlineExceeded.
func (m *MySQL) openConnection() error {
    db, err := sql.Open("mysql", m.DSN)
    if err != nil {
        return fmt.Errorf("error on opening a connection with the mysql database [%s]: %w", m.safeDSN, err)
    }

    db.SetConnMaxLifetime(10 * time.Minute)

    ctx, cancel := context.WithTimeout(context.Background(), m.Timeout.Duration())
    defer cancel()

    if err := db.PingContext(ctx); err != nil {
        // The ping error is what gets reported; a Close error on a handle
        // that never worked adds nothing, so it is deliberately ignored.
        _ = db.Close()
        return fmt.Errorf("error on pinging the mysql database [%s]: %w", m.safeDSN, err)
    }

    m.db = db
    return nil
}

// calcThreadCacheMisses derives the "thread_cache_misses" metric from
// "threads_created" and "connections" and stores it in collected.
//
// The value is threads_created / connections scaled by 10000 and truncated
// to an integer. It is 0 when either input is 0 (or absent from the map).
func calcThreadCacheMisses(collected map[string]int64) {
    created := collected["threads_created"]
    total := collected["connections"]

    var misses int64
    if created != 0 && total != 0 {
        misses = int64(float64(created) / float64(total) * 10000)
    }
    collected["thread_cache_misses"] = misses
}

// hasInnodbOSLog reports whether collected contains the
// "innodb_os_log_fsyncs" key, regardless of its value.
func hasInnodbOSLog(collected map[string]int64) (present bool) {
    // removed in MariaDB 10.8 (https://mariadb.com/kb/en/innodb-status-variables/#innodb_os_log_fsyncs)
    _, present = collected["innodb_os_log_fsyncs"]
    return present
}

// hasInnodbDeadlocks reports whether collected contains the
// "innodb_deadlocks" key, regardless of its value.
func hasInnodbDeadlocks(collected map[string]int64) (present bool) {
    _, present = collected["innodb_deadlocks"]
    return present
}

// hasGaleraMetrics reports whether collected contains the "wsrep_received"
// key, which this file uses as the marker for Galera metrics being present.
func hasGaleraMetrics(collected map[string]int64) (present bool) {
    _, present = collected["wsrep_received"]
    return present
}

// hasQCacheMetrics reports whether collected contains the "qcache_hits" key,
// which this file uses as the marker for query cache metrics being present.
func hasQCacheMetrics(collected map[string]int64) (present bool) {
    _, present = collected["qcache_hits"]
    return present
}

// hasTableOpenCacheOverflowsMetrics reports whether collected contains the
// "table_open_cache_overflows" key, regardless of its value.
func hasTableOpenCacheOverflowsMetrics(collected map[string]int64) (present bool) {
    _, present = collected["table_open_cache_overflows"]
    return present
}

// collectQuery runs query with a deadline of m.Timeout and feeds every cell
// of the result set to assign.
//
// assign is invoked once per column of each row, in column order, with the
// column name, the cell value as a string ("" for NULL), and lineEnd set to
// true on the last column of the row.
//
// duration is the time, in milliseconds, that the QueryContext call took; it
// does not include row iteration and is 0 when the query itself fails. The
// returned error is the first one hit while querying, reading the column
// names, scanning a row, or iterating the rows.
func (m *MySQL) collectQuery(query string, assign func(column, value string, lineEnd bool)) (duration int64, err error) {
    ctx, cancel := context.WithTimeout(context.Background(), m.Timeout.Duration())
    defer cancel()

    started := time.Now()
    rows, err := m.db.QueryContext(ctx, query)
    if err != nil {
        return 0, err
    }
    duration = time.Since(started).Milliseconds()
    defer func() { _ = rows.Close() }()

    names, err := rows.Columns()
    if err != nil {
        return duration, err
    }

    // One reusable set of scan destinations, overwritten on every row.
    dest := makeValues(len(names))
    last := len(dest) - 1

    for rows.Next() {
        if scanErr := rows.Scan(dest...); scanErr != nil {
            return duration, scanErr
        }
        for idx, cell := range dest {
            assign(names[idx], valueToString(cell), idx == last)
        }
    }

    return duration, rows.Err()
}

// makeValues builds a slice of size scan destinations, each a pointer to its
// own fresh sql.NullString, suitable for passing to (*sql.Rows).Scan.
func makeValues(size int) []any {
    dest := make([]any, 0, size)
    for i := 0; i < size; i++ {
        dest = append(dest, new(sql.NullString))
    }
    return dest
}

// valueToString unwraps a scan destination into a plain string.
//
// It returns the held string when value is a *sql.NullString whose Valid
// flag is set. Any other dynamic type, or a NullString that is not Valid
// (SQL NULL), yields "".
func valueToString(value any) string {
    if ns, isNullString := value.(*sql.NullString); isNullString && ns.Valid {
        return ns.String
    }
    return ""
}

// parseInt converts s from base-10 text to an int64 on a best-effort basis.
//
// The parse error is intentionally discarded and the value reported by
// strconv.ParseInt is returned unchanged: 0 for malformed input, and the
// nearest int64 bound for input that is out of range.
func parseInt(s string) (n int64) {
    n, _ = strconv.ParseInt(s, 10, 64)
    return n
}

// parseFloat converts s to a float64 on a best-effort basis.
//
// The parse error is intentionally discarded and the value reported by
// strconv.ParseFloat is returned unchanged: 0 for malformed input, and an
// infinity of the matching sign for input that is out of range.
func parseFloat(s string) (f float64) {
    f, _ = strconv.ParseFloat(s, 64)
    return f
}