nuts-foundation/nuts-node

View on GitHub
network/transport/v2/gossip/queue.go

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
/*
 * Nuts node
 * Copyright (C) 2022 Nuts community
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 *
 */

package gossip

import (
    "context"
    "sync"

    "github.com/nuts-foundation/nuts-node/crypto/hash"
)

// peerQueue contains a log of received transaction references and a queue of references to send for a specific peer.
type peerQueue struct {
    // cancelFunc to unregister the ticker for this peer
    cancelFunc context.CancelFunc
    // log of recent received transaction hashes
    log *uniqueList
    // maxSize determines how many items are kept in each list
    maxSize int
    // mutex to protect concurrent access to the queue
    mutex sync.Mutex
    // queue that holds the list of hashes to be gossipped
    queue *uniqueList
    xor   hash.SHA256Hash
    clock uint32
}

func newPeerQueue() peerQueue {
    return peerQueue{
        log:     newUniqueList(),
        maxSize: maxQueueSize,
        queue:   newUniqueList(),
    }
}

// registerContext registers a context. When unregister is called it'll cancel the returned context.
func (pq *peerQueue) registerContext(parentCtx context.Context) context.Context {
    var ctx context.Context
    ctx, pq.cancelFunc = context.WithCancel(parentCtx)

    return ctx
}

func (pq *peerQueue) unregister() {
    if pq.cancelFunc != nil {
        pq.cancelFunc()
    }
}

// lock and do stuff
func (pq *peerQueue) do(f func()) {
    pq.mutex.Lock()
    defer pq.mutex.Unlock()

    f()
}

// enqueued returns the enqueued transaction references
// it does not lock the mutex
func (pq *peerQueue) enqueued() ([]hash.SHA256Hash, hash.SHA256Hash, uint32) {
    return pq.queue.Values(), pq.xor, pq.clock
}

// clear the queue, it does not clear the log
// it does not lock the mutex
func (pq *peerQueue) clear() {
    pq.queue = newUniqueList()
}

// logReceivedTransactions adds given hashes to the log and removes them from the queue when present
func (pq *peerQueue) logReceivedTransactions(refs ...hash.SHA256Hash) {
    pq.mutex.Lock()
    defer pq.mutex.Unlock()

    for _, ref := range refs {
        // add to log
        pq.log.Add(ref)

        // remove from queue
        pq.queue.Remove(ref)

        // shrink log if too big
        if pq.log.Len() > pq.maxSize {
            pq.log.RemoveFront()
        }
    }
}

// enqueue adds given hashes to the queue unless present in the log
func (pq *peerQueue) enqueue(clock uint32, xor hash.SHA256Hash, refs ...hash.SHA256Hash) {
    pq.mutex.Lock()
    defer pq.mutex.Unlock()

    pq.xor = xor
    pq.clock = clock

    for _, ref := range refs {
        if pq.queue.Len() >= pq.maxSize {
            // ignore new, older TXs are more important to process first for peer
            return
        }

        // ignore if present in log
        if pq.log.Contains(ref) {
            continue
        }

        // add to queue
        pq.queue.Add(ref)
    }
}