
View on GitHub


35 mins
Test Coverage
package gossip

import (

    notify ""


// Sizing constants used when the service is constructed.
const (
    // txsRingBufferSize is the size argument passed to occuredtxs.New when
    // newService builds the occurredTxs buffer.
    txsRingBufferSize = 20000 // Maximum number of stored hashes of included but not confirmed txs

// ServiceFeed bundles the notification feeds published by the service, one
// feed per kind of notification, together with the subscription scope that
// every Subscribe* method registers its subscription with.
type ServiceFeed struct {
    // scope tracks each subscription handed out by the Subscribe* methods.
    scope notify.SubscriptionScope

    // One feed per notification kind; the element type of each feed is fixed
    // by the channel type accepted by the matching Subscribe* method.
    newEpoch        notify.Feed // subscribed to with chan<- idx.Epoch
    newPack         notify.Feed // subscribed to with chan<- idx.Pack
    newEmittedEvent notify.Feed // subscribed to with chan<- *inter.Event
    newBlock        notify.Feed // subscribed to with chan<- evmcore.ChainHeadNotify
    newTxs          notify.Feed // subscribed to with chan<- core.NewTxsEvent
    newLogs         notify.Feed // subscribed to with chan<- []*types.Log

// SubscribeNewEpoch subscribes ch to the newEpoch feed and returns the
// resulting subscription after registering it with the feed's scope.
func (f *ServiceFeed) SubscribeNewEpoch(ch chan<- idx.Epoch) notify.Subscription {
    return f.scope.Track(f.newEpoch.Subscribe(ch))

// SubscribeNewPack subscribes ch to the newPack feed and returns the
// resulting subscription after registering it with the feed's scope.
func (f *ServiceFeed) SubscribeNewPack(ch chan<- idx.Pack) notify.Subscription {
    return f.scope.Track(f.newPack.Subscribe(ch))

// SubscribeNewEmitted subscribes ch to the newEmittedEvent feed and returns
// the resulting subscription after registering it with the feed's scope.
func (f *ServiceFeed) SubscribeNewEmitted(ch chan<- *inter.Event) notify.Subscription {
    return f.scope.Track(f.newEmittedEvent.Subscribe(ch))

// SubscribeNewBlock subscribes ch to the newBlock feed and returns the
// resulting subscription after registering it with the feed's scope.
func (f *ServiceFeed) SubscribeNewBlock(ch chan<- evmcore.ChainHeadNotify) notify.Subscription {
    return f.scope.Track(f.newBlock.Subscribe(ch))

// SubscribeNewTxs subscribes ch to the newTxs feed and returns the
// resulting subscription after registering it with the feed's scope.
func (f *ServiceFeed) SubscribeNewTxs(ch chan<- core.NewTxsEvent) notify.Subscription {
    return f.scope.Track(f.newTxs.Subscribe(ch))

// SubscribeNewLogs subscribes ch to the newLogs feed and returns the
// resulting subscription after registering it with the feed's scope.
func (f *ServiceFeed) SubscribeNewLogs(ch chan<- []*types.Log) notify.Subscription {
    return f.scope.Track(f.newLogs.Subscribe(ch))

// Service implements go-ethereum/node.Service interface.
//
// It holds the configuration, the p2p server handles, the consensus engine
// and its lock, the tx pool, the event checkers, the notification feeds and
// the API backends that the constructor and lifecycle methods wire together.
type Service struct {
    config *Config

    wg   sync.WaitGroup // passed by pointer to the server pool at construction
    done chan struct{}  // passed to the server pool and to discv5 topic registration

    // server
    p2pServer *p2p.Server  // taken from the node stack in NewService
    Name      string       // set to a random "Node-<n>" label at construction
    Topic     discv5.Topic // set in Start from the engine's genesis hash

    serverPool *serverPool

    // application
    accountManager      *accounts.Manager // taken from the node stack in NewService
    store               *Store
    app                 *app.Store
    engine              Consensus     // set at construction to a HookedEngine wrapping the supplied engine
    engineMu            *sync.RWMutex // shared with the protocol manager and the emitter
    emitter             *Emitter      // created in Start via makeEmitter
    txpool              *evmcore.TxPool
    occurredTxs         *occuredtxs.Buffer
    heavyCheckReader    HeavyCheckReader
    gasPowerCheckReader GasPowerCheckReader
    checkers            *eventcheck.Checkers
    upgNotifier         *upgnotifier.Logger
    lastBlockProcessed  time.Time // initialised to time.Now() at construction; exposed to the emitter

    // global variables. TODO refactor to pass them as arguments if possible
    blockParticipated map[idx.StakerID]bool // validators who participated in last block
    currentEvent      hash.Event            // current event which is being processed

    feed ServiceFeed

    // application protocol
    pm *ProtocolManager

    EthAPI        *EthAPIBackend
    netRPCService *ethapi.PublicNetAPI

    stopped   bool // set to true by Stop
    migration bool // reported by IsMigration


// NewService creates a Service bound to the given node stack.
//
// It resolves a non-empty tx pool journal path through the stack, delegates
// construction to newService, then attaches the stack's account manager and
// p2p server and creates the public net RPC API.
// It returns the error from newService, if any.
func NewService(stack *node.Node, config *Config, store *Store, engine Consensus) (*Service, error) {
    // Note: this rewrites config.TxPool.Journal in place on the caller's config.
    if config.TxPool.Journal != "" {
        config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal)

    svc, err := newService(config, store, engine)
    if err != nil {
        return nil, err

    svc.accountManager = stack.AccountManager()
    svc.p2pServer = stack.Server()
    // Create the net API service
    svc.netRPCService = ethapi.NewPublicNetAPI(svc.p2pServer, svc.config.Net.NetworkID)

    // err was checked above and is not reassigned, so it is nil here.
    return svc, err

// newService builds a Service without any node-stack dependencies.
//
// In order, the visible steps are: fill in the Service struct, wrap the
// supplied engine in a HookedEngine, scan the current epoch's heads for the
// highest Lamport time, create the server pool, the tx pool and the event
// checkers, create the protocol manager, and create the Eth API backend with
// its gas price oracle. An error from NewProtocolManager is returned as-is.
//
// NOTE(review): several lines in this function look incomplete as shown
// (missing arguments, targets and closing delimiters) — confirm against the
// upstream file before relying on the details.
func newService(config *Config, store *Store, engine Consensus) (*Service, error) {
    svc := &Service{
        config: config,

        done: make(chan struct{}),

        // Random label; no seeding of math/rand is visible in this function.
        Name: fmt.Sprintf("Node-%d", rand.Int()),

        store:       store,
        upgNotifier: upgnotifier.New(store),

        engineMu:           new(sync.RWMutex),
        occurredTxs:        occuredtxs.New(txsRingBufferSize, types.NewEIP155Signer(config.Net.EvmChainConfig().ChainID)),
        blockParticipated:  make(map[idx.StakerID]bool),
        lastBlockProcessed: time.Now(),

        Instance: logger.MakeInstance(),

    // wrap engine
    // The hooks route engine callbacks to the service's own methods.
    svc.engine = &HookedEngine{
        engine:       engine,
        processEvent: svc.processEvent,
        ApplyBlock:              svc.applyBlock,
        SelectValidatorsGroup:   svc.selectValidatorsGroup,
        OnEventConfirmed:        svc.onEventConfirmed,
        IsEventAllowedIntoBlock: svc.isEventAllowedIntoBlock,

    // find highest lamport
    // NOTE(review): no use of highestLamport is visible in this view — confirm where it is consumed.
    var highestLamport idx.Lamport
    for _, id := range store.GetHeads(svc.engine.GetEpoch()) {
        if highestLamport < id.Lamport() {
            highestLamport = id.Lamport()

    // create server pool
    // The trusted-node list is empty here.
    trustedNodes := []string{}
    svc.serverPool = newServerPool(store.async.table.Peers, svc.done, &svc.wg, trustedNodes)

    // create tx pool
    stateReader := svc.GetEvmStateReader()
    svc.txpool = evmcore.NewTxPool(config.TxPool, config.Net.EvmChainConfig(), stateReader)

    // create checkers
    // NOTE(review): the leading argument(s) of the two Read* calls below appear to be missing as shown.
    svc.heavyCheckReader.Addrs.Store(ReadEpochPubKeys(, svc.engine.GetEpoch()))                                                                     // read pub keys of current epoch from disk
    svc.gasPowerCheckReader.Ctx.Store(ReadGasPowerContext(,, svc.engine.GetValidators(), svc.engine.GetEpoch(), &svc.config.Net.Economy)) // read gaspower check data from disk
    // NOTE(review): this call is cut off after the fourth argument; makeCheckers also takes a store parameter.
    svc.checkers = makeCheckers(&svc.config.Net, &svc.heavyCheckReader, &svc.gasPowerCheckReader, svc.engine,

    // create protocol manager
    // NOTE(review): the assignment target before ", err =" appears to be missing as shown.
    var err error, err = NewProtocolManager(config, &svc.feed, svc.txpool, svc.engineMu, svc.checkers, store, svc.engine, svc.serverPool, svc.IsMigration)
    if err != nil {
        return nil, err

    // create API backend
    // The backend is built positionally with a nil last field; gpo is assigned
    // right after because the oracle needs the backend itself as an argument.
    svc.EthAPI = &EthAPIBackend{config.ExtRPCEnabled, svc, stateReader, nil}
    svc.EthAPI.gpo = gasprice.NewOracle(svc.EthAPI, svc.config.GPO)

    return svc, nil

// GetEngine returns the service's consensus engine, i.e. the value stored in
// the engine field.
func (s *Service) GetEngine() Consensus {
    return s.engine

// makeCheckers builds event checkers.
//
// It creates the heavy (signature) checker with an EIP-155 signer for the
// network's EVM chain ID and the gas power checker from the given reader,
// then returns them together with freshly built basic, epoch and parents
// checkers, all configured from net.Dag.
//
// NOTE(review): the store parameter is not referenced in the lines visible
// here — confirm whether it is used in a part of the body not shown.
func makeCheckers(net *lachesis.Config, heavyCheckReader *HeavyCheckReader, gasPowerCheckReader *GasPowerCheckReader, engine Consensus, store *Store) *eventcheck.Checkers {
    // create signatures checker
    ledgerID := net.EvmChainConfig().ChainID
    heavyCheck := heavycheck.NewDefault(&net.Dag, heavyCheckReader, types.NewEIP155Signer(ledgerID))

    // create gaspower checker
    gaspowerCheck := gaspowercheck.New(gasPowerCheckReader)

    return &eventcheck.Checkers{
        Basiccheck:    basiccheck.New(&net.Dag),
        Epochcheck:    epochcheck.New(&net.Dag, engine),
        Parentscheck:  parentscheck.New(&net.Dag),
        Heavycheck:    heavyCheck,
        Gaspowercheck: gaspowerCheck,

// makeEmitter builds the service's Emitter.
//
// It copies the emitter config, replaces the copy's EmitIntervals with a
// randomized version, and passes NewEmitter the service's engine, lock, tx
// pool, account manager, checkers and a set of callbacks.
//
// NOTE(review): several callback bodies look truncated as shown (IsSynced,
// PeersNum and the closing delimiters) — confirm against the upstream file.
func (s *Service) makeEmitter() *Emitter {
    // randomize event time to decrease peak load, and increase chance of catching double instances of validator
    r := rand.New(rand.NewSource(time.Now().UnixNano()))
    emitterCfg := s.config.Emitter // copy data
    emitterCfg.EmitIntervals = *emitterCfg.EmitIntervals.RandomizeEmitTime(r)

    // The network config is passed from s.config directly; the emitter config
    // passed is the local copy modified above.
    return NewEmitter(&s.config.Net, &emitterCfg,
            Engine:      s.engine,
            EngineMu:    s.engineMu,
            Txpool:      s.txpool,
            Am:          s.AccountManager(),
            OccurredTxs: s.occurredTxs,
            Checkers:    s.checkers,
            MinGasPrice: s.MinGasPrice,
            // OnEmitted feeds the emitted event into the engine and then
            // publishes it on the newEmittedEvent feed.
            OnEmitted: func(emitted *inter.Event) {
                // s.engineMu is locked here

                err := s.engine.ProcessEvent(emitted)
                if err != nil {
                    s.Log.Crit("Self-event connection failed", "err", err.Error())

                s.feed.newEmittedEvent.Send(emitted) // PM listens and will broadcast it
                // NOTE(review): err is not reassigned after the ProcessEvent
                // check, so this tests the same ProcessEvent error again; the
                // result of Send is not captured.
                if err != nil {
                    s.Log.Crit("Failed to post self-event", "err", err.Error())
            IsSynced: func() bool {
                // NOTE(review): the operand of the atomic load appears to be missing as shown.
                return atomic.LoadUint32(& != 0
            // LastBlockProcessed exposes the service's lastBlockProcessed field.
            LastBlockProcessed: func() time.Time {
                return s.lastBlockProcessed
            // NOTE(review): the body of PeersNum is not visible in this view.
            PeersNum: func() int {
            IsMigration: s.IsMigration,
            // AddVersion stamps the serialization version and, for early
            // events, the configured node version string.
            AddVersion: func(e *inter.Event) *inter.Event {
                // serialization version
                e.Version = 0
                // node version
                // Only when Seq <= 1 and a version is configured; the "v-"
                // prefixed string is stored in Extra only if it fits within
                // params.MaxExtraData bytes, otherwise Extra is left untouched.
                if e.Seq <= 1 && len(s.config.Emitter.VersionToPublish) > 0 {
                    version := []byte("v-" + s.config.Emitter.VersionToPublish)
                    if len(version) <= params.MaxExtraData {
                        e.Extra = version

                return e

// Protocols returns protocols the service can communicate on.
// The result has one entry per element of ProtocolVersions, each carrying the
// service's current ENR entry as its only attribute.
func (s *Service) Protocols() []p2p.Protocol {
    protos := make([]p2p.Protocol, len(ProtocolVersions))
    for i, vsn := range ProtocolVersions {
        // NOTE(review): the right-hand side of this assignment appears to be
        // missing as shown; vsn is presumably consumed there — confirm.
        protos[i] =
        protos[i].Attributes = []enr.Entry{s.currentEnr()}
    return protos

// APIs returns api methods the service wants to expose on rpc channels.
// It starts from the list produced by ethapi.GetAPIs for the service's Eth
// backend and appends three public, version "1.0" entries: two in the "eth"
// namespace (the public Ethereum API and the filter API) and one in the
// "net" namespace (the net RPC service).
func (s *Service) APIs() []rpc.API {
    apis := ethapi.GetAPIs(s.EthAPI)

    apis = append(apis, []rpc.API{
            Namespace: "eth",
            Version:   "1.0",
            Service:   NewPublicEthereumAPI(s),
            Public:    true,
        }, {
            Namespace: "eth",
            Version:   "1.0",
            Service:   filters.NewPublicFilterAPI(s.EthAPI),
            Public:    true,
        }, {
            Namespace: "net",
            Version:   "1.0",
            // netRPCService is assigned in NewService from the node's p2p server.
            Service:   s.netRPCService,
            Public:    true,

    return apis

// Start method invoked when the node is ready to start the service.
// The visible steps derive the discovery topic from the engine's genesis
// hash, register that topic in a goroutine when DiscV5 is enabled, start the
// server pool and create the emitter. It returns nil on the visible path.
//
// NOTE(review): parts of this body look truncated as shown (the end of the
// goroutine closure and the body of the final if) — confirm upstream.
func (s *Service) Start() error {
    var genesis common.Hash
    genesis = s.engine.GetGenesisHash()
    // Topic is "lachesis@" followed by the hex form of the genesis hash.
    s.Topic = discv5.Topic("lachesis@" + genesis.Hex())

    if s.p2pServer.DiscV5 != nil {
        go func(topic discv5.Topic) {
            s.Log.Info("Starting topic registration")
            defer s.Log.Info("Terminated topic registration")

            // s.done is passed as the second argument — presumably the stop
            // signal for registration; confirm against the discv5 API.
            s.p2pServer.DiscV5.RegisterTopic(topic, s.done)

    s.serverPool.start(s.p2pServer, s.Topic)

    s.emitter = s.makeEmitter()
    // NOTE(review): the body of this condition is not visible in this view.
    if s.config.Upgrade.WarningIfNotUpgraded {

    return nil

// IsMigration returns the value of the service's migration flag.
func (s *Service) IsMigration() bool {
    return s.migration

// Emitter returns the service's emitter. The field is assigned in Start, so
// the result is nil until Start has run.
func (s *Service) Emitter() *Emitter {
    return s.emitter

// Stop method invoked when the node terminates the service.
// The visible lines mark the service as stopped and defer the release of
// engineMu until the function returns.
//
// NOTE(review): this body looks truncated as shown — the if body, the Lock
// matching the deferred Unlock, and the start of the return expression are
// not visible. Confirm against the upstream file.
func (s *Service) Stop() error {
    if s.config.Upgrade.WarningIfNotUpgraded {

    // flush the state at exit, after all the routines stopped
    defer s.engineMu.Unlock()
    s.stopped = true

    return, true)

// AccountManager returns the node's account manager, as stored in the
// service's accountManager field by NewService.
func (s *Service) AccountManager() *accounts.Manager {
    return s.accountManager