synapsecns/sanguine

View on GitHub
ethergo/submitter/metrics.go

Summary

Maintainability
A
0 mins
Test Coverage
package submitter

import (
    "context"
    "fmt"
    "github.com/cornelk/hashmap"
    "github.com/synapsecns/sanguine/core/metrics"
    "github.com/synapsecns/sanguine/ethergo/signer/signer"
    "github.com/synapsecns/sanguine/ethergo/submitter/db"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/metric"
    "math/big"
    "time"
)

const meterName = "github.com/synapsecns/sanguine/ethergo/submitter"

// generate an interface for otelRecorder that exports the public method.
// this allows us to avoid using recordX externally and makes the package less confusing.
//
// =============================================================================
// =============================================================================
// IMPORTANT: DO NOT REMOVE THIS COMMENT.
// NOTICE: PLEASE MAKE SURE YOU UPDATE BOTH THE DOCS AND THE GRAFANA DASHBOARD (IF NEEDED) AFTER UPDATING METRICS.
// =============================================================================
// =============================================================================
//
//go:generate go run github.com/vburenin/ifacemaker -f metrics.go -s otelRecorder -i iOtelRecorder -p submitter -o metrics_generated.go -c "autogenerated file"
type otelRecorder struct {
    // metrics is the metrics handler passed to newOtelRecorder; the meter is obtained from it.
    // the gauge callbacks return early without observing anything when it is nil.
    metrics metrics.Handler
    // meter is the metrics meter.
    meter metric.Meter
    // confirmedQueueGauge is the gauge for the confirmed queue.
    confirmedQueueGauge metric.Int64ObservableGauge
    // oldestPendingGauge is the gauge for the oldest pending transaction.
    oldestPendingGauge metric.Float64ObservableGauge
    // numPendingGauge is the gauge for the number of pending transactions.
    numPendingGauge metric.Int64ObservableGauge
    // nonceGauge is the gauge for the current nonce.
    nonceGauge metric.Int64ObservableGauge
    // gasBalanceGauge is the gauge for the gas balance.
    gasBalanceGauge metric.Float64ObservableGauge
    // numPendingTxes is used for metrics.
    // chainID -> number of pending txes
    // note: numPendingTxes will stop counting at DefaultMaxResultsPerChain.
    numPendingTxes *hashmap.Map[uint32, int]
    // currentNonces is used for metrics.
    // chainID -> nonce
    currentNonces *hashmap.Map[uint32, uint64]
    // currentGasBalances is used for metrics.
    // chainID -> balance
    currentGasBalances *hashmap.Map[uint32, *big.Int]
    // oldestPendingPerChain is the age of the oldest pending transaction.
    // chainID -> duration (reported in seconds by oldestPendingGauge)
    oldestPendingPerChain *hashmap.Map[uint32, time.Duration]
    // confirmedQueueCount is the count of the confirmed queue.
    // chainID -> queue size
    confirmedQueueCount *hashmap.Map[uint32, int]
    // signer supplies the address attached to every observation as the "wallet" attribute.
    signer signer.Signer
}

// newOtelRecorder builds an otelRecorder backed by meterHandler and wires up all of its gauges.
//
// For each of the five metrics (num_pending_txes, current_nonce, gas_balance,
// oldest_pending_tx and confirmed_queue) an observable gauge is created on the
// meter named meterName and the matching record* method is registered as its callback.
// The registrations returned by RegisterCallback are discarded.
//
// signerT is only used to label observations with the wallet address.
//
// A wrapped error is returned as soon as any gauge cannot be created or any
// callback cannot be registered; in that case the returned recorder is nil.
//
// nolint: cyclop
func newOtelRecorder(meterHandler metrics.Handler, signerT signer.Signer) (_ iOtelRecorder, err error) {
    rec := &otelRecorder{
        metrics:               meterHandler,
        meter:                 meterHandler.Meter(meterName),
        numPendingTxes:        hashmap.New[uint32, int](),
        currentNonces:         hashmap.New[uint32, uint64](),
        currentGasBalances:    hashmap.New[uint32, *big.Int](),
        oldestPendingPerChain: hashmap.New[uint32, time.Duration](),
        confirmedQueueCount:   hashmap.New[uint32, int](),
        signer:                signerT,
    }

    // number of pending txes
    if rec.numPendingGauge, err = rec.meter.Int64ObservableGauge("num_pending_txes"); err != nil {
        return nil, fmt.Errorf("could not create num pending txes gauge: %w", err)
    }
    if _, err = rec.meter.RegisterCallback(rec.recordNumPending, rec.numPendingGauge); err != nil {
        return nil, fmt.Errorf("could not register callback for num pending txes gauge: %w", err)
    }

    // current nonce
    if rec.nonceGauge, err = rec.meter.Int64ObservableGauge("current_nonce"); err != nil {
        return nil, fmt.Errorf("could not create nonce gauge: %w", err)
    }
    if _, err = rec.meter.RegisterCallback(rec.recordNonces, rec.nonceGauge); err != nil {
        return nil, fmt.Errorf("could not register callback for nonce gauge: %w", err)
    }

    // gas balance
    if rec.gasBalanceGauge, err = rec.meter.Float64ObservableGauge("gas_balance"); err != nil {
        return nil, fmt.Errorf("could not create gas balance gauge: %w", err)
    }
    if _, err = rec.meter.RegisterCallback(rec.recordBalance, rec.gasBalanceGauge); err != nil {
        return nil, fmt.Errorf("could not register callback for gas balance gauge: %w", err)
    }

    // age of the oldest pending tx, in seconds
    if rec.oldestPendingGauge, err = rec.meter.Float64ObservableGauge("oldest_pending_tx", metric.WithUnit("s")); err != nil {
        return nil, fmt.Errorf("could not create oldest pending gauge: %w", err)
    }
    if _, err = rec.meter.RegisterCallback(rec.recordOldestPendingTx, rec.oldestPendingGauge); err != nil {
        return nil, fmt.Errorf("could not register callback for oldest pending gauge: %w", err)
    }

    // confirmed queue size
    if rec.confirmedQueueGauge, err = rec.meter.Int64ObservableGauge("confirmed_queue"); err != nil {
        return nil, fmt.Errorf("could not create confirmed queue gauge: %w", err)
    }
    if _, err = rec.meter.RegisterCallback(rec.recordConfirmedQueue, rec.confirmedQueueGauge); err != nil {
        return nil, fmt.Errorf("could not register callback for confirmed queue gauge: %w", err)
    }

    return rec, nil
}

