synapsecns/sanguine
services/scribe/service/chain.go

package service

import (
    "context"
    "fmt"
    "math"
    "math/big"
    "os"
    "time"

    "github.com/ethereum/go-ethereum/common"
    "github.com/jpillora/backoff"
    "go.opentelemetry.io/otel/metric"
    "golang.org/x/sync/errgroup"

    "github.com/synapsecns/sanguine/core/metrics"
    "github.com/synapsecns/sanguine/services/scribe/backend"
    "github.com/synapsecns/sanguine/services/scribe/config"
    "github.com/synapsecns/sanguine/services/scribe/db"
    "github.com/synapsecns/sanguine/services/scribe/logger"
    "github.com/synapsecns/sanguine/services/scribe/service/indexer"
    scribeTypes "github.com/synapsecns/sanguine/services/scribe/types"
)

// ChainIndexer is an indexer that fetches logs for a chain. It aggregates logs
// from a slice of ContractIndexers.
type ChainIndexer struct {
    // chainID is the chain ID of the chain.
    chainID uint32
    // eventDB is the database to store event data in.
    eventDB db.EventDB
    // client is the set of RPC backends used for indexing.
    client []backend.ScribeBackend
    // chainConfig is the config for the indexer.
    chainConfig config.ChainConfig
    // handler is the metrics handler for the scribe.
    handler metrics.Handler
    // blockHeightMeters is a map from address -> meter for block height.
    blockHeightMeters map[common.Address]metric.Int64Histogram
    // livefillContracts is the list of contracts currently being livefilled.
    livefillContracts []config.ContractConfig
    // readyForLivefill is a channel over which contracts are sent once they have
    // been backfilled to the livefill threshold and are ready to be livefilled.
    readyForLivefill chan config.ContractConfig
}

// contextKey keys context values used to identify which context was canceled
// in log messages.
type contextKey int

// maxBackoff is the maximum retry backoff, in seconds.
const maxBackoff = uint64(10)

const (
    chainContextKey contextKey = iota
)

// NewChainIndexer creates a new indexer for a chain. It applies defaults to any unset
// config fields and creates a block height meter for each contract in the chain config;
// the per-contract indexers themselves are created later, in Index.
func NewChainIndexer(eventDB db.EventDB, client []backend.ScribeBackend, chainConfig config.ChainConfig, handler metrics.Handler) (*ChainIndexer, error) {
    if chainConfig.GetLogsRange == 0 {
        chainConfig.GetLogsRange = 600
    }

    if chainConfig.GetLogsBatchAmount == 0 {
        chainConfig.GetLogsBatchAmount = 2
    }

    if chainConfig.StoreConcurrency == 0 {
        chainConfig.StoreConcurrency = 20
    }

    if chainConfig.ConcurrencyThreshold == 0 {
        chainConfig.ConcurrencyThreshold = 50000
    }
    if chainConfig.LivefillRange == 0 {
        chainConfig.LivefillRange = 100
    }

    if chainConfig.LivefillFlushInterval == 0 {
        chainConfig.LivefillFlushInterval = 10800
    }

    blockHeightMeterMap := make(map[common.Address]metric.Int64Histogram)
    for _, contract := range chainConfig.Contracts {
        blockHeightMeter, err := handler.Metrics().NewHistogram(fmt.Sprintf("scribe_block_meter_%d_%s", chainConfig.ChainID, contract.Address), "block_histogram", "a block height meter", "blocks")
        if err != nil {
            return nil, fmt.Errorf("error creating otel histogram %w", err)
        }
        blockHeightMeterMap[common.HexToAddress(contract.Address)] = blockHeightMeter
    }

    return &ChainIndexer{
        chainID:           chainConfig.ChainID,
        eventDB:           eventDB,
        client:            client,
        blockHeightMeters: blockHeightMeterMap,
        chainConfig:       chainConfig,
        handler:           handler,
        readyForLivefill:  make(chan config.ContractConfig),
    }, nil
}
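
// exampleWireChainIndexer is a minimal wiring sketch and is not called anywhere:
// the config values below are hypothetical placeholders, and the eventDB, clients,
// and handler arguments are assumed to be constructed elsewhere.
func exampleWireChainIndexer(ctx context.Context, eventDB db.EventDB, clients []backend.ScribeBackend, handler metrics.Handler) error {
    chainCfg := config.ChainConfig{
        ChainID: 1,
        Contracts: []config.ContractConfig{
            {Address: "0x1234000000000000000000000000000000000000", StartBlock: 15_000_000},
        },
    }
    // Unset fields (GetLogsRange, LivefillRange, etc.) fall back to the
    // defaults applied in NewChainIndexer above.
    chainIndexer, err := NewChainIndexer(eventDB, clients, chainCfg, handler)
    if err != nil {
        return fmt.Errorf("could not create chain indexer: %w", err)
    }
    // Index runs until ctx is canceled or an unrecoverable error occurs.
    return chainIndexer.Index(ctx)
}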

// Index starts a dedicated indexer for each contract that is behind the livefill
// threshold and hands contracts within it to a shared livefill indexer. If
// confirmations are enabled, unconfirmed events are also indexed up to the head.
//
//nolint:gocognit,cyclop,unparam
func (c *ChainIndexer) Index(parentContext context.Context) error {
    indexGroup, indexCtx := errgroup.WithContext(parentContext)

    latestBlock, err := c.getLatestBlock(indexCtx, scribeTypes.IndexingConfirmed)
    if err != nil {
        return fmt.Errorf("could not get current block number while indexing: %w", err)
    }

    var contractAddresses []common.Address
    for i := range c.chainConfig.Contracts {
        contractAddresses = append(contractAddresses, common.HexToAddress(c.chainConfig.Contracts[i].Address))
    }

    // Gets all last indexed infos for the contracts on the current chain to determine which contracts need to be initially livefilled.
    lastIndexedMap, err := c.eventDB.RetrieveLastIndexedMultiple(parentContext, contractAddresses, c.chainConfig.ChainID)
    if err != nil {
        return fmt.Errorf("could not get last indexed map: %w", err)
    }

    for j := range c.chainConfig.Contracts {
        contract := c.chainConfig.Contracts[j]
        contractAddress := common.HexToAddress(contract.Address)
        lastIndexed := lastIndexedMap[contractAddress]

        // Does not consider if the config's start block is within the livefill threshold for simplicity.
        // In this case, an indexer will bring the contract to head, and it will be passed to livefill.
        // If there is no last indexed info for the contract, it will not be passed to livefill.
        if lastIndexed > *latestBlock-c.chainConfig.LivefillThreshold && lastIndexed > 0 {
            c.livefillContracts = append(c.livefillContracts, contract)
            continue
        }

        // If current contract is not within the livefill threshold, start an indexer for it.
        contractIndexer, err := indexer.NewIndexer(c.chainConfig, []common.Address{contractAddress}, c.eventDB, c.client, c.handler, c.blockHeightMeters[contractAddress], scribeTypes.IndexingConfirmed)
        if err != nil {
            return fmt.Errorf("could not create contract indexer: %w", err)
        }

        // Check if an explicit backfill range has been set.
        var configEnd *uint64
        if contract.EndBlock > contract.StartBlock {
            configEnd = &contract.EndBlock
        }

        indexGroup.Go(func() error {
            err := c.IndexToBlock(indexCtx, contract.StartBlock, configEnd, contractIndexer)
            if err != nil {
                return fmt.Errorf("could not index to livefill: %w", err)
            }
            c.readyForLivefill <- contract

            // TODO make sure metrics are killed when indexing is done
            return nil
        })
    }

    // Livefill contracts that are within the livefill threshold and before the confirmation threshold.
    indexGroup.Go(func() error {
        return c.livefill(indexCtx)
    })

    // Index unconfirmed events to the head.
    if c.chainConfig.Confirmations > 0 {
        indexGroup.Go(func() error {
            return c.livefillAtHead(indexCtx)
        })
    }

    if err := indexGroup.Wait(); err != nil {
        return fmt.Errorf("could not index: %w", err)
    }
    return nil // This shouldn't really ever be hit.
}
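
// Worked example of the routing above (hypothetical numbers): with
// latestBlock = 1_000_000 and LivefillThreshold = 500,
//
//     lastIndexed = 999_900 -> within the threshold (999_900 > 999_500);
//         handed straight to the shared livefill indexer.
//     lastIndexed = 900_000 -> backfilled by a dedicated indexer first,
//         then sent over readyForLivefill.
//     lastIndexed = 0       -> never indexed; backfilled from the contract's
//         configured StartBlock.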

// getLatestBlock retrieves the current block height from the backend, retrying with
// backoff until it succeeds. When indexing confirmed events (indexingUnconfirmed ==
// false), the chain's confirmation count is subtracted from the head.
//
// nolint:unparam
func (c *ChainIndexer) getLatestBlock(ctx context.Context, indexingUnconfirmed bool) (*uint64, error) {
    var currentBlock uint64
    var err error
    b := createBackoff()
    timeout := time.Duration(0)
    for {
        select {
        case <-ctx.Done():
            return nil, fmt.Errorf("%s context canceled: %w", ctx.Value(chainContextKey), ctx.Err())
        case <-time.After(timeout):
            currentBlock, err = c.client[0].BlockNumber(ctx)

            if err != nil {
                timeout = b.Duration()
                logger.ReportScribeError(err, c.chainID, logger.GetBlockError)
                continue
            }
            if !indexingUnconfirmed {
                currentBlock -= c.chainConfig.Confirmations
            }
        }

        return &currentBlock, nil
    }
}
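
// Numeric illustration (hypothetical values): with Confirmations = 64 and the RPC
// head at block 1_000_000, getLatestBlock returns 999_936 on the confirmed path
// (indexingUnconfirmed == false) and 1_000_000 when indexing at the head.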

// IndexToBlock takes a contract indexer and indexes a contract until it reaches the livefill threshold (or the configured end block). It is generally used to drive an indexer for a single contract.
func (c *ChainIndexer) IndexToBlock(parentContext context.Context, configStart uint64, configEnd *uint64, indexer *indexer.Indexer) error {
    timeout := time.Duration(0)
    b := createBackoff()
    for {
        select {
        case <-parentContext.Done():
            logger.ReportIndexerError(fmt.Errorf("context canceled in index to block"), indexer.GetIndexerConfig(), logger.BackfillIndexerError)
            return fmt.Errorf("%s chain context canceled: %w", parentContext.Value(chainContextKey), parentContext.Err())
        case <-time.After(timeout):
            indexerConfig := indexer.GetIndexerConfig()

            logger.ReportScribeState(indexerConfig.ChainID, 0, indexerConfig.Addresses, logger.BeginBackfillIndexing)

            startHeight, endHeight, err := c.getIndexingRange(parentContext, configStart, configEnd, indexer)
            if err != nil {
                timeout = b.Duration()
                logger.ReportIndexerError(err, indexer.GetIndexerConfig(), logger.BackfillIndexerError)
                continue
            }

            err = indexer.Index(parentContext, startHeight, endHeight)
            if err != nil {
                timeout = b.Duration()
                // If the config sets the contract to refresh at a slower rate than the backoff timeout, use the refresh rate instead.
                if indexer.RefreshRate() > maxBackoff {
                    timeout = time.Duration(indexer.RefreshRate()) * time.Second
                }
                logger.ReportIndexerError(fmt.Errorf("error indexing, timeout %v, %w", timeout.Seconds(), err), indexer.GetIndexerConfig(), logger.BackfillIndexerError)
                continue
            }
            if configEnd != nil {
                logger.ReportScribeState(indexerConfig.ChainID, endHeight, indexerConfig.Addresses, logger.BackfillCompleted)
                return nil
            }

            livefillReady, err := c.isReadyForLivefill(parentContext, indexer)
            if err != nil {
                logger.ReportIndexerError(fmt.Errorf("could not get last indexed: %w", err), indexer.GetIndexerConfig(), logger.BackfillIndexerError)
                continue
            }
            if livefillReady {
                return nil
            }

            timeout = time.Duration(indexer.RefreshRate()) * time.Second
        }
    }
}
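
// Pacing sketch for the loop above, assuming a hypothetical contract RefreshRate
// of 30 seconds:
//
//     getIndexingRange fails -> timeout = backoff (1s, doubling up to 10s)
//     Index fails            -> timeout = 30s, since RefreshRate > maxBackoff
//     not yet at threshold   -> timeout = 30s, then another Index pass
//     configEnd set          -> returns as soon as the fixed range is indexed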

// getMinFromMap returns the smallest value in the map, or math.MaxUint64 if the map is empty.
func getMinFromMap(inputMap map[common.Address]uint64) uint64 {
    minValue := uint64(math.MaxUint64)

    for i := range inputMap {
        if inputMap[i] < minValue {
            minValue = inputMap[i]
        }
    }

    return minValue
}
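
// For example (hypothetical addresses): getMinFromMap(map[common.Address]uint64{
// addrA: 120, addrB: 95}) returns 95. An empty map yields math.MaxUint64.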

// getAddressesFromConfig converts the contract configs' address strings into common.Address values.
func getAddressesFromConfig(contractConfigs []config.ContractConfig) []common.Address {
    var addresses []common.Address
    for i := range contractConfigs {
        contract := common.HexToAddress(contractConfigs[i].Address)
        addresses = append(addresses, contract)
    }

    return addresses
}

// createBackoff returns an exponential backoff with jitter, bounded by maxBackoff seconds.
func createBackoff() *backoff.Backoff {
    return &backoff.Backoff{
        Factor: 2,
        Jitter: true,
        Min:    1 * time.Second,
        Max:    time.Duration(maxBackoff) * time.Second,
    }
}
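
// With these settings, successive Duration calls grow roughly
// 1s -> 2s -> 4s -> 8s -> 10s (capped at maxBackoff), with jitter randomizing
// each step.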

// isReadyForLivefill reports whether the indexer's contract has been indexed to within
// the livefill threshold of the latest confirmed block.
func (c *ChainIndexer) isReadyForLivefill(parentContext context.Context, indexer *indexer.Indexer) (bool, error) {
    // get last indexed to check livefill threshold
    lastBlockIndexed, err := c.eventDB.RetrieveLastIndexed(parentContext, indexer.GetIndexerConfig().Addresses[0], c.chainConfig.ChainID, scribeTypes.IndexingConfirmed)
    if err != nil {
        return false, fmt.Errorf("could not get last indexed: %w", err)
    }
    endHeight, err := c.getLatestBlock(parentContext, scribeTypes.IndexingConfirmed)
    if err != nil {
        return false, fmt.Errorf("could not get current block number while indexing: %w", err)
    }
    return int64(lastBlockIndexed) >= int64(*endHeight)-int64(c.chainConfig.LivefillThreshold), nil
}

func (c *ChainIndexer) getIndexingRange(parentContext context.Context, configStart uint64, configEnd *uint64, indexer *indexer.Indexer) (uint64, uint64, error) {
    var endHeight uint64
    startHeight := configStart

    // If a range is set in the config, respect those values,
    if configEnd != nil {
        endHeight = *configEnd
        indexer.SetToBackfill()
        return startHeight, endHeight, nil
    }

    // otherwise, get the last indexed block and start from the last indexed block
    lastIndexed, err := c.eventDB.RetrieveLastIndexed(parentContext, indexer.GetIndexerConfig().Addresses[0], c.chainConfig.ChainID, scribeTypes.IndexingConfirmed)
    if err != nil {
        return 0, 0, fmt.Errorf("could not get last block indexed: %w", err)
    }
    if lastIndexed > startHeight {
        startHeight = lastIndexed + 1
    }
    latestBlock, err := c.getLatestBlock(parentContext, scribeTypes.IndexingConfirmed)
    if err != nil {
        return 0, 0, fmt.Errorf("could not get current block number while indexing: %w", err)
    }
    endHeight = *latestBlock

    // Guard against an RPC flake where the reported head is behind the next start height.
    if startHeight > endHeight {
        return startHeight, endHeight, fmt.Errorf("start height is greater than head block")
    }

    return startHeight, endHeight, nil
}
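
// Range resolution examples (hypothetical values):
//
//     configStart = 100, configEnd = nil, lastIndexed = 250, latest = 300
//         -> indexes [251, 300]
//     configStart = 100, configEnd pointing at 200
//         -> fixed backfill of [100, 200]; the indexer is flagged via SetToBackfill
//     lastIndexed+1 > latest (RPC flake)
//         -> returns an error and the caller retries with backoff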

// livefillAtHead stores data for all contracts all the way to the head in a separate (unconfirmed) table, flushing old entries on an interval.
//
// nolint:cyclop
func (c *ChainIndexer) livefillAtHead(parentContext context.Context) error {
    timeout := time.Duration(0)
    b := createBackoff()
    addresses := getAddressesFromConfig(c.chainConfig.Contracts)
    tipLivefillBlockMeter, err := c.handler.Metrics().NewHistogram(fmt.Sprintf("scribe_block_meter_%d_tip_livefill", c.chainConfig.ChainID), "block_histogram", "a block height meter", "blocks")
    if err != nil {
        return fmt.Errorf("error creating otel histogram %w", err)
    }

    tipLivefillIndexer, err := indexer.NewIndexer(c.chainConfig, addresses, c.eventDB, c.client, c.handler, tipLivefillBlockMeter, scribeTypes.LivefillAtHead)
    if err != nil {
        return fmt.Errorf("could not create contract indexer: %w", err)
    }
    flushDuration := time.Duration(c.chainConfig.LivefillFlushInterval) * time.Second
    for {
        select {
        case <-parentContext.Done():
            logger.ReportScribeError(parentContext.Err(), c.chainID, logger.ContextCancelled)
            return fmt.Errorf("context canceled: %w", parentContext.Err())
        case <-time.After(flushDuration):
            logger.ReportScribeState(c.chainID, 0, addresses, logger.FlushingLivefillAtHead)
            deleteBefore := time.Now().Add(-flushDuration).UnixNano()
            err := c.eventDB.FlushFromHeadTables(parentContext, deleteBefore)
            if err != nil {
                return fmt.Errorf("could not flush logs from head: %w", err)
            }
        case <-time.After(timeout):
            endHeight, err := c.getLatestBlock(parentContext, scribeTypes.LivefillAtHead)
            if err != nil {
                logger.ReportIndexerError(err, tipLivefillIndexer.GetIndexerConfig(), logger.GetBlockError)
                timeout = b.Duration()
                continue
            }

            // The zero address is used as the key for the last indexed block of the at-head (unconfirmed) table.
            tipLivefillLastIndexed, err := c.eventDB.RetrieveLastIndexed(parentContext, common.BigToAddress(big.NewInt(0)), c.chainConfig.ChainID, scribeTypes.LivefillAtHead)
            if err != nil {
                logger.ReportIndexerError(err, tipLivefillIndexer.GetIndexerConfig(), logger.LivefillIndexerError)
                timeout = b.Duration()
                continue
            }
            startHeight := tipLivefillLastIndexed
            if startHeight == 0 {
                startHeight = *endHeight - c.chainConfig.Confirmations
            }

            // Check for RPC flake
            if startHeight > *endHeight {
                logger.ReportIndexerError(fmt.Errorf("start height is greater than head block"), tipLivefillIndexer.GetIndexerConfig(), logger.ErroneousHeadBlock)
                timeout = b.Duration()
                continue
            }

            err = tipLivefillIndexer.Index(parentContext, startHeight, *endHeight)
            if err != nil {
                timeout = b.Duration()
                logger.ReportIndexerError(err, tipLivefillIndexer.GetIndexerConfig(), logger.LivefillIndexerError)
                continue
            }

            // Default refresh rate for livefill at the head is defaultTimeout (3 seconds; 1 second in CI).
            timeout = defaultTimeout
        }
    }
}
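
// Flush cadence: with the default LivefillFlushInterval of 10,800 seconds, every
// three hours the unconfirmed head tables drop rows older than three hours,
// bounding the size of the at-head data set.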

// livefill continuously indexes contracts that are within the livefill threshold,
// adding new contracts from readyForLivefill as their backfills complete.
//
// nolint:cyclop
func (c *ChainIndexer) livefill(parentContext context.Context) error {
    timeout := time.Duration(0)
    b := createBackoff()
    livefillBlockMeter, err := c.handler.Metrics().NewHistogram(fmt.Sprintf("scribe_block_meter_%d_livefill", c.chainConfig.ChainID), "block_histogram", "a block height meter", "blocks")
    if err != nil {
        return fmt.Errorf("error creating otel histogram %w", err)
    }

    livefillIndexer, err := indexer.NewIndexer(c.chainConfig, getAddressesFromConfig(c.livefillContracts), c.eventDB, c.client, c.handler, livefillBlockMeter, scribeTypes.IndexingConfirmed)
    if err != nil {
        return fmt.Errorf("could not create contract indexer: %w", err)
    }
    for {
        select {
        case <-parentContext.Done():
            logger.ReportScribeError(parentContext.Err(), c.chainID, logger.ContextCancelled)
            return fmt.Errorf("%s chain context canceled: %w", parentContext.Value(chainContextKey), parentContext.Err())
        case newLivefillContract := <-c.readyForLivefill:
            c.livefillContracts = append(c.livefillContracts, newLivefillContract)
            // Update indexer's config to include new contract.
            livefillIndexer.UpdateAddress(getAddressesFromConfig(c.livefillContracts))
        case <-time.After(timeout):
            if len(c.livefillContracts) == 0 {
                timeout = b.Duration()
                continue
            }
            var endHeight *uint64
            livefillLastIndexed, err := c.eventDB.RetrieveLastIndexedMultiple(parentContext, getAddressesFromConfig(c.livefillContracts), c.chainConfig.ChainID)
            if err != nil {
                logger.ReportIndexerError(err, livefillIndexer.GetIndexerConfig(), logger.LivefillIndexerError)
                timeout = b.Duration()
                continue
            }
            startHeight := getMinFromMap(livefillLastIndexed)

            endHeight, err = c.getLatestBlock(parentContext, scribeTypes.IndexingConfirmed)
            if err != nil {
                logger.ReportIndexerError(err, livefillIndexer.GetIndexerConfig(), logger.GetBlockError)
                timeout = b.Duration()
                continue
            }

            // Check for RPC flake
            if startHeight > *endHeight {
                logger.ReportIndexerError(fmt.Errorf("start height is greater than head block"), livefillIndexer.GetIndexerConfig(), logger.ErroneousHeadBlock)
                timeout = b.Duration()
                continue
            }

            // Don't reindex the head block.
            if startHeight == *endHeight {
                timeout = 1 * time.Second
                continue
            }

            err = livefillIndexer.Index(parentContext, startHeight, *endHeight)
            if err != nil {
                timeout = b.Duration()
                logger.ReportIndexerError(err, livefillIndexer.GetIndexerConfig(), logger.LivefillIndexerError)
                continue
            }

            // Default refresh rate for livefill is defaultTimeout (3 seconds; 1 second in CI).
            // TODO add to config
            timeout = defaultTimeout
        }
    }
}

// defaultTimeout is the default pause between livefill passes.
var defaultTimeout = 3 * time.Second

func init() {
    // Use a shorter pause in CI to keep tests fast.
    if os.Getenv("CI") != "" {
        defaultTimeout = 1 * time.Second
    }
}