Fantom-foundation/go-lachesis

View on GitHub
vector/index.go

Summary

Maintainability
C
1 day
Test Coverage
package vector

import (
    lru "github.com/hashicorp/golang-lru"

    "github.com/Fantom-foundation/go-lachesis/hash"
    "github.com/Fantom-foundation/go-lachesis/inter"
    "github.com/Fantom-foundation/go-lachesis/inter/idx"
    "github.com/Fantom-foundation/go-lachesis/inter/pos"
    "github.com/Fantom-foundation/go-lachesis/kvdb"
    "github.com/Fantom-foundation/go-lachesis/kvdb/flushable"
    "github.com/Fantom-foundation/go-lachesis/kvdb/table"
    "github.com/Fantom-foundation/go-lachesis/logger"
)

// Default sizes of the Index LRU caches, as returned by DefaultIndexConfig.
const (
    forklessCauseCacheSize     = 5000 // default size of the ForklessCause cache
    highestBeforeSeqCacheSize  = 1000 // default size of the HighestBeforeSeq cache
    highestBeforeTimeCacheSize = 1000 // default size of the HighestBeforeTime cache
    lowestAfterSeqCacheSize    = 1000 // default size of the LowestAfterSeq cache
)

// IndexCacheConfig holds the sizes of the LRU caches used by Index.
// Each value is passed to lru.New when the Index is created.
type IndexCacheConfig struct {
    ForklessCause     int `json:"forklessCause"`     // size of Index.cache.ForklessCause
    HighestBeforeSeq  int `json:"highestBeforeSeq"`  // size of Index.cache.HighestBeforeSeq
    HighestBeforeTime int `json:"highestBeforeTime"` // size of Index.cache.HighestBeforeTime
    LowestAfterSeq    int `json:"lowestAfterSeq"`    // size of Index.cache.LowestAfterSeq
}

// IndexConfig is the configuration of Index. It currently carries only the cache sizes.
type IndexConfig struct {
    Caches IndexCacheConfig `json:"cacheSizes"` // sizes of the LRU caches
}

// Index holds the vector-clock data used to detect the forkless-cause condition,
// calculate median timestamps and detect forks.
type Index struct {
    // validators is a copy of the validator set passed to Reset.
    validators *pos.Validators
    // validatorIdxs is the result of validators.Idxs(): staker ID -> validator index.
    validatorIdxs map[idx.StakerID]idx.Validator

    // bi is the branches info. It is set to nil by DropNotFlushed and written
    // out by Flush when it is non-nil.
    bi *branchesInfo

    // getEvent is the caller-supplied callback that fetches an event header by hash.
    getEvent func(hash.Event) *inter.EventHeaderData

    // vecDb is the flushable wrapper around the DB passed to Reset.
    vecDb kvdb.FlushableKeyValueStore
    // table groups the key-value tables; it is populated by table.MigrateTables
    // on top of vecDb (presumably using the `table` tag as the key prefix - confirm).
    table struct {
        HighestBeforeSeq  kvdb.KeyValueStore `table:"S"`
        HighestBeforeTime kvdb.KeyValueStore `table:"T"`
        LowestAfterSeq    kvdb.KeyValueStore `table:"s"`

        EventBranch  kvdb.KeyValueStore `table:"b"`
        BranchesInfo kvdb.KeyValueStore `table:"B"`
    }

    // cache groups the LRU caches; they are created in NewIndex with the sizes from cfg.Caches.
    cache struct {
        HighestBeforeSeq  *lru.Cache
        HighestBeforeTime *lru.Cache
        LowestAfterSeq    *lru.Cache
        ForklessCause     *lru.Cache
    }

    // cfg is the configuration the Index was created with.
    cfg IndexConfig

    // Instance is the embedded logger (provides Log).
    logger.Instance
}

