services/scribe/service/indexer/fetcher_test.go
package indexer_test
import (
"context"
"fmt"
"github.com/brianvoe/gofakeit/v6"
"github.com/ethereum/go-ethereum/core/types"
"github.com/synapsecns/sanguine/services/scribe/backend"
"github.com/synapsecns/sanguine/services/scribe/testutil"
scribeTypes "github.com/synapsecns/sanguine/services/scribe/types"
"time"
"math/big"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
. "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/synapsecns/sanguine/ethergo/backends/geth"
"github.com/synapsecns/sanguine/ethergo/chain/client/mocks"
etherMocks "github.com/synapsecns/sanguine/ethergo/mocks"
"github.com/synapsecns/sanguine/ethergo/util"
"github.com/synapsecns/sanguine/services/scribe/service/indexer"
)
// TestFilterLogsMaxAttempts ensures after the maximum number of attempts, an error is returned.
func (x *IndexerSuite) TestFilterLogsMaxAttempts() {
x.T().Skip("flake")
chainID := big.NewInt(int64(1))
simulatedChain := geth.NewEmbeddedBackendForChainID(x.GetTestContext(), x.T(), chainID)
simulatedClient, err := backend.DialBackend(x.GetTestContext(), simulatedChain.RPCAddress(), x.metrics)
Nil(x.T(), err)
mockFilterer := new(mocks.EVMClient)
contractAddress := etherMocks.MockAddress()
config := &scribeTypes.IndexerConfig{
ChainID: 1,
GetLogsBatchAmount: 1,
GetLogsRange: 1,
Addresses: []common.Address{contractAddress},
}
rangeFilter := indexer.NewLogFetcher(simulatedClient, big.NewInt(1), big.NewInt(10), config, true)
// Use the range filterer created above to create a mock log filter.
mockFilterer.
On("FilterLogs", mock.Anything, mock.Anything).
Return(nil, errors.New("I'm a test error"))
chunks := []*util.Chunk{{
StartBlock: big.NewInt(1),
EndBlock: big.NewInt(10),
}}
logInfo, err := rangeFilter.FetchLogs(x.GetTestContext(), chunks)
Nil(x.T(), logInfo)
NotNil(x.T(), err)
}
// TestGetChunkArr ensures that the batching orchestration function (collecting block range chunks into arrays) works properly.
func (x *IndexerSuite) TestGetChunkArr() {
chainID := big.NewInt(int64(1))
simulatedChain := geth.NewEmbeddedBackendForChainID(x.GetTestContext(), x.T(), chainID)
simulatedClient, err := backend.DialBackend(x.GetTestContext(), simulatedChain.RPCAddress(), x.metrics)
Nil(x.T(), err)
contractAddress := etherMocks.MockAddress()
config := &scribeTypes.IndexerConfig{
ChainID: 1,
ConcurrencyThreshold: 1,
GetLogsBatchAmount: 1,
GetLogsRange: 1,
Addresses: []common.Address{contractAddress},
}
startBlock := int64(1)
endBlock := int64(10)
rangeFilter := indexer.NewLogFetcher(simulatedClient, big.NewInt(startBlock), big.NewInt(endBlock), config, true)
numberOfRequests := int64(0)
for i := int64(0); i < endBlock; i++ {
chunks := rangeFilter.GetChunkArr()
if len(chunks) == 0 {
break
}
Equal(x.T(), len(chunks), int(config.GetLogsBatchAmount))
numberOfRequests++
}
Equal(x.T(), numberOfRequests, endBlock)
// Test with a larger batch size
config.GetLogsBatchAmount = 4
rangeFilter = indexer.NewLogFetcher(simulatedClient, big.NewInt(1), big.NewInt(10), config, true)
numberOfRequests = int64(0)
loopCount := endBlock/int64(config.GetLogsBatchAmount) + 1
for i := int64(0); i < loopCount; i++ {
chunks := rangeFilter.GetChunkArr()
if len(chunks) == 0 {
break
}
if i < loopCount-1 {
Equal(x.T(), len(chunks), int(config.GetLogsBatchAmount))
} else {
Equal(x.T(), len(chunks), int(endBlock%int64(config.GetLogsBatchAmount)))
}
numberOfRequests++
}
Equal(x.T(), numberOfRequests, loopCount)
// Test with a larger range size
config.GetLogsRange = 2
rangeFilter = indexer.NewLogFetcher(simulatedClient, big.NewInt(1), big.NewInt(10), config, true)
numberOfRequests = int64(0)
loopCount = endBlock/int64(config.GetLogsBatchAmount*config.GetLogsRange) + 1
for i := int64(0); i < loopCount; i++ {
chunks := rangeFilter.GetChunkArr()
if len(chunks) == 0 {
break
}
if i < loopCount-1 {
Equal(x.T(), len(chunks), int(config.GetLogsBatchAmount))
} else {
Equal(x.T(), len(chunks), 1)
}
numberOfRequests++
}
Equal(x.T(), numberOfRequests, loopCount)
}
// TestGetChunkArr ensures that the batching orchestration function (collecting block range chunks into arrays) works properly.
func (x *IndexerSuite) TestFetchLogs() {
testBackend := geth.NewEmbeddedBackend(x.GetTestContext(), x.T())
// start an omnirpc proxy and run 10 test transactions so we can batch call blocks 1-10
var wg sync.WaitGroup
var testChainHandler *testutil.TestChainHandler
var err error
wg.Add(2)
const desiredBlockHeight = 10
go func() {
defer wg.Done()
testChainHandler, err = testutil.PopulateWithLogs(x.GetTestContext(), x.T(), testBackend, desiredBlockHeight, []*testutil.DeployManager{x.manager})
Nil(x.T(), err)
}()
var host string
go func() {
defer wg.Done()
host = testutil.StartOmnirpcServer(x.GetTestContext(), x.T(), testBackend)
}()
wg.Wait()
scribeBackend, err := backend.DialBackend(x.GetTestContext(), host, x.metrics)
Nil(x.T(), err)
chunks := []*util.Chunk{
{
StartBlock: big.NewInt(1),
EndBlock: big.NewInt(2),
},
{
StartBlock: big.NewInt(3),
EndBlock: big.NewInt(4),
},
{
StartBlock: big.NewInt(5),
EndBlock: big.NewInt(6),
},
{
StartBlock: big.NewInt(7),
EndBlock: big.NewInt(8),
},
{
StartBlock: big.NewInt(9),
EndBlock: big.NewInt(10),
},
}
chainID, err := scribeBackend.ChainID(x.GetTestContext())
Nil(x.T(), err)
config := &scribeTypes.IndexerConfig{
ChainID: uint32(chainID.Uint64()),
ConcurrencyThreshold: 1,
GetLogsBatchAmount: 1,
GetLogsRange: 2,
Addresses: testChainHandler.Addresses,
}
rangeFilter := indexer.NewLogFetcher(scribeBackend, big.NewInt(1), big.NewInt(desiredBlockHeight), config, true)
logs, err := rangeFilter.FetchLogs(x.GetTestContext(), chunks)
Nil(x.T(), err)
Equal(x.T(), 2, len(logs))
cancelCtx, cancel := context.WithCancel(x.GetTestContext())
cancel()
_, err = rangeFilter.FetchLogs(cancelCtx, chunks)
NotNil(x.T(), err)
Contains(x.T(), err.Error(), "context was canceled")
}
// TestFetchLogsHighVolume tests the behavior of populating and consuming logs from the log fetcher in block ranges with many logs.
func (x *IndexerSuite) TestFetchLogsHighVolume() {
testBackend := geth.NewEmbeddedBackend(x.GetTestContext(), x.T())
// start an omnirpc proxy and run 10 test transactions so we can batch call blocks 1-10
var err error
host := testutil.StartOmnirpcServer(x.GetTestContext(), x.T(), testBackend)
scribeBackend, err := backend.DialBackend(x.GetTestContext(), host, x.metrics)
Nil(x.T(), err)
chainID, err := scribeBackend.ChainID(x.GetTestContext())
Nil(x.T(), err)
config := &scribeTypes.IndexerConfig{
ChainID: uint32(chainID.Uint64()),
ConcurrencyThreshold: 1,
GetLogsBatchAmount: 1,
GetLogsRange: 2,
StoreConcurrency: 6,
Addresses: []common.Address{common.BigToAddress(big.NewInt(1))},
}
logFetcher := indexer.NewLogFetcher(scribeBackend, big.NewInt(1), big.NewInt(1000), config, true)
logsChan := logFetcher.GetFetchedLogsChan()
addContext, addCancel := context.WithTimeout(x.GetTestContext(), 20*time.Second)
defer addCancel()
numLogs := 0
go func() {
for {
select {
case <-addContext.Done():
// test done
close(*logsChan)
return
case <-time.After(10 * time.Millisecond):
// add a log
randomTxHash := common.BigToHash(big.NewInt(gofakeit.Int64()))
randomLog := testutil.MakeRandomLog(randomTxHash)
*logsChan <- randomLog
numLogs++
// check buffer
GreaterOrEqual(x.T(), config.StoreConcurrency, len(*logsChan))
}
}
}()
var collectedLogs []types.Log
for {
select {
case <-x.GetTestContext().Done():
Error(x.T(), fmt.Errorf("test context was canceled"))
case <-time.After(1000 * time.Millisecond):
log, ok := <-*logsChan
if !ok {
goto Done
}
collectedLogs = append(collectedLogs, log)
}
}
Done:
Equal(x.T(), numLogs, len(collectedLogs))
}