status-im/status-go

View on GitHub
services/wallet/activity/session.go

Summary

Maintainability
A
0 mins
Test Coverage
A
95%
package activity

import (
    "context"
    "errors"
    "strconv"
    "time"

    eth "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/event"
    "github.com/ethereum/go-ethereum/log"
    "github.com/status-im/status-go/services/wallet/async"
    "github.com/status-im/status-go/services/wallet/common"
    "github.com/status-im/status-go/services/wallet/transfer"
    "github.com/status-im/status-go/services/wallet/walletevent"
    "github.com/status-im/status-go/transactions"
)

// nilStr is the placeholder used by EntryIdentity.key for entries
// that have no transaction identity.
const nilStr = "nil"

// EntryIdentity uniquely identifies an activity entry mirrored from the
// client model (see Session.model).
type EntryIdentity struct {
    payloadType PayloadType
    // transaction is nil for entries that are not backed by a transaction
    transaction *transfer.TransactionIdentity
    id          common.MultiTransactionIDType
}

// same reports whether a and e identify the same entry: equal payload type,
// equal multi-transaction id, and matching transaction identity (both nil,
// or both non-nil with equal chain, hash and address).
func (e EntryIdentity) same(a EntryIdentity) bool {
    if a.payloadType != e.payloadType || a.id != e.id {
        return false
    }
    if a.transaction == nil || e.transaction == nil {
        // Guard the dereference below: a mixed nil/non-nil pair never
        // matches (the previous code dereferenced a nil pointer here).
        return a.transaction == e.transaction
    }
    return a.transaction.ChainID == e.transaction.ChainID &&
        a.transaction.Hash == e.transaction.Hash &&
        a.transaction.Address == e.transaction.Address
}

// key returns a string form of the identity suitable for use as a map key.
// Entries without a transaction use the "nil" marker in the transaction slot.
func (e EntryIdentity) key() string {
    txPart := nilStr
    if tx := e.transaction; tx != nil {
        txPart = strconv.FormatUint(uint64(tx.ChainID), 10) + tx.Hash.Hex() + tx.Address.Hex()
    }
    return strconv.Itoa(e.payloadType) + txPart + strconv.FormatInt(int64(e.id), 16)
}

// SessionID identifies a single filter session within the service.
type SessionID int32

// Session stores state related to a filter session
// The user happy flow is:
// 1. StartFilterSession to get a new SessionID and client be notified by the current state
// 2. GetMoreForFilterSession anytime to get more entries after the first page
// 3. UpdateFilterForSession to update the filter and get the new state or clean the filter and get the newer entries
// 4. ResetFilterSession in case client receives SessionUpdate with HasNewOnTop = true to get the latest state
// 5. StopFilterSession to stop the session when no longer used (user changed from activity screens or changed addresses and chains)
type Session struct {
    id SessionID

    // Filter info
    //
    addresses []eth.Address
    chainIDs  []common.ChainID
    filter    Filter

    // model is a mirror of the data model presentation has (sent by EventActivityFilteringDone)
    model []EntryIdentity
    // noFilterModel is a mirror of the data model presentation has when filter is empty;
    // it is used as a seen-set to flag new entries after the filter is cleared
    noFilterModel map[string]EntryIdentity
    // new holds the new entries until user requests update by calling ResetFilterSession
    new []EntryIdentity
}

// EntryUpdate describes a single entry to insert into the client model
// at position Pos (part of SessionUpdate.New).
type EntryUpdate struct {
    Pos   int    `json:"pos"`
    Entry *Entry `json:"entry"`
}

// SessionUpdate payload for EventActivitySessionUpdated
type SessionUpdate struct {
    // HasNewOnTop is set (to true) only when there are new entries on top;
    // the client should then call ResetFilterSession to fetch them
    HasNewOnTop *bool           `json:"hasNewOnTop,omitempty"`
    // New are entries interleaved with the existing client model
    New         []*EntryUpdate  `json:"new,omitempty"`
    Removed     []EntryIdentity `json:"removed,omitempty"`
}

// fullFilterParams bundles everything internalFilter needs to run a query
// on behalf of a session.
type fullFilterParams struct {
    sessionID SessionID
    addresses []eth.Address
    chainIDs  []common.ChainID
    filter    Filter
}