// DefaultIndexConfig returns an IndexConfig filled with the package's default
// cache sizes. It is intended for tests.
func DefaultIndexConfig() IndexConfig {
    var cfg IndexConfig
    cfg.Caches.ForklessCause = forklessCauseCacheSize
    cfg.Caches.HighestBeforeSeq = highestBeforeSeqCacheSize
    cfg.Caches.HighestBeforeTime = highestBeforeTimeCacheSize
    cfg.Caches.LowestAfterSeq = lowestAfterSeqCacheSize
    return cfg
}

// NewIndex creates an Index instance.
//
// config supplies the sizes of the LRU caches; validators, db and getEvent are
// handed to Reset, which binds the index to them.
//
// Every size in config.Caches must be accepted by lru.New (i.e. be positive).
// A rejected size is reported through Log.Crit instead of being silently
// ignored, because the resulting nil cache would otherwise be dereferenced by
// the first Purge call in Reset.
func NewIndex(config IndexConfig, validators *pos.Validators, db kvdb.KeyValueStore, getEvent func(hash.Event) *inter.EventHeaderData) *Index {
    vi := &Index{
        Instance: logger.MakeInstance(),
        cfg:      config,
    }

    // newCache builds a single LRU cache and reports a construction failure.
    newCache := func(name string, size int) *lru.Cache {
        cache, err := lru.New(size)
        if err != nil {
            vi.Log.Crit("Failed to create cache", "cache", name, "size", size, "err", err)
        }
        return cache
    }

    vi.cache.ForklessCause = newCache("ForklessCause", vi.cfg.Caches.ForklessCause)
    vi.cache.HighestBeforeSeq = newCache("HighestBeforeSeq", vi.cfg.Caches.HighestBeforeSeq)
    vi.cache.HighestBeforeTime = newCache("HighestBeforeTime", vi.cfg.Caches.HighestBeforeTime)
    vi.cache.LowestAfterSeq = newCache("LowestAfterSeq", vi.cfg.Caches.LowestAfterSeq)
    vi.Reset(validators, db, getEvent)

    return vi
}

// Reset re-binds the index to the given validators, DB and event getter, and
// drops every cached or not-yet-flushed value.
func (vi *Index) Reset(validators *pos.Validators, db kvdb.KeyValueStore, getEvent func(hash.Event) *inter.EventHeaderData) {
    // the DB is wrapped so that vectors of failed events can be dropped
    // together with the not-flushed cache
    vi.vecDb = flushable.Wrap(db)
    vi.validators, vi.validatorIdxs = validators.Copy(), validators.Idxs()
    vi.getEvent = getEvent

    // forget everything derived from the previous state
    vi.DropNotFlushed()
    vi.cache.ForklessCause.Purge()
    vi.dropDependentCaches()

    table.MigrateTables(&vi.table, vi.vecDb)
}

// dropDependentCaches purges the HighestBeforeSeq, HighestBeforeTime and
// LowestAfterSeq caches. The ForklessCause cache is left untouched.
func (vi *Index) dropDependentCaches() {
    dependent := [...]*lru.Cache{
        vi.cache.HighestBeforeSeq,
        vi.cache.HighestBeforeTime,
        vi.cache.LowestAfterSeq,
    }
    for _, c := range dependent {
        c.Purge()
    }
}

// Add calculates the vector clocks of the event and saves them into the DB.
// An event whose HighestBeforeSeq vector is already present is skipped with a warning.
func (vi *Index) Add(e *inter.EventHeaderData) {
    // sanity check: only events without stored vectors are processed
    if vi.GetHighestBeforeSeq(e.Hash()) == nil {
        vi.initBranchesInfo()
        _ = vi.fillEventVectors(e)
        return
    }
    vi.Log.Warn("Event already exists", "event", e.Hash().String())
}

// Flush stores the branches info (when it is loaded) and then flushes the
// pending vector-clock writes to the persistent store. A flush failure is
// reported through Log.Crit.
func (vi *Index) Flush() {
    if info := vi.bi; info != nil {
        vi.setBranchesInfo(info)
    }

    err := vi.vecDb.Flush()
    if err == nil {
        return
    }
    vi.Log.Crit("Failed to flush db", "err", err)
}

