waku-org/go-waku

View on GitHub
examples/chat2-reliable/chat_reliability_test.go

Summary

Maintainability
A
1 hr
Test Coverage
package main

import (
    "chat2-reliable/pb"
    "context"
    "fmt"
    "sync"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "github.com/urfave/cli/v2"
    "github.com/waku-org/go-waku/waku/v2/node"
    "github.com/waku-org/go-waku/waku/v2/peerstore"
    "github.com/waku-org/go-waku/waku/v2/protocol/relay"
)

// TestEnvironment bundles the Waku nodes created for a test with the Chat
// instance built on top of each one. setupTestEnvironment fills both slices
// so that chats[i] is the chat created from nodes[i].
type TestEnvironment struct {
    // nodes holds one started Waku node per participant in the test network.
    nodes []*node.WakuNode
    // chats holds the chat instances; chats[i] was created from nodes[i].
    chats []*Chat
}

// setupTestEnvironment starts nodeCount relay nodes, creates one Chat per node
// (nicknamed "Node<i>"), and links the nodes in a ring: node i adds node
// (i+1) % nodeCount as a static peer on the relay topics configured for chat i.
//
// It returns the populated environment, or an error describing the first step
// that failed. A negative nodeCount, or a peer that exposes no listen address,
// is reported as an error instead of panicking.
func setupTestEnvironment(ctx context.Context, t *testing.T, nodeCount int) (*TestEnvironment, error) {
    t.Helper()

    if nodeCount < 0 {
        return nil, fmt.Errorf("invalid node count %d", nodeCount)
    }

    t.Logf("Setting up test environment with %d nodes", nodeCount)
    env := &TestEnvironment{
        nodes: make([]*node.WakuNode, nodeCount),
        chats: make([]*Chat, nodeCount),
    }

    for i := range env.nodes {
        // Named wakuNode rather than node so the imported node package
        // stays accessible inside the loop.
        wakuNode, err := setupTestNode(ctx, t)
        if err != nil {
            return nil, fmt.Errorf("failed to set up node %d: %w", i, err)
        }
        env.nodes[i] = wakuNode

        chat, err := setupTestChat(ctx, wakuNode, fmt.Sprintf("Node%d", i))
        if err != nil {
            return nil, fmt.Errorf("failed to set up chat for node %d: %w", i, err)
        }
        env.chats[i] = chat
    }

    t.Log("Connecting nodes in ring topology")
    for i, current := range env.nodes {
        nextIndex := (i + 1) % nodeCount

        // Guard the [0] access: a node without listen addresses would
        // otherwise crash the whole test binary with an index panic.
        addrs := env.nodes[nextIndex].ListenAddresses()
        if len(addrs) == 0 {
            return nil, fmt.Errorf("failed to connect node %d to node %d: node %d has no listen addresses", i, nextIndex, nextIndex)
        }

        topics := env.chats[i].options.Relay.Topics.Value()
        if _, err := current.AddPeer(addrs[0], peerstore.Static, topics); err != nil {
            return nil, fmt.Errorf("failed to connect node %d to node %d: %w", i, nextIndex, err)
        }
    }

    t.Log("Test environment setup complete")
    return env, nil
}

// setupTestNode creates a Waku node with the relay protocol enabled and
// starts it with ctx.
//
// Once the node has started, a cleanup is registered on t so the node is
// stopped when the calling test finishes; previously started nodes were never
// stopped. It returns the started node, or an error if creating or starting
// it failed.
func setupTestNode(ctx context.Context, t *testing.T) (*node.WakuNode, error) {
    t.Helper()

    opts := []node.WakuNodeOption{
        node.WithWakuRelay(),
        // node.WithWakuStore(), // store protocol intentionally left disabled
    }

    // Named wakuNode rather than node so the imported node package is not
    // shadowed for the rest of the function.
    wakuNode, err := node.New(opts...)
    if err != nil {
        return nil, fmt.Errorf("creating waku node: %w", err)
    }
    if err := wakuNode.Start(ctx); err != nil {
        return nil, fmt.Errorf("starting waku node: %w", err)
    }

    // Release the node's resources when the test that requested it ends.
    t.Cleanup(wakuNode.Stop)

    return wakuNode, nil
}