// internalFilter schedules an async activity-entries query for the given
// filter parameters and reports the outcome to the client via an
// EventActivityFilteringDone event keyed by the session ID.
//
// processResults is called on success with the fetched entries (it may
// mutate them, e.g. to flag new ones) and returns the offset value to
// report back to the client. Entry details are then fetched asynchronously
// and delivered via EventActivityFilteringUpdate.
func (s *Service) internalFilter(f fullFilterParams, offset int, count int, processResults func(entries []Entry) (offsetOverride int)) {
    s.scheduler.Enqueue(int32(f.sessionID), filterTask, func(ctx context.Context) (interface{}, error) {
        allAddresses := s.areAllAddresses(f.addresses)
        activities, err := getActivityEntries(ctx, s.getDeps(), f.addresses, allAddresses, f.chainIDs, f.filter, offset, count)
        return activities, err
    }, func(result interface{}, taskType async.TaskType, err error) {
        res := FilterResponse{
            ErrorCode: ErrorCodeFailed,
        }

        if errors.Is(err, context.Canceled) || errors.Is(err, async.ErrTaskOverwritten) {
            res.ErrorCode = ErrorCodeTaskCanceled
        } else if err == nil {
            activities := result.([]Entry)
            res.Activities = activities
            // A full page implies more entries may be available.
            res.HasMore = len(activities) == count
            res.ErrorCode = ErrorCodeSuccess

            res.Offset = processResults(activities)
        }

        int32SessionID := int32(f.sessionID)
        sendResponseEvent(s.eventFeed, &int32SessionID, EventActivityFilteringDone, res, err)

        s.getActivityDetailsAsync(int32SessionID, res.Activities)
    })
}

// mirrorIdentities extracts the identity of each entry, producing the
// lightweight mirror of the client model kept for update detection.
func mirrorIdentities(entries []Entry) []EntryIdentity {
    ids := make([]EntryIdentity, len(entries))
    for i := range entries {
        ids[i] = EntryIdentity{
            payloadType: entries[i].payloadType,
            transaction: entries[i].transaction,
            id:          entries[i].id,
        }
    }
    return ids
}

// internalFilterForSession runs the session's current filter for the first
// page and mirrors the resulting identities into session.model.
func (s *Service) internalFilterForSession(session *Session, firstPageCount int) {
    params := fullFilterParams{
        sessionID: session.id,
        addresses: session.addresses,
        chainIDs:  session.chainIDs,
        filter:    session.filter,
    }

    storeModel := func(entries []Entry) (offset int) {
        s.sessionsRWMutex.Lock()
        defer s.sessionsRWMutex.Unlock()

        session.model = mirrorIdentities(entries)
        return 0
    }

    s.internalFilter(params, 0, firstPageCount, storeModel)
}

// StartFilterSession creates a new filter session, kicks off the first-page
// query (results arrive via EventActivityFilteringDone) and returns its ID.
func (s *Service) StartFilterSession(addresses []eth.Address, chainIDs []common.ChainID, filter Filter, firstPageCount int) SessionID {
    sessionID := s.nextSessionID()

    newSession := &Session{
        id:        sessionID,
        addresses: addresses,
        chainIDs:  chainIDs,
        filter:    filter,
        model:     make([]EntryIdentity, 0, firstPageCount),
    }

    s.sessionsRWMutex.Lock()
    firstSession := len(s.sessions) == 0
    s.sessions[sessionID] = newSession
    if firstSession {
        // First active session: start listening for wallet events.
        s.subscribeToEvents()
    }
    s.sessionsRWMutex.Unlock()

    s.internalFilterForSession(newSession, firstPageCount)

    return sessionID
}

