synapsecns/sanguine

View on GitHub
services/omnirpc/chainmanager/manager.go

Summary

Maintainability
A
1 hr
Test Coverage
package chainmanager

import (
    "context"
    "fmt"
    "github.com/ipfs/go-log"
    "github.com/synapsecns/sanguine/core/metrics"
    "github.com/synapsecns/sanguine/services/omnirpc/config"
    "github.com/synapsecns/sanguine/services/omnirpc/rpcinfo"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/metric"
    "sort"
    "sync"
    "time"
)

var logger = log.Logger("chainmanager")

// rpcTimeout is how long to wait for a response.
const rpcTimeout = time.Second * 5

// ChainManager manages chain context.
type ChainManager interface {
    // GetChainIDs gets all chainids
    GetChainIDs() (chainIDs []uint32)
    // RefreshRPCInfo refreshes rpc info for a chain id
    RefreshRPCInfo(ctx context.Context, chainID uint32)
    // GetChain gets the chain
    GetChain(chainID uint32) Chain
    // PutChain adds chain urls. Any previous chain data is overwritten
    PutChain(chainID uint32, urls []string, confirmations uint16)
}

// NewChainManager creates a new chain manager.
func NewChainManager(handler metrics.Handler) ChainManager {
    return &chainManager{
        chainList: make(map[uint32]*chain),
        // mux is used to prevent parallel manipulations to the map
        mux: sync.RWMutex{},
        // handler is the metrics handler
        handler: handler,
    }
}

// NewChainManagerFromConfig creates a new chain manager.
func NewChainManagerFromConfig(configuration config.Config, handler metrics.Handler) ChainManager {
    cm := &chainManager{
        chainList: make(map[uint32]*chain),
        mux:       sync.RWMutex{},
        handler:   handler,
    }

    for chainID, chn := range configuration.Chains {
        // default the confirmation threshold to 1
        confThreshold := uint16(1)

        // if confirmation threshold is 1, set the checks to 1
        if chn.Checks > 0 {
            confThreshold = chn.Checks
        }

        // store all the chains w/ empty latency results
        chains := make([]rpcinfo.Result, len(chn.RPCs))
        for i := range chn.RPCs {
            chains[i] = rpcinfo.Result{
                URL: chn.RPCs[i],
            }
        }

        cm.chainList[chainID] = &chain{
            chainID:               chainID,
            confirmationThreshold: confThreshold,
            rpcs:                  chains,
        }
    }

    err := cm.setupMetrics()
    if err != nil {
        logger.Errorf("could not setup metrics: %v", err)
    }

    return cm
}

// chainManager contains a chain manager.
type chainManager struct {
    chainList map[uint32]*chain
    mux       sync.RWMutex
    handler   metrics.Handler
}

func (c *chainManager) GetChain(chainID uint32) Chain {
    c.mux.RLock()
    defer c.mux.RUnlock()

    res, ok := c.chainList[chainID]
    if !ok {
        return nil
    }

    return res
}

func (c *chainManager) GetChainIDs() (chainIDs []uint32) {
    c.mux.RLock()
    defer c.mux.RUnlock()

    chainIDs = make([]uint32, len(c.chainList))
    i := 0
    for chainID := range c.chainList {
        chainIDs[i] = chainID
        i++
    }
    return chainIDs
}

// PutChain puts new chain urls.
func (c *chainManager) PutChain(chainID uint32, urls []string, confirmations uint16) {
    rpcs := make([]rpcinfo.Result, len(urls))
    for i, url := range urls {
        rpcs[i] = rpcinfo.Result{
            URL: url,
        }
    }

    c.mux.Lock()
    defer c.mux.Unlock()

    c.chainList[chainID] = &chain{
        chainID:               chainID,
        rpcs:                  rpcs,
        confirmationThreshold: confirmations,
    }
}

// RefreshRPCInfo refreshes rpc info for a given chain id.
func (c *chainManager) RefreshRPCInfo(ctx context.Context, chainID uint32) {
    c.mux.RLock()
    chainList, ok := c.chainList[chainID]
    c.mux.RUnlock()

    // nothing to reorder
    if !ok {
        return
    }
    rpcURLS := chainList.URLs()

    rpcInfoList := sortInfoList(rpcinfo.GetRPCLatency(ctx, rpcTimeout, rpcURLS, c.handler))

    c.mux.Lock()
    c.chainList[chainID].rpcs = rpcInfoList
    c.mux.Unlock()
}

