nuts-foundation/nuts-node

View on GitHub
storage/leia.go

Summary

Maintainability
A
1 hr
Test Coverage
C
71%
/*
 * Copyright (C) 2022 Nuts community
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 *
 */

package storage

import (
    "context"
    "errors"
    "fmt"
    "github.com/nuts-foundation/go-leia/v4"
    "github.com/nuts-foundation/go-stoabs"
    "github.com/nuts-foundation/nuts-node/core"
    "github.com/nuts-foundation/nuts-node/storage/log"
)

// KVBackedLeiaStore is a wrapper interface for a leia.Store that uses a stoabs.KVStore as backup for any documents added.
type KVBackedLeiaStore interface {
    leia.Store
    // AddConfiguration adds a configuration for a collection to the store.
    // This is needed to know the kind of collection, the backup shelf name and the iterate query to fetch the documents.
    AddConfiguration(config LeiaBackupConfiguration)
    // HandleRestore migrates the data from the backup store to the leia.Store if needed.
    // It's up to the caller to create the indices on the leia.Collections first before calling this method.
    HandleRestore() error
}

type kvBackedLeiaStore struct {
    store               leia.Store
    backup              stoabs.KVStore
    collectionConfigSet map[string]LeiaBackupConfiguration
}

// NewKVBackedLeiaStore creates a wrapper around a leia.Store that uses a stoabs.KVStore as backup. Write operations (add/delete/update) are first performed on the backup store, then on the leia store.
// The backup store is not closed when Close is called. The leia.Store is closed when Close is called.
func NewKVBackedLeiaStore(store leia.Store, backup stoabs.KVStore) (KVBackedLeiaStore, error) {
    return &kvBackedLeiaStore{
        store:               store,
        backup:              backup,
        collectionConfigSet: map[string]LeiaBackupConfiguration{},
    }, nil
}

// LeiaBackupConfiguration contains the configuration for a collection that is backed by a stoabs.KVStore.
type LeiaBackupConfiguration struct {
    // CollectionName is the name of the collection in the leia.Store.
    CollectionName string
    CollectionType leia.CollectionType
    // BackupShelf is the name of the shelf in the backup store.
    BackupShelf string
    // SearchQuery is used to fill the backup shelf if not present.
    SearchQuery leia.QueryPath
}

type kvBackedCollection struct {
    backup     stoabs.KVStore
    config     LeiaBackupConfiguration
    underlying leia.Collection
}

func (k *kvBackedLeiaStore) AddConfiguration(config LeiaBackupConfiguration) {
    k.collectionConfigSet[config.CollectionName] = config
}

func (k *kvBackedLeiaStore) HandleRestore() error {
    // leia indices have been added, so the collection names have been added to the collectionConfigSet.
    // Loop over this set to check if the backup store contains any documents for these collections and add them to the leia.Store if the named collection is empty there.
    for _, config := range k.collectionConfigSet {
        if err := k.handleRestore(config); err != nil {
            return fmt.Errorf("error handling restore for collection %s: %w", config.CollectionName, err)
        }
    }
    return nil
}

func (k *kvBackedLeiaStore) handleRestore(config LeiaBackupConfiguration) error {
    backupPresent := k.backupStorePresent(config.BackupShelf)
    collection := k.store.Collection(config.CollectionType, config.CollectionName)

    storePresent := storePresent(collection, config)

    if backupPresent && storePresent {
        // both are filled => normal operation, done
        return nil
    }

    if !backupPresent && !storePresent {
        // both are non-existent => empty node, done
        return nil
    }

    if !storePresent {
        log.Logger().
            WithField(core.LogFieldStoreShelf, config.BackupShelf).
            Info("Missing index for shelf, rebuilding")
        // empty node, backup has been restored, refill store
        return k.backup.ReadShelf(context.Background(), config.BackupShelf, func(reader stoabs.Reader) error {
            return reader.Iterate(func(key stoabs.Key, value []byte) error {
                return collection.Add([]leia.Document{value})
            }, stoabs.BytesKey{})
        })
    }

    log.Logger().
        WithField(core.LogFieldStoreShelf, config.BackupShelf).
        Info("Missing store for shelf, creating from index")

    // else !backupPresent, process per 100
    query := leia.New(leia.NotNil(config.SearchQuery))

    const limit = 100
    type refDoc struct {
        ref leia.Reference
        doc leia.Document
    }

    writeDocuments := func(set []refDoc) error {
        return k.backup.Write(context.Background(), func(tx stoabs.WriteTx) error {
            writer := tx.GetShelfWriter(config.BackupShelf)
            for _, entry := range set {
                if err := writer.Put(stoabs.BytesKey(entry.ref), entry.doc); err != nil {
                    return err
                }
            }
            return nil
        })
    }

    set := make([]refDoc, 0, limit)
    err := collection.Iterate(query, func(ref leia.Reference, value []byte) error {
        set = append(set, refDoc{ref: ref, doc: value})
        if len(set) >= limit {
            err := writeDocuments(set)
            set = make([]refDoc, 0, limit)
            return err
        }
        return nil
    })
    if err != nil {
        return err
    }

    if len(set) > 0 {
        return writeDocuments(set)
    }
    return nil
}

