status-im/status-go

View on GitHub
protocol/common/message_sender.go

Summary

Maintainability
C
1 day
Test Coverage
F
57%
package common

import (
    "context"
    "crypto/ecdsa"
    "database/sql"
    "sync"
    "time"

    "github.com/golang/protobuf/proto"
    "github.com/pkg/errors"
    datasyncnode "github.com/status-im/mvds/node"
    datasyncproto "github.com/status-im/mvds/protobuf"
    "github.com/status-im/mvds/state"
    "go.uber.org/zap"

    "github.com/status-im/status-go/eth-node/crypto"
    "github.com/status-im/status-go/eth-node/types"
    "github.com/status-im/status-go/protocol/datasync"
    datasyncpeer "github.com/status-im/status-go/protocol/datasync/peer"
    "github.com/status-im/status-go/protocol/encryption"
    "github.com/status-im/status-go/protocol/encryption/sharedsecret"
    "github.com/status-im/status-go/protocol/protobuf"
    "github.com/status-im/status-go/protocol/transport"
    v1protocol "github.com/status-im/status-go/protocol/v1"
)

// Whisper message properties.
const (
    whisperTTL        = 15
    whisperDefaultPoW = 0.002
    // whisperLargeSizePoW is the PoWTarget for larger payload sizes
    whisperLargeSizePoW = 0.000002
    // largeSizeInBytes is when should we be using a lower POW.
    // Roughly this is 50KB
    largeSizeInBytes = 50000
    whisperPoWTime   = 5
)

// RekeyCompatibility indicates whether we should be sending
// keys in 1-to-1 messages as well as in the newer format
var RekeyCompatibility = true

// SentMessage reprent a message that has been passed to the transport layer
type SentMessage struct {
    PublicKey  *ecdsa.PublicKey
    Spec       *encryption.ProtocolMessageSpec
    MessageIDs [][]byte
}

type MessageEventType uint32

const (
    MessageScheduled = iota + 1
    MessageSent
)

type MessageEvent struct {
    Recipient   *ecdsa.PublicKey
    Type        MessageEventType
    SentMessage *SentMessage
    RawMessage  *RawMessage
}

type MessageSender struct {
    identity    *ecdsa.PrivateKey
    datasync    *datasync.DataSync
    database    *sql.DB
    protocol    *encryption.Protocol
    transport   *transport.Transport
    logger      *zap.Logger
    persistence *RawMessagesPersistence

    datasyncEnabled bool

    // ephemeralKeys is a map that contains the ephemeral keys of the client, used
    // to decrypt messages
    ephemeralKeys      map[string]*ecdsa.PrivateKey
    ephemeralKeysMutex sync.Mutex

    // messageEventsSubscriptions contains all the subscriptions for message events
    messageEventsSubscriptions      []chan<- *MessageEvent
    messageEventsSubscriptionsMutex sync.Mutex

    featureFlags FeatureFlags

    // handleSharedSecrets is a callback that is called every time a new shared secret is negotiated
    handleSharedSecrets func([]*sharedsecret.Secret) error
}

func NewMessageSender(
    identity *ecdsa.PrivateKey,
    database *sql.DB,
    enc *encryption.Protocol,
    transport *transport.Transport,
    logger *zap.Logger,
    features FeatureFlags,
) (*MessageSender, error) {
    p := &MessageSender{
        identity:        identity,
        datasyncEnabled: features.Datasync,
        protocol:        enc,
        database:        database,
        persistence:     NewRawMessagesPersistence(database),
        transport:       transport,
        logger:          logger,
        ephemeralKeys:   make(map[string]*ecdsa.PrivateKey),
        featureFlags:    features,
    }

    return p, nil
}

func (s *MessageSender) Stop() {
    s.messageEventsSubscriptionsMutex.Lock()
    defer s.messageEventsSubscriptionsMutex.Unlock()

    for _, c := range s.messageEventsSubscriptions {
        close(c)
    }
    s.messageEventsSubscriptions = nil
    s.StopDatasync()
}

func (s *MessageSender) SetHandleSharedSecrets(handler func([]*sharedsecret.Secret) error) {
    s.handleSharedSecrets = handler
}

func (s *MessageSender) StartDatasync(statusChangeEvent chan datasyncnode.PeerStatusChangeEvent, handler func(peer state.PeerID, payload *datasyncproto.Payload) error) error {
    if !s.datasyncEnabled {
        return nil
    }

    dataSyncTransport := datasync.NewNodeTransport()
    dataSyncNode, err := datasyncnode.NewPersistentNode(
        s.database,
        dataSyncTransport,
        datasyncpeer.PublicKeyToPeerID(s.identity.PublicKey),
        datasyncnode.BATCH,
        datasync.CalculateSendTime,
        statusChangeEvent,
        s.logger,
    )
    if err != nil {
        return err
    }

    s.datasync = datasync.New(dataSyncNode, dataSyncTransport, true, s.logger)

    s.datasync.Init(handler, s.logger)
    s.datasync.Start(datasync.DatasyncTicker)

    return nil
}

// SendPrivate takes encoded data, encrypts it and sends through the wire.
func (s *MessageSender) SendPrivate(
    ctx context.Context,
    recipient *ecdsa.PublicKey,
    rawMessage *RawMessage,
) ([]byte, error) {
    s.logger.Debug(
        "sending a private message",
        zap.String("public-key", types.EncodeHex(crypto.FromECDSAPub(recipient))),
        zap.String("site", "SendPrivate"),
    )
    // Currently we don't support sending through datasync and setting custom waku fields,
    // as the datasync interface is not rich enough to propagate that information, so we
    // would have to add some complexity to handle this.
    if rawMessage.ResendType == ResendTypeDataSync && (rawMessage.Sender != nil || rawMessage.SkipEncryptionLayer || rawMessage.SendOnPersonalTopic) {
        return nil, errors.New("setting identity, skip-encryption or personal topic and datasync not supported")
    }

    // Set sender identity if not specified
    if rawMessage.Sender == nil {
        rawMessage.Sender = s.identity
    }

    return s.sendPrivate(ctx, recipient, rawMessage)
}

