waku-org/go-waku

View on GitHub
waku/v2/node/wakunode2_test.go

Summary

Maintainability
C
1 day
Test Coverage
package node

import (
    "bytes"
    "context"
    "fmt"
    "math/big"
    "math/rand"
    "net"
    "os"
    "sync"
    "testing"
    "time"

    wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
    "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange"

    "github.com/ethereum/go-ethereum/crypto"
    "github.com/ethereum/go-ethereum/p2p/enode"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/stretchr/testify/require"
    "github.com/waku-org/go-waku/tests"
    "github.com/waku-org/go-waku/waku/persistence"
    "github.com/waku-org/go-waku/waku/persistence/sqlite"
    "github.com/waku-org/go-waku/waku/v2/dnsdisc"
    "github.com/waku-org/go-waku/waku/v2/peerstore"
    "github.com/waku-org/go-waku/waku/v2/protocol"
    "github.com/waku-org/go-waku/waku/v2/protocol/filter"
    "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
    "github.com/waku-org/go-waku/waku/v2/protocol/pb"
    "github.com/waku-org/go-waku/waku/v2/protocol/relay"

    "github.com/waku-org/go-waku/waku/v2/utils"
    "go.uber.org/zap"
    "google.golang.org/protobuf/proto"
)

func createTestMsg(version uint32) *pb.WakuMessage {
    message := new(pb.WakuMessage)
    message.Payload = []byte{0, 1, 2}
    message.Version = proto.Uint32(version)
    message.Timestamp = proto.Int64(123456)
    message.ContentTopic = "abc"
    return message
}

func TestWakuNode2(t *testing.T) {
    hostAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0")

    key, err := tests.RandomHex(32)
    require.NoError(t, err)
    prvKey, err := crypto.HexToECDSA(key)
    require.NoError(t, err)

    ctx := context.Background()

    wakuNode, err := New(
        WithPrivateKey(prvKey),
        WithHostAddress(hostAddr),
        WithWakuRelay(),
    )
    require.NoError(t, err)

    err = wakuNode.Start(ctx)
    require.NoError(t, err)

    _, err = wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter("waku/rs/1/1"))
    require.NoError(t, err)
    time.Sleep(time.Second * 1)

    err = wakuNode.Relay().Unsubscribe(ctx, protocol.NewContentFilter("waku/rs/1/1"))
    require.NoError(t, err)

    defer wakuNode.Stop()
}

func int2Bytes(i int) []byte {
    if i > 0 {
        return append(big.NewInt(int64(i)).Bytes(), byte(1))
    }
    return append(big.NewInt(int64(i)).Bytes(), byte(0))
}

func TestUpAndDown(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
    defer cancel()

    hostAddr1, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
    key1, _ := tests.RandomHex(32)
    prvKey1, _ := crypto.HexToECDSA(key1)

    nodes, err := dnsdisc.RetrieveNodes(context.Background(), "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im")
    require.NoError(t, err)

    var bootnodes []*enode.Node
    for _, n := range nodes {
        if n.ENR != nil {
            bootnodes = append(bootnodes, n.ENR)
        }
    }

    wakuNode1, err := New(
        WithPrivateKey(prvKey1),
        WithHostAddress(hostAddr1),
        WithWakuRelay(),
        WithDiscoveryV5(0, bootnodes, true),
    )

    require.NoError(t, err)

    for i := 0; i < 5; i++ {
        utils.Logger().Info("Starting...", zap.Int("iteration", i))
        err = wakuNode1.Start(ctx)
        require.NoError(t, err)
        err = wakuNode1.DiscV5().Start(ctx)
        require.NoError(t, err)
        utils.Logger().Info("Started!", zap.Int("iteration", i))
        time.Sleep(3 * time.Second)
        utils.Logger().Info("Stopping...", zap.Int("iteration", i))
        wakuNode1.Stop()
        utils.Logger().Info("Stopped!", zap.Int("iteration", i))
    }
}