// UpdateFilterForSession is to be called for updating the filter of a specific session
// After calling this method to set a filter all the incoming changes will be reported with
// Entry.isNew = true when filter is reset to empty
func (s *Service) UpdateFilterForSession(id SessionID, filter Filter, firstPageCount int) error {
    // Hold the write lock for the whole update. The previous
    // RLock -> RUnlock -> Lock sequence left a window in which another
    // goroutine could mutate or remove the session between reading the
    // old filter state and applying the new one.
    s.sessionsRWMutex.Lock()
    defer s.sessionsRWMutex.Unlock()

    session, found := s.sessions[id]
    if !found {
        return errors.New("session not found")
    }

    prevFilterEmpty := session.filter.IsEmpty()
    newFilterEmpty := filter.IsEmpty()

    // Discard pending new-on-top entries; they were detected for the old filter.
    session.new = nil
    session.filter = filter

    switch {
    case prevFilterEmpty && !newFilterEmpty:
        // Session is moving from empty to non-empty filter.
        // Take a snapshot of the current model so entries can be flagged
        // as new when the filter is later cleared.
        session.noFilterModel = entryIdsToMap(session.model)

        session.model = make([]EntryIdentity, 0, firstPageCount)

        // In this case there is nothing to flag so we request the first page.
        s.internalFilterForSession(session, firstPageCount)
    case !prevFilterEmpty && newFilterEmpty:
        // Session is moving from non-empty to empty filter.
        // Flag all entries that are not in the noFilterModel snapshot.
        s.internalFilter(
            fullFilterParams{
                sessionID: session.id,
                addresses: session.addresses,
                chainIDs:  session.chainIDs,
                filter:    session.filter,
            },
            0,
            firstPageCount,
            func(entries []Entry) (offset int) {
                s.sessionsRWMutex.Lock()
                defer s.sessionsRWMutex.Unlock()

                // Mark entries absent from the snapshot as new.
                for i := range entries {
                    _, seen := session.noFilterModel[entries[i].getIdentity().key()]
                    entries[i].isNew = !seen
                }

                // Mirror identities for update use.
                session.model = mirrorIdentities(entries)
                session.noFilterModel = nil
                return 0
            },
        )
    default:
        // Else act as a normal filter update.
        s.internalFilterForSession(session, firstPageCount)
    }

    return nil
}

// ResetFilterSession is to be called when SessionUpdate.HasNewOnTop == true to
// update client with the latest state including new on top entries
func (s *Service) ResetFilterSession(id SessionID, firstPageCount int) error {
    // Guard the sessions map: other paths mutate it under sessionsRWMutex,
    // so the previous unlocked read here was a data race.
    s.sessionsRWMutex.RLock()
    session, found := s.sessions[id]
    s.sessionsRWMutex.RUnlock()
    if !found {
        return errors.New("session not found")
    }

    s.internalFilter(
        fullFilterParams{
            sessionID: id,
            addresses: session.addresses,
            chainIDs:  session.chainIDs,
            filter:    session.filter,
        },
        0,
        firstPageCount,
        func(entries []Entry) (offset int) {
            s.sessionsRWMutex.Lock()
            defer s.sessionsRWMutex.Unlock()

            // Mark entries that were pending as new-on-top.
            newMap := entryIdsToMap(session.new)
            for i, a := range entries {
                _, isNew := newMap[a.getIdentity().key()]
                entries[i].isNew = isNew
            }
            session.new = nil

            if session.noFilterModel != nil {
                // Add reported new entries to mark them as seen
                for _, a := range newMap {
                    session.noFilterModel[a.key()] = a
                }
            }

            // Mirror client identities for checking updates
            session.model = mirrorIdentities(entries)

            return 0
        },
    )
    return nil
}

// GetMoreForFilterSession fetches the next pageCount entries after the ones
// already mirrored in the session model; results arrive via
// EventActivityFilteringDone with Offset set to the previous model length.
func (s *Service) GetMoreForFilterSession(id SessionID, pageCount int) error {
    // Read all session state under the lock: detectNew mutates
    // session.model/session.new concurrently, so the previous unlocked
    // reads (including the map access) were data races.
    s.sessionsRWMutex.RLock()
    session, found := s.sessions[id]
    if !found {
        s.sessionsRWMutex.RUnlock()
        return errors.New("session not found")
    }
    params := fullFilterParams{
        sessionID: id,
        addresses: session.addresses,
        chainIDs:  session.chainIDs,
        filter:    session.filter,
    }
    prevModelLen := len(session.model)
    // Skip both the mirrored model and any pending new-on-top entries.
    fetchOffset := prevModelLen + len(session.new)
    s.sessionsRWMutex.RUnlock()

    s.internalFilter(
        params,
        fetchOffset,
        pageCount,
        func(entries []Entry) (offset int) {
            s.sessionsRWMutex.Lock()
            defer s.sessionsRWMutex.Unlock()

            // Mirror client identities for checking updates.
            session.model = append(session.model, mirrorIdentities(entries)...)

            // Overwrite the offset to account for new entries.
            return prevModelLen
        },
    )
    return nil
}

