synapsecns/sanguine

View on GitHub
ethergo/chain/chainwatcher/height.go

Summary

Maintainability
A
45 mins
Test Coverage
package chainwatcher

import (
    "context"
    "fmt"
    "time"

    "github.com/pkg/errors"
)

// BlockSubscriberClient is the minimal chain client required by the block height watcher:
// a way to query the current chain-tip height on geth based rpc clients.
type BlockSubscriberClient interface {
    // LatestHeight returns the height of the latest block. The watcher calls it once
    // when a subscription starts and then once per poll tick with a context that carries
    // a timeout. A returned error is handled by the subscriber, not by the implementation.
    LatestHeight(ctx context.Context) (uint64, error)
}

// ConditionCheck is a caller-supplied predicate evaluated against a block height.
// It returns true when the caller's condition is met, indicating that the associated
// context should be canceled, and false otherwise.
type ConditionCheck func(blockHeight uint64) bool

// BlockHeightWatcher exposes a subscription to the block height for geth based chains.
// It uses the observer pattern so that many subscribers can follow the same height feed.
//
//go:generate go run github.com/vektra/mockery/v2 --name BlockHeightWatcher --output ./mocks --case=underscore
type BlockHeightWatcher interface {
    // Subscribe creates a new block height subscriber and returns the receive-only
    // channel on which block heights are delivered to it.
    Subscribe() <-chan uint64
    // Unsubscribe removes a block height subscriber.
    // NOTE(review): ch is presumably a channel previously returned by Subscribe - confirm
    // against the BlockBroadcaster implementation.
    Unsubscribe(ch <-chan uint64)
}

// blockHeightWatcherImpl is the BlockHeightWatcher returned by NewBlockHeightWatcher.
// It polls a BlockSubscriberClient for the latest height and pushes the results through
// the embedded BlockBroadcaster.
type blockHeightWatcherImpl struct {
    // BlockBroadcaster is embedded so its methods and state (for example UpdateHeight and
    // Emit, as used in this file) are available directly on the watcher.
    *BlockBroadcaster
    // ctx is the master context of the block height subscriber. If this context
    // is canceled, polling stops and all child subscriptions will be canceled.
    //nolint: containedctx
    ctx context.Context
    // BlockSubscriberClient is the client used to query the latest block height.
    // Embedding it makes LatestHeight callable directly on the watcher.
    BlockSubscriberClient
}

// HeightCounterMetricName is the name of the height counter metric.
// NOTE(review): the code that records this metric is not in this part of the file -
// confirm where it is consumed before renaming.
const HeightCounterMetricName = "height_counter"

// NewBlockHeightWatcher creates a new block height watcher for chainID that reads heights from reader.
//
// It starts a background goroutine that starts the block subscriber and forwards every height
// it produces to the watcher's subscribers via Emit. If the subscriber cannot be started, the
// error is logged and the attempt is retried after a short delay. The goroutine exits as soon
// as ctx is canceled, whether it is forwarding heights or waiting to retry.
func NewBlockHeightWatcher(ctx context.Context, chainID uint64, reader BlockSubscriberClient) BlockHeightWatcher {
    // retryDelay is how long to wait before retrying a failed subscriber start, so that a
    // persistently failing client does not turn this goroutine into a busy loop.
    const retryDelay = time.Second

    bls := &blockHeightWatcherImpl{
        ctx:                   ctx,
        BlockSubscriberClient: reader,
        BlockBroadcaster:      NewBlockBroadcaster(ctx, chainID),
    }

    go func() {
        for {
            // TODO this needs to have much better retries
            blocks, err := bls.startBlockSubscriber()
            if err != nil {
                // TODO handle fatal case here
                logger.Warnf("could not start block subscriber on chain %d: %v", chainID, err)

                // back off before retrying, but never outlive the context
                retry := time.NewTimer(retryDelay)
                select {
                case <-ctx.Done():
                    retry.Stop()
                    logger.Debugf("context ended on chain: %d", chainID)
                    return
                case <-retry.C:
                    continue
                }
            }

            // forward heights until the context ends; this loop only exits by returning
            for {
                select {
                case block := <-blocks:
                    logger.Debugf("got new block %d on chain %d", block, chainID)
                    bls.Emit(block)
                case <-ctx.Done():
                    logger.Debugf("context ended on chain: %d", chainID)
                    return
                }
            }
        }
    }()

    return bls
}