// SendCommunityMessage takes encoded data, encrypts it and sends through the wire
// using the community topic and their key
func (s *MessageSender) SendCommunityMessage(
    ctx context.Context,
    rawMessage *RawMessage,
) ([]byte, error) {
    s.logger.Debug(
        "sending a community message",
        zap.String("communityId", types.EncodeHex(rawMessage.CommunityID)),
        zap.String("site", "SendCommunityMessage"),
    )
    rawMessage.Sender = s.identity

    return s.sendCommunity(ctx, rawMessage)
}

// SendPubsubTopicKey sends the protected topic key for a community to a list of recipients
func (s *MessageSender) SendPubsubTopicKey(
    ctx context.Context,
    rawMessage *RawMessage,
) ([]byte, error) {
    s.logger.Debug(
        "sending the protected topic key for a community",
        zap.String("communityId", types.EncodeHex(rawMessage.CommunityID)),
        zap.String("site", "SendPubsubTopicKey"),
    )
    rawMessage.Sender = s.identity
    messageID, err := s.getMessageID(rawMessage)
    if err != nil {
        return nil, err
    }

    rawMessage.ID = types.EncodeHex(messageID)

    // Notify before dispatching, otherwise the dispatch subscription might happen
    // earlier than the scheduled
    s.notifyOnScheduledMessage(nil, rawMessage)

    // Send to each recipients
    for _, recipient := range rawMessage.Recipients {
        _, err = s.sendPrivate(ctx, recipient, rawMessage)
        if err != nil {
            return nil, errors.Wrap(err, "failed to send message")
        }
    }
    return messageID, nil

}

// SendGroup takes encoded data, encrypts it and sends through the wire,
// always return the messageID
func (s *MessageSender) SendGroup(
    ctx context.Context,
    recipients []*ecdsa.PublicKey,
    rawMessage RawMessage,
) ([]byte, error) {
    s.logger.Debug(
        "sending a private group message",
        zap.String("site", "SendGroup"),
    )
    // Set sender if not specified
    if rawMessage.Sender == nil {
        rawMessage.Sender = s.identity
    }

    // Calculate messageID first and set on raw message
    wrappedMessage, err := s.wrapMessageV1(&rawMessage)
    if err != nil {
        return nil, errors.Wrap(err, "failed to wrap message")
    }
    messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
    rawMessage.ID = types.EncodeHex(messageID)

    // We call it only once, and we nil the function after so it doesn't get called again
    if rawMessage.BeforeDispatch != nil {
        if err := rawMessage.BeforeDispatch(&rawMessage); err != nil {
            return nil, err
        }
    }

    // Send to each recipients
    for _, recipient := range recipients {
        _, err = s.sendPrivate(ctx, recipient, &rawMessage)
        if err != nil {
            return nil, errors.Wrap(err, "failed to send message")
        }
    }
    return messageID, nil
}

func (s *MessageSender) getMessageID(rawMessage *RawMessage) (types.HexBytes, error) {
    wrappedMessage, err := s.wrapMessageV1(rawMessage)
    if err != nil {
        return nil, errors.Wrap(err, "failed to wrap message")
    }

    messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)

    return messageID, nil
}

func ShouldCommunityMessageBeEncrypted(msgType protobuf.ApplicationMetadataMessage_Type) bool {
    return msgType == protobuf.ApplicationMetadataMessage_CHAT_MESSAGE ||
        msgType == protobuf.ApplicationMetadataMessage_EDIT_MESSAGE ||
        msgType == protobuf.ApplicationMetadataMessage_DELETE_MESSAGE ||
        msgType == protobuf.ApplicationMetadataMessage_PIN_MESSAGE ||
        msgType == protobuf.ApplicationMetadataMessage_EMOJI_REACTION
}

