status-im/status-go

View on GitHub
protocol/messenger_mailserver.go

Summary

Maintainability
C
1 day
Test Coverage
F
54%
package protocol

import (
    "context"
    "fmt"
    "math"
    "sort"
    "sync"
    "time"

    "github.com/pkg/errors"
    "go.uber.org/zap"

    "github.com/status-im/status-go/connection"
    "github.com/status-im/status-go/eth-node/crypto"
    "github.com/status-im/status-go/eth-node/types"
    "github.com/status-im/status-go/protocol/common"
    "github.com/status-im/status-go/protocol/protobuf"
    "github.com/status-im/status-go/protocol/transport"
    "github.com/status-im/status-go/services/mailservers"
)

const (
    initialStoreNodeRequestPageSize = 4
    defaultStoreNodeRequestPageSize = 50

    // tolerance is how many seconds of potentially out-of-order messages we want to fetch
    tolerance uint32 = 60

    mailserverRequestTimeout         = 30 * time.Second
    mailserverMaxTries          uint = 2
    mailserverMaxFailedRequests uint = 2

    oneDayDuration   = 24 * time.Hour
    oneMonthDuration = 31 * oneDayDuration
)

// maxTopicsPerRequest sets the batch size to limit the number of topics per store query
var maxTopicsPerRequest int = 10

var ErrNoFiltersForChat = errors.New("no filter registered for given chat")

func (m *Messenger) shouldSync() (bool, error) {
    // TODO (pablo) support community store node as well
    if m.mailserverCycle.activeMailserver == nil || !m.Online() {
        return false, nil
    }

    useMailserver, err := m.settings.CanUseMailservers()
    if err != nil {
        m.logger.Error("failed to get use mailservers", zap.Error(err))
        return false, err
    }

    return useMailserver, nil
}

func (m *Messenger) scheduleSyncChat(chat *Chat) (bool, error) {
    shouldSync, err := m.shouldSync()
    if err != nil {
        m.logger.Error("failed to get should sync", zap.Error(err))
        return false, err
    }

    if !shouldSync {
        return false, nil
    }

    go func() {
        ms := m.getActiveMailserver(chat.CommunityID)
        _, err = m.performMailserverRequest(ms, func(mailServer mailservers.Mailserver) (*MessengerResponse, error) {
            response, err := m.syncChatWithFilters(mailServer, chat.ID)

            if err != nil {
                m.logger.Error("failed to sync chat", zap.Error(err))
                return nil, err
            }

            if m.config.messengerSignalsHandler != nil {
                m.config.messengerSignalsHandler.MessengerResponse(response)
            }
            return response, nil
        })
        if err != nil {
            m.logger.Error("failed to perform mailserver request", zap.Error(err))
        }
    }()
    return true, nil
}

func (m *Messenger) connectToNewMailserverAndWait() error {
    // Handle pinned mailservers
    m.logger.Info("disconnecting mailserver")
    pinnedMailserver, err := m.getPinnedMailserver()
    if err != nil {
        m.logger.Error("could not obtain the pinned mailserver", zap.Error(err))
        return err
    }
    // If pinned mailserver is not nil, no need to disconnect and wait for it to be available
    if pinnedMailserver == nil {
        m.disconnectActiveMailserver()
    }

    return m.findNewMailserver()
}

func (m *Messenger) performMailserverRequest(ms *mailservers.Mailserver, fn func(mailServer mailservers.Mailserver) (*MessengerResponse, error)) (*MessengerResponse, error) {
    if ms == nil {
        return nil, errors.New("mailserver not available")
    }

    m.mailserverCycle.RLock()
    defer m.mailserverCycle.RUnlock()
    var tries uint = 0
    for tries < mailserverMaxTries {
        if !m.communityStorenodes.IsCommunityStoreNode(ms.ID) && !m.isMailserverAvailable(ms.ID) {
            return nil, errors.New("mailserver not available")
        }
        m.logger.Info("trying performing mailserver requests", zap.Uint("try", tries), zap.String("mailserverID", ms.ID))

        // Peform request
        response, err := fn(*ms) // pass by value because we don't want the fn to modify the mailserver
        if err == nil {
            // Reset failed requests
            m.logger.Debug("mailserver request performed successfully",
                zap.String("mailserverID", ms.ID))
            ms.FailedRequests = 0
            return response, nil
        }

        m.logger.Error("failed to perform mailserver request",
            zap.String("mailserverID", ms.ID),
            zap.Uint("tries", tries),
            zap.Error(err),
        )

        tries++
        // Increment failed requests
        ms.FailedRequests++

        // Change mailserver
        if ms.FailedRequests >= mailserverMaxFailedRequests {
            return nil, errors.New("too many failed requests")
        }
        // Wait a couple of second not to spam
        time.Sleep(2 * time.Second)

    }
    return nil, errors.New("failed to perform mailserver request")
}

