status-im/status-go

View on GitHub
protocol/messenger_communities_sharding_test.go

Summary

Maintainability
A
0 mins
Test Coverage
package protocol

import (
    "context"
    "crypto/ecdsa"
    "testing"

    "github.com/golang/protobuf/proto"
    "github.com/stretchr/testify/suite"
    "go.uber.org/zap"

    gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
    "github.com/status-im/status-go/eth-node/types"
    "github.com/status-im/status-go/protocol/common"
    "github.com/status-im/status-go/protocol/common/shard"
    "github.com/status-im/status-go/protocol/communities"
    "github.com/status-im/status-go/protocol/protobuf"
    "github.com/status-im/status-go/protocol/requests"
    "github.com/status-im/status-go/protocol/tt"
)

func TestMessengerCommunitiesShardingSuite(t *testing.T) {
    suite.Run(t, new(MessengerCommunitiesShardingSuite))
}

type MessengerCommunitiesShardingSuite struct {
    suite.Suite

    owner     *Messenger
    ownerWaku types.Waku

    alice                         *Messenger
    aliceWaku                     types.Waku
    aliceUnhandledMessagesTracker *unhandledMessagesTracker

    logger *zap.Logger

    mockedBalances          communities.BalancesByChain
    mockedCollectibles      communities.CollectiblesByChain
    collectiblesServiceMock *CollectiblesServiceMock
    collectiblesManagerMock *CollectiblesManagerMock
    accountsTestData        map[string][]string
    accountsPasswords       map[string]string
}

func (s *MessengerCommunitiesShardingSuite) SetupTest() {
    s.logger = tt.MustCreateTestLogger()
    s.collectiblesServiceMock = &CollectiblesServiceMock{}
    s.mockedCollectibles = make(communities.CollectiblesByChain)
    s.collectiblesManagerMock = &CollectiblesManagerMock{
        Collectibles: &s.mockedCollectibles,
    }
    s.accountsTestData = make(map[string][]string)
    s.accountsPasswords = make(map[string]string)

    wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, []string{"owner", "alice"})

    nodeConfig := defaultTestCommunitiesMessengerNodeConfig()

    s.ownerWaku = wakuNodes[0]
    s.owner = newTestCommunitiesMessenger(&s.Suite, s.ownerWaku, testCommunitiesMessengerConfig{
        testMessengerConfig: testMessengerConfig{
            name:   "owner",
            logger: s.logger,
        },
        walletAddresses:     []string{},
        password:            "",
        nodeConfig:          nodeConfig,
        mockedBalances:      &s.mockedBalances,
        collectiblesManager: s.collectiblesManagerMock,
    })

    s.aliceUnhandledMessagesTracker = &unhandledMessagesTracker{
        messages: map[protobuf.ApplicationMetadataMessage_Type][]*unhandedMessage{},
    }
    s.aliceWaku = wakuNodes[1]
    s.alice = newTestCommunitiesMessenger(&s.Suite, s.aliceWaku, testCommunitiesMessengerConfig{
        testMessengerConfig: testMessengerConfig{
            name:                     "alice",
            logger:                   s.logger,
            unhandledMessagesTracker: s.aliceUnhandledMessagesTracker,
        },
        walletAddresses: []string{aliceAddress1},
        password:        alicePassword,
        nodeConfig:      nodeConfig,
        mockedBalances:  &s.mockedBalances,
    })

    _, err := s.owner.Start()
    s.Require().NoError(err)
    _, err = s.alice.Start()
    s.Require().NoError(err)
}

func (s *MessengerCommunitiesShardingSuite) TearDownTest() {
    if s.owner != nil {
        TearDownMessenger(&s.Suite, s.owner)
    }
    if s.ownerWaku != nil {
        s.Require().NoError(gethbridge.GetGethWakuV2From(s.ownerWaku).Stop())
    }
    if s.alice != nil {
        TearDownMessenger(&s.Suite, s.alice)
    }
    if s.aliceWaku != nil {
        s.Require().NoError(gethbridge.GetGethWakuV2From(s.aliceWaku).Stop())
    }
    _ = s.logger.Sync()
}

