waku-org/go-waku

View on GitHub
waku/v2/protocol/relay/subscription.go

Summary

Maintainability
A
0 mins
Test Coverage
A
90%
package relay

import (
    "context"

    "github.com/waku-org/go-waku/waku/v2/protocol"
    "golang.org/x/exp/slices"
)

// Subscription handles the details of a particular Topic subscription. There may be many subscriptions for a given topic.
type Subscription struct {
    ID            int
    Unsubscribe   func() //for internal use only. For relay Subscription use relay protocol's unsubscribe
    Ch            chan *protocol.Envelope
    contentFilter protocol.ContentFilter
    subType       SubscriptionType
    noConsume     bool
}

type SubscriptionType int

const (
    SpecificContentTopics SubscriptionType = iota
    AllContentTopics
)

// Submit allows a message to be submitted for a subscription
func (s *Subscription) Submit(ctx context.Context, msg *protocol.Envelope) {
    //Filter and notify
    // - if contentFilter doesn't have a contentTopic
    // - if contentFilter has contentTopics and it matches with message
    if !s.noConsume && (len(s.contentFilter.ContentTopicsList()) == 0 ||
        (len(s.contentFilter.ContentTopicsList()) > 0 && slices.Contains(s.contentFilter.ContentTopicsList(), msg.Message().ContentTopic))) {
        select {
        case <-ctx.Done():
            return
        case s.Ch <- msg:
        }
    }
}

// NewSubscription creates a subscription that will only receive messages based on the contentFilter
func NewSubscription(contentFilter protocol.ContentFilter) *Subscription {
    ch := make(chan *protocol.Envelope)
    var subType SubscriptionType
    if len(contentFilter.ContentTopicsList()) == 0 {
        subType = AllContentTopics
    }
    return &Subscription{
        Unsubscribe: func() {
            close(ch)
        },
        Ch:            ch,
        contentFilter: contentFilter,
        subType:       subType,
    }
}