func Test500(t *testing.T) {
    maxMsgs := 500
    maxMsgBytes := int2Bytes(maxMsgs)

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    hostAddr1, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
    key1, _ := tests.RandomHex(32)
    prvKey1, _ := crypto.HexToECDSA(key1)

    hostAddr2, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
    key2, _ := tests.RandomHex(32)
    prvKey2, _ := crypto.HexToECDSA(key2)

    wakuNode1, err := New(
        WithPrivateKey(prvKey1),
        WithHostAddress(hostAddr1),
        WithWakuRelay(),
    )
    require.NoError(t, err)
    err = wakuNode1.Start(ctx)
    require.NoError(t, err)
    defer wakuNode1.Stop()

    wakuNode2, err := New(
        WithPrivateKey(prvKey2),
        WithHostAddress(hostAddr2),
        WithWakuRelay(),
    )
    require.NoError(t, err)
    err = wakuNode2.Start(ctx)
    require.NoError(t, err)
    defer wakuNode2.Stop()

    err = wakuNode2.DialPeer(ctx, wakuNode1.ListenAddresses()[0].String())
    require.NoError(t, err)

    time.Sleep(2 * time.Second)

    sub1, err := wakuNode1.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
    require.NoError(t, err)
    sub2, err := wakuNode2.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
    require.NoError(t, err)

    wg := sync.WaitGroup{}
    wg.Add(3)
    go func() {
        defer wg.Done()

        ticker := time.NewTimer(30 * time.Second)
        defer ticker.Stop()

        for {
            select {
            case <-ticker.C:
                require.Fail(t, "Timeout Sub1")
            case msg := <-sub1[0].Ch:
                if msg == nil {
                    return
                }
                if bytes.Equal(msg.Message().Payload, maxMsgBytes) {
                    return
                }
            }
        }
    }()

    go func() {
        defer wg.Done()

        ticker := time.NewTimer(30 * time.Second)
        defer ticker.Stop()

        for {
            select {
            case <-ticker.C:
                require.Fail(t, "Timeout Sub2")
            case msg := <-sub2[0].Ch:
                if msg == nil {
                    return
                }
                if bytes.Equal(msg.Message().Payload, maxMsgBytes) {
                    return
                }
            }
        }
    }()

    go func() {
        defer wg.Done()
        for i := 1; i <= maxMsgs; i++ {
            msg := createTestMsg(0)
            msg.Payload = int2Bytes(i)
            msg.Timestamp = proto.Int64(int64(i))
            if _, err := wakuNode2.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic()); err != nil {
                require.Fail(t, "Could not publish all messages")
            }
            time.Sleep(5 * time.Millisecond)
        }
    }()

    wg.Wait()

}