// sendCommunity sends a message that's to be sent in a community
// If it's a chat message, it will go to the respective topic derived by the
// chat id, if it's not a chat message, it will go to the community topic.
func (s *MessageSender) sendCommunity(
    ctx context.Context,
    rawMessage *RawMessage,
) ([]byte, error) {
    s.logger.Debug("sending community message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(&rawMessage.Sender.PublicKey))))

    // Set sender
    if rawMessage.Sender == nil {
        rawMessage.Sender = s.identity
    }

    messageID, err := s.getMessageID(rawMessage)
    if err != nil {
        return nil, err
    }
    rawMessage.ID = types.EncodeHex(messageID)

    if rawMessage.BeforeDispatch != nil {
        if err := rawMessage.BeforeDispatch(rawMessage); err != nil {
            return nil, err
        }
    }
    // Notify before dispatching, otherwise the dispatch subscription might happen
    // earlier than the scheduled
    s.notifyOnScheduledMessage(nil, rawMessage)

    var hashes [][]byte
    var newMessages []*types.NewMessage

    forceRekey := rawMessage.CommunityKeyExMsgType == KeyExMsgRekey

    // Check if it's a key exchange message. In this case we send it
    // to all the recipients
    if rawMessage.CommunityKeyExMsgType != KeyExMsgNone {
        // If rekeycompatibility is on, we always
        // want to execute below, otherwise we execute
        // only when we want to fill up old keys to a given user
        if RekeyCompatibility || !forceRekey {
            keyExMessageSpecs, err := s.protocol.GetKeyExMessageSpecs(rawMessage.HashRatchetGroupID, s.identity, rawMessage.Recipients, forceRekey)
            if err != nil {
                return nil, err
            }

            for i, spec := range keyExMessageSpecs {
                recipient := rawMessage.Recipients[i]
                _, _, err = s.sendMessageSpec(ctx, recipient, spec, [][]byte{messageID})
                if err != nil {
                    return nil, err
                }
            }
        }
    }

    wrappedMessage, err := s.wrapMessageV1(rawMessage)
    if err != nil {
        return nil, err
    }

    // If it's a chat message, we send it on the community chat topic
    if ShouldCommunityMessageBeEncrypted(rawMessage.MessageType) {
        messageSpec, err := s.protocol.BuildHashRatchetMessage(rawMessage.HashRatchetGroupID, wrappedMessage)
        if err != nil {
            return nil, err
        }

        payload, err := proto.Marshal(messageSpec.Message)
        if err != nil {
            return nil, errors.Wrap(err, "failed to marshal")
        }
        hashes, newMessages, err = s.dispatchCommunityChatMessage(ctx, rawMessage, payload, forceRekey)
        if err != nil {
            return nil, err
        }

        sentMessage := &SentMessage{
            Spec:       messageSpec,
            MessageIDs: [][]byte{messageID},
        }

        s.notifyOnSentMessage(sentMessage)

    } else {

        pubkey, err := crypto.DecompressPubkey(rawMessage.CommunityID)
        if err != nil {
            return nil, errors.Wrap(err, "failed to decompress pubkey")
        }
        hashes, newMessages, err = s.dispatchCommunityMessage(ctx, pubkey, wrappedMessage, rawMessage.PubsubTopic, forceRekey, rawMessage)
        if err != nil {
            s.logger.Error("failed to send a community message", zap.Error(err))
            return nil, errors.Wrap(err, "failed to send a message spec")
        }
    }

    s.logger.Debug("sent-message: community ",
        zap.Strings("recipient", PubkeysToHex(rawMessage.Recipients)),
        zap.String("messageID", messageID.String()),
        zap.String("messageType", "community"),
        zap.Any("contentType", rawMessage.MessageType),
        zap.Strings("hashes", types.EncodeHexes(hashes)))
    s.transport.Track(messageID, hashes, newMessages)

    return messageID, nil
}

// sendPrivate sends data to the recipient identifying with a given public key.
func (s *MessageSender) sendPrivate(
    ctx context.Context,
    recipient *ecdsa.PublicKey,
    rawMessage *RawMessage,
) ([]byte, error) {
    s.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))

    var wrappedMessage []byte
    var err error
    if rawMessage.SkipApplicationWrap {
        wrappedMessage = rawMessage.Payload
    } else {
        wrappedMessage, err = s.wrapMessageV1(rawMessage)
        if err != nil {
            return nil, errors.Wrap(err, "failed to wrap message")
        }
    }

    messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
    rawMessage.ID = types.EncodeHex(messageID)
    if rawMessage.BeforeDispatch != nil {
        if err := rawMessage.BeforeDispatch(rawMessage); err != nil {
            return nil, err
        }
    }

    // Notify before dispatching, otherwise the dispatch subscription might happen
    // earlier than the scheduled
    s.notifyOnScheduledMessage(recipient, rawMessage)

    if s.datasync != nil && s.featureFlags.Datasync && rawMessage.ResendType == ResendTypeDataSync {
        // No need to call transport tracking.
        // It is done in a data sync dispatch step.
        datasyncID, err := s.addToDataSync(recipient, wrappedMessage)
        if err != nil {
            return nil, errors.Wrap(err, "failed to send message with datasync")
        }
        // We don't need to receive confirmations from our own devices
        if !IsPubKeyEqual(recipient, &s.identity.PublicKey) {
            confirmation := &RawMessageConfirmation{
                DataSyncID: datasyncID,
                MessageID:  messageID,
                PublicKey:  crypto.CompressPubkey(recipient),
            }

            err = s.persistence.InsertPendingConfirmation(confirmation)
            if err != nil {
                return nil, err
            }
        }
    } else if rawMessage.SkipEncryptionLayer {

        messageBytes := wrappedMessage
        if rawMessage.CommunityKeyExMsgType == KeyExMsgReuse {
            groupID := rawMessage.HashRatchetGroupID

            ratchets, err := s.protocol.GetKeysForGroup(groupID)
            if err != nil {
                return nil, err
            }

            message, err := s.protocol.BuildHashRatchetKeyExchangeMessageWithPayload(s.identity, recipient, groupID, ratchets, wrappedMessage)
            if err != nil {
                return nil, err
            }

            messageBytes, err = proto.Marshal(message.Message)
            if err != nil {
                return nil, err
            }
        }

        // When SkipProtocolLayer is set we don't pass the message to the encryption layer
        hashes, newMessages, err := s.sendPrivateRawMessage(ctx, rawMessage, recipient, messageBytes)
        if err != nil {
            s.logger.Error("failed to send a private message", zap.Error(err))
            return nil, errors.Wrap(err, "failed to send a message spec")
        }

        s.logger.Debug("sent-message: private skipProtocolLayer",
            zap.Strings("recipient", PubkeysToHex(rawMessage.Recipients)),
            zap.String("messageID", messageID.String()),
            zap.String("messageType", "private"),
            zap.Any("contentType", rawMessage.MessageType),
            zap.Strings("hashes", types.EncodeHexes(hashes)))
        s.transport.Track(messageID, hashes, newMessages)

    } else {
        messageSpec, err := s.protocol.BuildEncryptedMessage(rawMessage.Sender, recipient, wrappedMessage)
        if err != nil {
            return nil, errors.Wrap(err, "failed to encrypt message")
        }

        hashes, newMessages, err := s.sendMessageSpec(ctx, recipient, messageSpec, [][]byte{messageID})
        if err != nil {
            s.logger.Error("failed to send a private message", zap.Error(err))
            return nil, errors.Wrap(err, "failed to send a message spec")
        }

        s.logger.Debug("sent-message: private without datasync",
            zap.Strings("recipient", PubkeysToHex(rawMessage.Recipients)),
            zap.String("messageID", messageID.String()),
            zap.Any("contentType", rawMessage.MessageType),
            zap.String("messageType", "private"),
            zap.Strings("hashes", types.EncodeHexes(hashes)))
        s.transport.Track(messageID, hashes, newMessages)
    }

    return messageID, nil
}