func (m *Messenger) scheduleSyncFilters(filters []*transport.Filter) (bool, error) {
    shouldSync, err := m.shouldSync()
    if err != nil {
        m.logger.Error("failed to get shouldSync", zap.Error(err))
        return false, err
    }

    if !shouldSync {
        return false, nil
    }

    go func() {
        // split filters by community store node so we can request the filters to the correct mailserver
        filtersByMs := m.SplitFiltersByStoreNode(filters)
        for communityID, filtersForMs := range filtersByMs {
            ms := m.getActiveMailserver(communityID)
            _, err := m.performMailserverRequest(ms, func(ms mailservers.Mailserver) (*MessengerResponse, error) {
                response, err := m.syncFilters(ms, filtersForMs)

                if err != nil {
                    m.logger.Error("failed to sync filter", zap.Error(err))
                    return nil, err
                }

                if m.config.messengerSignalsHandler != nil {
                    m.config.messengerSignalsHandler.MessengerResponse(response)
                }
                return response, nil
            })
            if err != nil {
                m.logger.Error("failed to perform mailserver request", zap.Error(err))
            }
        }

    }()
    return true, nil
}

func (m *Messenger) calculateMailserverTo() uint32 {
    seconds := float64(m.GetCurrentTimeInMillis()) / 1000
    return uint32(math.Ceil(seconds))
}

func (m *Messenger) calculateMailserverTimeBounds(duration time.Duration) (uint32, uint32) {
    now := float64(m.GetCurrentTimeInMillis()) / 1000
    to := uint32(math.Ceil(now))
    from := uint32(math.Floor(now)) - uint32(duration.Seconds())
    return from, to
}

func (m *Messenger) filtersForChat(chatID string) ([]*transport.Filter, error) {
    chat, ok := m.allChats.Load(chatID)
    if !ok {
        return nil, ErrChatNotFound
    }
    var filters []*transport.Filter

    if chat.OneToOne() {
        // We sync our own topic and any eventual negotiated
        publicKeys := []string{common.PubkeyToHex(&m.identity.PublicKey), chatID}

        filters = m.transport.FiltersByIdentities(publicKeys)

    } else if chat.PrivateGroupChat() {
        var publicKeys []string
        for _, m := range chat.Members {
            publicKeys = append(publicKeys, m.ID)
        }

        filters = m.transport.FiltersByIdentities(publicKeys)

    } else {
        filter := m.transport.FilterByChatID(chatID)
        if filter == nil {
            return nil, ErrNoFiltersForChat
        }
        filters = []*transport.Filter{filter}
    }

    return filters, nil
}

func (m *Messenger) topicsForChat(chatID string) (string, []types.TopicType, error) {
    filters, err := m.filtersForChat(chatID)
    if err != nil {
        return "", nil, err
    }

    var contentTopics []types.TopicType

    for _, filter := range filters {
        contentTopics = append(contentTopics, filter.ContentTopic)
    }

    return filters[0].PubsubTopic, contentTopics, nil
}

func (m *Messenger) syncChatWithFilters(ms mailservers.Mailserver, chatID string) (*MessengerResponse, error) {
    filters, err := m.filtersForChat(chatID)
    if err != nil {
        return nil, err
    }

    return m.syncFilters(ms, filters)
}

func (m *Messenger) syncBackup() error {

    filter := m.transport.PersonalTopicFilter()
    if filter == nil {
        return errors.New("personal topic filter not loaded")
    }

    from, to := m.calculateMailserverTimeBounds(oneMonthDuration)

    batch := MailserverBatch{From: from, To: to, Topics: []types.TopicType{filter.ContentTopic}}
    ms := m.getActiveMailserver(filter.ChatID)
    err := m.processMailserverBatch(*ms, batch)
    if err != nil {
        return err
    }
    return m.settings.SetBackupFetched(true)
}

func (m *Messenger) defaultSyncPeriodFromNow() (uint32, error) {
    defaultSyncPeriod, err := m.settings.GetDefaultSyncPeriod()
    if err != nil {
        return 0, err
    }
    return uint32(m.getTimesource().GetCurrentTime()/1000) - defaultSyncPeriod, nil
}

