synapsecns/sanguine

View on GitHub
services/scribe/service/indexer/indexer.go

Summary

Maintainability
D
2 days
Test Coverage
package indexer

import (
    "context"
    "errors"
    "fmt"
    "github.com/synapsecns/sanguine/services/scribe/backend"
    scribeTypes "github.com/synapsecns/sanguine/services/scribe/types"

    "github.com/synapsecns/sanguine/services/scribe/logger"
    "math/big"
    "time"

    "github.com/lmittmann/w3"
    "github.com/lmittmann/w3/module/eth"
    "github.com/lmittmann/w3/w3types"
    "github.com/synapsecns/sanguine/core/mapmutex"
    "github.com/synapsecns/sanguine/core/metrics"
    "go.opentelemetry.io/otel/attribute"
    otelMetrics "go.opentelemetry.io/otel/metric"
    "go.opentelemetry.io/otel/trace"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"
    lru "github.com/hashicorp/golang-lru"
    "github.com/jpillora/backoff"
    "github.com/synapsecns/sanguine/services/scribe/config"
    "github.com/synapsecns/sanguine/services/scribe/db"
    "golang.org/x/sync/errgroup"
)

// Indexer fetches logs for a set of contract addresses on a single chain and stores the logs together with their
// receipts, transactions and block times in the event database.
type Indexer struct {
    // indexerConfig holds all the metadata needed for logging and indexing.
    indexerConfig scribeTypes.IndexerConfig
    // eventDB is the database to store event data in.
    eventDB db.EventDB
    // client is the list of backends used for filtering logs and fetching tx data.
    client []backend.ScribeBackend
    // cache is a cache for txHashes that have already been stored.
    cache *lru.Cache
    // mux is the mutex used to prevent double inserting logs from the same tx
    mux mapmutex.StringerMapMutex
    // handler is the metrics handler for the scribe.
    handler metrics.Handler
    // blockMeter is an otel histogram for doing metrics on block heights by chain
    blockMeter otelMetrics.Int64Histogram
    // refreshRate is the rate at which the indexer will refresh when livefilling.
    refreshRate uint64
    // toHead is a boolean signifying if the indexer is livefilling to the head.
    toHead bool
    // isBackfill is a boolean signifying if the indexer is backfilling (prevents last indexed from running)
    isBackfill bool
}

const (
    // retryTolerance is the number of times to retry a failed operation before rerunning the entire Backfill function.
    retryTolerance = 20

    // txNotSupportedError is the RPC error text used to detect the legacy Arbitrum tx type.
    txNotSupportedError = "transaction type not supported"

    // invalidTxVRSError is the RPC error text used to detect the Aurora VRS error.
    invalidTxVRSError = "invalid transaction v, r, s values"

    // txNotFoundError is the RPC error text used to detect omniRPC "not found" errors (for BSx).
    txNotFoundError = "not found"
)

// txData holds everything fetched for a single transaction hash: its receipt, the raw transaction and the header
// of the block it was requested for.
type txData struct {
    // receipt is the transaction receipt.
    receipt     types.Receipt
    // transaction is the raw transaction.
    transaction types.Transaction
    // blockHeader is the header of the block the log was emitted in.
    blockHeader types.Header
    // success is set once all data for the tx has been fetched without error.
    success     bool
}

var (
    // errNoContinue marks a non-recoverable failure: the operation must not be retried.
    errNoContinue = errors.New("encountered unreconcilable error, will not attempt to store tx")

    // errNoTx marks a tx that cannot be parsed; it is only returned when the tx doesn't match our data model.
    errNoTx = errors.New("tx is not supported by the client")
)

