Fantom-foundation/go-lachesis

View on GitHub
gossip/service.go

Summary

Maintainability
A
35 mins
Test Coverage
package gossip

import (
    "fmt"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"

    "github.com/ethereum/go-ethereum/accounts"
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core"
    "github.com/ethereum/go-ethereum/core/types"
    notify "github.com/ethereum/go-ethereum/event"
    "github.com/ethereum/go-ethereum/node"
    "github.com/ethereum/go-ethereum/p2p"
    "github.com/ethereum/go-ethereum/p2p/discv5"
    "github.com/ethereum/go-ethereum/p2p/enr"
    "github.com/ethereum/go-ethereum/rpc"

    "github.com/Fantom-foundation/go-lachesis/app"
    "github.com/Fantom-foundation/go-lachesis/ethapi"
    "github.com/Fantom-foundation/go-lachesis/eventcheck"
    "github.com/Fantom-foundation/go-lachesis/eventcheck/basiccheck"
    "github.com/Fantom-foundation/go-lachesis/eventcheck/epochcheck"
    "github.com/Fantom-foundation/go-lachesis/eventcheck/gaspowercheck"
    "github.com/Fantom-foundation/go-lachesis/eventcheck/heavycheck"
    "github.com/Fantom-foundation/go-lachesis/eventcheck/parentscheck"
    "github.com/Fantom-foundation/go-lachesis/evmcore"
    "github.com/Fantom-foundation/go-lachesis/gossip/filters"
    "github.com/Fantom-foundation/go-lachesis/gossip/gasprice"
    "github.com/Fantom-foundation/go-lachesis/gossip/occuredtxs"
    "github.com/Fantom-foundation/go-lachesis/gossip/upgnotifier"
    "github.com/Fantom-foundation/go-lachesis/hash"
    "github.com/Fantom-foundation/go-lachesis/inter"
    "github.com/Fantom-foundation/go-lachesis/inter/idx"
    "github.com/Fantom-foundation/go-lachesis/lachesis"
    "github.com/Fantom-foundation/go-lachesis/lachesis/params"
    "github.com/Fantom-foundation/go-lachesis/logger"
)

const (
    txsRingBufferSize = 20000 // Maximum number of stored hashes of included but not confirmed txs
)

type ServiceFeed struct {
    scope notify.SubscriptionScope

    newEpoch        notify.Feed
    newPack         notify.Feed
    newEmittedEvent notify.Feed
    newBlock        notify.Feed
    newTxs          notify.Feed
    newLogs         notify.Feed
}

func (f *ServiceFeed) SubscribeNewEpoch(ch chan<- idx.Epoch) notify.Subscription {
    return f.scope.Track(f.newEpoch.Subscribe(ch))
}

func (f *ServiceFeed) SubscribeNewPack(ch chan<- idx.Pack) notify.Subscription {
    return f.scope.Track(f.newPack.Subscribe(ch))
}

func (f *ServiceFeed) SubscribeNewEmitted(ch chan<- *inter.Event) notify.Subscription {
    return f.scope.Track(f.newEmittedEvent.Subscribe(ch))
}

func (f *ServiceFeed) SubscribeNewBlock(ch chan<- evmcore.ChainHeadNotify) notify.Subscription {
    return f.scope.Track(f.newBlock.Subscribe(ch))
}

func (f *ServiceFeed) SubscribeNewTxs(ch chan<- core.NewTxsEvent) notify.Subscription {
    return f.scope.Track(f.newTxs.Subscribe(ch))
}

func (f *ServiceFeed) SubscribeNewLogs(ch chan<- []*types.Log) notify.Subscription {
    return f.scope.Track(f.newLogs.Subscribe(ch))
}

// Service implements go-ethereum/node.Service interface.
type Service struct {
    config *Config

    wg   sync.WaitGroup
    done chan struct{}

    // server
    p2pServer *p2p.Server
    Name      string
    Topic     discv5.Topic

    serverPool *serverPool

    // application
    accountManager      *accounts.Manager
    store               *Store
    app                 *app.Store
    engine              Consensus
    engineMu            *sync.RWMutex
    emitter             *Emitter
    txpool              *evmcore.TxPool
    occurredTxs         *occuredtxs.Buffer
    heavyCheckReader    HeavyCheckReader
    gasPowerCheckReader GasPowerCheckReader
    checkers            *eventcheck.Checkers
    upgNotifier         *upgnotifier.Logger
    lastBlockProcessed  time.Time

    // global variables. TODO refactor to pass them as arguments if possible
    blockParticipated map[idx.StakerID]bool // validators who participated in last block
    currentEvent      hash.Event            // current event which is being processed

    feed ServiceFeed

    // application protocol
    pm *ProtocolManager

    EthAPI        *EthAPIBackend
    netRPCService *ethapi.PublicNetAPI

    stopped   bool
    migration bool

    logger.Instance
}