// PeerConnection is a package-level alias for node.PeerConnection. It lets
// setupTestChat, whose parameter named node shadows the node package, still
// refer to the type when it creates the connection-notification channel.
type PeerConnection = node.PeerConnection

// setupTestChat builds a Chat on top of node using the given nickname, the
// content topic "/test/1/chat/proto", and relay enabled on
// relay.DefaultWakuTopic.
//
// It returns the chat, or an error if the relay topic could not be recorded
// or NewChat returned nil.
func setupTestChat(ctx context.Context, node *node.WakuNode, nickname string) (*Chat, error) {
    topics := cli.StringSlice{}
    // Set reports an error; surface it rather than silently continuing
    // with an empty topic list.
    if err := topics.Set(relay.DefaultWakuTopic); err != nil {
        return nil, fmt.Errorf("failed to set relay topic: %w", err)
    }

    options := Options{
        Nickname:     nickname,
        ContentTopic: "/test/1/chat/proto",
        Relay: RelayOptions{
            Enable: true,
            Topics: topics,
        },
    }

    // The parameter named node shadows the node package here, so the channel
    // element type is spelled through the package-level PeerConnection alias.
    // The channel is local to this function and only handed to NewChat.
    connNotifier := make(chan PeerConnection)

    chat := NewChat(ctx, node, connNotifier, options)
    if chat == nil {
        return nil, fmt.Errorf("failed to create chat instance")
    }
    return chat, nil
}

// areNodesConnected reports whether every node in nodes currently has exactly
// expectedPeers peers according to its host's network. An empty slice yields
// true.
func areNodesConnected(nodes []*node.WakuNode, expectedPeers int) bool {
    for i := range nodes {
        peerCount := len(nodes[i].Host().Network().Peers())
        if peerCount == expectedPeers {
            continue
        }
        // First node whose peer count differs decides the answer.
        return false
    }
    return true
}

// TestLamportTimestamps sends one message from Node 0 in a three-node ring and
// checks that every node's Lamport timestamp becomes greater than zero and
// that Nodes 1 and 2 record the message.
func TestLamportTimestamps(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
    defer cancel()

    t.Log("Starting TestLamportTimestamps")

    const nodeCount = 3
    env, err := setupTestEnvironment(ctx, t, nodeCount)
    require.NoError(t, err, "Failed to set up test environment")

    // In a three-node ring each node ends up with two peers.
    ringConnected := func() bool { return areNodesConnected(env.nodes, 2) }
    require.Eventually(t, ringConnected, 30*time.Second, 1*time.Second, "Nodes failed to connect")

    for idx := range env.chats {
        t.Logf("Node %d initial Lamport timestamp: %d", idx, env.chats[idx].getLamportTimestamp())
    }

    sender, node1, node2 := env.chats[0], env.chats[1], env.chats[2]

    t.Log("Sending message from Node 0")
    sender.SendMessage("Message from Node 0")

    t.Log("Waiting for message propagation")
    // Propagation is considered done once no node's clock is still at zero.
    allClocksAdvanced := func() bool {
        for _, c := range env.chats {
            if c.getLamportTimestamp() == 0 {
                return false
            }
        }
        return true
    }
    require.Eventually(t, allClocksAdvanced, 30*time.Second, 1*time.Second, "Message propagation failed")

    assert.Greater(t, sender.getLamportTimestamp(), int32(0), "Sender's Lamport timestamp should be greater than 0")
    assert.Greater(t, node1.getLamportTimestamp(), int32(0), "Node 1's Lamport timestamp should be greater than 0")
    assert.Greater(t, node2.getLamportTimestamp(), int32(0), "Node 2's Lamport timestamp should be greater than 0")

    assert.NotEmpty(t, node1.messageHistory, "Node 1 should have received the message")
    assert.NotEmpty(t, node2.messageHistory, "Node 2 should have received the message")

    // Content is only compared when a message is present, so an empty
    // history is reported by the NotEmpty assertions above, not by a panic.
    if len(node1.messageHistory) > 0 {
        assert.Equal(t, "Message from Node 0", node1.messageHistory[0].Content, "Node 1 should have received the correct message")
    }
    if len(node2.messageHistory) > 0 {
        assert.Equal(t, "Message from Node 0", node2.messageHistory[0].Content, "Node 2 should have received the correct message")
    }

    t.Log("TestLamportTimestamps completed successfully")
}

