synapsecns/sanguine

View on GitHub
services/scribe/service/scribe_test.go

Summary

Maintainability
D
1 day
Test Coverage
package service_test

import (
    "context"
    "encoding/json"
    "fmt"
    "github.com/brianvoe/gofakeit/v6"
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/jpillora/backoff"
    . "github.com/stretchr/testify/assert"
    "github.com/synapsecns/sanguine/ethergo/backends/geth"
    "github.com/synapsecns/sanguine/services/scribe/backend"
    "github.com/synapsecns/sanguine/services/scribe/config"
    "github.com/synapsecns/sanguine/services/scribe/db"
    "github.com/synapsecns/sanguine/services/scribe/service"
    scribeTypes "github.com/synapsecns/sanguine/services/scribe/types"

    "github.com/synapsecns/sanguine/services/scribe/db/datastore/sql/base"
    "github.com/synapsecns/sanguine/services/scribe/logger"
    "github.com/synapsecns/sanguine/services/scribe/testutil"
    "math/big"
    "net/http"
    "os"
    "strconv"
    "strings"
    "time"
)

// TestSimulatedScribe spins up three embedded chains with three contracts on each; the contracts emit events
// until the chains reach the desired height of 20 blocks. The resulting chains and contracts are handed to a
// fresh scribe instance, which runs for 20 seconds and is then queried for the logs and receipts it stored.
func (s *ScribeSuite) TestSimulatedScribe() {
    if os.Getenv("CI") != "" {
        s.T().Skip("Test flake: 20 sec of livefilling may fail on CI")
    }

    const (
        numberOfContracts  = 3
        desiredBlockHeight = 20
    )

    // One embedded geth backend per randomly chosen chain ID.
    chainIDs := []uint32{gofakeit.Uint32(), gofakeit.Uint32(), gofakeit.Uint32()}
    chainBackends := make(map[uint32]geth.Backend)
    for _, id := range chainIDs {
        embedded := geth.NewEmbeddedBackendForChainID(s.GetTestContext(), s.T(), big.NewInt(int64(id)))
        chainBackends[id] = *embedded
    }

    // The suite's own manager covers the first contract; every further contract gets a new manager.
    managers := []*testutil.DeployManager{s.manager}
    for len(managers) < numberOfContracts {
        managers = append(managers, testutil.NewDeployManager(s.T()))
    }

    testChainHandlerMap, chainBackendMap, err := testutil.PopulateChainsWithLogs(s.GetTestContext(), s.T(), chainBackends, desiredBlockHeight, managers, s.nullMetrics)
    Nil(s.T(), err)

    // Build scribe config: one chain entry per populated chain, listing every deployed contract address.
    var chainConfigs []config.ChainConfig
    for chainID, handler := range testChainHandlerMap {
        contracts := config.ContractConfigs{}
        for idx := range handler.Addresses {
            contracts = append(contracts, config.ContractConfig{
                Address: handler.Addresses[idx].String(),
            })
        }

        chainConfigs = append(chainConfigs, config.ChainConfig{
            ChainID:            chainID,
            Confirmations:      0,
            GetLogsBatchAmount: 1,
            StoreConcurrency:   1,
            GetLogsRange:       1,
            Contracts:          contracts,
        })
    }
    scribeConfig := config.Config{Chains: chainConfigs}

    scribe, err := service.NewScribe(s.testDB, chainBackendMap, scribeConfig, s.nullMetrics)
    Nil(s.T(), err)

    // Start blocks until the 20 second deadline expires, so its error is expected and deliberately dropped.
    killableContext, cancel := context.WithTimeout(s.GetTestContext(), 20*time.Second)
    defer cancel()
    _ = scribe.Start(killableContext)

    // Check that the events were recorded.
    for _, chainCfg := range scribeConfig.Chains {
        for _, contractCfg := range chainCfg.Contracts {
            // Check the storage of logs.
            storedLogs, err := s.testDB.RetrieveLogsWithFilter(s.GetTestContext(), db.LogFilter{
                ChainID:         chainCfg.ChainID,
                ContractAddress: contractCfg.Address,
            }, 1)
            Nil(s.T(), err)
            Equal(s.T(), 4, len(storedLogs))

            // The confirmed indexing marker must have reached at least the populated height.
            contractAddress := common.HexToAddress(contractCfg.Address)
            lastIndexed, err := s.testDB.RetrieveLastIndexed(s.GetTestContext(), contractAddress, chainCfg.ChainID, scribeTypes.IndexingConfirmed)
            Nil(s.T(), err)
            LessOrEqual(s.T(), desiredBlockHeight, int(lastIndexed))
        }

        // Check the storage of receipts.
        storedReceipts, err := s.testDB.RetrieveReceiptsWithFilter(s.GetTestContext(), db.ReceiptFilter{
            ChainID: chainCfg.ChainID,
        }, 1)
        Nil(s.T(), err)
        Equal(s.T(), 12, len(storedReceipts))
    }
}

