nuts-foundation/nuts-node

View on GitHub
network/transport/grpc/addressbook.go

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
/*
 * Copyright (C) 2023 Nuts community
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 *
 */

package grpc

import (
    "sync"
    "sync/atomic"
    "time"

    "github.com/nuts-foundation/go-did/did"
    "github.com/nuts-foundation/go-stoabs"
    "github.com/nuts-foundation/nuts-node/network/transport"
)

// AddressBook provides an API for protocols to query the ConnectionManager's known addresses.
type AddressBook interface {
    // update the address for the peer's existing contact, or create an entry if it does not exist.
    // it returns the contact, and true if the AddressBook was updated (contact was created or updated).
    // bootstrap nodes contain an address with an empty DID.
    update(peer transport.Peer) (*contact, bool)
    // get the contact if it exists. Returns nil and false if it does not.
    get(peer transport.Peer) (*contact, bool)
    // all returns a copy of the slice of contacts.
    stats() []transport.Contact
    // remove contact for the given DID. if peerDID.Empty() this removes all bootstrap contacts.
    remove(peerDID did.DID)
}

func newAddressBook(connectionStore stoabs.KVStore, backoffCreator func() Backoff) *addressBook {
    return &addressBook{
        contacts:       make([]*contact, 0),
        backoffStore:   connectionStore,
        backoffCreator: backoffCreator,
    }
}

type addressBook struct {
    mux            sync.RWMutex
    contacts       []*contact
    backoffStore   stoabs.KVStore
    backoffCreator func() Backoff
}

func (a *addressBook) get(peer transport.Peer) (*contact, bool) {
    a.mux.RLock()
    defer a.mux.RUnlock()
    return a.getWithoutLock(peer)
}

// getWithoutLock is get for internal calls
func (a *addressBook) getWithoutLock(peer transport.Peer) (*contact, bool) {
    if !peer.NodeDID.Empty() { // find on DID
        for _, o := range a.contacts {
            if peer.NodeDID.Equals(o.peer.NodeDID) {
                return o, true
            }
        }
    } else { // find on address -> bootstrap only
        for _, o := range a.contacts {
            if peer.Address == o.peer.Address {
                return o, true
            }
        }
    }
    return nil, false
}

func (a *addressBook) update(peer transport.Peer) (*contact, bool) {
    a.mux.Lock()
    defer a.mux.Unlock()

    // update existing address
    current, exists := a.getWithoutLock(peer)
    if exists {
        if peer.Address == current.peer.Address {
            // no change
            return current, false
        }
        current.peer.Address = peer.Address
        return current, true
    }

    // add new address
    backoff := a.backoffCreator()
    // only persist non-bootstrap contacts
    // bootstrap addresses are configured by the node owner and should always be called at startup
    if !peer.NodeDID.Empty() {
        // store the backoff under the DID since an address could be used by multiple DIDs.
        backoff = NewPersistedBackoff(a.backoffStore, peer.NodeDID.String(), backoff)
    }
    // wrap it in a lock since it's used from multiple go routines
    backoff = NewSyncedBackoff(backoff)
    newC := newContact(peer, NewSyncedBackoff(backoff))
    a.contacts = append(a.contacts, newC)
    return newC, true
}

func (a *addressBook) stats() []transport.Contact {
    a.mux.RLock()
    defer a.mux.RUnlock()

    result := make([]transport.Contact, 0, len(a.contacts))
    for _, c := range a.contacts {
        result = append(result, c.stats())
    }
    return result
}

// limit returns a number of contacts that match all predicates
func (a *addressBook) limit(number int, predicates ...predicate) []*contact {
    a.mux.RLock()
    defer a.mux.RUnlock()

    result := make([]*contact, 0)

outer:
    for _, c := range a.contacts {
        if len(result) == number {
            break
        }
        for _, p := range predicates {
            if !p(c) {
                continue outer
            }
        }
        result = append(result, c)
    }

    return result
}

func (a *addressBook) remove(peerDID did.DID) {
    a.mux.Lock()
    defer a.mux.Unlock()
    var j int
    for _, curr := range a.contacts {
        if !curr.peer.NodeDID.Equals(peerDID) {
            a.contacts[j] = curr
            j++
        }
    }
    a.contacts = a.contacts[:j]
}

// newContact connects to a remote server in a loop, taking into account a given backoff.
// When the connection succeeds it calls the given callback. The caller is responsible to reset the backoff after optional application-level checks succeed (e.g. authentication).
func newContact(peer transport.Peer, backoff Backoff) *contact {
    return &contact{
        peer:    peer,
        backoff: backoff,
    }
}

type contact struct {
    peer        transport.Peer
    calling     atomic.Bool
    backoff     Backoff
    attempts    atomic.Uint32
    lastAttempt atomic.Pointer[time.Time]
    error       atomic.Pointer[string]
}

func (c *contact) stats() transport.Contact {
    lastAttempt := c.lastAttempt.Load()
    var nextAttemptP *time.Time
    if lastAttempt != nil {
        nextAttempt := lastAttempt.Add(c.backoff.Value())
        nextAttemptP = &nextAttempt
    }
    return transport.Contact{
        Address:     c.peer.Address,
        DID:         c.peer.NodeDID,
        Attempts:    c.attempts.Load(),
        LastAttempt: lastAttempt,
        NextAttempt: nextAttemptP,
        Error:       c.error.Load(),
    }
}

// predicate returns true if its implementation matches a contact.
type predicate func(c *contact) bool

func isNotActivePredicate(s *grpcConnectionManager) predicate {
    return func(c *contact) bool {
        return !s.hasActiveConnection(c.peer)
    }
}

func backoffExpiredPredicate() predicate {
    return func(c *contact) bool {
        return c.backoff.Expired()
    }
}

func notDialingPredicate() predicate {
    return func(c *contact) bool {
        return !c.calling.Load()
    }
}