status-im/status-go

View on GitHub
protocol/messenger_peersyncing.go

Summary

Maintainability
A
0 mins
Test Coverage
F
26%
package protocol

import (
    "context"
    "crypto/ecdsa"
    "encoding/hex"
    "time"

    "github.com/golang/protobuf/proto"
    datasyncproto "github.com/status-im/mvds/protobuf"
    "github.com/status-im/mvds/state"

    "github.com/pkg/errors"
    "go.uber.org/zap"

    "github.com/status-im/status-go/eth-node/types"
    "github.com/status-im/status-go/protocol/common"
    "github.com/status-im/status-go/protocol/communities"
    datasyncpeer "github.com/status-im/status-go/protocol/datasync/peer"
    "github.com/status-im/status-go/protocol/encryption/sharedsecret"
    "github.com/status-im/status-go/protocol/peersyncing"
    v1protocol "github.com/status-im/status-go/protocol/v1"
)

var peerSyncingLoopInterval time.Duration = 60 * time.Second
var maxAdvertiseMessages = 40

func (m *Messenger) markDeliveredMessages(acks [][]byte) {
    for _, ack := range acks {
        //get message ID from database by datasync ID, with at-least-one
        // semantic
        messageIDBytes, err := m.persistence.MarkAsConfirmed(ack, true)
        if err != nil {
            m.logger.Info("got datasync acknowledge for message we don't have in db", zap.String("ack", hex.EncodeToString(ack)))
            continue
        }

        messageID := messageIDBytes.String()
        //mark messages as delivered

        m.logger.Debug("got datasync acknowledge for message", zap.String("ack", hex.EncodeToString(ack)), zap.String("messageID", messageID))

        err = m.UpdateMessageOutgoingStatus(messageID, common.OutgoingStatusDelivered)
        if err != nil {
            m.logger.Debug("Can't set message status as delivered", zap.Error(err))
        }

        err = m.UpdateRawMessageSent(messageID, true)
        if err != nil {
            m.logger.Debug("can't set raw message as sent", zap.Error(err))
        }

        m.transport.ConfirmMessageDelivered(messageID)

        //send signal to client that message status updated
        if m.config.messengerSignalsHandler != nil {
            message, err := m.persistence.MessageByID(messageID)
            if err != nil {
                m.logger.Debug("Can't get message from database", zap.Error(err))
                continue
            }
            m.config.messengerSignalsHandler.MessageDelivered(message.LocalChatID, messageID)
        }
    }
}

func (m *Messenger) handleDatasyncMetadata(response *common.HandleMessageResponse) error {
    m.OnDatasyncAcks(response.DatasyncSender, response.DatasyncAcks)

    if !m.featureFlags.Peersyncing {
        return nil
    }

    isPeerSyncingEnabled, err := m.settings.GetPeerSyncingEnabled()
    if err != nil {
        return err
    }
    if !isPeerSyncingEnabled {
        return nil
    }

    err = m.OnDatasyncOffer(response)
    if err != nil {
        return err
    }

    err = m.OnDatasyncRequests(response.DatasyncSender, response.DatasyncRequests)
    if err != nil {
        return err
    }

    return nil
}

func (m *Messenger) startPeerSyncingLoop() {
    logger := m.logger.Named("PeerSyncingLoop")

    ticker := time.NewTicker(peerSyncingLoopInterval)
    go func() {
        for {
            select {
            case <-ticker.C:
                err := m.sendDatasyncOffers()
                if err != nil {
                    m.logger.Warn("failed to send datasync offers", zap.Error(err))
                }

            case <-m.quit:
                ticker.Stop()
                logger.Debug("peersyncing loop stopped")
                return
            }
        }
    }()
}

func (m *Messenger) sendDatasyncOffers() error {
    if !m.featureFlags.Peersyncing {
        return nil
    }

    isPeerSyncingEnabled, err := m.settings.GetPeerSyncingEnabled()
    if err != nil {
        return err
    }
    if !isPeerSyncingEnabled {
        return nil
    }

    err = m.sendDatasyncOffersForCommunities()
    if err != nil {
        return err
    }

    err = m.sendDatasyncOffersForChats()
    if err != nil {
        return err
    }

    // Check all the group ids that need to be on offer
    // Get all the messages that need to be offered
    // Prepare datasync messages
    // Dispatch them to the right group
    return nil
}

