ethergo/submitter/metrics.go
package submitter
import (
"context"
"fmt"
"github.com/cornelk/hashmap"
"github.com/synapsecns/sanguine/core/metrics"
"github.com/synapsecns/sanguine/ethergo/signer/signer"
"github.com/synapsecns/sanguine/ethergo/submitter/db"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"math/big"
"time"
)
const meterName = "github.com/synapsecns/sanguine/ethergo/submitter"
// generate an interface for otelRecorder that exports the public method.
// this allows us to avoid using recordX externally anad makes the package less confusing.
//
// =============================================================================
// =============================================================================
// IMPORTANT: DO NOT REMOVE THIS COMMENT.
// NOTICE: PLEASE MAKE SURE YOU UPDATE BOTH THE DOCS AND THE GRAFANA DASHBOARD (IF NEEDED) AFTER UPDATING METRICS.
// =============================================================================
// =============================================================================
//
//go:generate go run github.com/vburenin/ifacemaker -f metrics.go -s otelRecorder -i iOtelRecorder -p submitter -o metrics_generated.go -c "autogenerated file"
type otelRecorder struct {
metrics metrics.Handler
// meter is the metrics meter.
meter metric.Meter
// confirmedQueueGauge is the gauge for the confirmed queue.
confirmedQueueGauge metric.Int64ObservableGauge
// oldestPendingGauge is the gauge for the oldest pending transaction.
oldestPendingGauge metric.Float64ObservableGauge
// numPendingGauge is the gauge for the number of pending transactions.
numPendingGauge metric.Int64ObservableGauge
// nonceGauge is the gauge for the current nonce.
nonceGauge metric.Int64ObservableGauge
// gasBalanceGauge is the gauge for the gas balance.
gasBalanceGauge metric.Float64ObservableGauge
// numPendingTxes is used for metrics.
// note: numPendingTxes will stop counting at DefaultMaxResultsPerChain.
numPendingTxes *hashmap.Map[uint32, int]
// currentNonces is used for metrics.
// chainID -> nonce
currentNonces *hashmap.Map[uint32, uint64]
// currentGasBalance is used for metrics.
// chainID -> balance
currentGasBalances *hashmap.Map[uint32, *big.Int]
// oldestPendingPerChain is the oldest pending transaction.
// chainID -> time
oldestPendingPerChain *hashmap.Map[uint32, time.Duration]
// confirmedQueueCount is the count of the confirmed queue.
confirmedQueueCount *hashmap.Map[uint32, int]
// signer is the signer for signing transactions.
signer signer.Signer
}
// nolint: cyclop
func newOtelRecorder(meterHandler metrics.Handler, signerT signer.Signer) (_ iOtelRecorder, err error) {
or := otelRecorder{
metrics: meterHandler,
meter: meterHandler.Meter(meterName),
numPendingTxes: hashmap.New[uint32, int](),
currentNonces: hashmap.New[uint32, uint64](),
currentGasBalances: hashmap.New[uint32, *big.Int](),
oldestPendingPerChain: hashmap.New[uint32, time.Duration](),
confirmedQueueCount: hashmap.New[uint32, int](),
signer: signerT,
}
or.numPendingGauge, err = or.meter.Int64ObservableGauge("num_pending_txes")
if err != nil {
return nil, fmt.Errorf("could not create num pending txes gauge: %w", err)
}
_, err = or.meter.RegisterCallback(or.recordNumPending, or.numPendingGauge)
if err != nil {
return nil, fmt.Errorf("could not register callback for num pending txes gauge: %w", err)
}
or.nonceGauge, err = or.meter.Int64ObservableGauge("current_nonce")
if err != nil {
return nil, fmt.Errorf("could not create nonce gauge: %w", err)
}
_, err = or.meter.RegisterCallback(or.recordNonces, or.nonceGauge)
if err != nil {
return nil, fmt.Errorf("could not register callback for nonce gauge: %w", err)
}
or.gasBalanceGauge, err = or.meter.Float64ObservableGauge("gas_balance")
if err != nil {
return nil, fmt.Errorf("could not create gas balance gauge: %w", err)
}
_, err = or.meter.RegisterCallback(or.recordBalance, or.gasBalanceGauge)
if err != nil {
return nil, fmt.Errorf("could not register callback for gas balance gauge: %w", err)
}
or.oldestPendingGauge, err = or.meter.Float64ObservableGauge("oldest_pending_tx", metric.WithUnit("s"))
if err != nil {
return nil, fmt.Errorf("could not create oldest pending gauge: %w", err)
}
_, err = or.meter.RegisterCallback(or.recordOldestPendingTx, or.oldestPendingGauge)
if err != nil {
return nil, fmt.Errorf("could not register callback for oldest pending gauge: %w", err)
}
or.confirmedQueueGauge, err = or.meter.Int64ObservableGauge("confirmed_queue")
if err != nil {
return nil, fmt.Errorf("could not create confirmed queue gauge: %w", err)
}
_, err = or.meter.RegisterCallback(or.recordConfirmedQueue, or.confirmedQueueGauge)
if err != nil {
return nil, fmt.Errorf("could not register callback for confirmed queue gauge: %w", err)
}
return &or, nil
}
func (o *otelRecorder) recordNumPending(_ context.Context, observer metric.Observer) (err error) {
if o.metrics == nil || o.numPendingGauge == nil || o.numPendingTxes == nil {
return nil
}
o.numPendingTxes.Range(func(chainID uint32, numPending int) bool {
opts := metric.WithAttributes(
attribute.Int(metrics.ChainID, int(chainID)),
attribute.String("wallet", o.signer.Address().Hex()),
)
observer.ObserveInt64(o.numPendingGauge, int64(numPending), opts)
return true
})
return nil
}
func (o *otelRecorder) recordNonces(_ context.Context, observer metric.Observer) (err error) {
if o.metrics == nil || o.nonceGauge == nil || o.currentNonces == nil {
return nil
}
o.currentNonces.Range(func(chainID uint32, nonce uint64) bool {
opts := metric.WithAttributes(
attribute.Int(metrics.ChainID, int(chainID)),
attribute.String("wallet", o.signer.Address().Hex()),
)
observer.ObserveInt64(o.nonceGauge, int64(nonce), opts)
return true
})
return nil
}
func (o *otelRecorder) recordConfirmedQueue(_ context.Context, observer metric.Observer) (err error) {
if o.metrics == nil || o.nonceGauge == nil || o.currentNonces == nil {
return nil
}
o.confirmedQueueCount.Range(func(chainID uint32, queueSize int) bool {
opts := metric.WithAttributes(
attribute.Int(metrics.ChainID, int(chainID)),
attribute.String("wallet", o.signer.Address().Hex()),
)
observer.ObserveInt64(o.confirmedQueueGauge, int64(queueSize), opts)
return true
})
return nil
}
func (o *otelRecorder) recordBalance(_ context.Context, observer metric.Observer) (err error) {
if o.metrics == nil || o.gasBalanceGauge == nil {
return nil
}
o.currentGasBalances.Range(func(chainID uint32, gasPrice *big.Int) bool {
opts := metric.WithAttributes(
attribute.Int(metrics.ChainID, int(chainID)),
attribute.String("wallet", o.signer.Address().Hex()),
)
observer.ObserveFloat64(o.gasBalanceGauge, toFloat(gasPrice), opts)
return true
})
return nil
}
func (o *otelRecorder) recordOldestPendingTx(_ context.Context, observer metric.Observer) (err error) {
if o.metrics == nil || o.oldestPendingGauge == nil {
return nil
}
o.oldestPendingPerChain.Range(func(chainID uint32, oldestPendingTx time.Duration) bool {
opts := metric.WithAttributes(
attribute.Int(metrics.ChainID, int(chainID)),
attribute.String("wallet", o.signer.Address().Hex()),
)
observer.ObserveFloat64(o.oldestPendingGauge, oldestPendingTx.Seconds(), opts)
return true
})
return nil
}
// RecordNonceForChain sets the nonce for a chain.
func (o *otelRecorder) RecordNonceForChain(chainID uint32, nonce uint64) {
o.currentNonces.Set(chainID, nonce)
}
// RecordGasBalanceForChain sets the gas balance for a chain.
func (o *otelRecorder) RecordGasBalanceForChain(chainID uint32, balance *big.Int) {
o.currentGasBalances.Set(chainID, balance)
}
// RecordOldestPendingTx sets the oldest pending tx.
func (o *otelRecorder) RecordOldestPendingTx(chainID uint32, lastPending time.Duration) {
o.oldestPendingPerChain.Set(chainID, lastPending)
}
// RecordNumPendingTxes sets the number of pending txes.
func (o *otelRecorder) RecordNumPendingTxes(chainID uint32, numPending int) {
o.numPendingTxes.Set(chainID, numPending)
}
// RecordConfirmedQueue sets the confirmed queue count.
func (o *otelRecorder) RecordConfirmedQueue(chainID uint32, queueSize int) {
o.confirmedQueueCount.Set(chainID, queueSize)
}
// HasNonceForChain checks if a nonce exists for a chain.
func (o *otelRecorder) HasNonceForChain(chainID uint32) bool {
_, ok := o.currentNonces.Get(chainID)
return ok
}
// HasGasBalanceForChain checks if a gas balance exists for a chain.
func (o *otelRecorder) HasGasBalanceForChain(chainID uint32) bool {
_, ok := o.currentGasBalances.Get(chainID)
return ok
}
// fetchOldestPendingTx fetches the oldest pending tx in the queue.
func fetchOldestPendingTx(txes []db.TX, nonce uint64) time.Time {
oldestPendingTx := time.Now()
for _, tx := range txes {
if tx.Nonce() >= nonce {
continue
}
if tx.CreationTime().Before(oldestPendingTx) {
oldestPendingTx = tx.CreationTime()
}
}
return oldestPendingTx
}
// calculatePendingTxes calculates the number of pending txes in the queue.
func calculatePendingTxes(txes []db.TX, nonce uint64) int {
realPendingCount := 0
for _, tx := range txes {
// current nonce is going to be transaction count (nonces are offset by -1 since first nonce is 0)
// so we use equal to here
if tx.Nonce() >= nonce {
realPendingCount++
}
}
return realPendingCount
}