// NewIndexer builds an Indexer for the given addresses on the chain described by chainConfig.
//
// With exactly one address the chain config values are used as given, and the refresh rate is taken from the matching
// contract entry when that entry sets a positive one. With zero or several addresses the livefill settings are
// applied instead and the refresh rate stays at its default of 1.
func NewIndexer(chainConfig config.ChainConfig, addresses []common.Address, eventDB db.EventDB, client []backend.ScribeBackend, handler metrics.Handler, blockMeter otelMetrics.Int64Histogram, toHead bool) (*Indexer, error) {
    txCache, err := lru.New(500)
    if err != nil {
        return nil, fmt.Errorf("could not initialize cache: %w", err)
    }

    // Default refresh rate; only a single-contract indexer may override it from the config.
    rate := uint64(1)
    if len(addresses) == 1 {
        target := addresses[0].String()
        for _, contract := range chainConfig.Contracts {
            if contract.RefreshRate > 0 && contract.Address == target {
                rate = contract.RefreshRate
                break
            }
        }
    } else {
        // livefill settings (chainConfig is a by-value copy, so the caller's config is untouched)
        chainConfig.GetLogsRange = chainConfig.LivefillRange
        chainConfig.GetLogsBatchAmount = 1
        chainConfig.StoreConcurrency = 1
        chainConfig.ConcurrencyThreshold = 10000
    }

    indexer := &Indexer{
        indexerConfig: scribeTypes.IndexerConfig{
            Addresses:            addresses,
            GetLogsRange:         chainConfig.GetLogsRange,
            GetLogsBatchAmount:   chainConfig.GetLogsBatchAmount,
            StoreConcurrency:     chainConfig.StoreConcurrency,
            ChainID:              chainConfig.ChainID,
            ConcurrencyThreshold: chainConfig.ConcurrencyThreshold,
        },
        eventDB:     eventDB,
        client:      client,
        cache:       txCache,
        mux:         mapmutex.NewStringerMapMutex(),
        handler:     handler,
        blockMeter:  blockMeter,
        refreshRate: rate,
        toHead:      toHead,
        isBackfill:  false,
    }

    return indexer, nil
}

// UpdateAddress replaces the list of addresses the indexer works on.
// The given slice is stored as-is (it is not copied).
func (x *Indexer) UpdateAddress(addresses []common.Address) {
    x.indexerConfig.Addresses = addresses
}

// SetToBackfill sets the indexer to backfill mode. While in this mode saveLastIndexed is a no-op, so the
// last indexed block is not updated.
func (x *Indexer) SetToBackfill() {
    x.isBackfill = true
}

// GetIndexerConfig returns the indexer config by value. The returned struct is a copy, but its Addresses
// slice still shares its backing array with the indexer's own config.
func (x *Indexer) GetIndexerConfig() scribeTypes.IndexerConfig {
    return x.indexerConfig
}

// RefreshRate returns the refresh rate chosen for the indexer in NewIndexer.
// NOTE(review): the unit appears to be seconds (see the comment in NewIndexer) — confirm against the caller.
func (x *Indexer) RefreshRate() uint64 {
    return x.refreshRate
}