// sendPairInstallation sends data to the recipients, using DH
func (s *MessageSender) SendPairInstallation(
    ctx context.Context,
    recipient *ecdsa.PublicKey,
    rawMessage RawMessage,
) ([]byte, error) {
    s.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))

    wrappedMessage, err := s.wrapMessageV1(&rawMessage)
    if err != nil {
        return nil, errors.Wrap(err, "failed to wrap message")
    }

    messageSpec, err := s.protocol.BuildDHMessage(s.identity, recipient, wrappedMessage)
    if err != nil {
        return nil, errors.Wrap(err, "failed to encrypt message")
    }

    messageID := v1protocol.MessageID(&s.identity.PublicKey, wrappedMessage)

    hashes, newMessages, err := s.sendMessageSpec(ctx, recipient, messageSpec, [][]byte{messageID})
    if err != nil {
        return nil, errors.Wrap(err, "failed to send a message spec")
    }

    s.transport.Track(messageID, hashes, newMessages)

    return messageID, nil
}

func (s *MessageSender) encodeMembershipUpdate(
    message v1protocol.MembershipUpdateMessage,
    chatEntity ChatEntity,
) ([]byte, error) {

    if chatEntity != nil {
        chatEntityProtobuf := chatEntity.GetProtobuf()
        switch chatEntityProtobuf := chatEntityProtobuf.(type) {
        case *protobuf.ChatMessage:
            message.Message = chatEntityProtobuf
        case *protobuf.EmojiReaction:
            message.EmojiReaction = chatEntityProtobuf

        }
    }

    encodedMessage, err := v1protocol.EncodeMembershipUpdateMessage(message)
    if err != nil {
        return nil, errors.Wrap(err, "failed to encode membership update message")
    }

    return encodedMessage, nil
}

// EncodeMembershipUpdate takes a group and an optional chat message and returns the protobuf representation to be sent on the wire.
// All the events in a group are encoded and added to the payload
func (s *MessageSender) EncodeMembershipUpdate(
    group *v1protocol.Group,
    chatEntity ChatEntity,
) ([]byte, error) {
    message := v1protocol.MembershipUpdateMessage{
        ChatID: group.ChatID(),
        Events: group.Events(),
    }

    return s.encodeMembershipUpdate(message, chatEntity)
}

// EncodeAbridgedMembershipUpdate takes a group and an optional chat message and returns the protobuf representation to be sent on the wire.
// Only the events relevant to the current group are encoded
func (s *MessageSender) EncodeAbridgedMembershipUpdate(
    group *v1protocol.Group,
    chatEntity ChatEntity,
) ([]byte, error) {
    message := v1protocol.MembershipUpdateMessage{
        ChatID: group.ChatID(),
        Events: group.AbridgedEvents(),
    }
    return s.encodeMembershipUpdate(message, chatEntity)
}

func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMessage *RawMessage, wrappedMessage []byte, rekey bool) ([][]byte, []*types.NewMessage, error) {
    payload := wrappedMessage
    var err error
    if rekey && len(rawMessage.HashRatchetGroupID) != 0 {

        var ratchet *encryption.HashRatchetKeyCompatibility
        // We have just rekeyed, pull the latest
        if RekeyCompatibility {
            ratchet, err = s.protocol.GetCurrentKeyForGroup(rawMessage.HashRatchetGroupID)
            if err != nil {
                return nil, nil, err
            }

        }
        // We send the message over the community topic
        spec, err := s.protocol.BuildHashRatchetReKeyGroupMessage(s.identity, rawMessage.Recipients, rawMessage.HashRatchetGroupID, wrappedMessage, ratchet)
        if err != nil {
            return nil, nil, err
        }
        payload, err = proto.Marshal(spec.Message)
        if err != nil {
            return nil, nil, err
        }
    }

    newMessage := &types.NewMessage{
        TTL:         whisperTTL,
        Payload:     payload,
        PowTarget:   calculatePoW(payload),
        PowTime:     whisperPoWTime,
        PubsubTopic: rawMessage.PubsubTopic,
    }

    if rawMessage.BeforeDispatch != nil {
        if err := rawMessage.BeforeDispatch(rawMessage); err != nil {
            return nil, nil, err
        }
    }

    // notify before dispatching
    s.notifyOnScheduledMessage(nil, rawMessage)

    newMessages, err := s.segmentMessage(newMessage)
    if err != nil {
        return nil, nil, err
    }

    hashes := make([][]byte, 0, len(newMessages))
    for _, newMessage := range newMessages {
        hash, err := s.transport.SendPublic(ctx, newMessage, rawMessage.LocalChatID)
        if err != nil {
            return nil, nil, err
        }
        hashes = append(hashes, hash)
    }

    return hashes, newMessages, nil
}