// capToDefaultSyncPeriod caps the sync period to the default
func (m *Messenger) capToDefaultSyncPeriod(period uint32) (uint32, error) {
    d, err := m.defaultSyncPeriodFromNow()
    if err != nil {
        return 0, err
    }
    if d > period {
        return d, nil
    }
    return period - tolerance, nil
}

func (m *Messenger) updateFiltersPriority(filters []*transport.Filter) {
    for _, filter := range filters {
        chatID := filter.ChatID
        chat := m.Chat(chatID)
        if chat != nil {
            filter.Priority = chat.ReadMessagesAtClockValue
        }
    }
}

func (m *Messenger) resetFiltersPriority(filters []*transport.Filter) {
    for _, filter := range filters {
        filter.Priority = 0
    }
}

func (m *Messenger) SplitFiltersByStoreNode(filters []*transport.Filter) map[string][]*transport.Filter {
    // split filters by community store node so we can request the filters to the correct mailserver
    filtersByMs := make(map[string][]*transport.Filter, len(filters))
    for _, f := range filters {
        communityID := "" // none by default
        if chat, ok := m.allChats.Load(f.ChatID); ok && chat.CommunityChat() && m.communityStorenodes.HasStorenodeSetup(chat.CommunityID) {
            communityID = chat.CommunityID
        }
        if _, exists := filtersByMs[communityID]; !exists {
            filtersByMs[communityID] = make([]*transport.Filter, 0, len(filters))
        }
        filtersByMs[communityID] = append(filtersByMs[communityID], f)
    }
    return filtersByMs
}

// RequestAllHistoricMessages requests all the historic messages for any topic
func (m *Messenger) RequestAllHistoricMessages(forceFetchingBackup, withRetries bool) (*MessengerResponse, error) {
    shouldSync, err := m.shouldSync()
    if err != nil {
        return nil, err
    }

    if !shouldSync {
        return nil, nil
    }

    backupFetched, err := m.settings.BackupFetched()
    if err != nil {
        return nil, err
    }

    if forceFetchingBackup || !backupFetched {
        m.logger.Info("fetching backup")
        err := m.syncBackup()
        if err != nil {
            return nil, err
        }
        m.logger.Info("backup fetched")
    }

    filters := m.transport.Filters()
    m.updateFiltersPriority(filters)
    defer m.resetFiltersPriority(filters)

    filtersByMs := m.SplitFiltersByStoreNode(filters)
    allResponses := &MessengerResponse{}
    for communityID, filtersForMs := range filtersByMs {
        ms := m.getActiveMailserver(communityID)
        if withRetries {
            response, err := m.performMailserverRequest(ms, func(ms mailservers.Mailserver) (*MessengerResponse, error) {
                return m.syncFilters(ms, filtersForMs)
            })
            if err != nil {
                return nil, err
            }
            allResponses.AddChats(response.Chats())
            allResponses.AddMessages(response.Messages())
            continue
        }
        response, err := m.syncFilters(*ms, filtersForMs)
        if err != nil {
            return nil, err
        }
        allResponses.AddChats(response.Chats())
        allResponses.AddMessages(response.Messages())
    }
    return allResponses, nil
}

const missingMessageCheckPeriod = 30 * time.Second

func (m *Messenger) checkForMissingMessagesLoop() {
    t := time.NewTicker(missingMessageCheckPeriod)
    defer t.Stop()

    for {
        select {
        case <-m.quit:
            return

        case <-t.C:
            filters := m.transport.Filters()
            filtersByMs := m.SplitFiltersByStoreNode(filters)
            for communityID, filtersForMs := range filtersByMs {
                ms := m.getActiveMailserver(communityID)
                if ms == nil {
                    continue
                }

                peerID, err := ms.PeerID()
                if err != nil {
                    m.logger.Error("could not obtain the peerID")
                    return
                }
                m.transport.SetCriteriaForMissingMessageVerification(peerID, filtersForMs)
            }
        }
    }
}

func getPrioritizedBatches() []int {
    return []int{1, 5, 10}
}

