status-im/status-go

View on GitHub
transactions/pendingtxtracker_test.go

Summary

Maintainability
A
0 mins
Test Coverage
package transactions

import (
    "context"
    "database/sql"
    "encoding/json"
    "math/big"
    "sync"
    "testing"
    "time"

    "github.com/stretchr/testify/mock"
    "github.com/stretchr/testify/require"

    eth "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/event"
    "github.com/ethereum/go-ethereum/rpc"

    "github.com/status-im/status-go/services/wallet/common"
    "github.com/status-im/status-go/services/wallet/walletevent"
    "github.com/status-im/status-go/t/helpers"
    "github.com/status-im/status-go/walletdatabase"
)

// setupTestTransactionDB builds a PendingTxTracker for tests on top of a
// database created by helpers.SetupTestMemorySQLDB, a mock chain client and a
// fresh event feed.
//
// checkInterval overrides the tracker's pending-check interval; when it is nil
// the package default PendingCheckInterval is used.
//
// It returns the tracker, a cleanup function that closes the database (failing
// the test if closing errors), the mock chain client and the event feed.
func setupTestTransactionDB(t *testing.T, checkInterval *time.Duration) (*PendingTxTracker, func(), *MockChainClient, *event.Feed) {
    // Report setup failures at the calling test's line rather than in this helper.
    t.Helper()

    db, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{})
    require.NoError(t, err)

    pendingCheckInterval := PendingCheckInterval
    if checkInterval != nil {
        pendingCheckInterval = *checkInterval
    }

    chainClient := NewMockChainClient()
    eventFeed := &event.Feed{}
    tracker := NewPendingTxTracker(db, chainClient, nil, eventFeed, pendingCheckInterval)

    cleanup := func() {
        require.NoError(t, db.Close())
    }
    return tracker, cleanup, chainClient, eventFeed
}

// waitForTaskToStop blocks until the tracker's task runner reports that it is
// no longer running, re-checking after one-microsecond sleeps.
func waitForTaskToStop(pt *PendingTxTracker) {
    for {
        if !pt.taskRunner.IsRunning() {
            return
        }
        time.Sleep(time.Microsecond)
    }
}

// TestPendingTxTracker_ValidateConfirmedWithSuccessStatus stores and tracks a
// single transaction mocked through MockTestTransactions with a default
// TestTxSummary. It expects two EventPendingTransactionUpdate events followed
// by one EventPendingTransactionStatusChanged event reporting Success for that
// transaction, and no pending transactions left once the tracker has stopped.
func TestPendingTxTracker_ValidateConfirmedWithSuccessStatus(t *testing.T) {
    m, stop, chainClient, eventFeed := setupTestTransactionDB(t, nil)
    defer stop()

    txs := MockTestTransactions(t, chainClient, []TestTxSummary{{}})

    eventChan := make(chan walletevent.Event, 3)
    sub := eventFeed.Subscribe(eventChan)
    // Deferred so the subscription is released even if an assertion aborts the test.
    defer sub.Unsubscribe()

    err := m.StoreAndTrackPendingTx(&txs[0])
    require.NoError(t, err)

    for i := 0; i < 3; i++ {
        select {
        case we := <-eventChan:
            if i < 2 {
                // Check add and delete
                require.Equal(t, EventPendingTransactionUpdate, we.Type)
            } else {
                require.Equal(t, EventPendingTransactionStatusChanged, we.Type)
                var p StatusChangedPayload
                require.NoError(t, json.Unmarshal([]byte(we.Message), &p))
                require.Equal(t, txs[0].Hash, p.Hash)
                require.Equal(t, Success, p.Status)
            }
        case <-time.After(1 * time.Second):
            t.Fatal("timeout waiting for event")
        }
    }

    // Wait for the answer to be processed
    err = m.Stop()
    require.NoError(t, err)

    waitForTaskToStop(m)

    res, err := m.GetAllPending()
    require.NoError(t, err)
    require.Equal(t, 0, len(res))
}

