network/network.go
/*
* Copyright (C) 2021 Nuts community
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
package network
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"net"
"strings"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/nuts-foundation/go-did/did"
"github.com/nuts-foundation/nuts-node/core"
"github.com/nuts-foundation/nuts-node/crypto"
"github.com/nuts-foundation/nuts-node/crypto/hash"
"github.com/nuts-foundation/nuts-node/events"
"github.com/nuts-foundation/nuts-node/network/dag"
"github.com/nuts-foundation/nuts-node/network/log"
"github.com/nuts-foundation/nuts-node/network/transport"
"github.com/nuts-foundation/nuts-node/network/transport/grpc"
"github.com/nuts-foundation/nuts-node/network/transport/v2"
"github.com/nuts-foundation/nuts-node/pki"
"github.com/nuts-foundation/nuts-node/storage"
"github.com/nuts-foundation/nuts-node/vdr/didnuts/didstore"
"github.com/nuts-foundation/nuts-node/vdr/resolver"
"go.etcd.io/bbolt"
)
var _ Transactions = (*Network)(nil)
var _ core.HealthCheckable = (*Network)(nil)
const (
// ModuleName specifies the name of this module.
ModuleName = "Network"
// softwareID contains the name of the vendor/implementation that's published in the node's diagnostic information.
softwareID = "https://github.com/nuts-foundation/nuts-node"
errEventFailedMsg = "failed to emit event for published transaction: %w"
// health check keys
healthTLS = "tls"
healthAuthConfig = "auth_config"
// newNodeConnectionDelay specifies how long a new node should delay connecting to newly discovered NutsComm addresses.
// If the node connects to every new DID immediately it will try to sync the remainder of the DAG with more and more peers at the same time.
// This generates a lot of network traffic with duplicate information and just slows down the actual synchronization.
newNodeConnectionDelay = 5 * time.Minute
)
// defaultBBoltOptions are given to bbolt, allows for package local adjustments during test
var defaultBBoltOptions = bbolt.DefaultOptions
// Network implements Transactions interface and Engine functions.
type Network struct {
config Config
certificate tls.Certificate
trustStore *core.TrustStore
strictMode bool
protocols []transport.Protocol
connectionManager transport.ConnectionManager
state dag.State
keyStore crypto.KeyStore
keyResolver resolver.KeyResolver
startTime atomic.Pointer[time.Time]
peerID transport.PeerID
nodeDID did.DID
didStore didstore.Store
didDocumentFinder resolver.DocFinder
serviceResolver resolver.ServiceResolver
eventPublisher events.Event
storeProvider storage.Provider
pkiValidator pki.Validator
// assumeNewNode indicates the node hasn't initially sync'd with the network.
assumeNewNode bool
selfTestDialer tls.Dialer
}
// CheckHealth performs health checks for the network engine.
func (n *Network) CheckHealth() map[string]core.Health {
results := make(map[string]core.Health)
if n.certificate.Leaf != nil {
results[healthTLS] = n.checkNodeTLSHealth()
}
// healthAuthConfig checks that the node is correctly configured to be authenticated by others
results[healthAuthConfig] = n.checkNodeDIDHealth(context.TODO(), n.nodeDID)
return results
}
func (n *Network) checkNodeTLSHealth() core.Health {
// TLS enabled, verify the configured certificate
_, err := n.certificate.Leaf.Verify(x509.VerifyOptions{
Roots: core.NewCertPool(n.trustStore.RootCAs),
Intermediates: core.NewCertPool(n.trustStore.IntermediateCAs),
})
if err != nil {
return core.Health{
Status: core.HealthStatusDown,
Details: err.Error(),
}
}
// check if the configured certificate is revoked / denied.
err = n.pkiValidator.Validate([]*x509.Certificate{n.certificate.Leaf})
if err != nil {
return core.Health{
Status: core.HealthStatusDown,
Details: err.Error(),
}
}
return core.Health{
Status: core.HealthStatusUp,
}
}
func (n *Network) Migrate() error {
return n.state.Migrate()
}
// NewNetworkInstance creates a new Network engine instance.
func NewNetworkInstance(
config Config,
store didstore.Store,
keyStore crypto.KeyStore,
eventPublisher events.Event,
storeProvider storage.Provider,
pkiValidator pki.Validator,
) *Network {
return &Network{
config: config,
keyResolver: resolver.DIDKeyResolver{Resolver: store},
keyStore: keyStore,
didStore: store,
didDocumentFinder: didstore.Finder{Store: store},
serviceResolver: resolver.DIDServiceResolver{Resolver: store},
eventPublisher: eventPublisher,
storeProvider: storeProvider,
pkiValidator: pkiValidator,
selfTestDialer: tls.Dialer{
NetDialer: &net.Dialer{
Timeout: time.Second,
},
Config: &tls.Config{InsecureSkipVerify: true}, // set during Configure. Connection is not used for any data exchange.
},
}
}
// Configure configures the Network subsystem
func (n *Network) Configure(config core.ServerConfig) error {
var err error
dagStore, err := n.storeProvider.GetKVStore("data", storage.PersistentStorageClass)
if err != nil {
return fmt.Errorf("unable to create database: %w", err)
}
nutsKeyResolver := dag.SourceTXKeyResolver{Resolver: n.didStore}
if n.state, err = dag.NewState(dagStore, dag.NewPrevTransactionsVerifier(), dag.NewTransactionSignatureVerifier(nutsKeyResolver)); err != nil {
return fmt.Errorf("failed to configure state: %w", err)
}
// load state
err = n.state.Configure(core.ServerConfig{})
if err != nil {
return err
}
n.strictMode = config.Strictmode
n.peerID = transport.PeerID(uuid.New().String())
// TLS
tlsEnabled := config.TLS.Enabled()
if tlsEnabled {
n.certificate, err = config.TLS.LoadCertificate()
if err != nil {
return err
}
n.trustStore, err = config.TLS.LoadTrustStore()
if err != nil {
return err
}
// set truststore so selfTestNutsCommAddress can verify the endpoint contains a valid certificate
n.selfTestDialer.Config, err = grpc.NewClientTLSConfig(&n.certificate, n.trustStore.CertPool, n.pkiValidator)
if err != nil {
return err
}
}
// Resolve node DID
if n.config.NodeDID != "" {
nodeDID, err := did.ParseDID(n.config.NodeDID)
if err != nil {
return fmt.Errorf("configured NodeDID is invalid: %w", err)
}
n.nodeDID = *nodeDID
} else {
log.Logger().Warn("Node DID not set, sending/receiving private transactions is disabled.")
}
// Configure protocols
// todo: correct config passing? (no defaults are not used in test context)
v2Cfg := n.config.ProtocolV2
v2Cfg.Datadir = config.Datadir
// Register enabled protocols
var candidateProtocols []transport.Protocol
if n.protocols == nil {
candidateProtocols = []transport.Protocol{
v2.New(v2Cfg, n.nodeDID, n.state, n.didStore, n.keyStore, n.collectDiagnosticsForPeers, dagStore),
}
} else {
// Only set protocols if not already set: improves testability
candidateProtocols = n.protocols
n.protocols = nil
}
for _, protocol := range candidateProtocols {
if n.config.IsProtocolEnabled(protocol.Version()) {
n.protocols = append(n.protocols, protocol)
}
}
for _, prot := range n.protocols {
err := prot.Configure(n.peerID)
if err != nil {
return fmt.Errorf("error while configuring protocol %T: %w", prot, err)
}
}
// Setup connection manager, load with bootstrap nodes
if n.connectionManager == nil {
grpcOpts := []grpc.ConfigOption{
grpc.WithConnectionTimeout(time.Duration(n.config.ConnectionTimeout) * time.Millisecond),
grpc.WithBackoff(func() grpc.Backoff {
return grpc.BoundedBackoff(time.Second, n.config.MaxBackoff)
}),
}
otherNodes, err := n.findVendorDIDs()
if err != nil {
return err
}
n.assumeNewNode = len(otherNodes) == 0
// Configure TLS
var authenticator grpc.Authenticator
if tlsEnabled {
grpcOpts = append(grpcOpts, grpc.WithTLS(n.certificate, n.trustStore, n.pkiValidator))
if config.TLS.Offload == core.OffloadIncomingTLS {
grpcOpts = append(grpcOpts, grpc.WithTLSOffloading(config.TLS.ClientCertHeaderName))
}
authenticator = grpc.NewTLSAuthenticator(n.serviceResolver)
} else {
// Not allowed in strict mode for security reasons: only intended for demo/workshop purposes.
if config.Strictmode {
if len(n.config.BootstrapNodes) == 0 && n.assumeNewNode {
log.Logger().Info("It appears the gRPC network will not be used (no bootstrap nodes and an empty network state), so disabled TLS is accepted even with strict mode enabled.")
} else {
return errors.New("disabling TLS in strict mode is not allowed")
}
}
authenticator = grpc.NewDummyAuthenticator(nil)
}
// Instantiate
connectionStore, err := n.storeProvider.GetKVStore("connections", storage.VolatileStorageClass)
if err != nil {
return fmt.Errorf("failed to open connections store: %w", err)
}
connectionManCfg, err := grpc.NewConfig(n.config.GrpcAddr, n.peerID, grpcOpts...)
if err != nil {
return err
}
n.connectionManager, err = grpc.NewGRPCConnectionManager(
connectionManCfg,
connectionStore,
n.nodeDID,
authenticator,
n.protocols...,
)
if err != nil {
return err
}
}
// register callback from DAG to other engines, with payload only.
if _, err = n.state.Notifier("nats", n.emitEvents,
dag.WithPersistency(dagStore),
dag.WithSelectionFilter(func(event dag.Event) bool {
return event.Type == dag.PayloadEventType
})); err != nil {
return err
}
return nil
}
func (n *Network) DiscoverServices(updatedDID did.DID) {
if !n.config.EnableDiscovery {
return
}
document, _, err := n.didStore.Resolve(updatedDID, nil)
if err != nil {
// VDR store is down. Any missed updates are resolved on node restart.
// This can happen when the VDR is receiving lots of DID updates, such as during the initial sync of the network.
log.Logger().WithError(err).
WithField(core.LogFieldDID, updatedDID.String()).
Debug("Service discovery could not read DID document after an update")
return
}
n.connectToDID(n.nodeDID, *document, true)
}
// emitEvents is called when a payload is added.
func (n *Network) emitEvents(event dag.Event) (bool, error) {
_, js, err := n.eventPublisher.Pool().Acquire(context.Background())
if err != nil {
return false, fmt.Errorf(errEventFailedMsg, err)
}
twp := events.TransactionWithPayload{
Transaction: event.Transaction,
Payload: event.Payload,
}
twpData, err := json.Marshal(twp)
if err != nil {
return false, fmt.Errorf(errEventFailedMsg, err)
}
if _, err = js.PublishAsync(events.TransactionsSubject, twpData); err != nil {
return false, fmt.Errorf(errEventFailedMsg, err)
}
return true, nil
}
// Name returns the module name.
func (n *Network) Name() string {
return ModuleName
}
// Config returns a pointer to the actual config of the module.
func (n *Network) Config() interface{} {
return &n.config
}
// Start initiates the Network subsystem
func (n *Network) Start() error {
startTime := time.Now()
n.startTime.Store(&startTime)
if err := n.state.Start(); err != nil {
return err
}
// Sanity check for configured node DID: can we resolve it and do we have the keys?
if !n.nodeDID.Empty() {
err := n.validateNodeDIDKeys(context.TODO(), n.nodeDID)
if err != nil && n.strictMode {
return err
}
}
for _, prot := range n.protocols {
if err := prot.Start(); err != nil {
return err
}
}
// Start connection management, protocols and connections
err := n.connectionManager.Start()
if err != nil {
return err
}
if err = n.connectToKnownNodes(n.nodeDID); err != nil {
return err
}
// Resume all notifiers. Notifiers may access other components of the network stack.
// To prevent nil derefs run the notifiers last. https://github.com/nuts-foundation/nuts-node/issues/3155
for _, notifier := range n.state.Notifiers() {
if err = notifier.Run(); err != nil {
return fmt.Errorf("failed to start notifiers: %w", err)
}
}
return nil
}
func (n *Network) connectToKnownNodes(nodeDID did.DID) error {
// Start connecting to bootstrap nodes
for _, bootstrapNode := range n.config.BootstrapNodes {
if len(strings.TrimSpace(bootstrapNode)) == 0 {
continue
}
n.connectionManager.Connect(bootstrapNode, did.DID{}, nil)
}
if !n.config.EnableDiscovery {
return nil
}
// start connecting to published NutsComm addresses
otherNodes, err := n.findVendorDIDs()
if err != nil {
return err
}
if n.assumeNewNode {
log.Logger().Infof("Assuming this is a new node, discovered NutsComm addresses are processed with a %s delay", newNodeConnectionDelay)
}
for _, node := range otherNodes {
n.connectToDID(nodeDID, node, false)
}
return nil
}
func (n *Network) connectToDID(nodeDID did.DID, node did.Document, resetDelay bool) {
if !nodeDID.Empty() && node.ID.Equals(nodeDID) {
// Found local node, do not discover.
return
}
inner:
for _, service := range node.Service {
if service.Type == transport.NutsCommServiceType {
var nutsCommUrl transport.NutsCommURL
if err := service.UnmarshalServiceEndpoint(&nutsCommUrl); err != nil {
log.Logger().
WithError(err).
WithField(core.LogFieldDID, node.ID.String()).
Debug("Failed to extract NutsComm address from service")
continue inner
}
log.Logger().
WithField(core.LogFieldDID, node.ID.String()).
WithField(core.LogFieldNodeAddress, nutsCommUrl.Host).
Debug("Discovered Nuts node")
var delay *time.Duration
if resetDelay {
if n.assumeNewNode && time.Since(*n.startTime.Load()) < newNodeConnectionDelay {
// Connect to NutsComm addresses with a delay.
tmp := newNodeConnectionDelay
delay = &tmp
} else {
// Connect without any delay
tmp := time.Duration(0)
delay = &tmp
}
} // else: leave any existing delay unchanged
n.connectionManager.Connect(nutsCommUrl.Host, node.ID, delay)
}
}
}
func (n *Network) validateNodeDIDKeys(ctx context.Context, nodeDID did.DID) error {
// Check if DID document can be resolved
document, _, err := n.didStore.Resolve(nodeDID, nil)
if err != nil {
return fmt.Errorf("DID document can't be resolved (did=%s): %w", nodeDID, err)
}
// Check if the key agreement keys can be resolved
if len(document.KeyAgreement) == 0 {
return fmt.Errorf("DID document does not contain a keyAgreement key, register a keyAgreement key (did=%s)", nodeDID)
}
for _, keyAgreement := range document.KeyAgreement {
exists, err := n.keyStore.Exists(ctx, keyAgreement.ID.String())
if err != nil {
return fmt.Errorf("error checking keyAgreement key existence (did=%s,kid=%s): %w", nodeDID, keyAgreement.ID, err)
}
if !exists {
return fmt.Errorf("keyAgreement private key is not present in key store, recover your key material or register a new keyAgreement key (did=%s,kid=%s)", nodeDID, keyAgreement.ID)
}
}
return nil
}
func (n *Network) checkNodeDIDHealth(ctx context.Context, nodeDID did.DID) core.Health {
if nodeDID.Empty() {
return core.Health{
Status: core.HealthStatusUp,
Details: "no node DID",
}
}
if err := n.validateNodeDIDKeys(ctx, nodeDID); err != nil {
return core.Health{
Status: core.HealthStatusDown,
Details: fmt.Sprintf("cannot verify DID ownership: %s", err.Error()),
}
}
// Check if the DID document has a resolvable and valid NutsComm endpoint
serviceRef := resolver.MakeServiceReference(nodeDID, transport.NutsCommServiceType)
nutsCommService, err := n.serviceResolver.Resolve(serviceRef, resolver.DefaultMaxServiceReferenceDepth)
if err != nil {
// Non-existing NutsComm results in HealthStatusUnknown to make it easier to fix the issue (HealthStatusDown kills the node in certain environments)
return core.Health{
Status: core.HealthStatusUnknown,
Details: fmt.Sprintf("unable to resolve %s service endpoint, register it on the DID document (did=%s): %s", transport.NutsCommServiceType, nodeDID, err),
}
}
var nutsCommURL transport.NutsCommURL
if err = nutsCommService.UnmarshalServiceEndpoint(&nutsCommURL); err != nil {
return core.Health{
Status: core.HealthStatusUnknown,
Details: fmt.Sprintf("invalid %s service endpoint: %s", transport.NutsCommServiceType, err),
}
}
// Check certificate and confirm it contains the NutsComm address
if n.certificate.Leaf == nil {
return core.Health{
Status: core.HealthStatusDown,
Details: "missing TLS certificate",
}
}
if err = n.certificate.Leaf.VerifyHostname(nutsCommURL.Hostname()); err != nil {
return core.Health{
Status: core.HealthStatusDown,
Details: fmt.Sprintf("none of the DNS names in TLS certificate match the %s service endpoint (nodeDID=%s, %s=%s)", transport.NutsCommServiceType, nodeDID, transport.NutsCommServiceType, nutsCommURL.String()),
}
}
// Check if we can connect to the NutsComm endpoint
if err = n.selfTestNutsCommAddress(nutsCommURL); err != nil {
return core.Health{
Status: core.HealthStatusUnknown,
Details: fmt.Sprintf("cannot connect to own NutsComm %s: %s", nutsCommURL.String(), err),
}
}
return core.Health{Status: core.HealthStatusUp}
}
// selfTestNutsCommAddress verifies that the NutsComm address can be reached and returns a valid certificate.
func (n *Network) selfTestNutsCommAddress(nutsComm transport.NutsCommURL) error {
nutsCommAddr := strings.TrimPrefix(nutsComm.String(), "grpc://")
conn, err := n.selfTestDialer.Dial("tcp", nutsCommAddr)
if err != nil {
return err
}
// defer Close just in case
defer func() {
if err = conn.Close(); err != nil {
log.Logger().WithError(err).Warn("Failed to close self-check connection")
}
}()
return nil
}
// Subscribe registers a receiverFn with specific options.
// The receiver is called when a transaction is added to the DAG.
// It's only called if the given dag.NotificationFilter's match.
func (n *Network) Subscribe(name string, subscriber dag.ReceiverFn, options ...SubscriberOption) error {
notifierOptions := make([]dag.NotifierOption, len(options))
for i, o := range options {
notifierOptions[i] = o()
}
_, err := n.state.Notifier(name, subscriber, notifierOptions...)
return err
}
func (n *Network) Subscribers() []dag.Notifier {
if n.state != nil {
return n.state.Notifiers()
}
return []dag.Notifier{}
}
func (n *Network) CleanupSubscriberEvents(subscriberName, errorPrefix string) error {
for _, subscriber := range n.Subscribers() {
if subscriber.Name() == subscriberName {
events, err := subscriber.GetFailedEvents()
if err != nil {
return err
}
for _, event := range events {
if strings.HasPrefix(event.Error, errorPrefix) {
if err := subscriber.Finished(event.Hash); err != nil {
return err
}
}
}
}
}
return nil
}
// GetTransaction retrieves the transaction for the given reference. If the transaction is not known, an error is returned.
func (n *Network) GetTransaction(transactionRef hash.SHA256Hash) (dag.Transaction, error) {
return n.state.GetTransaction(context.Background(), transactionRef)
}
// GetTransactionPayload retrieves the transaction Payload for the given transaction. If the transaction or Payload is not found
// nil is returned.
func (n *Network) GetTransactionPayload(transactionRef hash.SHA256Hash) ([]byte, error) {
transaction, err := n.state.GetTransaction(context.Background(), transactionRef)
if err != nil {
if errors.Is(err, dag.ErrTransactionNotFound) {
// convert ErrPayloadNotFound for simpler error handling
return nil, dag.ErrPayloadNotFound
}
return nil, err
}
return n.state.ReadPayload(context.Background(), transaction.PayloadHash())
}
// ListTransactionsInRange returns all transactions known to this Network instance with lamport clock value between startInclusive and endExclusive.
func (n *Network) ListTransactionsInRange(startInclusive uint32, endExclusive uint32) ([]dag.Transaction, error) {
return n.state.FindBetweenLC(context.Background(), startInclusive, endExclusive)
}
// CreateTransaction creates a new transaction from the given template.
func (n *Network) CreateTransaction(ctx context.Context, template Template) (dag.Transaction, error) {
payloadHash := hash.SHA256Sum(template.Payload)
log.Logger().
WithField(core.LogFieldTransactionType, template.Type).
WithField(core.LogFieldTransactionPayloadHash, payloadHash).
WithField(core.LogFieldTransactionPayloadLength, len(template.Payload)).
WithField(core.LogFieldTransactionIsPrivate, len(template.Participants) > 0).
WithField(core.LogFieldKeyID, template.KID).
Debug("Creating transaction")
// Assert that all additional prevs are present and its Payload is there
for _, prev := range template.AdditionalPrevs {
isPresent, err := n.isPayloadPresent(ctx, prev)
if err != nil {
return nil, err
}
if !isPresent {
return nil, fmt.Errorf("additional prev is unknown or missing payload (prev=%s)", prev)
}
}
// Assert node DID is configured when participants are specified
if len(template.Participants) > 0 {
if n.nodeDID.Empty() {
return nil, errors.New("node DID must be configured to create private transactions")
}
}
// get head
head, err := n.state.Head(ctx)
prevs := make([]hash.SHA256Hash, 0)
if err != nil {
return nil, fmt.Errorf("unable to get current head of the DAG: %w", err)
}
if !head.Equals(hash.EmptyHash()) {
prevs = append(prevs, head)
} else if len(template.AdditionalPrevs) != 0 {
return nil, fmt.Errorf("cannot have previous transactions on root transaction")
}
// and additional prevs
prevs = append(prevs, template.AdditionalPrevs...)
// Encrypt PAL, making the TX private (if participants are specified)
var pal [][]byte
if len(template.Participants) > 0 {
pal, err = template.Participants.Encrypt(n.keyResolver)
if err != nil {
return nil, fmt.Errorf("unable to encrypt PAL header for new transaction: %w", err)
}
}
// Calculate clock value
// Todo: optimize with getting current Head. LC will always be Head LC + 1
lamportClock, err := n.calculateLamportClock(ctx, prevs)
if err != nil {
return nil, fmt.Errorf("unable to calculate clock value for new transaction: %w", err)
}
// Create transaction
unsignedTransaction, err := dag.NewTransaction(payloadHash, template.Type, prevs, pal, lamportClock)
if err != nil {
return nil, fmt.Errorf("unable to create new transaction: %w", err)
}
// Sign it
var transaction dag.Transaction
signer := dag.NewTransactionSigner(n.keyStore, template.KID, template.PublicKey)
timestamp := time.Now()
if !template.Timestamp.IsZero() {
timestamp = template.Timestamp
}
transaction, err = signer.Sign(ctx, unsignedTransaction, timestamp)
if err != nil {
return nil, fmt.Errorf("unable to sign newly created transaction: %w", err)
}
// Store in local State and publish it
if err = n.state.Add(ctx, transaction, template.Payload); err != nil {
return nil, fmt.Errorf("unable to add newly created transaction to State: %w", err)
}
log.Logger().
WithField(core.LogFieldTransactionRef, transaction.Ref()).
WithField(core.LogFieldTransactionType, template.Type).
WithField(core.LogFieldTransactionPayloadLength, len(template.Payload)).
Info("Transaction created")
return transaction, nil
}
func (n *Network) calculateLamportClock(ctx context.Context, prevs []hash.SHA256Hash) (uint32, error) {
// the root has 0
if len(prevs) == 0 {
return 0, nil
}
var clock uint32
for _, prev := range prevs {
// GetTransaction always supplies an LC value, either calculated or stored
tx, err := n.state.GetTransaction(ctx, prev)
if err != nil {
return 0, err
}
if tx.Clock() > clock {
clock = tx.Clock()
}
}
// add one
return clock + 1, nil
}
// Shutdown cleans up any leftover go routines
func (n *Network) Shutdown() error {
// Stop protocols and connection manager
for _, prot := range n.protocols {
prot.Stop()
}
n.connectionManager.Stop()
err := n.state.Shutdown()
if err != nil {
return err
}
return nil
}
// Diagnostics collects and returns diagnostics for the Network engine.
func (n *Network) Diagnostics() []core.DiagnosticResult {
var results = make([]core.DiagnosticResult, 0)
// Connection manager and protocols
results = append(results, core.DiagnosticResultMap{Title: "connections", Items: n.connectionManager.Diagnostics()})
for _, prot := range n.protocols {
results = append(results, core.DiagnosticResultMap{Title: fmt.Sprintf("protocol_v%d", prot.Version()), Items: prot.Diagnostics()})
}
// DAG
if graph, ok := n.state.(core.Diagnosable); ok {
results = append(results, core.DiagnosticResultMap{Title: "state", Items: graph.Diagnostics()})
}
// NodeDID
results = append(results, core.GenericDiagnosticResult{
Title: "node_did",
Outcome: n.nodeDID,
})
return results
}
// PeerDiagnostics returns a map containing diagnostic information of the node's peers. The key contains the remote peer's ID.
func (n *Network) PeerDiagnostics() map[transport.PeerID]transport.Diagnostics {
result := make(map[transport.PeerID]transport.Diagnostics, 0)
// We assume higher protocol versions (later in the slice) have better/more accurate diagnostics,
// so for now they're copied over diagnostics of earlier versions, unless the entry is empty for that peer.
// We assume the diagnostic result is empty when it lists no peers (since it has at least 1 peer: the local node).
for _, prot := range n.protocols {
for peerID, peerDiagnostics := range prot.PeerDiagnostics() {
if _, exists := result[peerID]; !exists || len(peerDiagnostics.Peers) > 0 {
result[peerID] = peerDiagnostics
}
}
}
return result
}
func (n *Network) AddressBook() []transport.Contact {
return n.connectionManager.Contacts()
}
// ReprocessReport describes the reprocess exection.
type ReprocessReport struct {
// reserved for future use
}
func (n *Network) Reprocess(ctx context.Context, contentType string) (*ReprocessReport, error) {
log.Logger().Infof("Starting reprocess of %s", contentType)
_, js, err := n.eventPublisher.Pool().Acquire(ctx)
if err != nil {
return nil, fmt.Errorf("reprocess abort on message client: %w", err)
}
// The Lamport's clock stamps count from 0, with a step size of 1.
const clockSteps = 1000
for offset := 0; ; offset += clockSteps {
end := offset + clockSteps
if end >= 1<<30 {
return nil, errors.New("reprocess abort on Lamport clock int overflow")
}
txs, err := n.state.FindBetweenLC(ctx, uint32(offset), uint32(end))
if err != nil {
return nil, fmt.Errorf("reprocess abort on transaction lookup, clock range [%d, %d): %w", offset, end, err)
}
for _, tx := range txs {
if tx.PayloadType() != contentType {
continue // filter
}
// add to Nats
subject := fmt.Sprintf("%s.%s", events.ReprocessStream, contentType)
payload, err := n.state.ReadPayload(ctx, tx.PayloadHash())
if err != nil && !errors.Is(err, dag.ErrPayloadNotFound) {
return nil, fmt.Errorf("reprocess abort on transaction %s payload %s: %w", tx.Ref(), tx.PayloadHash(), err)
}
twp := events.TransactionWithPayload{
Transaction: tx,
Payload: payload,
}
data, _ := json.Marshal(twp)
log.Logger().
WithField(core.LogFieldTransactionRef, tx.Ref()).
WithField(core.LogFieldEventSubject, subject).
Trace("Publishing transaction")
_, err = js.PublishAsync(subject, data)
if err != nil {
return nil, fmt.Errorf("reprocess abort on transaction %s publish: %w", tx.Ref(), err)
}
}
if len(txs) == 0 {
break
}
lastTick := txs[len(txs)-1].Clock()
if int(uint(lastTick))+1 < end {
break
}
// Workaround Nuts stoabs package which locks updates on any pending read
// transactions.
time.Sleep(time.Second)
}
// flush publish queue
select {
case <-js.PublishAsyncComplete():
break
case <-ctx.Done():
return nil, fmt.Errorf("reprocess terminate before completing succesful: %w", ctx.Err())
}
log.Logger().Infof("Successfully completed reprocess of %s", contentType)
return new(ReprocessReport), nil
}
func (n *Network) collectDiagnosticsForPeers() transport.Diagnostics {
stateDiagnostics := n.state.Diagnostics()
transactionCount := uint(0)
for _, diagnostic := range stateDiagnostics {
if diagnostic.Name() == dag.TransactionCountDiagnostic {
transactionCount = diagnostic.Result().(uint)
}
}
result := transport.Diagnostics{
Uptime: time.Since(*n.startTime.Load()),
NumberOfTransactions: uint32(transactionCount),
SoftwareVersion: fmt.Sprintf("%s (%s)", core.GitBranch, core.GitCommit),
SoftwareID: softwareID,
}
for _, peer := range n.connectionManager.Peers() {
result.Peers = append(result.Peers, peer.ID)
}
return result
}
func (n *Network) isPayloadPresent(ctx context.Context, txRef hash.SHA256Hash) (bool, error) {
tx, err := n.state.GetTransaction(ctx, txRef)
if err != nil {
if errors.Is(err, dag.ErrTransactionNotFound) {
return false, nil
}
return false, err
}
return n.state.IsPayloadPresent(ctx, tx.PayloadHash())
}
func (n *Network) findVendorDIDs() ([]did.Document, error) {
result, err := n.didDocumentFinder.Find(resolver.IsActive(), resolver.ValidAt(time.Now()), resolver.ByServiceType(transport.NutsCommServiceType))
if err != nil {
return nil, err
}
return result, nil
}