status-im/status-go

View on GitHub
protocol/messenger_storenode_request_test.go

Summary

Maintainability
C
1 day
Test Coverage
package protocol

import (
    "context"
    "fmt"
    "sync"
    "testing"
    "time"

    "github.com/stretchr/testify/suite"
    "go.uber.org/zap"
    "google.golang.org/protobuf/proto"

    "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"

    "github.com/ethereum/go-ethereum/common/hexutil"
    "github.com/ethereum/go-ethereum/crypto"

    "github.com/status-im/status-go/appdatabase"
    gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
    "github.com/status-im/status-go/eth-node/types"
    "github.com/status-im/status-go/multiaccounts/accounts"
    "github.com/status-im/status-go/params"
    "github.com/status-im/status-go/protocol/common"
    "github.com/status-im/status-go/protocol/common/shard"
    "github.com/status-im/status-go/protocol/communities"
    "github.com/status-im/status-go/protocol/communities/token"
    "github.com/status-im/status-go/protocol/protobuf"
    "github.com/status-im/status-go/protocol/requests"
    "github.com/status-im/status-go/protocol/transport"
    "github.com/status-im/status-go/protocol/tt"
    mailserversDB "github.com/status-im/status-go/services/mailservers"
    "github.com/status-im/status-go/services/wallet/bigint"
    "github.com/status-im/status-go/t/helpers"
    waku2 "github.com/status-im/status-go/wakuv2"
    wakuV2common "github.com/status-im/status-go/wakuv2/common"
)

const (
    localFleet              = "local-test-fleet-1"
    localMailserverID       = "local-test-mailserver"
    storeNodeConnectTimeout = 500 * time.Millisecond
    runLocalTests           = false
)

func TestMessengerStoreNodeRequestSuite(t *testing.T) {
    suite.Run(t, new(MessengerStoreNodeRequestSuite))
}

type MessengerStoreNodeRequestSuite struct {
    suite.Suite

    cancel chan struct{}

    owner *Messenger
    bob   *Messenger

    wakuStoreNode    *waku2.Waku
    storeNodeAddress string

    ownerWaku types.Waku
    bobWaku   types.Waku

    collectiblesServiceMock *CollectiblesServiceMock

    logger *zap.Logger
}

type singleResult struct {
    EnvelopesCount   int
    Envelopes        []*wakuV2common.ReceivedMessage
    ShardEnvelopes   []*wakuV2common.ReceivedMessage
    Error            error
    FetchedCommunity *communities.Community
}

func (r *singleResult) ShardEnvelopesHashes() []string {
    out := make([]string, 0, len(r.ShardEnvelopes))
    for _, e := range r.ShardEnvelopes {
        out = append(out, e.Hash().String())
    }
    return out
}

func (r *singleResult) EnvelopesHashes() []string {
    out := make([]string, 0, len(r.Envelopes))
    for _, e := range r.Envelopes {
        out = append(out, e.Hash().String())
    }
    return out
}

func (r *singleResult) toString() string {
    resultString := ""
    communityString := ""

    if r.FetchedCommunity != nil {
        communityString = fmt.Sprintf("clock: %d, name: '%s', members: %d",
            r.FetchedCommunity.Clock(),
            r.FetchedCommunity.Name(),
            len(r.FetchedCommunity.Members()),
        )
    }

    if r.Error != nil {
        resultString = fmt.Sprintf("error: %s", r.Error.Error())
    } else {
        resultString = fmt.Sprintf("envelopes fetched: %d, community - %s",
            r.EnvelopesCount, communityString)
    }

    for i, envelope := range r.ShardEnvelopes {
        resultString += fmt.Sprintf("\n\tshard envelope %3.0d: %s, timestamp: %d (%s), size: %d bytes, contentTopic: %s, pubsubTopic: %s",
            i+1,
            envelope.Hash().Hex(),
            envelope.Envelope.Message().GetTimestamp(),
            time.Unix(0, envelope.Envelope.Message().GetTimestamp()).UTC(),
            len(envelope.Envelope.Message().Payload),
            envelope.Envelope.Message().ContentTopic,
            envelope.Envelope.PubsubTopic(),
        )
    }

    for i, envelope := range r.Envelopes {
        resultString += fmt.Sprintf("\n\tdescription envelope %3.0d: %s, timestamp: %d (%s), size: %d bytes, contentTopic: %s, pubsubTopic: %s",
            i+1,
            envelope.Hash().Hex(),
            envelope.Envelope.Message().GetTimestamp(),
            time.Unix(0, envelope.Envelope.Message().GetTimestamp()).UTC(),
            len(envelope.Envelope.Message().Payload),
            envelope.Envelope.Message().ContentTopic,
            envelope.Envelope.PubsubTopic(),
        )
    }

    return resultString
}

func (s *MessengerStoreNodeRequestSuite) SetupTest() {
    s.logger = tt.MustCreateTestLogger()

    s.cancel = make(chan struct{}, 10)

    s.collectiblesServiceMock = &CollectiblesServiceMock{}

    s.createStore()
}

func (s *MessengerStoreNodeRequestSuite) TearDown() {
    close(s.cancel)
    s.Require().NoError(s.wakuStoreNode.Stop())
    TearDownMessenger(&s.Suite, s.owner)
    TearDownMessenger(&s.Suite, s.bob)
}

func (s *MessengerStoreNodeRequestSuite) createStore() {
    cfg := testWakuV2Config{
        logger:                 s.logger.Named("store-waku"),
        enableStore:            true,
        useShardAsDefaultTopic: false,
        clusterID:              shard.UndefinedShardValue,
    }

    s.wakuStoreNode = NewTestWakuV2(&s.Suite, cfg)
    s.storeNodeAddress = s.wakuListenAddress(s.wakuStoreNode)
    s.logger.Info("store node ready", zap.String("address", s.storeNodeAddress))
}

func (s *MessengerStoreNodeRequestSuite) tearDownOwner() {
    _ = gethbridge.GetGethWakuV2From(s.ownerWaku).Stop()
    TearDownMessenger(&s.Suite, s.owner)
}

func (s *MessengerStoreNodeRequestSuite) createOwner() {

    cfg := testWakuV2Config{
        logger:                 s.logger.Named("owner-waku"),
        enableStore:            false,
        useShardAsDefaultTopic: false,
        clusterID:              shard.UndefinedShardValue,
    }

    wakuV2 := NewTestWakuV2(&s.Suite, cfg)
    s.ownerWaku = gethbridge.NewGethWakuV2Wrapper(wakuV2)

    messengerLogger := s.logger.Named("owner-messenger")
    s.owner = s.newMessenger(s.ownerWaku, messengerLogger, s.storeNodeAddress)

    // We force the owner to use the store node as relay peer
    WaitForPeersConnected(&s.Suite, gethbridge.GetGethWakuV2From(s.ownerWaku), func() []string {
        err := s.owner.DialPeer(s.storeNodeAddress)
        s.Require().NoError(err)
        return []string{s.wakuStoreNode.PeerID().String()}
    })
}