// Index retrieves logs, receipts, and transactions for a contract from a given range and does so in the following manner.
// 1. Get logs for the contract in chunks of batch requests.
// 2. Iterate through each log's Tx Hash and performs the following
//   - Get the receipt for each log and store it and all of its logs.
//   - Get the transaction for each log and store it.
//
// Two goroutines cooperate: one runs the log fetcher, the other consumes the fetched logs and stores them through
// a secondary errgroup. The secondary group is waited on whenever the store concurrency limit is reached, whenever
// the indexer is within ConcurrencyThreshold blocks of endHeight, and once more when the logs channel is closed, so
// every store has finished (and every store error has surfaced) before endHeight is recorded as last indexed.
//
// It returns an error when fetching or storing fails, when the context is canceled, or when the last indexed
// block cannot be saved.
//
//nolint:gocognit, cyclop
func (x *Indexer) Index(parentCtx context.Context, startHeight uint64, endHeight uint64) (err error) {
    ctx, span := x.handler.Tracer().Start(parentCtx, "contract.Backfill", trace.WithAttributes(
        attribute.Int("chain", int(x.indexerConfig.ChainID)),
        attribute.String("address", x.addressesToString(x.indexerConfig.Addresses)),
        attribute.Int("start", int(startHeight)),
        attribute.Int("end", int(endHeight)),
    ))

    defer func() {
        metrics.EndSpanWithErr(span, err)
    }()

    g, groupCtx := errgroup.WithContext(ctx)

    // For logging
    x.indexerConfig.StartHeight = startHeight
    x.indexerConfig.EndHeight = endHeight

    // Start fetching logs
    logFetcher := NewLogFetcher(x.client[0], big.NewInt(int64(startHeight)), big.NewInt(int64(endHeight)), &x.indexerConfig, true)
    logsChan := *logFetcher.GetFetchedLogsChan()
    g.Go(func() error {
        return logFetcher.Start(groupCtx)
    })
    // Reads from the local logsChan and stores the logs and associated receipts / txs.
    g.Go(func() error {
        concurrentCalls := 0
        lastBlockSeen := uint64(0)
        gS, storeCtx := errgroup.WithContext(ctx)
        for {
            select {
            case <-groupCtx.Done():
                logger.ReportIndexerError(groupCtx.Err(), x.indexerConfig, logger.ContextCancelled)
                return fmt.Errorf("context canceled while storing and retrieving logs: %w", groupCtx.Err())
            case log, ok := <-logsChan: // empty log passed when ok is false.
                if !ok {
                    // The fetcher is done. Stores started since the last flush may still be running; wait for
                    // them so their errors are not lost and endHeight is not saved before the data is stored.
                    if waitErr := gS.Wait(); waitErr != nil {
                        return fmt.Errorf("error waiting for go routines: %w", waitErr)
                    }
                    return nil
                }
                concurrentCalls++
                // Pin the context of the current store group for this goroutine.
                currentStoreCtx := storeCtx
                gS.Go(func() error {
                    // another goroutine is already storing this receipt
                    locker, ok := x.mux.TryLock(log.TxHash)
                    if !ok {
                        return nil
                    }
                    defer locker.Unlock()

                    // Check if the txHash has already been stored in the cache.
                    if _, ok := x.cache.Get(log.TxHash); ok {
                        return nil
                    }

                    if storeErr := x.store(currentStoreCtx, log); storeErr != nil {
                        logger.ReportIndexerError(storeErr, x.indexerConfig, logger.StoreError)

                        return fmt.Errorf("could not store log: %w", storeErr)
                    }

                    return nil
                })

                // Checks if:
                // 1. The number of concurrent calls is greater than the concurrency threshold.
                // 2. The indexer's distance from the chaintip is within the concurrency ending threshold.
                // If so, all the go routines are waited on and the last indexed block is stored.
                if concurrentCalls >= x.indexerConfig.StoreConcurrency || x.indexerConfig.ConcurrencyThreshold > endHeight-log.BlockNumber {
                    // Errors are kept local: this goroutine must not write the function's named return value.
                    if waitErr := gS.Wait(); waitErr != nil {
                        return fmt.Errorf("error waiting for go routines: %w", waitErr)
                    }

                    // reset group context and concurrent calls
                    gS, storeCtx = errgroup.WithContext(ctx)
                    concurrentCalls = 0

                    // Only update last indexed if all logs from the last block have been processed to prevent premature
                    // updates of last indexed. Prevents having to lag a block behind on downstream dependencies (agents).
                    // NOTE(review): on the first flush lastBlockSeen is still 0, so block 0 is what gets saved — confirm
                    // the database ignores a value lower than the stored one.
                    if lastBlockSeen < log.BlockNumber {
                        if saveErr := x.saveLastIndexed(storeCtx, lastBlockSeen); saveErr != nil {
                            logger.ReportIndexerError(saveErr, x.indexerConfig, logger.StoreError)
                            return fmt.Errorf("could not store last indexed: %w", saveErr)
                        }
                        lastBlockSeen = log.BlockNumber
                    }

                    x.blockMeter.Record(ctx, int64(log.BlockNumber), otelMetrics.WithAttributeSet(
                        attribute.NewSet(attribute.Int64("start_block", int64(startHeight)), attribute.Int64("chain_id", int64(x.indexerConfig.ChainID)))),
                    )
                }
            }
        }
    })

    err = g.Wait()

    if err != nil {
        return fmt.Errorf("could not backfill contract: %w \nChain: %d\nLog 's Contract Address: %s\n ", err, x.indexerConfig.ChainID, x.indexerConfig.Addresses)
    }

    err = x.saveLastIndexed(ctx, endHeight)
    if err != nil {
        logger.ReportIndexerError(err, x.indexerConfig, logger.StoreError)
        return fmt.Errorf("could not store last indexed: %w", err)
    }

    x.blockMeter.Record(ctx, int64(endHeight), otelMetrics.WithAttributeSet(
        attribute.NewSet(attribute.Int64("start_block", int64(startHeight)), attribute.Int64("chain_id", int64(x.indexerConfig.ChainID)))),
    )

    return nil
}