// TestLivefillParity runs livefill on certain prod chains. Then it checks parity with that chain's block explorer API:
// for every configured contract the number of stored logs must equal the number the explorer returns for the same
// block range, and every stored log's transaction hash must match the hash the explorer reported for that block.
//
// The test talks to live RPC and explorer endpoints and is therefore skipped when the CI environment variable is set.
//
// nolint:gocognit,cyclop,maintidx
func (s *ScribeSuite) TestLivefillParity() {
    if os.Getenv("CI") != "" {
        s.T().Skip("Network test flake")
    }
    const blockRange = uint64(100)
    const globalConfirmations = uint64(200)

    // requireNil aborts the test when a setup step fails. Continuing after a failed dial would dereference a nil
    // client, and continuing after a failed BlockNumber call would underflow the uint64 subtraction below and
    // leave the polling goroutines waiting for a block height that is never reached.
    requireNil := func(err error) {
        if !Nil(s.T(), err) {
            s.T().FailNow()
        }
    }

    // ethRPCURL := "https://1rpc.io/eth"
    // arbRPCURL := "https://endpoints.omniatech.io/v1/arbitrum/one/public"
    // avaxRPCURL := "https://avalanche.public-rpc.com"

    ethRPCURL := "https://rpc.interoperability.institute/confirmations/1/rpc/1"
    arbRPCURL := "https://rpc.interoperability.institute/confirmations/1/rpc/42161"
    maticRPCURL := "https://rpc.interoperability.institute/confirmations/1/rpc/137"
    avaxRPCURL := "https://rpc.interoperability.institute/confirmations/1/rpc/43114"
    bscRPCURL := "https://rpc.interoperability.institute/confirmations/1/rpc/56"

    ethClient, err := backend.DialBackend(s.GetTestContext(), ethRPCURL, s.nullMetrics)
    requireNil(err)
    arbClient, err := backend.DialBackend(s.GetTestContext(), arbRPCURL, s.nullMetrics)
    requireNil(err)
    maticClient, err := backend.DialBackend(s.GetTestContext(), maticRPCURL, s.nullMetrics)
    requireNil(err)
    avaxClient, err := backend.DialBackend(s.GetTestContext(), avaxRPCURL, s.nullMetrics)
    requireNil(err)
    bscClient, err := backend.DialBackend(s.GetTestContext(), bscRPCURL, s.nullMetrics)
    requireNil(err)

    ethID := uint32(1)
    bscID := uint32(56)
    arbID := uint32(42161)
    maticID := uint32(137)
    avaxID := uint32(43114)
    // NOTE: the order of chains must match the order of scribeConfig.Chains below; both are indexed together.
    chains := []uint32{ethID, bscID, arbID, maticID, avaxID}

    // Get the current block for each chain, backed off by globalConfirmations blocks.
    ethCurrentBlock, err := ethClient.BlockNumber(s.GetTestContext())
    requireNil(err)
    ethCurrentBlock -= globalConfirmations
    arbCurrentBlock, err := arbClient.BlockNumber(s.GetTestContext())
    requireNil(err)
    arbCurrentBlock -= globalConfirmations
    maticCurrentBlock, err := maticClient.BlockNumber(s.GetTestContext())
    requireNil(err)
    maticCurrentBlock -= globalConfirmations
    avaxCurrentBlock, err := avaxClient.BlockNumber(s.GetTestContext())
    requireNil(err)
    avaxCurrentBlock -= globalConfirmations
    bscCurrentBlock, err := bscClient.BlockNumber(s.GetTestContext())
    requireNil(err)
    bscCurrentBlock -= globalConfirmations

    latestBlocks := map[uint32]uint64{
        ethID:   ethCurrentBlock,
        arbID:   arbCurrentBlock,
        maticID: maticCurrentBlock,
        avaxID:  avaxCurrentBlock,
        bscID:   bscCurrentBlock,
    }
    clients := map[uint32][]backend.ScribeBackend{
        ethID:   {ethClient, ethClient},
        bscID:   {bscClient, bscClient},
        arbID:   {arbClient, arbClient},
        maticID: {maticClient, maticClient},
        avaxID:  {avaxClient, avaxClient},
    }

    apiURLs := map[uint32]string{
        ethID:   "https://api.etherscan.io/api",
        arbID:   "https://api.arbiscan.io/api",
        avaxID:  "https://api.snowtrace.io/api",
        bscID:   "https://api.bscscan.com/api",
        maticID: "https://api.polygonscan.com/api",
    }
    scribeConfig := config.Config{
        Chains: []config.ChainConfig{
            {
                ChainID:              ethID,
                Confirmations:        0,
                GetLogsRange:         50,
                GetLogsBatchAmount:   3,
                GetBlockBatchAmount:  10,
                ConcurrencyThreshold: 20000,
                LivefillThreshold:    100,
                Contracts: []config.ContractConfig{
                    {
                        Address:    "0x2796317b0fF8538F253012862c06787Adfb8cEb6",
                        StartBlock: ethCurrentBlock - blockRange,
                    },
                    {
                        Address:    "0x1116898DdA4015eD8dDefb84b6e8Bc24528Af2d8",
                        StartBlock: ethCurrentBlock - blockRange,
                    },
                },
            },
            {
                ChainID:              bscID,
                Confirmations:        0,
                GetLogsRange:         50,
                GetLogsBatchAmount:   3,
                GetBlockBatchAmount:  10,
                ConcurrencyThreshold: 20000,
                LivefillThreshold:    100,
                Contracts: []config.ContractConfig{
                    {
                        Address:    "0x28ec0B36F0819ecB5005cAB836F4ED5a2eCa4D13",
                        StartBlock: bscCurrentBlock - blockRange,
                    },
                    {
                        Address:    "0x930d001b7efb225613aC7F35911c52Ac9E111Fa9",
                        StartBlock: bscCurrentBlock - blockRange,
                    },
                },
            },
            {
                ChainID:              arbID,
                Confirmations:        0,
                GetLogsRange:         50,
                GetLogsBatchAmount:   3,
                GetBlockBatchAmount:  10,
                ConcurrencyThreshold: 20000,
                LivefillThreshold:    100,
                Contracts: []config.ContractConfig{
                    {
                        Address:    "0x6F4e8eBa4D337f874Ab57478AcC2Cb5BACdc19c9",
                        StartBlock: arbCurrentBlock - blockRange,
                    },
                    {
                        Address:    "0x9Dd329F5411466d9e0C488fF72519CA9fEf0cb40",
                        StartBlock: arbCurrentBlock - blockRange,
                    },
                },
            },
            {
                ChainID:              maticID,
                Confirmations:        0,
                GetLogsRange:         50,
                GetLogsBatchAmount:   3,
                GetBlockBatchAmount:  10,
                ConcurrencyThreshold: 20000,
                LivefillThreshold:    100,
                Contracts: []config.ContractConfig{
                    {
                        Address:    "0x8F5BBB2BB8c2Ee94639E55d5F41de9b4839C1280",
                        StartBlock: maticCurrentBlock - blockRange,
                    },
                    {
                        Address:    "0x85fCD7Dd0a1e1A9FCD5FD886ED522dE8221C3EE5",
                        StartBlock: maticCurrentBlock - blockRange,
                    },
                },
            },
            {
                ChainID:              avaxID,
                Confirmations:        0,
                GetLogsRange:         50,
                GetLogsBatchAmount:   3,
                GetBlockBatchAmount:  10,
                ConcurrencyThreshold: 20000,
                LivefillThreshold:    100,
                Contracts: []config.ContractConfig{
                    {
                        Address:    "0xC05e61d0E7a63D27546389B7aD62FdFf5A91aACE",
                        StartBlock: avaxCurrentBlock - blockRange,
                    },
                    {
                        Address:    "0x77a7e60555bC18B4Be44C181b2575eee46212d44",
                        StartBlock: avaxCurrentBlock - blockRange,
                    },
                },
            },
        },
    }

    scribe, err := service.NewScribe(s.testDB, clients, scribeConfig, s.nullMetrics)
    requireNil(err)

    killableContext, cancel := context.WithCancel(s.GetTestContext())
    // Deferred so the scribe goroutine is stopped even if the test exits early; calling cancel twice is harmless.
    defer cancel()

    go func() {
        // Start only returns once killableContext is canceled, so its error is expected and deliberately dropped.
        _ = scribe.Start(killableContext)
    }()

    // Buffered so every polling goroutine can report completion without waiting for the receiver.
    doneChan := make(chan bool, len(chains))

    for i := range chains {
        go func(index int) {
            // Poll once per second until every contract on this chain has been indexed past the target block.
            for {
                allContractsBackfilled := true
                chain := scribeConfig.Chains[index]
                for _, contract := range chain.Contracts {
                    currentBlock, err := s.testDB.RetrieveLastIndexed(s.GetTestContext(), common.HexToAddress(contract.Address), chain.ChainID, scribeTypes.IndexingConfirmed)

                    Nil(s.T(), err)
                    if currentBlock <= latestBlocks[chain.ChainID] {
                        allContractsBackfilled = false
                    }
                }
                if allContractsBackfilled {
                    doneChan <- true
                    fmt.Println("Done with chain", chain.ChainID, "index", index, "of", len(chains), "chains")

                    return
                }
                time.Sleep(time.Second)
            }
        }(i)
    }

    // Wait for every chain to finish before stopping scribe.
    for range chains {
        <-doneChan
    }
    cancel()

    for i := range chains {
        chain := scribeConfig.Chains[i]
        for _, contract := range chain.Contracts {
            logFilter := db.LogFilter{
                ChainID:         chains[i],
                ContractAddress: contract.Address,
            }
            fromBlock := latestBlocks[chains[i]] - blockRange
            toBlock := latestBlocks[chains[i]]
            var dbLogCount int
            var dbLogs []*types.Log
            dbLogCount, dbLogs, err = getLogAmount(s.GetTestContext(), s.testDB, logFilter, fromBlock, toBlock)
            Nil(s.T(), err)

            txs := make(map[int64]string)
            var explorerLogCount int
            explorerLogCount, err = getLogs(s.GetTestContext(), contract.Address, fromBlock, toBlock, apiURLs[chain.ChainID], &txs)
            Nil(s.T(), err)

            for k := range dbLogs {
                logBlockNumber := int64(dbLogs[k].BlockNumber)

                txLog := txs[logBlockNumber]
                if dbLogs[k].TxHash.String() != txLog {
                    // Report the mismatch as a real test failure. The previous assert.Error call passed whenever
                    // the error was non-nil, i.e. always, so mismatches were silently ignored.
                    // NOTE(review): txs holds a single hash per block number, so two transactions hitting the
                    // same contract in one block would be flagged here — confirm whether that can occur.
                    s.T().Errorf("mismatched TX\nchainid %d\nstart %d end %d\ndb txhash %s\nexplorer tx %s", chain.ChainID, contract.StartBlock, dbLogs[k].BlockNumber, dbLogs[k].TxHash.String(), txLog)
                }
            }
            // fmt.Println("chain", chain.ChainID, "contract", contract.Address, "dbLogCount", dbLogCount, "explorerLogCount", explorerLogCount)
            if dbLogCount != explorerLogCount {
                fmt.Println("chain", chain.ChainID, "contract", contract.Address, "dbLogCount", dbLogCount, "explorerLogCount", explorerLogCount)
            }
            // The explorer is the reference, so it is passed as the expected value.
            Equal(s.T(), explorerLogCount, dbLogCount)
        }
    }
}

