Fantom-foundation/go-lachesis

View on GitHub
eventcheck/heavycheck/heavy_check.go

Summary

Maintainability
A
40 mins
Test Coverage
package heavycheck

import (
    "errors"
    "sync"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/trie"

    "github.com/Fantom-foundation/go-lachesis/eventcheck/epochcheck"
    "github.com/Fantom-foundation/go-lachesis/inter"
    "github.com/Fantom-foundation/go-lachesis/inter/idx"
    "github.com/Fantom-foundation/go-lachesis/lachesis"
)

var (
    ErrWrongEventSig  = errors.New("event has wrong signature")
    ErrMalformedTxSig = errors.New("tx has wrong signature")
    ErrWrongTxHash    = errors.New("tx has wrong txs Merkle tree root")

    errTerminated = errors.New("terminated") // internal err
)

const (
    maxQueuedTasks = 128 // the maximum number of events to queue up
    maxBatch       = 4   // Maximum number of events in an task batch (batch is divided if exceeded)
)

// OnValidatedFn is a callback type for notifying about validation result.
type OnValidatedFn func(*TaskData)

// DagReader is accessed by the validator to get the current state.
type DagReader interface {
    GetEpochPubKeys() (map[idx.StakerID]common.Address, idx.Epoch)
}

// Check which require only parents list + current epoch info
type Checker struct {
    config   *lachesis.DagConfig
    txSigner types.Signer
    reader   DagReader

    numOfThreads int

    tasksQ chan *TaskData
    quit   chan struct{}
    wg     sync.WaitGroup
}

type TaskData struct {
    Events inter.Events // events to validate
    Result []error      // resulting errors of events, nil if ok

    onValidated OnValidatedFn
}

// NewDefault uses N-1 threads
func NewDefault(config *lachesis.DagConfig, reader DagReader, txSigner types.Signer) *Checker {
    return New(config, reader, txSigner, 1)
}

// New validator which performs heavy checks, related to signatures validation and Merkle tree validation
func New(config *lachesis.DagConfig, reader DagReader, txSigner types.Signer, numOfThreads int) *Checker {
    return &Checker{
        config:       config,
        txSigner:     txSigner,
        reader:       reader,
        numOfThreads: numOfThreads,
        tasksQ:       make(chan *TaskData, maxQueuedTasks),
        quit:         make(chan struct{}),
    }
}

func (v *Checker) Start() {
    for i := 0; i < v.numOfThreads; i++ {
        v.wg.Add(1)
        go v.loop()
    }
}

func (v *Checker) Stop() {
    close(v.quit)
    v.wg.Wait()
}

func (v *Checker) Overloaded() bool {
    return len(v.tasksQ) > maxQueuedTasks/2
}

func (v *Checker) Enqueue(events inter.Events, onValidated OnValidatedFn) error {
    // divide big batch into smaller ones
    for start := 0; start < len(events); start += maxBatch {
        end := len(events)
        if end > start+maxBatch {
            end = start + maxBatch
        }
        op := &TaskData{
            Events:      events[start:end],
            onValidated: onValidated,
        }
        select {
        case v.tasksQ <- op:
            continue
        case <-v.quit:
            return errTerminated
        }
    }
    return nil
}

// Validate event
func (v *Checker) Validate(e *inter.Event) error {
    addrs, epoch := v.reader.GetEpochPubKeys()
    if e.Epoch != epoch {
        return epochcheck.ErrNotRelevant
    }
    // stakerID
    addr, ok := addrs[e.Creator]
    if !ok {
        return epochcheck.ErrAuth
    }
    // event sig
    if !e.VerifySignature(addr) {
        return ErrWrongEventSig
    }
    // pre-cache tx sig
    for _, tx := range e.Transactions {
        _, err := types.Sender(v.txSigner, tx)
        if err != nil {
            return ErrMalformedTxSig
        }
    }
    // Merkle tree
    if e.TxHash != types.DeriveSha(e.Transactions, new(trie.Trie)) {
        return ErrWrongTxHash
    }

    return nil
}

func (v *Checker) loop() {
    defer v.wg.Done()
    for {
        select {
        case <-v.quit:
            return

        case op := <-v.tasksQ:
            op.Result = make([]error, len(op.Events))
            for i, e := range op.Events {
                op.Result[i] = v.Validate(e)
            }
            op.onValidated(op)
        }
    }
}