const (
    meter             = "github.com/synapsecns/sanguine/services/omnirpc/chainmanager"
    blockNumberMetric = "block_number"
    latencyMetric     = "latency"
    blockAgeMetric    = "block_age"
)

// records metrics for various rpcs. Should only be called once.
//
// note: because of missing support for  https://github.com/open-telemetry/opentelemetry-specification/issues/2318
// this is done from the struct rather than recorded at refresh time.
//
// in a future version, this should be a synchronous gauge.
func (c *chainManager) setupMetrics() error {
    meterMaid := c.handler.Meter(meter)
    blockGauge, err := meterMaid.Int64ObservableGauge(blockNumberMetric)
    if err != nil {
        return fmt.Errorf("could not create histogram: %w", err)
    }

    latencyGauge, err := meterMaid.Float64ObservableGauge(latencyMetric, metric.WithUnit("seconds"))
    if err != nil {
        return fmt.Errorf("could not create histogram: %w", err)
    }

    ageGauge, err := meterMaid.Float64ObservableGauge(blockAgeMetric, metric.WithUnit("seconds"))
    if err != nil {
        return fmt.Errorf("could not create histogram: %w", err)
    }

    if _, err := meterMaid.RegisterCallback(func(parentCtx context.Context, o metric.Observer) (err error) {
        c.mux.RLock()
        defer c.mux.RUnlock()

        for chainID, chainInfo := range c.chainList {
            for _, rpc := range chainInfo.rpcs {
                attributeSet := attribute.NewSet(attribute.Int64(metrics.ChainID, int64(chainID)), attribute.String("rpc_url", rpc.URL))

                if rpc.HasError {
                    continue
                }

                o.ObserveInt64(blockGauge, int64(rpc.BlockNumber), metric.WithAttributeSet(attributeSet))
                o.ObserveFloat64(latencyGauge, rpc.Latency.Seconds(), metric.WithAttributeSet(attributeSet))
                o.ObserveFloat64(ageGauge, rpc.BlockAge.Seconds(), metric.WithAttributeSet(attributeSet))
            }
        }

        return nil
    }, blockGauge, latencyGauge, ageGauge); err != nil {
        return fmt.Errorf("could not register callback for gauges: %w", err)
    }
    return nil
}

func sortInfoList(rpcInfoList []rpcinfo.Result) []rpcinfo.Result {
    sort.Slice(rpcInfoList, func(i, j int) bool {
        // ignore latencies with an error
        if rpcInfoList[i].HasError {
            return false
        }

        ageDifference := rpcInfoList[i].BlockAge - rpcInfoList[j].BlockAge
        if ageDifference == 0 {
            return rpcInfoList[i].Latency < rpcInfoList[j].Latency
        } else if ageDifference > 0 {
            return false
        }

        return true
    })

    return rpcInfoList
}

var _ ChainManager = &chainManager{}

// Chain contains the context for a single chain.
//
//go:generate go run github.com/vektra/mockery/v2 --name Chain --output ./mocks --case=underscore
type Chain interface {
    // ConfirmationsThreshold gets the confirmation count
    ConfirmationsThreshold() uint16
    // URLs gets the urls
    URLs() []string
    // ID returns the id of the chain
    ID() uint32
}

// chain contains the settings for a single chain.
type chain struct {
    // chainID is the chainid
    chainID uint32
    // confirmationThreshold is the confirmation threshold of the chain
    confirmationThreshold uint16
    // rpcs contains a list of rpcs sorted by speed
    rpcs []rpcinfo.Result
}

func (c *chain) ID() uint32 {
    return c.chainID
}

func (c *chain) ConfirmationsThreshold() uint16 {
    return c.confirmationThreshold
}

// URLs gets all urls for a chain.
func (c *chain) URLs() (res []string) {
    res = make([]string, len(c.rpcs))
    for i, chainInfo := range c.rpcs {
        res[i] = chainInfo.URL
    }
    return res
}

var _ Chain = &chain{}