examples/chat2-reliable/chat_reliability_test.go
package main
import (
"chat2-reliable/pb"
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/urfave/cli/v2"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
)
type TestEnvironment struct {
nodes []*node.WakuNode
chats []*Chat
}
func setupTestEnvironment(ctx context.Context, t *testing.T, nodeCount int) (*TestEnvironment, error) {
t.Logf("Setting up test environment with %d nodes", nodeCount)
env := &TestEnvironment{
nodes: make([]*node.WakuNode, nodeCount),
chats: make([]*Chat, nodeCount),
}
for i := 0; i < nodeCount; i++ {
node, err := setupTestNode(ctx, t)
if err != nil {
return nil, fmt.Errorf("failed to set up node %d: %w", i, err)
}
env.nodes[i] = node
chat, err := setupTestChat(ctx, node, fmt.Sprintf("Node%d", i))
if err != nil {
return nil, fmt.Errorf("failed to set up chat for node %d: %w", i, err)
}
env.chats[i] = chat
}
t.Log("Connecting nodes in ring topology")
for i := 0; i < nodeCount; i++ {
nextIndex := (i + 1) % nodeCount
_, err := env.nodes[i].AddPeer(env.nodes[nextIndex].ListenAddresses()[0], peerstore.Static, env.chats[i].options.Relay.Topics.Value())
if err != nil {
return nil, fmt.Errorf("failed to connect node %d to node %d: %w", i, nextIndex, err)
}
}
t.Log("Test environment setup complete")
return env, nil
}
func setupTestNode(ctx context.Context, t *testing.T) (*node.WakuNode, error) {
opts := []node.WakuNodeOption{
node.WithWakuRelay(),
// node.WithWakuStore(),
}
node, err := node.New(opts...)
if err != nil {
return nil, err
}
if err := node.Start(ctx); err != nil {
return nil, err
}
// if node.Store() == nil {
// t.Logf("Store protocol is not enabled on node %d", index)
// }
return node, nil
}
type PeerConnection = node.PeerConnection
func setupTestChat(ctx context.Context, node *node.WakuNode, nickname string) (*Chat, error) {
topics := cli.StringSlice{}
topics.Set(relay.DefaultWakuTopic)
options := Options{
Nickname: nickname,
ContentTopic: "/test/1/chat/proto",
Relay: RelayOptions{
Enable: true,
Topics: topics,
},
}
// Create a channel of the correct type
connNotifier := make(chan PeerConnection)
chat := NewChat(ctx, node, connNotifier, options)
if chat == nil {
return nil, fmt.Errorf("failed to create chat instance")
}
return chat, nil
}
func areNodesConnected(nodes []*node.WakuNode, expectedPeers int) bool {
for _, node := range nodes {
if len(node.Host().Network().Peers()) != expectedPeers {
return false
}
}
return true
}
// TestLamportTimestamps verifies that Lamport timestamps are correctly updated
func TestLamportTimestamps(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
t.Log("Starting TestLamportTimestamps")
nodeCount := 3
env, err := setupTestEnvironment(ctx, t, nodeCount)
require.NoError(t, err, "Failed to set up test environment")
require.Eventually(t, func() bool {
return areNodesConnected(env.nodes, 2)
}, 30*time.Second, 1*time.Second, "Nodes failed to connect")
for i, chat := range env.chats {
t.Logf("Node %d initial Lamport timestamp: %d", i, chat.getLamportTimestamp())
}
t.Log("Sending message from Node 0")
env.chats[0].SendMessage("Message from Node 0")
t.Log("Waiting for message propagation")
require.Eventually(t, func() bool {
for _, chat := range env.chats {
if chat.getLamportTimestamp() == 0 {
return false
}
}
return true
}, 30*time.Second, 1*time.Second, "Message propagation failed")
assert.Greater(t, env.chats[0].getLamportTimestamp(), int32(0), "Sender's Lamport timestamp should be greater than 0")
assert.Greater(t, env.chats[1].getLamportTimestamp(), int32(0), "Node 1's Lamport timestamp should be greater than 0")
assert.Greater(t, env.chats[2].getLamportTimestamp(), int32(0), "Node 2's Lamport timestamp should be greater than 0")
assert.NotEmpty(t, env.chats[1].messageHistory, "Node 1 should have received the message")
assert.NotEmpty(t, env.chats[2].messageHistory, "Node 2 should have received the message")
if len(env.chats[1].messageHistory) > 0 {
assert.Equal(t, "Message from Node 0", env.chats[1].messageHistory[0].Content, "Node 1 should have received the correct message")
}
if len(env.chats[2].messageHistory) > 0 {
assert.Equal(t, "Message from Node 0", env.chats[2].messageHistory[0].Content, "Node 2 should have received the correct message")
}
t.Log("TestLamportTimestamps completed successfully")
}
// TestCausalOrdering ensures messages are processed in the correct causal order
func TestCausalOrdering(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
t.Log("Starting TestCausalOrdering")
nodeCount := 3
env, err := setupTestEnvironment(ctx, t, nodeCount)
require.NoError(t, err, "Failed to set up test environment")
require.Eventually(t, func() bool {
return areNodesConnected(env.nodes, 2)
}, 30*time.Second, 1*time.Second, "Nodes failed to connect")
t.Log("Sending messages from different nodes")
env.chats[0].SendMessage("Message 1 from Node 0")
time.Sleep(100 * time.Millisecond)
env.chats[1].SendMessage("Message 2 from Node 1")
time.Sleep(100 * time.Millisecond)
env.chats[2].SendMessage("Message 3 from Node 2")
time.Sleep(100 * time.Millisecond)
t.Log("Waiting for message propagation")
require.Eventually(t, func() bool {
for i, chat := range env.chats {
t.Logf("Node %d message history length: %d", i, len(chat.messageHistory))
if len(chat.messageHistory) != 3 {
return false
}
}
return true
}, 30*time.Second, 1*time.Second, "Messages did not propagate to all nodes")
for i, chat := range env.chats {
assert.Len(t, chat.messageHistory, 3, "Node %d should have 3 messages", i)
assert.Equal(t, "Message 1 from Node 0", chat.messageHistory[0].Content, "Node %d: First message incorrect", i)
assert.Equal(t, "Message 2 from Node 1", chat.messageHistory[1].Content, "Node %d: Second message incorrect", i)
assert.Equal(t, "Message 3 from Node 2", chat.messageHistory[2].Content, "Node %d: Third message incorrect", i)
}
t.Log("TestCausalOrdering completed successfully")
}
func TestBloomFilterDuplicateDetection(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
t.Log("Starting TestBloomFilterDuplicateDetection")
nodeCount := 2
env, err := setupTestEnvironment(ctx, t, nodeCount)
require.NoError(t, err, "Failed to set up test environment")
require.Eventually(t, func() bool {
return areNodesConnected(env.nodes, 1)
}, 30*time.Second, 1*time.Second, "Nodes failed to connect")
t.Log("Sending a message")
testMessage := "Test message"
env.chats[0].SendMessage(testMessage)
t.Log("Waiting for message propagation")
var receivedMsg *pb.Message
require.Eventually(t, func() bool {
if len(env.chats[1].messageHistory) == 1 {
receivedMsg = env.chats[1].messageHistory[0]
return true
}
return false
}, 30*time.Second, 1*time.Second, "Message did not propagate to second node")
require.NotNil(t, receivedMsg, "Received message should not be nil")
t.Log("Simulating receiving the same message again")
// Create a duplicate message
duplicateMsg := &pb.Message{
SenderId: receivedMsg.SenderId,
MessageId: receivedMsg.MessageId, // Use the same MessageId to simulate a true duplicate
LamportTimestamp: receivedMsg.LamportTimestamp,
CausalHistory: receivedMsg.CausalHistory,
ChannelId: receivedMsg.ChannelId,
BloomFilter: receivedMsg.BloomFilter,
Content: receivedMsg.Content,
}
env.chats[1].processReceivedMessage(duplicateMsg)
assert.Len(t, env.chats[1].messageHistory, 1, "Node 1 should still have only one message (no duplicates)")
t.Log("TestBloomFilterDuplicateDetection completed successfully")
}
// TestNetworkPartition ensures that missing messages can be recovered
func TestNetworkPartition(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()
t.Log("Starting TestMessageRecovery")
nodeCount := 3
env, err := setupTestEnvironment(ctx, t, nodeCount)
require.NoError(t, err, "Failed to set up test environment")
nc := NewNetworkController(ctx, env.nodes, env.chats)
require.Eventually(t, func() bool {
return areNodesConnected(env.nodes, 2)
}, 60*time.Second, 1*time.Second, "Nodes failed to connect")
t.Log("Stage 1: Sending initial messages")
env.chats[0].SendMessage("Message 1")
time.Sleep(100 * time.Millisecond)
env.chats[1].SendMessage("Message 2")
time.Sleep(100 * time.Millisecond)
t.Log("Waiting for message propagation")
require.Eventually(t, func() bool {
for _, chat := range env.chats {
if len(chat.messageHistory) != 2 {
return false
}
}
return true
}, 30*time.Second, 1*time.Second, "Messages did not propagate to all nodes")
// Verify that Node 2 has messages before disconnection
require.Equal(t, 2, len(env.chats[2].messageHistory), "Node 2 does not have all messages")
t.Log("Stage 2: Simulating network partition for Node 2")
nc.DisconnectNode(env.nodes[2])
time.Sleep(1 * time.Second) // Allow time for disconnection to take effect
t.Log("Stage 3: Sending message that Node 2 will miss")
env.chats[0].SendMessage("Missed Message")
time.Sleep(100 * time.Millisecond)
t.Log("Stage 4: Reconnecting Node 2")
nc.ReconnectNode(env.nodes[2])
time.Sleep(5 * time.Second) // Allow time for reconnection to take effect
// Verify that Node 2 didn't receive the message
require.Equal(t, 2, len(env.chats[2].messageHistory), "Node 2 should not have received the missed message")
t.Log("Stage 5: Sending a new message that depends on the missed message")
env.chats[1].SendMessage("New Message")
// Verify that Node 2 received the new message
require.Eventually(t, func() bool {
msgCount := len(env.chats[2].messageHistory)
return msgCount >= 3
}, 30*time.Second, 5*time.Second, "Node 2 should have received the new message")
// Stage 6: Wait for message recovery
t.Log("Stage 6: Waiting for message recovery")
require.Eventually(t, func() bool {
msgCount := len(env.chats[2].messageHistory)
return msgCount == 4
}, 30*time.Second, 5*time.Second, "Message recovery failed")
// Print final message history for all nodes
for i, chat := range env.chats {
t.Logf("Node %d final message history:", i)
for j, msg := range chat.messageHistory {
t.Logf(" Message %d: %s", j+1, msg.Content)
}
}
// Verify the results
for i, msg := range env.chats[2].messageHistory {
t.Logf("Message %d: %s", i+1, msg.Content)
}
assert.Equal(t, "Message 1", env.chats[2].messageHistory[0].Content, "First message incorrect")
assert.Equal(t, "Message 2", env.chats[2].messageHistory[1].Content, "Second message incorrect")
assert.Equal(t, "Missed Message", env.chats[2].messageHistory[2].Content, "Missed message not recovered")
assert.Equal(t, "New Message", env.chats[2].messageHistory[3].Content, "New message incorrect")
}
func TestConcurrentMessageSending(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()
t.Log("Starting TestConcurrentMessageSending")
nodeCount := 5
env, err := setupTestEnvironment(ctx, t, nodeCount)
require.NoError(t, err, "Failed to set up test environment")
require.Eventually(t, func() bool {
return areNodesConnected(env.nodes, 2)
}, 60*time.Second, 3*time.Second, "Nodes failed to connect")
messageCount := 10
var wg sync.WaitGroup
t.Log("Sending messages concurrently")
for i := 0; i < len(env.chats); i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
for j := 0; j < messageCount; j++ {
env.chats[index].SendMessage(fmt.Sprintf("Message %d from Node %d", j, index))
time.Sleep(10 * time.Millisecond)
}
}(i)
}
wg.Wait()
t.Log("Waiting for message propagation")
totalExpectedMessages := len(env.chats) * messageCount
require.Eventually(t, func() bool {
for _, chat := range env.chats {
if len(chat.messageHistory) != totalExpectedMessages {
return false
}
}
return true
}, 2*time.Minute, 1*time.Second, "Messages did not propagate to all nodes")
for i, chat := range env.chats {
assert.Len(t, chat.messageHistory, totalExpectedMessages, "Node %d should have received all messages", i)
}
t.Log("TestConcurrentMessageSending completed successfully")
}
func TestLargeGroupScaling(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
t.Log("Starting TestLargeGroupScaling")
nodeCount := 20
env, err := setupTestEnvironment(ctx, t, nodeCount)
require.NoError(t, err, "Failed to set up test environment")
require.Eventually(t, func() bool {
return areNodesConnected(env.nodes, 2)
}, 2*time.Minute, 3*time.Second, "Nodes failed to connect")
// Send a message from the first node
env.chats[0].SendMessage("Broadcast message to large group")
// Allow time for propagation
time.Sleep(time.Duration(nodeCount*100) * time.Millisecond)
// Verify all nodes received the message
for i, chat := range env.chats {
assert.Len(t, chat.messageHistory, 1, "Node %d should have received the broadcast message", i)
assert.Equal(t, "Broadcast message to large group", chat.messageHistory[0].Content)
}
t.Log("TestLargeGroupScaling completed successfully")
}
func TestEagerPushMechanism(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
nodeCount := 2
env, err := setupTestEnvironment(ctx, t, nodeCount)
require.NoError(t, err, "Failed to set up test environment")
nc := NewNetworkController(ctx, env.nodes, env.chats)
// Disconnect node 1
nc.DisconnectNode(env.nodes[1])
// Send a message from node 0
env.chats[0].SendMessage("Test eager push")
// Wait for the message to be added to the outgoing buffer
time.Sleep(1 * time.Second)
// Reconnect node 1
nc.ReconnectNode(env.nodes[1])
// Wait for eager push to resend the message
time.Sleep(5 * time.Second)
// Check if node 1 received the message
assert.Eventually(t, func() bool {
return len(env.chats[1].messageHistory) == 1
}, 10*time.Second, 1*time.Second, "Node 1 should have received the message via eager push")
}
func TestBloomFilterWindow(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
nodeCount := 2
env, err := setupTestEnvironment(ctx, t, nodeCount)
require.NoError(t, err, "Failed to set up test environment")
// Reduce bloom filter window for testing
for _, chat := range env.chats {
chat.bloomFilter.window = 2 * time.Second
}
// Send a message
env.chats[0].SendMessage("Test bloom filter window")
messageID := env.chats[0].messageHistory[0].MessageId
// Check if the message is in the bloom filter
assert.Eventually(t, func() bool {
return env.chats[1].bloomFilter.Test(messageID)
}, 30*time.Second, 1*time.Second, "Message should be in the bloom filter")
// Wait for the bloom filter window to pass
time.Sleep(3 * time.Second)
// Clean the bloom filter
env.chats[1].bloomFilter.Clean()
time.Sleep(3 * time.Second)
// Check if the message is no longer in the bloom filter
assert.False(t, env.chats[1].bloomFilter.Test(messageID), "Message should no longer be in the bloom filter")
// Send another message to ensure the filter still works for new messages
env.chats[0].SendMessage("New test message")
time.Sleep(1 * time.Second)
newMessageID := env.chats[0].messageHistory[1].MessageId
// Check if the new message is in the bloom filter
assert.Eventually(t, func() bool {
return env.chats[1].bloomFilter.Test(newMessageID)
}, 30*time.Second, 1*time.Second, "New message should be in the bloom filter")
}
func TestConflictResolution(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
nodeCount := 3
env, err := setupTestEnvironment(ctx, t, nodeCount)
require.NoError(t, err, "Failed to set up test environment")
// Create conflicting messages with the same Lamport timestamp
conflictingMsg1 := &pb.Message{
SenderId: "Node0",
MessageId: "msg1",
LamportTimestamp: 1,
Content: "Conflict 1",
}
conflictingMsg2 := &pb.Message{
SenderId: "Node1",
MessageId: "msg2",
LamportTimestamp: 1,
Content: "Conflict 2",
}
// Process the conflicting messages in different orders on different nodes
env.chats[0].processReceivedMessage(conflictingMsg1)
env.chats[0].processReceivedMessage(conflictingMsg2)
env.chats[1].processReceivedMessage(conflictingMsg2)
env.chats[1].processReceivedMessage(conflictingMsg1)
// Check if the messages are ordered consistently across nodes
assert.Equal(t, env.chats[0].messageHistory[0].MessageId, env.chats[1].messageHistory[0].MessageId, "Conflicting messages should be ordered consistently")
assert.Equal(t, env.chats[0].messageHistory[1].MessageId, env.chats[1].messageHistory[1].MessageId, "Conflicting messages should be ordered consistently")
}
func TestNewNodeSyncAndMessagePropagation(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
t.Log("Starting TestNewNodeSyncAndMessagePropagation")
// Set up initial network with 2 nodes
initialNodeCount := 2
env, err := setupTestEnvironment(ctx, t, initialNodeCount)
require.NoError(t, err, "Failed to set up initial test environment")
// Ensure initial nodes are connected
require.Eventually(t, func() bool {
return areNodesConnected(env.nodes, 1)
}, 60*time.Second, 1*time.Second, "Initial nodes failed to connect")
t.Log("Sending initial messages")
env.chats[0].SendMessage("Initial message 1")
env.chats[1].SendMessage("Initial message 2")
// Wait for message propagation
time.Sleep(5 * time.Second)
// Verify initial messages are received by both nodes
for i, chat := range env.chats {
assert.Len(t, chat.messageHistory, 2, "Node %d should have 2 initial messages", i)
}
t.Log("Adding new node to the network")
newNode, err := setupTestNode(ctx, t)
require.NoError(t, err, "Failed to set up new node")
newChat, err := setupTestChat(ctx, newNode, "NewNode")
require.NoError(t, err, "Failed to set up new chat")
env.nodes = append(env.nodes, newNode)
env.chats = append(env.chats, newChat)
// Connect new node to the network
_, err = env.nodes[2].AddPeer(env.nodes[0].ListenAddresses()[0], peerstore.Static, env.chats[2].options.Relay.Topics.Value())
require.NoError(t, err, "Failed to connect new node to the network")
t.Log("Waiting for new node to sync")
require.Eventually(t, func() bool {
msgCount := len(env.chats[2].messageHistory)
return msgCount == 2
}, 1*time.Minute, 5*time.Second, "New node failed to sync message history")
t.Log("Sending message from old node")
env.chats[0].SendMessage("Message from old node")
// Wait for message propagation
time.Sleep(10 * time.Second)
// Verify the message is received by all nodes
for i, chat := range env.chats {
assert.Len(t, chat.messageHistory, 3, "Node %d should have 3 messages", i)
}
t.Log("Sending message from new node")
env.chats[2].SendMessage("Message from new node")
// Wait for message propagation
time.Sleep(10 * time.Second)
// Verify the message from new node is received by all nodes
for i, chat := range env.chats {
assert.Len(t, chat.messageHistory, 4, "Node %d should have 4 messages", i)
}
for i := 0; i < 3; i++ {
lastMsg := env.chats[i].messageHistory[len(env.chats[i].messageHistory)-1]
assert.Equal(t, "Message from new node", lastMsg.Content, "The last message is incorrect for node %d", i)
}
t.Log("TestNewNodeSyncAndMessagePropagation completed")
}