// TestPendingTxTracker_ValidateConfirmedWithFailedStatus stores and tracks a
// single transaction mocked through MockTestTransactions with failStatus set.
// It expects two EventPendingTransactionUpdate events followed by one
// EventPendingTransactionStatusChanged event reporting Failed for that
// transaction, and no pending transactions left once the tracker has stopped.
func TestPendingTxTracker_ValidateConfirmedWithFailedStatus(t *testing.T) {
    m, stop, chainClient, eventFeed := setupTestTransactionDB(t, nil)
    defer stop()

    txs := MockTestTransactions(t, chainClient, []TestTxSummary{{failStatus: true}})

    eventChan := make(chan walletevent.Event, 3)
    sub := eventFeed.Subscribe(eventChan)
    // Deferred so the subscription is released even if an assertion aborts the test.
    defer sub.Unsubscribe()

    err := m.StoreAndTrackPendingTx(&txs[0])
    require.NoError(t, err)

    for i := 0; i < 3; i++ {
        select {
        case we := <-eventChan:
            if i < 2 {
                // Check add and delete
                require.Equal(t, EventPendingTransactionUpdate, we.Type)
            } else {
                require.Equal(t, EventPendingTransactionStatusChanged, we.Type)
                var p StatusChangedPayload
                require.NoError(t, json.Unmarshal([]byte(we.Message), &p))
                require.Equal(t, txs[0].Hash, p.Hash)
                require.Equal(t, Failed, p.Status)
            }
        case <-time.After(1 * time.Second):
            t.Fatal("timeout waiting for event")
        }
    }

    // Wait for the answer to be processed
    err = m.Stop()
    require.NoError(t, err)

    waitForTaskToStop(m)

    res, err := m.GetAllPending()
    require.NoError(t, err)
    require.Equal(t, 0, len(res))
}

// TestPendingTxTracker_InterruptWatching adds two pending transactions and
// starts the tracker with a mocked batch response that leaves the first one
// pending and confirms the second. After stopping the tracker it expects one
// pending transaction to remain, then restarts the tracker with a mocked
// response confirming the leftover and expects none to remain.
func TestPendingTxTracker_InterruptWatching(t *testing.T) {
    m, stop, chainClient, eventFeed := setupTestTransactionDB(t, nil)
    defer stop()

    txs := GenerateTestPendingTransactions(0, 2)

    // Mock the first batched receipt request, which must carry both hashes in order
    chainClient.SetAvailableClients([]common.ChainID{txs[0].ChainID})
    cl := chainClient.Clients[txs[0].ChainID]
    cl.On("BatchCallContext", mock.Anything, mock.MatchedBy(func(b []rpc.BatchElem) bool {
        return (len(b) == 2 && b[0].Method == GetTransactionReceiptRPCName && b[0].Args[0] == txs[0].Hash && b[1].Method == GetTransactionReceiptRPCName && b[1].Args[0] == txs[1].Hash)
    })).Return(nil).Once().Run(func(args mock.Arguments) {
        elems := args.Get(1).([]rpc.BatchElem)

        // Simulate still pending due to "null" return from eth_getTransactionReceipt
        elems[0].Result.(*nullableReceipt).Receipt = nil

        // Simulate parsing of eth_getTransactionReceipt response
        elems[1].Result.(*nullableReceipt).Receipt = &types.Receipt{
            BlockNumber: new(big.Int).SetUint64(1),
            Status:      1,
        }
    })

    eventChan := make(chan walletevent.Event, 2)
    sub := eventFeed.Subscribe(eventChan)
    // Deferred so the subscription is released even if an assertion aborts the test.
    defer sub.Unsubscribe()

    for i := range txs {
        err := m.addPending(&txs[i])
        require.NoError(t, err)
    }

    // Check add
    for i := 0; i < 2; i++ {
        select {
        case we := <-eventChan:
            require.Equal(t, EventPendingTransactionUpdate, we.Type)
        case <-time.After(1 * time.Second):
            t.Fatal("timeout waiting for event")
        }
    }

    // expectUpdateThenSuccess waits for one EventPendingTransactionUpdate
    // followed by one EventPendingTransactionStatusChanged reporting Success
    // for txs[idx].
    expectUpdateThenSuccess := func(idx int) {
        for i := 0; i < 2; i++ {
            select {
            case we := <-eventChan:
                if i == 0 {
                    require.Equal(t, EventPendingTransactionUpdate, we.Type)
                } else {
                    require.Equal(t, EventPendingTransactionStatusChanged, we.Type)
                    var p StatusChangedPayload
                    require.NoError(t, json.Unmarshal([]byte(we.Message), &p))
                    require.Equal(t, txs[idx].Hash, p.Hash)
                    require.Equal(t, txs[idx].ChainID, p.ChainID)
                    require.Equal(t, Success, p.Status)
                }
            case <-time.After(1 * time.Second):
                t.Fatal("timeout waiting for event")
            }
        }
    }

    err := m.Start()
    require.NoError(t, err)

    // Only the second transaction gets a receipt in the first batch
    expectUpdateThenSuccess(1)

    // Stop the next timed call
    err = m.Stop()
    require.NoError(t, err)

    waitForTaskToStop(m)

    res, err := m.GetAllPending()
    require.NoError(t, err)
    require.Equal(t, 1, len(res), "should have only one pending tx")

    // Restart the tracker to process leftovers: the next batch must carry
    // only the first transaction, which is now confirmed
    cl.On("BatchCallContext", mock.Anything, mock.MatchedBy(func(b []rpc.BatchElem) bool {
        return (len(b) == 1 && b[0].Method == GetTransactionReceiptRPCName && b[0].Args[0] == txs[0].Hash)
    })).Return(nil).Once().Run(func(args mock.Arguments) {
        elems := args.Get(1).([]rpc.BatchElem)
        // Simulate parsing of eth_getTransactionReceipt response
        elems[0].Result.(*nullableReceipt).Receipt = &types.Receipt{
            BlockNumber: new(big.Int).SetUint64(1),
            Status:      1,
        }
    })

    err = m.Start()
    require.NoError(t, err)

    expectUpdateThenSuccess(0)

    err = m.Stop()
    require.NoError(t, err)

    waitForTaskToStop(m)

    res, err = m.GetAllPending()
    require.NoError(t, err)
    require.Equal(t, 0, len(res))
}