// SendPublic takes encoded data, encrypts it and sends through the wire.
func (s *MessageSender) SendPublic(
    ctx context.Context,
    chatName string,
    rawMessage RawMessage,
) ([]byte, error) {
    // Set sender
    if rawMessage.Sender == nil {
        rawMessage.Sender = s.identity
    }

    var wrappedMessage []byte
    var err error
    if rawMessage.SkipApplicationWrap {
        wrappedMessage = rawMessage.Payload
    } else {
        wrappedMessage, err = s.wrapMessageV1(&rawMessage)
        if err != nil {
            return nil, errors.Wrap(err, "failed to wrap message")
        }
    }

    var newMessage *types.NewMessage

    messageSpec, err := s.protocol.BuildPublicMessage(s.identity, wrappedMessage)
    if err != nil {
        s.logger.Error("failed to send a public message", zap.Error(err))
        return nil, errors.Wrap(err, "failed to wrap a public message in the encryption layer")
    }

    if len(rawMessage.HashRatchetGroupID) != 0 {

        var ratchet *encryption.HashRatchetKeyCompatibility
        var err error
        // We have just rekeyed, pull the latest
        ratchet, err = s.protocol.GetCurrentKeyForGroup(rawMessage.HashRatchetGroupID)
        if err != nil {
            return nil, err
        }

        keyID, err := ratchet.GetKeyID()
        if err != nil {
            return nil, err
        }
        s.logger.Debug("adding key id to message", zap.String("keyid", types.Bytes2Hex(keyID)))
        // We send the message over the community topic
        spec, err := s.protocol.BuildHashRatchetReKeyGroupMessage(s.identity, rawMessage.Recipients, rawMessage.HashRatchetGroupID, wrappedMessage, ratchet)
        if err != nil {
            return nil, err
        }
        newMessage, err = MessageSpecToWhisper(spec)
        if err != nil {
            return nil, err
        }

    } else if !rawMessage.SkipEncryptionLayer {
        newMessage, err = MessageSpecToWhisper(messageSpec)
        if err != nil {
            return nil, err
        }
    } else {
        newMessage = &types.NewMessage{
            TTL:       whisperTTL,
            Payload:   wrappedMessage,
            PowTarget: calculatePoW(wrappedMessage),
            PowTime:   whisperPoWTime,
        }
    }

    newMessage.Ephemeral = rawMessage.Ephemeral
    newMessage.PubsubTopic = rawMessage.PubsubTopic

    messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
    rawMessage.ID = types.EncodeHex(messageID)

    if rawMessage.BeforeDispatch != nil {
        if err := rawMessage.BeforeDispatch(&rawMessage); err != nil {
            return nil, err
        }
    }

    // notify before dispatching
    s.notifyOnScheduledMessage(nil, &rawMessage)

    newMessages, err := s.segmentMessage(newMessage)
    if err != nil {
        return nil, err
    }

    hashes := make([][]byte, 0, len(newMessages))
    for _, newMessage := range newMessages {
        hash, err := s.transport.SendPublic(ctx, newMessage, chatName)
        if err != nil {
            return nil, err
        }
        hashes = append(hashes, hash)
    }

    sentMessage := &SentMessage{
        Spec:       messageSpec,
        MessageIDs: [][]byte{messageID},
    }

    s.notifyOnSentMessage(sentMessage)

    s.logger.Debug("sent-message: public message",
        zap.Strings("recipient", PubkeysToHex(rawMessage.Recipients)),
        zap.String("messageID", messageID.String()),
        zap.Any("contentType", rawMessage.MessageType),
        zap.String("messageType", "public"),
        zap.Strings("hashes", types.EncodeHexes(hashes)))
    s.transport.Track(messageID, hashes, newMessages)

    return messageID, nil
}

// unwrapDatasyncMessage tries to unwrap message as datasync one and in case of success
// returns cloned messages with replaced payloads
func (s *MessageSender) unwrapDatasyncMessage(m *v1protocol.StatusMessage, response *handleMessageResponse) error {

    datasyncMessage, err := s.datasync.Unwrap(
        m.SigPubKey(),
        m.EncryptionLayer.Payload,
    )
    if err != nil {
        return err
    }

    response.DatasyncSender = m.SigPubKey()
    response.DatasyncAcks = append(response.DatasyncAcks, datasyncMessage.Acks...)
    response.DatasyncRequests = append(response.DatasyncRequests, datasyncMessage.Requests...)
    for _, o := range datasyncMessage.GroupOffers {
        for _, mID := range o.MessageIds {
            response.DatasyncOffers = append(response.DatasyncOffers, DatasyncOffer{GroupID: o.GroupId, MessageID: mID})
        }
    }

    for _, ds := range datasyncMessage.Messages {
        message, err := m.Clone()
        if err != nil {
            return err
        }
        message.EncryptionLayer.Payload = ds.Body
        response.DatasyncMessages = append(response.DatasyncMessages, message)

    }
    return nil
}

