network/dag/dag.go
/*
* Copyright (C) 2021 Nuts community
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
package dag
import (
"context"
"encoding/binary"
"errors"
"fmt"
"sort"
"github.com/nuts-foundation/go-stoabs"
"github.com/nuts-foundation/nuts-node/core"
"github.com/nuts-foundation/nuts-node/crypto/hash"
"github.com/nuts-foundation/nuts-node/network/log"
)
// metadataShelf is the name of the shelf that holds metadata.
const metadataShelf = "metadata"
// numberOfTransactionsKey is the key of the metadata property that holds the number of transactions on the DAG
const numberOfTransactionsKey = "tx_num"
// highestClockValue is the key of the metadata property that holds the highest lamport clock value of all transactions on the DAG
const highestClockValue = "lc_high"
// headRefKey is the of the metadata property that holds the latest HEAD of the DAG
const headRefKey = "head_ref"
// transactionsShelf is the name of the shelf that holds the actual transactions.
const transactionsShelf = "documents"
// headsShelf contains the name of the shelf the holds the heads.
const headsShelf = "heads"
// clockShelf is the name of the shelf that uses the Lamport clock as index to a set of TX refs.
const clockShelf = "clocks"
// TransactionCountDiagnostic is the name of the diagnostics result for the transaction count
const TransactionCountDiagnostic = "transaction_count"
type dag struct {
db stoabs.KVStore
}
type numberOfTransactionsStatistic struct {
numberOfTransactions uint
}
func (n numberOfTransactionsStatistic) Result() interface{} {
return n.numberOfTransactions
}
func (n numberOfTransactionsStatistic) Name() string {
return TransactionCountDiagnostic
}
func (n numberOfTransactionsStatistic) String() string {
return fmt.Sprintf("%d", n.numberOfTransactions)
}
type dataSizeStatistic struct {
sizeInBytes int64
}
func (s dataSizeStatistic) Result() interface{} {
return s.sizeInBytes
}
func (s dataSizeStatistic) Name() string {
return "stored_database_size_bytes"
}
func (s dataSizeStatistic) String() string {
return fmt.Sprintf("%d", s.sizeInBytes)
}
// newDAG creates a DAG backed by the given database.
func newDAG(db stoabs.KVStore) *dag {
return &dag{db: db}
}
func (d *dag) Migrate() error {
return d.db.Write(context.Background(), func(tx stoabs.WriteTx) error {
writer := tx.GetShelfWriter(metadataShelf)
// Migrate highest LC value
// Todo: remove after V5 release
_, err := writer.Get(stoabs.BytesKey(highestClockValue))
if errors.Is(err, stoabs.ErrKeyNotFound) {
log.Logger().Info("Highest LC value not stored, migrating...")
highestLC := d.getHighestClockLegacy(tx)
err = d.setHighestClockValue(tx, highestLC)
}
if err != nil {
return err
}
// Migrate number of TXs
// Todo: remove after V5 release
_, err = writer.Get(stoabs.BytesKey(numberOfTransactionsKey))
if errors.Is(err, stoabs.ErrKeyNotFound) {
log.Logger().Info("Number of transactions not stored, migrating...")
numberOfTXs := d.getNumberOfTransactionsLegacy(tx)
err = d.setNumberOfTransactions(tx, numberOfTXs)
}
if err != nil {
return err
}
// Migrate headsLegacy to single head
// Todo: remove after V6 release => then remove headsShelf
_, err = writer.Get(stoabs.BytesKey(headRefKey))
if errors.Is(err, stoabs.ErrKeyNotFound) {
log.Logger().Info("Head not stored in metadata, migrating...")
heads := d.headsLegacy(tx)
err = nil // reset error
if len(heads) != 0 { // ignore for empty node
var latestHead hash.SHA256Hash
var latestLC uint32
for _, ref := range heads {
transaction, err := getTransaction(ref, tx)
if err != nil {
if errors.Is(err, ErrTransactionNotFound) {
return fmt.Errorf("database migration failed: %w (%s=%s)", err, core.LogFieldTransactionRef, ref)
}
return err
}
if transaction.Clock() >= latestLC {
latestHead = ref
latestLC = transaction.Clock()
}
}
err = d.setHead(tx, latestHead)
}
}
if err != nil {
return err
}
return nil
})
}
func (d *dag) diagnostics(ctx context.Context) []core.DiagnosticResult {
var stats Statistics
_ = d.db.Read(ctx, func(tx stoabs.ReadTx) error {
stats = d.statistics(tx)
return nil
})
result := make([]core.DiagnosticResult, 0)
result = append(result, numberOfTransactionsStatistic{numberOfTransactions: stats.NumberOfTransactions})
result = append(result, dataSizeStatistic{sizeInBytes: stats.DataSize})
return result
}
func (d dag) headsLegacy(ctx stoabs.ReadTx) []hash.SHA256Hash {
result := make([]hash.SHA256Hash, 0)
reader := ctx.GetShelfReader(headsShelf)
_ = reader.Iterate(func(key stoabs.Key, _ []byte) error {
result = append(result, hash.FromSlice(key.Bytes())) // FromSlice() copies
return nil
}, stoabs.HashKey{})
return result
}
func (d *dag) findBetweenLC(tx stoabs.ReadTx, startInclusive uint32, endExclusive uint32) ([]Transaction, error) {
var result []Transaction
err := d.visitBetweenLC(tx, startInclusive, endExclusive, func(transaction Transaction) bool {
result = append(result, transaction)
return true
})
if err != nil {
// Make sure not to return results in case of error
return nil, err
}
return result, nil
}
func (d *dag) visitBetweenLC(tx stoabs.ReadTx, startInclusive uint32, endExclusive uint32, visitor Visitor) error {
reader := tx.GetShelfReader(clockShelf)
// TODO: update to process in batches
return reader.Range(stoabs.Uint32Key(startInclusive), stoabs.Uint32Key(endExclusive), func(_ stoabs.Key, value []byte) error {
parsed := parseHashList(value)
// according to RFC004, lower byte value refs go first
sort.Slice(parsed, func(i, j int) bool {
return parsed[i].Compare(parsed[j]) <= 0
})
for _, next := range parsed {
transaction, err := getTransaction(next, tx)
if err != nil {
return err
}
visitor(transaction)
}
return nil
}, true)
}
func (d *dag) isPresent(tx stoabs.ReadTx, ref hash.SHA256Hash) bool {
return exists(tx.GetShelfReader(transactionsShelf), ref)
}
func (d *dag) add(tx stoabs.WriteTx, transactions ...Transaction) error {
highestLC := d.getHighestClockValue(tx)
headRef := hash.EmptyHash()
for _, transaction := range transactions {
if transaction != nil {
if err := d.addSingle(tx, transaction); err != nil {
return err
}
if transaction.Clock() > highestLC || transaction.Clock() == 0 {
highestLC = transaction.Clock()
headRef = transaction.Ref()
}
}
}
// update highest LC
if err := d.setHighestClockValue(tx, highestLC); err != nil {
return err
}
// update head
if !headRef.Equals(hash.EmptyHash()) {
if err := d.setHead(tx, headRef); err != nil {
return err
}
}
// update TX count
txCount := d.getNumberOfTransactions(tx) + uint64(len(transactions))
return d.setNumberOfTransactions(tx, txCount)
}
func (d dag) getNumberOfTransactions(tx stoabs.ReadTx) uint64 {
value, err := tx.GetShelfReader(metadataShelf).Get(stoabs.BytesKey(numberOfTransactionsKey))
if errors.Is(err, stoabs.ErrKeyNotFound) {
return 0
}
if err != nil {
log.Logger().
WithError(err).
Error("Unable to retrieve number of transactions")
return 0
}
return bytesToCount(value)
}
func (d dag) setNumberOfTransactions(tx stoabs.WriteTx, count uint64) error {
writer := tx.GetShelfWriter(metadataShelf)
bytes := make([]byte, 8)
binary.BigEndian.PutUint64(bytes[:], count)
return writer.Put(stoabs.BytesKey(numberOfTransactionsKey), bytes)
}
func (d dag) setHead(tx stoabs.WriteTx, ref hash.SHA256Hash) error {
writer := tx.GetShelfWriter(metadataShelf)
return writer.Put(stoabs.BytesKey(headRefKey), ref.Slice())
}
func (d dag) getHighestClockValue(tx stoabs.ReadTx) uint32 {
value, err := tx.GetShelfReader(metadataShelf).Get(stoabs.BytesKey(highestClockValue))
if errors.Is(err, stoabs.ErrKeyNotFound) {
return 0
}
if err != nil {
log.Logger().
WithError(err).
Error("Unable to retrieve highest LC value")
return 0
}
return bytesToClock(value)
}
// getHighestClockLegacy is used for migration.
// Remove after V5 or V6 release?
func (d dag) getHighestClockLegacy(tx stoabs.ReadTx) uint32 {
reader := tx.GetShelfReader(clockShelf)
var clock uint32
err := reader.Iterate(func(key stoabs.Key, _ []byte) error {
currentClock := uint32(key.(stoabs.Uint32Key))
if currentClock > clock {
clock = currentClock
}
return nil
}, stoabs.Uint32Key(0))
if err != nil {
log.Logger().
WithError(err).
Error("Failed to read clock shelf")
return 0
}
return clock
}
func (d dag) getHead(tx stoabs.ReadTx) (hash.SHA256Hash, error) {
head, err := tx.GetShelfReader(metadataShelf).Get(stoabs.BytesKey(headRefKey))
if errors.Is(err, stoabs.ErrKeyNotFound) {
return hash.EmptyHash(), nil
}
if err != nil {
return hash.EmptyHash(), err
}
return hash.FromSlice(head), nil
}
// getNumberOfTransactionsLegacy is used for migration.
// Remove after V5 or V6 release?
func (d dag) getNumberOfTransactionsLegacy(tx stoabs.ReadTx) uint64 {
reader := tx.GetShelfReader(transactionsShelf)
return uint64(reader.Stats().NumEntries)
}
func (d dag) setHighestClockValue(tx stoabs.WriteTx, count uint32) error {
writer := tx.GetShelfWriter(metadataShelf)
bytes := make([]byte, 4)
binary.BigEndian.PutUint32(bytes[:], count)
return writer.Put(stoabs.BytesKey(highestClockValue), bytes)
}
func (d dag) statistics(tx stoabs.ReadTx) Statistics {
var result Statistics
shelfStats := tx.GetShelfReader(transactionsShelf).Stats()
result.DataSize = int64(shelfStats.ShelfSize)
result.NumberOfTransactions = uint(d.getNumberOfTransactions(tx))
return result
}
func (d *dag) addSingle(tx stoabs.WriteTx, transaction Transaction) error {
ref := transaction.Ref()
refKey := stoabs.NewHashKey(ref)
transactions := tx.GetShelfWriter(transactionsShelf)
lc := tx.GetShelfWriter(clockShelf)
if exists(transactions, ref) {
log.Logger().
WithField(core.LogFieldTransactionRef, ref).
Trace("Transaction already exists, not adding it again.")
return nil
}
if len(transaction.Previous()) == 0 {
if getRoots(lc) != nil {
return errRootAlreadyExists
}
}
if err := indexClockValue(tx, transaction); err != nil {
return fmt.Errorf("unable to calculate LC value for %s: %w", ref, err)
}
return transactions.Put(refKey, transaction.Data())
}
func indexClockValue(tx stoabs.WriteTx, transaction Transaction) error {
lc := tx.GetShelfWriter(clockShelf)
clockKey := stoabs.Uint32Key(transaction.Clock())
ref := transaction.Ref()
currentRefs, err := lc.Get(clockKey)
if err != nil && !errors.Is(err, stoabs.ErrKeyNotFound) {
return err
}
for _, cRef := range parseHashList(currentRefs) {
if ref.Equals(cRef) {
// should only be in the list once
return nil
}
}
if err := lc.Put(clockKey, appendHashList(currentRefs, ref)); err != nil {
return err
}
log.Logger().
WithField(core.LogFieldTransactionRef, ref).
Tracef("Storing transaction logical clock (LC: %d)", clockKey)
return nil
}
func bytesToClock(clockBytes []byte) uint32 {
return binary.BigEndian.Uint32(clockBytes)
}
func bytesToCount(clockBytes []byte) uint64 {
return binary.BigEndian.Uint64(clockBytes)
}
func getRoots(lcBucket stoabs.Reader) []hash.SHA256Hash {
roots, err := lcBucket.Get(stoabs.Uint32Key(0))
if err != nil {
return nil
}
return parseHashList(roots) // no need to copy, calls FromSlice() (which copies)
}
// getTransaction returns the transaction, or an error. returns ErrTransactionNotFound if the transaction cannot be found.
func getTransaction(hash hash.SHA256Hash, tx stoabs.ReadTx) (Transaction, error) {
transactions := tx.GetShelfReader(transactionsShelf)
transactionBytes, err := transactions.Get(stoabs.NewHashKey(hash))
if errors.Is(err, stoabs.ErrKeyNotFound) {
return nil, ErrTransactionNotFound
}
if err != nil {
return nil, err
}
parsedTx, err := ParseTransaction(transactionBytes)
if err != nil {
return nil, fmt.Errorf("unable to parse transaction %s: %w", hash, err)
}
return parsedTx, nil
}
// exists checks whether the transaction with the given ref exists.
func exists(transactions stoabs.Reader, ref hash.SHA256Hash) bool {
_, err := transactions.Get(stoabs.NewHashKey(ref))
// stoabs.ErrKeyNotFound means that it does not exist. default to false for all other errors
return err == nil
}
// parseHashList splits a list of concatenated hashes into separate hashes.
func parseHashList(input []byte) []hash.SHA256Hash {
if len(input) == 0 {
return nil
}
num := (len(input) - (len(input) % hash.SHA256HashSize)) / hash.SHA256HashSize
result := make([]hash.SHA256Hash, num)
for i := 0; i < num; i++ {
result[i] = hash.FromSlice(input[i*hash.SHA256HashSize : i*hash.SHA256HashSize+hash.SHA256HashSize])
}
return result
}
func appendHashList(list []byte, h hash.SHA256Hash) []byte {
newList := make([]byte, 0, len(list)+hash.SHA256HashSize)
newList = append(newList, list...)
newList = append(newList, h.Slice()...)
return newList
}