// store stores the logs, receipts, and transactions for a tx hash.
//
// It first fetches the tx data with fetchEventData, retrying with exponential backoff until retryTolerance is
// exceeded or the context is canceled. An errNoContinue result is reported and the tx is skipped (nil is returned);
// an errNoTx result means the receipt, logs and block time are stored without the transaction itself. The individual
// database writes then run concurrently, and the tx hash is added to the cache only when all of them succeed.
//
// TODO split two goroutines into sep functions for maintainability
//
//nolint:cyclop,gocognit,maintidx
func (x *Indexer) store(parentCtx context.Context, log types.Log) (err error) {
    ctx, span := x.handler.Tracer().Start(parentCtx, "store", trace.WithAttributes(
        attribute.String("contract", x.addressesToString(x.indexerConfig.Addresses)),
        attribute.String("tx", log.TxHash.Hex()),
        attribute.String("block", fmt.Sprintf("%d", log.BlockNumber)),
    ))

    defer func() {
        metrics.EndSpanWithErr(span, err)
    }()

    b := &backoff.Backoff{
        Factor: 2,
        Jitter: true,
        Min:    3 * time.Millisecond,
        Max:    2 * time.Second,
    }

    // The first attempt fires immediately; later attempts wait for the backoff duration.
    timeout := time.Duration(0)
    tryCount := 0

    var tx *txData
    hasTX := true

OUTER:
    for {
        select {
        case <-ctx.Done():
            return fmt.Errorf("context canceled while storing logs/receipts: %w", ctx.Err())
        case <-time.After(timeout):
            tryCount++

            var fetchErr error
            tx, fetchErr = x.fetchEventData(ctx, log.TxHash, log.BlockNumber)
            if fetchErr != nil {
                if errors.Is(fetchErr, errNoContinue) {
                    logger.ReportIndexerError(fetchErr, x.indexerConfig, logger.GetTxError)
                    return nil
                }

                if errors.Is(fetchErr, errNoTx) {
                    logger.ReportIndexerError(fetchErr, x.indexerConfig, logger.GetTxError)
                    hasTX = false
                    break OUTER
                }

                if tryCount > retryTolerance {
                    return fmt.Errorf("retry tolerance exceeded: %w", fetchErr)
                }

                timeout = b.Duration()
                continue
            }

            break OUTER
        }
    }

    // Each goroutine below keeps its error in a local variable. Sharing the named return value between them
    // would be a data race and could swallow a failure or attribute it to the wrong write.
    g, groupCtx := errgroup.WithContext(ctx)
    g.Go(func() error {
        // Store receipt in the EventDB.
        var storeErr error
        if x.toHead {
            storeErr = x.eventDB.StoreReceiptAtHead(groupCtx, x.indexerConfig.ChainID, tx.receipt)
        } else {
            storeErr = x.eventDB.StoreReceipt(groupCtx, x.indexerConfig.ChainID, tx.receipt)
        }
        if storeErr != nil {
            return fmt.Errorf("could not store receipt: %w", storeErr)
        }
        return nil
    })

    if hasTX {
        g.Go(func() error {
            // Store the raw transaction in the EventDB.
            var storeErr error
            if x.toHead {
                storeErr = x.eventDB.StoreEthTxAtHead(groupCtx, &tx.transaction, x.indexerConfig.ChainID, log.BlockHash, log.BlockNumber, uint64(log.TxIndex))
            } else {
                storeErr = x.eventDB.StoreEthTx(groupCtx, &tx.transaction, x.indexerConfig.ChainID, log.BlockHash, log.BlockNumber, uint64(log.TxIndex))
            }
            if storeErr != nil {
                return fmt.Errorf("could not store tx: %w", storeErr)
            }
            return nil
        })
    }

    g.Go(func() error {
        // Store every log of the receipt, not only the log that triggered this call.
        logs, pruneErr := x.prunedReceiptLogs(tx.receipt)
        if pruneErr != nil {
            return pruneErr
        }

        var storeErr error
        if x.toHead {
            storeErr = x.eventDB.StoreLogsAtHead(groupCtx, x.indexerConfig.ChainID, logs...)
        } else {
            storeErr = x.eventDB.StoreLogs(groupCtx, x.indexerConfig.ChainID, logs...)
        }
        if storeErr != nil {
            return fmt.Errorf("could not store receipt logs: %w", storeErr)
        }

        return nil
    })

    g.Go(func() error {
        // Store the timestamp of the block the tx was included in.
        storeErr := x.eventDB.StoreBlockTime(groupCtx, x.indexerConfig.ChainID, tx.blockHeader.Number.Uint64(), tx.blockHeader.Time)
        if storeErr != nil {
            return fmt.Errorf("could not store block time: %w", storeErr)
        }
        return nil
    })

    if err = g.Wait(); err != nil {
        return fmt.Errorf("could not store data: %w\n%s on chain %d from %d to %s", err, x.addressesToString(x.indexerConfig.Addresses), x.indexerConfig.ChainID, log.BlockNumber, log.TxHash.String())
    }

    // Remember the tx hash so other logs of the same tx are skipped by the caller.
    x.cache.Add(log.TxHash, true)
    return nil
}