func (k *kvBackedLeiaStore) Collection(collectionType leia.CollectionType, name string) leia.Collection {
    config, ok := k.collectionConfigSet[name]
    if !ok {
        // we panic here because this is a programming error, not a runtime error
        panic("JSON collection not configured")
    }
    if config.CollectionType != collectionType {
        // we panic here because this is a programming error, not a runtime error
        panic("Incorrect collection configuration")
    }
    underlying := kvBackedCollection{
        backup:     k.backup,
        config:     config,
        underlying: k.store.Collection(collectionType, name),
    }
    return underlying
}

func (k *kvBackedLeiaStore) Close() error {
    return k.store.Close()
}

func (k kvBackedCollection) AddIndex(index ...leia.Index) error {
    return k.underlying.AddIndex(index...)
}

func (k kvBackedCollection) DropIndex(name string) error {
    return k.underlying.DropIndex(name)
}

func (k kvBackedCollection) NewIndex(name string, parts ...leia.FieldIndexer) leia.Index {
    return k.underlying.NewIndex(name, parts...)
}

func (k kvBackedCollection) Add(jsonSet []leia.Document) error {
    // first in backup
    for _, doc := range jsonSet {
        ref := k.Reference(doc)

        if err := k.backup.WriteShelf(context.Background(), k.config.BackupShelf, func(writer stoabs.Writer) error {
            return writer.Put(stoabs.BytesKey(ref), doc)
        }); err != nil {
            return err
        }
    }
    // then in index
    return k.underlying.Add(jsonSet)
}

func (k kvBackedCollection) Get(ref leia.Reference) (leia.Document, error) {
    return k.underlying.Get(ref)
}

func (k kvBackedCollection) Delete(doc leia.Document) error {
    // first in backup
    ref := k.Reference(doc)

    if err := k.backup.WriteShelf(context.Background(), k.config.BackupShelf, func(writer stoabs.Writer) error {
        return writer.Put(stoabs.BytesKey(ref), doc)
    }); err != nil {
        return err
    }
    // then in index
    return k.underlying.Delete(doc)
}

func (k kvBackedCollection) Find(ctx context.Context, query leia.Query) ([]leia.Document, error) {
    return k.underlying.Find(ctx, query)
}

func (k kvBackedCollection) Reference(doc leia.Document) leia.Reference {
    return k.underlying.Reference(doc)
}

func (k kvBackedCollection) Iterate(query leia.Query, walker leia.DocumentWalker) error {
    return k.underlying.Iterate(query, walker)
}

func (k kvBackedCollection) IndexIterate(query leia.Query, fn leia.ReferenceScanFn) error {
    return k.underlying.IndexIterate(query, fn)
}

func (k kvBackedCollection) ValuesAtPath(document leia.Document, queryPath leia.QueryPath) ([]leia.Scalar, error) {
    return k.underlying.ValuesAtPath(document, queryPath)
}

func (k kvBackedCollection) DocumentCount() (int, error) {
    return k.underlying.DocumentCount()
}

func (k *kvBackedLeiaStore) backupStorePresent(backupShelf string) bool {
    backupPresent := false

    _ = k.backup.ReadShelf(context.Background(), backupShelf, func(reader stoabs.Reader) error {
        isEmpty, err := reader.Empty()
        backupPresent = !isEmpty
        return err
    })

    return backupPresent
}

func storePresent(collection leia.Collection, config LeiaBackupConfiguration) bool {
    issuedPresent := false
    query := leia.New(leia.NotNil(config.SearchQuery))
    _ = collection.IndexIterate(query, func(key []byte, value []byte) error {
        issuedPresent = true
        return errors.New("exit")
    })

    return issuedPresent
}