func (s *MessengerStoreNodeRequestSuite) createBob() {
    cfg := testWakuV2Config{
        logger:                 s.logger.Named("bob-waku"),
        enableStore:            false,
        useShardAsDefaultTopic: false,
        clusterID:              shard.UndefinedShardValue,
    }
    wakuV2 := NewTestWakuV2(&s.Suite, cfg)
    s.bobWaku = gethbridge.NewGethWakuV2Wrapper(wakuV2)

    messengerLogger := s.logger.Named("bob-messenger")
    s.bob = s.newMessenger(s.bobWaku, messengerLogger, s.storeNodeAddress)
}

func (s *MessengerStoreNodeRequestSuite) tearDownBob() {
    _ = gethbridge.GetGethWakuV2From(s.bobWaku).Stop()
    TearDownMessenger(&s.Suite, s.bob)
}

func (s *MessengerStoreNodeRequestSuite) newMessenger(shh types.Waku, logger *zap.Logger, mailserverAddress string) *Messenger {
    privateKey, err := crypto.GenerateKey()
    s.Require().NoError(err)

    options := []Option{
        WithAutoRequestHistoricMessages(false),
        WithCuratedCommunitiesUpdateLoop(false),
    }

    if mailserverAddress != "" {
        options = append(options,
            WithTestStoreNode(&s.Suite, localMailserverID, mailserverAddress, localFleet, s.collectiblesServiceMock),
        )
    }

    messenger, err := newMessengerWithKey(shh, privateKey, logger, options)

    s.Require().NoError(err)
    return messenger
}

func (s *MessengerStoreNodeRequestSuite) createCommunity(m *Messenger) *communities.Community {
    s.waitForAvailableStoreNode(m)

    storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(nil)

    createCommunityRequest := &requests.CreateCommunity{
        Name:        RandomLettersString(10),
        Description: RandomLettersString(20),
        Color:       RandomColor(),
        Tags:        RandomCommunityTags(3),
        Membership:  protobuf.CommunityPermissions_AUTO_ACCEPT,
    }

    response, err := m.CreateCommunity(createCommunityRequest, false)
    s.Require().NoError(err)
    s.Require().NotNil(response)
    s.Require().Len(response.Communities(), 1)

    s.waitForEnvelopes(storeNodeSubscription, 1)

    return response.Communities()[0]
}

func (s *MessengerStoreNodeRequestSuite) requireCommunitiesEqual(c *communities.Community, expected *communities.Community) {
    if expected == nil {
        s.Require().Nil(c)
        return
    }
    s.Require().NotNil(c)
    s.Require().Equal(expected.IDString(), c.IDString())
    s.Require().Equal(expected.Clock(), c.Clock())
    s.Require().Equal(expected.Name(), c.Name())
    s.Require().Equal(expected.Identity().Description, c.Identity().Description)
    s.Require().Equal(expected.Color(), c.Color())
    s.Require().Equal(expected.Tags(), c.Tags())
    s.Require().Equal(expected.Shard(), c.Shard())
    s.Require().Equal(len(expected.TokenPermissions()), len(c.TokenPermissions()))
    for k, v := range expected.TokenPermissions() {
        s.Require().True(proto.Equal(v, c.TokenPermissions()[k]))
    }
    s.Require().Equal(len(expected.CommunityTokensMetadata()), len(c.CommunityTokensMetadata()))
    for i, v := range expected.CommunityTokensMetadata() {
        s.Require().True(proto.Equal(v, c.CommunityTokensMetadata()[i]))
    }
}

func (s *MessengerStoreNodeRequestSuite) requireContactsEqual(c *Contact, expected *Contact) {
    s.Require().Equal(expected.DisplayName, c.DisplayName)
    s.Require().Equal(expected.Bio, c.Bio)
}

func (s *MessengerStoreNodeRequestSuite) fetchCommunity(m *Messenger, communityShard communities.CommunityShard, expectedCommunity *communities.Community) StoreNodeRequestStats {
    options := []StoreNodeRequestOption{
        WithWaitForResponseOption(true),
    }

    fetchedCommunity, stats, err := m.storeNodeRequestsManager.FetchCommunity(communityShard, options)

    s.Require().NoError(err)
    s.requireCommunitiesEqual(fetchedCommunity, expectedCommunity)

    return stats
}

func (s *MessengerStoreNodeRequestSuite) fetchProfile(m *Messenger, contactID string, expectedContact *Contact) {
    fetchedContact, err := m.FetchContact(contactID, true)
    s.Require().NoError(err)
    s.Require().NotNil(fetchedContact)
    s.Require().Equal(contactID, fetchedContact.ID)

    if expectedContact != nil {
        s.requireContactsEqual(fetchedContact, expectedContact)
    }
}

func (s *MessengerStoreNodeRequestSuite) waitForAvailableStoreNode(messenger *Messenger) {
    WaitForAvailableStoreNode(&s.Suite, messenger, storeNodeConnectTimeout)
}

func (s *MessengerStoreNodeRequestSuite) setupEnvelopesWatcher(wakuNode *waku2.Waku, topic *wakuV2common.TopicType, cb func(envelope *wakuV2common.ReceivedMessage)) {
    envelopesWatcher := make(chan wakuV2common.EnvelopeEvent, 100)
    envelopesSub := wakuNode.SubscribeEnvelopeEvents(envelopesWatcher)

    go func() {
        defer envelopesSub.Unsubscribe()
        for {
            select {
            case <-s.cancel:
                return

            case envelopeEvent := <-envelopesWatcher:
                if envelopeEvent.Event != wakuV2common.EventEnvelopeAvailable {
                    continue
                }
                if topic != nil && *topic != envelopeEvent.Topic {
                    continue
                }
                envelope := wakuNode.GetEnvelope(envelopeEvent.Hash)
                cb(envelope)
                s.logger.Debug("envelope available event for fetched content topic",
                    zap.Any("envelopeEvent", envelopeEvent),
                    zap.Any("envelope", envelope),
                )
            }

        }
    }()
}

func (s *MessengerStoreNodeRequestSuite) setupStoreNodeEnvelopesWatcher(topic *wakuV2common.TopicType) <-chan string {
    storeNodeSubscription := make(chan string, 100)
    s.setupEnvelopesWatcher(s.wakuStoreNode, topic, func(envelope *wakuV2common.ReceivedMessage) {
        storeNodeSubscription <- envelope.Hash().String()
    })
    return storeNodeSubscription
}

func (s *MessengerStoreNodeRequestSuite) waitForEnvelopes(subscription <-chan string, expectedEnvelopesCount int) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    for i := 0; i < expectedEnvelopesCount; i++ {
        select {
        case <-subscription:
        case <-ctx.Done():
            err := fmt.Sprintf("timeout waiting for store node to receive envelopes, received: %d, expected: %d", i, expectedEnvelopesCount)
            s.Require().Fail(err)
        }
    }
}

func (s *MessengerStoreNodeRequestSuite) wakuListenAddress(waku *waku2.Waku) string {
    addresses := waku.ListenAddresses()
    s.Require().LessOrEqual(1, len(addresses))
    return addresses[0]
}