func TestDecoupledStoreFromRelay(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // NODE1: Relay Node + Filter Server
    hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
    require.NoError(t, err)
    wakuNode1, err := New(
        WithHostAddress(hostAddr1),
        WithWakuRelay(),
        WithWakuFilterFullNode(),
    )
    require.NoError(t, err)
    err = wakuNode1.Start(ctx)
    require.NoError(t, err)
    defer wakuNode1.Stop()

    subs, err := wakuNode1.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
    require.NoError(t, err)
    defer subs[0].Unsubscribe()

    // NODE2: Filter Client/Store
    db, err := sqlite.NewDB(":memory:", utils.Logger())
    require.NoError(t, err)
    dbStore, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(sqlite.Migrations))
    require.NoError(t, err)

    hostAddr2, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
    require.NoError(t, err)
    wakuNode2, err := New(
        WithHostAddress(hostAddr2),
        WithWakuFilterLightNode(),
        WithWakuStore(),
        WithMessageProvider(dbStore),
    )
    require.NoError(t, err)
    err = wakuNode2.Start(ctx)
    require.NoError(t, err)
    defer wakuNode2.Stop()

    peerID, err := wakuNode2.AddPeer(wakuNode1.ListenAddresses()[0], peerstore.Static, []string{relay.DefaultWakuTopic}, filter.FilterSubscribeID_v20beta1)
    require.NoError(t, err)

    subscription, err := wakuNode2.FilterLightnode().Subscribe(ctx, protocol.ContentFilter{
        PubsubTopic:   relay.DefaultWakuTopic,
        ContentTopics: protocol.NewContentTopicSet("abc"),
    }, filter.WithPeer(peerID))
    require.NoError(t, err)

    // Sleep to make sure the filter is subscribed
    time.Sleep(1 * time.Second)

    // Send MSG1 on NODE1
    msg := createTestMsg(0)
    msg.Payload = []byte{1, 2, 3, 4, 5}
    msg.Timestamp = utils.GetUnixEpoch()

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        // MSG1 should be pushed in NODE2 via filter
        defer wg.Done()
        env, ok := <-subscription[0].C
        if !ok {
            require.Fail(t, "no message")
        }
        require.Equal(t, msg.Timestamp, env.Message().Timestamp)
    }()

    time.Sleep(500 * time.Millisecond)

    if _, err := wakuNode1.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic()); err != nil {
        require.Fail(t, "Could not publish all messages")
    }

    wg.Wait()

    // NODE3: Query from NODE2
    hostAddr3, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
    require.NoError(t, err)
    wakuNode3, err := New(
        WithHostAddress(hostAddr3),
        WithWakuFilterLightNode(),
    )
    require.NoError(t, err)
    err = wakuNode3.Start(ctx)
    require.NoError(t, err)
    defer wakuNode3.Stop()

    _, err = wakuNode3.AddPeer(wakuNode2.ListenAddresses()[0], peerstore.Static, []string{relay.DefaultWakuTopic}, legacy_store.StoreID_v20beta4)
    require.NoError(t, err)
    time.Sleep(2 * time.Second)
    // NODE2 should have returned the message received via filter
    result, err := wakuNode3.LegacyStore().Query(ctx, legacy_store.Query{})
    require.NoError(t, err)
    require.Len(t, result.Messages, 1)
    require.Equal(t, msg.Timestamp, result.Messages[0].Timestamp)
}

func TestStaticShardingMultipleTopics(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    testClusterID := uint16(20)

    // Node1 with Relay
    hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
    require.NoError(t, err)
    wakuNode1, err := New(
        WithHostAddress(hostAddr1),
        WithWakuRelay(),
        WithClusterID(testClusterID),
    )
    require.NoError(t, err)
    err = wakuNode1.Start(ctx)
    require.NoError(t, err)
    defer wakuNode1.Stop()

    pubSubTopic1 := protocol.NewStaticShardingPubsubTopic(testClusterID, uint16(0))
    pubSubTopic1Str := pubSubTopic1.String()
    contentTopic1 := "/test/2/my-app/sharded"

    pubSubTopic2 := protocol.NewStaticShardingPubsubTopic(testClusterID, uint16(10))
    pubSubTopic2Str := pubSubTopic2.String()
    contentTopic2 := "/test/3/my-app/sharded"

    require.Equal(t, testClusterID, wakuNode1.ClusterID())

    r := wakuNode1.Relay()

    subs1, err := r.Subscribe(ctx, protocol.NewContentFilter(pubSubTopic1Str, contentTopic1))
    require.NoError(t, err)

    subs2, err := r.Subscribe(ctx, protocol.NewContentFilter(pubSubTopic2Str, contentTopic2))
    require.NoError(t, err)

    require.NotEqual(t, subs1[0].ID, subs2[0].ID)

    require.True(t, r.IsSubscribed(pubSubTopic1Str))
    require.True(t, r.IsSubscribed(pubSubTopic2Str))

    s1, err := r.GetSubscriptionWithPubsubTopic(pubSubTopic1Str, contentTopic1)
    require.NoError(t, err)
    s2, err := r.GetSubscriptionWithPubsubTopic(pubSubTopic2Str, contentTopic2)
    require.NoError(t, err)
    require.Equal(t, s1.ID, subs1[0].ID)
    require.Equal(t, s2.ID, subs2[0].ID)

    // Wait for subscriptions
    time.Sleep(1 * time.Second)

    // Send message to subscribed topic
    msg := tests.CreateWakuMessage(contentTopic1, utils.GetUnixEpoch(), "test message")

    _, err = r.Publish(ctx, msg, relay.WithPubSubTopic(pubSubTopic1Str))
    require.NoError(t, err)

    time.Sleep(100 * time.Millisecond)

    var wg sync.WaitGroup
    wg.Add(1)
    // Message msg could be retrieved
    go func() {
        defer wg.Done()
        env, ok := <-subs1[0].Ch
        require.True(t, ok, "no message retrieved")
        require.Equal(t, msg.Timestamp, env.Message().Timestamp)
    }()

    wg.Wait()

    // Send another message to non-subscribed pubsub topic, but subscribed content topic
    msg2 := tests.CreateWakuMessage(contentTopic1, utils.GetUnixEpoch(), "test message 2")
    pubSubTopic3 := protocol.NewStaticShardingPubsubTopic(testClusterID, uint16(321))
    pubSubTopic3Str := pubSubTopic3.String()
    _, err = r.Publish(ctx, msg2, relay.WithPubSubTopic(pubSubTopic3Str))
    require.Error(t, err)

    time.Sleep(100 * time.Millisecond)

    // No message could be retrieved
    tests.WaitForTimeout(t, ctx, 1*time.Second, &wg, subs1[0].Ch)

    // Send another message to subscribed pubsub topic, but not subscribed content topic - mix it up
    msg3 := tests.CreateWakuMessage(contentTopic2, utils.GetUnixEpoch(), "test message 3")

    _, err = r.Publish(ctx, msg3, relay.WithPubSubTopic(pubSubTopic1Str))
    require.NoError(t, err)

    time.Sleep(100 * time.Millisecond)

    // No message could be retrieved
    tests.WaitForTimeout(t, ctx, 1*time.Second, &wg, subs1[0].Ch)

}