// subscribeToEvents should be called with sessionsRWMutex locked for writing.
// It creates the event channel, subscribes it to the wallet event feed and
// starts the processing goroutine (stopped by unsubscribeFromEvents closing s.ch).
func (s *Service) subscribeToEvents() {
    s.ch = make(chan walletevent.Event, 100)
    s.subscriptions = s.eventFeed.Subscribe(s.ch)
    go s.processEvents()
}

// processEvents runs while at least one session is active (it is started by
// subscribeToEvents when the first session is created) and debounces
// transaction-related events into periodic detectNew calls.
func (s *Service) processEvents() {
    eventCount := 0
    lastUpdate := time.Now().UnixMilli()
    for event := range s.ch {
        if event.Type == transactions.EventPendingTransactionUpdate ||
            event.Type == transactions.EventPendingTransactionStatusChanged ||
            event.Type == transfer.EventNewTransfers {
            eventCount++
        }
        // debounce events updates; NOTE(review): detection only runs when a
        // further event arrives after the debounce window has elapsed — a
        // trailing burst with no follow-up event is processed late (on the
        // next event) rather than on a timer.
        if eventCount > 0 &&
            (time.Duration(time.Now().UnixMilli()-lastUpdate)*time.Millisecond) >= s.debounceDuration {
            s.detectNew(eventCount)
            eventCount = 0
            lastUpdate = time.Now().UnixMilli()
        }
    }
}

// detectNew re-queries each active session and compares the result against
// the mirrored client model. Contiguous new entries at the top are held in
// session.new (client is notified with HasNewOnTop and must call
// ResetFilterSession); entries interleaved with existing ones are inserted
// into the model and sent directly as EntryUpdates.
//
// NOTE(review): s.sessions, session.model and session.new are read here
// before the locks are taken (map iteration, fetchLen, allData) — presumably
// safe only if detectNew is the sole concurrent writer; verify against the
// session API methods.
func (s *Service) detectNew(changeCount int) {
    for sessionID := range s.sessions {
        session := s.sessions[sessionID]

        // Fetch enough entries to cover the mirrored model plus the number
        // of change events observed since the last run.
        fetchLen := len(session.model) + changeCount
        allAddresses := s.areAllAddresses(session.addresses)
        activities, err := getActivityEntries(context.Background(), s.getDeps(), session.addresses, allAddresses, session.chainIDs, session.filter, 0, fetchLen)
        if err != nil {
            log.Error("Error getting activity entries", "error", err)
            continue
        }

        s.sessionsRWMutex.RLock()
        // Pending new-on-top entries precede the mirrored model in the
        // client's view, so compare against their concatenation.
        allData := append(session.new, session.model...)
        new, _ /*removed*/ := findUpdates(allData, activities)
        s.sessionsRWMutex.RUnlock()

        s.sessionsRWMutex.Lock()
        lastProcessed := -1
        onTop := true
        var mixed []*EntryUpdate
        for i, idRes := range new {
            // Detect on top
            if onTop {
                // mixedIdentityResult.newPos includes session.new, therefore compensate for it
                if ((idRes.newPos - len(session.new)) - lastProcessed) > 1 {
                    // From now on the events are not on top and continuous but mixed between existing entries
                    onTop = false
                    mixed = make([]*EntryUpdate, 0, len(new)-i)
                }
                lastProcessed = idRes.newPos
            }

            if onTop {
                // Accumulate contiguous new-on-top entries; they are only
                // delivered when the client calls ResetFilterSession.
                if session.new == nil {
                    session.new = make([]EntryIdentity, 0, len(new))
                }
                session.new = append(session.new, idRes.id)
            } else {
                // Interleaved entry: push it to the client at its position
                // relative to the mirrored model.
                modelPos := idRes.newPos - len(session.new)
                entry := activities[idRes.newPos]
                entry.isNew = true
                mixed = append(mixed, &EntryUpdate{
                    Pos:   modelPos,
                    Entry: &entry,
                })
                // Insert in session model at modelPos index
                session.model = append(session.model[:modelPos], append([]EntryIdentity{{payloadType: entry.payloadType, transaction: entry.transaction, id: entry.id}}, session.model[modelPos:]...)...)
            }
        }

        s.sessionsRWMutex.Unlock()

        if len(session.new) > 0 || len(mixed) > 0 {
            go notify(s.eventFeed, sessionID, len(session.new) > 0, mixed)
        }
    }
}