func (s *MessengerStoreNodeRequestSuite) ensureStoreNodeEnvelopes(contentTopic *wakuV2common.TopicType, minimumCount int) {
    // Give some time for store node to put envelope into database. Otherwise, the test is flaky.
    // Although we subscribed to EnvelopeEvents and waited, the actual saving to database happens asynchronously.
    // It would be nice to implement a subscription for database storing event, but it isn't worth it right now.
    time.Sleep(100 * time.Millisecond)

    // Directly ensure profile is available on store node
    queryOptions := []legacy_store.HistoryRequestOption{
        legacy_store.WithLocalQuery(),
    }
    query := legacy_store.Query{
        PubsubTopic:   "",
        ContentTopics: []string{contentTopic.ContentTopic()},
    }
    result, err := s.wakuStoreNode.StoreNode().Query(context.Background(), query, queryOptions...)
    s.Require().NoError(err)
    s.Require().GreaterOrEqual(len(result.Messages), minimumCount)
    s.logger.Debug("store node query result", zap.Int("messagesCount", len(result.Messages)))
}

func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfo() {
    s.createOwner()
    s.createBob()

    community := s.createCommunity(s.owner)

    s.fetchCommunity(s.bob, community.CommunityShard(), community)
}

func (s *MessengerStoreNodeRequestSuite) TestConsecutiveRequests() {
    s.createOwner()
    s.createBob()

    community := s.createCommunity(s.owner)

    // Test consecutive requests to check that requests in manager are finalized
    // At second request we expect to fetch nothing, because the community is already in the database
    s.fetchCommunity(s.bob, community.CommunityShard(), community)
    s.fetchCommunity(s.bob, community.CommunityShard(), nil)
}

func (s *MessengerStoreNodeRequestSuite) TestSimultaneousCommunityInfoRequests() {
    s.createOwner()
    s.createBob()

    community := s.createCommunity(s.owner)

    storeNodeRequestsCount := 0
    s.bob.storeNodeRequestsManager.onPerformingBatch = func(batch MailserverBatch) {
        storeNodeRequestsCount++
    }

    s.waitForAvailableStoreNode(s.bob)

    wg := sync.WaitGroup{}

    // Make 2 simultaneous fetch requests
    // 1 fetch request = 2 requests to store node (fetch shard and fetch community)
    // only 2 request to store node is expected
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            s.fetchCommunity(s.bob, community.CommunityShard(), community)
        }()
    }

    wg.Wait()
    s.Require().Equal(2, storeNodeRequestsCount)
}

func (s *MessengerStoreNodeRequestSuite) TestRequestNonExistentCommunity() {
    // On test start store node database is empty, so just request any valid community ID.
    request := FetchCommunityRequest{
        CommunityKey:    "0x036dc11a0663f88e15912f0adb68c3c5f68ca0ca7a233f1a88ff923a3d39b2cf07",
        Shard:           nil,
        TryDatabase:     false,
        WaitForResponse: true,
    }

    s.createBob()

    s.waitForAvailableStoreNode(s.bob)
    fetchedCommunity, err := s.bob.FetchCommunity(&request)

    s.Require().NoError(err)
    s.Require().Nil(fetchedCommunity)
}

func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfoWithStoreNodeDisconnected() {
    s.createOwner()
    s.createBob()

    community := s.createCommunity(s.owner)

    // WaitForAvailableStoreNode is done internally
    s.fetchCommunity(s.bob, community.CommunityShard(), community)
}

func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityPagingAlgorithm() {
    const spamAmount = defaultStoreNodeRequestPageSize + initialStoreNodeRequestPageSize

    s.createOwner()
    s.createBob()

    // Create a community
    community := s.createCommunity(s.owner)
    contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(community.IDString()))
    storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic)

    // Push spam to the same ContentTopic & PubsubTopic
    // The first requested page size is 1. All subsequent pages are limited to 20.
    // We want to test the algorithm, so we push 21 spam envelopes.
    for i := 0; i < spamAmount; i++ {
        spamMessage := common.RawMessage{
            Payload:             RandomBytes(16),
            Sender:              community.PrivateKey(),
            SkipEncryptionLayer: true,
            MessageType:         protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION,
            PubsubTopic:         community.PubsubTopic(),
        }
        _, err := s.owner.sender.SendPublic(context.Background(), community.IDString(), spamMessage)
        s.Require().NoError(err)
    }

    // Wait the store node to receive envelopes
    s.waitForEnvelopes(storeNodeSubscription, spamAmount)

    // Fetch the community
    stats := s.fetchCommunity(s.bob, community.CommunityShard(), community)

    // Expect 3 pages and 23 (24 spam + 1 community description + 1 general channel description) envelopes to be fetched.
    // First we fetch a more up-to-date, but an invalid spam message, fail to decrypt it as community description,
    // then we fetch another page of data and successfully decrypt a community description.
    s.Require().Equal(spamAmount+1, stats.FetchedEnvelopesCount)
    s.Require().Equal(3, stats.FetchedPagesCount)
}

func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityWithSameContentTopic() {
    s.createOwner()
    s.createBob()

    // Create 2 communities
    community1 := s.createCommunity(s.owner)
    community2 := s.createCommunity(s.owner)

    description2, err := community2.MarshaledDescription()
    s.Require().NoError(err)

    // Push community2 description to the same ContentTopic & PubsubTopic as community1.
    // This way we simulate 2 communities with same ContentTopic.
    spamMessage := common.RawMessage{
        Payload:             description2,
        Sender:              community2.PrivateKey(),
        SkipEncryptionLayer: true,
        MessageType:         protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION,
        PubsubTopic:         community1.PubsubTopic(),
    }
    _, err = s.owner.sender.SendPublic(context.Background(), community1.IDString(), spamMessage)
    s.Require().NoError(err)

    // Fetch the community
    s.fetchCommunity(s.bob, community1.CommunityShard(), community1)
}

func (s *MessengerStoreNodeRequestSuite) TestRequestMultipleCommunities() {
    s.createOwner()
    s.createBob()

    // Create 2 communities
    community1 := s.createCommunity(s.owner)
    community2 := s.createCommunity(s.owner)

    fetchedCommunities := map[string]*communities.Community{}

    err := WaitOnSignaledCommunityFound(s.bob,
        func() {
            err := s.bob.fetchCommunities([]communities.CommunityShard{
                community1.CommunityShard(),
                community2.CommunityShard(),
            })
            s.Require().NoError(err)
        },
        func(community *communities.Community) bool {
            fetchedCommunities[community.IDString()] = community
            return len(fetchedCommunities) == 2
        },
        1*time.Second,
        "communities were not signalled in time",
    )

    s.Require().NoError(err)
    s.Require().Contains(fetchedCommunities, community1.IDString())
    s.Require().Contains(fetchedCommunities, community2.IDString())
}

func (s *MessengerStoreNodeRequestSuite) TestRequestWithoutWaitingResponse() {
    s.createOwner()
    s.createBob()

    // Create a community
    community := s.createCommunity(s.owner)

    request := FetchCommunityRequest{
        CommunityKey:    community.IDString(),
        Shard:           nil,
        TryDatabase:     false,
        WaitForResponse: false,
    }

    fetchedCommunities := map[string]*communities.Community{}

    err := WaitOnSignaledCommunityFound(s.bob,
        func() {
            fetchedCommunity, err := s.bob.FetchCommunity(&request)
            s.Require().NoError(err)
            s.Require().Nil(fetchedCommunity)
        },
        func(community *communities.Community) bool {
            fetchedCommunities[community.IDString()] = community
            return len(fetchedCommunities) == 1
        },
        1*time.Second,
        "communities weren't signalled",
    )

    s.Require().NoError(err)
    s.Require().Len(fetchedCommunities, 1)
    s.Require().Contains(fetchedCommunities, community.IDString())

    s.requireCommunitiesEqual(fetchedCommunities[community.IDString()], community)
}

