synapsecns/sanguine

View on GitHub
services/explorer/consumer/fetcher/scribefetcher.go

Summary

Maintainability
A
0 mins
Test Coverage
package fetcher

import (
    "context"
    "fmt"
    "github.com/synapsecns/sanguine/core/metrics"
    "time"

    "github.com/ethereum/go-ethereum/common"
    ethTypes "github.com/ethereum/go-ethereum/core/types"
    "github.com/jpillora/backoff"
    "github.com/synapsecns/sanguine/services/explorer/consumer/client"
    "github.com/synapsecns/sanguine/services/scribe/graphql"
)

// ScribeFetcher is the interface for fetching events. It uses GQL.
type ScribeFetcher interface {
    // FetchTxSender fetches the sender of a transaction.
    FetchTxSender(ctx context.Context, chainID uint32, txHash string) (string, error)
    // FetchLastIndexed fetches the last indexed block per contract.
    FetchLastIndexed(ctx context.Context, chainID uint32, contractAddress string) (uint64, error)
    // FetchLogsInRange fetches logs in a range with the GQL client.
    FetchLogsInRange(ctx context.Context, chainID uint32, startBlock, endBlock uint64, contractAddress common.Address) ([]ethTypes.Log, error)
    // FetchBlockTime fetches the timestamp of a block.
    FetchBlockTime(ctx context.Context, chainID int, blockNumber int) (*int, error)
    // FetchTx fetches the transaction.
    FetchTx(ctx context.Context, tx string, chainID int, blockNumber int) (*uint64, *string, error)
}

type scribeFetcherImpl struct {
    underlyingClient *client.Client
    handler          metrics.Handler
}

const retryThreshold = 5

// NewFetcher creates a new fetcher.
func NewFetcher(fetchClient *client.Client, handler metrics.Handler) ScribeFetcher {
    return &scribeFetcherImpl{
        underlyingClient: fetchClient,
        handler:          handler,
    }
}

func (s scribeFetcherImpl) FetchTxSender(ctx context.Context, chainID uint32, txHash string) (string, error) {
    b := &backoff.Backoff{
        Factor: 2,
        Jitter: true,
        Min:    10 * time.Millisecond,
        Max:    3 * time.Second,
    }
    timeout := time.Duration(0)
RETRY:
    select {
    case <-ctx.Done():

        return "", nil
    case <-time.After(timeout):
        sender, err := s.underlyingClient.GetTxSender(ctx, int(chainID), txHash)

        if err != nil {
            logger.Warnf("could not get sender for tx, trying again %s: %v", txHash, err)
            timeout = b.Duration()
            goto RETRY
        }

        if sender == nil || sender.Response == nil {
            logger.Warnf("could not get sender for tx, invalid tx likely (arb legacy, v,r,x, etc.) %s: %v", txHash)
            *sender.Response = ""
        }

        return *sender.Response, nil
    }
}

// FetchLastIndexed fetches the last indexed block per contract.
func (s scribeFetcherImpl) FetchLastIndexed(ctx context.Context, chainID uint32, contractAddress string) (uint64, error) {
    lastIndexed, err := s.underlyingClient.GetLastIndexed(ctx, int(chainID), contractAddress)
    if err != nil || lastIndexed == nil || lastIndexed.Response == nil {
        return 0, fmt.Errorf("could not get last indexed for contract %s: %w", contractAddress, err)
    }
    return uint64(*lastIndexed.Response), nil
}

// FetchLogsInRange fetches logs in a range with the GQL client.
func (s scribeFetcherImpl) FetchLogsInRange(ctx context.Context, chainID uint32, startBlock, endBlock uint64, contractAddress common.Address) ([]ethTypes.Log, error) {
    logs := &client.GetLogsRange{}
    page := 1
    contractAddressString := contractAddress.String()

    for {
        paginatedLogs, err := s.underlyingClient.GetLogsRange(ctx, int(chainID), int(startBlock), int(endBlock), page, &contractAddressString)
        if err != nil {
            return nil, fmt.Errorf("could not get logs: %w", err)
        }
        if len(paginatedLogs.Response) == 0 {
            break
        }

        logs.Response = append(logs.Response, paginatedLogs.Response...)
        page++
    }

    var parsedLogs []ethTypes.Log

    for _, log := range logs.Response {
        parsedLog, err := graphql.ParseLog(*log)
        if err != nil {
            return nil, fmt.Errorf("could not parse log: %w", err)
        }

        parsedLogs = append(parsedLogs, *parsedLog)
    }

    return parsedLogs, nil
}

func (s scribeFetcherImpl) FetchBlockTime(ctx context.Context, chainID int, blockNumber int) (*int, error) {
    b := &backoff.Backoff{
        Factor: 2,
        Jitter: true,
        Min:    10 * time.Millisecond,
        Max:    3 * time.Second,
    }
    timeout := time.Duration(0)
    attempts := 0
RETRY:
    attempts++
    if attempts > retryThreshold {
        logger.Errorf("could not get block time for block %d on chainID %d after %d attempts", blockNumber, chainID, retryThreshold)
        return nil, fmt.Errorf("could not get block time for block %d on chainID %d after %d attempts", blockNumber, chainID, retryThreshold)
    }
    select {
    case <-ctx.Done():

        return nil, fmt.Errorf("could not get timestamp for block, context canceled %d: %d", chainID, blockNumber)
    case <-time.After(timeout):

        timeStamp, err := s.underlyingClient.GetBlockTime(ctx, chainID, blockNumber)

        if err != nil {
            logger.Warnf("could not get timestamp for block, trying again %d: %v", blockNumber, err)
            timeout = b.Duration()
            goto RETRY
        }

        if timeStamp == nil || timeStamp.Response == nil {
            logger.Warnf("could not get timestamp for block, invalid blocktime %d: %d", chainID, blockNumber)
            return nil, fmt.Errorf("could not get timestamp for block, invalid blocktime %d: %d", chainID, blockNumber)
        }

        return timeStamp.Response, nil
    }
}

// FetchTx fetches the transaction of a log.
func (s scribeFetcherImpl) FetchTx(ctx context.Context, tx string, chainID int, blockNumber int) (*uint64, *string, error) {
    b := &backoff.Backoff{
        Factor: 2,
        Jitter: true,
        Min:    10 * time.Millisecond,
        Max:    3 * time.Second,
    }
    attempts := 0
    timeout := time.Duration(0)
RETRY:
    attempts++

    if attempts > retryThreshold {
        logger.Errorf("could not get tx after %d attempts for hash %s on chain %d trying blocktime", retryThreshold, tx, chainID)
        auxiliaryBlocktime, err := s.FetchBlockTime(ctx, chainID, blockNumber)
        if err != nil {
            return nil, nil, fmt.Errorf("could not get tx for log, after trying to get blocktime, invalid response %d: %s", chainID, tx)
        }
        sender := ""
        blocktime := uint64(*auxiliaryBlocktime)
        return &blocktime, &sender, nil
    }

    select {
    case <-ctx.Done():
        return nil, nil, fmt.Errorf("could not get tx for log, context canceled %d: %s", chainID, tx)
    case <-time.After(timeout):

        res, err := s.underlyingClient.GetTransactions(ctx, chainID, 1, &tx)

        if err != nil || res == nil || res.Response == nil || len(res.Response) == 0 {
            timeout = b.Duration()
            goto RETRY
        }

        resTx := res.Response[0]
        sender := resTx.Sender
        blocktime := uint64(resTx.Timestamp)
        return &blocktime, &sender, nil
    }
}