func TestStaticShardingLimits(t *testing.T) {

    log := utils.Logger()

    if os.Getenv("RUN_FLAKY_TESTS") != "true" {

        log.Info("Skipping", zap.String("test", t.Name()),
            zap.String("reason", "RUN_FLAKY_TESTS environment variable is not set to true"))
        t.SkipNow()
    }

    ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
    defer cancel()

    testClusterID := uint16(21)

    // Node1 with Relay
    hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
    require.NoError(t, err)
    discv5UDPPort1, err := tests.FindFreeUDPPort(t, "0.0.0.0", 3)
    require.NoError(t, err)
    wakuNode1, err := New(
        WithHostAddress(hostAddr1),
        WithWakuRelay(),
        WithClusterID(testClusterID),
        WithDiscoveryV5(uint(discv5UDPPort1), nil, true),
    )
    require.NoError(t, err)
    err = wakuNode1.Start(ctx)
    require.NoError(t, err)
    defer wakuNode1.Stop()

    // Node2 with Relay
    hostAddr2, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
    require.NoError(t, err)
    discv5UDPPort2, err := tests.FindFreeUDPPort(t, "0.0.0.0", 3)
    require.NoError(t, err)
    wakuNode2, err := New(
        WithHostAddress(hostAddr2),
        WithWakuRelay(),
        WithClusterID(testClusterID),
        WithDiscoveryV5(uint(discv5UDPPort2), []*enode.Node{wakuNode1.localNode.Node()}, true),
    )
    require.NoError(t, err)
    err = wakuNode2.Start(ctx)
    require.NoError(t, err)
    defer wakuNode2.Stop()

    err = wakuNode1.DiscV5().Start(ctx)
    require.NoError(t, err)
    err = wakuNode2.DiscV5().Start(ctx)
    require.NoError(t, err)

    // Wait for discovery
    time.Sleep(3 * time.Second)

    contentTopic1 := "/test/2/my-app/sharded"

    r1 := wakuNode1.Relay()
    r2 := wakuNode2.Relay()

    var shardedPubSubTopics []string

    // Subscribe topics related to static sharding
    for i := 0; i < 1024; i++ {
        shardedPubSubTopics = append(shardedPubSubTopics, fmt.Sprintf("/waku/2/rs/%d/%d", testClusterID, i))
        _, err = r1.Subscribe(ctx, protocol.NewContentFilter(shardedPubSubTopics[i], contentTopic1))
        require.NoError(t, err)
        time.Sleep(10 * time.Millisecond)
    }

    // Let ENR updates to finish
    time.Sleep(3 * time.Second)

    // Subscribe topics related to static sharding
    for i := 0; i < 1024; i++ {
        _, err = r2.Subscribe(ctx, protocol.NewContentFilter(shardedPubSubTopics[i], contentTopic1))
        require.NoError(t, err)
        time.Sleep(10 * time.Millisecond)
    }

    // Let ENR updates to finish
    time.Sleep(3 * time.Second)

    // Check ENR value after 1024 subscriptions
    shardsENR, err := wenr.RelaySharding(wakuNode1.ENR().Record())
    require.NoError(t, err)
    require.Equal(t, testClusterID, shardsENR.ClusterID)
    require.Equal(t, 1, len(shardsENR.ShardIDs))

    // Prepare message
    msg1 := tests.CreateWakuMessage(contentTopic1, utils.GetUnixEpoch(), "test message")

    // Select shard to publish
    randomShard := rand.Intn(1024)

    // Check both nodes are subscribed
    require.True(t, r1.IsSubscribed(shardedPubSubTopics[randomShard]))
    require.True(t, r2.IsSubscribed(shardedPubSubTopics[randomShard]))

    time.Sleep(1 * time.Second)

    // Publish on node1
    _, err = r1.Publish(ctx, msg1, relay.WithPubSubTopic(shardedPubSubTopics[randomShard]))
    require.NoError(t, err)

    time.Sleep(1 * time.Second)

    s2, err := r2.GetSubscriptionWithPubsubTopic(shardedPubSubTopics[randomShard], contentTopic1)
    require.NoError(t, err)

    var wg sync.WaitGroup

    // Retrieve on node2
    tests.WaitForMsg(t, 2*time.Second, &wg, s2.Ch)

}