func (s *MessengerCommunitiesShardingSuite) testPostToCommunityChat(shard *shard.Shard, community *communities.Community, chat *Chat) {
    _, err := s.owner.SetCommunityShard(&requests.SetCommunityShard{
        CommunityID: community.ID(),
        Shard:       shard,
    })
    s.Require().NoError(err)

    _, err = WaitOnMessengerResponse(s.alice, func(mr *MessengerResponse) bool {
        if len(mr.communities) == 0 {
            return false
        }
        if shard == nil {
            return mr.Communities()[0].Shard() == nil
        }
        return mr.Communities()[0].Shard() != nil && mr.Communities()[0].Shard().Index == shard.Index
    }, "shard info not updated")
    s.Require().NoError(err)

    message := buildTestMessage(*chat)
    _, err = s.owner.SendChatMessage(context.Background(), message)
    s.Require().NoError(err)

    _, err = WaitOnMessengerResponse(s.alice, func(mr *MessengerResponse) bool {
        return len(mr.messages) > 0 && mr.Messages()[0].ID == message.ID
    }, "message not received")
    s.Require().NoError(err)
}

func (s *MessengerCommunitiesShardingSuite) TestPostToCommunityChat() {
    community, chat := createCommunity(&s.Suite, s.owner)

    advertiseCommunityToUserOldWay(&s.Suite, community, s.owner, s.alice)
    joinCommunity(&s.Suite, community.ID(), s.owner, s.alice, alicePassword, []string{aliceAddress1})

    // Members should be able to receive messages in a community with sharding enabled.
    {
        shard := &shard.Shard{
            Cluster: shard.MainStatusShardCluster,
            Index:   128,
        }
        s.testPostToCommunityChat(shard, community, chat)
    }

    // Members should be able to receive messages in a community where the sharding configuration has been edited.
    {
        shard := &shard.Shard{
            Cluster: shard.MainStatusShardCluster,
            Index:   256,
        }
        s.testPostToCommunityChat(shard, community, chat)
    }

    // Members should continue to receive messages in a community if it is moved back to default shard.
    {
        shard := &shard.Shard{
            Cluster: shard.MainStatusShardCluster,
            Index:   32,
        }
        s.testPostToCommunityChat(shard, community, chat)
    }
}

func (s *MessengerCommunitiesShardingSuite) TestIgnoreOutdatedShardKey() {
    community, _ := createCommunity(&s.Suite, s.owner)

    advertiseCommunityToUserOldWay(&s.Suite, community, s.owner, s.alice)
    joinCommunity(&s.Suite, community.ID(), s.owner, s.alice, alicePassword, []string{aliceAddress1})

    shard := &shard.Shard{
        Cluster: shard.MainStatusShardCluster,
        Index:   128,
    }

    // Members should receive shard update.
    {
        response, err := s.owner.SetCommunityShard(&requests.SetCommunityShard{
            CommunityID: community.ID(),
            Shard:       shard,
        })
        s.Require().NoError(err)
        s.Require().Len(response.Communities(), 1)
        community = response.Communities()[0]

        _, err = WaitOnMessengerResponse(s.alice, func(mr *MessengerResponse) bool {
            return len(mr.communities) > 0 && mr.Communities()[0].Shard() != nil && mr.Communities()[0].Shard().Index == shard.Index
        }, "shard info not updated")
        s.Require().NoError(err)
    }

    // Members should ignore outdated shard update.
    {
        // Simulate outdated CommunityShardKey message.
        shard.Index = 256
        communityShardKey := &protobuf.CommunityShardKey{
            Clock:       community.Clock() - 1, // simulate outdated clock
            CommunityId: community.ID(),
            Shard:       shard.Protobuffer(),
        }

        encodedMessage, err := proto.Marshal(communityShardKey)
        s.Require().NoError(err)

        rawMessage := common.RawMessage{
            Recipients:  []*ecdsa.PublicKey{&s.alice.identity.PublicKey},
            ResendType:  common.ResendTypeDataSync,
            MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_SHARD_KEY,
            Payload:     encodedMessage,
        }

        _, err = s.owner.sender.SendPubsubTopicKey(context.Background(), &rawMessage)
        s.Require().NoError(err)

        _, err = WaitOnMessengerResponse(s.alice, func(mr *MessengerResponse) bool {
            msgType := protobuf.ApplicationMetadataMessage_COMMUNITY_SHARD_KEY
            msgs, exists := s.aliceUnhandledMessagesTracker.messages[msgType]
            if !exists {
                return false
            }

            for _, msg := range msgs {
                p := &protobuf.CommunityShardKey{}
                err := proto.Unmarshal(msg.ApplicationLayer.Payload, p)
                if err != nil {
                    panic(err)
                }

                if msg.err == communities.ErrOldShardInfo && p.Shard != nil && p.Shard.Index == int32(shard.Index) {
                    return true
                }
            }

            return false
        }, "shard info with outdated clock either not received or not ignored")
        s.Require().NoError(err)
    }
}