// HandleMessages expects a whisper message as input, and it will go through
// a series of transformations until the message is parsed into an application
// layer message, or in case of Raw methods, the processing stops at the layer
// before.
// It returns an error only if the processing of required steps failed.
func (s *MessageSender) HandleMessages(wakuMessage *types.Message) (*HandleMessageResponse, error) {
    logger := s.logger.With(zap.String("site", "HandleMessages"))
    hlogger := logger.With(zap.String("hash", types.HexBytes(wakuMessage.Hash).String()))

    response, err := s.handleMessage(wakuMessage)
    if err != nil {
        // Hash ratchet with a group id not found yet, save the message for future processing
        if err == encryption.ErrHashRatchetGroupIDNotFound && len(response.Message.EncryptionLayer.HashRatchetInfo) == 1 {
            info := response.Message.EncryptionLayer.HashRatchetInfo[0]
            return nil, s.persistence.SaveHashRatchetMessage(info.GroupID, info.KeyID, wakuMessage)
        }

        // The current message segment has been successfully retrieved.
        // However, the collection of segments is not yet complete.
        if err == ErrMessageSegmentsIncomplete {
            return nil, nil
        }

        return nil, err
    }

    // Process queued hash ratchet messages
    for _, hashRatchetInfo := range response.Message.EncryptionLayer.HashRatchetInfo {
        messages, err := s.persistence.GetHashRatchetMessages(hashRatchetInfo.KeyID)
        if err != nil {
            return nil, err
        }

        var processedIds [][]byte
        for _, message := range messages {
            hlogger.Info("handling out of order encrypted messages", zap.String("hash", types.Bytes2Hex(message.Hash)))
            r, err := s.handleMessage(message)
            if err != nil {
                hlogger.Debug("failed to handle hash ratchet message", zap.Error(err))
                continue
            }
            response.DatasyncMessages = append(response.toPublicResponse().StatusMessages, r.Messages()...)
            response.DatasyncAcks = append(response.DatasyncAcks, r.DatasyncAcks...)

            processedIds = append(processedIds, message.Hash)
        }

        err = s.persistence.DeleteHashRatchetMessages(processedIds)
        if err != nil {
            s.logger.Warn("failed to delete hash ratchet messages", zap.Error(err))
            return nil, err
        }
    }

    return response.toPublicResponse(), nil
}

type DatasyncOffer struct {
    GroupID   []byte
    MessageID []byte
}

type HandleMessageResponse struct {
    Hash             []byte
    StatusMessages   []*v1protocol.StatusMessage
    DatasyncSender   *ecdsa.PublicKey
    DatasyncAcks     [][]byte
    DatasyncOffers   []DatasyncOffer
    DatasyncRequests [][]byte
}

func (h *handleMessageResponse) toPublicResponse() *HandleMessageResponse {
    return &HandleMessageResponse{
        Hash:             h.Hash,
        StatusMessages:   h.Messages(),
        DatasyncSender:   h.DatasyncSender,
        DatasyncAcks:     h.DatasyncAcks,
        DatasyncOffers:   h.DatasyncOffers,
        DatasyncRequests: h.DatasyncRequests,
    }
}

type handleMessageResponse struct {
    Hash             []byte
    Message          *v1protocol.StatusMessage
    DatasyncMessages []*v1protocol.StatusMessage
    DatasyncSender   *ecdsa.PublicKey
    DatasyncAcks     [][]byte
    DatasyncOffers   []DatasyncOffer
    DatasyncRequests [][]byte
}

func (h *handleMessageResponse) Messages() []*v1protocol.StatusMessage {
    if len(h.DatasyncMessages) > 0 {
        return h.DatasyncMessages
    }
    return []*v1protocol.StatusMessage{h.Message}
}

func (s *MessageSender) handleMessage(wakuMessage *types.Message) (*handleMessageResponse, error) {
    logger := s.logger.With(zap.String("site", "handleMessage"))
    hlogger := logger.With(zap.String("hash", types.EncodeHex(wakuMessage.Hash)))

    message := &v1protocol.StatusMessage{}

    response := &handleMessageResponse{
        Hash:             wakuMessage.Hash,
        Message:          message,
        DatasyncMessages: []*v1protocol.StatusMessage{},
        DatasyncAcks:     [][]byte{},
    }

    err := message.HandleTransportLayer(wakuMessage)
    if err != nil {
        hlogger.Error("failed to handle transport layer message", zap.Error(err))
        return nil, err
    }

    err = s.handleSegmentationLayerV2(message)
    if err != nil {
        hlogger.Debug("failed to handle segmentation layer message", zap.Error(err))

        // Segments not completed yet, stop processing
        if err == ErrMessageSegmentsIncomplete {
            return nil, err
        }
        // Segments already completed, stop processing
        if err == ErrMessageSegmentsAlreadyCompleted {
            return nil, err
        }
    }

    err = s.handleEncryptionLayer(context.Background(), message)
    if err != nil {
        hlogger.Debug("failed to handle an encryption message", zap.Error(err))

        // Hash ratchet with a group id not found yet, stop processing
        if err == encryption.ErrHashRatchetGroupIDNotFound {
            return response, err
        }
    }

    if s.datasync != nil && s.datasyncEnabled {
        err := s.unwrapDatasyncMessage(message, response)
        if err != nil {
            hlogger.Debug("failed to handle datasync message", zap.Error(err))
        }
    }

    for _, msg := range response.Messages() {
        err := msg.HandleApplicationLayer()
        if err != nil {
            hlogger.Error("failed to handle application metadata layer message", zap.Error(err))
        }
    }

    return response, nil
}

// fetchDecryptionKey returns the private key associated with this public key, and returns true if it's an ephemeral key
func (s *MessageSender) fetchDecryptionKey(destination *ecdsa.PublicKey) (*ecdsa.PrivateKey, bool) {
    destinationID := types.EncodeHex(crypto.FromECDSAPub(destination))

    s.ephemeralKeysMutex.Lock()
    decryptionKey, ok := s.ephemeralKeys[destinationID]
    s.ephemeralKeysMutex.Unlock()

    // the key is not there, fallback on identity
    if !ok {
        return s.identity, false
    }
    return decryptionKey, true
}