// TestCausalOrdering sends one message from each of three nodes, 100ms apart,
// and checks that every node ends up with the three messages in send order.
func TestCausalOrdering(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
    defer cancel()

    t.Log("Starting TestCausalOrdering")

    const nodeCount = 3
    env, err := setupTestEnvironment(ctx, t, nodeCount)
    require.NoError(t, err, "Failed to set up test environment")

    ringConnected := func() bool { return areNodesConnected(env.nodes, 2) }
    require.Eventually(t, ringConnected, 30*time.Second, 1*time.Second, "Nodes failed to connect")

    t.Log("Sending messages from different nodes")
    // outgoing[k] is sent by node k; the pause after each send spaces them out.
    outgoing := []string{
        "Message 1 from Node 0",
        "Message 2 from Node 1",
        "Message 3 from Node 2",
    }
    for sender, text := range outgoing {
        env.chats[sender].SendMessage(text)
        time.Sleep(100 * time.Millisecond)
    }

    t.Log("Waiting for message propagation")
    everyoneHasThree := func() bool {
        for idx, c := range env.chats {
            t.Logf("Node %d message history length: %d", idx, len(c.messageHistory))
            if len(c.messageHistory) != 3 {
                return false
            }
        }
        return true
    }
    require.Eventually(t, everyoneHasThree, 30*time.Second, 1*time.Second, "Messages did not propagate to all nodes")

    for idx, c := range env.chats {
        assert.Len(t, c.messageHistory, 3, "Node %d should have 3 messages", idx)
        assert.Equal(t, "Message 1 from Node 0", c.messageHistory[0].Content, "Node %d: First message incorrect", idx)
        assert.Equal(t, "Message 2 from Node 1", c.messageHistory[1].Content, "Node %d: Second message incorrect", idx)
        assert.Equal(t, "Message 3 from Node 2", c.messageHistory[2].Content, "Node %d: Third message incorrect", idx)
    }

    t.Log("TestCausalOrdering completed successfully")
}

// TestBloomFilterDuplicateDetection delivers one message to a second node,
// then feeds that node a copy carrying the same MessageId through
// processReceivedMessage and checks its history still holds a single entry.
func TestBloomFilterDuplicateDetection(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
    defer cancel()

    t.Log("Starting TestBloomFilterDuplicateDetection")

    const nodeCount = 2
    env, err := setupTestEnvironment(ctx, t, nodeCount)
    require.NoError(t, err, "Failed to set up test environment")

    pairConnected := func() bool { return areNodesConnected(env.nodes, 1) }
    require.Eventually(t, pairConnected, 30*time.Second, 1*time.Second, "Nodes failed to connect")

    t.Log("Sending a message")
    testMessage := "Test message"
    env.chats[0].SendMessage(testMessage)

    t.Log("Waiting for message propagation")
    var receivedMsg *pb.Message
    // Capture the delivered message as soon as the receiver holds exactly one.
    captureFirst := func() bool {
        if len(env.chats[1].messageHistory) != 1 {
            return false
        }
        receivedMsg = env.chats[1].messageHistory[0]
        return true
    }
    require.Eventually(t, captureFirst, 30*time.Second, 1*time.Second, "Message did not propagate to second node")

    require.NotNil(t, receivedMsg, "Received message should not be nil")

    t.Log("Simulating receiving the same message again")

    // Build a field-for-field copy; reusing MessageId is what makes it a
    // true duplicate.
    duplicateMsg := &pb.Message{}
    duplicateMsg.SenderId = receivedMsg.SenderId
    duplicateMsg.MessageId = receivedMsg.MessageId
    duplicateMsg.LamportTimestamp = receivedMsg.LamportTimestamp
    duplicateMsg.CausalHistory = receivedMsg.CausalHistory
    duplicateMsg.ChannelId = receivedMsg.ChannelId
    duplicateMsg.BloomFilter = receivedMsg.BloomFilter
    duplicateMsg.Content = receivedMsg.Content

    env.chats[1].processReceivedMessage(duplicateMsg)

    assert.Len(t, env.chats[1].messageHistory, 1, "Node 1 should still have only one message (no duplicates)")

    t.Log("TestBloomFilterDuplicateDetection completed successfully")
}