// TestPendingTxTracker_MultipleClients tracks two transactions that live on
// different chain IDs, each served by its own mocked client. It expects six
// events in total: four EventPendingTransactionUpdate events and two
// EventPendingTransactionStatusChanged events reporting Success. No pending
// transactions should remain after the tracker stops.
func TestPendingTxTracker_MultipleClients(t *testing.T) {
    m, stop, chainClient, eventFeed := setupTestTransactionDB(t, nil)
    defer stop()

    txs := GenerateTestPendingTransactions(0, 2)
    txs[1].ChainID++

    // Make both clients available and mock one receipt request per chain
    chainClient.SetAvailableClients([]common.ChainID{txs[0].ChainID, txs[1].ChainID})
    cl := chainClient.Clients[txs[0].ChainID]
    cl.On("BatchCallContext", mock.Anything, mock.MatchedBy(func(b []rpc.BatchElem) bool {
        return (len(b) == 1 && b[0].Method == GetTransactionReceiptRPCName && b[0].Args[0] == txs[0].Hash)
    })).Return(nil).Once().Run(func(args mock.Arguments) {
        elems := args.Get(1).([]rpc.BatchElem)
        // Simulate parsing of eth_getTransactionReceipt response
        elems[0].Result.(*nullableReceipt).Receipt = &types.Receipt{
            BlockNumber: new(big.Int).SetUint64(1),
            Status:      1,
        }
    })
    cl = chainClient.Clients[txs[1].ChainID]
    cl.On("BatchCallContext", mock.Anything, mock.MatchedBy(func(b []rpc.BatchElem) bool {
        return (len(b) == 1 && b[0].Method == GetTransactionReceiptRPCName && b[0].Args[0] == txs[1].Hash)
    })).Return(nil).Once().Run(func(args mock.Arguments) {
        elems := args.Get(1).([]rpc.BatchElem)
        // Simulate parsing of eth_getTransactionReceipt response
        elems[0].Result.(*nullableReceipt).Receipt = &types.Receipt{
            BlockNumber: new(big.Int).SetUint64(1),
            Status:      1,
        }
    })

    eventChan := make(chan walletevent.Event, 6)
    sub := eventFeed.Subscribe(eventChan)
    // Deferred so the subscription is released even if an assertion aborts the test.
    defer sub.Unsubscribe()

    for i := range txs {
        err := m.TrackPendingTransaction(txs[i].ChainID, txs[i].Hash, txs[i].From, txs[i].To, txs[i].Type, AutoDelete, "")
        require.NoError(t, err)
    }

    err := m.Start()
    require.NoError(t, err)

    storeEventCount := 0
    statusEventCount := 0

    // validateStatusChange counts the event by type and, for status changes,
    // checks that the reported status is Success. Other event types are ignored.
    validateStatusChange := func(we *walletevent.Event) {
        switch we.Type {
        case EventPendingTransactionUpdate:
            storeEventCount++
        case EventPendingTransactionStatusChanged:
            statusEventCount++
            var p StatusChangedPayload
            require.NoError(t, json.Unmarshal([]byte(we.Message), &p))
            require.Equal(t, Success, p.Status)
        }
    }

    // Three events are expected per transaction; the order across transactions is not fixed
    for i := 0; i < 2; i++ {
        for j := 0; j < 3; j++ {
            select {
            case we := <-eventChan:
                validateStatusChange(&we)
            case <-time.After(1 * time.Second):
                t.Fatal("timeout waiting for event", i, j, storeEventCount, statusEventCount)
            }
        }
    }

    require.Equal(t, 4, storeEventCount)
    require.Equal(t, 2, statusEventCount)

    err = m.Stop()
    require.NoError(t, err)

    waitForTaskToStop(m)

    res, err := m.GetAllPending()
    require.NoError(t, err)
    require.Equal(t, 0, len(res))
}