func NewService(stack *node.Node, config *Config, store *Store, engine Consensus) (*Service, error) {
    if config.TxPool.Journal != "" {
        config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal)
    }

    svc, err := newService(config, store, engine)
    if err != nil {
        return nil, err
    }

    svc.accountManager = stack.AccountManager()
    svc.p2pServer = stack.Server()
    // Create the net API service
    svc.netRPCService = ethapi.NewPublicNetAPI(svc.p2pServer, svc.config.Net.NetworkID)

    return svc, err
}

func newService(config *Config, store *Store, engine Consensus) (*Service, error) {
    svc := &Service{
        config: config,

        done: make(chan struct{}),

        Name: fmt.Sprintf("Node-%d", rand.Int()),

        store:       store,
        app:         store.app,
        upgNotifier: upgnotifier.New(store),

        engineMu:           new(sync.RWMutex),
        occurredTxs:        occuredtxs.New(txsRingBufferSize, types.NewEIP155Signer(config.Net.EvmChainConfig().ChainID)),
        blockParticipated:  make(map[idx.StakerID]bool),
        lastBlockProcessed: time.Now(),

        Instance: logger.MakeInstance(),
    }

    // wrap engine
    svc.engine = &HookedEngine{
        engine:       engine,
        processEvent: svc.processEvent,
    }
    svc.engine.Bootstrap(inter.ConsensusCallbacks{
        ApplyBlock:              svc.applyBlock,
        SelectValidatorsGroup:   svc.selectValidatorsGroup,
        OnEventConfirmed:        svc.onEventConfirmed,
        IsEventAllowedIntoBlock: svc.isEventAllowedIntoBlock,
    })

    // find highest lamport
    var highestLamport idx.Lamport
    for _, id := range store.GetHeads(svc.engine.GetEpoch()) {
        if highestLamport < id.Lamport() {
            highestLamport = id.Lamport()
        }
    }
    store.SetHighestLamport(highestLamport)

    // create server pool
    trustedNodes := []string{}
    svc.serverPool = newServerPool(store.async.table.Peers, svc.done, &svc.wg, trustedNodes)

    // create tx pool
    stateReader := svc.GetEvmStateReader()
    svc.txpool = evmcore.NewTxPool(config.TxPool, config.Net.EvmChainConfig(), stateReader)

    // create checkers
    svc.heavyCheckReader.Addrs.Store(ReadEpochPubKeys(svc.app, svc.engine.GetEpoch()))                                                                     // read pub keys of current epoch from disk
    svc.gasPowerCheckReader.Ctx.Store(ReadGasPowerContext(svc.store, svc.app, svc.engine.GetValidators(), svc.engine.GetEpoch(), &svc.config.Net.Economy)) // read gaspower check data from disk
    svc.checkers = makeCheckers(&svc.config.Net, &svc.heavyCheckReader, &svc.gasPowerCheckReader, svc.engine, svc.store)

    // create protocol manager
    var err error
    svc.pm, err = NewProtocolManager(config, &svc.feed, svc.txpool, svc.engineMu, svc.checkers, store, svc.engine, svc.serverPool, svc.IsMigration)
    if err != nil {
        return nil, err
    }

    // create API backend
    svc.EthAPI = &EthAPIBackend{config.ExtRPCEnabled, svc, stateReader, nil}
    svc.EthAPI.gpo = gasprice.NewOracle(svc.EthAPI, svc.config.GPO)

    return svc, nil
}

// GetEngine returns service's engine
func (s *Service) GetEngine() Consensus {
    return s.engine
}

// makeCheckers builds event checkers
func makeCheckers(net *lachesis.Config, heavyCheckReader *HeavyCheckReader, gasPowerCheckReader *GasPowerCheckReader, engine Consensus, store *Store) *eventcheck.Checkers {
    // create signatures checker
    ledgerID := net.EvmChainConfig().ChainID
    heavyCheck := heavycheck.NewDefault(&net.Dag, heavyCheckReader, types.NewEIP155Signer(ledgerID))

    // create gaspower checker
    gaspowerCheck := gaspowercheck.New(gasPowerCheckReader)

    return &eventcheck.Checkers{
        Basiccheck:    basiccheck.New(&net.Dag),
        Epochcheck:    epochcheck.New(&net.Dag, engine),
        Parentscheck:  parentscheck.New(&net.Dag),
        Heavycheck:    heavyCheck,
        Gaspowercheck: gaspowerCheck,
    }
}

