waku-org/go-waku

View on GitHub
library/filter.go

Summary

Maintainability
A
1 hr
Test Coverage
package library

import (
    "context"
    "encoding/json"
    "errors"
    "time"

    "github.com/libp2p/go-libp2p/core/peer"
    "github.com/waku-org/go-waku/waku/v2/protocol"
    "github.com/waku-org/go-waku/waku/v2/protocol/filter"
    "github.com/waku-org/go-waku/waku/v2/protocol/subscription"
    "github.com/waku-org/go-waku/waku/v2/utils"
)

type filterArgument struct {
    PubsubTopic   string   `json:"pubsubTopic,omitempty"`
    ContentTopics []string `json:"contentTopics,omitempty"`
}

func toContentFilter(filterJSON string) (protocol.ContentFilter, error) {
    var f filterArgument
    err := json.Unmarshal([]byte(filterJSON), &f)
    if err != nil {
        return protocol.ContentFilter{}, err
    }

    return protocol.ContentFilter{
        PubsubTopic:   f.PubsubTopic,
        ContentTopics: protocol.NewContentTopicSet(f.ContentTopics...),
    }, nil
}

type subscribeResult struct {
    Subscriptions []*subscription.SubscriptionDetails `json:"subscriptions"`
    Error         string                              `json:"error,omitempty"`
}

// FilterSubscribe is used to create a subscription to a filter node to receive messages
func FilterSubscribe(instance *WakuInstance, filterJSON string, peerID string, ms int) (string, error) {
    cf, err := toContentFilter(filterJSON)
    if err != nil {
        return "", err
    }

    if err := validateInstance(instance, MustBeStarted); err != nil {
        return "", err
    }

    var ctx context.Context
    var cancel context.CancelFunc

    if ms > 0 {
        ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
        defer cancel()
    } else {
        ctx = instance.ctx
    }

    var fOptions []filter.FilterSubscribeOption
    if peerID != "" {
        p, err := peer.Decode(peerID)
        if err != nil {
            return "", err
        }
        fOptions = append(fOptions, filter.WithPeer(p))
    } else {
        fOptions = append(fOptions, filter.WithAutomaticPeerSelection())
    }

    subscriptions, err := instance.node.FilterLightnode().Subscribe(ctx, cf, fOptions...)
    if err != nil && subscriptions == nil {
        return "", err
    }

    for _, subscriptionDetails := range subscriptions {
        go func(subscriptionDetails *subscription.SubscriptionDetails) {
            defer utils.LogOnPanic()
            for envelope := range subscriptionDetails.C {
                send(instance, "message", toSubscriptionMessage(envelope))
            }
        }(subscriptionDetails)
    }
    var subResult subscribeResult
    subResult.Subscriptions = subscriptions
    if err != nil {
        subResult.Error = err.Error()
    }

    return marshalJSON(subResult)
}

// FilterPing is used to determine if a peer has an active subscription
func FilterPing(instance *WakuInstance, peerID string, ms int) error {
    if err := validateInstance(instance, MustBeStarted); err != nil {
        return err
    }

    var ctx context.Context
    var cancel context.CancelFunc

    if ms > 0 {
        ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
        defer cancel()
    } else {
        ctx = instance.ctx
    }

    var pID peer.ID
    var err error
    if peerID != "" {
        pID, err = peer.Decode(peerID)
        if err != nil {
            return err
        }
    } else {
        return errors.New("peerID is required")
    }

    return instance.node.FilterLightnode().Ping(ctx, pID)
}

// FilterUnsubscribe is used to remove a filter criteria from an active subscription with a filter node
func FilterUnsubscribe(instance *WakuInstance, filterJSON string, peerID string, ms int) error {
    cf, err := toContentFilter(filterJSON)
    if err != nil {
        return err
    }

    if err := validateInstance(instance, MustBeStarted); err != nil {
        return err
    }

    var ctx context.Context
    var cancel context.CancelFunc

    if ms > 0 {
        ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
        defer cancel()
    } else {
        ctx = instance.ctx
    }

    var fOptions []filter.FilterSubscribeOption
    if peerID != "" {
        p, err := peer.Decode(peerID)
        if err != nil {
            return err
        }
        fOptions = append(fOptions, filter.WithPeer(p))
    } else {
        return errors.New("peerID is required")
    }

    result, err := instance.node.FilterLightnode().Unsubscribe(ctx, cf, fOptions...)
    if err != nil {
        return err
    }

    errs := result.Errors()
    if len(errs) == 0 {
        return nil
    }
    return errs[0].Err
}

type unsubscribeAllResult struct {
    PeerID string `json:"peerID"`
    Error  string `json:"error"`
}

// FilterUnsubscribeAll is used to remove an active subscription to a peer. If no peerID is defined, it will stop all active filter subscriptions
func FilterUnsubscribeAll(instance *WakuInstance, peerID string, ms int) (string, error) {
    if err := validateInstance(instance, MustBeStarted); err != nil {
        return "", err
    }

    var ctx context.Context
    var cancel context.CancelFunc

    if ms > 0 {
        ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
        defer cancel()
    } else {
        ctx = instance.ctx
    }

    var fOptions []filter.FilterSubscribeOption
    if peerID != "" {
        p, err := peer.Decode(peerID)
        if err != nil {
            return "", err
        }
        fOptions = append(fOptions, filter.WithPeer(p))
    } else {
        fOptions = append(fOptions, filter.UnsubscribeAll())
    }

    result, err := instance.node.FilterLightnode().UnsubscribeAll(ctx, fOptions...)
    if err != nil {
        return "", err
    }

    var unsubscribeResult []unsubscribeAllResult

    for _, err := range result.Errors() {
        ur := unsubscribeAllResult{
            PeerID: err.PeerID.String(),
        }
        if err.Err != nil {
            ur.Error = err.Err.Error()
        }
        unsubscribeResult = append(unsubscribeResult, ur)
    }

    return marshalJSON(unsubscribeResult)
}