// UpdateHeight advances the broadcaster to newHeight and sends every height between the
// previously recorded height (exclusive) and newHeight (inclusive) to blockChan, in
// ascending order.
//
// As a reminder: BlockBroadcaster is a counter - heights go up, not down - so a newHeight
// that is lower than or equal to the last recorded height is ignored.
//
// The sends happen while lastHeightMux is held, so the call blocks until blockChan has
// accepted every height.
func (b *BlockBroadcaster) UpdateHeight(newHeight uint64) {
    b.lastHeightMux.Lock()
    defer b.lastHeightMux.Unlock()

    // heights only move forward; an equal or lower height is a no-op
    if newHeight <= b.lastHeight {
        return
    }

    // on the first set, don't start from 0. Start one height below newHeight
    // so we don't emit every height from 0 up to newHeight
    if b.firstSet {
        b.firstSet = false
        b.lastHeight = newHeight - 1
    }

    // step through each missing height and emit the height being stepped through (not
    // newHeight), so a jump of several blocks yields each intermediate height exactly once.
    // Comparing with < before incrementing also keeps the loop finite when newHeight is
    // the maximum uint64.
    for b.lastHeight < newHeight {
        b.lastHeight++
        b.blockChan <- b.lastHeight
    }
}

// PollInterval is how often the block subscriber polls the client for the latest height;
// it is the period of the ticker created each time polling (re)starts.
// It is a variable rather than a constant, and exported, so tests can shorten it.
var PollInterval = time.Second * 5

// startBlockSubscriber starts polling the client for new block heights and returns the
// channel the heights are delivered on. The first height (the current height) is sent
// immediately; afterwards the client is polled every PollInterval and new heights are
// pushed through UpdateHeight.
//
// Only one subscriber may be active per blockHeightWatcherImpl: the call acquires
// subscriberMux and fails if it is already held. The slot is released when the initial
// height lookup fails (so the caller can retry) or when the polling goroutine exits
// because the watcher's context was canceled.
//
// An error is returned if a subscriber is already running or if the initial height
// cannot be fetched.
//
//nolint:cyclop
func (b *blockHeightWatcherImpl) startBlockSubscriber() (<-chan uint64, error) {
    if !b.subscriberMux.TryAcquire(1) {
        return nil, errors.New("only one Subscribe() can be used per blockHeightWatcherImpl")
    }

    // get the most recent height
    latest, err := b.LatestHeight(b.ctx)
    if err != nil {
        // no goroutine was started, so hand the slot back here; otherwise every
        // subsequent attempt would fail on TryAcquire
        b.subscriberMux.Release(1)
        return nil, fmt.Errorf("could not get most recent latest height: %w", err)
    }

    // record the initial height under the same lock UpdateHeight uses
    b.lastHeightMux.Lock()
    b.lastHeight = latest
    b.lastHeightMux.Unlock()

    // poll fetches the latest height with a bounded timeout and feeds it to UpdateHeight.
    poll := func() error {
        ctx, cancel := context.WithTimeout(b.ctx, time.Second*30)
        defer cancel()

        currentHeight, err := b.LatestHeight(ctx)
        if err != nil {
            return err
        }

        b.UpdateHeight(currentHeight)
        return nil
    }

    // subscribe polls on every tick until the context ends or a poll fails.
    subscribe := func() {
        ticker := time.NewTicker(PollInterval)
        // stop the ticker on every exit path so restarts do not leak tickers
        defer ticker.Stop()

        for {
            select {
            case <-b.ctx.Done():
                return
            case <-ticker.C:
                if err := poll(); err != nil {
                    logger.Warnf("could not create subscription to new blocks: %v on chain %d", err, b.chainID)
                    return
                }
            }
        }
    }

    go func() {
        defer b.subscriberMux.Release(1)

        // send the initial latest height to the channel; use the captured value rather
        // than re-reading b.lastHeight without synchronization
        select {
        case <-b.ctx.Done():
            return
        case b.blockChan <- latest:
        }

        // keep polling until the context ends; after a failed poll, subscribe returns and
        // is restarted here, which waits a fresh PollInterval before the next attempt
        for b.ctx.Err() == nil {
            subscribe()
        }
    }()

    return b.blockChan, nil
}