func (m *Messenger) syncFiltersFrom(ms mailservers.Mailserver, filters []*transport.Filter, lastRequest uint32) (*MessengerResponse, error) {
    response := &MessengerResponse{}
    topicInfo, err := m.mailserversDatabase.Topics()
    if err != nil {
        return nil, err
    }

    topicsData := make(map[string]mailservers.MailserverTopic)
    for _, topic := range topicInfo {
        topicsData[fmt.Sprintf("%s-%s", topic.PubsubTopic, topic.ContentTopic)] = topic
    }

    batches := make(map[string]map[int]MailserverBatch)

    to := m.calculateMailserverTo()
    var syncedTopics []mailservers.MailserverTopic

    sort.Slice(filters[:], func(i, j int) bool {
        p1 := filters[i].Priority
        p2 := filters[j].Priority
        return p1 > p2
    })
    prioritizedBatches := getPrioritizedBatches()
    currentBatch := 0

    if len(filters) == 0 || filters[0].Priority == 0 {
        currentBatch = len(prioritizedBatches)
    }

    defaultPeriodFromNow, err := m.defaultSyncPeriodFromNow()
    if err != nil {
        return nil, err
    }

    contentTopicsPerPubsubTopic := make(map[string]map[string]*transport.Filter)
    for _, filter := range filters {
        if !filter.Listen || filter.Ephemeral {
            continue
        }

        contentTopics, ok := contentTopicsPerPubsubTopic[filter.PubsubTopic]
        if !ok {
            contentTopics = make(map[string]*transport.Filter)
        }
        contentTopics[filter.ContentTopic.String()] = filter
        contentTopicsPerPubsubTopic[filter.PubsubTopic] = contentTopics
    }

    for pubsubTopic, contentTopics := range contentTopicsPerPubsubTopic {
        if _, ok := batches[pubsubTopic]; !ok {
            batches[pubsubTopic] = make(map[int]MailserverBatch)
        }

        for _, filter := range contentTopics {
            var chatID string
            // If the filter has an identity, we use it as a chatID, otherwise is a public chat/community chat filter
            if len(filter.Identity) != 0 {
                chatID = filter.Identity
            } else {
                chatID = filter.ChatID
            }

            topicData, ok := topicsData[filter.PubsubTopic+filter.ContentTopic.String()]
            var capToDefaultSyncPeriod = true
            if !ok {
                if lastRequest == 0 {
                    lastRequest = defaultPeriodFromNow
                }
                topicData = mailservers.MailserverTopic{
                    PubsubTopic:  filter.PubsubTopic,
                    ContentTopic: filter.ContentTopic.String(),
                    LastRequest:  int(defaultPeriodFromNow),
                }
            } else if lastRequest != 0 {
                topicData.LastRequest = int(lastRequest)
                capToDefaultSyncPeriod = false
            }

            batchID := topicData.LastRequest

            if currentBatch < len(prioritizedBatches) {
                batch, ok := batches[pubsubTopic][currentBatch]
                if ok {
                    prevTopicData, ok := topicsData[batch.PubsubTopic+batch.Topics[0].String()]
                    if (!ok && topicData.LastRequest != int(defaultPeriodFromNow)) ||
                        (ok && prevTopicData.LastRequest != topicData.LastRequest) {
                        currentBatch++
                    }
                }
                if currentBatch < len(prioritizedBatches) {
                    batchID = currentBatch
                    currentBatchCap := prioritizedBatches[currentBatch] - 1
                    if currentBatchCap == 0 {
                        currentBatch++
                    } else {
                        prioritizedBatches[currentBatch] = currentBatchCap
                    }
                }
            }

            batch, ok := batches[pubsubTopic][batchID]
            if !ok {
                from := uint32(topicData.LastRequest)
                if capToDefaultSyncPeriod {
                    from, err = m.capToDefaultSyncPeriod(uint32(topicData.LastRequest))
                    if err != nil {
                        return nil, err
                    }
                }
                batch = MailserverBatch{From: from, To: to}
            }

            batch.ChatIDs = append(batch.ChatIDs, chatID)
            batch.PubsubTopic = pubsubTopic
            batch.Topics = append(batch.Topics, filter.ContentTopic)
            batches[pubsubTopic][batchID] = batch

            // Set last request to the new `to`
            topicData.LastRequest = int(to)
            syncedTopics = append(syncedTopics, topicData)
        }
    }

    if m.config.messengerSignalsHandler != nil {
        m.config.messengerSignalsHandler.HistoryRequestStarted(len(batches))
    }

    var batches24h []MailserverBatch
    for pubsubTopic := range batches {
        batchKeys := make([]int, 0, len(batches[pubsubTopic]))
        for k := range batches[pubsubTopic] {
            batchKeys = append(batchKeys, k)
        }
        sort.Ints(batchKeys)

        keysToIterate := append([]int{}, batchKeys...)
        for {
            // For all batches
            var tmpKeysToIterate []int
            for _, k := range keysToIterate {
                batch := batches[pubsubTopic][k]

                dayBatch := MailserverBatch{
                    To:          batch.To,
                    Cursor:      batch.Cursor,
                    PubsubTopic: batch.PubsubTopic,
                    Topics:      batch.Topics,
                    ChatIDs:     batch.ChatIDs,
                }

                from := batch.To - uint32(oneDayDuration.Seconds())
                if from > batch.From {
                    dayBatch.From = from
                    batches24h = append(batches24h, dayBatch)

                    // Replace og batch with new dates
                    batch.To = from
                    batches[pubsubTopic][k] = batch
                    tmpKeysToIterate = append(tmpKeysToIterate, k)
                } else {
                    batches24h = append(batches24h, batch)
                }
            }

            if len(tmpKeysToIterate) == 0 {
                break
            }
            keysToIterate = tmpKeysToIterate
        }
    }

    for _, batch := range batches24h {
        err := m.processMailserverBatch(ms, batch)
        if err != nil {
            m.logger.Error("error syncing topics", zap.Error(err))
            return nil, err
        }
    }

    m.logger.Debug("topics synced")
    if m.config.messengerSignalsHandler != nil {
        m.config.messengerSignalsHandler.HistoryRequestCompleted()
    }

    err = m.mailserversDatabase.AddTopics(syncedTopics)
    if err != nil {
        return nil, err
    }

    var messagesToBeSaved []*common.Message
    for _, batches := range batches {
        for _, batch := range batches {
            for _, id := range batch.ChatIDs {
                chat, ok := m.allChats.Load(id)
                if !ok || !chat.Active || chat.Timeline() || chat.ProfileUpdates() {
                    continue
                }
                gap, err := m.calculateGapForChat(chat, batch.From)
                if err != nil {
                    return nil, err
                }
                if chat.SyncedFrom == 0 || chat.SyncedFrom > batch.From {
                    chat.SyncedFrom = batch.From
                }

                chat.SyncedTo = to

                err = m.persistence.SetSyncTimestamps(chat.SyncedFrom, chat.SyncedTo, chat.ID)
                if err != nil {
                    return nil, err
                }

                response.AddChat(chat)
                if gap != nil {
                    response.AddMessage(gap)
                    messagesToBeSaved = append(messagesToBeSaved, gap)
                }
            }
        }
    }

    if len(messagesToBeSaved) > 0 {
        err := m.persistence.SaveMessages(messagesToBeSaved)
        if err != nil {
            return nil, err
        }
    }
    return response, nil
}