// prunedReceiptLogs copies all logs of a receipt into a slice of values.
//
// A nil entry in receipt.Logs is treated as an error; the error identifies the receipt and the position of the
// nil entry. The nil log itself is never dereferenced, so the error is returned rather than causing a panic.
// The result is nil when the receipt has no logs.
func (x *Indexer) prunedReceiptLogs(receipt types.Receipt) (logs []types.Log, err error) {
    if n := len(receipt.Logs); n > 0 {
        logs = make([]types.Log, 0, n)
    }

    for i, log := range receipt.Logs {
        if log == nil {
            return nil, fmt.Errorf("log is nil\nChain: %d\nTxHash: %s\nReceipt BlockNumber: %v\nLog Index: %d\nContract Address: %s", x.indexerConfig.ChainID, receipt.TxHash.String(), receipt.BlockNumber, i, x.addressesToString(x.indexerConfig.Addresses))
        }
        logs = append(logs, *log)
    }

    return logs, nil
}

// fetchEventData fetches the receipt, the raw transaction and the block header for a tx hash in one batched
// RPC request.
//
// Results:
//   - (tx, nil) when all three calls succeed.
//   - (tx, errNoTx) when the transaction call fails with txNotSupportedError or invalidTxVRSError; the returned
//     tx holds whatever the other calls of the batch filled in.
//   - (nil, err) otherwise. A "not found" response makes the loop move on to the next client; when no client is
//     left, the returned error wraps that "not found" error so the caller can see why the fetch failed.
//
// nolint: cyclop
func (x *Indexer) fetchEventData(parentCtx context.Context, txhash common.Hash, blockNumber uint64) (tx *txData, err error) {
    ctx, span := x.handler.Tracer().Start(parentCtx, "fetchEventData", trace.WithAttributes(
        attribute.String("tx", txhash.Hex()),
        attribute.String("block", fmt.Sprintf("%d", blockNumber)),
    ))

    defer func() {
        metrics.EndSpanWithErr(span, err)
    }()

    // lastErr remembers why the most recent client was skipped, so the final error carries a real cause.
    var lastErr error

    // increasing this across more clients puts too much load on the server, results in failed requests. TODO investigate
OUTER:
    for i := range x.client[0:1] {
        tx = &txData{}

        calls := make([]w3types.Caller, 3)

        // setup referencable indexes so we can access errors from the calls
        const (
            receiptIndex = 0
            txIndex      = 1
            headerIndex  = 2
        )

        // get the transaction receipt
        calls[receiptIndex] = eth.TxReceipt(txhash).Returns(&tx.receipt)

        // get the raw transaction
        calls[txIndex] = eth.Tx(txhash).Returns(&tx.transaction)

        // get the block header
        calls[headerIndex] = eth.HeaderByNumber(new(big.Int).SetUint64(blockNumber)).Returns(&tx.blockHeader)

        //nolint: nestif
        if batchErr := x.client[i].BatchWithContext(ctx, calls...); batchErr != nil {
            //nolint: errorlint
            callErr, ok := batchErr.(w3.CallErrors)
            if !ok {
                return nil, fmt.Errorf("could not parse errors: %w", batchErr)
            }

            if callErr[receiptIndex] != nil {
                if callErr[receiptIndex].Error() == txNotFoundError {
                    logger.ReportIndexerError(fmt.Errorf(txNotFoundError), x.indexerConfig, logger.GetTxError)
                    lastErr = callErr[receiptIndex]
                    continue OUTER
                }
            }

            if callErr[txIndex] != nil {
                switch callErr[txIndex].Error() {
                case txNotSupportedError:
                    logger.ReportIndexerError(fmt.Errorf(txNotSupportedError), x.indexerConfig, logger.GetTxError)
                    return tx, errNoTx
                case invalidTxVRSError:
                    logger.ReportIndexerError(fmt.Errorf(invalidTxVRSError), x.indexerConfig, logger.GetTxError)
                    return tx, errNoTx
                case txNotFoundError:
                    logger.ReportIndexerError(fmt.Errorf(txNotFoundError), x.indexerConfig, logger.GetTxError)
                    lastErr = callErr[txIndex]
                    continue OUTER
                }
            }

            return nil, fmt.Errorf("could not get tx receipt: %w", batchErr)
        }

        tx.success = true
    }

    if tx == nil || !tx.success {
        if lastErr == nil {
            lastErr = errors.New("no client returned tx data")
        }
        return nil, fmt.Errorf("could not get tx data: %w", lastErr)
    }

    return tx, nil
}