func TestPeerExchangeRatelimit(t *testing.T) {
    log := utils.Logger()

    if os.Getenv("RUN_FLAKY_TESTS") != "true" {

        log.Info("Skipping", zap.String("test", t.Name()),
            zap.String("reason", "RUN_FLAKY_TESTS environment variable is not set to true"))
        t.SkipNow()
    }

    ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
    defer cancel()

    testClusterID := uint16(21)

    // Node1 with Relay
    hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
    require.NoError(t, err)
    wakuNode1, err := New(
        WithHostAddress(hostAddr1),
        WithWakuRelay(),
        WithClusterID(testClusterID),
        WithPeerExchange(peer_exchange.WithRateLimiter(1, 1)),
    )
    require.NoError(t, err)
    err = wakuNode1.Start(ctx)
    require.NoError(t, err)
    defer wakuNode1.Stop()

    // Node2 with Relay
    hostAddr2, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
    require.NoError(t, err)
    wakuNode2, err := New(
        WithHostAddress(hostAddr2),
        WithWakuRelay(),
        WithClusterID(testClusterID),
        WithPeerExchange(peer_exchange.WithRateLimiter(1, 1)),
    )
    require.NoError(t, err)
    err = wakuNode2.Start(ctx)
    require.NoError(t, err)
    defer wakuNode2.Stop()

    err = wakuNode2.DialPeer(ctx, wakuNode1.ListenAddresses()[0].String())
    require.NoError(t, err)

    //time.Sleep(1 * time.Second)

    err = wakuNode1.PeerExchange().Request(ctx, 1)
    require.NoError(t, err)

    err = wakuNode1.PeerExchange().Request(ctx, 1)
    require.Error(t, err)

    time.Sleep(1 * time.Second)
    err = wakuNode1.PeerExchange().Request(ctx, 1)
    require.NoError(t, err)
}