// recordNumPending is the callback registered for numPendingGauge.
// It observes one value per chain stored in numPendingTxes, tagged with the
// chain id and the signer's address ("wallet").
// Nothing is observed when the handler, the gauge or the map is unset. It always returns nil.
func (o *otelRecorder) recordNumPending(_ context.Context, observer metric.Observer) (err error) {
    ready := o.metrics != nil && o.numPendingGauge != nil && o.numPendingTxes != nil
    if !ready {
        return nil
    }

    o.numPendingTxes.Range(func(chain uint32, count int) bool {
        chainAttr := attribute.Int(metrics.ChainID, int(chain))
        walletAttr := attribute.String("wallet", o.signer.Address().Hex())
        observer.ObserveInt64(o.numPendingGauge, int64(count), metric.WithAttributes(chainAttr, walletAttr))

        // keep iterating over the remaining chains.
        return true
    })

    return nil
}

// recordNonces is the callback registered for nonceGauge.
// It observes the stored nonce of every chain in currentNonces, tagged with the
// chain id and the signer's address ("wallet").
// Nothing is observed when the handler, the gauge or the map is unset. It always returns nil.
func (o *otelRecorder) recordNonces(_ context.Context, observer metric.Observer) (err error) {
    switch {
    case o.metrics == nil, o.nonceGauge == nil, o.currentNonces == nil:
        return nil
    }

    observe := func(chain uint32, current uint64) bool {
        attrs := metric.WithAttributes(
            attribute.Int(metrics.ChainID, int(chain)),
            attribute.String("wallet", o.signer.Address().Hex()),
        )
        observer.ObserveInt64(o.nonceGauge, int64(current), attrs)
        return true
    }
    o.currentNonces.Range(observe)

    return nil
}

// recordConfirmedQueue is the callback registered for confirmedQueueGauge.
// It observes the stored queue size of every chain in confirmedQueueCount, tagged
// with the chain id and the signer's address ("wallet").
// Nothing is observed when the handler, the confirmed queue gauge or the
// confirmed queue map is unset. It always returns nil.
func (o *otelRecorder) recordConfirmedQueue(_ context.Context, observer metric.Observer) (err error) {
    // guard the gauge and map this callback actually uses.
    if o.metrics == nil || o.confirmedQueueGauge == nil || o.confirmedQueueCount == nil {
        return nil
    }

    o.confirmedQueueCount.Range(func(chainID uint32, queueSize int) bool {
        opts := metric.WithAttributes(
            attribute.Int(metrics.ChainID, int(chainID)),
            attribute.String("wallet", o.signer.Address().Hex()),
        )
        observer.ObserveInt64(o.confirmedQueueGauge, int64(queueSize), opts)
        return true
    })

    return nil
}

// recordBalance is the callback registered for gasBalanceGauge.
// It observes the stored gas balance of every chain in currentGasBalances,
// converted with toFloat and tagged with the chain id and the signer's address ("wallet").
// Nothing is observed when the handler, the gauge or the map is unset. It always returns nil.
func (o *otelRecorder) recordBalance(_ context.Context, observer metric.Observer) (err error) {
    // check the map as well, matching the guards of the other gauge callbacks.
    if o.metrics == nil || o.gasBalanceGauge == nil || o.currentGasBalances == nil {
        return nil
    }

    o.currentGasBalances.Range(func(chainID uint32, balance *big.Int) bool {
        opts := metric.WithAttributes(
            attribute.Int(metrics.ChainID, int(chainID)),
            attribute.String("wallet", o.signer.Address().Hex()),
        )

        observer.ObserveFloat64(o.gasBalanceGauge, toFloat(balance), opts)
        return true
    })

    return nil
}