// addressesToString renders the addresses as one comma-separated string (no spaces), for use in logs, spans and
// error messages. An empty list yields an empty string.
func (x *Indexer) addressesToString(addresses []common.Address) string {
    joined := ""
    for idx, addr := range addresses {
        if idx > 0 {
            joined += ","
        }
        joined += addr.String()
    }
    return joined
}

// saveLastIndexed records blockNumber as the last indexed block.
//
// It does nothing while the indexer is in backfill mode. When livefilling at head the value is stored once under
// the zero address with the LivefillAtHead type; otherwise it is stored for every address of the indexer.
// A storage failure is reported through the logger and returned wrapped.
func (x *Indexer) saveLastIndexed(parentCtx context.Context, blockNumber uint64) error {
    if x.isBackfill {
        return nil
    }

    var (
        storeErr error
        failure  string
    )

    switch {
    case x.toHead:
        storeErr = x.eventDB.StoreLastIndexed(parentCtx, common.Address{}, x.indexerConfig.ChainID, blockNumber, scribeTypes.LivefillAtHead)
        failure = "could not store last indexed block while livefilling at head"
    default:
        storeErr = x.eventDB.StoreLastIndexedMultiple(parentCtx, x.indexerConfig.Addresses, x.indexerConfig.ChainID, blockNumber)
        failure = "could not store last indexed blocks"
    }

    if storeErr == nil {
        return nil
    }

    logger.ReportIndexerError(storeErr, x.indexerConfig, logger.StoreError)
    return fmt.Errorf("%s: %w", failure, storeErr)
}