nuts-foundation/nuts-node

View on GitHub
network/transport/grpc/connection_list.go

Summary

Maintainability
A
0 mins
Test Coverage
A
96%
/*
 * Copyright (C) 2021 Nuts community
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 *
 */

package grpc

import (
    "context"
    "errors"
    "sync"

    "github.com/nuts-foundation/nuts-node/core"
    "github.com/nuts-foundation/nuts-node/network/transport"
)

// ErrNoConnection can be used when no connection is available but one is required.
var ErrNoConnection = errors.New("no connection available")

// ConnectionList provides an API for protocols to query the ConnectionManager's connections.
type ConnectionList interface {
    // Get returns the first Connection which matches the predicates (using AND)
    // If there's no match, nil is returned.
    Get(query ...Predicate) Connection
    // All returns the list of connections.
    All() []Connection
    // AllMatching returns the list of connections which match the predicates (using AND).
    AllMatching(query ...Predicate) []Connection
}

type connectionList struct {
    mux  sync.Mutex
    list []Connection
}

func (c *connectionList) Get(query ...Predicate) Connection {
    c.mux.Lock()
    defer c.mux.Unlock()
    return c.get(query...)
}

func (c *connectionList) get(query ...Predicate) Connection {
    // Make sure we're not returning the first random connection by accident
    if len(query) == 0 {
        return nil
    }

outer:
    for _, curr := range c.list {
        for _, predicate := range query {
            if !predicate.Match(curr) {
                continue outer
            }
        }

        return curr
    }

    return nil
}

func (c *connectionList) forEach(consumer func(connection Connection)) {
    c.mux.Lock()
    defer c.mux.Unlock()

    for _, curr := range c.list {
        consumer(curr)
    }
}

// getOrRegister retrieves the connection that matches the given peer (either on ID or address).
// If no connections match the given peer it creates a new one.
// It returns false if the peer matched an existing connection.
// It returns true if a new connection was created.
// The given context is used as parent context for new connections: if it's cancelled, callers blocked by waitUntilDisconnected will be unblocked.
func (c *connectionList) getOrRegister(ctx context.Context, peer transport.Peer, outbound bool) (Connection, bool) {
    c.mux.Lock()
    defer c.mux.Unlock()

    // Check whether we're already connected to this peer
    var existing Connection
    if outbound {
        if peer.NodeDID.Empty() { // allow 1 connection to a bootstrap node
            existing = c.get(ByAddress(peer.Address), ByNodeDID(peer.NodeDID))
        } else { // outbound only needs 1 connection to a DID
            existing = c.get(ByNodeDID(peer.NodeDID))
        }
    } else {
        // allow 1 connection per PeerID/DID combo for clustering purposes.
        // for anonymous connections (NodeDID.Empty()==true) there is only 1 per peerID
        // TODO: add a configurable limit to the number of connections per DID
        existing = c.get(ByPeerID(peer.ID), ByNodeDID(peer.NodeDID))
    }
    if existing != nil {
        return existing, false
    }

    result := createConnection(ctx, peer)
    c.list = append(c.list, result)
    return result, true
}

func (c *connectionList) All() []Connection {
    c.mux.Lock()
    defer c.mux.Unlock()

    result := make([]Connection, len(c.list))
    copy(result, c.list)
    return result
}

func (c *connectionList) AllMatching(query ...Predicate) []Connection {
    c.mux.Lock()
    defer c.mux.Unlock()

    var result []Connection
outer:
    for _, curr := range c.list {
        for _, predicate := range query {
            if !predicate.Match(curr) {
                continue outer
            }
        }

        result = append(result, curr)
    }

    return result
}

func (c *connectionList) remove(target Connection) {
    c.mux.Lock()
    defer c.mux.Unlock()

    var j int
    for _, curr := range c.list {
        if curr != target {
            c.list[j] = curr
            j++
        }
    }
    c.list = c.list[:j]
}

func (c *connectionList) Diagnostics() []core.DiagnosticResult {
    c.mux.Lock()
    defer c.mux.Unlock()
    var peers []transport.Peer
    // Only add peer to "outbound_connectors" contacts if not connected
    for _, curr := range c.list {
        if curr.IsConnected() {
            peers = append(peers, curr.Peer())
        }
    }
    return []core.DiagnosticResult{
        numberOfPeersStatistic{numberOfPeers: len(peers)},
        peersStatistic{peers: peers},
    }
}