// notify emits an EventActivitySessionUpdated event carrying the interleaved
// entries and, when hasNewOnTop is true, the flag telling the client to call
// ResetFilterSession.
func notify(eventFeed *event.Feed, id SessionID, hasNewOnTop bool, mixed []*EntryUpdate) {
    update := SessionUpdate{New: mixed}
    if hasNewOnTop {
        // Field is a pointer so it is omitted from JSON when false.
        flag := true
        update.HasNewOnTop = &flag
    }

    sessionID := int32(id)
    sendResponseEvent(eventFeed, &sessionID, EventActivitySessionUpdated, update, nil)
}

// unsubscribeFromEvents should be called with sessionsRWMutex locked for writing.
// Unsubscribing first guarantees the feed no longer sends on s.ch, so closing
// it safely terminates the processEvents goroutine.
func (s *Service) unsubscribeFromEvents() {
    s.subscriptions.Unsubscribe()
    close(s.ch)
    s.ch = nil
    s.subscriptions = nil
}

// StopFilterSession removes the session, tears down the event subscription
// when it was the last one, and cancels any pending filter task for it.
func (s *Service) StopFilterSession(id SessionID) {
    s.sessionsRWMutex.Lock()
    if _, found := s.sessions[id]; found {
        delete(s.sessions, id)
        // Unsubscribe only when an existing session was actually removed;
        // previously a repeated Stop (or an unknown id with no sessions
        // left) reached unsubscribeFromEvents with a nil subscription and
        // a nil channel, panicking on Unsubscribe/close.
        if len(s.sessions) == 0 {
            s.unsubscribeFromEvents()
        }
    }
    s.sessionsRWMutex.Unlock()

    // Cancel any pending or ongoing task
    s.scheduler.Enqueue(int32(id), filterTask, func(ctx context.Context) (interface{}, error) {
        return nil, nil
    }, func(result interface{}, taskType async.TaskType, err error) {})
}

// getActivityDetailsAsync fetches details for the given entries in the
// background and, when any are found, reports them via an
// EventActivityFilteringUpdate event keyed by requestID.
func (s *Service) getActivityDetailsAsync(requestID int32, entries []Entry) {
    if len(entries) == 0 {
        return
    }

    go func() {
        details, err := s.getActivityDetails(context.Background(), entries)
        if len(details) > 0 {
            sendResponseEvent(s.eventFeed, &requestID, EventActivityFilteringUpdate, details, err)
        }
    }()
}

// mixedIdentityResult pairs a newly detected identity with its position
// (newPos) in the updated entries passed to findUpdates.
type mixedIdentityResult struct {
    newPos int
    id     EntryIdentity
}

// entryIdsToMap indexes the identities by their key() for O(1) lookups.
func entryIdsToMap(ids []EntryIdentity) map[string]EntryIdentity {
    byKey := make(map[string]EntryIdentity, len(ids))
    for i := range ids {
        byKey[ids[i].key()] = ids[i]
    }
    return byKey
}

// entriesToMap indexes the entries by their identity key for O(1) lookups.
func entriesToMap(entries []Entry) map[string]Entry {
    byKey := make(map[string]Entry, len(entries))
    for i := range entries {
        byKey[entries[i].getIdentity().key()] = entries[i]
    }
    return byKey
}

// findUpdates returns changes in updated entries compared to the identities.
//
// Expects both identities and updated to be sorted by timestamp.
//
// new holds the entries of updated that are absent from identities, paired
// with their position in updated; removed holds the identities no longer
// present in updated.
//
// The implementation assumes the relative order of surviving identities is
// the same in the old state (identities) and the new one (updated): only
// additions and removals occur.
func findUpdates(identities []EntryIdentity, updated []Entry) (new []mixedIdentityResult, removed []EntryIdentity) {
    if len(updated) == 0 {
        return
    }

    knownIDs := entryIdsToMap(identities)
    updatedIDs := entriesToMap(updated)

    for pos := range updated {
        id := updated[pos].getIdentity()
        if _, known := knownIDs[id.key()]; !known {
            new = append(new, mixedIdentityResult{
                newPos: pos,
                id:     id,
            })
        }

        // Stop once the oldest known identity is reached; everything past
        // it predates the mirrored state.
        if len(identities) > 0 && id.same(identities[len(identities)-1]) {
            break
        }
    }

    // Identities missing from the updated snapshot were removed.
    for _, id := range identities {
        if _, stillPresent := updatedIDs[id.key()]; !stillPresent {
            removed = append(removed, id)
        }
    }
    return
}