// createHTTPClient returns a new HTTP client that applies a 10 second limit both to the request as a whole and to
// waiting for the response headers.
func createHTTPClient() *http.Client {
    const requestTimeout = 10 * time.Second

    transport := &http.Transport{ResponseHeaderTimeout: requestTimeout}

    client := new(http.Client)
    client.Timeout = requestTimeout
    client.Transport = transport
    return client
}

// processBatch issues a single GET request to url and records, for every entry in the response, the entry's
// transaction hash in txs keyed by its block number.
//
// The response body must be a JSON object whose "result" field is an array of objects, each carrying a hex-encoded
// "blockNumber" string (with or without a "0x" prefix) and a "transactionHash" string. Any other shape yields an
// error.
//
// It returns the number of entries in the result array. On error it returns 0 and leaves txs untouched; entries are
// only copied into txs once the whole batch has been parsed. Because txs is keyed by block number, a later entry for
// the same block overwrites an earlier one.
func processBatch(ctx context.Context, client *http.Client, url string, txs *map[int64]string) (int, error) {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return 0, fmt.Errorf("error getting data: %w", err)
    }
    resRaw, err := client.Do(req)
    if err != nil {
        return 0, fmt.Errorf("could not get data from explorer %w", err)
    }
    // Close the body on every return path, not just the successful one, so failed decodes do not leak connections.
    defer func() {
        if closeErr := resRaw.Body.Close(); closeErr != nil {
            logger.ReportScribeError(closeErr, 0, logger.TestError)
        }
    }()

    if resRaw.StatusCode != http.StatusOK {
        return 0, fmt.Errorf("unexpected status from explorer: %s", resRaw.Status)
    }

    var decodedRes map[string]json.RawMessage
    if err := json.NewDecoder(resRaw.Body).Decode(&decodedRes); err != nil {
        return 0, fmt.Errorf("error decoding response: %w", err)
    }

    var resultSlice []map[string]interface{}
    if err := json.Unmarshal(decodedRes["result"], &resultSlice); err != nil {
        return 0, fmt.Errorf("error unmarshaling result: %w", err)
    }

    // Parse into a scratch map first so a malformed entry cannot leave txs half-updated.
    parsed := make(map[int64]string, len(resultSlice))
    for _, result := range resultSlice {
        hexBlock, ok := result["blockNumber"].(string)
        if !ok {
            // There is no underlying error to wrap here; report the offending type instead.
            return 0, fmt.Errorf("error parsing block number: unexpected type %T", result["blockNumber"])
        }

        txHashStr, ok := result["transactionHash"].(string)
        if !ok {
            return 0, fmt.Errorf("error parsing transaction hash: unexpected type %T", result["transactionHash"])
        }

        key, parseErr := strconv.ParseInt(strings.TrimPrefix(hexBlock, "0x"), 16, 64)
        if parseErr != nil {
            return 0, fmt.Errorf("error parsing block number: %w", parseErr)
        }
        parsed[key] = txHashStr
    }

    for block, txHash := range parsed {
        (*txs)[block] = txHash
    }
    return len(resultSlice), nil
}

