Fantom-foundation/go-lachesis

View on GitHub
gossip/ordering/event_buffer.go

Summary

Maintainability
A
0 mins
Test Coverage
package ordering

import (
    lru "github.com/hashicorp/golang-lru"

    "github.com/Fantom-foundation/go-lachesis/eventcheck"
    "github.com/Fantom-foundation/go-lachesis/hash"
    "github.com/Fantom-foundation/go-lachesis/inter"
)

type (
    // event is a inter.Event and data for ordering purpose.
    event struct {
        *inter.Event

        peer string
    }

    // Callback is a set of EventBuffer()'s args.
    Callback struct {
        Process func(e *inter.Event) error
        Drop    func(e *inter.Event, peer string, err error)
        Get     func(hash.Event) *inter.EventHeaderData
        Exists  func(hash.Event) bool
        Check   func(e *inter.Event, parents []*inter.EventHeaderData) error
    }
)

type EventBuffer struct {
    incompletes *lru.Cache // event hash -> event
    callback    Callback
    buffSize    int
}

func New(buffSize int, callback Callback) *EventBuffer {
    incompletes, _ := lru.New(buffSize)
    return &EventBuffer{
        incompletes: incompletes,
        callback:    callback,
        buffSize:    buffSize,
    }
}

func (buf *EventBuffer) PushEvent(e *inter.Event, peer string) {
    w := &event{
        Event: e,
        peer:  peer,
    }

    buf.pushEvent(w, nil, true)
}

func (buf *EventBuffer) getIncompleteEventsList() []*event {
    if buf.incompletes.Len() == buf.buffSize {
        for buf.incompletes.Len() > buf.buffSize/4 {
            buf.incompletes.RemoveOldest()
        }
        buf.incompletes.Purge()
    }
    res := make([]*event, 0, buf.incompletes.Len())
    for _, childID := range buf.incompletes.Keys() {
        child, _ := buf.incompletes.Peek(childID)
        if child == nil {
            continue
        }
        res = append(res, child.(*event))
    }
    return res
}

func (buf *EventBuffer) pushEvent(e *event, incompleteEventsList []*event, strict bool) {
    // LRU is thread-safe, no need in mutex
    if buf.callback.Exists(e.Hash()) {
        if strict {
            buf.callback.Drop(e.Event, e.peer, eventcheck.ErrAlreadyConnectedEvent)
        }
        return
    }

    parents := make([]*inter.EventHeaderData, len(e.Parents)) // use local buffer for thread safety
    for i, p := range e.Parents {
        _, _ = buf.incompletes.Get(p) // updating the "recently used"-ness of the key
        parent := buf.callback.Get(p)
        if parent == nil {
            buf.incompletes.Add(e.Hash(), e)
            return
        }
        parents[i] = parent
    }

    // validate
    if buf.callback.Check != nil {
        err := buf.callback.Check(e.Event, parents)
        if err != nil {
            buf.callback.Drop(e.Event, e.peer, err)
            return
        }
    }

    // process
    err := buf.callback.Process(e.Event)
    if err != nil {
        buf.callback.Drop(e.Event, e.peer, err)
        return
    }

    // now child events may become complete, check it again
    eHash := e.Hash()
    buf.incompletes.Remove(eHash)
    if incompleteEventsList == nil {
        incompleteEventsList = buf.getIncompleteEventsList()
    }
    for _, child := range incompleteEventsList {
        for _, parent := range child.Parents {
            if parent == eHash {
                buf.pushEvent(child, incompleteEventsList, false)
            }
        }
    }
}

func (buf *EventBuffer) IsBuffered(id hash.Event) bool {
    return buf.incompletes.Contains(id) // LRU is thread-safe, no need in mutex
}

func (buf *EventBuffer) Clear() {
    buf.incompletes.Purge() // LRU is thread-safe, no need in mutex
}