func (s *MessengerStoreNodeRequestSuite) TestRequestProfileInfo() {
    s.createOwner()
    defer s.tearDownOwner()

    // Set keypair (to be able to set displayName)
    ownerProfileKp := accounts.GetProfileKeypairForTest(true, false, false)
    ownerProfileKp.KeyUID = s.owner.account.KeyUID
    ownerProfileKp.Accounts[0].KeyUID = s.owner.account.KeyUID

    err := s.owner.settings.SaveOrUpdateKeypair(ownerProfileKp)
    s.Require().NoError(err)

    contentTopicString := transport.ContactCodeTopic(&s.owner.identity.PublicKey)
    contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(contentTopicString))
    storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic)

    // Set display name, this will also publish contact code
    err = s.owner.SetDisplayName("super-owner")
    s.Require().NoError(err)

    s.waitForEnvelopes(storeNodeSubscription, 1)
    s.ensureStoreNodeEnvelopes(&contentTopic, 1)

    // Fetch profile
    s.createBob()
    defer s.tearDownBob()
    s.fetchProfile(s.bob, s.owner.selfContact.ID, s.owner.selfContact)
}

// TestSequentialUpdates checks that making updates to the community
// immediately results in new store node fetched information.
// Before adding storeNodeSubscription we had problems with the test setup that we didn't have a mechanism to wait for store node to
// receive and process new messages.
func (s *MessengerStoreNodeRequestSuite) TestSequentialUpdates() {
    s.createOwner()
    s.createBob()

    community := s.createCommunity(s.owner)
    s.fetchCommunity(s.bob, community.CommunityShard(), community)

    contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(community.IDString()))
    communityName := community.Name()

    storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic)

    for i := 0; i < 3; i++ {
        // Change community name, this will automatically publish a new community description
        ownerEditRequest := &requests.EditCommunity{
            CommunityID: community.ID(),
            CreateCommunity: requests.CreateCommunity{
                Name:        fmt.Sprintf("%s-%d", communityName, i),
                Description: community.DescriptionText(),
                Color:       community.Color(),
                Membership:  community.Permissions().Access,
            },
        }
        _, err := s.owner.EditCommunity(ownerEditRequest)
        s.Require().NoError(err)

        s.waitForEnvelopes(storeNodeSubscription, 1)

        // Get updated community from the database
        community, err = s.owner.communitiesManager.GetByID(community.ID())
        s.Require().NoError(err)
        s.Require().NotNil(community)

        s.fetchCommunity(s.bob, community.CommunityShard(), community)
    }
}

func (s *MessengerStoreNodeRequestSuite) TestRequestShardAndCommunityInfo() {
    s.createOwner()
    s.createBob()

    community := s.createCommunity(s.owner)

    expectedShard := &shard.Shard{
        Cluster: shard.MainStatusShardCluster,
        Index:   23,
    }

    shardRequest := &requests.SetCommunityShard{
        CommunityID: community.ID(),
        Shard:       expectedShard,
    }

    shardTopic := transport.CommunityShardInfoTopic(community.IDString())
    contentContentTopic := wakuV2common.BytesToTopic(transport.ToTopic(shardTopic))
    storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentContentTopic)

    _, err := s.owner.SetCommunityShard(shardRequest)
    s.Require().NoError(err)

    s.waitForEnvelopes(storeNodeSubscription, 1)

    s.waitForAvailableStoreNode(s.bob)

    communityShard := community.CommunityShard()

    community, err = s.owner.communitiesManager.GetByID(community.ID())
    s.Require().NoError(err)
    s.Require().NotNil(community)
    s.Require().NotNil(community.Shard())

    s.fetchCommunity(s.bob, communityShard, community)
}

func (s *MessengerStoreNodeRequestSuite) TestFiltersNotRemoved() {
    s.createOwner()
    s.createBob()

    community := s.createCommunity(s.owner)

    // The owner is a member of the community, so he has a filter for community description content topic.
    // We want to check that filter is not removed by `FetchCommunity` call.
    filterBefore := s.owner.transport.FilterByChatID(community.IDString())
    s.Require().NotNil(filterBefore)

    s.fetchCommunity(s.owner, community.CommunityShard(), nil)

    filterAfter := s.owner.transport.FilterByChatID(community.IDString())
    s.Require().NotNil(filterAfter)

    s.Require().Equal(filterBefore.FilterID, filterAfter.FilterID)
}

func (s *MessengerStoreNodeRequestSuite) TestFiltersRemoved() {
    s.createOwner()
    s.createBob()

    community := s.createCommunity(s.owner)

    // The bob is a member of the community, so he has no filters for community description content topic.
    // We want to check that filter created by `FetchCommunity` is removed on request finish.
    filterBefore := s.bob.transport.FilterByChatID(community.IDString())
    s.Require().Nil(filterBefore)

    s.fetchCommunity(s.bob, community.CommunityShard(), community)

    filterAfter := s.bob.transport.FilterByChatID(community.IDString())
    s.Require().Nil(filterAfter)
}

func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityEnvelopesOrder() {
    s.createOwner()
    s.createBob()

    const descriptionsCount = 4
    community := s.createCommunity(s.owner)
    contentTopic := wakuV2common.BytesToTopic(transport.ToTopic(community.IDString()))
    storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic)

    // Push a few descriptions to the store node
    for i := 0; i < descriptionsCount-1; i++ {
        err := s.owner.publishOrg(community, false)
        s.Require().NoError(err)
    }

    // Wait for store node to receive envelopes
    s.waitForEnvelopes(storeNodeSubscription, descriptionsCount-1)

    // Subscribe to received envelope
    bobWakuV2 := gethbridge.GetGethWakuV2From(s.bobWaku)

    var receivedEnvelopes []*wakuV2common.ReceivedMessage
    s.setupEnvelopesWatcher(bobWakuV2, &contentTopic, func(envelope *wakuV2common.ReceivedMessage) {
        receivedEnvelopes = append(receivedEnvelopes, envelope)
    })

    // Force a single-envelope page size to be able to check the order.
    // Also force all envelopes to be fetched.
    options := []StoreNodeRequestOption{
        WithWaitForResponseOption(true),
        WithStopWhenDataFound(false),
        WithInitialPageSize(1),
        WithFurtherPageSize(1),
    }

    // Fetch the community
    fetchedCommunity, _, err := s.bob.storeNodeRequestsManager.FetchCommunity(community.CommunityShard(), options)
    s.Require().NoError(err)
    s.requireCommunitiesEqual(fetchedCommunity, community)

    // Ensure all expected envelopes were received
    s.Require().Equal(descriptionsCount, len(receivedEnvelopes))

    // We check that each next envelope fetched is newer than the previous one
    for i := 1; i < len(receivedEnvelopes); i++ {
        s.Require().Less(
            receivedEnvelopes[i].Envelope.Message().GetTimestamp(),
            receivedEnvelopes[i-1].Envelope.Message().GetTimestamp())
    }
}