func (m *Messenger) syncFilters(ms mailservers.Mailserver, filters []*transport.Filter) (*MessengerResponse, error) {
    return m.syncFiltersFrom(ms, filters, 0)
}

func (m *Messenger) calculateGapForChat(chat *Chat, from uint32) (*common.Message, error) {
    // Chat was never synced, no gap necessary
    if chat.SyncedTo == 0 {
        return nil, nil
    }

    // If we filled the gap, nothing to do
    if chat.SyncedTo >= from {
        return nil, nil
    }

    timestamp := m.getTimesource().GetCurrentTime()

    message := &common.Message{
        ChatMessage: &protobuf.ChatMessage{
            ChatId:      chat.ID,
            Text:        "Gap message",
            MessageType: protobuf.MessageType_SYSTEM_MESSAGE_GAP,
            ContentType: protobuf.ChatMessage_SYSTEM_MESSAGE_GAP,
            Clock:       uint64(from) * 1000,
            Timestamp:   timestamp,
        },
        GapParameters: &common.GapParameters{
            From: chat.SyncedTo,
            To:   from,
        },
        From:             common.PubkeyToHex(&m.identity.PublicKey),
        WhisperTimestamp: timestamp,
        LocalChatID:      chat.ID,
        Seen:             true,
        ID:               types.EncodeHex(crypto.Keccak256([]byte(fmt.Sprintf("%s-%d-%d", chat.ID, chat.SyncedTo, from)))),
    }

    return message, m.persistence.SaveMessages([]*common.Message{message})
}

type work struct {
    pubsubTopic   string
    contentTopics []types.TopicType
    cursor        []byte
    storeCursor   *types.StoreRequestCursor
    limit         uint32
}