// TestNetworkPartition ensures that missing messages can be recovered.
//
// Node 2 is disconnected while Node 0 sends a message, then reconnected. After
// Node 1 sends a further message, Node 2 is expected to end up with all four
// messages, with the missed one placed before the newest one.
func TestNetworkPartition(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
    defer cancel()

    // The log line now names this test; it previously said
    // "TestMessageRecovery", which does not exist in this file.
    t.Log("Starting TestNetworkPartition")

    nodeCount := 3
    env, err := setupTestEnvironment(ctx, t, nodeCount)
    require.NoError(t, err, "Failed to set up test environment")

    nc := NewNetworkController(ctx, env.nodes, env.chats)

    require.Eventually(t, func() bool {
        return areNodesConnected(env.nodes, 2)
    }, 60*time.Second, 1*time.Second, "Nodes failed to connect")

    t.Log("Stage 1: Sending initial messages")
    env.chats[0].SendMessage("Message 1")
    time.Sleep(100 * time.Millisecond)
    env.chats[1].SendMessage("Message 2")
    time.Sleep(100 * time.Millisecond)

    t.Log("Waiting for message propagation")
    require.Eventually(t, func() bool {
        for _, chat := range env.chats {
            if len(chat.messageHistory) != 2 {
                return false
            }
        }
        return true
    }, 30*time.Second, 1*time.Second, "Messages did not propagate to all nodes")

    // Verify that Node 2 has messages before disconnection
    require.Equal(t, 2, len(env.chats[2].messageHistory), "Node 2 does not have all messages")

    t.Log("Stage 2: Simulating network partition for Node 2")
    nc.DisconnectNode(env.nodes[2])
    time.Sleep(1 * time.Second) // Allow time for disconnection to take effect

    t.Log("Stage 3: Sending message that Node 2 will miss")
    env.chats[0].SendMessage("Missed Message")
    time.Sleep(100 * time.Millisecond)

    t.Log("Stage 4: Reconnecting Node 2")
    nc.ReconnectNode(env.nodes[2])
    time.Sleep(5 * time.Second) // Allow time for reconnection to take effect

    // Verify that Node 2 didn't receive the message
    require.Equal(t, 2, len(env.chats[2].messageHistory), "Node 2 should not have received the missed message")

    t.Log("Stage 5: Sending a new message that depends on the missed message")
    env.chats[1].SendMessage("New Message")

    // Verify that Node 2 received the new message
    require.Eventually(t, func() bool {
        return len(env.chats[2].messageHistory) >= 3
    }, 30*time.Second, 5*time.Second, "Node 2 should have received the new message")

    t.Log("Stage 6: Waiting for message recovery")
    require.Eventually(t, func() bool {
        return len(env.chats[2].messageHistory) == 4
    }, 30*time.Second, 5*time.Second, "Message recovery failed")

    // Print final message history for all nodes. This already covers Node 2,
    // so the separate Node 2 dump that used to follow was dropped.
    for i, chat := range env.chats {
        t.Logf("Node %d final message history:", i)
        for j, msg := range chat.messageHistory {
            t.Logf("  Message %d: %s", j+1, msg.Content)
        }
    }

    // Verify the results. The length is re-checked on the slice that is
    // actually indexed so a mismatch fails the test instead of panicking.
    history := env.chats[2].messageHistory
    require.Len(t, history, 4, "Node 2 should have exactly 4 messages")

    assert.Equal(t, "Message 1", history[0].Content, "First message incorrect")
    assert.Equal(t, "Message 2", history[1].Content, "Second message incorrect")
    assert.Equal(t, "Missed Message", history[2].Content, "Missed message not recovered")
    assert.Equal(t, "New Message", history[3].Content, "New message incorrect")
}