/*
    TestFetchRealCommunity is not actually a test, but an utility to check the community description in all of the store nodes.
    It's intended to only run locally and shouldn't be executed in CI, because it relies on connection to the real network.

    TODO: It would be nice to move this code to a real utility in /cmd.
          It should allow us to fairly verify the community owner and do other good things.

    To run this test, first set `runLocalTests` to true.
    Then carefully set all of communityID, communityShard, fleet and other const variables.

    NOTE: I only tested it with the default parameters, but in theory it should work for any configuration.
*/

type testFetchRealCommunityExampleTokenInfo struct {
    ChainID         uint64
    ContractAddress string
}

var testFetchRealCommunityExample = []struct {
    CommunityID            string
    CommunityURL           string       // If set, takes precedence over CommunityID
    CommunityShard         *shard.Shard // WARNING: I didn't test a sharded community
    Fleet                  string
    UseShardAsDefaultTopic bool
    ClusterID              uint16
    UserPrivateKeyString   string // When empty a new user will be created
    // Optional request parameters
    CustomOptions []StoreNodeRequestOption
    // Setup OwnerPublicKey and CommunityTokens if the community has owner token
    // This is needed to mock the owner verification
    OwnerPublicKey  string
    CommunityTokens []testFetchRealCommunityExampleTokenInfo
    // Fill these if you know what envelopes are expected.
    // The test will fail if fetched array doesn't equal to the expected one.
    CheckExpectedEnvelopes       bool
    ExpectedShardEnvelopes       []string
    ExpectedDescriptionEnvelopes []string
}{
    {
        //Example 1, status.prod fleet
        CommunityID:            "0x03073514d4c14a7d10ae9fc9b0f05abc904d84166a6ac80add58bf6a3542a4e50a",
        CommunityShard:         nil,
        Fleet:                  params.FleetStatusProd,
        UseShardAsDefaultTopic: false,
        ClusterID:              shard.UndefinedShardValue,
    },
    {
        // Example 3, shards.test fleet
        // https://status.app/c/CxiACi8KFGFwIHJlcSAxIHN0dCBiZWMgbWVtEgdkc2Fkc2FkGAMiByM0MzYwREYqAxkrHAM=#zQ3shwDYZHtrLE7NqoTGjTWzWUu6hom5D4qxfskLZfgfyGRyL
        CommunityID:            "0x03f64be95ed5c925022265f9250f538f65ed3dcf6e4ef6c139803dc02a3487ae7b",
        Fleet:                  params.FleetShardsTest,
        UseShardAsDefaultTopic: true,
        ClusterID:              shard.MainStatusShardCluster,

        CheckExpectedEnvelopes: true,
        ExpectedShardEnvelopes: []string{
            "0x8173eecd7ff9ebcaae3dde0e704daf9bdeb6d33b0d8505a67e7dc56d0d8fc07c",
            "0x596bbafbe0e0b625d165378cd4c7641a4d23aa1145c705aad666ddeaf60c88cd",
            "0x8a1ee798f3657da5a463e5f878ab2455d05b8f552359b58330ccd7fa4f5624b0",
            "0x97bcde2103a01984bb45a8590a6cb6972411445a1b2d40e181d5f2b5366fa5f1",
            "0x26e3c0c880d1a2c4e81bf4fffbdb8b7e1ecc91fce7c6a05ee87d200d62ffc11e",
            "0x1a8820bd61ebcc9de75c25f31c9b05eb6e880a5a4902679bb6ce2f43f61bf159",
            "0xce450cfb5f79d761f34dea5b2ccec63751886e43ae63477e12f517c31f800aeb",
            "0x9607bd1cf08355c44bcce055da197ba177201882736fa8874910194ccdaa8760",
            "0x0c4b989ca69f529e571e6ea8b3230a85e057d8b2ae6147d1fedc2a01f2816ed6",
            "0xe40ea64c9007a064b6324b614976510f2a433c9f84d87139df8f66b536e37ee4",
            "0x7a028466a095e40650bb0ef16e903309b0c38c5a7cb7e2e9debd0acf2151448d",
            "0x96419c6be375b2b348778d4694e3a491de84eecde601d5d405a0e72e9cece4a1",
            "0xbcaeb5e86128638fab7203428daddd741df44ceeabe7d9d25936a10cd0a8b808",
            "0x2e0b5872cb5a7c9a3273048eb2dfcb1d6a28faad3fe307a7db6c2dbaca9ce462",
            "0xfa96bbe4125514ce73c52ef3ccb1c4ad9c4ad4afe8803de8ab9309fe9483b1d0",
            "0xdeddfc82f70cce77c26959d91851fbc33afe648428c3e6ea349b2a2456b92111",
            "0x5b12f17d7b712071f57bb48b7dcd0d6568ff5e7c3f8b3811013aea8dde9c6243",
            "0x18928fd044482c75518162104d487e6fe504f086eb8c5e9f21aa4bce2811d0fa",
            "0x543c156ced76138d69229a39425a0a1cabd617770e023333c10501b979f52d61",
            "0xf46ea6bf5ab6a70662bcf227cc5d2c8c7a70ce42a88e5bb7ebe9e598668a8ae2",
            "0xedb9628dc1ce5b0ec899c3813dd4159a2e06fb3dc88ffaae047e927c804ad0b3",
            "0x16248eccc3544af3fc4a73467d0925ca2f3741eb623516ee369f710e4aa8a3ef",
            "0x6a85f784a9004b56bbb47d87f5541173f05bd61ff5b26e41c714adbb5516e9ff",
            "0x91e320be2cb5c6178027390cbce165fe088a1a35e1442382064ddbc9aabea8a2",
            "0x676496dd36ae40e184863725bfb7425e46d916f73f4b0dd5d10324f4e9325da6",
        },
        ExpectedDescriptionEnvelopes: []string{
            "0xe2c38667ee160861b3dc5a00e4422f47de1303c8b61f0a33c4853ce0b71d0ae4",
            "0x7d8392baa9dd134e43287e58d69b8c9f50aa5c144adda6a3c7d32f00c5dea309",
            "0xf74918d445709ccf9c29e776d27e9b7dc31f25a28473e8fcd89ed9de8a2e6df4",
            "0xafd7e9b6245d88ea2b4fe70265aa3c5ce5618827c1092f8e3058315ac27c5b98",
            "0x4d16bd4fcfc2d8736dc29d8c7287671f7e1df62af74943dc40a39ae18f388a07",
            "0xd101f14bcf6bb9a5e72b934b3e74ebe3f77774037cb5b193803b264d69bfb9bf",
            "0xaa857389e8886401678690bb4dbc664486bd7039427ca53e826197b303696cff",
            "0x4443448e575824330a96d5114a9a7d3fe0ee7168ab6c7a646057ca4502fb91d2",
            "0xfbb79b8d0ee1a61109c543cbe580412fe6d23de33d163006f74fef4addbdad37",
            "0x51bc55c732e0e9db40fd8865a8ae20cbd99bfd4f95c62cf8b591e793d9b642d2",
            "0xc28ca742c0e159a60941fb68ee5832016b510afe54e8ee8bb2200495ac29f1e3",
            "0xf44f1714743a55f0170ba3627390d84cb9307326028abf7236222c93104db833",
            "0x69d067da262b124d2eb6a8ba9f08e0a1ad66afb8d7ff3641a256992fff53a6de",
            "0xa86a08457854fbea347cdd92fd390a330a9971967c551bebc53b18ccfea876fe",
            "0x592850f8bc3c6079826f168971746bd2a1b50a5011fe3b233ead6c72c92a3373",
            "0x1b3151d7b9b37350e86c937dc2e7964d472815bbf9275714bcb16a0c4327fe3d",
            "0x1f2beca64e52f996127647cf3f3abd2e4fe501646fc39e98713ae064333388b3",
            "0xeafc8f9d3114426c08748e6171710874074fb1eb732d9364830ce9b58955c83c",
            "0x6e014a5ec75465efaf036353ec5811b8f710f608a04002925eba3a0a37a30423",
            "0xf7e12a5829cf90e4272132e7b62c5bf0dd09100c8d498c66c9740a798db559b3",
            "0x62da7e828862c3c8692cbd077c5a62266d811764184378e55f9f1066510b4652",
            "0x74201394d05b914bc7e6ce4d2fcf0b119491c80644f48ac9fd37b842e4a0275a",
            "0x9d4c0b1be53810c45c2fa744baa8d16ef4ae3a319b09043f4b4e053127461bf0",
            "0x1937979514ea1dba8ab3b621fc3d0a3f6246b4bf1f9b4073888b8dbd9b4a765a",
            "0xf767f3f36fdecb5ef6650542232334df836bfca1e7f72f1215df50d3f9f9c9bc",
            "0x3a06002325502bc39a77962241fe274d4e88f61762194d321f9cc95272ed4a74",
            "0x13caae58261c181d4974d7a68e0b7c8580c3cc569840179d53ae76407548d8b8",
            "0x36f3b4afbcc4177a7aef26ad567839ffce51896d4c40d0a08d222cebd1255e3b",
            "0x159e685bbab26a5d54ab817c93c9c610055bcf2af75290abcc9a84f1b85a2de9",
            "0x1fd5ff73d7ea9a19f282bd0716f04a5e86b7c515839f0c721f66b3fe99161054",
            "0x95b1e9ada4913ca809c9c28fc225a21753f18a90253660750900c78f79ad2a00",
            "0x4334826934a7cbfb7446ec9d581fa6433c5d1f7f51b97f24717f55cffa320c65",
            "0x0c01d07108c448797ffd14a2152fd38d1764c8a9c5e2f3da12f70551588add7a",
            "0xe7071c6587fc277c4f4c0d7e4575de1a0843d3cf6c2a4aac79be79edc1608038",
            "0x5da4e482f3e6eacf080db685e00c199c8cbbad9a8f43b1d94944426444a7a84a",
            "0x638f551acdd7ccffd6a40ad12bbba1da8fd8a58157fdf9625b12d4a95b4eef71",
            "0xa1a52c28e0481f6004d98bbce906676fff67f04246454bd33fba02c640355af0",
            "0xe0300eb9e0f215ace491b1104665b48b9f6bff039af40e0cfc52a3ce766e747f",
            "0xd092c04d51ee963d59953324d84188a0c1636e8600cb0f5f6f3f4f826d70c8f3",
            "0x8d94bbfee687d534361fc3069079cf4e4f7db2a179d24e6419f67e38b5f0bd34",
            "0x1fd7a4d2c04fca3875126b7a951f619b4da0000ca47496df0c2fb1048a145108",
            "0xbbefbd116cbb23de193318b328412addc500af965d31ba481d70fa1d9e99461d",
            "0xaa4e0e8bd820438e22b93371bda24a29922d33c15fb312b343d2e81a22cbdd95",
            "0x76aef29ea4dde107c22c520efb2a4516b69ae83bc237281d9990f68397d801f5",
            "0x804789119513a065d892cba5d240cb4d89d7329aeee93fcd8e85379a4d362fc9",
            "0x9029b4a13903a3369e3466f1bfabae3f26b6721628db138eaba25c1f55f6fc1e",
            "0xee38c209cb95035a289034c737e5775877145efea31b2a01f7c9241ff02f3e92",
            "0x3e76da87895ca821db3b7ed7dc6949557c620d9cbcaf97af39ed4955d37b734e",
            "0xf5e77eb8f9a5c52e09a56dd5e461bfee6cf9a73e1253f1d41bcde81fe3646997",
            "0x083e06375c366283e541b249ea8646c3f31feb970078e95861ea399f0a57d09a",
            "0xcd7db07ba557ec1ba0104909fdb958661c60c82213a75e8d15e7b262ef4f58b7",
            "0x57b49dc83e1d3ac7b56bb7d758a9bf448339593311103bce4f0a53028587d577",
            "0xb08cd92a5ec6f44a6129a60107132ca17d5fef29fb2bc5ebba14028d57a8038e",
            "0x76959f98c8c734307a985294c26f008f3f705912aab02a5b3a0602a8598c02e2",
            "0xd8ad7df58ffeec20b16a140bdd91484a34fca3ad7fc602043530ca63c307809a",
            "0x6872ef39653bb208ef51f5b11c4bda3eb494a13f5da14e33687d30aef99ef383",
            "0x0fabebc2e0c02f4add94886216b01ecdbd34929ad303d1a20a4505bf729038f6",
            "0xc18270cd532bb3d34f62704feb722c40be48aedb5ad38d4d09fd67f5843b686d",
            "0xbc16217ece82998783d4209ed3bc3f2f33d92630e43933b0129eb8b792500a3f",
            "0xbda651e3b9c82f4bcf5b16252407fc888952820c842c49c06b4f01c8127e359a",
            "0xb4b1799950c6aca3b011ffb775d0f973437d7d46e40cf7b379ff736d08f24eb2",
            "0x38f12fb09c71dd720cacbb2102ac78ad6fbf830558adc7af9fb773f39e728bdc",
            "0x489eb6fa2f5ee5b2a071c7083bf36a0a6cb4ec96049707d25843d9a97b4ac7be",
            "0x64ea5655c8caf89a53c94edd5a47ba750d9fbcf099ec0dcd4026656b044486f1",
            "0x501aee1c5da6aaeaae14abffefbc377b59ebe3fcaa9981bc83bfeffb25344749",
            "0x9a3d360ea866102a6268ffd2001617c442b74b221d131fb3c08ae29bfac18203",
        },
    },
    {
        //Example 1, shards.test fleet
        CommunityID:            "0x02471dd922756a3a50b623e59cf3b99355d6587e43d5c517eb55f9aea9d3fe9fe9",
        Fleet:                  params.FleetShardsTest,
        UseShardAsDefaultTopic: true,
        ClusterID:              shard.MainStatusShardCluster,
        CheckExpectedEnvelopes: true,
        ExpectedShardEnvelopes: []string{
            "0xc3e68e838d09e0117b3f3fd27aabe5f5a509d13e9045263c78e6890953d43547",
            "0x5ee13d052bedb855ce2b9ba6f43c78233fbd4e6539a3bdf156497053c6ddf76d",
            "0xfb6638b7e050f9323a0fe7b84986b5c6f8827965e67e3b3bd0fea21cf24e43de",
        },
        ExpectedDescriptionEnvelopes: []string{
            "0x5b4fa95d430c939c1cbbb26175eabfb4ee058d508c6b4c0e26624958ba02c3ce",
            "0xbf44409ee40dea7816186b37a45dfebabcee59f76855ad5af663ccdf598861ab",
            "0x98d98453f6017517d0114989da0938aad59a3ad9a10839c181f453283f64f5c9",
        },
    },
    {
        CommunityURL: "https://status.app/c/G4IAAMQn9ucHF-V3W5Ouuy0xf0BtTjlwCANJEmwB2CG5p2xKUYzK_l37kzXulUppltT1t6mBcCEJsljRoGrKCP7rWommQomrMA2gBN7RrvCMkFqQwnCNzkNYWrLG85E6GVoM_nolTtfIzl53J1N-tj8fz4_TnO4IIw==#zQ3shZeEJqTC1xhGUjxuS4rtHSrhJ8vUYp64v6qWkLpvdy9L9",
        //CommunityID:            "0x02b5bdaf5a25fcfe2ee14c501fab1836b8de57f61621080c3d52073d16de0d98d6",
        Fleet:                  params.FleetShardsTest,
        UseShardAsDefaultTopic: true,
        OwnerPublicKey:         "0x04953f5f0d355b37c39d1d6460a31ed1114455f8263b3fd1b84406c5f12c9eb7dfb76ba7513b92186010928254984fe98aee069b4c7e20f9ea3da497c3ae769477",
        CommunityTokens: []testFetchRealCommunityExampleTokenInfo{
            {
                ChainID:         10,
                ContractAddress: "0x9eDc11E5932372387E76ff3dcF66DB5465893823",
            },
            {
                ChainID:         10,
                ContractAddress: "0xD91d2E898f996308D643E3b9C78f5FEb5c5F404F",
            },
            {
                ChainID:         10,
                ContractAddress: "0x852E13D2BDFC4C3a761DA7B450f631b555b39C5E",
            },
            {
                ChainID:         10,
                ContractAddress: "0x000CfAd029EE94d120e0c136278582F94Cdc532c",
            },
            {
                ChainID:         10,
                ContractAddress: "0x21F6F5Cb75E81e5104D890D750270eD6538C50cb",
            },
        },
        ClusterID:              shard.MainStatusShardCluster,
        CheckExpectedEnvelopes: false,
        CustomOptions: []StoreNodeRequestOption{
            WithInitialPageSize(1),
            WithStopWhenDataFound(true),
        },
    },
}

