status-im/status-go

View on GitHub
protocol/messenger_raw_message_resend.go

Summary

Maintainability
A
0 mins
Test Coverage
C
78%
package protocol

import (
    "context"
    "fmt"
    "math"
    "time"

    "github.com/status-im/status-go/protocol/protobuf"

    "github.com/pkg/errors"
    "go.uber.org/zap"

    "github.com/status-im/status-go/protocol/common"
)

// watchExpiredMessages regularly checks for expired emojis and invoke their resending
func (m *Messenger) watchExpiredMessages() {
    m.logger.Debug("watching expired messages")
    go func() {
        for {
            select {
            case <-time.After(time.Second):
                if m.Online() {
                    err := m.resendExpiredMessages()
                    if err != nil {
                        m.logger.Debug("failed to resend expired message", zap.Error(err))
                    }
                }
            case <-m.quit:
                return
            }
        }
    }()
}

func (m *Messenger) resendExpiredMessages() error {
    if m.connectionState.Offline {
        return errors.New("offline")
    }

    ids, err := m.persistence.ExpiredMessagesIDs(m.config.messageResendMaxCount)
    if err != nil {
        return errors.Wrapf(err, "Can't get expired reactions from db")
    }

    for _, id := range ids {
        message, shouldResend, err := m.processMessageID(id)
        if err != nil {
            m.logger.Error("Error processing message ID when trying resend raw message", zap.String("id", id), zap.Error(err))
        } else if shouldResend {
            m.logger.Debug("Resent raw message",
                zap.String("id", id),
                zap.Any("message type", message.MessageType),
                zap.Int("send count", message.SendCount),
            )
        }
    }
    return nil
}

func (m *Messenger) processMessageID(id string) (*common.RawMessage, bool, error) {
    rawMessage, err := m.persistence.RawMessageByID(id)
    if err != nil {
        return nil, false, errors.Wrap(err, "Can't get raw message by ID")
    }

    shouldResend, err := m.shouldResendMessage(rawMessage, m.getTimesource())
    if err != nil {
        m.logger.Error("Can't check if message should be resent", zap.Error(err))
        return rawMessage, false, err
    }

    if !shouldResend {
        return rawMessage, false, nil
    }

    switch rawMessage.ResendMethod {
    case common.ResendMethodSendCommunityMessage:
        err = m.handleSendCommunityMessage(rawMessage)
    case common.ResendMethodSendPrivate:
        err = m.handleSendPrivateMessage(rawMessage)
    case common.ResendMethodDynamic:
        shouldResend, err = m.handleOtherResendMethods(rawMessage)
    default:
        err = errors.New("Unknown resend method")
    }
    return rawMessage, shouldResend, err
}

func (m *Messenger) handleSendCommunityMessage(rawMessage *common.RawMessage) error {
    _, err := m.sender.SendCommunityMessage(context.TODO(), rawMessage)
    if err != nil {
        err = errors.Wrap(err, "Can't resend message with SendCommunityMessage")
    }
    m.upsertRawMessageToWatch(rawMessage)
    return err
}

func (m *Messenger) handleSendPrivateMessage(rawMessage *common.RawMessage) error {
    if len(rawMessage.Recipients) == 0 {
        m.logger.Error("No recipients to resend message", zap.String("id", rawMessage.ID))
        m.upsertRawMessageToWatch(rawMessage)
        return errors.New("No recipients to resend message with SendPrivate")
    }

    var err error
    for _, r := range rawMessage.Recipients {
        _, err = m.sender.SendPrivate(context.TODO(), r, rawMessage)
        if err != nil {
            err = errors.Wrap(err, fmt.Sprintf("Can't resend message with SendPrivate to %s", common.PubkeyToHex(r)))
        }
    }

    m.upsertRawMessageToWatch(rawMessage)
    return err
}

func (m *Messenger) handleOtherResendMethods(rawMessage *common.RawMessage) (bool, error) {
    chat, ok := m.allChats.Load(rawMessage.LocalChatID)
    if !ok {
        m.logger.Error("Can't find chat with id", zap.String("id", rawMessage.LocalChatID))
        return false, nil // Continue with next message if chat not found
    }

    if !(chat.Public() || chat.CommunityChat()) {
        return false, nil // Only resend for public or community chats
    }

    if ok {
        err := m.persistence.SaveRawMessage(rawMessage)
        if err != nil {
            m.logger.Error("Can't save raw message marked as expired", zap.Error(err))
            return true, err
        }
    }
    return true, m.reSendRawMessage(context.Background(), rawMessage.ID)
}

func (m *Messenger) shouldResendMessage(message *common.RawMessage, t common.TimeSource) (bool, error) {
    if m.featureFlags.ResendRawMessagesDisabled {
        return false, nil
    }
    //exponential backoff depends on how many attempts to send message already made
    power := math.Pow(2, float64(message.SendCount-1))
    backoff := uint64(power) * uint64(m.config.messageResendMinDelay.Milliseconds())
    backoffElapsed := t.GetCurrentTime() > (message.LastSent + backoff)
    return backoffElapsed, nil
}

// pull a message from the database and send it again
func (m *Messenger) reSendRawMessage(ctx context.Context, messageID string) error {
    message, err := m.persistence.RawMessageByID(messageID)
    if err != nil {
        return err
    }

    chat, ok := m.allChats.Load(message.LocalChatID)
    if !ok {
        return errors.New("chat not found")
    }

    _, err = m.dispatchMessage(ctx, common.RawMessage{
        LocalChatID: chat.ID,
        Payload:     message.Payload,
        PubsubTopic: message.PubsubTopic,
        MessageType: message.MessageType,
        Recipients:  message.Recipients,
        ResendType:  message.ResendType,
        SendCount:   message.SendCount,
    })
    return err
}

// UpsertRawMessageToWatch insert/update the rawMessage to the database, resend it if necessary.
// relate watch method: Messenger#watchExpiredMessages
func (m *Messenger) UpsertRawMessageToWatch(rawMessage *common.RawMessage) (*common.RawMessage, error) {
    rawMessage.SendCount++
    rawMessage.LastSent = m.getTimesource().GetCurrentTime()
    err := m.persistence.SaveRawMessage(rawMessage)
    if err != nil {
        return nil, err
    }
    return rawMessage, nil
}

func (m *Messenger) upsertRawMessageToWatch(rawMessage *common.RawMessage) {
    _, err := m.UpsertRawMessageToWatch(rawMessage)
    if err != nil {
        // this is unlikely to happen, but we should log it
        m.logger.Error("Can't upsert raw message after SendCommunityMessage", zap.Error(err), zap.String("id", rawMessage.ID))
    }
}

func (m *Messenger) RawMessagesIDsByType(t protobuf.ApplicationMetadataMessage_Type) ([]string, error) {
    return m.persistence.RawMessagesIDsByType(t)
}

func (m *Messenger) RawMessageByID(id string) (*common.RawMessage, error) {
    return m.persistence.RawMessageByID(id)
}

func (m *Messenger) UpdateRawMessageSent(id string, sent bool) error {
    return m.persistence.UpdateRawMessageSent(id, sent)
}

func (m *Messenger) UpdateRawMessageLastSent(id string, lastSent uint64) error {
    return m.persistence.UpdateRawMessageLastSent(id, lastSent)
}