network/transport/grpc/connection_manager.go
/*
* Copyright (C) 2023 Nuts community
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
package grpc
import (
"context"
"crypto/x509"
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"github.com/nuts-foundation/go-did/did"
"github.com/nuts-foundation/go-stoabs"
"github.com/nuts-foundation/nuts-node/core"
"github.com/nuts-foundation/nuts-node/network/log"
"github.com/nuts-foundation/nuts-node/network/transport"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
grpcPeer "google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)
const defaultMaxMessageSizeInBytes = 1024 * 512
const maxConcurrentCallsPerTick = 10
const peerIDHeader = "peerID"
const nodeDIDHeader = "nodeDID"
// ErrNodeDIDAuthFailed is the error message returned to the peer when the node DID it sent could not be authenticated.
// It is specified by RFC017.
var ErrNodeDIDAuthFailed = status.Error(codes.Unauthenticated, "nodeDID authentication failed")
// ErrUnexpectedNodeDID is the error used in outbound calling to signal that the peer sent a different NodeDID than expected.
// The DID has moved on, do not call it again until notified of its new address.
var ErrUnexpectedNodeDID = fmt.Errorf("call answered by other node DID than expected: %w", ErrNodeDIDAuthFailed)
// ErrAlreadyConnected indicates the node is already connected to the peer.
var ErrAlreadyConnected = errors.New("already connected")
// MaxMessageSizeInBytes defines the maximum size of an in- or outbound gRPC/Protobuf message
var MaxMessageSizeInBytes = defaultMaxMessageSizeInBytes
// defaultInterceptors aids testing
var defaultInterceptors []grpc.StreamServerInterceptor
var _ transport.ConnectionManager = (*grpcConnectionManager)(nil)
type fatalError struct {
error
}
func (s fatalError) Error() string {
return s.error.Error()
}
func (s fatalError) Unwrap() error {
return s.error
}
type dialer func(ctx context.Context, target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error)
// NewGRPCConnectionManager creates a new ConnectionManager that accepts/creates connections which communicate using the given protocols.
func NewGRPCConnectionManager(config Config, connectionStore stoabs.KVStore, nodeDID did.DID, authenticator Authenticator, protocols ...transport.Protocol) (*grpcConnectionManager, error) {
var grpcProtocols []Protocol
for _, curr := range protocols {
// For now, only gRPC protocols are supported
protocol, ok := curr.(Protocol)
if ok {
grpcProtocols = append(grpcProtocols, protocol)
}
}
// client tls
tlsDialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) // No TLS, requires 'insecure' flag
if config.tlsEnabled() {
clientTlsConfig, err := newClientTLSConfig(config)
if err != nil {
return nil, err
}
tlsDialOption = grpc.WithTransportCredentials(credentials.NewTLS(clientTlsConfig)) // TLS authentication
}
cm := &grpcConnectionManager{
protocols: grpcProtocols,
nodeDID: nodeDID,
authenticator: authenticator,
config: config,
connectionTimeout: config.connectionTimeout,
connections: &connectionList{},
dialer: config.dialer,
dialOptions: []grpc.DialOption{
grpc.WithBlock(), // Dial should block until connection succeeded (or time-out expired)
grpc.WithReturnConnectionError(), // This option causes underlying errors to be returned when connections fail, rather than just "context deadline exceeded"
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(MaxMessageSizeInBytes),
grpc.MaxCallSendMsgSize(MaxMessageSizeInBytes),
),
grpc.WithUserAgent(core.UserAgent()),
tlsDialOption,
},
}
cm.addressBook = newAddressBook(connectionStore, config.backoffCreator)
cm.registerPrometheusMetrics()
cm.ctx, cm.ctxCancel = context.WithCancel(context.Background())
cm.lastCertificateValidation.Store(&time.Time{})
if config.tlsEnabled() {
config.pkiValidator.SubscribeDenied(cm.revalidatePeers)
}
return cm, nil
}
// grpcConnectionManager is a ConnectionManager that does not discover peers on its own, but just connects to the peers for which Connect() is called.
type grpcConnectionManager struct {
protocols []Protocol
config Config
grpcServer *grpc.Server
ctx context.Context
ctxCancel func()
listener net.Listener
authenticator Authenticator
nodeDID did.DID
observers []transport.StreamStateObserverFunc
peersCounter prometheus.Gauge
recvMessagesCounter *prometheus.CounterVec
sentMessagesCounter *prometheus.CounterVec
addressBook *addressBook
dialer
connectLoopWG sync.WaitGroup
dialOptions []grpc.DialOption
connectionTimeout time.Duration
connections *connectionList
lastCertificateValidation atomic.Pointer[time.Time]
}
// newGrpcServer configures a new grpc.Server
func newGrpcServer(config Config) (*grpc.Server, error) {
serverOpts := []grpc.ServerOption{
grpc.MaxRecvMsgSize(MaxMessageSizeInBytes),
grpc.MaxSendMsgSize(MaxMessageSizeInBytes),
}
var serverInterceptors []grpc.StreamServerInterceptor
serverInterceptors = append(serverInterceptors, defaultInterceptors...)
// Configure TLS if enabled
if config.tlsEnabled() {
// Some form of TLS is enabled
if config.serverCert != nil {
// TLS is terminated at the Nuts node (no offloading)
tlsServer, err := newServerTLSConfig(config)
if err != nil {
return nil, err
}
serverOpts = append(serverOpts, grpc.Creds(credentials.NewTLS(tlsServer)))
} else {
// TLS offloading for incoming traffic. config.clientCertHeaderName is validated during config creation.
serverInterceptors = append(serverInterceptors, newAuthenticationInterceptor(config.clientCertHeaderName, config.pkiValidator))
}
} else {
log.Logger().Info("TLS is disabled, this is very unsecure and only suitable for demo/development environments.")
}
// Chain interceptors. ipInterceptor is added last, so it processes the stream first.
serverInterceptors = append(serverInterceptors, ipInterceptor)
serverOpts = append(serverOpts, grpc.ChainStreamInterceptor(serverInterceptors...))
// Create gRPC server for inbound connectionList and associate it with the protocols
return grpc.NewServer(serverOpts...), nil
}
func (s *grpcConnectionManager) Start() error {
// Start outbound
s.connectLoopWG.Add(1)
go func() {
defer s.connectLoopWG.Done()
s.connectLoop()
}()
// Start inbound
if s.config.listenAddress == "" {
log.Logger().Info("Not starting gRPC server, connections will only be outbound.")
return nil
}
log.Logger().Debugf("Starting gRPC server on %s", s.config.listenAddress)
var err error
s.listener, err = s.config.listener(s.config.listenAddress)
if err != nil {
return err
}
// Create gRPC server for inbound connectionList and associate it with the protocols
s.grpcServer, err = newGrpcServer(s.config)
if err != nil {
return err
}
for _, protocol := range s.protocols {
protocol.Register(s, func(stream grpc.ServerStream) error {
return s.handleInboundStream(protocol, stream)
}, s.connections, s)
}
// Start serving from the gRPC server
go func(server *grpc.Server, listener net.Listener) {
err := server.Serve(listener)
if err != nil && !errors.Is(err, grpc.ErrServerStopped) {
log.Logger().
WithError(err).
Error("gRPC server errored")
s.Stop()
}
}(s.grpcServer, s.listener)
log.Logger().Infof("gRPC server started on %s", s.config.listenAddress)
return nil
}
func (s *grpcConnectionManager) Stop() {
log.Logger().Debug("Stopping gRPC connection manager")
s.ctxCancel() // stops connectLoop
log.Logger().Trace("Waiting for connectLoop to close")
s.connectLoopWG.Wait()
s.connections.forEach(func(connection Connection) {
connection.disconnect()
})
if s.grpcServer != nil { // is nil when not accepting inbound connections
s.grpcServer.GracefulStop() // also closes listener
}
prometheus.Unregister(s.peersCounter)
prometheus.Unregister(s.sentMessagesCounter)
prometheus.Unregister(s.recvMessagesCounter)
}
func (s *grpcConnectionManager) connectLoop() {
log.Logger().Debug("Start connecting")
ticker := time.NewTicker(time.Second)
connectWG := new(sync.WaitGroup)
defer ticker.Stop()
outerLoop:
for {
select {
case <-s.ctx.Done():
break outerLoop
case <-ticker.C:
// Try to connect to a subset of contacts that meet the criteria (not connected and an expired backoff)
// The limited subset prevents calling all contacts at the exact same time, it is not a limit on the number of allowed outbound calls at a time.
// This is mostly an issue during startup, and for new nodes this prevents the node from performing a DoS attack on its backoff store.
for _, c := range s.addressBook.limit(maxConcurrentCallsPerTick, isNotActivePredicate(s), backoffExpiredPredicate(), notDialingPredicate()) {
// the notDialingPredicate above guarantees that calling is currently false. We can take the calling lock
c.calling.Store(true)
connectWG.Add(1)
go func(cp *contact) {
defer func() {
cp.calling.Store(false) // reset call lock at the end of calling
connectWG.Done()
}()
s.connect(cp) // blocking while connected
}(c)
}
}
}
connectWG.Wait()
}
func (s *grpcConnectionManager) connect(contact *contact) {
connection, isNew := s.connections.getOrRegister(s.ctx, contact.peer, true)
if !isNew {
// can only occur when receiving an inbound connection at the same time.
log.Logger().WithFields(contact.peer.ToFields()).
Debug("stop calling, already has a connection")
return
}
defer func() {
// connection does not exist outside the dialer
connection.disconnect()
s.connections.remove(connection)
}()
// Open a grpc.ClientConn
log.Logger().WithFields(contact.peer.ToFields()).Debug("connecting to peer")
contact.attempts.Add(1)
now := time.Now()
contact.lastAttempt.Store(&now)
dialContext, cancel := context.WithTimeout(s.ctx, s.connectionTimeout)
defer cancel()
grpcClient, err := s.dialer(dialContext, contact.peer.Address, s.dialOptions...)
if err != nil { // failed to connect
log.Logger().WithError(err).WithFields(contact.peer.ToFields()).Debug("failed to open a grpc ClientConn")
errStatus, isStatusError := status.FromError(err)
if isStatusError && errStatus.Code() == codes.Canceled {
// Do not backoff when context is cancelled
// Backoff might try to persist after stores are closed
// https://github.com/nuts-foundation/nuts-node/issues/1864
return
}
sErr := err.Error()
contact.error.Store(&sErr)
contact.backoff.Backoff() // backoff store
return
}
defer grpcClient.Close()
log.Logger().WithFields(contact.peer.ToFields()).Debug("connected to peer (outbound)")
// Connect protocol streams
err = s.openOutboundStreams(connection, grpcClient) // blocking call, connect needs to be async
if err != nil {
// connection failed, increase backoff
// TODO: check if this works as intended for multiple streams/protocols on the same connection
contact.backoff.Backoff()
if errors.Is(err, ErrUnexpectedNodeDID) {
// backoff expires after a day. DID is probably abandoned/replaced, but try again later in case the node was misconfigured.
contact.backoff.Reset(time.Hour * 24)
}
log.Logger().WithError(err).WithFields(connection.Peer().ToFields()).
Debug("Error while setting up outbound gRPC streams, disconnecting")
} else {
// Connection was OK, but now disconnected. Add a random wait to prevent simultaneous reconnecting.
contact.backoff.Reset(RandomBackoff(time.Second, 5*time.Second))
}
}
func (s *grpcConnectionManager) hasActiveConnection(peer transport.Peer) bool {
if peer.NodeDID.Empty() { // bootstrap matches on address + empty node DID
return s.connections.Get(ByAddress(peer.Address), ByNodeDID(did.DID{})) != nil
}
// Only authenticated connections
return s.connections.Get(ByNodeDID(peer.NodeDID), ByAuthenticated()) != nil
}
func (s *grpcConnectionManager) Connect(peerAddress string, peerDID did.DID, delay *time.Duration) {
// peer has deactivated its DID or removed it's NutsComm address. Delete peer from address book, if it exists.
if peerAddress == "" {
s.addressBook.remove(peerDID)
return
}
// add/update contact
peer := transport.Peer{Address: peerAddress, NodeDID: peerDID}
if cont, updated := s.addressBook.update(peer); updated && delay != nil {
// reset existing backoff after an update to try to connect to the peer's new address
cont.backoff.Reset(*delay)
}
}
func (s *grpcConnectionManager) RegisterObserver(observer transport.StreamStateObserverFunc) {
s.observers = append(s.observers, observer)
}
func (s *grpcConnectionManager) notifyObservers(peer transport.Peer, protocol transport.Protocol, state transport.StreamState) {
log.Logger().
WithFields(peer.ToFields()).
WithField(core.LogFieldProtocolVersion, protocol.Version()).
Debugf("Stream state changed to %s", state)
for _, observer := range s.observers {
observer(peer, state, protocol)
}
}
func (s *grpcConnectionManager) Peers() []transport.Peer {
var peers []transport.Peer
for _, curr := range s.connections.AllMatching(ByConnected()) {
peers = append(peers, curr.Peer())
}
return peers
}
func (s *grpcConnectionManager) Contacts() []transport.Contact {
return s.addressBook.stats()
}
func (s *grpcConnectionManager) Diagnostics() []core.DiagnosticResult {
return append(
[]core.DiagnosticResult{
lastCertificateValidationStatistic{*s.lastCertificateValidation.Load()},
ownPeerIDStatistic{s.config.peerID},
},
s.connections.Diagnostics()...)
}
// RegisterService implements grpc.ServiceRegistrar to register the gRPC services protocols expose.
func (s *grpcConnectionManager) RegisterService(desc *grpc.ServiceDesc, impl interface{}) {
s.grpcServer.RegisterService(desc, impl)
}
// openOutboundStreams instructs the protocols that support gRPC streaming to open their streams.
// The resulting grpc.ClientStream(s) must be registered on the Connection.
// If an error is returned the connection should be closed.
func (s *grpcConnectionManager) openOutboundStreams(connection Connection, grpcConn *grpc.ClientConn) error {
md, err := s.constructMetadata(connection.Peer().NodeDID.Empty())
if err != nil {
return err
}
protocolNum := 0
// Call gRPC-enabled protocols, block until they close
for _, protocol := range s.protocols {
clientStream, err := s.openOutboundStream(connection, protocol, grpcConn, md)
if err != nil {
log.Logger().
WithError(err).
WithField(core.LogFieldPeerAddr, grpcConn.Target()).
WithField(core.LogFieldPeerNodeDID, connection.Peer().NodeDID).
WithField(core.LogFieldProtocolVersion, protocol.Version()).
Info("Failed to open gRPC stream")
if errors.As(err, new(fatalError)) {
// Error indicates connection should be closed.
return err
}
// Non-fatal error: other protocols may continue
continue
}
peer := connection.Peer() // work with a copy of peer to avoid race condition due to disconnect() resetting it
log.Logger().
WithField(core.LogFieldProtocolVersion, protocol.Version()).
WithFields(peer.ToFields()).
Debug("Opened gRPC stream")
s.notifyObservers(peer, protocol, transport.StateConnected)
go func() {
// Waits for the clientStream to be done (other side closed the stream), then we disconnect the connection on our side
<-clientStream.Context().Done()
s.notifyObservers(peer, protocol, transport.StateDisconnected)
connection.disconnect()
}()
protocolNum++
}
if protocolNum == 0 {
return fmt.Errorf("could not use any of the supported protocols to communicate with peer (id=%s)", connection.Peer())
}
s.peersCounter.Inc()
defer s.peersCounter.Dec()
// Function must block until streams are closed or disconnect() is called.
connection.waitUntilDisconnected()
if st := connection.closeError(); st != nil && st.Code() == codes.Unauthenticated {
// return error so entire connection will be tried anew. Otherwise, backoff isn't honored
return st.Err()
}
return nil
}
func (s *grpcConnectionManager) openOutboundStream(connection Connection, protocol Protocol, grpcConn grpc.ClientConnInterface, md metadata.MD) (grpc.ClientStream, error) {
outgoingContext := metadata.NewOutgoingContext(s.ctx, md)
clientStream, err := protocol.CreateClientStream(outgoingContext, grpcConn)
if err != nil {
return nil, fatalError{error: err}
}
// Read peer ID from metadata
peerHeaders, err := clientStream.Header()
if err != nil {
return nil, fatalError{error: fmt.Errorf("failed to read gRPC headers: %w", err)}
}
if len(peerHeaders) == 0 { // non-fatal error
return nil, fmt.Errorf("peer didn't send any headers, maybe the protocol version is not supported")
}
peerID, nodeDID, err := readMetadata(peerHeaders)
if err != nil {
return nil, fatalError{error: fmt.Errorf("failed to read peer ID header: %w", err)}
}
// Update connection information
if !connection.verifyOrSetPeerID(peerID) {
return nil, fatalError{error: fmt.Errorf("peer sent invalid ID (id=%s)", peerID)}
}
peerFromCtx, _ := grpcPeer.FromContext(clientStream.Context())
peer := connection.Peer()
// Add certificate so it is available during authentication
peer.Certificate = extractCertificate(peerFromCtx)
// Authenticate expected DID
if !peer.NodeDID.Empty() { // do not authenticate bootstrap connections
if nodeDID.Empty() {
// Peer might be in maintenance mode, try again later
return nil, fatalError{ErrNodeDIDAuthFailed}
}
if !peer.NodeDID.Equals(nodeDID) {
// DID no longer lives at this address, don't call this DID again!
return nil, fatalError{ErrUnexpectedNodeDID}
}
peer, err = s.authenticate(nodeDID, peer)
if err != nil {
return nil, fatalError{err}
}
}
connection.setPeer(peer)
wrappedStream := s.wrapStream(clientStream, protocol)
if !connection.registerStream(protocol, wrappedStream) {
// This can happen when the peer connected to us previously, and now we connect back to them.
log.Logger().
WithFields(connection.Peer().ToFields()).
Warn("We connected to a peer that we're already connected to")
return nil, fatalError{error: ErrAlreadyConnected}
}
return clientStream, nil
}
func (s *grpcConnectionManager) authenticate(nodeDID did.DID, peer transport.Peer) (transport.Peer, error) {
if !nodeDID.Empty() {
var err error
peer, err = s.authenticator.Authenticate(nodeDID, peer)
if err != nil {
log.Logger().
WithError(err).
WithFields(peer.ToFields()).
WithField(core.LogFieldDID, nodeDID).
Debug("Peer node DID could not be authenticated")
// Error message is spec'd by RFC017, because it is returned to the peer
return transport.Peer{}, ErrNodeDIDAuthFailed
}
}
return peer, nil
}
func extractCertificate(peerFromCtx *grpcPeer.Peer) *x509.Certificate {
tlsInfo, isTLS := peerFromCtx.AuthInfo.(credentials.TLSInfo)
if !isTLS || len(tlsInfo.State.PeerCertificates) == 0 {
return nil
}
return tlsInfo.State.PeerCertificates[0]
}
// revalidatePeers verifies for all peers the x509.Certificate provided during TLS handshake is still valid.
func (s *grpcConnectionManager) revalidatePeers() {
var err error
now := nowFunc()
s.lastCertificateValidation.Store(&now)
s.connections.forEach(func(conn Connection) {
peerCert := conn.Peer().Certificate
if peerCert == nil {
// This can happen when the denylist is updated while the node is trying to set up an outbound connection.
// See https://github.com/nuts-foundation/nuts-node/issues/2333
return
}
if nowFunc().After(peerCert.NotAfter) {
log.Logger().WithError(errors.New("certificate expired while in use")).WithFields(conn.Peer().ToFields()).Info("Disconnected peer")
conn.disconnect()
return
}
err = s.config.pkiValidator.Validate([]*x509.Certificate{peerCert})
if err != nil {
log.Logger().WithError(err).WithFields(conn.Peer().ToFields()).Warn("Disconnected peer")
conn.disconnect()
}
})
}
func (s *grpcConnectionManager) handleInboundStream(protocol Protocol, inboundStream grpc.ServerStream) error {
peerFromCtx, _ := grpcPeer.FromContext(inboundStream.Context())
log.Logger().
WithField(core.LogFieldPeerAddr, peerFromCtx.Addr.String()).
Trace("New peer connected")
// Send our headers
md, err := s.constructMetadata(false)
if err != nil {
return err
}
if err := inboundStream.SendHeader(md); err != nil {
log.Logger().
WithError(err).
WithField(core.LogFieldPeerAddr, peerFromCtx.Addr.String()).
Error("Unable to accept gRPC stream, unable to send headers")
return errors.New("unable to send headers")
}
// Build peer info and check it
md, ok := metadata.FromIncomingContext(inboundStream.Context())
if !ok {
return errors.New("unable to read metadata")
}
peerID, nodeDID, err := readMetadata(md)
if err != nil {
log.Logger().Debugf("Peer sent invalid peer ID, headers: %v", md)
return errors.New("unable to read peer ID")
}
peer := transport.Peer{
ID: peerID,
Address: peerFromCtx.Addr.String(), // this is including port number, so a unique value for inbound
Certificate: extractCertificate(peerFromCtx),
}
log.Logger().
WithFields(peer.ToFields()).
WithField(core.LogFieldProtocolVersion, protocol.Version()).
Debug("New inbound stream from peer")
peer, err = s.authenticate(nodeDID, peer)
if err != nil {
return err
}
// TODO: Need to authenticate PeerID, to make sure a second stream with a known PeerID is from the same node (maybe even connection).
// Use address from peer context?
connection, created := s.connections.getOrRegister(s.ctx, peer, false)
if created {
// If created is false, it's a second (or third...) protocol on the same connection
s.peersCounter.Inc()
defer s.peersCounter.Dec()
}
wrappedStream := s.wrapStream(inboundStream, protocol)
if !connection.registerStream(protocol, wrappedStream) {
return ErrAlreadyConnected
}
s.notifyObservers(peer, protocol, transport.StateConnected)
connection.waitUntilDisconnected()
s.notifyObservers(peer, protocol, transport.StateDisconnected)
s.connections.remove(connection)
return nil
}
func (s *grpcConnectionManager) constructMetadata(bootstrap bool) (metadata.MD, error) {
md := metadata.New(map[string]string{})
if bootstrap {
// Older nodes (< v5.1) only match on peerID.
// The postfix allows them to have a bootstrap connection and authenticated connection at the same time.
md.Set(peerIDHeader, string(s.config.peerID)+"-bootstrap")
return md, nil
}
md.Set(peerIDHeader, string(s.config.peerID))
if !s.nodeDID.Empty() {
md.Set(nodeDIDHeader, s.nodeDID.String())
}
return md, nil
}
func (s *grpcConnectionManager) wrapStream(stream Stream, protocol Protocol) prometheusStreamWrapper {
return prometheusStreamWrapper{
stream: stream,
protocol: protocol,
recvMessagesCounter: s.recvMessagesCounter,
sentMessagesCounter: s.sentMessagesCounter,
}
}
func (s *grpcConnectionManager) registerPrometheusMetrics() {
s.peersCounter = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "nuts",
Subsystem: "network",
Name: "peers",
Help: "Number of connected gRPC peers.",
})
_ = prometheus.Register(s.peersCounter)
s.sentMessagesCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "nuts",
Subsystem: "network_grpc",
Name: "messages_sent",
Help: "Number of gRPC messages sent per protocol and message type.",
}, []string{"protocol", "message_type"})
_ = prometheus.Register(s.sentMessagesCounter)
s.recvMessagesCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "nuts",
Subsystem: "network_grpc",
Name: "messages_received",
Help: "Number of gRPC messages received per protocol and message type.",
}, []string{"protocol", "message_type"})
_ = prometheus.Register(s.recvMessagesCounter)
}