func (s *MessengerStoreNodeRequestSuite) TestFetchRealCommunity() {
    if !runLocalTests {
        return
    }

    exampleToRun := testFetchRealCommunityExample[3]

    // Test configuration
    communityID := exampleToRun.CommunityID
    communityShard := exampleToRun.CommunityShard
    fleet := exampleToRun.Fleet
    useShardAsDefaultTopic := exampleToRun.UseShardAsDefaultTopic
    clusterID := exampleToRun.ClusterID
    userPrivateKeyString := exampleToRun.UserPrivateKeyString
    ownerPublicKey := exampleToRun.OwnerPublicKey
    communityTokens := exampleToRun.CommunityTokens

    if exampleToRun.CommunityURL != "" {
        urlResponse, err := ParseSharedURL(exampleToRun.CommunityURL)
        s.Require().NoError(err)
        s.Require().NotNil(urlResponse.Community)
        communityID = urlResponse.Community.CommunityID
    }

    // Prepare things depending on the configuration
    nodesList := mailserversDB.DefaultMailserversByFleet(fleet)
    descriptionContentTopic := wakuV2common.BytesToTopic(transport.ToTopic(communityID))
    shardContentTopic := wakuV2common.BytesToTopic(transport.ToTopic(transport.CommunityShardInfoTopic(communityID)))

    communityIDBytes, err := types.DecodeHex(communityID)
    s.Require().NoError(err)

    // update mock - the signer for the community returned by the contracts should be owner
    for _, communityToken := range communityTokens {
        s.collectiblesServiceMock.SetSignerPubkeyForCommunity(communityIDBytes, ownerPublicKey)
        s.collectiblesServiceMock.SetMockCollectibleContractData(communityToken.ChainID, communityToken.ContractAddress,
            &communities.CollectibleContractData{TotalSupply: &bigint.BigInt{}})
    }

    results := map[string]singleResult{}
    wg := sync.WaitGroup{}

    // We run a separate request for each node in the fleet.
    for i, mailserver := range nodesList {
        wg.Add(1)
        go func(i int, mailserver mailserversDB.Mailserver) {
            defer wg.Done()

            fmt.Printf("--- starting request [%d] from %s\n", i, mailserver.ID)

            result := singleResult{}

            //
            // Create WakuV2 node
            // NOTE: Another option was to create a bare waku node and fetch envelopes directly with it
            //          and after that push all of the envelopes to a new messenger and check the result.
            //          But this turned out to be harder to implement.
            //

            wakuLogger := s.logger.Named(fmt.Sprintf("user-waku-%d", i))
            messengerLogger := s.logger.Named(fmt.Sprintf("user-messenger-%d", i))

            cfg := testWakuV2Config{
                logger:                 wakuLogger,
                enableStore:            false,
                useShardAsDefaultTopic: useShardAsDefaultTopic,
                clusterID:              clusterID,
            }
            wakuV2 := NewTestWakuV2(&s.Suite, cfg)
            userWaku := gethbridge.NewGethWakuV2Wrapper(wakuV2)

            //
            // Create a messenger to process envelopes
            //

            var privateKeyString = userPrivateKeyString

            if privateKeyString == "" {
                privateKey, err := crypto.GenerateKey()
                s.Require().NoError(err)
                privateKeyString = hexutil.Encode(crypto.FromECDSA(privateKey))
            }

            privateKeyBytes, err := hexutil.Decode(privateKeyString)
            s.Require().NoError(err)
            privateKey, err := crypto.ToECDSA(privateKeyBytes)
            s.Require().NoError(err)

            // Mock a local fleet with single store node
            // This is done by settings custom store nodes in the database

            mailserversSQLDb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
            s.Require().NoError(err)
            mailserversDatabase := mailserversDB.NewDB(mailserversSQLDb)

            mailserver.Fleet = localFleet
            err = mailserversDatabase.Add(mailserver)
            s.Require().NoError(err)

            options := []Option{
                WithMailserversDatabase(mailserversDatabase),
                WithClusterConfig(params.ClusterConfig{
                    Fleet:     localFleet,
                    ClusterID: clusterID,
                }),
                WithCommunityTokensService(s.collectiblesServiceMock),
            }

            // Create user without `createBob` func to force desired fleet
            user, err := newMessengerWithKey(userWaku, privateKey, messengerLogger, options)
            s.Require().NoError(err)
            defer TearDownMessenger(&s.Suite, user)

            communityAddress := communities.CommunityShard{
                CommunityID: communityID,
                Shard:       communityShard,
            }

            // Setup envelopes watcher to gather fetched envelopes

            s.setupEnvelopesWatcher(wakuV2, &shardContentTopic, func(envelope *wakuV2common.ReceivedMessage) {
                result.ShardEnvelopes = append(result.ShardEnvelopes, envelope)
            })

            s.setupEnvelopesWatcher(wakuV2, &descriptionContentTopic, func(envelope *wakuV2common.ReceivedMessage) {
                result.Envelopes = append(result.Envelopes, envelope)
            })

            // Start fetching

            storeNodeRequestOptions := []StoreNodeRequestOption{
                WithWaitForResponseOption(true),
                WithStopWhenDataFound(false),                         // In this test we want all envelopes to be fetched
                WithInitialPageSize(defaultStoreNodeRequestPageSize), // Because we're fetching all envelopes anyway
            }
            storeNodeRequestOptions = append(storeNodeRequestOptions, exampleToRun.CustomOptions...)

            fetchedCommunity, stats, err := user.storeNodeRequestsManager.FetchCommunity(communityAddress, storeNodeRequestOptions)

            result.EnvelopesCount = stats.FetchedEnvelopesCount
            result.FetchedCommunity = fetchedCommunity
            result.Error = err

            results[mailserver.ID] = result
        }(i, mailserver)
    }

    // Wait for all requests to finish

    wg.Wait()

    // Print the results
    for storeNodeName, result := range results {
        fmt.Printf("%s --- %s\n", storeNodeName, result.toString())
    }

    // Check that results has no errors and contain correct envelopes
    for storeNodeName, result := range results {
        s.Require().NoError(result.Error)
        if exampleToRun.CheckExpectedEnvelopes {
            s.Require().Equal(exampleToRun.ExpectedShardEnvelopes, result.ShardEnvelopesHashes(),
                fmt.Sprintf("wrong shard envelopes for store node %s", storeNodeName))
            s.Require().Equal(exampleToRun.ExpectedDescriptionEnvelopes, result.EnvelopesHashes(),
                fmt.Sprintf("wrong envelopes for store node %s", storeNodeName))
        }
    }
}

