waku-org/go-waku

View on GitHub
waku/v2/api/publish/message_sender.go

Summary

Maintainability
A
35 mins
Test Coverage
F
52%
package publish

import (
    "context"
    "errors"
    "time"

    "github.com/ethereum/go-ethereum/common"
    "github.com/libp2p/go-libp2p/core/peer"
    "github.com/waku-org/go-waku/waku/v2/protocol"
    "github.com/waku-org/go-waku/waku/v2/protocol/pb"
    "go.uber.org/zap"
    "golang.org/x/time/rate"
)

const DefaultPeersToPublishForLightpush = 2
const DefaultPublishingLimiterRate = rate.Limit(2)
const DefaultPublishingLimitBurst = 4

type PublishMethod int

const (
    LightPush PublishMethod = iota
    Relay
    UnknownMethod
)

func (pm PublishMethod) String() string {
    switch pm {
    case LightPush:
        return "LightPush"
    case Relay:
        return "Relay"
    default:
        return "Unknown"
    }
}

type Publisher interface {
    // RelayListPeers returns the list of peers for a pubsub topic
    RelayListPeers(pubsubTopic string) ([]peer.ID, error)

    // RelayPublish publishes a message via WakuRelay
    RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error)

    // LightpushPublish publishes a message via WakuLightPush
    LightpushPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string, maxPeers int) (pb.MessageHash, error)
}

type MessageSender struct {
    publishMethod    PublishMethod
    publisher        Publisher
    messageSentCheck ISentCheck
    rateLimiter      *PublishRateLimiter
    logger           *zap.Logger
}

type Request struct {
    ctx           context.Context
    envelope      *protocol.Envelope
    publishMethod PublishMethod
}

func NewRequest(ctx context.Context, envelope *protocol.Envelope) *Request {
    return &Request{
        ctx:           ctx,
        envelope:      envelope,
        publishMethod: UnknownMethod,
    }
}

func (r *Request) WithPublishMethod(publishMethod PublishMethod) *Request {
    r.publishMethod = publishMethod
    return r
}

func NewMessageSender(publishMethod PublishMethod, publisher Publisher, logger *zap.Logger) (*MessageSender, error) {
    if publishMethod == UnknownMethod {
        return nil, errors.New("publish method is required")
    }
    return &MessageSender{
        publishMethod: publishMethod,
        publisher:     publisher,
        rateLimiter:   NewPublishRateLimiter(DefaultPublishingLimiterRate, DefaultPublishingLimitBurst),
        logger:        logger,
    }, nil
}

func (ms *MessageSender) WithMessageSentCheck(messageSentCheck ISentCheck) *MessageSender {
    ms.messageSentCheck = messageSentCheck
    return ms
}

func (ms *MessageSender) WithRateLimiting(rateLimiter *PublishRateLimiter) *MessageSender {
    ms.rateLimiter = rateLimiter
    return ms
}

func (ms *MessageSender) Send(req *Request) error {
    logger := ms.logger.With(
        zap.Stringer("envelopeHash", req.envelope.Hash()),
        zap.String("pubsubTopic", req.envelope.PubsubTopic()),
        zap.String("contentTopic", req.envelope.Message().ContentTopic),
        zap.Int64("timestamp", req.envelope.Message().GetTimestamp()),
    )

    if ms.rateLimiter != nil {
        if err := ms.rateLimiter.Check(req.ctx, logger); err != nil {
            return err
        }
    }

    publishMethod := req.publishMethod
    if publishMethod == UnknownMethod {
        publishMethod = ms.publishMethod
    }

    switch publishMethod {
    case LightPush:
        logger.Info("publishing message via lightpush")
        _, err := ms.publisher.LightpushPublish(
            req.ctx,
            req.envelope.Message(),
            req.envelope.PubsubTopic(),
            DefaultPeersToPublishForLightpush,
        )
        if err != nil {
            return err
        }
    case Relay:
        peers, err := ms.publisher.RelayListPeers(req.envelope.PubsubTopic())
        if err != nil {
            return err
        }
        logger.Info("publishing message via relay", zap.Int("peerCnt", len(peers)))
        _, err = ms.publisher.RelayPublish(req.ctx, req.envelope.Message(), req.envelope.PubsubTopic())
        if err != nil {
            return err
        }
    default:
        return errors.New("unknown publish method")
    }

    if ms.messageSentCheck != nil && !req.envelope.Message().GetEphemeral() {
        ms.messageSentCheck.Add(
            req.envelope.PubsubTopic(),
            common.BytesToHash(req.envelope.Hash().Bytes()),
            uint32(req.envelope.Message().GetTimestamp()/int64(time.Second)),
        )
    }

    return nil
}

func (ms *MessageSender) Start() {
    if ms.messageSentCheck != nil {
        go ms.messageSentCheck.Start()
    }
}

func (ms *MessageSender) PublishMethod() PublishMethod {
    return ms.publishMethod
}

func (ms *MessageSender) MessagesDelivered(messageIDs []common.Hash) {
    if ms.messageSentCheck != nil {
        ms.messageSentCheck.DeleteByMessageIDs(messageIDs)
    }
}