// TestConcurrentMessageSending has five nodes each send ten messages from
// their own goroutine and checks that every node ends up with all fifty.
func TestConcurrentMessageSending(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
    defer cancel()

    t.Log("Starting TestConcurrentMessageSending")

    const nodeCount = 5
    env, err := setupTestEnvironment(ctx, t, nodeCount)
    require.NoError(t, err, "Failed to set up test environment")

    ringConnected := func() bool { return areNodesConnected(env.nodes, 2) }
    require.Eventually(t, ringConnected, 60*time.Second, 3*time.Second, "Nodes failed to connect")

    messageCount := 10
    var senders sync.WaitGroup

    t.Log("Sending messages concurrently")
    for i := range env.chats {
        senders.Add(1)
        // The index is passed as an argument so each goroutine has its own copy.
        go func(index int) {
            defer senders.Done()
            for seq := 0; seq < messageCount; seq++ {
                text := fmt.Sprintf("Message %d from Node %d", seq, index)
                env.chats[index].SendMessage(text)
                time.Sleep(10 * time.Millisecond)
            }
        }(i)
    }

    senders.Wait()

    t.Log("Waiting for message propagation")
    totalExpectedMessages := len(env.chats) * messageCount
    allDelivered := func() bool {
        for _, c := range env.chats {
            if len(c.messageHistory) != totalExpectedMessages {
                return false
            }
        }
        return true
    }
    require.Eventually(t, allDelivered, 2*time.Minute, 1*time.Second, "Messages did not propagate to all nodes")

    for idx, c := range env.chats {
        assert.Len(t, c.messageHistory, totalExpectedMessages, "Node %d should have received all messages", idx)
    }

    t.Log("TestConcurrentMessageSending completed successfully")
}

// TestLargeGroupScaling sends a single message from the first node of a
// twenty-node ring and checks that every node records exactly that message.
//
// Propagation is awaited by polling rather than a fixed sleep, and the history
// is only indexed once it is known to be non-empty, so a slow or failed
// delivery is reported as a test failure instead of an index panic.
func TestLargeGroupScaling(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    defer cancel()

    t.Log("Starting TestLargeGroupScaling")

    nodeCount := 20
    env, err := setupTestEnvironment(ctx, t, nodeCount)
    require.NoError(t, err, "Failed to set up test environment")

    require.Eventually(t, func() bool {
        return areNodesConnected(env.nodes, 2)
    }, 2*time.Minute, 3*time.Second, "Nodes failed to connect")

    // Send a message from the first node
    env.chats[0].SendMessage("Broadcast message to large group")

    // Wait until every node holds at least one message.
    require.Eventually(t, func() bool {
        for _, chat := range env.chats {
            if len(chat.messageHistory) == 0 {
                return false
            }
        }
        return true
    }, 1*time.Minute, 500*time.Millisecond, "Broadcast message did not propagate to all nodes")

    // Verify all nodes received the message. Index 0 is safe here because
    // the wait above established that every history is non-empty.
    for i, chat := range env.chats {
        assert.Len(t, chat.messageHistory, 1, "Node %d should have received the broadcast message", i)
        assert.Equal(t, "Broadcast message to large group", chat.messageHistory[0].Content)
    }

    t.Log("TestLargeGroupScaling completed successfully")
}

// TestEagerPushMechanism disconnects Node 1, sends a message from Node 0,
// reconnects Node 1, and checks that Node 1 eventually holds exactly one
// message.
func TestEagerPushMechanism(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
    defer cancel()

    const nodeCount = 2
    env, err := setupTestEnvironment(ctx, t, nodeCount)
    require.NoError(t, err, "Failed to set up test environment")

    controller := NewNetworkController(ctx, env.nodes, env.chats)
    offlineNode := env.nodes[1]

    // Take node 1 off the network before anything is sent.
    controller.DisconnectNode(offlineNode)

    // Node 0 sends while node 1 is unreachable.
    env.chats[0].SendMessage("Test eager push")

    // Give the message time to be added to the outgoing buffer.
    time.Sleep(1 * time.Second)

    // Bring node 1 back.
    controller.ReconnectNode(offlineNode)

    // Give eager push time to resend the message.
    time.Sleep(5 * time.Second)

    // Node 1 should now hold the single message.
    receivedOne := func() bool {
        return len(env.chats[1].messageHistory) == 1
    }
    assert.Eventually(t, receivedOne, 10*time.Second, 1*time.Second, "Node 1 should have received the message via eager push")
}