func (s *MessengerStoreNodeRequestSuite) TestFetchingCommunityWithOwnerToken() {
    s.createOwner()
    s.createBob()

    s.waitForAvailableStoreNode(s.owner)
    community := s.createCommunity(s.owner)

    // owner mints owner token
    var chainID uint64 = 1
    tokenAddress := "token-address"
    tokenName := "tokenName"
    tokenSymbol := "TSM"
    _, err := s.owner.SaveCommunityToken(&token.CommunityToken{
        TokenType:       protobuf.CommunityTokenType_ERC721,
        CommunityID:     community.IDString(),
        Address:         tokenAddress,
        ChainID:         int(chainID),
        Name:            tokenName,
        Supply:          &bigint.BigInt{},
        Symbol:          tokenSymbol,
        PrivilegesLevel: token.OwnerLevel,
    }, nil)
    s.Require().NoError(err)

    // owner adds minted owner token to community
    err = s.owner.AddCommunityToken(community.IDString(), int(chainID), tokenAddress)
    s.Require().NoError(err)

    // update mock - the signer for the community returned by the contracts should be owner
    s.collectiblesServiceMock.SetSignerPubkeyForCommunity(community.ID(), common.PubkeyToHex(&s.owner.identity.PublicKey))
    s.collectiblesServiceMock.SetMockCollectibleContractData(chainID, tokenAddress,
        &communities.CollectibleContractData{TotalSupply: &bigint.BigInt{}})

    community, err = s.owner.communitiesManager.GetByID(community.ID())
    s.Require().NoError(err)
    s.Require().Len(community.TokenPermissions(), 1)

    s.waitForAvailableStoreNode(s.bob)

    s.fetchCommunity(s.bob, community.CommunityShard(), community)
}