// TestPendingTxTracker_Watch tracks one transaction with the Keep option and
// stores another whose status is already Success. After the tracked
// transaction is confirmed it expects no pending transactions, a non-pending
// status from Watch, and an EventPendingTransactionUpdate event once the
// transaction is deleted.
func TestPendingTxTracker_Watch(t *testing.T) {
    m, stop, chainClient, eventFeed := setupTestTransactionDB(t, nil)
    defer stop()

    txs := GenerateTestPendingTransactions(0, 2)
    // Mark the first transaction as already confirmed
    *txs[0].Status = Success

    // Mock the receipt request for the second transaction, the only one expected in the batch
    chainClient.SetAvailableClients([]common.ChainID{txs[0].ChainID})
    cl := chainClient.Clients[txs[0].ChainID]
    cl.On("BatchCallContext", mock.Anything, mock.MatchedBy(func(b []rpc.BatchElem) bool {
        return len(b) == 1 && b[0].Method == GetTransactionReceiptRPCName && b[0].Args[0] == txs[1].Hash
    })).Return(nil).Once().Run(func(args mock.Arguments) {
        elems := args.Get(1).([]rpc.BatchElem)
        // Simulate parsing of eth_getTransactionReceipt response
        elems[0].Result.(*nullableReceipt).Receipt = &types.Receipt{
            BlockNumber: new(big.Int).SetUint64(1),
            Status:      1,
        }
    })

    eventChan := make(chan walletevent.Event, 3)
    sub := eventFeed.Subscribe(eventChan)
    // Deferred so the subscription is released even if an assertion aborts the test.
    defer sub.Unsubscribe()

    // Track the second transaction
    err := m.TrackPendingTransaction(txs[1].ChainID, txs[1].Hash, txs[1].From, txs[1].To, txs[1].Type, Keep, "")
    require.NoError(t, err)

    // Store the already confirmed transaction
    err = m.StoreAndTrackPendingTx(&txs[0])
    require.NoError(t, err)

    storeEventCount := 0
    statusEventCount := 0
    for j := 0; j < 3; j++ {
        select {
        case we := <-eventChan:
            switch we.Type {
            case EventPendingTransactionUpdate:
                storeEventCount++
            case EventPendingTransactionStatusChanged:
                statusEventCount++
                var p StatusChangedPayload
                require.NoError(t, json.Unmarshal([]byte(we.Message), &p))
                require.Equal(t, txs[1].ChainID, p.ChainID)
                require.Equal(t, txs[1].Hash, p.Hash)
                require.Equal(t, Success, p.Status)
            }
        case <-time.After(1 * time.Second):
            t.Fatal("timeout waiting for the status update event")
        }
    }
    require.Equal(t, 2, storeEventCount)
    require.Equal(t, 1, statusEventCount)

    // Stop the next timed call
    err = m.Stop()
    require.NoError(t, err)

    waitForTaskToStop(m)

    res, err := m.GetAllPending()
    require.NoError(t, err)
    require.Equal(t, 0, len(res), "should have no pending tx")

    status, err := m.Watch(context.Background(), txs[1].ChainID, txs[1].Hash)
    require.NoError(t, err)
    // Watch returns a pointer; compare the pointed-to status, since comparing
    // the pointer itself against Pending could never fail.
    require.NotNil(t, status)
    require.NotEqual(t, Pending, *status)

    err = m.Delete(context.Background(), txs[1].ChainID, txs[1].Hash)
    require.NoError(t, err)

    select {
    case we := <-eventChan:
        require.Equal(t, EventPendingTransactionUpdate, we.Type)
    case <-time.After(1 * time.Second):
        t.Fatal("timeout waiting for the delete event")
    }
}