// DropNotFlushed discards the in-memory branches info and every vector-clock
// write that has not been flushed yet. Call it if an event has failed.
func (vi *Index) DropNotFlushed() {
    vi.bi = nil
    if vi.vecDb.NotFlushedPairs() == 0 {
        // nothing is pending, so the caches are still consistent with the DB
        return
    }
    vi.vecDb.DropNotFlushed()
    vi.dropDependentCaches()
}

// fillGlobalBranchID returns the global branch ID of event e created by the
// validator with index meIdx, updating vi.bi accordingly.
//
// The event continues an existing branch when it is the creator's first event
// on a still-empty branch meIdx, or when its Seq directly follows the last Seq
// of its self-parent's branch. Otherwise a new branch is registered for the
// creator and its ID is returned.
func (vi *Index) fillGlobalBranchID(e *inter.EventHeaderData, meIdx idx.Validator) idx.Validator {
    // sanity checks
    if len(vi.bi.BranchIDCreatorIdxs) != len(vi.bi.BranchIDLastSeq) {
        vi.Log.Crit("Inconsistent BranchIDCreators len (inconsistent DB)", "event", e.String())
    }
    if len(vi.bi.BranchIDCreatorIdxs) < vi.validators.Len() {
        vi.Log.Crit("Inconsistent BranchIDCreators len (inconsistent DB)", "event", e.String())
    }

    if selfParent := e.SelfParent(); selfParent != nil {
        parentBranch := vi.getEventBranchID(*selfParent)
        // sanity checks
        if len(vi.bi.BranchIDCreatorIdxs) != len(vi.bi.BranchIDLastSeq) {
            vi.Log.Crit("Inconsistent BranchIDCreators len (inconsistent DB)", "event", e.String())
        }

        if vi.bi.BranchIDLastSeq[parentBranch]+1 == e.Seq {
            // the event extends its self-parent's branch, not a new fork
            vi.bi.BranchIDLastSeq[parentBranch] = e.Seq
            return parentBranch
        }
    } else if vi.bi.BranchIDLastSeq[meIdx] == 0 {
        // it is indeed the creator's first event, not a new fork
        vi.bi.BranchIDLastSeq[meIdx] = e.Seq
        return meIdx
    }

    // a new fork is observed (only globally): register a new branch for the creator
    forkID := idx.Validator(len(vi.bi.BranchIDLastSeq))
    vi.bi.BranchIDLastSeq = append(vi.bi.BranchIDLastSeq, e.Seq)
    vi.bi.BranchIDCreatorIdxs = append(vi.bi.BranchIDCreatorIdxs, meIdx)
    vi.bi.BranchIDByCreators[meIdx] = append(vi.bi.BranchIDByCreators[meIdx], forkID)
    return forkID
}

// setForkDetected marks, in beforeSeq, every branch that belongs to the creator
// of branchID with forkDetectedSeq.
func (vi *Index) setForkDetected(beforeSeq HighestBeforeSeq, branchID idx.Validator) {
    creator := vi.bi.BranchIDCreatorIdxs[branchID]
    creatorBranches := vi.bi.BranchIDByCreators[creator]
    for i := range creatorBranches {
        beforeSeq.Set(idx.Validator(creatorBranches[i]), forkDetectedSeq)
    }

    // sanity check: a fork can only be marked if the branches info records one
    if vi.atLeastOneFork() {
        return
    }
    vi.Log.Crit("Not written the correct branches info (inconsistent DB)")
}

