waku-org/go-waku

View on GitHub
waku/v2/protocol/store/client_test.go

Summary

Maintainability
B
4 hrs
Test Coverage
//go:build include_storev3_tests
// +build include_storev3_tests

package store

import (
    "context"
    "crypto/rand"
    "os"
    "testing"
    "time"

    "github.com/ethereum/go-ethereum/crypto"
    "github.com/ethereum/go-ethereum/p2p/enode"
    "github.com/libp2p/go-libp2p/core/peer"
    "github.com/multiformats/go-multiaddr"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/stretchr/testify/require"
    "github.com/waku-org/go-waku/tests"
    "github.com/waku-org/go-waku/waku/v2/peermanager"
    "github.com/waku-org/go-waku/waku/v2/protocol"
    "github.com/waku-org/go-waku/waku/v2/protocol/enr"
    "github.com/waku-org/go-waku/waku/v2/protocol/metadata"
    "github.com/waku-org/go-waku/waku/v2/protocol/pb"
    "github.com/waku-org/go-waku/waku/v2/protocol/relay"
    "github.com/waku-org/go-waku/waku/v2/timesource"
    "github.com/waku-org/go-waku/waku/v2/utils"
    "google.golang.org/protobuf/proto"
)

// TestStoreClient is an integration test for the store (v3) client. It builds
// a local host with metadata, relay, peer manager and store client, connects
// to an external store node, publishes a handful of messages through relay and
// then verifies existence checks, forward/backward pagination, cursor
// behaviour, time-window validation and hash-only (no data) responses.
//
// The store node address is read from the TEST_STOREV3_NODE environment
// variable; when it is unset a default localhost multiaddr is used. The test
// therefore needs a reachable store node and is only compiled with the
// include_storev3_tests build tag.
func TestStoreClient(t *testing.T) {
    const (
        contentTopic = "test"
        numMessages  = 5
    )

    ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
    defer cancel()

    port, err := tests.FindFreePort(t, "", 5)
    require.NoError(t, err)

    host, err := tests.MakeHost(context.Background(), port, rand.Reader)
    require.NoError(t, err)

    // An empty path is passed to OpenDB; the ENR database is not given a
    // location on disk.
    db, err := enode.OpenDB("")
    require.NoError(t, err)
    priv, err := crypto.GenerateKey()
    require.NoError(t, err)
    localnode := enode.NewLocalNode(db, priv)

    pubsubTopic := "/waku/2/rs/99/1"
    clusterID := uint16(99)
    rs, err := protocol.NewRelayShards(clusterID, 1)
    require.NoError(t, err)
    err = enr.Update(utils.Logger(), localnode, enr.WithWakuRelaySharding(rs))
    require.NoError(t, err)

    // Named wakuMetadata so the imported metadata package is not shadowed.
    wakuMetadata := metadata.NewWakuMetadata(clusterID, localnode, utils.Logger())
    wakuMetadata.SetHost(host)
    err = wakuMetadata.Start(context.Background())
    require.NoError(t, err)

    // Creating a relay instance for pushing messages to the store node
    b := relay.NewBroadcaster(10)
    require.NoError(t, b.Start(context.Background()))
    wakuRelay := relay.NewWakuRelay(b, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
    wakuRelay.SetHost(host)
    err = wakuRelay.Start(context.Background())
    require.NoError(t, err)

    pm := peermanager.NewPeerManager(5, 5, nil, wakuRelay, true, utils.Logger())
    pm.SetHost(host)
    err = pm.SubscribeToRelayEvtBus(wakuRelay.Events())
    require.NoError(t, err)
    pm.Start(ctx)

    // Creating a storeV3 instance for all queries
    wakuStore := NewWakuStore(pm, timesource.NewDefaultClock(), utils.Logger(), 8)
    wakuStore.SetHost(host)

    _, err = wakuRelay.Subscribe(context.Background(), protocol.NewContentFilter(pubsubTopic), relay.WithoutConsumer())
    require.NoError(t, err)

    // Obtain multiaddr from env
    envStorenode := os.Getenv("TEST_STOREV3_NODE")
    if envStorenode == "" {
        envStorenode = "/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2HAmMGhfSTUzKbsjMWxc6T1X4wiTWSF1bEWSLjAukCm7KiHV"
    }
    storenodeAddr, err := multiaddr.NewMultiaddr(envStorenode)
    require.NoError(t, err)

    storenode, err := peer.AddrInfoFromP2pAddr(storenodeAddr)
    require.NoError(t, err)

    err = host.Connect(ctx, *storenode)
    require.NoError(t, err)

    // Wait until mesh forms
    time.Sleep(2 * time.Second)

    // Send messages. Each message is published with its own timestamp and a
    // short pause in between; startTime/endTime bracket the whole batch and
    // are used as the query time window below.
    messages := make([]*pb.WakuMessage, 0, numMessages)
    startTime := utils.GetUnixEpoch(timesource.NewDefaultClock())
    for i := 0; i < numMessages; i++ {
        msg := &pb.WakuMessage{
            Payload:      []byte{byte(i), 1, 2, 3, 4, 5},
            ContentTopic: contentTopic,
            Version:      proto.Uint32(0),
            Timestamp:    utils.GetUnixEpoch(timesource.NewDefaultClock()),
        }
        _, err = wakuRelay.Publish(ctx, msg, relay.WithPubSubTopic(pubsubTopic))
        require.NoError(t, err)

        messages = append(messages, msg)
        time.Sleep(20 * time.Millisecond)
    }
    endTime := utils.GetUnixEpoch(timesource.NewDefaultClock())

    // Pause before querying so the published messages can reach the store node.
    time.Sleep(1 * time.Second)

    // Check for message existence
    exists, err := wakuStore.Exists(ctx, messages[0].Hash(pubsubTopic), WithPeer(storenode.ID))
    require.NoError(t, err)
    require.True(t, exists)

    // Message should not exist
    exists, err = wakuStore.Exists(ctx, pb.MessageHash{1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2}, WithPeer(storenode.ID))
    require.NoError(t, err)
    require.False(t, exists)

    // Query messages with forward pagination
    response, err := wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopic), TimeStart: startTime, TimeEnd: endTime}, WithPaging(true, 2))
    require.NoError(t, err)

    // -- First page:
    require.False(t, response.IsComplete())
    require.Len(t, response.Messages(), 2)
    require.Equal(t, messages[0].GetTimestamp(), response.Messages()[0].Message.GetTimestamp())
    require.Equal(t, messages[1].GetTimestamp(), response.Messages()[1].Message.GetTimestamp())

    err = response.Next(ctx)
    require.NoError(t, err)

    // -- Second page:
    require.False(t, response.IsComplete())
    require.Len(t, response.Messages(), 2)
    require.Equal(t, messages[2].GetTimestamp(), response.Messages()[0].Message.GetTimestamp())
    require.Equal(t, messages[3].GetTimestamp(), response.Messages()[1].Message.GetTimestamp())

    err = response.Next(ctx)
    require.NoError(t, err)

    // -- Third page:
    require.False(t, response.IsComplete())
    require.Len(t, response.Messages(), 1)
    require.Equal(t, messages[4].GetTimestamp(), response.Messages()[0].Message.GetTimestamp())

    err = response.Next(ctx)
    require.NoError(t, err)

    // -- Trying to continue a completed cursor
    require.True(t, response.IsComplete())
    require.Len(t, response.Messages(), 0)

    err = response.Next(ctx)
    require.NoError(t, err)

    // Query messages with backward pagination
    response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopic), TimeStart: startTime, TimeEnd: endTime}, WithPaging(false, 2))
    require.NoError(t, err)

    // -- First page:
    require.False(t, response.IsComplete())
    require.Len(t, response.Messages(), 2)
    require.Equal(t, messages[3].GetTimestamp(), response.Messages()[0].Message.GetTimestamp())
    require.Equal(t, messages[4].GetTimestamp(), response.Messages()[1].Message.GetTimestamp())

    err = response.Next(ctx)
    require.NoError(t, err)

    // -- Second page:
    require.False(t, response.IsComplete())
    require.Len(t, response.Messages(), 2)
    require.Equal(t, messages[1].GetTimestamp(), response.Messages()[0].Message.GetTimestamp())
    require.Equal(t, messages[2].GetTimestamp(), response.Messages()[1].Message.GetTimestamp())

    err = response.Next(ctx)
    require.NoError(t, err)

    // -- Third page:
    require.False(t, response.IsComplete())
    require.Len(t, response.Messages(), 1)
    require.Equal(t, messages[0].GetTimestamp(), response.Messages()[0].Message.GetTimestamp())

    err = response.Next(ctx)
    require.NoError(t, err)

    // -- Trying to continue a completed cursor
    err = response.Next(ctx)
    require.NoError(t, err)
    require.True(t, response.IsComplete())

    // No cursor should be returned if there are no messages that match the criteria
    response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, "no-messages"), TimeStart: startTime, TimeEnd: endTime}, WithPaging(true, 2))
    require.NoError(t, err)
    require.Len(t, response.Messages(), 0)
    require.Empty(t, response.Cursor())

    // If the page size is larger than the number of existing messages, it should not return a cursor
    response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopic), TimeStart: startTime, TimeEnd: endTime}, WithPaging(true, 100))
    require.NoError(t, err)
    require.Len(t, response.Messages(), numMessages)
    require.Empty(t, response.Cursor())

    // Invalid cursors should fail
    // TODO: nwaku does not support this feature yet
    //_, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, "test"), TimeStart: startTime, TimeEnd: endTime}, WithCursor([]byte{1, 2, 3, 4, 5, 6}))
    //require.Error(t, err)

    // Nonexistent cursors should return an empty response
    // TODO: nwaku does not support this feature yet
    //response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, "test"), TimeStart: startTime, TimeEnd: endTime}, WithCursor(make([]byte, 32))) // Requesting cursor 0x00...00
    //require.NoError(t, err)
    //require.Len(t, response.Messages(), 0)
    //require.Empty(t, response.Cursor())

    // Handle temporal history query with an invalid time window
    // (start and end are deliberately swapped, so the query must fail)
    _, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopic), TimeStart: endTime, TimeEnd: startTime})
    require.Error(t, err)

    // Handle temporal history query with a zero-size time window
    response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopic), TimeStart: startTime, TimeEnd: startTime})
    require.NoError(t, err)
    require.Len(t, response.Messages(), 0)
    require.Empty(t, response.Cursor())

    // Should not include data
    response, err = wakuStore.Request(ctx, MessageHashCriteria{MessageHashes: []pb.MessageHash{messages[0].Hash(pubsubTopic)}}, IncludeData(false), WithPeer(storenode.ID))
    require.NoError(t, err)
    require.Len(t, response.Messages(), 1)
    require.Nil(t, response.Messages()[0].Message)

    // No time window here, so the store node may return more than the
    // messages published by this run; only require at least one entry.
    response, err = wakuStore.Request(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopic)}, IncludeData(false))
    require.NoError(t, err)
    require.GreaterOrEqual(t, len(response.Messages()), 1)
    require.Nil(t, response.Messages()[0].Message)
}