waku-org/go-waku

View on GitHub
waku/v2/api/publish/message_queue.go

Summary

Maintainability
A
0 mins
Test Coverage
B
89%
package publish

import (
    "container/heap"
    "context"
    "sync"

    "github.com/waku-org/go-waku/waku/v2/protocol"
    "github.com/waku-org/go-waku/waku/v2/utils"
)

// MessagePriority is the rank given to a message pushed to a priority-enabled
// MessageQueue. It is an alias of int, so plain integers are accepted too.
type MessagePriority = int

// Priority levels, listed from least to most urgent. The numeric values are
// 1, 2 and 3 respectively.
const (
    LowPriority MessagePriority = iota + 1
    NormalPriority
    HighPriority
)

// envelopePriority is one entry of the priority queue: an envelope paired with
// the priority it was pushed with and its current position in the heap slice.
type envelopePriority struct {
    // envelope is the message carried by this entry.
    envelope *protocol.Envelope
    // priority is compared by Less; a larger value sorts ahead of a smaller one.
    priority int
    // index is the entry's position in the queue slice. It is kept up to date
    // by Swap and Push, and set to -1 by Pop once the entry leaves the queue.
    index    int
}

// envelopePriorityQueue is the slice backing the priority queue. Its Len, Less,
// Swap, Push and Pop methods are the ones container/heap operates on; it does
// no locking of its own.
type envelopePriorityQueue []*envelopePriority

// Len reports how many entries the queue currently holds.
func (pq envelopePriorityQueue) Len() int {
    return len(pq)
}

// Less reports whether the entry at index i must be dequeued before the entry
// at index j. The entry with the higher priority wins; when both priorities
// are equal, the one whose message has the smaller timestamp wins.
func (pq envelopePriorityQueue) Less(i, j int) bool {
    a, b := pq[i], pq[j]

    if a.priority != b.priority {
        return a.priority > b.priority
    }

    // Same priority: fall back to the message timestamps.
    return a.envelope.Message().GetTimestamp() < b.envelope.Message().GetTimestamp()
}

// Swap exchanges the entries at positions i and j and updates each entry's
// index field to its new position.
func (pq envelopePriorityQueue) Swap(i, j int) {
    atI, atJ := pq[i], pq[j]
    pq[i], pq[j] = atJ, atI
    atJ.index = i
    atI.index = j
}

// Push appends x, which must be an *envelopePriority, to the end of the slice
// and records that position in the entry's index field. It does not restore
// heap order by itself; callers are expected to go through heap.Push.
func (pq *envelopePriorityQueue) Push(x any) {
    entry := x.(*envelopePriority)
    entry.index = len(*pq)
    *pq = append(*pq, entry)
}

// Pop removes the last element of the slice and returns it as an
// *envelopePriority with its index set to -1. It does not check for an empty
// queue; callers are expected to go through heap.Pop on a non-empty queue.
func (pq *envelopePriorityQueue) Pop() any {
    queue := *pq
    last := len(queue) - 1

    entry := queue[last]
    queue[last] = nil // release the slot's reference so it does not pin the entry
    entry.index = -1  // the entry is no longer part of the queue

    *pq = queue[:last]
    return entry
}

// safeEnvelopePriorityQueue pairs an envelopePriorityQueue with a mutex. Its
// Push, Pop and Len methods acquire lock before touching pq.
type safeEnvelopePriorityQueue struct {
    // pq is the underlying heap-ordered queue.
    pq   envelopePriorityQueue
    // lock guards pq.
    lock sync.Mutex
}

// Push inserts task into the heap while holding the queue's lock.
func (spq *safeEnvelopePriorityQueue) Push(task *envelopePriority) {
    spq.lock.Lock()
    defer spq.lock.Unlock()

    queue := &spq.pq
    heap.Push(queue, task)
}

// Pop removes and returns the entry at the front of the heap while holding the
// queue's lock. It returns nil when the queue is empty.
func (spq *safeEnvelopePriorityQueue) Pop() *envelopePriority {
    spq.lock.Lock()
    defer spq.lock.Unlock()

    if len(spq.pq) > 0 {
        return heap.Pop(&spq.pq).(*envelopePriority)
    }
    return nil
}

// Len reports the number of entries in the queue, read while holding the lock.
func (spq *safeEnvelopePriorityQueue) Len() int {
    spq.lock.Lock()
    defer spq.lock.Unlock()

    size := len(spq.pq)
    return size
}

// newSafePriorityQueue returns an empty, heap-initialized
// safeEnvelopePriorityQueue.
func newSafePriorityQueue() *safeEnvelopePriorityQueue {
    spq := new(safeEnvelopePriorityQueue)
    spq.pq = envelopePriorityQueue{}
    heap.Init(&spq.pq)
    return spq
}

