synapsecns/sanguine

View on GitHub
services/scribe/service/indexer/fetcher.go

Summary

Maintainability
A
35 mins
Test Coverage
package indexer

import (
    "context"
    "fmt"
    "github.com/ethereum/go-ethereum/common"
    "github.com/synapsecns/sanguine/services/scribe/backend"
    "github.com/synapsecns/sanguine/services/scribe/logger"
    scribeTypes "github.com/synapsecns/sanguine/services/scribe/types"
    "math/big"
    "time"

    "github.com/synapsecns/sanguine/ethergo/util"

    "github.com/ethereum/go-ethereum/core/types"

    "github.com/jpillora/backoff"
)

// LogFetcher walks a block range chunk by chunk, fetches the matching filter logs and
// publishes them on a buffered channel in deterministic order.
type LogFetcher struct {
    // iterator yields the successive block chunks that make up the range.
    iterator util.ChunkIterator
    // startBlock and endBlock are the bounds the fetcher was constructed with. endBlock is
    // also the reference height for the concurrency-threshold check in GetChunkArr.
    startBlock, endBlock *big.Int
    // fetchedLogsChan carries the fetched logs, one log per message.
    fetchedLogsChan chan types.Log
    // backend is the ethereum backend the logs are requested from.
    backend backend.ScribeBackend
    // indexerConfig is the configuration of the indexer this fetcher belongs to (chain ID,
    // addresses, getLogs range/batch sizes, concurrency threshold, ...).
    indexerConfig *scribeTypes.IndexerConfig
    // topics is the list of topics to filter logs by, copied from indexerConfig.Topics at construction.
    topics [][]common.Hash
    // bufferSize is the capacity fetchedLogsChan was created with. Bounding it keeps the fetcher
    // from flooding the scribe indexer with logs and the upstream RPCs with requests.
    bufferSize int
}

// NewLogFetcher creates a LogFetcher over the block range bounded by startBlock and endBlock.
//
// The ascending flag is forwarded to the chunk iterator; when it is set, block heights are
// expected to be filtered from start->end.
//
// The capacity of the logs channel is taken from indexerConfig.StoreConcurrency: values above
// 100 are capped at 100, and zero or negative values fall back to a default of 3 (a negative
// capacity would otherwise make the channel allocation panic).
func NewLogFetcher(backend backend.ScribeBackend, startBlock, endBlock *big.Int, indexerConfig *scribeTypes.IndexerConfig, ascending bool) *LogFetcher {
    const (
        maxBufferSize     = 100
        defaultBufferSize = 3
    )

    // The ChunkIterator is inclusive of the start and ending block resulting in potentially confusing behavior when
    // setting the range size in the config. For example, setting a range of 1 would result in two blocks being queried
    // instead of 1. This is accounted for by subtracting 1.
    chunkSize := int(indexerConfig.GetLogsRange) - 1

    // Using the specified StoreConcurrency value from the config, as the buffer size for the fetchedLogsChan.
    bufferSize := indexerConfig.StoreConcurrency
    switch {
    case bufferSize > maxBufferSize:
        bufferSize = maxBufferSize
    case bufferSize <= 0:
        // Unset or invalid configuration: use the default rather than panicking in make().
        bufferSize = defaultBufferSize
    }

    return &LogFetcher{
        iterator:        util.NewChunkIterator(startBlock, endBlock, chunkSize, ascending),
        startBlock:      startBlock,
        endBlock:        endBlock,
        fetchedLogsChan: make(chan types.Log, bufferSize),
        backend:         backend,
        indexerConfig:   indexerConfig,
        bufferSize:      bufferSize,
        topics:          indexerConfig.Topics,
    }
}

// GetChunkArr gets the appropriate amount of block chunks (getLogs ranges).
//
// It pulls at most indexerConfig.GetLogsBatchAmount chunks from the iterator and stops early when
// the iterator is exhausted, or when a chunk ends above the concurrency threshold height
// (endBlock - indexerConfig.ConcurrencyThreshold). In the latter case the chunk that crossed the
// threshold is still included and the event is reported through the logger.
//
// A nil slice is returned when the iterator has no chunks left.
func (f *LogFetcher) GetChunkArr() (chunkArr []*util.Chunk) {
    for i := uint64(0); i < f.indexerConfig.GetLogsBatchAmount; i++ {
        chunk := f.iterator.NextChunk()
        if chunk == nil {
            return chunkArr
        }
        chunkArr = append(chunkArr, chunk)

        // Saturating subtraction: both operands are unsigned, so a threshold larger than the end
        // block would otherwise wrap around to a huge height and the check below could never fire.
        endHeight := f.endBlock.Uint64()
        var thresholdHeight uint64
        if endHeight > f.indexerConfig.ConcurrencyThreshold {
            thresholdHeight = endHeight - f.indexerConfig.ConcurrencyThreshold
        }

        // Stop appending chunks if the max height of the current chunk exceeds the concurrency threshold
        if chunk.EndBlock.Uint64() > thresholdHeight {
            logger.ReportScribeState(f.indexerConfig.ChainID, chunk.EndBlock.Uint64(), f.indexerConfig.Addresses, logger.ConcurrencyThresholdReached)
            return chunkArr
        }
    }
    return chunkArr
}