// fillEventVectors calculates (and stores) event's vectors, and updates LowestAfter of newly-observed events.
//
// Steps, in order:
//  1. assign the event to a global branch (fillGlobalBranchID);
//  2. load HighestBefore vectors of all parents;
//  3. merge parents' HighestBefore vectors into the event's own, propagating
//     forks already detected by parents;
//  4. detect forks that no parent has observed on its own;
//  5. walk the subgraph below e and record e.Seq in LowestAfter of every event
//     that was not yet observed by e's branch;
//  6. store the event's vectors and its branch ID.
//
// It returns the calculated vectors of e. vi.bi must be initialized by the caller.
func (vi *Index) fillEventVectors(e *inter.EventHeaderData) allVecs {
    meIdx := vi.validatorIdxs[e.Creator]
    // NOTE(review): the vectors are sized before fillGlobalBranchID may register
    // a new branch; presumably Set grows a vector on demand - confirm.
    myVecs := allVecs{
        beforeSeq:  NewHighestBeforeSeq(len(vi.bi.BranchIDCreatorIdxs)),
        beforeTime: NewHighestBeforeTime(len(vi.bi.BranchIDCreatorIdxs)),
        after:      NewLowestAfterSeq(len(vi.bi.BranchIDCreatorIdxs)),
    }

    meBranchID := vi.fillGlobalBranchID(e, meIdx)

    // pre-load parents into RAM for quick access
    parentsVecs := make([]allVecs, len(e.Parents))
    parentsBranchIDs := make([]idx.Validator, len(e.Parents))
    for i, p := range e.Parents {
        parentsBranchIDs[i] = vi.getEventBranchID(p)
        parentsVecs[i] = allVecs{
            beforeSeq:  vi.GetHighestBeforeSeq(p),
            beforeTime: vi.GetHighestBeforeTime(p),
            //after : vi.GetLowestAfterSeq(p), not needed
        }
        // a parent without stored vectors means it was never added to the index
        if parentsVecs[i].beforeSeq == nil {
            vi.Log.Crit("Processed out of order, parent not found (inconsistent DB)", "parent", p.String())
        }
    }

    // observed by himself
    myVecs.after.Set(meBranchID, e.Seq)
    myVecs.beforeSeq.Set(meBranchID, BranchSeq{Seq: e.Seq, MinSeq: e.Seq})
    myVecs.beforeTime.Set(meBranchID, e.ClaimedTime)

    for _, pVec := range parentsVecs {
        // calculate HighestBefore vector. Detect forks for a case when parent observes a fork
        for branchID := idx.Validator(0); branchID < idx.Validator(len(vi.bi.BranchIDCreatorIdxs)); branchID++ {
            hisSeq := pVec.beforeSeq.Get(branchID)
            if hisSeq.Seq == 0 && !hisSeq.IsForkDetected() {
                // hisSeq doesn't observe anything about this branchID
                continue
            }
            // mySeq is a local copy; changes reach the vector only through Set below
            mySeq := myVecs.beforeSeq.Get(branchID)

            if mySeq.IsForkDetected() {
                // mySeq observes the maximum already
                continue
            }
            if hisSeq.IsForkDetected() {
                // set fork detected
                vi.setForkDetected(myVecs.beforeSeq, branchID)
            } else {
                if mySeq.Seq == 0 || mySeq.MinSeq > hisSeq.MinSeq {
                    // take hisSeq.MinSeq
                    mySeq.MinSeq = hisSeq.MinSeq
                    myVecs.beforeSeq.Set(branchID, mySeq)
                }
                if mySeq.Seq < hisSeq.Seq {
                    // take hisSeq.Seq (and the matching time from the same parent)
                    mySeq.Seq = hisSeq.Seq
                    myVecs.beforeSeq.Set(branchID, mySeq)
                    myVecs.beforeTime.Set(branchID, pVec.beforeTime.Get(branchID))
                }
            }
        }
    }
    // Detect forks, which were not observed by parents
    for n := idx.Validator(0); n < idx.Validator(vi.validators.Len()); n++ {
        if myVecs.beforeSeq.Get(n).IsForkDetected() {
            // fork is already detected from the creator
            continue
        }
        // compare every pair of distinct branches of creator n
        for _, branchID1 := range vi.bi.BranchIDByCreators[n] {
            for _, branchID2 := range vi.bi.BranchIDByCreators[n] {
                if branchID1 == branchID2 {
                    continue
                }

                a := myVecs.beforeSeq.Get(branchID1)
                b := myVecs.beforeSeq.Get(branchID2)

                if a.Seq == 0 || b.Seq == 0 {
                    // at least one of the branches is not observed at all
                    continue
                }
                // the observed [MinSeq, Seq] ranges of the two branches overlap
                if a.MinSeq <= b.Seq && b.MinSeq <= a.Seq {
                    // n (a creator index) is passed as a branch ID here
                    // NOTE(review): presumably branch n always belongs to creator n - confirm.
                    vi.setForkDetected(myVecs.beforeSeq, n)
                    goto nextCreator
                }
            }
        }
    nextCreator:
    }

    // graph traversal starting from e, but excluding e
    onWalk := func(walk hash.Event) (godeeper bool) {
        wLowestAfterSeq := vi.GetLowestAfterSeq(walk)

        // descend only while the walked event has no LowestAfter entry for e's branch
        godeeper = wLowestAfterSeq.Get(meBranchID) == 0
        if !godeeper {
            return
        }

        // update LowestAfter vector of the old event, because newly-connected event observes it
        wLowestAfterSeq.Set(meBranchID, e.Seq)
        vi.SetLowestAfter(walk, wLowestAfterSeq)

        return
    }
    err := vi.dfsSubgraph(e, onWalk)
    if err != nil {
        vi.Log.Crit("VectorClock: Failed to walk subgraph", "err", err)
    }

    // store calculated vectors
    vi.SetHighestBefore(e.Hash(), myVecs.beforeSeq, myVecs.beforeTime)
    vi.SetLowestAfter(e.Hash(), myVecs.after)
    vi.setEventBranchID(e.Hash(), meBranchID)

    return myVecs
}

