
View on GitHub


0 mins
Test Coverage
 * Nuts node
 * Copyright (C) 2021 Nuts community
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <>.

package vcr

import (

// Ambassador registers a callback with the network for processing received Verifiable Credentials.
type Ambassador interface {
    // Configure instructs the ambassador to start receiving DID Documents from the network.
    Configure() error
    // Start the event subscriber for reprocessing transactions from the DAG when called
    Start() error

type ambassador struct {
    networkClient network.Transactions
    writer        types.Writer
    // verifier is used to store incoming revocations from the network
    verifier     verifier.Verifier
    eventManager events.Event

// NewAmbassador creates a new listener for the network that listens to Verifiable Credential transactions.
func NewAmbassador(networkClient network.Transactions, writer types.Writer, verifier verifier.Verifier, eventManager events.Event) Ambassador {
    return &ambassador{
        networkClient: networkClient,
        writer:        writer,
        verifier:      verifier,
        eventManager:  eventManager,

// Configure instructs the ambassador to start receiving DID Documents from the network.
func (n ambassador) Configure() error {
    err := n.networkClient.Subscribe("vcr_vcs", n.handleNetworkVCs,
        network.WithSelectionFilter(func(event dag.Event) bool {
            return event.Type == dag.PayloadEventType && event.Transaction.PayloadType() == types.VcDocumentType
    if err != nil {
        return err
    return n.networkClient.Subscribe("vcr_revocations", n.handleNetworkRevocations,
        network.WithSelectionFilter(func(event dag.Event) bool {
            return event.Type == dag.PayloadEventType && event.Transaction.PayloadType() == types.RevocationLDDocumentType

func (n ambassador) Start() error {
    stream := events.NewDisposableStream(
        fmt.Sprintf("%s_%s", events.ReprocessStream, "VCR"),
            fmt.Sprintf("%s.%s", events.ReprocessStream, types.VcDocumentType),
            fmt.Sprintf("%s.%s", events.ReprocessStream, types.RevocationLDDocumentType),
    conn, _, err := n.eventManager.Pool().Acquire(context.Background())
    if err != nil {
        return fmt.Errorf("failed to subscribe to REPROCESS event stream: %w", err)

    err = stream.Subscribe(conn, "VCR", fmt.Sprintf("%s.*", events.ReprocessStream), n.handleReprocessEvent)
    if err != nil {
        return fmt.Errorf("failed to subscribe to REPROCESS event stream: %v", err)

    // removing failed events required for #1743
    // remove after v6 release
    return n.networkClient.CleanupSubscriberEvents("vcr_vcs", "canonicalization failed: unable to normalize the json-ld document: loading remote context failed: Dereferencing a URL did not result in a valid JSON-LD context")

func (n ambassador) handleNetworkVCs(event dag.Event) (bool, error) {
    if err := n.vcCallback(event.Transaction, event.Payload); err != nil {
        return n.handleError(err)
    return true, nil

func (n ambassador) handleNetworkRevocations(event dag.Event) (bool, error) {
    if err := n.jsonLDRevocationCallback(event.Transaction, event.Payload); err != nil {
        return n.handleError(err)
    return true, nil

func (n ambassador) handleError(err error) (bool, error) {
    // Recoverable: context time-outs and cancellations (e.g. storage taking too long)
    if errors.Is(err, context.Canceled) ||
        errors.Is(err, context.DeadlineExceeded) {
        return false, err
    // Disallowed URLs (configurable) is "basic flow"; not an error, no need to retry
    if errors.Is(err, jsonld.ContextURLNotAllowedErr) {
        log.Logger().WithError(err).Debug("JSON-LD VC or revocation ignored; context not configured on allow list")
        return true, nil
    // Recoverable: loading remote JSON-LD documents.
    var jsonLDError *ld.JsonLdError
    if errors.As(err, &jsonLDError) &&
        jsonLDError.Code == ld.LoadingRemoteContextFailed &&
        !errors.Is(err, jsonld.ContextURLNotAllowedErr) {
        return false, err

    // TODO: other database/storage errors are also considered recoverable. VCR only uses go-leia for storage,
    //  which doesn't define a single error to recognize storage-related errors.
    //  This means go-leia error, which should be recoverable, can't be recognized as being recoverable.
    //  If they occur and cause inconsistencies, they can be fixed using `Reprocess(application/vc+json)`.
    // Other errors are non-recoverable
    return false, dag.EventFatal{Err: err}

func (n ambassador) handleReprocessEvent(msg *nats.Msg) {
    jsonBytes := msg.Data
    twp := events.TransactionWithPayload{}

    if err := msg.Ack(); err != nil {
            WithField(core.LogFieldEventSubject, msg.Subject).
            Error("Failed to process event: failed to ack message")

    if err := json.Unmarshal(jsonBytes, &twp); err != nil {
            WithField(core.LogFieldEventSubject, msg.Subject).
            Error("Failed to process event: failed to unmarshall data")

    if len(twp.Payload) != 0 { // private TXs not intended for us
        callback := n.getCallbackFn(twp.Transaction.PayloadType())
        if err := callback(twp.Transaction, twp.Payload); err != nil {
                WithField(core.LogFieldEventSubject, msg.Subject).
                Error("Failed to process event")

func (n ambassador) getCallbackFn(contentType string) func(dag.Transaction, []byte) error {
    switch contentType {
    case types.VcDocumentType:
        return n.vcCallback
    case types.RevocationLDDocumentType:
        return n.jsonLDRevocationCallback

    return func(tx dag.Transaction, payload []byte) error {
        return nil

// vcCallback gets called when new Verifiable Credentials are received by the network. All checks on the signature are already performed.
// The VCR is used to verify the contents of the credential.
// payload should be a json encoded vc.VerifiableCredential
func (n ambassador) vcCallback(tx dag.Transaction, payload []byte) error {
        WithField(core.LogFieldTransactionRef, tx.Ref()).
        Debug("Processing VC received from Nuts Network")

    target := vc.VerifiableCredential{}
    if err := json.Unmarshal(payload, &target); err != nil {
        return fmt.Errorf("credential processing failed: %w", err)

    // Verify and store
    validAt := tx.SigningTime()
    return n.writer.StoreCredential(target, &validAt)

// jsonLDRevocationCallback gets called when new credential revocations are received by the network.
// These revocations are in the form of a JSON-LD document.
// All checks on the signature are already performed.
// The VCR is used to verify the contents of the revocation.
// payload should be a json encoded Revocation
func (n ambassador) jsonLDRevocationCallback(tx dag.Transaction, payload []byte) error {
        WithField(core.LogFieldTransactionRef, tx.Ref()).
        Debug("Processing VC revocation received from Nuts Network")

    r := credential.Revocation{}
    if err := json.Unmarshal(payload, &r); err != nil {
        return fmt.Errorf("revocation processing failed: %w", err)

    return n.verifier.RegisterRevocation(r)