network/transport/grpc/connection.go
/*
* Copyright (C) 2021 Nuts community
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
package grpc
import (
"context"
"errors"
"fmt"
"github.com/nuts-foundation/go-did/did"
"github.com/nuts-foundation/nuts-node/core"
"io"
"sync"
"sync/atomic"
"github.com/nuts-foundation/nuts-node/network/log"
"github.com/nuts-foundation/nuts-node/network/transport"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Limits on the number of outgoing messages that are queued per protocol.
const (
	// OutboxHardLimit defines how many outgoing messages may be queued per protocol.
	// It is the hard limit: the capacity of the underlying channel.
	OutboxHardLimit = 5000

	// outboxSoftLimit defines how many outgoing messages are desirable to be queued per protocol.
	// If needed, the queue may grow beyond it, up to OutboxHardLimit.
	outboxSoftLimit = 100
)
// Connection is created by grpcConnectionManager to register a connection to a peer.
// The connection can be either inbound or outbound. The presence of a Connection for a peer doesn't imply
// there's an actual connection, because it might still be trying to establish an outbound connection to the given peer.
type Connection interface {
// disconnect shuts down active inbound or outbound streams.
disconnect()
// waitUntilDisconnected blocks until the connection is closed. If it already is closed or was never open, it returns immediately.
waitUntilDisconnected()
// registerStream adds the given Stream to this Connection for the given protocol.
// It returns false, without registering anything, if a stream is already registered for that protocol.
// Registered streams are released when disconnect() is called.
registerStream(protocol Protocol, stream Stream) bool
// Send tries to send the given message over the stream of the given protocol.
// If there's no active stream for the protocol, or something else goes wrong, an error is returned.
// A sender may specify ignoreSoftLimit=true to allow extra messages to be sent.
// This is needed to finish sending a TransactionList that falls within a single page.
Send(protocol Protocol, envelope interface{}, ignoreSoftLimit bool) error
// setPeer sets the peer of this connection.
setPeer(peer transport.Peer)
// verifyOrSetPeerID checks whether the given transport.PeerID matches the one currently set for this connection.
// If no transport.PeerID is set on this connection it just sets it. Subsequent calls must then match it.
// This method is used for:
// - Initial discovery of the peer's transport.PeerID, setting it when it isn't known before connecting.
// - Verifying that multiple active protocols to the same peer all send the same transport.PeerID.
// It returns false if the given transport.PeerID doesn't match the previously set transport.PeerID.
verifyOrSetPeerID(id transport.PeerID) bool
// Peer returns the associated peer information of this connection.
Peer() transport.Peer
// IsConnected returns whether the connection is active or not.
IsConnected() bool
// IsAuthenticated returns whether the given connection is authenticated.
IsAuthenticated() bool
// closeError returns the status when the connection closed with an error or nil otherwise.
closeError() *status.Status
}
// createConnection builds a Connection for the given peer.
// The connection gets its own cancellable context derived from parentCtx, and starts out
// without any registered streams or outboxes.
func createConnection(parentCtx context.Context, peer transport.Peer) Connection {
	ctx, cancel := context.WithCancel(parentCtx)
	created := &conn{
		ctx:       ctx,
		cancelCtx: cancel,
		streams:   map[string]Stream{},
		outboxes:  map[string]chan interface{}{},
	}
	created.setPeer(peer)
	return created
}
// conn is the Connection implementation in this file. It tracks the peer, the registered streams
// and the per-protocol outboxes of a single connection.
type conn struct {
// peer holds the current transport.Peer; it is written through setPeer and read through Peer.
peer atomic.Value
// ctx is derived from the parent context in createConnection. It is cancelled through cancelCtx by disconnect,
// when the context of a registered stream is done, or when receiving from a stream fails.
ctx context.Context
cancelCtx func()
// status holds the gRPC status stored by the receive loop when receiving from a stream fails; it is read by closeError.
status atomic.Pointer[status.Status]
// mux guards streams and outboxes. disconnect and verifyOrSetPeerID also hold it while updating the peer.
mux sync.RWMutex
// streams holds the registered streams, keyed by the protocol's method name.
streams map[string]Stream
// outboxes holds a buffered channel of outgoing messages per protocol, keyed by the protocol's method name.
outboxes map[string]chan interface{}
// activeGoroutines counts the running send and receive goroutines; it is updated with atomic operations.
activeGoroutines int32
}
// Peer returns the peer information currently stored on this connection.
// The value is populated through createConnection and verifyOrSetPeerID; when nothing
// has been stored yet, the zero transport.Peer is returned.
func (mc *conn) Peer() transport.Peer {
	if stored, ok := mc.peer.Load().(transport.Peer); ok {
		return stored
	}
	return transport.Peer{}
}
// ID returns the transport.PeerID of the peer currently stored on this connection.
func (mc *conn) ID() transport.PeerID {
	current := mc.Peer()
	return current.ID
}
// disconnect cancels the connection's context, forgets all registered streams,
// closes and forgets all outboxes, and resets the identity of the peer.
func (mc *conn) disconnect() {
	mc.mux.Lock()
	defer mc.mux.Unlock()

	mc.cancelCtx()

	// Forget the registered streams.
	mc.streams = map[string]Stream{}

	// Close every outbox, then start over with an empty set.
	for name := range mc.outboxes {
		close(mc.outboxes[name])
	}
	mc.outboxes = map[string]chan interface{}{}

	// Reset the peer ID, since it might have changed when the peer reconnects (due to a reboot).
	// Also reset the node DID and authentication state, because the peer has to be re-authenticated.
	reset := mc.Peer()
	reset.Authenticated = false
	reset.NodeDID = did.DID{}
	reset.ID = ""
	mc.setPeer(reset)
}
// waitUntilDisconnected blocks until the connection's context is done.
// It returns immediately when no streams are registered, since there is no connection to wait for.
func (mc *conn) waitUntilDisconnected() {
	mc.mux.RLock()
	hasStreams := len(mc.streams) != 0
	mc.mux.RUnlock()

	if !hasStreams {
		// do not wait if there is no connection
		return
	}
	<-mc.ctx.Done()
}
// verifyOrSetPeerID compares the given ID with the peer ID stored on this connection.
// When no peer ID is stored yet, the given ID is stored and true is returned.
// Otherwise it returns whether the given ID equals the stored one.
func (mc *conn) verifyOrSetPeerID(id transport.PeerID) bool {
	mc.mux.Lock()
	defer mc.mux.Unlock()

	known := mc.Peer()
	if known.ID != "" {
		// An ID was set before: the given one must match it.
		return known.ID == id
	}
	known.ID = id
	mc.setPeer(known)
	return true
}
// setPeer stores the given peer as the peer of this connection, replacing the previously stored value.
func (mc *conn) setPeer(peer transport.Peer) {
mc.peer.Store(peer)
}
// Send queues the given envelope in the outbox of the given protocol.
// It returns an error, without queueing the message, when:
//   - there is no outbox for the protocol (no stream is registered for it),
//   - the outbox is filled up to its capacity (the hard limit), or
//   - the outbox holds outboxSoftLimit messages or more and ignoreSoftLimit is false.
func (mc *conn) Send(protocol Protocol, envelope interface{}, ignoreSoftLimit bool) error {
	mc.mux.Lock()
	defer mc.mux.Unlock()

	queue := mc.outboxes[protocol.MethodName()]
	switch {
	case queue == nil:
		return fmt.Errorf("can't send message, protocol not connected: %s", protocol.MethodName())
	case len(queue) >= cap(queue):
		// This node is a slow responder, we'll have to drop this message because our backlog is full.
		return fmt.Errorf("peer's outbound message backlog has reached hard limit, message is dropped (peer=%s,backlog-size=%d)", mc.Peer(), cap(queue))
	case len(queue) >= outboxSoftLimit && !ignoreSoftLimit:
		return fmt.Errorf("peer's outbound message backlog has reached max desired capacity, message is dropped (peer=%s,backlog-size=%d)", mc.Peer(), outboxSoftLimit)
	}

	queue <- envelope
	return nil
}
// registerStream registers the given stream under the method name of the given protocol,
// creates the protocol's outbox and starts the goroutines that receive from and send to the stream.
// It returns false, without changing anything, when a stream is already registered for the protocol.
func (mc *conn) registerStream(protocol Protocol, stream Stream) bool {
	mc.mux.Lock()
	defer mc.mux.Unlock()

	name := protocol.MethodName()
	if existing := mc.streams[name]; existing != nil {
		return false
	}

	mc.outboxes[name] = make(chan interface{}, OutboxHardLimit)
	mc.streams[name] = stream
	mc.startReceiving(protocol, stream)
	mc.startSending(protocol, stream)

	// A connection can have multiple active streams, but if one of them is closed,
	// all of them should be closed, also closing the underlying connection.
	cancelWhenStreamDone := func() {
		<-stream.Context().Done()
		mc.cancelCtx()
	}
	go cancelWhenStreamDone()

	return true
}
// startReceiving starts a goroutine that reads messages from the given stream and passes them to the protocol.
// The goroutine stops when the connection's context is done, or when receiving fails; in the latter case
// the resulting status is stored on the connection and the connection's context is cancelled.
// Errors returned by the protocol while handling a message are logged and do not stop the loop.
func (mc *conn) startReceiving(protocol Protocol, stream Stream) {
	// Take a copy of the peer up front: disconnect resets the peer on the connection,
	// and the log entries below should still identify it.
	peerSnapshot := mc.Peer()
	atomic.AddInt32(&mc.activeGoroutines, 1)
	go func() {
		defer atomic.AddInt32(&mc.activeGoroutines, -1)
		for {
			envelope := protocol.CreateEnvelope()
			recvErr := stream.RecvMsg(envelope) // blocking
			if mc.ctx.Err() != nil {
				// connection has been closed: drop message and stop receiving
				return
			}

			if recvErr == nil {
				if handleErr := protocol.Handle(mc, envelope); handleErr != nil {
					log.Logger().
						WithError(handleErr).
						WithField(core.LogFieldProtocolVersion, protocol.Version()).
						WithFields(peerSnapshot.ToFields()).
						WithField(core.LogFieldMessageType, fmt.Sprintf("%T", protocol.UnwrapMessage(envelope))).
						Warn("Error handling message")
				}
				continue
			}

			// Receiving failed: log it, record the status and close the connection.
			recvStatus, isStatus := status.FromError(recvErr)
			closedByPeer := errors.Is(recvErr, io.EOF) || (isStatus && recvStatus.Code() == codes.Canceled)
			if !closedByPeer {
				log.Logger().
					WithError(recvErr).
					WithField(core.LogFieldProtocolVersion, protocol.Version()).
					WithFields(peerSnapshot.ToFields()).
					Warn("Peer connection error")
			} else {
				log.Logger().
					WithField(core.LogFieldProtocolVersion, protocol.Version()).
					WithFields(peerSnapshot.ToFields()).
					Info("Peer closed connection")
			}
			mc.status.Store(recvStatus)
			mc.cancelCtx()
			return
		}
	}()
}
// startSending starts a goroutine that takes messages from the protocol's outbox and sends them over the given stream.
// A message that fails to send is logged and dropped. The goroutine stops when the connection's context is done
// or when a nil message is read from the outbox. When it stops and the (unwrapped) stream is a grpc.ClientStream,
// CloseSend is called on it.
func (mc *conn) startSending(protocol Protocol, stream Stream) {
	queue := mc.outboxes[protocol.MethodName()]
	atomic.AddInt32(&mc.activeGoroutines, 1)
	go func() {
		defer atomic.AddInt32(&mc.activeGoroutines, -1)

		for sending := true; sending; {
			select {
			case <-mc.ctx.Done():
				sending = false
			case outgoing := <-queue:
				if outgoing == nil {
					// https://github.com/nuts-foundation/nuts-node/issues/1017
					// The message to send can also be nil when the connection is closed,
					// and the outbox channel case receives the nil value before the done channel case receives its value.
					// This sometimes triggered a panic during test teardown on slow systems.
					sending = false
					break
				}
				if sendErr := stream.SendMsg(outgoing); sendErr != nil {
					log.Logger().
						WithError(sendErr).
						WithField(core.LogFieldProtocolVersion, protocol.Version()).
						WithFields(mc.Peer().ToFields()).
						WithField(core.LogFieldMessageType, fmt.Sprintf("%T", outgoing)).
						Warn("Unable to send message, message is dropped")
				}
			}
		}

		// Connection closed, see if we need to close the gRPC stream
		target := stream
		if wrapper, wrapped := stream.(interface{ Unwrap() Stream }); wrapped {
			target = wrapper.Unwrap()
		}
		if client, isClient := target.(grpc.ClientStream); isClient {
			if closeErr := client.CloseSend(); closeErr != nil {
				log.Logger().
					WithError(closeErr).
					WithField(core.LogFieldProtocolVersion, protocol.Version()).
					Warn("Error while closing client for gRPC stream")
			}
		}
	}()
}
// IsConnected reports whether at least one stream is registered on this connection.
func (mc *conn) IsConnected() bool {
	mc.mux.RLock()
	streamCount := len(mc.streams)
	mc.mux.RUnlock()
	return streamCount != 0
}
// IsAuthenticated reports whether the peer currently stored on this connection is marked as authenticated.
func (mc *conn) IsAuthenticated() bool {
	current := mc.Peer()
	return current.Authenticated
}
// closeError returns the status that was stored when receiving from a stream failed,
// or nil when no status has been stored.
func (mc *conn) closeError() *status.Status {
return mc.status.Load()
}