func (s *MessageSender) handleEncryptionLayer(ctx context.Context, message *v1protocol.StatusMessage) error {
    logger := s.logger.With(zap.String("site", "handleEncryptionLayer"))
    publicKey := message.SigPubKey()

    // if it's an ephemeral key, we don't negotiate a topic
    decryptionKey, skipNegotiation := s.fetchDecryptionKey(message.TransportLayer.Dst)

    err := message.HandleEncryptionLayer(decryptionKey, publicKey, s.protocol, skipNegotiation)

    // if it's an ephemeral key, we don't have to handle a device not found error
    if err == encryption.ErrDeviceNotFound && !skipNegotiation {
        if err := s.handleErrDeviceNotFound(ctx, publicKey); err != nil {
            logger.Error("failed to handle ErrDeviceNotFound", zap.Error(err))
        }
    }
    if err != nil {
        logger.Error("failed to handle an encrypted message", zap.Error(err))
        return err
    }

    return nil
}

func (s *MessageSender) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error {
    now := time.Now().Unix()
    advertise, err := s.protocol.ShouldAdvertiseBundle(publicKey, now)
    if err != nil {
        return err
    }
    if !advertise {
        return nil
    }

    messageSpec, err := s.protocol.BuildBundleAdvertiseMessage(s.identity, publicKey)
    if err != nil {
        return err
    }

    ctx, cancel := context.WithTimeout(ctx, time.Second)
    defer cancel()
    // We don't pass an array of messageIDs as no action needs to be taken
    // when sending a bundle
    _, _, err = s.sendMessageSpec(ctx, publicKey, messageSpec, nil)
    if err != nil {
        return err
    }

    s.protocol.ConfirmBundleAdvertisement(publicKey, now)

    return nil
}

func (s *MessageSender) wrapMessageV1(rawMessage *RawMessage) ([]byte, error) {
    wrappedMessage, err := v1protocol.WrapMessageV1(rawMessage.Payload, rawMessage.MessageType, rawMessage.Sender)
    if err != nil {
        return nil, errors.Wrap(err, "failed to wrap message")
    }
    return wrappedMessage, nil
}

func (s *MessageSender) addToDataSync(publicKey *ecdsa.PublicKey, message []byte) ([]byte, error) {
    groupID := datasync.ToOneToOneGroupID(&s.identity.PublicKey, publicKey)
    peerID := datasyncpeer.PublicKeyToPeerID(*publicKey)
    exist, err := s.datasync.IsPeerInGroup(groupID, peerID)
    if err != nil {
        return nil, errors.Wrap(err, "failed to check if peer is in group")
    }
    if !exist {
        if err := s.datasync.AddPeer(groupID, peerID); err != nil {
            return nil, errors.Wrap(err, "failed to add peer")
        }
    }
    id, err := s.datasync.AppendMessage(groupID, message)
    if err != nil {
        return nil, errors.Wrap(err, "failed to append message to datasync")
    }

    return id[:], nil
}

// sendPrivateRawMessage sends a message not wrapped in an encryption layer
func (s *MessageSender) sendPrivateRawMessage(ctx context.Context, rawMessage *RawMessage, publicKey *ecdsa.PublicKey, payload []byte) ([][]byte, []*types.NewMessage, error) {
    newMessage := &types.NewMessage{
        TTL:         whisperTTL,
        Payload:     payload,
        PowTarget:   calculatePoW(payload),
        PowTime:     whisperPoWTime,
        PubsubTopic: rawMessage.PubsubTopic,
    }

    newMessages, err := s.segmentMessage(newMessage)
    if err != nil {
        return nil, nil, err
    }

    hashes := make([][]byte, 0, len(newMessages))
    var hash []byte
    for _, newMessage := range newMessages {
        if rawMessage.SendOnPersonalTopic {
            hash, err = s.transport.SendPrivateOnPersonalTopic(ctx, newMessage, publicKey)
        } else {
            hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
        }
        if err != nil {
            return nil, nil, err
        }
        hashes = append(hashes, hash)
    }

    return hashes, newMessages, nil
}

// sendCommunityMessage sends a message not wrapped in an encryption layer
// to a community
func (s *MessageSender) dispatchCommunityMessage(ctx context.Context, publicKey *ecdsa.PublicKey, wrappedMessage []byte, pubsubTopic string, rekey bool, rawMessage *RawMessage) ([][]byte, []*types.NewMessage, error) {
    payload := wrappedMessage
    if rekey && len(rawMessage.HashRatchetGroupID) != 0 {

        var ratchet *encryption.HashRatchetKeyCompatibility
        var err error
        // We have just rekeyed, pull the latest
        if RekeyCompatibility {
            ratchet, err = s.protocol.GetCurrentKeyForGroup(rawMessage.HashRatchetGroupID)
            if err != nil {
                return nil, nil, err
            }

        }
        keyID, err := ratchet.GetKeyID()
        if err != nil {
            return nil, nil, err
        }
        s.logger.Debug("adding key id to message", zap.String("keyid", types.Bytes2Hex(keyID)))
        // We send the message over the community topic
        spec, err := s.protocol.BuildHashRatchetReKeyGroupMessage(s.identity, rawMessage.Recipients, rawMessage.HashRatchetGroupID, wrappedMessage, ratchet)
        if err != nil {
            return nil, nil, err
        }
        payload, err = proto.Marshal(spec.Message)
        if err != nil {
            return nil, nil, err
        }
    }

    newMessage := &types.NewMessage{
        TTL:         whisperTTL,
        Payload:     payload,
        PowTarget:   calculatePoW(payload),
        PowTime:     whisperPoWTime,
        PubsubTopic: pubsubTopic,
    }

    newMessages, err := s.segmentMessage(newMessage)
    if err != nil {
        return nil, nil, err
    }

    hashes := make([][]byte, 0, len(newMessages))
    for _, newMessage := range newMessages {
        hash, err := s.transport.SendCommunityMessage(ctx, newMessage, publicKey)
        if err != nil {
            return nil, nil, err
        }
        hashes = append(hashes, hash)
    }

    return hashes, newMessages, nil
}

