waku/v2/api/publish/message_queue.go
package publish
import (
"container/heap"
"context"
"sync"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
)
// MessagePriority is the priority level attached to a message when it is
// pushed into a priority-enabled MessageQueue. It is an alias of int, so plain
// int values are accepted wherever a MessagePriority is expected.
type MessagePriority = int

// Priority levels understood by the message queue. A larger value is dequeued
// before a smaller one; NormalPriority is used when Push is called without an
// explicit priority.
const (
	LowPriority MessagePriority = iota + 1
	NormalPriority
	HighPriority
)
// envelopePriority is a single entry of envelopePriorityQueue: an envelope
// together with the priority it was queued with and its position in the heap.
type envelopePriority struct {
// envelope is the message envelope waiting to be handed out.
envelope *protocol.Envelope
// priority decides ordering; Less treats a larger value as more urgent.
priority int
// index is the entry's current position in the heap slice. Push and Swap
// keep it up to date, and Pop sets it to -1 once the entry is removed.
index int
}
// envelopePriorityQueue is the slice that backs the heap of pending envelopes.
// Its Len, Less, Swap, Push and Pop methods are the ones container/heap drives;
// it does no locking of its own.
type envelopePriorityQueue []*envelopePriority
// Len reports how many entries are currently held in the queue.
func (pq envelopePriorityQueue) Len() int {
	return len(pq)
}
// Less reports whether entry i must be dequeued before entry j. The entry with
// the larger priority wins; when priorities are equal, the entry whose message
// carries the smaller timestamp wins.
func (pq envelopePriorityQueue) Less(i, j int) bool {
	left, right := pq[i], pq[j]
	if left.priority != right.priority {
		return left.priority > right.priority
	}
	// Same priority: fall back to message timestamp, earliest first.
	return left.envelope.Message().GetTimestamp() < right.envelope.Message().GetTimestamp()
}
// Swap exchanges the entries at positions i and j and rewrites their index
// fields so each entry keeps pointing at its own slot.
func (pq envelopePriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index, pq[j].index = i, j
}
// Push appends x, which must be an *envelopePriority, to the end of the slice
// and records that position in the entry's index field. It is the raw append
// step invoked by heap.Push, which restores heap order afterwards.
func (pq *envelopePriorityQueue) Push(x any) {
	entry := x.(*envelopePriority)
	entry.index = len(*pq)
	*pq = append(*pq, entry)
}
// Pop removes the last element of the slice and returns it. It is the raw
// removal step invoked by heap.Pop, which first moves the entry to be removed
// into the last slot.
func (pq *envelopePriorityQueue) Pop() any {
	items := *pq
	last := len(items) - 1
	popped := items[last]

	// Clear the vacated slot so the backing array does not keep the entry alive.
	items[last] = nil
	// Mark the entry as no longer being part of the heap.
	popped.index = -1

	*pq = items[:last]
	return popped
}
// safeEnvelopePriorityQueue wraps envelopePriorityQueue with a mutex; its Push,
// Pop and Len methods take the lock before touching the heap.
type safeEnvelopePriorityQueue struct {
// pq is the underlying heap; access it only while holding lock.
pq envelopePriorityQueue
// lock guards pq.
lock sync.Mutex
}
// Push inserts task into the heap while holding the queue lock.
func (spq *safeEnvelopePriorityQueue) Push(task *envelopePriority) {
	spq.lock.Lock()
	defer spq.lock.Unlock()

	queue := &spq.pq
	heap.Push(queue, task)
}
// Pop removes and returns the entry that sorts first in the heap while holding
// the queue lock. It returns nil when the queue is empty.
func (spq *safeEnvelopePriorityQueue) Pop() *envelopePriority {
	spq.lock.Lock()
	defer spq.lock.Unlock()

	if spq.pq.Len() > 0 {
		return heap.Pop(&spq.pq).(*envelopePriority)
	}
	return nil
}
// Len reports the number of queued entries, reading it under the queue lock.
func (spq *safeEnvelopePriorityQueue) Len() int {
	spq.lock.Lock()
	defer spq.lock.Unlock()

	return len(spq.pq)
}
// newSafePriorityQueue builds an empty, heap-initialised priority queue that is
// ready for use.
func newSafePriorityQueue() *safeEnvelopePriorityQueue {
	spq := new(safeEnvelopePriorityQueue)
	spq.pq = make(envelopePriorityQueue, 0)
	heap.Init(&spq.pq)
	return spq
}
// MessageQueue is a structure used to handle the ordering of the messages to publish.
// It operates in one of two modes chosen at construction time: a plain FIFO
// channel, or a priority queue fed through a buffered channel by Start.
type MessageQueue struct {
// usePriorityQueue selects priority mode when true and FIFO mode otherwise.
usePriorityQueue bool
// toSendChan carries envelopes in FIFO mode; NewMessageQueue leaves it nil
// in priority mode.
toSendChan chan *protocol.Envelope
// throttledPrioritySendQueue receives entries from Push in priority mode;
// Start drains it into envelopePriorityQueue. It is nil in FIFO mode.
throttledPrioritySendQueue chan *envelopePriority
// envelopeAvailableOnPriorityQueueSignal gets one token from Start for every
// entry added to the heap; Pop takes a token before popping the heap.
envelopeAvailableOnPriorityQueueSignal chan struct{}
// envelopePriorityQueue is the mutex-guarded heap used in priority mode.
envelopePriorityQueue *safeEnvelopePriorityQueue
}
// NewMessageQueue creates a MessageQueue whose internal channels are buffered
// with bufferSize slots. When usePriorityQueue is true, messages are ordered by
// priority and then by timestamp; otherwise they are delivered in plain FIFO
// order.
func NewMessageQueue(bufferSize int, usePriorityQueue bool) *MessageQueue {
	if !usePriorityQueue {
		// FIFO mode only needs the single send channel.
		return &MessageQueue{
			toSendChan: make(chan *protocol.Envelope, bufferSize),
		}
	}

	return &MessageQueue{
		usePriorityQueue:                       true,
		envelopePriorityQueue:                  newSafePriorityQueue(),
		throttledPrioritySendQueue:             make(chan *envelopePriority, bufferSize),
		envelopeAvailableOnPriorityQueueSignal: make(chan struct{}, bufferSize),
	}
}
// Start must be called to handle the lifetime of the internals of the message queue.
//
// It blocks until ctx is cancelled. In priority mode it moves every entry
// written by Push from the throttled channel into the priority heap and then
// emits one availability token for Pop. In FIFO mode the throttled channel is
// nil, so Start simply waits for cancellation.
func (m *MessageQueue) Start(ctx context.Context) {
	// Work on a local copy so the receive case can be disabled if the channel
	// is ever closed.
	incoming := m.throttledPrioritySendQueue

	for {
		select {
		case item, ok := <-incoming:
			if !ok {
				// A closed channel is always ready to receive; looping on it
				// would spin at full CPU. A nil channel never becomes ready,
				// so from here on only cancellation can wake the loop.
				incoming = nil
				continue
			}

			m.envelopePriorityQueue.Push(item)

			// The signal channel may be full (or unbuffered with no Pop
			// pending). Keep watching ctx so cancellation is never blocked
			// behind this send.
			select {
			case m.envelopeAvailableOnPriorityQueueSignal <- struct{}{}:
			case <-ctx.Done():
				return
			}

		case <-ctx.Done():
			return
		}
	}
}
// Push enqueues envelope for publishing. In priority mode the first value of
// priority is used, falling back to NormalPriority when none is given; in FIFO
// mode the priority is ignored. Push blocks while the internal buffer is full
// and returns ctx.Err() if ctx is cancelled before the envelope is accepted.
func (m *MessageQueue) Push(ctx context.Context, envelope *protocol.Envelope, priority ...MessagePriority) error {
	if !m.usePriorityQueue {
		select {
		case m.toSendChan <- envelope:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	item := &envelopePriority{
		envelope: envelope,
		priority: NormalPriority,
	}
	if len(priority) > 0 {
		item.priority = priority[0]
	}

	select {
	case m.throttledPrioritySendQueue <- item:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// Pop will return a channel on which a message can be retrieved from the message queue.
//
// The returned channel yields at most one envelope and is then closed. It is
// closed without yielding anything if ctx is cancelled first, if the internal
// channel being waited on is closed, or if the priority heap turns out to be
// empty after an availability token was received.
func (m *MessageQueue) Pop(ctx context.Context) <-chan *protocol.Envelope {
	// Capacity 1 lets the single send below complete even when the caller has
	// stopped receiving (for example because it took its own ctx.Done branch).
	// With an unbuffered channel the helper goroutine would block on that send
	// forever and leak.
	ch := make(chan *protocol.Envelope, 1)

	go func() {
		defer utils.LogOnPanic()
		defer close(ch)

		// Only one of the two source channels is non-nil, depending on the
		// queue mode; a receive from the nil one never becomes ready.
		select {
		case _, ok := <-m.envelopeAvailableOnPriorityQueueSignal:
			if !ok {
				return
			}
			if e := m.envelopePriorityQueue.Pop(); e != nil {
				ch <- e.envelope
			}

		case envelope, ok := <-m.toSendChan:
			if ok {
				ch <- envelope
			}

		case <-ctx.Done():
		}
	}()

	return ch
}