package watcher_test
import (
. ""
watcherMocks ""
chainMocks ""
// filterLogsTracker tracks various calls attempted on filterLogs() mock.
type filterLogsTracker struct {
tb testing.TB
maxCalledHeight uint64
numberOfCalls uint64
calledBlocks []uint64
mux sync.Mutex
// newFilterLogsTracker creates a filter logs tracker.
func newFilterLogsTracker(tb testing.TB) filterLogsTracker {
return filterLogsTracker{
tb: tb,
maxCalledHeight: 0,
numberOfCalls: 0,
// update updates the filter logs tracker and performs some basic assertions.
func (f *filterLogsTracker) update(query ethereum.FilterQuery) {
defer f.mux.Unlock()
True(f.tb, query.ToBlock.Uint64() >= query.FromBlock.Uint64())
f.maxCalledHeight = query.ToBlock.Uint64()
f.calledBlocks = append(f.calledBlocks, query.ToBlock.Uint64())
func (s *WatcherSuite) TestContractWatcherRetry() {
if os.Getenv("CI") != "" {
s.T().Skip("this flakes on ci. TODO fix this. This should never fail locally.")
const requiredConfs = 3
logTracker := newFilterLogsTracker(s.T())
mockBlockSubscription := NewMockBlockSubscriber(s.GetTestContext(), *big.NewInt(0))
blockSubscriber := chainwatcher.NewBlockHeightWatcher(s.GetTestContext(), 0, mockBlockSubscription)
mockEvmClient := new(chainMocks.Chain)
mockContract := mocks.MockAddress()
eventLog := make(chan interface{})
contractWatcher := watcher.NewTestContractWatcher(s.GetTestContext(), s.T(), mockEvmClient, blockSubscriber, requiredConfs)
err := contractWatcher.ListenOnContract(s.GetTestContext(), mockContract.String(), eventLog)
Nil(s.T(), err)
// let's push some heights where nothing happens and mmake sure it doesn't call more than required confs
mockEvmClient.On("FilterLogs", mock.Anything, mock.MatchedBy(func(filterQuery ethereum.FilterQuery) bool {
shouldReturn := filterQuery.ToBlock.Uint64() <= 10 || filterQuery.ToBlock.Uint64() == 12
if shouldReturn {
return shouldReturn
})).Return([]types.Log{}, nil)
producedHeights := 10
s.Eventually(func() bool {
return logTracker.maxCalledHeight == uint64(producedHeights-requiredConfs)
// push up to the 10 and wait til we get there
s.Eventually(func() bool {
return logTracker.maxCalledHeight == uint64(producedHeights)
hasSentErr := false
mockEvmClient.On("FilterLogs", mock.Anything, mock.MatchedBy(func(filterQuery ethereum.FilterQuery) bool {
hasSentErr = true
shouldReturn := filterQuery.ToBlock.Uint64() == 11
if shouldReturn {
return shouldReturn
})).Return([]types.Log{}, errors.New("I'm an error")).Once()
// produce an error on the 11th block and make sure we recover
// on subsequent calls produce successfully
mockEvmClient.On("FilterLogs", mock.Anything, mock.MatchedBy(func(filterQuery ethereum.FilterQuery) bool {
if !hasSentErr {
return false
shouldReturn := filterQuery.ToBlock.Uint64() == 11
if shouldReturn {
return shouldReturn
})).Return([]types.Log{{Address: mockContract}}, nil)
s.Eventually(func() bool {
return logTracker.maxCalledHeight == 12
// TestListeners tests a scenario with more than 1 listener listening to the contract observer.
func (s *WatcherSuite) TestListeners() {
// timeout the test after thie period
ctx, cancel := context.WithTimeout(s.GetTestContext(), 30*time.Second)
defer cancel()
contractWatcher := watcher.NewTestContractWatcher(ctx, s.T(), new(chainMocks.Chain), new(watcherMocks.BlockHeightWatcher), 0)
mockContract := mocks.MockAddress()
// eventCount is how many events to listen for
const eventCount = 30
// listener count is how many listeners to create
const listenerCount = 10
// testEvents are the producerChan produces and the listener verify happened
testEvents := mocks.GetMockLogs(s.T(), eventCount)
initializationSemaphore := semaphore.NewWeighted(listenerCount)
// use a waitgroup to manage the listeners
var wg sync.WaitGroup
// create the listeners
for i := 0; i < listenerCount; i++ {
Nil(s.T(), initializationSemaphore.Acquire(ctx, 1))
go func() {
defer wg.Done()
// verificationSlice is a copy of test events used to verify the listener received all logs
verificationSlice := make([]types.Log, len(testEvents))
copy(verificationSlice, testEvents)
listener := make(chan interface{})
contractWatcher.AddListener(mockContract, listener)
for {
select {
case <-ctx.Done():
Nil(s.T(), ctx.Err())
case rawEvent := <-listener:
// convert the raw event to an event
event, ok := rawEvent.(types.Log)
if !ok {
s.T().Error("could not decode event from channel")
// verify the event is in the verification slice
for i, verifiedEvent := range verificationSlice {
if reflect.DeepEqual(verifiedEvent, event) {
// remove the event from the verification slice
verificationSlice = append(verificationSlice[:i], verificationSlice[i+1:]...)
// all events have been verified
if len(verificationSlice) == 0 {
// wait until all the listeners are initialized
Nil(s.T(), initializationSemaphore.Acquire(ctx, listenerCount))
producerChan := make(chan types.Log)
err := contractWatcher.AddProducer(ctx, mockContract, producerChan)
Nil(s.T(), err)
for _, event := range testEvents {
producerChan <- event
// attempt to add another producer to the same contract
NotNil(s.T(), contractWatcher.AddProducer(ctx, mockContract, producerChan))