type messageRequester interface {
    SendMessagesRequestForTopics(
        ctx context.Context,
        peerID []byte,
        from, to uint32,
        previousCursor []byte,
        previousStoreCursor *types.StoreRequestCursor,
        pubsubTopic string,
        contentTopics []types.TopicType,
        limit uint32,
        waitForResponse bool,
        processEnvelopes bool,
    ) (cursor []byte, storeCursor *types.StoreRequestCursor, envelopesCount int, err error)
}

func processMailserverBatch(
    ctx context.Context,
    messageRequester messageRequester,
    batch MailserverBatch,
    mailserverID []byte,
    logger *zap.Logger,
    pageLimit uint32,
    shouldProcessNextPage func(int) (bool, uint32),
    processEnvelopes bool,
) error {

    var topicStrings []string
    for _, t := range batch.Topics {
        topicStrings = append(topicStrings, t.String())
    }
    logger = logger.With(zap.Any("chatIDs", batch.ChatIDs),
        zap.String("fromString", time.Unix(int64(batch.From), 0).Format(time.RFC3339)),
        zap.String("toString", time.Unix(int64(batch.To), 0).Format(time.RFC3339)),
        zap.Any("topic", topicStrings),
        zap.Int64("from", int64(batch.From)),
        zap.Int64("to", int64(batch.To)))

    logger.Info("syncing topic")

    wg := sync.WaitGroup{}
    workWg := sync.WaitGroup{}
    workCh := make(chan work, 1000)       // each batch item is split in 10 topics bunch and sent to this channel
    workCompleteCh := make(chan struct{}) // once all batch items are processed, this channel is triggered
    semaphore := make(chan int, 3)        // limit the number of concurrent queries
    errCh := make(chan error)

    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    // Producer
    wg.Add(1)
    go func() {
        defer func() {
            logger.Debug("mailserver batch producer complete")
            wg.Done()
        }()

        allWorks := int(math.Ceil(float64(len(batch.Topics)) / float64(maxTopicsPerRequest)))
        workWg.Add(allWorks)

        for i := 0; i < len(batch.Topics); i += maxTopicsPerRequest {
            j := i + maxTopicsPerRequest
            if j > len(batch.Topics) {
                j = len(batch.Topics)
            }

            select {
            case <-ctx.Done():
                logger.Debug("processBatch producer - context done")
                return
            default:
                logger.Debug("processBatch producer - creating work")
                workCh <- work{
                    pubsubTopic:   batch.PubsubTopic,
                    contentTopics: batch.Topics[i:j],
                    limit:         pageLimit,
                }
                time.Sleep(50 * time.Millisecond)
            }
        }

        go func() {
            workWg.Wait()
            workCompleteCh <- struct{}{}
        }()

        logger.Debug("processBatch producer complete")
    }()

    var result error

loop:
    for {
        select {
        case <-ctx.Done():
            logger.Debug("processBatch cleanup - context done")
            result = ctx.Err()
            if errors.Is(result, context.Canceled) {
                result = nil
            }
            break loop
        case w, ok := <-workCh:
            if !ok {
                continue
            }

            logger.Debug("processBatch - received work")
            semaphore <- 1
            go func(w work) { // Consumer
                defer func() {
                    workWg.Done()
                    <-semaphore
                }()

                queryCtx, queryCancel := context.WithTimeout(ctx, mailserverRequestTimeout)
                cursor, storeCursor, envelopesCount, err := messageRequester.SendMessagesRequestForTopics(queryCtx, mailserverID, batch.From, batch.To, w.cursor, w.storeCursor, w.pubsubTopic, w.contentTopics, w.limit, true, processEnvelopes)

                queryCancel()

                if err != nil {
                    logger.Debug("failed to send request", zap.Error(err))
                    errCh <- err
                    return
                }

                processNextPage := true
                nextPageLimit := pageLimit

                if shouldProcessNextPage != nil {
                    processNextPage, nextPageLimit = shouldProcessNextPage(envelopesCount)
                }

                if !processNextPage {
                    return
                }

                // Check the cursor after calling `shouldProcessNextPage`.
                // The app might use process the fetched envelopes in the callback for own needs.
                if len(cursor) == 0 && storeCursor == nil {
                    return
                }

                logger.Debug("processBatch producer - creating work (cursor)")

                workWg.Add(1)
                workCh <- work{
                    pubsubTopic:   w.pubsubTopic,
                    contentTopics: w.contentTopics,
                    cursor:        cursor,
                    storeCursor:   storeCursor,
                    limit:         nextPageLimit,
                }
            }(w)
        case err := <-errCh:
            logger.Debug("processBatch - received error", zap.Error(err))
            cancel() // Kill go routines
            return err
        case <-workCompleteCh:
            logger.Debug("processBatch - all jobs complete")
            cancel() // Kill go routines
        }
    }

    wg.Wait()

    // NOTE(camellos): Disabling for now, not critical and I'd rather take a bit more time
    // to test it
    //logger.Info("waiting until message processed")
    //m.waitUntilP2PMessagesProcessed()

    logger.Info("synced topic", zap.NamedError("hasError", result))
    return result
}