// MessageQueue is a structure used to handle the ordering of the messages to publish.
// Depending on usePriorityQueue, NewMessageQueue initializes either toSendChan
// (FIFO mode) or the three priority-related fields; the fields of the other
// mode are left nil.
type MessageQueue struct {
    // usePriorityQueue selects priority mode (true) or FIFO mode (false).
    usePriorityQueue bool

    // toSendChan carries envelopes from Push to Pop in FIFO mode.
    toSendChan                             chan *protocol.Envelope
    // throttledPrioritySendQueue receives entries from Push in priority mode;
    // Start drains it into envelopePriorityQueue.
    throttledPrioritySendQueue             chan *envelopePriority
    // envelopeAvailableOnPriorityQueueSignal gets one token from Start for each
    // entry moved into envelopePriorityQueue; Pop waits on it before dequeuing.
    envelopeAvailableOnPriorityQueueSignal chan struct{}
    // envelopePriorityQueue holds the entries ordered by priority and timestamp.
    envelopePriorityQueue                  *safeEnvelopePriorityQueue
}

// NewMessageQueue builds a MessageQueue whose internal channels are created
// with capacity bufferSize. When usePriorityQueue is true the queue orders
// messages through a priority queue; otherwise it is a plain FIFO channel and
// the priority-related fields stay nil.
func NewMessageQueue(bufferSize int, usePriorityQueue bool) *MessageQueue {
    if !usePriorityQueue {
        return &MessageQueue{
            toSendChan: make(chan *protocol.Envelope, bufferSize),
        }
    }

    return &MessageQueue{
        usePriorityQueue:                       true,
        envelopePriorityQueue:                  newSafePriorityQueue(),
        throttledPrioritySendQueue:             make(chan *envelopePriority, bufferSize),
        envelopeAvailableOnPriorityQueueSignal: make(chan struct{}, bufferSize),
    }
}

// Start must be called to handle the lifetime of the internals of the message queue.
// It blocks until ctx is done. In priority mode it moves every entry received on
// throttledPrioritySendQueue into the priority queue and then emits one token on
// envelopeAvailableOnPriorityQueueSignal so that a Pop can pick it up. In FIFO
// mode both channels are nil and Start simply waits for ctx to end.
func (m *MessageQueue) Start(ctx context.Context) {
    // Work on a local copy so the receive case can be disabled (set to nil)
    // if the channel is ever closed. A nil channel is never ready in a select.
    incoming := m.throttledPrioritySendQueue

    for {
        select {
        case item, ok := <-incoming:
            if !ok {
                // A closed channel is always ready to receive; without
                // disabling this case the loop would spin at full speed.
                incoming = nil
                continue
            }

            m.envelopePriorityQueue.Push(item)

            // The signal channel may be full when nobody is popping. Keep
            // watching ctx so Start can still return on cancellation.
            select {
            case m.envelopeAvailableOnPriorityQueueSignal <- struct{}{}:
            case <-ctx.Done():
                return
            }

        case <-ctx.Done():
            return
        }
    }
}

// Push hands envelope to the message queue, blocking until the internal channel
// accepts it or ctx is done. In the latter case ctx.Err() is returned.
// Only the first value of priority is considered, and only when the queue uses
// a priority queue; if none is given, NormalPriority is used.
func (m *MessageQueue) Push(ctx context.Context, envelope *protocol.Envelope, priority ...MessagePriority) error {
    // FIFO mode: the priority argument is ignored.
    if !m.usePriorityQueue {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case m.toSendChan <- envelope:
            return nil
        }
    }

    level := NormalPriority
    if len(priority) > 0 {
        level = priority[0]
    }
    entry := &envelopePriority{envelope: envelope, priority: level}

    select {
    case <-ctx.Done():
        return ctx.Err()
    case m.throttledPrioritySendQueue <- entry:
        return nil
    }
}

// Pop will return a channel on which a message can be retrieved from the message queue.
// The channel yields at most one envelope and is then closed. It is closed
// without a value when ctx is done first, when the source channel is closed,
// or when the priority queue turns out to be empty.
func (m *MessageQueue) Pop(ctx context.Context) <-chan *protocol.Envelope {
    // Capacity 1: the goroutine below sends at most one envelope, so the send
    // never blocks and the goroutine always terminates, even if the caller
    // stops receiving from the returned channel.
    ch := make(chan *protocol.Envelope, 1)

    go func() {
        defer utils.LogOnPanic()
        defer close(ch)

        // Only one of the two source channels is non-nil, depending on the
        // mode chosen in NewMessageQueue; the nil one never becomes ready.
        select {
        case _, ok := <-m.envelopeAvailableOnPriorityQueueSignal:
            if !ok {
                return
            }
            if e := m.envelopePriorityQueue.Pop(); e != nil {
                ch <- e.envelope
            }

        case envelope, ok := <-m.toSendChan:
            if ok {
                ch <- envelope
            }

        case <-ctx.Done():
            return
        }
    }()

    return ch
}