// GetHighestBeforeAllBranches returns the HighestBefore sequence vector of the
// event with all branches of each creator merged into one entry.
func (vi *Index) GetHighestBeforeAllBranches(id hash.Event) HighestBeforeSeq {
    // only the sequence part is needed; the merged times are discarded
    seqs, _ := vi.getHighestBeforeAllBranchesTime(id)
    return seqs
}

// getHighestBeforeAllBranchesTime returns the HighestBefore sequence and time
// vectors of the event, indexed by creator instead of by branch.
//
// When no fork is recorded, the stored vectors are returned as they are.
// Otherwise, for every creator the branch with the highest Seq is taken; a
// branch marked as fork-detected wins immediately.
func (vi *Index) getHighestBeforeAllBranchesTime(id hash.Event) (HighestBeforeSeq, HighestBeforeTime) {
    vi.initBranchesInfo()

    if !vi.atLeastOneFork() {
        // without forks the stored vectors need no merging
        return vi.GetHighestBeforeSeq(id), vi.GetHighestBeforeTime(id)
    }

    seqs := vi.GetHighestBeforeSeq(id)
    times := vi.GetHighestBeforeTime(id)
    mergedTimes := NewHighestBeforeTime(vi.validators.Len())
    mergedSeq := NewHighestBeforeSeq(vi.validators.Len())
    for creatorIdx, branches := range vi.bi.BranchIDByCreators {
        // scan all branches of the creator to find its highest event
        var (
            topSeq  BranchSeq
            topTime inter.Timestamp
        )
        for _, branchID := range branches {
            candidate := seqs.Get(branchID)
            if candidate.IsForkDetected() {
                topSeq = candidate
                break
            }
            if candidate.Seq <= topSeq.Seq {
                continue
            }
            topSeq, topTime = candidate, times.Get(branchID)
        }
        creator := idx.Validator(creatorIdx)
        mergedTimes.Set(creator, topTime)
        mergedSeq.Set(creator, topSeq)
    }

    return mergedSeq, mergedTimes
}