// TestBloomFilterWindow shortens each chat's bloom filter window to two
// seconds, then checks that a message ID seen by Node 1 is reported by its
// bloom filter, is no longer reported after the window has passed and Clean
// has run, and that a later message is reported again.
//
// The sender's history is polled before it is indexed, so a missing entry
// fails the test instead of causing an index-out-of-range panic.
func TestBloomFilterWindow(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
    defer cancel()

    nodeCount := 2
    env, err := setupTestEnvironment(ctx, t, nodeCount)
    require.NoError(t, err, "Failed to set up test environment")

    // Reduce bloom filter window for testing
    for _, chat := range env.chats {
        chat.bloomFilter.window = 2 * time.Second
    }

    // senderHasAtLeast reports whether Node 0's history holds n or more entries.
    senderHasAtLeast := func(n int) func() bool {
        return func() bool { return len(env.chats[0].messageHistory) >= n }
    }

    // Send a message
    env.chats[0].SendMessage("Test bloom filter window")
    require.Eventually(t, senderHasAtLeast(1), 10*time.Second, 100*time.Millisecond, "Sender should have recorded its first message")
    messageID := env.chats[0].messageHistory[0].MessageId

    // Check if the message is in the bloom filter. This is fatal on failure:
    // the "no longer present" check below is meaningless if the ID never
    // reached Node 1's filter.
    require.Eventually(t, func() bool {
        return env.chats[1].bloomFilter.Test(messageID)
    }, 30*time.Second, 1*time.Second, "Message should be in the bloom filter")

    // Wait for the bloom filter window to pass
    time.Sleep(3 * time.Second)

    // Clean the bloom filter
    env.chats[1].bloomFilter.Clean()

    time.Sleep(3 * time.Second)

    // Check if the message is no longer in the bloom filter
    assert.False(t, env.chats[1].bloomFilter.Test(messageID), "Message should no longer be in the bloom filter")

    // Send another message to ensure the filter still works for new messages
    env.chats[0].SendMessage("New test message")
    require.Eventually(t, senderHasAtLeast(2), 10*time.Second, 100*time.Millisecond, "Sender should have recorded its second message")

    newMessageID := env.chats[0].messageHistory[1].MessageId
    // Check if the new message is in the bloom filter
    assert.Eventually(t, func() bool {
        return env.chats[1].bloomFilter.Test(newMessageID)
    }, 30*time.Second, 1*time.Second, "New message should be in the bloom filter")
}

// TestConflictResolution hands two messages with the same Lamport timestamp to
// two chats in opposite orders via processReceivedMessage, and checks that
// both chats hold them in the same order.
//
// Each history is required to contain at least two entries before it is
// indexed, so a dropped message fails the test instead of panicking.
func TestConflictResolution(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
    defer cancel()

    nodeCount := 3
    env, err := setupTestEnvironment(ctx, t, nodeCount)
    require.NoError(t, err, "Failed to set up test environment")

    // Create conflicting messages with the same Lamport timestamp
    conflictingMsg1 := &pb.Message{
        SenderId:         "Node0",
        MessageId:        "msg1",
        LamportTimestamp: 1,
        Content:          "Conflict 1",
    }
    conflictingMsg2 := &pb.Message{
        SenderId:         "Node1",
        MessageId:        "msg2",
        LamportTimestamp: 1,
        Content:          "Conflict 2",
    }

    // Process the conflicting messages in different orders on different nodes
    env.chats[0].processReceivedMessage(conflictingMsg1)
    env.chats[0].processReceivedMessage(conflictingMsg2)

    env.chats[1].processReceivedMessage(conflictingMsg2)
    env.chats[1].processReceivedMessage(conflictingMsg1)

    // Guard the index accesses below.
    require.GreaterOrEqual(t, len(env.chats[0].messageHistory), 2, "Node 0 should hold both conflicting messages")
    require.GreaterOrEqual(t, len(env.chats[1].messageHistory), 2, "Node 1 should hold both conflicting messages")

    // Check if the messages are ordered consistently across nodes
    assert.Equal(t, env.chats[0].messageHistory[0].MessageId, env.chats[1].messageHistory[0].MessageId, "Conflicting messages should be ordered consistently")
    assert.Equal(t, env.chats[0].messageHistory[1].MessageId, env.chats[1].messageHistory[1].MessageId, "Conflicting messages should be ordered consistently")
}