func (m *Messenger) DisableStoreNodes() {
    m.featureFlags.StoreNodesDisabled = true
}

func (m *Messenger) processMailserverBatch(ms mailservers.Mailserver, batch MailserverBatch) error {
    if m.featureFlags.StoreNodesDisabled {
        return nil
    }

    mailserverID, err := ms.IDBytes()
    if err != nil {
        return err
    }
    logger := m.logger.With(zap.String("mailserverID", ms.ID))
    return processMailserverBatch(m.ctx, m.transport, batch, mailserverID, logger, defaultStoreNodeRequestPageSize, nil, false)
}

func (m *Messenger) processMailserverBatchWithOptions(ms mailservers.Mailserver, batch MailserverBatch, pageLimit uint32, shouldProcessNextPage func(int) (bool, uint32), processEnvelopes bool) error {
    if m.featureFlags.StoreNodesDisabled {
        return nil
    }

    mailserverID, err := ms.IDBytes()
    if err != nil {
        return err
    }
    logger := m.logger.With(zap.String("mailserverID", ms.ID))
    return processMailserverBatch(m.ctx, m.transport, batch, mailserverID, logger, pageLimit, shouldProcessNextPage, processEnvelopes)
}

type MailserverBatch struct {
    From        uint32
    To          uint32
    Cursor      string
    PubsubTopic string
    Topics      []types.TopicType
    ChatIDs     []string
}

func (m *Messenger) SyncChatFromSyncedFrom(chatID string) (uint32, error) {
    chat, ok := m.allChats.Load(chatID)
    if !ok {
        return 0, ErrChatNotFound
    }

    ms := m.getActiveMailserver(chat.CommunityID)
    var from uint32
    _, err := m.performMailserverRequest(ms, func(ms mailservers.Mailserver) (*MessengerResponse, error) {
        pubsubTopic, topics, err := m.topicsForChat(chatID)
        if err != nil {
            return nil, nil
        }

        defaultSyncPeriod, err := m.settings.GetDefaultSyncPeriod()
        if err != nil {
            return nil, err
        }

        batch := MailserverBatch{
            ChatIDs:     []string{chatID},
            To:          chat.SyncedFrom,
            From:        chat.SyncedFrom - defaultSyncPeriod,
            PubsubTopic: pubsubTopic,
            Topics:      topics,
        }
        if m.config.messengerSignalsHandler != nil {
            m.config.messengerSignalsHandler.HistoryRequestStarted(1)
        }

        err = m.processMailserverBatch(ms, batch)
        if err != nil {
            return nil, err
        }

        if m.config.messengerSignalsHandler != nil {
            m.config.messengerSignalsHandler.HistoryRequestCompleted()
        }
        if chat.SyncedFrom == 0 || chat.SyncedFrom > batch.From {
            chat.SyncedFrom = batch.From
        }

        m.logger.Debug("setting sync timestamps", zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(chat.SyncedTo)), zap.String("chatID", chatID))

        err = m.persistence.SetSyncTimestamps(batch.From, chat.SyncedTo, chat.ID)
        from = batch.From
        return nil, err
    })
    if err != nil {
        return 0, err
    }

    return from, nil
}