// recordOldestPendingTx is the callback registered for oldestPendingGauge.
// It observes, in seconds, the stored duration of every chain in
// oldestPendingPerChain, tagged with the chain id and the signer's address ("wallet").
// Nothing is observed when the handler, the gauge or the map is unset. It always returns nil.
func (o *otelRecorder) recordOldestPendingTx(_ context.Context, observer metric.Observer) (err error) {
    // check the map as well, matching the guards of the other gauge callbacks.
    if o.metrics == nil || o.oldestPendingGauge == nil || o.oldestPendingPerChain == nil {
        return nil
    }

    o.oldestPendingPerChain.Range(func(chainID uint32, oldestPendingTx time.Duration) bool {
        opts := metric.WithAttributes(
            attribute.Int(metrics.ChainID, int(chainID)),
            attribute.String("wallet", o.signer.Address().Hex()),
        )
        observer.ObserveFloat64(o.oldestPendingGauge, oldestPendingTx.Seconds(), opts)

        return true
    })

    return nil
}

// RecordNonceForChain sets the nonce for a chain.
// The value replaces any nonce previously stored for chainID and is what the
// current_nonce gauge callback reports for that chain.
func (o *otelRecorder) RecordNonceForChain(chainID uint32, nonce uint64) {
    o.currentNonces.Set(chainID, nonce)
}

// RecordGasBalanceForChain sets the gas balance for a chain.
// The value replaces any balance previously stored for chainID and is what the
// gas_balance gauge callback reports for that chain.
// The pointer is stored as-is (no copy is made), so later mutations of balance
// by the caller are visible to the gauge.
func (o *otelRecorder) RecordGasBalanceForChain(chainID uint32, balance *big.Int) {
    o.currentGasBalances.Set(chainID, balance)
}

// RecordOldestPendingTx sets the oldest pending tx.
// lastPending is stored as a duration for chainID and is reported, in seconds,
// by the oldest_pending_tx gauge callback.
func (o *otelRecorder) RecordOldestPendingTx(chainID uint32, lastPending time.Duration) {
    o.oldestPendingPerChain.Set(chainID, lastPending)
}

// RecordNumPendingTxes sets the number of pending txes.
// The value replaces any count previously stored for chainID and is what the
// num_pending_txes gauge callback reports for that chain.
func (o *otelRecorder) RecordNumPendingTxes(chainID uint32, numPending int) {
    o.numPendingTxes.Set(chainID, numPending)
}

// RecordConfirmedQueue sets the confirmed queue count.
// The value replaces any size previously stored for chainID and is what the
// confirmed_queue gauge callback reports for that chain.
func (o *otelRecorder) RecordConfirmedQueue(chainID uint32, queueSize int) {
    o.confirmedQueueCount.Set(chainID, queueSize)
}

// HasNonceForChain reports whether a nonce has been recorded for chainID.
func (o *otelRecorder) HasNonceForChain(chainID uint32) bool {
    _, recorded := o.currentNonces.Get(chainID)

    return recorded
}

// HasGasBalanceForChain reports whether a gas balance has been recorded for chainID.
func (o *otelRecorder) HasGasBalanceForChain(chainID uint32) bool {
    _, recorded := o.currentGasBalances.Get(chainID)

    return recorded
}

// fetchOldestPendingTx fetches the creation time of the oldest pending tx in the queue.
//
// A tx is treated as pending when its nonce is greater than or equal to nonce,
// the same rule calculatePendingTxes uses to count pending txes. Txes with a
// lower nonce are ignored.
//
// If txes contains no pending tx (including when it is empty), the current time
// is returned, so time.Since on the result is approximately zero.
func fetchOldestPendingTx(txes []db.TX, nonce uint64) time.Time {
    oldestPendingTx := time.Now()
    for _, tx := range txes {
        // a nonce below the current nonce is not pending, so it must not
        // influence the age of the oldest pending tx.
        if tx.Nonce() < nonce {
            continue
        }

        if created := tx.CreationTime(); created.Before(oldestPendingTx) {
            oldestPendingTx = created
        }
    }

    return oldestPendingTx
}

// calculatePendingTxes counts how many of txes are still pending relative to nonce.
// A tx counts as pending when its nonce is greater than or equal to nonce.
func calculatePendingTxes(txes []db.TX, nonce uint64) int {
    var pending int
    for i := range txes {
        // the current nonce is the transaction count (nonces start at 0, so the
        // last used nonce is count-1); anything below it is not pending.
        if txes[i].Nonce() < nonce {
            continue
        }
        pending++
    }

    return pending
}