// TestNewNodeSyncAndMessagePropagation starts a two-node network, exchanges
// two messages, then adds a third node connected to Node 0. It checks that the
// new node obtains the two earlier messages, and that messages sent afterwards
// by an old node and by the new node reach every node.
//
// Propagation is awaited by polling instead of fixed sleeps, and the last
// message is only read from a history known to be non-empty, so slow or failed
// delivery is reported as a test failure rather than an index panic.
func TestNewNodeSyncAndMessagePropagation(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    defer cancel()

    t.Log("Starting TestNewNodeSyncAndMessagePropagation")

    // Set up initial network with 2 nodes
    initialNodeCount := 2
    env, err := setupTestEnvironment(ctx, t, initialNodeCount)
    require.NoError(t, err, "Failed to set up initial test environment")

    // allHave returns a condition that holds once every chat currently in
    // env holds exactly want messages.
    allHave := func(want int) func() bool {
        return func() bool {
            for _, chat := range env.chats {
                if len(chat.messageHistory) != want {
                    return false
                }
            }
            return true
        }
    }

    // Ensure initial nodes are connected
    require.Eventually(t, func() bool {
        return areNodesConnected(env.nodes, 1)
    }, 60*time.Second, 1*time.Second, "Initial nodes failed to connect")

    t.Log("Sending initial messages")
    env.chats[0].SendMessage("Initial message 1")
    env.chats[1].SendMessage("Initial message 2")

    // Verify initial messages are received by both nodes
    require.Eventually(t, allHave(2), 30*time.Second, 1*time.Second, "Every node should have 2 initial messages")

    t.Log("Adding new node to the network")
    newNode, err := setupTestNode(ctx, t)
    require.NoError(t, err, "Failed to set up new node")
    newChat, err := setupTestChat(ctx, newNode, "NewNode")
    require.NoError(t, err, "Failed to set up new chat")

    env.nodes = append(env.nodes, newNode)
    env.chats = append(env.chats, newChat)

    // Connect new node to the network. The address list is checked first so
    // a node without listen addresses fails the test instead of panicking.
    addrs := env.nodes[0].ListenAddresses()
    require.NotEmpty(t, addrs, "Node 0 has no listen addresses")
    _, err = newNode.AddPeer(addrs[0], peerstore.Static, newChat.options.Relay.Topics.Value())
    require.NoError(t, err, "Failed to connect new node to the network")

    t.Log("Waiting for new node to sync")
    require.Eventually(t, func() bool {
        return len(newChat.messageHistory) == 2
    }, 1*time.Minute, 5*time.Second, "New node failed to sync message history")

    t.Log("Sending message from old node")
    env.chats[0].SendMessage("Message from old node")

    // Verify the message is received by all nodes
    require.Eventually(t, allHave(3), 30*time.Second, 1*time.Second, "Every node should have 3 messages")

    t.Log("Sending message from new node")
    newChat.SendMessage("Message from new node")

    // Verify the message from new node is received by all nodes
    require.Eventually(t, allHave(4), 30*time.Second, 1*time.Second, "Every node should have 4 messages")

    // Range over the actual chats rather than a hard-coded count, and skip
    // the content check for an empty history after reporting it.
    for i, chat := range env.chats {
        history := chat.messageHistory
        if !assert.NotEmpty(t, history, "Node %d has no messages", i) {
            continue
        }
        lastMsg := history[len(history)-1]
        assert.Equal(t, "Message from new node", lastMsg.Content, "The last message is incorrect for node %d", i)
    }

    t.Log("TestNewNodeSyncAndMessagePropagation completed")
}