func (s *MessageSender) SendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([][]byte, []*types.NewMessage, error) {
    return s.sendMessageSpec(ctx, publicKey, messageSpec, messageIDs)
}

// sendMessageSpec analyses the spec properties and selects a proper transport method.
func (s *MessageSender) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([][]byte, []*types.NewMessage, error) {
    logger := s.logger.With(zap.String("site", "sendMessageSpec"))

    newMessage, err := MessageSpecToWhisper(messageSpec)
    if err != nil {
        return nil, nil, err
    }

    newMessages, err := s.segmentMessage(newMessage)
    if err != nil {
        return nil, nil, err
    }

    hashes := make([][]byte, 0, len(newMessages))
    var hash []byte
    for _, newMessage := range newMessages {
        // The shared secret needs to be handle before we send a message
        // otherwise the topic might not be set up before we receive a message
        if messageSpec.SharedSecret != nil && s.handleSharedSecrets != nil {
            err := s.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret})
            if err != nil {
                return nil, nil, err
            }

        }
        // process shared secret
        if messageSpec.AgreedSecret {
            logger.Debug("sending using shared secret")
            hash, err = s.transport.SendPrivateWithSharedSecret(ctx, newMessage, publicKey, messageSpec.SharedSecret.Key)
        } else {
            logger.Debug("sending partitioned topic")
            hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
        }
        if err != nil {
            return nil, nil, err
        }
        hashes = append(hashes, hash)
    }

    sentMessage := &SentMessage{
        PublicKey:  publicKey,
        Spec:       messageSpec,
        MessageIDs: messageIDs,
    }

    s.notifyOnSentMessage(sentMessage)

    return hashes, newMessages, nil
}

func (s *MessageSender) SubscribeToMessageEvents() <-chan *MessageEvent {
    c := make(chan *MessageEvent, 100)
    s.messageEventsSubscriptions = append(s.messageEventsSubscriptions, c)
    return c
}

func (s *MessageSender) notifyOnSentMessage(sentMessage *SentMessage) {
    event := &MessageEvent{
        Type:        MessageSent,
        SentMessage: sentMessage,
    }

    s.messageEventsSubscriptionsMutex.Lock()
    defer s.messageEventsSubscriptionsMutex.Unlock()

    // Publish on channels, drop if buffer is full
    for _, c := range s.messageEventsSubscriptions {
        select {
        case c <- event:
        default:
            s.logger.Warn("message events subscription channel full when publishing sent event, dropping message")
        }
    }

}

func (s *MessageSender) notifyOnScheduledMessage(recipient *ecdsa.PublicKey, message *RawMessage) {
    event := &MessageEvent{
        Recipient:  recipient,
        Type:       MessageScheduled,
        RawMessage: message,
    }

    s.messageEventsSubscriptionsMutex.Lock()
    defer s.messageEventsSubscriptionsMutex.Unlock()

    // Publish on channels, drop if buffer is full
    for _, c := range s.messageEventsSubscriptions {
        select {
        case c <- event:
        default:
            s.logger.Warn("message events subscription channel full when publishing scheduled event, dropping message")
        }
    }
}

func (s *MessageSender) JoinPublic(id string) (*transport.Filter, error) {
    return s.transport.JoinPublic(id)
}

// AddEphemeralKey adds an ephemeral key that we will be listening to
// note that we never removed them from now, as waku/whisper does not
// recalculate topics on removal, so effectively there's no benefit.
// On restart they will be gone.
func (s *MessageSender) AddEphemeralKey(privateKey *ecdsa.PrivateKey) (*transport.Filter, error) {
    s.ephemeralKeysMutex.Lock()
    s.ephemeralKeys[types.EncodeHex(crypto.FromECDSAPub(&privateKey.PublicKey))] = privateKey
    s.ephemeralKeysMutex.Unlock()
    return s.transport.LoadKeyFilters(privateKey)
}

func MessageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (*types.NewMessage, error) {
    var newMessage *types.NewMessage

    payload, err := proto.Marshal(spec.Message)
    if err != nil {
        return newMessage, err
    }

    newMessage = &types.NewMessage{
        TTL:       whisperTTL,
        Payload:   payload,
        PowTarget: calculatePoW(payload),
        PowTime:   whisperPoWTime,
    }
    return newMessage, nil
}

// calculatePoW returns the PoWTarget to be used.
// We check the size and arbitrarily set it to a lower PoW if the packet is
// greater than 50KB. We do this as the defaultPoW is too high for clients to send
// large messages.
func calculatePoW(payload []byte) float64 {
    if len(payload) > largeSizeInBytes {
        return whisperLargeSizePoW
    }
    return whisperDefaultPoW
}

func (s *MessageSender) StopDatasync() {
    if s.datasync != nil {
        s.datasync.Stop()
    }
}

// GetCurrentKeyForGroup returns the latest key timestampID belonging to a key group
func (s *MessageSender) GetCurrentKeyForGroup(groupID []byte) (*encryption.HashRatchetKeyCompatibility, error) {
    return s.protocol.GetCurrentKeyForGroup(groupID)
}

// GetKeyIDsForGroup returns a slice of key IDs belonging to a given group ID
func (s *MessageSender) GetKeysForGroup(groupID []byte) ([]*encryption.HashRatchetKeyCompatibility, error) {
    return s.protocol.GetKeysForGroup(groupID)
}

func (s *MessageSender) CleanupHashRatchetEncryptedMessages() error {
    monthAgo := time.Now().AddDate(0, -1, 0).Unix()

    err := s.persistence.DeleteHashRatchetMessagesOlderThan(monthAgo)
    if err != nil {
        return err
    }

    return nil
}