// p2p/peermanager.go
/* @file @copyright defined in aergo/LICENSE.txt */

package p2p

import (
    "strconv"
    "sync"
    "sync/atomic"
    "time"

    "github.com/aergoio/aergo-actor/actor"
    "github.com/aergoio/aergo-lib/log"
    cfg "github.com/aergoio/aergo/v2/config"
    "github.com/aergoio/aergo/v2/p2p/metric"
    "github.com/aergoio/aergo/v2/p2p/p2pcommon"
    "github.com/aergoio/aergo/v2/p2p/p2pkey"
    "github.com/aergoio/aergo/v2/p2p/p2putil"
    "github.com/aergoio/aergo/v2/types"
    "github.com/aergoio/aergo/v2/types/message"
    "github.com/libp2p/go-libp2p-core/protocol"
)

const (
    initial = iota
    running
    stopping
    stopped
)

// peerManager connects to and listens for other nodes.
// It implements the Component interface.
// Maps such as remotePeers and waitingPeers are owned by the runManagePeers
// goroutine and are mutated only via channels, while pm.mutex guards the
// copy-on-write peer caches and the event listener list.
type peerManager struct {
    status            int32
    is                p2pcommon.InternalService
    nt                p2pcommon.NetworkTransport
    hsFactory         p2pcommon.HSHandlerFactory
    actorService      p2pcommon.ActorService
    peerFactory       p2pcommon.PeerFactory
    mm                metric.MetricsManager
    lm                p2pcommon.ListManager
    cm                p2pcommon.CertificateManager
    skipHandshakeSync bool

    peerFinder p2pcommon.PeerFinder
    wpManager  p2pcommon.WaitingPeerManager

    mutex        *sync.Mutex
    manageNumber uint32
    remotePeers  map[types.PeerID]p2pcommon.RemotePeer
    waitingPeers map[types.PeerID]*p2pcommon.WaitingPeer

    conf *cfg.P2PConfig
    // peerCache is copy-on-write style
    peerCache []p2pcommon.RemotePeer
    // bpClassPeers contains block producer and agent peers
    bpClassPeers []p2pcommon.RemotePeer
    // watchClassPeers contains watchers and uncertified agents
    watchClassPeers []p2pcommon.RemotePeer

    getPeerChannel    chan getPeerTask
    peerConnected     chan connPeerResult
    removePeerChannel chan p2pcommon.RemotePeer
    fillPoolChannel   chan []p2pcommon.PeerMeta
    addPeerChannel    chan p2pcommon.PeerMeta
    inboundConnChan   chan inboundConnEvent
    workDoneChannel   chan p2pcommon.ConnWorkResult
    taskChannel       chan pmTask
    finishChannel     chan struct{}

    eventListeners []p2pcommon.PeerEventListener

    // designatedPeers and hiddenPeerSet are populated at construction time;
    // designatedPeers may later change only via tasks run in the manager goroutine
    designatedPeers map[types.PeerID]p2pcommon.PeerMeta
    hiddenPeerSet   map[types.PeerID]bool

    logger *log.Logger
}

// getPeerTask is a lookup request, used to fetch a peer safely from other goroutines
type getPeerTask struct {
    id  types.PeerID
    ret chan p2pcommon.RemotePeer
}

var _ p2pcommon.PeerManager = (*peerManager)(nil)

// NewPeerManager creates a peer manager object.
func NewPeerManager(is p2pcommon.InternalService, hsFactory p2pcommon.HSHandlerFactory, actor p2pcommon.ActorService, pf p2pcommon.PeerFactory, nt p2pcommon.NetworkTransport, mm metric.MetricsManager, lm p2pcommon.ListManager, logger *log.Logger, cfg *cfg.Config, skipHandshakeSync bool) p2pcommon.PeerManager {
    p2pConf := cfg.P2P
    pm := &peerManager{
        is:                is,
        nt:                nt,
        hsFactory:         hsFactory,
        actorService:      actor,
        conf:              p2pConf,
        peerFactory:       pf,
        mm:                mm,
        lm:                lm,
        logger:            logger,
        mutex:             &sync.Mutex{},
        skipHandshakeSync: skipHandshakeSync,

        status:          initial,
        designatedPeers: make(map[types.PeerID]p2pcommon.PeerMeta, len(cfg.P2P.NPAddPeers)),
        hiddenPeerSet:   make(map[types.PeerID]bool, len(cfg.P2P.NPHiddenPeers)),

        remotePeers: make(map[types.PeerID]p2pcommon.RemotePeer, p2pConf.NPMaxPeers),

        waitingPeers: make(map[types.PeerID]*p2pcommon.WaitingPeer, p2pConf.NPPeerPool),

        peerCache:       make([]p2pcommon.RemotePeer, 0, p2pConf.NPMaxPeers),
        bpClassPeers:    make([]p2pcommon.RemotePeer, 0, p2pConf.NPMaxPeers>>2),
        watchClassPeers: make([]p2pcommon.RemotePeer, 0, p2pConf.NPMaxPeers),

        getPeerChannel:    make(chan getPeerTask),
        peerConnected:     make(chan connPeerResult),
        removePeerChannel: make(chan p2pcommon.RemotePeer),
        fillPoolChannel:   make(chan []p2pcommon.PeerMeta, 2),
        addPeerChannel:    make(chan p2pcommon.PeerMeta),
        inboundConnChan:   make(chan inboundConnEvent),
        workDoneChannel:   make(chan p2pcommon.ConnWorkResult),
        eventListeners:    make([]p2pcommon.PeerEventListener, 0, 4),
        taskChannel:       make(chan pmTask, 4),
        finishChannel:     make(chan struct{}),
    }

    // additional initializations
    pm.init()

    return pm
}

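// SelfMeta returns the meta information of the local node, as known to the
// network transport.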
func (pm *peerManager) SelfMeta() p2pcommon.PeerMeta {
    return pm.nt.SelfMeta()
}
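
// SelfNodeID returns the peer ID of the local node.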
func (pm *peerManager) SelfNodeID() types.PeerID {
    return p2pkey.NodeID()
}

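// init registers designated and hidden peers from the configuration, creates
// the peer finder and the waiting peer manager, and queues designated peers
// for their first connection attempt.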
func (pm *peerManager) init() {
    // set designated peers
    pm.initDesignatedPeerList()
    // init hidden peers
    for _, pidStr := range pm.conf.NPHiddenPeers {
        pid, err := types.IDB58Decode(pidStr)
        if err != nil {
            panic("Invalid pid in NPHiddenPeers : " + pidStr + " err " + err.Error())
        }
        pm.hiddenPeerSet[pid] = true
    }

    pm.peerFinder = NewPeerFinder(pm.logger, pm, pm.actorService, pm.conf.NPPeerPool, pm.conf.NPDiscoverPeers, pm.conf.NPUsePolaris)
    pm.wpManager = NewWaitingPeerManager(pm.logger, pm.is, pm, pm.lm, pm.conf.NPPeerPool, pm.conf.NPDiscoverPeers)
    pm.AddPeerEventListener(pm.peerFinder)
    pm.AddPeerEventListener(pm.wpManager)
    // add designated peers to waiting pool at initial time.
    for _, meta := range pm.designatedPeers {
        if _, foundInWait := pm.waitingPeers[meta.ID]; !foundInWait {
            pm.waitingPeers[meta.ID] = &p2pcommon.WaitingPeer{Meta: meta, Designated: true, NextTrial: time.Now()}
        }
    }
}

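// AddPeerEventListener registers a listener that is notified whenever a peer
// connects or disconnects.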
func (pm *peerManager) AddPeerEventListener(l p2pcommon.PeerEventListener) {
    pm.mutex.Lock()
    defer pm.mutex.Unlock()
    pm.eventListeners = append(pm.eventListeners, l)
}

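// Start connects dependent sub modules and launches the manager goroutine.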
func (pm *peerManager) Start() error {
    // connect other sub modules
    pm.cm = pm.is.CertificateManager()
    go pm.runManagePeers()

    return nil
}

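// Stop signals the manager goroutine to shut down. Calling it on an already
// stopped manager keeps the status stopped.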
func (pm *peerManager) Stop() error {
    if atomic.CompareAndSwapInt32(&pm.status, running, stopping) {
        pm.finishChannel <- struct{}{}
    } else {
        // keep the status as stopped if it was already stopped
        if atomic.SwapInt32(&pm.status, stopping) == stopped {
            atomic.StoreInt32(&pm.status, stopped)
        }
    }
    return nil
}

func (pm *peerManager) initDesignatedPeerList() {
    // add remote node from config
    for _, addrStr := range pm.conf.NPAddPeers {
        ma, err := types.ParseMultiaddr(addrStr)
        if err != nil {
            pm.logger.Warn().Err(err).Str("str", addrStr).Msg("invalid NPAddPeer address")
            continue
        }
        peerMeta, err := p2putil.FromMultiAddrToPeerInfo(ma)
        if err != nil {
            pm.logger.Warn().Err(err).Str("str", addrStr).Msg("invalid NPAddPeer address")
            continue
        }
        pm.logger.Info().Str(p2putil.LogFullID, peerMeta.ID.Pretty()).Stringer(p2putil.LogPeerID, types.LogPeerShort(peerMeta.ID)).Str("addr", peerMeta.Addresses[0].String()).Msg("Adding Designated peer")
        pm.designatedPeers[peerMeta.ID] = peerMeta
    }
}

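// runManagePeers is the main event loop of the peer manager. It is the only
// goroutine that touches remotePeers and waitingPeers, so all requests are
// funneled through channels instead of locks. On shutdown it removes the
// stream handler, stops every peer, and waits up to 30 seconds for them to
// finish.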
func (pm *peerManager) runManagePeers() {

    pm.logger.Info().Str("p2p_proto", p2putil.ProtocolIDsToString([]protocol.ID{p2pcommon.P2PSubAddr})).Msg("Starting p2p listening")
    pm.nt.AddStreamHandler(p2pcommon.P2PSubAddr, pm.wpManager.OnInboundConn)

    if !atomic.CompareAndSwapInt32(&pm.status, initial, running) {
        panic("wrong internal status")
    }
    instantStart := time.Millisecond << 4
    initialAddrDelay := time.Second * 2
    finderTimer := time.NewTimer(initialAddrDelay)
    connManTimer := time.NewTimer(initialAddrDelay << 1)

MANLOOP:
    for {
        select {
        case req := <-pm.getPeerChannel:
            peer, exist := pm.remotePeers[req.id]
            if exist {
                req.ret <- peer
            } else {
                req.ret <- nil
            }
        case hsResult := <-pm.peerConnected:
            if peer := pm.tryRegister(hsResult); peer != nil {
                pm.checkSync(peer)

                // query other peers
                if !finderTimer.Stop() {
                    <-finderTimer.C
                }
                finderTimer.Reset(instantStart)
            }
        case peer := <-pm.removePeerChannel:
            pm.removePeer(peer)
            if !connManTimer.Stop() {
                <-connManTimer.C
            }
            connManTimer.Reset(instantStart)
        case inInfo := <-pm.inboundConnChan:
            _, found := pm.remotePeers[inInfo.meta.ID]
            inInfo.foundC <- found
        case workResult := <-pm.workDoneChannel:
            pm.wpManager.OnWorkDone(workResult)
            // Retry
            if !connManTimer.Stop() {
                <-connManTimer.C
            }
            connManTimer.Reset(instantStart)
        case <-finderTimer.C:
            pm.peerFinder.CheckAndFill()
            finderTimer.Reset(DiscoveryQueryInterval)
        case <-connManTimer.C:
            pm.wpManager.CheckAndConnect()
            // fire at next interval
            connManTimer.Reset(p2pcommon.WaitingPeerManagerInterval)
        case peerMeta := <-pm.addPeerChannel:
            pm.wpManager.InstantConnect(peerMeta)
        case peerMetas := <-pm.fillPoolChannel:
            if pm.wpManager.OnDiscoveredPeers(peerMetas) > 0 {
                if !connManTimer.Stop() {
                    <-connManTimer.C
                }
                connManTimer.Reset(instantStart)
            }
        case task := <-pm.taskChannel:
            task()
        case <-pm.finishChannel:
            finderTimer.Stop()
            connManTimer.Stop()
            break MANLOOP
        }
    }
    // guarantee that no new peer connection will be made
    pm.nt.RemoveStreamHandler(p2pcommon.P2PSubAddr)

    pm.logger.Info().Msg("Finishing peerManager")

    go func() {
        // closing all peer connections
        for _, peer := range pm.peerCache {
            peer.Stop()
        }
    }()
    timer := time.NewTimer(time.Second * 30)
    finishPoll := time.NewTicker(time.Millisecond << 6)
CLEANUPLOOP:
    for {
        select {
        case req := <-pm.getPeerChannel:
            req.ret <- nil
        case peer := <-pm.removePeerChannel:
            pm.removePeer(peer)
        case <-finishPoll.C:
            if len(pm.remotePeers) == 0 {
                pm.logger.Debug().Msg("All peers were finished peerManager")
                break CLEANUPLOOP
            }
        case <-timer.C:
            pm.logger.Warn().Int("remained", len(pm.peerCache)).Msg("peermanager stop timeout. some peers were not finished.")
            break CLEANUPLOOP
        }
    }
    atomic.StoreInt32(&pm.status, stopped)
}
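
// resetTimer is an illustrative sketch (not used by the code above) of the
// drain-then-reset idiom MANLOOP relies on: if Stop returns false the timer
// already fired, so its channel must be drained before Reset, otherwise the
// stale tick would make the next wait return immediately.
func resetTimer(t *time.Timer, d time.Duration) {
    if !t.Stop() {
        <-t.C
    }
    t.Reset(d)
}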

// tryRegister registers a new peer connection. If a peer with the same peer ID
// is already registered, only one of the two connections survives: by the
// collision rule, the outbound connection on the side with the higher hash.
func (pm *peerManager) tryRegister(hsResult connPeerResult) p2pcommon.RemotePeer {
    remote := hsResult.remote
    meta := remote.Meta
    peerID := meta.ID
    preExistPeer, ok := pm.remotePeers[peerID]
    if ok {
        pm.logger.Info().Stringer(p2putil.LogPeerID, types.LogPeerShort(peerID)).Msg("Peer add collision. Outbound connection of higher hash will survive.")
        iAmLowerOrEqual := p2putil.ComparePeerID(pm.is.SelfNodeID(), meta.ID) <= 0
        if iAmLowerOrEqual == remote.Connection.Outbound {
            pm.logger.Info().Stringer("local_peer_id", types.LogPeerShort(pm.is.SelfNodeID())).Stringer(p2putil.LogPeerID, types.LogPeerShort(peerID)).Bool("outbound", remote.Connection.Outbound).Msg("Close connection and keep earlier handshake connection.")
            return nil
        } else {
            pm.logger.Info().Stringer("local_peer_id", types.LogPeerShort(pm.is.SelfNodeID())).Stringer(p2putil.LogPeerID, types.LogPeerShort(peerID)).Bool("outbound", remote.Connection.Outbound).Msg("Keep connection and close earlier handshake connection.")
            pm.logger.Info().Stringer("local_peer_id", types.LogPeerShort(pm.is.SelfNodeID())).Stringer(p2putil.LogPeerID, types.LogPeerShort(peerID)).Bool("outbound", remote.Connection.Outbound).Msg("Keep connection and close earlier handshake connection.")
            // stopping lower valued connection
            preExistPeer.Stop()
        }
    }

    remote = pm.changePeerAttributes(remote, peerID)
    newPeer := pm.peerFactory.CreateRemotePeer(remote, pm.GetNextManageNum(), hsResult.msgRW)
    newPeer.UpdateBlkCache(hsResult.bestHash, hsResult.bestNo)

    go newPeer.RunPeer()

    pm.insertPeer(peerID, newPeer)
    pm.logger.Info().Str("claimedRole", newPeer.Meta().Role.String()).Str("role", newPeer.AcceptedRole().String()).Bool("outbound", remote.Connection.Outbound).Str("zone", remote.Zone.String()).Str(p2putil.LogPeerName, newPeer.Name()).Str("addr", remote.Connection.IP.String()+":"+strconv.Itoa(int(remote.Connection.Port))).Msg("peer is added to peerService")

    pm.mutex.Lock()
    defer pm.mutex.Unlock()
    for _, listener := range pm.eventListeners {
        listener.OnPeerConnect(peerID)
    }

    return newPeer
}

func (pm *peerManager) changePeerAttributes(remote p2pcommon.RemoteInfo, peerID types.PeerID) p2pcommon.RemoteInfo {
    // override options by configurations of node
    _, remote.Designated = pm.designatedPeers[peerID]
    // hidden is set by either remote peer's asking or local node's config
    if _, exist := pm.hiddenPeerSet[peerID]; exist {
        remote.Hidden = true
    }

    return remote
}

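// GetNextManageNum atomically issues the next manage number, which
// distinguishes successive connection instances of the same peer ID.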
func (pm *peerManager) GetNextManageNum() uint32 {
    return atomic.AddUint32(&pm.manageNumber, 1)
}

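// AddNewPeer requests an immediate connection attempt to the given peer.
// It may be called from any goroutine.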
func (pm *peerManager) AddNewPeer(meta p2pcommon.PeerMeta) {
    pm.addPeerChannel <- meta
}

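// RemovePeer asks the manager goroutine to unregister the given peer.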
func (pm *peerManager) RemovePeer(peer p2pcommon.RemotePeer) {
    pm.removePeerChannel <- peer
}

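// NotifyPeerAddressReceived feeds discovered peer metas into the waiting pool
// for later connection attempts.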
func (pm *peerManager) NotifyPeerAddressReceived(metas []p2pcommon.PeerMeta) {
    pm.fillPoolChannel <- metas
}

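// UpdatePeerRole applies the given role changes to connected peers, after the
// role manager verifies that each change is acceptable.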
func (pm *peerManager) UpdatePeerRole(changes []p2pcommon.RoleModifier) {
    pm.taskChannel <- func() {
        changedCnt := 0
        pm.logger.Debug().Int("size", len(changes)).Msg("changing roles of peers")
        rm := pm.is.RoleManager()
        for _, ch := range changes {
            if peer, found := pm.remotePeers[ch.ID]; found {
                if rm.CheckRole(peer.RemoteInfo(), ch.Role) {
                    changedCnt++
                    pm.logger.Debug().Str(p2putil.LogPeerName, peer.Name()).Str("from", peer.AcceptedRole().String()).Str("to", ch.Role.String()).Msg("changing role of peer")
                    peer.ChangeRole(ch.Role)
                } else {
                    pm.logger.Debug().Str(p2putil.LogPeerName, peer.Name()).Str("from", peer.AcceptedRole().String()).Str("to", ch.Role.String()).Msg("refuse to change role of peer")
                }
            }
        }
        if changedCnt > 0 {
            pm.updatePeerCache()
        }
    }
}

// removePeer unregisters a managed remote peer connection.
// It returns true if the peer exists and is managed by the peer manager.
// It must be called from the peerManager goroutine.
func (pm *peerManager) removePeer(peer p2pcommon.RemotePeer) bool {
    peerID := peer.ID()
    target, ok := pm.remotePeers[peerID]
    if !ok {
        return false
    }
    if target.ManageNumber() != peer.ManageNumber() {
        pm.logger.Debug().Uint32("remove_num", peer.ManageNumber()).Uint32("exist_num", target.ManageNumber()).Stringer(p2putil.LogPeerID, types.LogPeerShort(peerID)).Msg("remove peer is requested but already removed and other instance is on")
        return false
    }
    if target.State() == types.RUNNING {
        pm.logger.Warn().Stringer(p2putil.LogPeerID, types.LogPeerShort(peerID)).Msg("remove peer is requested but peer is still running")
    }
    pm.deletePeer(peer)
    pm.logger.Info().Uint32("manage_num", peer.ManageNumber()).Stringer(p2putil.LogPeerID, types.LogPeerShort(peerID)).Msg("removed peer in peermanager")

    pm.mutex.Lock()
    defer pm.mutex.Unlock()
    for _, listener := range pm.eventListeners {
        listener.OnPeerDisconnect(peer)
    }

    return true
}

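// GetPeer returns the connected remote peer with the given ID, if any.
// It is safe to call from any goroutine.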
func (pm *peerManager) GetPeer(ID types.PeerID) (p2pcommon.RemotePeer, bool) {
    gc := getPeerTask{id: ID, ret: make(chan p2pcommon.RemotePeer)}
    // delegate the lookup to the manager goroutine, which owns the remotePeers map
    pm.getPeerChannel <- gc
    ptr := <-gc.ret
    if ptr == nil {
        return nil, false
    }
    return ptr, true
}

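// GetPeers returns a snapshot slice of all connected peers. The slice is
// replaced wholesale on change (copy-on-write), so callers may iterate it
// without holding a lock, but must not modify it.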
func (pm *peerManager) GetPeers() []p2pcommon.RemotePeer {
    pm.mutex.Lock()
    defer pm.mutex.Unlock()
    return pm.peerCache
}

func (pm *peerManager) GetProducerClassPeers() []p2pcommon.RemotePeer {
    pm.mutex.Lock()
    defer pm.mutex.Unlock()
    return pm.bpClassPeers
}

func (pm *peerManager) GetWatcherClassPeers() []p2pcommon.RemotePeer {
    pm.mutex.Lock()
    defer pm.mutex.Unlock()
    return pm.watchClassPeers
}

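// GetPeerBlockInfos returns the last known block status of every connected peer.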
func (pm *peerManager) GetPeerBlockInfos() []types.PeerBlockInfo {
    pm.mutex.Lock()
    defer pm.mutex.Unlock()
    infos := make([]types.PeerBlockInfo, len(pm.peerCache))
    for i, peer := range pm.peerCache {
        infos[i] = peer
    }
    return infos
}

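// GetPeerAddresses collects peer information for every connected peer,
// optionally filtering out hidden peers and optionally prepending the local node.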
func (pm *peerManager) GetPeerAddresses(noHidden bool, showSelf bool) []*message.PeerInfo {
    peers := make([]*message.PeerInfo, 0, len(pm.peerCache))
    if showSelf {
        meta := pm.is.SelfMeta()
        addr := meta.ToPeerAddress()
        bestBlk, err := pm.actorService.GetChainAccessor().GetBestBlock()
        if err != nil {
            return nil
        }
        // add self certificates if the local peer is an agent
        localCerts, err := p2putil.ConvertCertsToProto(pm.cm.GetCertificates())
        if err != nil {
            localCerts = nil
        }
        selfpi := &message.PeerInfo{
            Addr: &addr, Certificates: localCerts, AcceptedRole: meta.Role, Version: meta.Version, Hidden: meta.Hidden, CheckTime: time.Now(), LastBlockHash: bestBlk.BlockHash(), LastBlockNumber: bestBlk.Header.BlockNo, State: types.RUNNING, Self: true}
        peers = append(peers, selfpi)
    }
    for _, aPeer := range pm.peerCache {
        ri := aPeer.RemoteInfo()
        if noHidden && ri.Hidden {
            continue
        }
        meta := aPeer.Meta()
        addr := meta.ToPeerAddress()
        lastStatus := aPeer.LastStatus()
        rCerts, _ := p2putil.ConvertCertsToProto(aPeer.RemoteInfo().Certificates)
        pi := &message.PeerInfo{
            Addr: &addr, Certificates: rCerts, AcceptedRole: aPeer.AcceptedRole(), Version: meta.Version, Hidden: ri.Hidden, CheckTime: lastStatus.CheckTime, LastBlockHash: lastStatus.BlockHash, LastBlockNumber: lastStatus.BlockNumber, State: aPeer.State(), Self: false}
        peers = append(peers, pi)
    }
    return peers
}

// insertPeer must be called only from the peerManager goroutine; it acquires
// pm.mutex internally, so do not call it while holding the mutex.
func (pm *peerManager) insertPeer(ID types.PeerID, peer p2pcommon.RemotePeer) {
    pm.remotePeers[ID] = peer
    pm.updatePeerCache()
}

// deletePeer must be called only from the peerManager goroutine; it acquires
// pm.mutex internally, so do not call it while holding the mutex.
func (pm *peerManager) deletePeer(peer p2pcommon.RemotePeer) {
    pm.mm.Remove(peer.ID(), peer.ManageNumber())
    delete(pm.remotePeers, peer.ID())
    pm.updatePeerCache()
}

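// updatePeerCache rebuilds the copy-on-write peer slices from remotePeers,
// splitting peers into the block-producer class (producers and agents) and
// the watcher class (everything else).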
func (pm *peerManager) updatePeerCache() {
    pm.mutex.Lock()
    defer pm.mutex.Unlock()
    size := len(pm.remotePeers)
    newSlice := make([]p2pcommon.RemotePeer, 0, size)
    newBSlice := make([]p2pcommon.RemotePeer, 0, size)
    newWSlice := make([]p2pcommon.RemotePeer, 0, size)
    for _, rPeer := range pm.remotePeers {
        newSlice = append(newSlice, rPeer)
        if rPeer.AcceptedRole() == types.PeerRole_Producer || rPeer.AcceptedRole() == types.PeerRole_Agent {
            newBSlice = append(newBSlice, rPeer)
        } else {
            newWSlice = append(newWSlice, rPeer)
        }

    }
    pm.peerCache = newSlice
    pm.bpClassPeers = newBSlice
    pm.watchClassPeers = newWSlice
}

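// checkSync runs for a newly registered peer: unless handshake sync is
// skipped, it asks the syncer to sync toward the peer's best block, then
// pushes the hashes of transactions currently in the mempool to the peer.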
func (pm *peerManager) checkSync(peer p2pcommon.RemotePeer) {
    if !pm.skipHandshakeSync {
        pm.logger.Debug().Uint64("target", peer.LastStatus().BlockNumber).Msg("request new syncer")
        pm.actorService.SendRequest(message.SyncerSvc, &message.SyncStart{PeerID: peer.ID(), TargetNo: peer.LastStatus().BlockNumber})
    }

    // send txs in mempool
    peer.DoTask(func(p p2pcommon.RemotePeer) {
        future := pm.actorService.FutureRequest(message.MemPoolSvc, &message.MemPoolList{Limit: DefaultPeerTxQueueSize * 100}, p2pcommon.DefaultActorMsgTTL<<1)
        go doSlowPush(pm.logger, peer, future)
    })
}

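// doSlowPush waits for the mempool listing result and, on success, streams
// the tx hashes to the peer in throttled batches.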
func doSlowPush(logger *log.Logger, peer p2pcommon.RemotePeer, future *actor.Future) {
    raw, err := future.Result()
    if err != nil {
        logger.Debug().Err(err).Str(p2putil.LogPeerName, peer.Name()).Msg("Failed to get txs in mempool, skip notifying tx to newly connected peer")
        return
    }
    resp, ok := raw.(*message.MemPoolListRsp)
    if !ok {
        logger.Debug().Str(p2putil.LogPeerName, peer.Name()).Msg("mempool response has unexpected type, skip notifying tx to newly connected peer")
        return
    }
    if len(resp.Hashes) > 0 {
        logger.Debug().Str(p2putil.LogPeerName, peer.Name()).Array("txIDs", types.NewLogTxIDsMarshaller(resp.Hashes, 10)).Msg("Sending txIds to newly connected peer")
        slowPush(peer, resp.Hashes, txNoticeInterval>>2)
    }
}

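// slowPush pushes tx hashes in chunks of half the peer tx queue size,
// sleeping pushInterval between chunks so a large backlog does not flood the
// peer. It stops early if the peer leaves the RUNNING state.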
func slowPush(peer p2pcommon.RemotePeer, hashes []types.TxID, pushInterval time.Duration) {
    unitSize := DefaultPeerTxQueueSize >> 1
    remain := len(hashes)
    var cut []types.TxID
    for remain > 0 {
        if peer.State() != types.RUNNING {
            break
        }
        if remain > unitSize {
            cut = hashes[:unitSize]
        } else {
            cut = hashes[:remain]
        }
        hashes = hashes[len(cut):]
        remain = len(hashes)
        peer.PushTxsNotice(cut)
        time.Sleep(pushInterval)
    }
}

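// AddDesignatedPeer adds meta to the designated peer set; it blocks until the
// manager goroutine has applied the change.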
func (pm *peerManager) AddDesignatedPeer(meta p2pcommon.PeerMeta) {
    finished := make(chan interface{})
    pm.taskChannel <- func() {
        pm.designatedPeers[meta.ID] = meta
        finished <- struct{}{}
    }
    <-finished
}

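// RemoveDesignatedPeer removes the peer from the designated peer set; it
// blocks until the manager goroutine has applied the change.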
func (pm *peerManager) RemoveDesignatedPeer(peerID types.PeerID) {
    finished := make(chan interface{})
    pm.taskChannel <- func() {
        delete(pm.designatedPeers, peerID)
        finished <- struct{}{}
    }
    <-finished
}

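// ListDesignatedPeers returns a copy of the current designated peer metas.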
func (pm *peerManager) ListDesignatedPeers() []p2pcommon.PeerMeta {
    retChan := make(chan []p2pcommon.PeerMeta)
    pm.taskChannel <- func() {
        arr := make([]p2pcommon.PeerMeta, 0, len(pm.designatedPeers))
        for _, m := range pm.designatedPeers {
            arr = append(arr, m)
        }
        retChan <- arr
    }
    return <-retChan
}

// pmTask is a function executed in the peerManager goroutine; it must not take
// long to run, since it blocks the manager loop while it executes.
type pmTask func()
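
// A typical pmTask submission blocks the caller until the manager loop has
// executed the task, as in AddDesignatedPeer above. Sketch (pid is a
// hypothetical types.PeerID held by the caller):
//
//	done := make(chan struct{})
//	pm.taskChannel <- func() { delete(pm.designatedPeers, pid); close(done) }
//	<-done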