// TestPendingTxTracker_Watch_StatusChangeIncrementally tracks two transactions
// with the Keep option and a one-nanosecond check interval. The mocked client
// confirms only the first element of each batch, so the transactions are
// confirmed one after the other. After each status change the test checks the
// result of Watch for both transactions and deletes the confirmed one.
func TestPendingTxTracker_Watch_StatusChangeIncrementally(t *testing.T) {
    m, stop, chainClient, eventFeed := setupTestTransactionDB(t, common.NewAndSet(1*time.Nanosecond))
    defer stop()

    txs := GenerateTestPendingTransactions(0, 2)

    // Released once the first status change has been validated; the second
    // mocked call waits on it before answering.
    var firstDoneWG sync.WaitGroup
    firstDoneWG.Add(1)

    // Mock the two expected batched receipt requests
    chainClient.SetAvailableClients([]common.ChainID{txs[0].ChainID})
    cl := chainClient.Clients[txs[0].ChainID]

    cl.On("BatchCallContext", mock.Anything, mock.MatchedBy(func(b []rpc.BatchElem) bool {
        if len(cl.Calls) == 0 {
            res := len(b) > 0 && b[0].Method == GetTransactionReceiptRPCName && b[0].Args[0] == txs[0].Hash
            // If the first processing call picked up the second validate this case also
            if len(b) == 2 {
                res = res && b[1].Method == GetTransactionReceiptRPCName && b[1].Args[0] == txs[1].Hash
            }
            return res
        }
        // Second call we expect only one left
        return len(b) == 1 && (b[0].Method == GetTransactionReceiptRPCName && b[0].Args[0] == txs[1].Hash)
    })).Return(nil).Twice().Run(func(args mock.Arguments) {
        elems := args.Get(1).([]rpc.BatchElem)
        if len(cl.Calls) == 2 {
            firstDoneWG.Wait()
        }
        // Only first item is processed, second is left pending
        // Simulate parsing of eth_getTransactionReceipt response
        elems[0].Result.(*nullableReceipt).Receipt = &types.Receipt{
            BlockNumber: new(big.Int).SetUint64(1),
            Status:      1,
        }
    })

    eventChan := make(chan walletevent.Event, 6)
    sub := eventFeed.Subscribe(eventChan)
    // Deferred so the subscription is released even if an assertion aborts the test.
    defer sub.Unsubscribe()

    for i := range txs {
        // Track each transaction
        err := m.TrackPendingTransaction(txs[i].ChainID, txs[i].Hash, txs[i].From, txs[i].To, txs[i].Type, Keep, "")
        require.NoError(t, err)
    }

    storeEventCount := 0
    statusEventCount := 0

    // validateStatusChange checks the tracker state after each status-changed
    // event: the first event must belong to txs[0], the later one to txs[1].
    validateStatusChange := func(we *walletevent.Event) {
        var p StatusChangedPayload
        err := json.Unmarshal([]byte(we.Message), &p)
        require.NoError(t, err)

        if statusEventCount == 0 {
            require.Equal(t, txs[0].ChainID, p.ChainID)
            require.Equal(t, txs[0].Hash, p.Hash)
            require.Equal(t, Success, p.Status)

            status, err := m.Watch(context.Background(), txs[0].ChainID, txs[0].Hash)
            require.NoError(t, err)
            require.Equal(t, Success, *status)
            err = m.Delete(context.Background(), txs[0].ChainID, txs[0].Hash)
            require.NoError(t, err)

            status, err = m.Watch(context.Background(), txs[1].ChainID, txs[1].Hash)
            require.NoError(t, err)
            require.Equal(t, Pending, *status)
            // Let the second mocked call answer now that the first is validated
            firstDoneWG.Done()
        } else {
            // The first transaction was deleted above, so it is no longer found
            _, err := m.Watch(context.Background(), txs[0].ChainID, txs[0].Hash)
            require.Equal(t, sql.ErrNoRows, err)

            status, err := m.Watch(context.Background(), txs[1].ChainID, txs[1].Hash)
            require.NoError(t, err)
            require.Equal(t, Success, *status)
            err = m.Delete(context.Background(), txs[1].ChainID, txs[1].Hash)
            require.NoError(t, err)
        }

        statusEventCount++
    }

    for j := 0; j < 6; j++ {
        select {
        case we := <-eventChan:
            switch we.Type {
            case EventPendingTransactionUpdate:
                storeEventCount++
            case EventPendingTransactionStatusChanged:
                validateStatusChange(&we)
            }
        case <-time.After(1 * time.Second):
            t.Fatal("timeout waiting for the status update event")
        }
    }

    _, err := m.Watch(context.Background(), txs[1].ChainID, txs[1].Hash)
    require.Equal(t, sql.ErrNoRows, err)

    // One add and one delete update for each of the two transactions
    require.Equal(t, 4, storeEventCount)
    require.Equal(t, 2, statusEventCount)

    err = m.Stop()
    require.NoError(t, err)

    waitForTaskToStop(m)

    res, err := m.GetAllPending()
    require.NoError(t, err)
    require.Equal(t, 0, len(res), "should have no pending tx")
}

