ethergo/listener/otel.go
package listener
import (
"context"
"fmt"
"time"
"github.com/synapsecns/sanguine/core/metrics"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
const meterName = "github.com/synapsecns/sanguine/ethergo/listener"
// generate an interface for otelRecorder that exports the public method.
// this allows us to avoid using recordX externally and makes the package less confusing.
//
// =============================================================================
// =============================================================================
// IMPORTANT: DO NOT REMOVE THIS COMMENT.
// NOTICE: PLEASE MAKE SURE YOU UPDATE BOTH THE DOCS AND THE GRAFANA DASHBOARD (IF NEEDED) AFTER UPDATING METRICS.
// =============================================================================
// =============================================================================
//
//go:generate go run github.com/vburenin/ifacemaker -f otel.go -s otelRecorder -i iOtelRecorder -p listener -o otel_generated.go -c "autogenerated file"
type otelRecorder struct {
// metrics is the handler supplied to newOtelRecorder; the meter is obtained from it,
// and both gauge callbacks skip observation when it is nil.
metrics metrics.Handler
// meter is the metrics meter (created with meterName) used to build the gauges
// and register their callbacks.
meter metric.Meter
// lastBlockGauge is the observable gauge ("last_block") reporting the last block.
lastBlockGauge metric.Int64ObservableGauge
// lastFetchedBlockAgeGauge is the observable gauge ("last_block_age") reporting,
// in seconds, how long ago the last block was recorded.
lastFetchedBlockAgeGauge metric.Float64ObservableGauge
// lastBlock is the last block processed by the listener.
// It stays nil until RecordLastBlock is called, and nothing is observed while it is nil.
// NOTE(review): written by RecordLastBlock and read by the gauge callback with no
// synchronization visible in this file — confirm these cannot run concurrently.
lastBlock *uint64
// lastBlockFetchTime is the time the last block was fetched (used to calculate last block age).
// It is only refreshed when RecordLastBlock receives a block number different from lastBlock.
lastBlockFetchTime *time.Time
// chainID is the chain ID for the listener; attached to every observation as an attribute.
chainID int
// listenerName is the name of the listener; attached to every observation as "listener_name".
listenerName string
}
// newOtelRecorder builds an otelRecorder for the given chain ID and listener name.
//
// It obtains a meter named meterName from meterHandler, creates the
// "last_block" and "last_block_age" observable gauges and registers one
// callback per gauge. Both callbacks are no-ops until RecordLastBlock has been
// called, because they skip observation while their backing field is nil.
//
// If creating a gauge or registering a callback fails, a nil recorder is
// returned together with an error that wraps the underlying cause (%w), so
// callers can inspect it with errors.Is / errors.As.
func newOtelRecorder(meterHandler metrics.Handler, chainID int, name string) (_ iOtelRecorder, err error) {
	or := otelRecorder{
		metrics:            meterHandler,
		meter:              meterHandler.Meter(meterName),
		lastBlock:          nil,
		lastBlockFetchTime: nil,
		chainID:            chainID,
		listenerName:       name,
	}

	or.lastBlockGauge, err = or.meter.Int64ObservableGauge("last_block")
	if err != nil {
		return nil, fmt.Errorf("could not create last block gauge: %w", err)
	}

	or.lastFetchedBlockAgeGauge, err = or.meter.Float64ObservableGauge("last_block_age")
	if err != nil {
		return nil, fmt.Errorf("could not create last block age gauge: %w", err)
	}

	// The callbacks are method values bound to &or, the same pointer that is
	// returned below, so they observe the state later set by RecordLastBlock.
	_, err = or.meter.RegisterCallback(or.recordLastBlock, or.lastBlockGauge)
	if err != nil {
		return nil, fmt.Errorf("could not register callback for last block gauge: %w", err)
	}

	_, err = or.meter.RegisterCallback(or.recordLastFetchedBlockAge, or.lastFetchedBlockAgeGauge)
	if err != nil {
		return nil, fmt.Errorf("could not register callback for last block age gauge: %w", err)
	}

	return &or, nil
}
// recordLastBlock is the callback registered for lastBlockGauge.
// It reports the most recently recorded block number, tagged with the chain ID
// and listener name. Nothing is observed while the handler, the gauge, or the
// last block is unset. It always returns nil.
func (o *otelRecorder) recordLastBlock(_ context.Context, observer metric.Observer) (err error) {
	ready := o.metrics != nil && o.lastBlockGauge != nil && o.lastBlock != nil
	if !ready {
		return nil
	}

	attrs := []attribute.KeyValue{
		attribute.Int(metrics.ChainID, o.chainID),
		attribute.String("listener_name", o.listenerName),
	}
	observer.ObserveInt64(o.lastBlockGauge, int64(*o.lastBlock), metric.WithAttributes(attrs...))

	return nil
}
// recordLastFetchedBlockAge is the callback registered for lastFetchedBlockAgeGauge.
// It reports, in seconds, the time elapsed since lastBlockFetchTime, tagged with
// the chain ID and listener name. Nothing is observed while the handler, the
// gauge, or the fetch time is unset. It always returns nil.
func (o *otelRecorder) recordLastFetchedBlockAge(_ context.Context, observer metric.Observer) (err error) {
	switch {
	case o.metrics == nil, o.lastFetchedBlockAgeGauge == nil, o.lastBlockFetchTime == nil:
		return nil
	}

	elapsedSeconds := time.Since(*o.lastBlockFetchTime).Seconds()
	attrs := []attribute.KeyValue{
		attribute.Int(metrics.ChainID, o.chainID),
		attribute.String("listener_name", o.listenerName),
	}
	observer.ObserveFloat64(o.lastFetchedBlockAgeGauge, elapsedSeconds, metric.WithAttributes(attrs...))

	return nil
}
// RecordLastBlock stores lastBlock as the most recent block seen by the listener
// and stamps the current time as its fetch time. A call that repeats the block
// number already stored is ignored, so the fetch time is left untouched.
func (o *otelRecorder) RecordLastBlock(lastBlock uint64) {
	// nothing to do when the block number is unchanged
	if prev := o.lastBlock; prev != nil && *prev == lastBlock {
		return
	}

	// remember the new block together with the moment it was recorded
	now := time.Now()
	o.lastBlock, o.lastBlockFetchTime = &lastBlock, &now
}