func (s *Service) makeEmitter() *Emitter {
    // randomize event time to decrease peak load, and increase chance of catching double instances of validator
    r := rand.New(rand.NewSource(time.Now().UnixNano()))
    emitterCfg := s.config.Emitter // copy data
    emitterCfg.EmitIntervals = *emitterCfg.EmitIntervals.RandomizeEmitTime(r)

    return NewEmitter(&s.config.Net, &emitterCfg,
        EmitterWorld{
            Store:       s.store,
            App:         s.app,
            Engine:      s.engine,
            EngineMu:    s.engineMu,
            Txpool:      s.txpool,
            Am:          s.AccountManager(),
            OccurredTxs: s.occurredTxs,
            Checkers:    s.checkers,
            MinGasPrice: s.MinGasPrice,
            OnEmitted: func(emitted *inter.Event) {
                // s.engineMu is locked here

                err := s.engine.ProcessEvent(emitted)
                if err != nil {
                    s.Log.Crit("Self-event connection failed", "err", err.Error())
                }

                s.feed.newEmittedEvent.Send(emitted) // PM listens and will broadcast it
                if err != nil {
                    s.Log.Crit("Failed to post self-event", "err", err.Error())
                }
            },
            IsSynced: func() bool {
                return atomic.LoadUint32(&s.pm.synced) != 0
            },
            LastBlockProcessed: func() time.Time {
                return s.lastBlockProcessed
            },
            PeersNum: func() int {
                return s.pm.peers.Len()
            },
            IsMigration: s.IsMigration,
            AddVersion: func(e *inter.Event) *inter.Event {
                // serialization version
                e.Version = 0
                // node version
                if e.Seq <= 1 && len(s.config.Emitter.VersionToPublish) > 0 {
                    version := []byte("v-" + s.config.Emitter.VersionToPublish)
                    if len(version) <= params.MaxExtraData {
                        e.Extra = version
                    }
                }

                return e
            },
        },
    )
}

// Protocols returns protocols the service can communicate on.
func (s *Service) Protocols() []p2p.Protocol {
    protos := make([]p2p.Protocol, len(ProtocolVersions))
    for i, vsn := range ProtocolVersions {
        protos[i] = s.pm.makeProtocol(vsn)
        protos[i].Attributes = []enr.Entry{s.currentEnr()}
    }
    return protos
}

// APIs returns api methods the service wants to expose on rpc channels.
func (s *Service) APIs() []rpc.API {
    apis := ethapi.GetAPIs(s.EthAPI)

    apis = append(apis, []rpc.API{
        {
            Namespace: "eth",
            Version:   "1.0",
            Service:   NewPublicEthereumAPI(s),
            Public:    true,
        }, {
            Namespace: "eth",
            Version:   "1.0",
            Service:   filters.NewPublicFilterAPI(s.EthAPI),
            Public:    true,
        }, {
            Namespace: "net",
            Version:   "1.0",
            Service:   s.netRPCService,
            Public:    true,
        },
    }...)

    return apis
}

// Start method invoked when the node is ready to start the service.
func (s *Service) Start() error {
    var genesis common.Hash
    genesis = s.engine.GetGenesisHash()
    s.Topic = discv5.Topic("lachesis@" + genesis.Hex())

    if s.p2pServer.DiscV5 != nil {
        go func(topic discv5.Topic) {
            s.Log.Info("Starting topic registration")
            defer s.Log.Info("Terminated topic registration")

            s.p2pServer.DiscV5.RegisterTopic(topic, s.done)
        }(s.Topic)
    }

    s.pm.Start(s.p2pServer.MaxPeers)

    s.serverPool.start(s.p2pServer, s.Topic)

    s.emitter = s.makeEmitter()
    s.emitter.SetValidator(s.config.Emitter.Validator)
    s.emitter.StartEventEmission()
    if s.config.Upgrade.WarningIfNotUpgraded {
        s.upgNotifier.Start()
    }

    return nil
}

func (s *Service) IsMigration() bool {
    return s.migration
}

func (s *Service) Emitter() *Emitter {
    return s.emitter
}

// Stop method invoked when the node terminates the service.
func (s *Service) Stop() error {
    if s.config.Upgrade.WarningIfNotUpgraded {
        s.upgNotifier.Stop()
    }
    close(s.done)
    s.emitter.StopEventEmission()
    s.pm.Stop()
    s.wg.Wait()
    s.feed.scope.Close()

    // flush the state at exit, after all the routines stopped
    s.engineMu.Lock()
    defer s.engineMu.Unlock()
    s.stopped = true

    return s.store.Commit(nil, true)
}

// AccountManager return node's account manager
func (s *Service) AccountManager() *accounts.Manager {
    return s.accountManager
}