func (s *MessengerStoreNodeRequestSuite) TestFetchingHistoryWhenOnline() {
    storeAddress := s.storeNodeAddress
    storePeerID := s.wakuStoreNode.PeerID().String()

    // Create messengers
    s.createOwner()
    s.createBob()

    s.logger.Debug("store node info", zap.String("peerID", s.wakuStoreNode.PeerID().String()))
    s.logger.Debug("owner node info", zap.String("peerID", gethbridge.GetGethWakuV2From(s.ownerWaku).PeerID().String()))
    s.logger.Debug("bob node info", zap.String("peerID", gethbridge.GetGethWakuV2From(s.bobWaku).PeerID().String()))

    // Connect to store node to force "online" status
    {
        WaitForPeersConnected(&s.Suite, gethbridge.GetGethWakuV2From(s.bobWaku), func() []string {
            err := s.bob.DialPeer(storeAddress)
            s.Require().NoError(err)
            return []string{storePeerID}
        })
        s.Require().True(s.bob.Online())

        // Wait for bob to fetch backup and historic messages
        time.Sleep(2 * time.Second)
    }

    // bob goes offline
    {
        WaitForConnectionStatus(&s.Suite, gethbridge.GetGethWakuV2From(s.bobWaku), func() bool {
            err := s.bob.DropPeer(storePeerID)
            s.Require().NoError(err)
            return false
        })
        s.Require().False(s.bob.Online())
    }

    // Owner sends a contact request while bob is offline
    {
        // Setup store nodes envelopes watcher
        partitionedTopic := transport.PartitionedTopic(s.bob.IdentityPublicKey())
        topic := transport.ToTopic(partitionedTopic)
        contentTopic := wakuV2common.BytesToTopic(topic)
        storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentTopic)

        // Send contact request
        response, err := s.owner.SendContactRequest(context.Background(), &requests.SendContactRequest{
            ID:      s.bob.IdentityPublicKeyString(),
            Message: "1",
        })
        s.Require().NoError(err)
        s.Require().NotNil(response)
        s.Require().Len(response.Messages(), 2)

        // Ensure contact request is stored
        s.waitForEnvelopes(storeNodeSubscription, 1)
    }

    // owner goes offline to prevent message resend and any other side effects
    // to go offline we disconnect from both relay and store peers
    WaitForConnectionStatus(&s.Suite, gethbridge.GetGethWakuV2From(s.ownerWaku), func() bool {
        err := s.owner.DropPeer(storePeerID)
        s.Require().NoError(err)
        return false
    })
    s.Require().False(s.owner.Online())

    // bob goes back online, this should trigger fetching historic messages
    {
        // Enable auto request historic messages, so that when bob goes online it will fetch historic messages
        // We don't enable it earlier to control when we connect to the store node.
        s.bob.config.codeControlFlags.AutoRequestHistoricMessages = true

        WaitForPeersConnected(&s.Suite, gethbridge.GetGethWakuV2From(s.bobWaku), func() []string {
            err := s.bob.DialPeer(storeAddress)
            s.Require().NoError(err)
            return []string{storePeerID}
        })
        s.Require().True(s.bob.Online())

        // Don't  dial the peer, message should be fetched from store node
        response, err := WaitOnMessengerResponse(
            s.bob,
            func(r *MessengerResponse) bool {
                return len(r.Contacts) == 1
            },
            "no contact request received",
        )
        s.Require().NoError(err)
        s.Require().NotNil(response)
        s.Require().Len(response.Contacts, 1)
    }
}