func (m *Messenger) sendDatasyncOffersForCommunities() error {
    joinedCommunities, err := m.communitiesManager.Joined()
    if err != nil {
        return err
    }

    for _, community := range joinedCommunities {
        var chatIDs [][]byte
        for id := range community.Chats() {
            chatIDs = append(chatIDs, []byte(community.IDString()+id))
        }
        if len(chatIDs) == 0 {
            continue
        }
        availableMessagesMap, err := m.peersyncing.AvailableMessagesMapByChatIDs(chatIDs, maxAdvertiseMessages)
        if err != nil {
            return err
        }
        datasyncMessage := &datasyncproto.Payload{}
        if len(availableMessagesMap) == 0 {
            continue
        }
        for chatID, m := range availableMessagesMap {
            datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: types.Hex2Bytes(chatID), MessageIds: m})
        }
        payload, err := proto.Marshal(datasyncMessage)
        if err != nil {
            return err
        }
        rawMessage := common.RawMessage{
            Payload:             payload,
            Ephemeral:           true,
            SkipApplicationWrap: true,
            PubsubTopic:         community.PubsubTopic(),
        }
        _, err = m.sender.SendPublic(context.Background(), community.IDString(), rawMessage)
        if err != nil {
            return err
        }
    }
    return nil
}

func (m *Messenger) sendDatasyncOffersForChats() error {
    for _, chat := range m.Chats() {
        chatIDBytes := []byte(chat.ID)
        availableMessagesMap, err := m.peersyncing.AvailableMessagesMapByChatIDs([][]byte{chatIDBytes}, maxAdvertiseMessages)
        if err != nil {
            return err
        }
        datasyncMessage := &datasyncproto.Payload{}
        if len(availableMessagesMap) == 0 {
            continue
        }
        for _, message := range availableMessagesMap {
            datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: chatIDBytes, MessageIds: message})
        }
        payload, err := proto.Marshal(datasyncMessage)
        if err != nil {
            return err
        }

        publicKey, err := chat.PublicKey()
        if err != nil {
            return err
        }
        rawMessage := common.RawMessage{
            Payload:             payload,
            Ephemeral:           true,
            SkipApplicationWrap: true,
        }
        _, err = m.sender.SendPrivate(context.Background(), publicKey, &rawMessage)
        if err != nil {
            return err
        }
    }
    return nil
}

func (m *Messenger) OnDatasyncOffer(response *common.HandleMessageResponse) error {
    sender := response.DatasyncSender
    offers := response.DatasyncOffers

    if len(offers) == 0 {
        return nil
    }

    if common.PubkeyToHex(sender) == m.myHexIdentity() {
        return nil
    }

    var offeredMessages []peersyncing.SyncMessage

    for _, o := range offers {
        offeredMessages = append(offeredMessages, peersyncing.SyncMessage{ChatID: o.GroupID, ID: o.MessageID})
    }

    messagesToFetch, err := m.peersyncing.OnOffer(offeredMessages)
    if err != nil {
        return err
    }

    if len(messagesToFetch) == 0 {
        return nil
    }

    datasyncMessage := &datasyncproto.Payload{}
    for _, msg := range messagesToFetch {
        idString := types.Bytes2Hex(msg.ID)
        lastOffered := m.peersyncingOffers[idString]
        timeNow := m.GetCurrentTimeInMillis() / 1000
        if lastOffered+30 < timeNow {
            m.peersyncingOffers[idString] = timeNow
            datasyncMessage.Requests = append(datasyncMessage.Requests, msg.ID)
        }
    }
    payload, err := proto.Marshal(datasyncMessage)
    if err != nil {
        return err
    }
    rawMessage := common.RawMessage{
        LocalChatID:         common.PubkeyToHex(sender),
        Payload:             payload,
        Ephemeral:           true,
        SkipApplicationWrap: true,
    }
    _, err = m.sender.SendPrivate(context.Background(), sender, &rawMessage)
    if err != nil {
        return err
    }

    // Check if any of the things need to be added
    // Reply if anything needs adding
    // Ack any message that is out
    return nil
}

// canSyncMessageWith checks the permission of a message
func (m *Messenger) canSyncMessageWith(message peersyncing.SyncMessage, peer *ecdsa.PublicKey) (bool, error) {
    switch message.Type {
    case peersyncing.SyncMessageCommunityType:
        chat, ok := m.allChats.Load(string(message.ChatID))
        if !ok {
            return false, nil
        }
        community, err := m.communitiesManager.GetByIDString(chat.CommunityID)
        if err != nil {
            return false, err
        }

        return m.canSyncCommunityMessageWith(chat, community, peer)
    case peersyncing.SyncMessageOneToOneType:
        chat, ok := m.allChats.Load(string(message.ChatID))
        if !ok {
            return false, nil
        }
        return m.canSyncOneToOneMessageWith(chat, peer)
    default:
        return false, nil
    }
}