// TestPendingTransactions exercises the pending-transaction storage queries:
// both lookups return nil before anything is stored, return the stored
// transaction after addPending, return nil for an unrelated address, and
// return nothing again after Delete.
func TestPendingTransactions(t *testing.T) {
    manager, stop, _, _ := setupTestTransactionDB(t, nil)
    defer stop()

    tx := GenerateTestPendingTransactions(0, 1)[0]

    rst, err := manager.GetAllPending()
    require.NoError(t, err)
    require.Nil(t, rst)

    rst, err = manager.GetPendingByAddress([]uint64{777}, tx.From)
    require.NoError(t, err)
    require.Nil(t, rst)

    err = manager.addPending(&tx)
    require.NoError(t, err)

    rst, err = manager.GetPendingByAddress([]uint64{777}, tx.From)
    require.NoError(t, err)
    require.Equal(t, 1, len(rst))
    require.Equal(t, tx, *rst[0])

    rst, err = manager.GetAllPending()
    require.NoError(t, err)
    require.Equal(t, 1, len(rst))
    require.Equal(t, tx, *rst[0])

    // A different sender address must not match the stored transaction
    rst, err = manager.GetPendingByAddress([]uint64{777}, eth.Address{2})
    require.NoError(t, err)
    require.Nil(t, rst)

    // Deleting a still-pending transaction must report ErrStillPending.
    // ErrorIs checks the sentinel itself; require.Error would only treat it as
    // the failure message and accept any non-nil error.
    err = manager.Delete(context.Background(), common.ChainID(777), tx.Hash)
    require.ErrorIs(t, err, ErrStillPending)

    // Despite the reported error the transaction is expected to be gone
    rst, err = manager.GetPendingByAddress([]uint64{777}, tx.From)
    require.NoError(t, err)
    require.Equal(t, 0, len(rst))

    rst, err = manager.GetAllPending()
    require.NoError(t, err)
    require.Equal(t, 0, len(rst))
}