// getLogs queries a block explorer's getLogs endpoint at apiURL for all logs emitted by contractAddress in the
// inclusive block range [fromBlock, toBlock], and returns the total number of logs reported.
//
// The range is fetched in batches of 400 blocks. Each batch is handed to processBatch, which fills txs with the
// transaction hash reported per block number. A failing batch is retried indefinitely with a jittered backoff
// between 5 and 10 seconds; the only way out of the retry loop is success or cancellation of ctx.
//
// An error is returned when toBlock is smaller than fromBlock or when ctx is done; in both cases the count is 0.
// txs may already hold entries from batches that completed before the error.
func getLogs(ctx context.Context, contractAddress string, fromBlock uint64, toBlock uint64, apiURL string, txs *map[int64]string) (int, error) {
    // Guard against uint64 underflow, which would otherwise produce an astronomically large batch count.
    if toBlock < fromBlock {
        return 0, fmt.Errorf("invalid block range: from block %d is after to block %d", fromBlock, toBlock)
    }

    const batchSize = uint64(400)
    // Pause applied before each batch's first request and again between batches.
    // NOTE(review): presumably sized to stay under the explorers' rate limits — confirm.
    const requestSpacing = 3 * time.Second

    numBatches := (toBlock-fromBlock)/batchSize + 1
    client := createHTTPClient()
    totalResults := 0

    for i := uint64(0); i < numBatches; i++ {
        startBlock := fromBlock + i*batchSize
        endBlock := startBlock + batchSize - 1
        if endBlock > toBlock {
            endBlock = toBlock
        }
        queryURL := fmt.Sprintf("%s?module=logs&action=getLogs&address=%s&fromBlock=%d&toBlock=%d&page=1",
            apiURL, contractAddress, startBlock, endBlock)
        b := &backoff.Backoff{
            Factor: 2,
            Jitter: true,
            Min:    5 * time.Second,
            Max:    10 * time.Second,
        }

        // Retry this batch until it succeeds or ctx is done. The first attempt waits requestSpacing, every
        // subsequent attempt waits for the next backoff duration.
        wait := requestSpacing
        for succeeded := false; !succeeded; {
            select {
            case <-ctx.Done():
                return 0, fmt.Errorf("context canceled: %w", ctx.Err())
            case <-time.After(wait):
            }

            resultCount, err := processBatch(ctx, client, queryURL, txs)
            if err != nil {
                fmt.Println("error getting explorer logs", err)
                wait = b.Duration()
                continue
            }
            totalResults += resultCount
            succeeded = true
        }

        // Space out consecutive batches, but stop waiting as soon as ctx is canceled.
        if i < numBatches-1 {
            select {
            case <-ctx.Done():
                return 0, fmt.Errorf("context canceled: %w", ctx.Err())
            case <-time.After(requestSpacing):
            }
        }
    }

    return totalResults, nil
}

// getLogAmount pages through the logs stored in db that match filter within the given block range, in ascending
// order, and returns how many were found together with the logs themselves.
//
// Paging starts at page 1 and stops at the first page holding fewer than base.PageSize logs. If any page fails to
// load, the error is wrapped and returned with a zero count and no logs.
func getLogAmount(ctx context.Context, db db.EventDB, filter db.LogFilter, startBlock uint64, endBlock uint64) (int, []*types.Log, error) {
    var collected []*types.Log
    for page := 1; ; page++ {
        batch, err := db.RetrieveLogsInRangeAsc(ctx, filter, startBlock, endBlock, page)
        if err != nil {
            return 0, nil, fmt.Errorf("failure while retreiving logs from database %w", err)
        }
        collected = append(collected, batch...)

        // A short page means there is nothing left to fetch.
        if len(batch) < base.PageSize {
            return len(collected), collected, nil
        }
    }
}