aergoio/aergo
p2p/waitpeermanager.go

/*
 * @file
 * @copyright defined in aergo/LICENSE.txt
 */

package p2p

import (
    "errors"
    "fmt"
    "sort"
    "time"

    "github.com/aergoio/aergo-lib/log"
    "github.com/aergoio/aergo/v2/p2p/p2pcommon"
    "github.com/aergoio/aergo/v2/p2p/p2putil"
    "github.com/aergoio/aergo/v2/types"
    "github.com/libp2p/go-libp2p-core/network"
)

// connPeerResult is the result of a connection attempt made by the waitingPeerManager.
type connPeerResult struct {
    msgRW p2pcommon.MsgReadWriter

    remote       p2pcommon.RemoteInfo
    bestHash     types.BlockID
    bestNo       types.BlockNo
    Certificates []*p2pcommon.AgentCertificateV1
}

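// NewWaitingPeerManager creates a WaitingPeerManager. When useDiscover is false it returns
// a static manager that only handles preconfigured (designated) peers; otherwise it returns
// a dynamic manager that also connects to discovered peers, up to maxCap peers.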
func NewWaitingPeerManager(logger *log.Logger, is p2pcommon.InternalService, pm *peerManager, lm p2pcommon.ListManager, maxCap int, useDiscover bool) p2pcommon.WaitingPeerManager {
    var wpm p2pcommon.WaitingPeerManager
    if !useDiscover {
        sp := &staticWPManager{basePeerManager: basePeerManager{is: is, pm: pm, lm: lm, logger: logger, workingJobs: make(map[types.PeerID]ConnWork)}}
        wpm = sp
    } else {
        dp := &dynamicWPManager{basePeerManager: basePeerManager{is: is, pm: pm, lm: lm, logger: logger, workingJobs: make(map[types.PeerID]ConnWork)}, maxPeers: maxCap}
        wpm = dp
    }

    return wpm
}
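
// A minimal usage sketch (logger, is, pm, and lm are assumed to be initialized elsewhere;
// the capacity 16 is illustrative only, not a recommended setting):
//
//	wpm := NewWaitingPeerManager(logger, is, pm, lm, 16, true) // dynamic manager, up to 16 peers
//	wpm.CheckAndConnect()                                      // start jobs for waiting peers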

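// basePeerManager implements the waiting-peer bookkeeping and connection logic shared by
// the static and dynamic managers.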
type basePeerManager struct {
    is p2pcommon.InternalService
    pm *peerManager
    lm p2pcommon.ListManager

    logger      *log.Logger
    workingJobs map[types.PeerID]ConnWork
}

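// OnInboundConn handles a newly arrived inbound stream: it rejects banned peers and
// duplicate connections, then tries to handshake and register the peer.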
func (dpm *basePeerManager) OnInboundConn(s network.Stream) {
    peerID := s.Conn().RemotePeer()
    addr := s.Conn().RemoteMultiaddr()
    ip, port, err := types.GetIPPortFromMultiaddr(addr)
    if err != nil {
        dpm.logger.Warn().Err(err).Stringer(p2putil.LogPeerID, types.LogPeerShort(peerID)).Msg("Can't get ip address and port from inbound peer")
        s.Close()
        return
    }
    conn := p2pcommon.RemoteConn{Outbound: false, IP: ip, Port: port}
    tempMeta := p2pcommon.PeerMeta{ID: peerID, Addresses: []types.Multiaddr{addr}}

    dpm.logger.Info().Str(p2putil.LogFullID, peerID.Pretty()).Str("multiaddr", addr.String()).Msg("new inbound peer arrived")
    if banned, _ := dpm.lm.IsBanned(ip.String(), peerID); banned {
        dpm.logger.Info().Stringer(p2putil.LogPeerID, types.LogPeerShort(peerID)).Str("multiaddr", addr.String()).Msg("inbound peer is banned by list manager")
        s.Close()
        return
    }

    query := inboundConnEvent{conn: conn, meta: tempMeta, p2pVer: p2pcommon.P2PVersionUnknown, foundC: make(chan bool)}
    dpm.pm.inboundConnChan <- query
    if exist := <-query.foundC; exist {
        dpm.logger.Debug().Stringer(p2putil.LogPeerID, types.LogPeerShort(peerID)).Msg("a peer with the same ID already exists")
        s.Close()
        return
    }

    h := dpm.pm.hsFactory.CreateHSHandler(false, peerID)
    // try to handshake with the remote peer and register it on success
    _, added := dpm.tryAddPeer(false, tempMeta, s, h)
    if !added {
        s.Close()
    }
}

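// CheckAndConnect starts connection jobs for waiting peers when worker slots are free.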
func (dpm *basePeerManager) CheckAndConnect() {
    dpm.logger.Debug().Msg("checking space to connect more peers")
    maxJobs := dpm.getRemainingSpaces()
    if maxJobs == 0 {
        return
    }
    dpm.connectWaitingPeers(maxJobs)
}

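// InstantConnect tries to connect to the given peer immediately: an already connected peer
// is ignored, a waiting peer has its next trial time reset, and an unknown peer is added to
// the waiting pool; a single connection job is then started.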
func (dpm *basePeerManager) InstantConnect(meta p2pcommon.PeerMeta) {
    if _, ok := dpm.pm.remotePeers[meta.ID]; ok {
        // skip if the peer is already connected
        return
    } else if wp, ok := dpm.pm.waitingPeers[meta.ID]; ok {
        // reset the next trial time so the peer is retried immediately
        wp.NextTrial = time.Now().Add(-time.Hour)
        wp.TrialCnt = 0
    } else {
        // add the peer to the waiting pool, ready for an immediate trial
        _, designated := dpm.pm.designatedPeers[meta.ID]
        dpm.pm.waitingPeers[meta.ID] = &p2pcommon.WaitingPeer{Meta: meta, Designated: designated, NextTrial: time.Now().Add(-time.Hour)}
    }
    dpm.connectWaitingPeers(1)
}

func (dpm *basePeerManager) connectWaitingPeers(maxJob int) {
    // try to connect to at most maxJob waiting peers, earliest next trial first
    peers := make([]*p2pcommon.WaitingPeer, 0, len(dpm.pm.waitingPeers))
    for _, wp := range dpm.pm.waitingPeers {
        peers = append(peers, wp)
    }
    sort.Sort(byNextTrial(peers))

    added := 0
    now := time.Now()
    for _, wp := range peers {
        if added >= maxJob {
            break
        }
        if !wp.NextTrial.After(now) {
            // skip peers that already have a connection job in progress
            if _, exist := dpm.workingJobs[wp.Meta.ID]; exist {
                continue
            }
            // 2019.09.02 outbound connections are not filtered by the whitelist; only inbound peers are blocked.
            //if banned, _ := dpm.lm.IsBanned(wp.Meta.IPAddress, wp.Meta.ID); banned {
            //    dpm.logger.Info().Str(p2putil.LogPeerName, p2putil.ShortMetaForm(wp.Meta)).Msg("Skipping banned peer")
            //    continue
            //}
            dpm.logger.Info().Int("trial", wp.TrialCnt).Stringer(p2putil.LogPeerID, types.LogPeerShort(wp.Meta.ID)).Msg("Starting scheduled try to connect peer")

            dpm.workingJobs[wp.Meta.ID] = ConnWork{Meta: wp.Meta, PeerID: wp.Meta.ID, StartTime: time.Now()}
            go dpm.runTryOutboundConnect(wp)
            added++
        }
    }
}

// getRemainingSpaces checks how many connection jobs can be started right now.
// The number depends on the count of in-flight jobs and of waiting peers.
func (dpm *basePeerManager) getRemainingSpaces() int {
    // simpler version: just check the total counts
    if len(dpm.pm.waitingPeers) <= 0 {
        return 0
    }
    affordWorker := p2pcommon.MaxConcurrentHandshake - len(dpm.workingJobs)
    if affordWorker <= 0 {
        return 0
    }
    return affordWorker
}

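// runTryOutboundConnect dials the peer and performs the handshake in its own goroutine,
// always reporting the outcome back to the peer manager via workDoneChannel.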
func (dpm *basePeerManager) runTryOutboundConnect(wp *p2pcommon.WaitingPeer) {
    workResult := p2pcommon.ConnWorkResult{Meta: wp.Meta, TargetPeer: wp}
    defer func() {
        dpm.pm.workDoneChannel <- workResult
    }()

    meta := wp.Meta
    s, err := dpm.getStream(meta)
    if err != nil {
        dpm.logger.Info().Err(err).Stringer(p2putil.LogPeerID, types.LogPeerShort(meta.ID)).Msg("Failed to get stream.")
        workResult.Result = err
        return
    }
    h := dpm.pm.hsFactory.CreateHSHandler(true, meta.ID)
    // handshake
    _, added := dpm.tryAddPeer(true, meta, s, h)
    if !added {
        s.Close()
        workResult.Result = errors.New("handshake failed")
        return
    }
}

// getStream opens a stream to the given peer and checks that it speaks a supported p2p wire protocol.
func (dpm *basePeerManager) getStream(meta p2pcommon.PeerMeta) (network.Stream, error) {
    // try connect peer with possible versions
    s, err := dpm.pm.nt.GetOrCreateStream(meta, p2pcommon.P2PSubAddr)
    if err != nil {
        return nil, err
    }
    switch s.Protocol() {
    case p2pcommon.P2PSubAddr:
        return s, nil
    default:
        return nil, fmt.Errorf("unknown p2p wire protocol %v", s.Protocol())
    }
}

// tryAddPeer handshakes with the connecting peer and registers it on success. It returns the
// peer meta information received from the remote peer. Ownership of stream s passes to the
// remotePeer if the peer is successfully added.
func (dpm *basePeerManager) tryAddPeer(outbound bool, meta p2pcommon.PeerMeta, s network.Stream, h p2pcommon.HSHandler) (p2pcommon.PeerMeta, bool) {
    hResult, err := h.Handle(s, defaultHandshakeTTL)
    if err != nil {
        dpm.logger.Debug().Err(err).Bool("outbound", outbound).Stringer(p2putil.LogPeerID, types.LogPeerShort(meta.ID)).Msg("Failed to handshake")
        return meta, false
    }

    // build remote info from the information sent by the remote peer
    remoteInfo := dpm.createRemoteInfo(s.Conn(), *hResult, outbound)

    dpm.pm.peerConnected <- connPeerResult{remote: remoteInfo, msgRW: hResult.MsgRW, bestHash: hResult.BestBlockHash, bestNo: hResult.BestBlockNo, Certificates: hResult.Certificates}
    return remoteInfo.Meta, true
}

// createRemoteInfo creates a partially filled RemoteInfo; the AcceptedRole field is decided
// below from the role the remote peer claims.
func (dpm *basePeerManager) createRemoteInfo(conn network.Conn, r p2pcommon.HandshakeResult, outbound bool) p2pcommon.RemoteInfo {
    rma := conn.RemoteMultiaddr()
    ip, port, err := types.GetIPPortFromMultiaddr(rma)
    if err != nil {
        panic("conn information is wrong : " + err.Error())
    }

    connection := p2pcommon.RemoteConn{IP: ip, Port: port, Outbound: outbound}
    zone := p2pcommon.PeerZone(p2putil.IsContainedIP(ip, dpm.is.LocalSettings().InternalZones))
    ri := p2pcommon.RemoteInfo{Meta: r.Meta, Connection: connection, Hidden: r.Hidden, Certificates: r.Certificates, AcceptedRole: types.PeerRole_Watcher, Zone: zone}

    // TODO: is it OK for this function to contain policy logic?
    // decide the accepted role
    switch r.Meta.Role {
    case types.PeerRole_Producer:
        // TODO check consensus and whether the peer ID is in the top vote list or bp list
        ri.AcceptedRole = types.PeerRole_Producer
    case types.PeerRole_Agent:
        // check if agent has at least one certificate
        if len(r.Certificates) > 0 {
            ri.AcceptedRole = types.PeerRole_Agent
        } else {
            dpm.logger.Debug().Stringer(p2putil.LogPeerID, types.LogPeerShort(r.Meta.ID)).Msg("peer claims to be an agent but has no certificates; treating it as a watcher")
        }
    default:
        ri.AcceptedRole = r.Meta.Role
    }

    return ri
}

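// OnWorkDone clears the finished connection job and updates the waiting pool: a peer that
// connected, or that is not worth retrying, is removed; otherwise its next trial is scheduled.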
func (dpm *basePeerManager) OnWorkDone(result p2pcommon.ConnWorkResult) {
    meta := result.Meta
    delete(dpm.workingJobs, meta.ID)
    wp, ok := dpm.pm.waitingPeers[meta.ID]
    if !ok {
        dpm.logger.Debug().Str(p2putil.LogPeerName, p2putil.ShortMetaForm(meta)).Err(result.Result).Msg("Connection job finished")
        return
    }
    dpm.logger.Debug().Str(p2putil.LogPeerName, p2putil.ShortMetaForm(meta)).Int("trial", wp.TrialCnt).Err(result.Result).Msg("Connection job finished")

    wp.LastResult = result.Result
    if result.Result == nil {
        // connected successfully, so remove the peer from the waiting pool
        dpm.logger.Debug().Str(p2putil.LogPeerName, p2putil.ShortMetaForm(meta)).Msg("Connection job succeeded, deleting peer from waiting pool")
        delete(dpm.pm.waitingPeers, meta.ID)
        return
    }
    // keep the peer in the waiting pool only if it is worth reconnecting to
    if !setNextTrial(wp) {
        dpm.logger.Debug().Str(p2putil.LogPeerName, p2putil.ShortMetaForm(meta)).Msg("Connection job failed, but will not retry unimportant peer")
        delete(dpm.pm.waitingPeers, meta.ID)
    } else {
        dpm.logger.Debug().Str(p2putil.LogPeerName, p2putil.ShortMetaForm(meta)).Time("next_time", wp.NextTrial).Msg("Connection job failed, and will retry important peer")
    }
}

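// staticWPManager handles only designated peers and ignores peer discovery.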
type staticWPManager struct {
    basePeerManager
}

func (spm *staticWPManager) OnPeerConnect(pid types.PeerID) {
    delete(spm.pm.waitingPeers, pid)
}

func (spm *staticWPManager) OnPeerDisconnect(peer p2pcommon.RemotePeer) {
    // if the peer is a designated peer, schedule a reconnect by putting its meta back into the waiting pool
    if _, ok := spm.pm.designatedPeers[peer.ID()]; ok {
        spm.logger.Debug().Str(p2putil.LogPeerID, peer.Name()).Msg("server will try to reconnect designated peer after cooltime")
        // These peers must have a cool time.
        spm.pm.waitingPeers[peer.ID()] = &p2pcommon.WaitingPeer{Meta: peer.Meta(), Designated: true, NextTrial: time.Now().Add(firstReconnectCoolTime)}
    }
}

func (spm *staticWPManager) OnDiscoveredPeers(metas []p2pcommon.PeerMeta) int {
    // the static manager ignores discovered peers
    return 0
}

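// dynamicWPManager additionally accepts discovered peers and caps connections at maxPeers.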
type dynamicWPManager struct {
    basePeerManager

    maxPeers int
}

func (dpm *dynamicWPManager) OnPeerConnect(pid types.PeerID) {
    // remove peer from wait pool
    delete(dpm.pm.waitingPeers, pid)
}

func (dpm *dynamicWPManager) OnPeerDisconnect(peer p2pcommon.RemotePeer) {
    // if the peer is a designated peer or trusted enough, schedule a reconnect by putting its meta back into the waiting pool
    // TODO checking by trust level is not implemented yet.
    if _, ok := dpm.pm.designatedPeers[peer.ID()]; ok {
        dpm.logger.Debug().Str(p2putil.LogPeerID, peer.Name()).Msg("server will try to reconnect designated peer after cooltime")
        // These peers must have a cool time.
        dpm.pm.waitingPeers[peer.ID()] = &p2pcommon.WaitingPeer{Meta: peer.Meta(), Designated: true, NextTrial: time.Now().Add(firstReconnectCoolTime)}
    }
}

func (dpm *dynamicWPManager) OnDiscoveredPeers(metas []p2pcommon.PeerMeta) int {
    addedWP := 0
    for _, meta := range metas {
        if _, ok := dpm.pm.remotePeers[meta.ID]; ok {
            // skip connected peer
            continue
        } else if _, ok := dpm.pm.waitingPeers[meta.ID]; ok {
            // skip already waiting peer
            continue
        }

        // TODO check blacklist later.
        dpm.pm.waitingPeers[meta.ID] = &p2pcommon.WaitingPeer{Meta: meta, NextTrial: time.Now()}
        addedWP++
    }
    return addedWP
}

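// CheckAndConnect is redefined here rather than inherited from basePeerManager: Go does not
// virtually dispatch through embedded types, so the inherited method would call the base
// getRemainingSpaces instead of the peer-capped version below.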
func (dpm *dynamicWPManager) CheckAndConnect() {
    dpm.logger.Debug().Msg("checking space to connect more peers")
    maxJobs := dpm.getRemainingSpaces()
    if maxJobs == 0 {
        return
    }
    dpm.connectWaitingPeers(maxJobs)
}

func (dpm *dynamicWPManager) getRemainingSpaces() int {
    // simpler version: remaining space is bounded by both the peer cap and the worker cap
    affordCnt := dpm.maxPeers - len(dpm.pm.remotePeers) - len(dpm.workingJobs)
    if affordCnt <= 0 {
        return 0
    }
    affordWorker := dpm.basePeerManager.getRemainingSpaces()
    if affordCnt < affordWorker {
        return affordCnt
    }
    return affordWorker
}

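// inboundConnEvent asks the peerManager event loop whether a peer with the same ID is
// already connected; the answer is sent back on foundC.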
type inboundConnEvent struct {
    conn   p2pcommon.RemoteConn
    meta   p2pcommon.PeerMeta
    p2pVer p2pcommon.P2PVersion
    foundC chan bool
}

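// byNextTrial orders waiting peers by earliest next connection attempt.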
type byNextTrial []*p2pcommon.WaitingPeer

func (a byNextTrial) Len() int           { return len(a) }
func (a byNextTrial) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a byNextTrial) Less(i, j int) bool { return a[i].NextTrial.Before(a[j].NextTrial) }

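// ConnWork tracks an in-flight connection attempt to a single peer.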
type ConnWork struct {
    PeerID    types.PeerID
    Meta      p2pcommon.PeerMeta
    StartTime time.Time
}

// setNextTrial checks whether the peer is worth connecting to again and, if so, sets the
// time at which the server will try the next connection. It returns true if another trial
// was scheduled, and false if the peer should be dropped.
func setNextTrial(wp *p2pcommon.WaitingPeer) bool {
    if !wp.Designated {
        return false
    }
    wp.TrialCnt++
    wp.NextTrial = time.Now().Add(getNextInterval(wp.TrialCnt))
    return true
}
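
// getNextInterval is defined elsewhere in this package. For illustration only, a
// hypothetical capped exponential-backoff schedule of the kind such a function might
// implement (a sketch, not the actual aergo implementation):
//
//	func getNextInterval(trial int) time.Duration {
//		if trial > 5 {
//			return time.Hour // cap the backoff at one hour
//		}
//		return time.Minute * time.Duration(1<<uint(trial)) // double the wait per failed trial
//	}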