func (m *Messenger) FillGaps(chatID string, messageIDs []string) error {
    messages, err := m.persistence.MessagesByIDs(messageIDs)
    if err != nil {
        return err
    }

    chat, ok := m.allChats.Load(chatID)
    if !ok {
        return errors.New("chat not existing")
    }

    pubsubTopic, topics, err := m.topicsForChat(chatID)
    if err != nil {
        return err
    }

    var lowestFrom, highestTo uint32

    for _, message := range messages {
        if message.GapParameters == nil {
            return errors.New("can't sync non-gap message")
        }

        if lowestFrom == 0 || lowestFrom > message.GapParameters.From {
            lowestFrom = message.GapParameters.From
        }

        if highestTo < message.GapParameters.To {
            highestTo = message.GapParameters.To
        }
    }

    batch := MailserverBatch{
        ChatIDs:     []string{chatID},
        To:          highestTo,
        From:        lowestFrom,
        PubsubTopic: pubsubTopic,
        Topics:      topics,
    }

    if m.config.messengerSignalsHandler != nil {
        m.config.messengerSignalsHandler.HistoryRequestStarted(1)
    }

    ms := m.getActiveMailserver(chat.CommunityID)
    err = m.processMailserverBatch(*ms, batch)
    if err != nil {
        return err
    }

    if m.config.messengerSignalsHandler != nil {
        m.config.messengerSignalsHandler.HistoryRequestCompleted()
    }

    return m.persistence.DeleteMessages(messageIDs)
}

func (m *Messenger) waitUntilP2PMessagesProcessed() { // nolint: unused

    ticker := time.NewTicker(50 * time.Millisecond)

    for { //nolint: gosimple
        select {
        case <-ticker.C:
            if !m.transport.ProcessingP2PMessages() {
                ticker.Stop()
                return
            }
        }
    }
}

func (m *Messenger) LoadFilters(filters []*transport.Filter) ([]*transport.Filter, error) {
    return m.transport.LoadFilters(filters)
}

func (m *Messenger) ToggleUseMailservers(value bool) error {
    m.mailserverCycle.Lock()
    defer m.mailserverCycle.Unlock()

    err := m.settings.SetUseMailservers(value)
    if err != nil {
        return err
    }

    if value {
        m.cycleMailservers()
        return nil
    }

    m.disconnectActiveMailserver()
    return nil
}

func (m *Messenger) SetPinnedMailservers(mailservers map[string]string) error {
    err := m.settings.SetPinnedMailservers(mailservers)
    if err != nil {
        return err
    }

    m.cycleMailservers()
    return nil
}

func (m *Messenger) RemoveFilters(filters []*transport.Filter) error {
    return m.transport.RemoveFilters(filters)
}

func (m *Messenger) ConnectionChanged(state connection.State) {
    m.transport.ConnectionChanged(state)
    if !m.connectionState.Offline && state.Offline {
        m.sender.StopDatasync()
    }

    if m.connectionState.Offline && !state.Offline {
        err := m.sender.StartDatasync(m.mvdsStatusChangeEvent, m.sendDataSync)
        if err != nil {
            m.logger.Error("failed to start datasync", zap.Error(err))
        }
    }

    m.connectionState = state
}

func (m *Messenger) fetchMessages(chatID string, duration time.Duration) (uint32, error) {
    from, to := m.calculateMailserverTimeBounds(duration)

    chat, ok := m.allChats.Load(chatID)
    if !ok {
        return 0, ErrChatNotFound
    }

    ms := m.getActiveMailserver(chat.CommunityID)
    _, err := m.performMailserverRequest(ms, func(ms mailservers.Mailserver) (*MessengerResponse, error) {
        m.logger.Debug("fetching messages", zap.String("chatID", chatID), zap.String("mailserver", ms.Name))
        pubsubTopic, topics, err := m.topicsForChat(chatID)
        if err != nil {
            return nil, nil
        }

        batch := MailserverBatch{
            ChatIDs:     []string{chatID},
            From:        from,
            To:          to,
            PubsubTopic: pubsubTopic,
            Topics:      topics,
        }
        if m.config.messengerSignalsHandler != nil {
            m.config.messengerSignalsHandler.HistoryRequestStarted(1)
        }

        err = m.processMailserverBatch(ms, batch)
        if err != nil {
            return nil, err
        }

        if m.config.messengerSignalsHandler != nil {
            m.config.messengerSignalsHandler.HistoryRequestCompleted()
        }
        if chat.SyncedFrom == 0 || chat.SyncedFrom > batch.From {
            chat.SyncedFrom = batch.From
        }

        m.logger.Debug("setting sync timestamps", zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(chat.SyncedTo)), zap.String("chatID", chatID))

        err = m.persistence.SetSyncTimestamps(batch.From, chat.SyncedTo, chat.ID)
        from = batch.From
        return nil, err
    })
    if err != nil {
        return 0, err
    }

    return from, nil
}