nuts-foundation/nuts-node

View on GitHub
vcr/ambassador.go

Summary

Maintainability
A
0 mins
Test Coverage
C
75%
/*
 * Nuts node
 * Copyright (C) 2021 Nuts community
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 *
 */

package vcr

import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "github.com/nats-io/nats.go"
    "github.com/nuts-foundation/go-did/vc"
    "github.com/nuts-foundation/nuts-node/core"
    "github.com/nuts-foundation/nuts-node/events"
    "github.com/nuts-foundation/nuts-node/jsonld"
    "github.com/nuts-foundation/nuts-node/network"
    "github.com/nuts-foundation/nuts-node/network/dag"
    "github.com/nuts-foundation/nuts-node/vcr/credential"
    "github.com/nuts-foundation/nuts-node/vcr/log"
    "github.com/nuts-foundation/nuts-node/vcr/types"
    "github.com/nuts-foundation/nuts-node/vcr/verifier"
    "github.com/piprate/json-gold/ld"
)

// Ambassador registers a callback with the network for processing received Verifiable Credentials.
type Ambassador interface {
    // Configure instructs the ambassador to start receiving DID Documents from the network.
    Configure() error
    // Start the event subscriber for reprocessing transactions from the DAG when called
    Start() error
}

type ambassador struct {
    networkClient network.Transactions
    writer        types.Writer
    // verifier is used to store incoming revocations from the network
    verifier     verifier.Verifier
    eventManager events.Event
}

// NewAmbassador creates a new listener for the network that listens to Verifiable Credential transactions.
func NewAmbassador(networkClient network.Transactions, writer types.Writer, verifier verifier.Verifier, eventManager events.Event) Ambassador {
    return &ambassador{
        networkClient: networkClient,
        writer:        writer,
        verifier:      verifier,
        eventManager:  eventManager,
    }
}

// Configure instructs the ambassador to start receiving DID Documents from the network.
func (n ambassador) Configure() error {
    err := n.networkClient.Subscribe("vcr_vcs", n.handleNetworkVCs,
        n.networkClient.WithPersistency(),
        network.WithSelectionFilter(func(event dag.Event) bool {
            return event.Type == dag.PayloadEventType && event.Transaction.PayloadType() == types.VcDocumentType
        }))
    if err != nil {
        return err
    }
    return n.networkClient.Subscribe("vcr_revocations", n.handleNetworkRevocations,
        n.networkClient.WithPersistency(),
        network.WithSelectionFilter(func(event dag.Event) bool {
            return event.Type == dag.PayloadEventType && event.Transaction.PayloadType() == types.RevocationLDDocumentType
        }))
}

func (n ambassador) Start() error {
    stream := events.NewDisposableStream(
        fmt.Sprintf("%s_%s", events.ReprocessStream, "VCR"),
        []string{
            fmt.Sprintf("%s.%s", events.ReprocessStream, types.VcDocumentType),
            fmt.Sprintf("%s.%s", events.ReprocessStream, types.RevocationLDDocumentType),
        },
        network.MaxReprocessBufferSize)
    conn, _, err := n.eventManager.Pool().Acquire(context.Background())
    if err != nil {
        return fmt.Errorf("failed to subscribe to REPROCESS event stream: %w", err)
    }

    err = stream.Subscribe(conn, "VCR", fmt.Sprintf("%s.*", events.ReprocessStream), n.handleReprocessEvent)
    if err != nil {
        return fmt.Errorf("failed to subscribe to REPROCESS event stream: %v", err)
    }

    // removing failed events required for #1743
    // remove after v6 release
    return n.networkClient.CleanupSubscriberEvents("vcr_vcs", "canonicalization failed: unable to normalize the json-ld document: loading remote context failed: Dereferencing a URL did not result in a valid JSON-LD context")
}

func (n ambassador) handleNetworkVCs(event dag.Event) (bool, error) {
    if err := n.vcCallback(event.Transaction, event.Payload); err != nil {
        return n.handleError(err)
    }
    return true, nil
}

func (n ambassador) handleNetworkRevocations(event dag.Event) (bool, error) {
    if err := n.jsonLDRevocationCallback(event.Transaction, event.Payload); err != nil {
        return n.handleError(err)
    }
    return true, nil
}