// Start starts the log fetching process and blocks until the range is exhausted, an error occurs
// or the context is canceled.
//
// Each loop iteration:
//  1. builds the next batch of chunks with GetChunkArr,
//  2. fetches the logs for that batch with FetchLogs,
//  3. pushes the logs one by one into the logs channel. A send blocks while the channel buffer is
//     full, which throttles the fetcher to the pace of the consumer.
//
// When GetChunkArr returns no chunks the range is complete: the logs channel is closed and nil is
// returned. On any error (including context cancellation) the channel is left open and a wrapped
// error is returned.
//
// Every channel send also watches the context, so a canceled context unblocks the fetcher even if
// the consumer has stopped reading.
func (f *LogFetcher) Start(ctx context.Context) error {
    for {
        // Bail out before starting a new batch if the context is already done. ctx.Err() is always
        // non-nil once Done() is closed.
        select {
        case <-ctx.Done():
            return fmt.Errorf("could not finish filtering range: %w", ctx.Err())
        default:
        }

        chunks := f.GetChunkArr()
        if len(chunks) == 0 {
            // Range exhausted: signal completion to the consumer.
            close(f.fetchedLogsChan)
            return nil
        }

        logs, err := f.FetchLogs(ctx, chunks)
        if err != nil {
            return fmt.Errorf("could not filter logs: %w", err)
        }

        // insert logs into channel
        for i := range logs {
            select {
            case <-ctx.Done():
                return fmt.Errorf("context canceled while adding log to chan %w", ctx.Err())
            case f.fetchedLogsChan <- logs[i]:
            }
        }
    }
}

// FetchLogs safely calls FilterLogs with the filtering implementing a backoff in the case of
// rate limiting and respects context cancellation.
//
// The first attempt is made immediately; after each failed attempt the next one is delayed by an
// exponential backoff (factor 2, jittered, between 1s and 8s). Every failure is reported through
// the logger.
//
// It returns the logs of all chunks on success. It returns an error wrapping ctx.Err() if the
// context is canceled while waiting, or an error wrapping the last fetch failure once more than
// retryTolerance attempts have been made.
//
// nolint:cyclop
func (f *LogFetcher) FetchLogs(ctx context.Context, chunks []*util.Chunk) ([]types.Log, error) {
    backoffConfig := &backoff.Backoff{
        Factor: 2,
        Jitter: true,
        Min:    1 * time.Second,
        Max:    8 * time.Second,
    }

    attempt := 0
    timeout := time.Duration(0)
    // lastErr remembers the most recent fetch failure so it can be surfaced when retries run out.
    var lastErr error

    for {
        select {
        case <-ctx.Done():
            logger.ReportIndexerError(ctx.Err(), *f.indexerConfig, logger.GetLogsError)
            return nil, fmt.Errorf("context was canceled before logs could be fetched: %w", ctx.Err())
        case <-time.After(timeout):
            attempt++
            if attempt > retryTolerance {
                logger.ReportIndexerError(fmt.Errorf("retry max reached"), *f.indexerConfig, logger.GetLogsError)
                if lastErr != nil {
                    return nil, fmt.Errorf("maximum number of fetch logs attempts exceeded: %w", lastErr)
                }
                return nil, fmt.Errorf("maximum number of fetch logs attempts exceeded")
            }

            logs, err := f.getAndUnpackLogs(ctx, chunks, backoffConfig)
            if err != nil {
                logger.ReportIndexerError(err, *f.indexerConfig, logger.GetLogsError)
                lastErr = err
                // Wait for the next backoff interval before retrying.
                timeout = backoffConfig.Duration()
                continue
            }

            return logs, nil
        }
    }
}

// getAndUnpackLogs requests the logs for all given chunks from the backend in one call and
// flattens the per-chunk results, in iterator order, into a single slice.
//
// Nil or empty per-chunk results are reported through the logger and skipped. An error is
// returned if the backend request fails or if the context is canceled while unpacking.
//
// backoffConfig is kept for signature compatibility but is intentionally not touched here: the
// caller (FetchLogs) advances the backoff exactly once per failed attempt, and advancing it here
// as well would skip every other backoff step.
func (f *LogFetcher) getAndUnpackLogs(ctx context.Context, chunks []*util.Chunk, backoffConfig *backoff.Backoff) ([]types.Log, error) {
    result, err := backend.GetLogsInRange(ctx, f.backend, f.indexerConfig.Addresses, uint64(f.indexerConfig.ChainID), chunks, f.indexerConfig.Topics)
    if err != nil {
        return nil, fmt.Errorf("could not get logs: %w", err)
    }

    var logs []types.Log
    resultIterator := result.Iterator()
    for !resultIterator.Done() {
        select {
        case <-ctx.Done():
            logger.ReportIndexerError(ctx.Err(), *f.indexerConfig, logger.GetLogsError)
            return nil, fmt.Errorf("context canceled while unpacking logs from request: %w", ctx.Err())
        default:
            // The key of each entry is not needed, only the logs of the chunk.
            _, logChunk := resultIterator.Next()
            if logChunk == nil || len(*logChunk) == 0 {
                logger.ReportIndexerError(fmt.Errorf("empty log chunk"), *f.indexerConfig, logger.EmptyGetLogsChunk)
                continue
            }

            logs = append(logs, *logChunk...)
        }
    }

    return logs, nil
}

// GetFetchedLogsChan exposes the fetcher's logs channel to the indexer and to tests.
// The returned pointer refers to the fetcher's own channel field, not to a copy.
func (f *LogFetcher) GetFetchedLogsChan() *chan types.Log {
    logsChan := &f.fetchedLogsChan
    return logsChan
}