// NOTE: This is not stricly correct. It's possible that you sync a message that has been
// posted after the banning of a user from a community, but before we realized that.
// As an approximation it should be ok, but worth thinking about how to address this.
func (m *Messenger) canSyncCommunityMessageWith(chat *Chat, community *communities.Community, peer *ecdsa.PublicKey) (bool, error) {
    return community.IsMemberInChat(peer, chat.CommunityChatID()), nil
}

func (m *Messenger) canSyncOneToOneMessageWith(chat *Chat, peer *ecdsa.PublicKey) (bool, error) {
    return chat.HasMember(common.PubkeyToHex(peer)), nil
}

func (m *Messenger) OnDatasyncRequests(requester *ecdsa.PublicKey, messageIDs [][]byte) error {
    if len(messageIDs) == 0 {
        return nil
    }

    messages, err := m.peersyncing.MessagesByIDs(messageIDs)
    if err != nil {
        return err
    }
    for _, msg := range messages {
        canSync, err := m.canSyncMessageWith(msg, requester)
        if err != nil {
            return err
        }
        if !canSync {
            continue
        }
        idString := common.PubkeyToHex(requester) + types.Bytes2Hex(msg.ID)
        lastRequested := m.peersyncingRequests[idString]
        timeNow := m.GetCurrentTimeInMillis() / 1000
        if lastRequested+30 < timeNow {
            m.peersyncingRequests[idString] = timeNow

            // Check permissions
            rawMessage := common.RawMessage{
                LocalChatID:         common.PubkeyToHex(requester),
                Payload:             msg.Payload,
                Ephemeral:           true,
                SkipApplicationWrap: true,
            }
            _, err = m.sender.SendPrivate(context.Background(), requester, &rawMessage)
            if err != nil {
                return err
            }
        }

    }
    // no need of group id, since we can derive from message
    return nil
}

func (m *Messenger) OnDatasyncAcks(sender *ecdsa.PublicKey, acks [][]byte) {
    // we should make sure the sender can acknowledge those messages
    m.markDeliveredMessages(acks)
}

// sendDataSync sends a message scheduled by the data sync layer.
// Data Sync layer calls this method "dispatch" function.
func (m *Messenger) sendDataSync(receiver state.PeerID, payload *datasyncproto.Payload) error {
    ctx := context.Background()
    if !payload.IsValid() {
        m.logger.Error("payload is invalid")
        return errors.New("payload is invalid")
    }

    marshalledPayload, err := proto.Marshal(payload)
    if err != nil {
        m.logger.Error("failed to marshal payload")
        return err
    }

    publicKey, err := datasyncpeer.IDToPublicKey(receiver)
    if err != nil {
        m.logger.Error("failed to convert id to public key", zap.Error(err))
        return err
    }

    // Calculate the messageIDs
    messageIDs := make([][]byte, 0, len(payload.Messages))
    hexMessageIDs := make([]string, 0, len(payload.Messages))
    for _, payload := range payload.Messages {
        mid := v1protocol.MessageID(&m.identity.PublicKey, payload.Body)
        messageIDs = append(messageIDs, mid)
        hexMessageIDs = append(hexMessageIDs, mid.String())
    }

    messageSpec, err := m.encryptor.BuildEncryptedMessage(m.identity, publicKey, marshalledPayload)
    if err != nil {
        return errors.Wrap(err, "failed to encrypt message")
    }

    // The shared secret needs to be handle before we send a message
    // otherwise the topic might not be set up before we receive a message
    err = m.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret})
    if err != nil {
        return err
    }

    hashes, newMessages, err := m.sender.SendMessageSpec(ctx, publicKey, messageSpec, messageIDs)
    if err != nil {
        m.logger.Error("failed to send a datasync message", zap.Error(err))
        return err
    }

    m.logger.Debug("sent private messages", zap.Any("messageIDs", hexMessageIDs), zap.Strings("hashes", types.EncodeHexes(hashes)))
    m.transport.TrackMany(messageIDs, hashes, newMessages)

    return nil
}