func (n ambassador) handleError(err error) (bool, error) {
    // Recoverable: context time-outs and cancellations (e.g. storage taking too long)
    if errors.Is(err, context.Canceled) ||
        errors.Is(err, context.DeadlineExceeded) {
        return false, err
    }
    // Disallowed URLs (configurable) is "basic flow"; not an error, no need to retry
    if errors.Is(err, jsonld.ContextURLNotAllowedErr) {
        log.Logger().WithError(err).Debug("JSON-LD VC or revocation ignored; context not configured on allow list")
        return true, nil
    }
    // Recoverable: loading remote JSON-LD documents.
    var jsonLDError *ld.JsonLdError
    if errors.As(err, &jsonLDError) &&
        jsonLDError.Code == ld.LoadingRemoteContextFailed &&
        !errors.Is(err, jsonld.ContextURLNotAllowedErr) {
        return false, err
    }

    // TODO: other database/storage errors are also considered recoverable. VCR only uses go-leia for storage,
    //  which doesn't define a single error to recognize storage-related errors.
    //  This means go-leia error, which should be recoverable, can't be recognized as being recoverable.
    //  If they occur and cause inconsistencies, they can be fixed using `Reprocess(application/vc+json)`.
    // Other errors are non-recoverable
    return false, dag.EventFatal{Err: err}
}

func (n ambassador) handleReprocessEvent(msg *nats.Msg) {
    jsonBytes := msg.Data
    twp := events.TransactionWithPayload{}

    if err := msg.Ack(); err != nil {
        log.Logger().
            WithError(err).
            WithField(core.LogFieldEventSubject, msg.Subject).
            Error("Failed to process event: failed to ack message")
        return
    }

    if err := json.Unmarshal(jsonBytes, &twp); err != nil {
        log.Logger().
            WithError(err).
            WithField(core.LogFieldEventSubject, msg.Subject).
            Error("Failed to process event: failed to unmarshall data")
        return
    }

    if len(twp.Payload) != 0 { // private TXs not intended for us
        callback := n.getCallbackFn(twp.Transaction.PayloadType())
        if err := callback(twp.Transaction, twp.Payload); err != nil {
            log.Logger().
                WithError(err).
                WithField(core.LogFieldEventSubject, msg.Subject).
                Error("Failed to process event")
            return
        }
    }
}

func (n ambassador) getCallbackFn(contentType string) func(dag.Transaction, []byte) error {
    switch contentType {
    case types.VcDocumentType:
        return n.vcCallback
    case types.RevocationLDDocumentType:
        return n.jsonLDRevocationCallback
    }

    return func(tx dag.Transaction, payload []byte) error {
        return nil
    }
}

// vcCallback gets called when new Verifiable Credentials are received by the network. All checks on the signature are already performed.
// The VCR is used to verify the contents of the credential.
// payload should be a json encoded vc.VerifiableCredential
func (n ambassador) vcCallback(tx dag.Transaction, payload []byte) error {
    log.Logger().
        WithField(core.LogFieldTransactionRef, tx.Ref()).
        Debug("Processing VC received from Nuts Network")

    target := vc.VerifiableCredential{}
    if err := json.Unmarshal(payload, &target); err != nil {
        return fmt.Errorf("credential processing failed: %w", err)
    }

    // Verify and store
    validAt := tx.SigningTime()
    return n.writer.StoreCredential(target, &validAt)
}

// jsonLDRevocationCallback gets called when new credential revocations are received by the network.
// These revocations are in the form of a JSON-LD document.
// All checks on the signature are already performed.
// The VCR is used to verify the contents of the revocation.
// payload should be a json encoded Revocation
func (n ambassador) jsonLDRevocationCallback(tx dag.Transaction, payload []byte) error {
    log.Logger().
        WithField(core.LogFieldTransactionRef, tx.Ref()).
        Debug("Processing VC revocation received from Nuts Network")

    r := credential.Revocation{}
    if err := json.Unmarshal(payload, &r); err != nil {
        return fmt.Errorf("revocation processing failed: %w", err)
    }

    return n.verifier.RegisterRevocation(r)
}