aergoio/aergo

View on GitHub
syncer/finder.go

Summary

Maintainability
A
40 mins
Test Coverage
B
88%
package syncer

import (
    "bytes"
    "sync"
    "time"

    "github.com/aergoio/aergo/v2/chain"
    "github.com/aergoio/aergo/v2/internal/enc/base58"
    "github.com/aergoio/aergo/v2/pkg/component"
    "github.com/aergoio/aergo/v2/types"
    "github.com/aergoio/aergo/v2/types/message"
    "github.com/pkg/errors"
)

type Finder struct {
    compRequester component.IComponentRequester //for communicate with other service
    chain         types.ChainAccessor

    anchorCh chan chain.ChainAnchor
    lScanCh  chan *types.BlockInfo
    fScanCh  chan *message.GetHashByNoRsp

    quitCh chan interface{}

    lastAnchor []byte //point last block during lightscan
    ctx        types.SyncContext

    dfltTimeout time.Duration

    cfg *SyncerConfig

    isRunning bool
    waitGroup *sync.WaitGroup
}

type FinderResult struct {
    ancestor *types.BlockInfo
    err      error
}

var (
    ErrFinderQuit               = errors.New("sync finder quit")
    ErrorGetSyncAncestorTimeout = errors.New("timeout for GetSyncAncestor")
    ErrFinderTimeout            = errors.New("Finder timeout")
    ErrAlreadySyncDone          = errors.New("Already sync done")
)

func newFinder(ctx *types.SyncContext, compRequester component.IComponentRequester, chain types.ChainAccessor, cfg *SyncerConfig) *Finder {
    finder := &Finder{ctx: *ctx, compRequester: compRequester, chain: chain, cfg: cfg}

    finder.dfltTimeout = cfg.fetchTimeOut
    finder.quitCh = make(chan interface{})
    finder.lScanCh = make(chan *types.BlockInfo)
    finder.lScanCh = make(chan *types.BlockInfo)
    finder.fScanCh = make(chan *message.GetHashByNoRsp)

    return finder
}

// TODO refactoring: move logic to SyncContext (sync Object)
func (finder *Finder) start() {
    finder.waitGroup = &sync.WaitGroup{}
    finder.waitGroup.Add(1)
    finder.isRunning = true

    run := func() {
        var ancestor *types.BlockInfo
        var err error

        defer RecoverSyncer(NameFinder, finder.GetSeq(), finder.compRequester, func() { finder.waitGroup.Done() })

        logger.Debug().Msg("start to find common ancestor")

        //1. light sync
        //   gather summary of my chain nodes, runTask searching ancestor to remote node
        ancestor, err = finder.lightscan()

        //2. heavy sync
        //     full binary search in my chain
        if ancestor == nil && err == nil {
            ancestor, err = finder.fullscan()
        }

        if err != nil {
            logger.Debug().Msg("quit finder")
            stopSyncer(finder.compRequester, finder.GetSeq(), NameFinder, err)
            return
        }

        finder.compRequester.TellTo(message.SyncerSvc, &message.FinderResult{Seq: finder.GetSeq(), Ancestor: ancestor, Err: nil})
        logger.Info().Msg("stopped finder successfully")
    }

    go run()
}

func (finder *Finder) stop() {
    if finder == nil {
        return
    }

    logger.Info().Msg("finder stop#1")

    if finder.isRunning {
        logger.Debug().Msg("finder closed quitChannel")

        close(finder.quitCh)
        finder.isRunning = false
    }

    finder.waitGroup.Wait()

    logger.Info().Msg("finder stop#2")
}

func (finder *Finder) GetSeq() uint64 {
    return finder.ctx.Seq
}

func (finder *Finder) GetHashByNoRsp(rsp *message.GetHashByNoRsp) {
    finder.fScanCh <- rsp
}

func (finder *Finder) lightscan() (*types.BlockInfo, error) {
    if finder.cfg.useFullScanOnly {
        finder.ctx.LastAnchor = finder.ctx.BestNo + 1
        return nil, nil
    }

    var ancestor *types.BlockInfo

    anchors, err := finder.getAnchors()
    if err != nil {
        return nil, err
    }

    ancestor, err = finder.getAncestor(anchors)

    if ancestor == nil {
        logger.Debug().Msg("not found ancestor in lightscan")
    } else {
        logger.Info().Str("hash", base58.Encode(ancestor.Hash)).Uint64("no", ancestor.No).Msg("find ancestor in lightscan")

        if ancestor.No >= finder.ctx.TargetNo {
            logger.Info().Msg("already synchronized")
            return nil, ErrAlreadySyncDone
        }
    }

    return ancestor, err
}

func (finder *Finder) getAnchors() ([][]byte, error) {
    result, err := finder.compRequester.RequestToFutureResult(message.ChainSvc, &message.GetAnchors{Seq: finder.GetSeq()}, finder.dfltTimeout, "Finder/getAnchors")
    if err != nil {
        logger.Error().Err(err).Msg("failed to get anchors")
        return nil, err
    }

    anchors := result.(message.GetAnchorsRsp).Hashes
    if len(anchors) > 0 {
        finder.ctx.LastAnchor = result.(message.GetAnchorsRsp).LastNo
    }

    logger.Info().Str("start", base58.Encode(anchors[0])).Int("count", len(anchors)).Uint64("last", finder.ctx.LastAnchor).Msg("get anchors from chain")

    return anchors, nil
}

func (finder *Finder) getAncestor(anchors [][]byte) (*types.BlockInfo, error) {
    //    send remote Peer
    logger.Debug().Stringer("peer", types.LogPeerShort(finder.ctx.PeerID)).Msg("send GetAncestor message to peer")
    finder.compRequester.TellTo(message.P2PSvc, &message.GetSyncAncestor{Seq: finder.GetSeq(), ToWhom: finder.ctx.PeerID, Hashes: anchors})

    timer := time.NewTimer(finder.dfltTimeout)

    for {
        select {
        case result := <-finder.lScanCh:
            //valid response
            if result == nil || result.No >= finder.ctx.LastAnchor {
                return result, nil
            }
        case <-timer.C:
            logger.Error().Float64("sec", finder.dfltTimeout.Seconds()).Msg("get ancestor response timeout")
            return nil, ErrorGetSyncAncestorTimeout
        case <-finder.quitCh:
            return nil, ErrFinderQuit
        }
    }
}

// TODO binary search scan
func (finder *Finder) fullscan() (*types.BlockInfo, error) {
    logger.Debug().Msg("finder fullscan")

    ancestor, err := finder.binarySearch(0, finder.ctx.LastAnchor-1)
    if err != nil {
        logger.Error().Err(err).Msg("finder fullscan failed")
        return nil, err
    }

    if ancestor == nil {
        logger.Info().Msg("failed to search ancestor in fullscan")
    } else {
        logger.Info().Uint64("no", ancestor.No).Str("hash", base58.Encode(ancestor.Hash)).Msg("find ancestor in fullscan")
    }

    return ancestor, err
}

func (finder *Finder) binarySearch(left uint64, right uint64) (*types.BlockInfo, error) {
    var mid uint64
    var lastMatch *types.BlockInfo
    for left <= right {
        // get median
        mid = (left + right) / 2
        // request hash of median from remote
        logger.Debug().Uint64("left", left).Uint64("right", right).Uint64("mid", mid).Msg("finder scan")

        midHash, err := finder.chain.GetHashByNo(mid)
        if err != nil {
            logger.Error().Uint64("no", mid).Err(err).Msg("finder failed to get local hash")
            return nil, err
        }

        exist, err := finder.hasSameHash(mid, midHash)
        if err != nil {
            logger.Error().Err(err).Msg("finder failed to check remote hash")
            return nil, err
        }

        if exist {
            left = mid + 1

            lastMatch = &types.BlockInfo{Hash: midHash, No: mid}
            logger.Debug().Uint64("mid", mid).Msg("matched")
        } else {
            if mid == 0 {
                break
            } else {
                right = mid - 1
            }
        }
    }

    return lastMatch, nil
}

func (finder *Finder) hasSameHash(no types.BlockNo, localHash []byte) (bool, error) {
    finder.compRequester.TellTo(message.P2PSvc, &message.GetHashByNo{Seq: finder.GetSeq(), ToWhom: finder.ctx.PeerID, BlockNo: no})

    recvHashRsp := func() (*message.GetHashByNoRsp, error) {
        timer := time.NewTimer(finder.dfltTimeout)

        for {
            select {
            case result := <-finder.fScanCh:
                return result, result.Err
            case <-timer.C:
                logger.Error().Float64("sec", finder.dfltTimeout.Seconds()).Msg("finder get response timeout")
                return nil, ErrFinderTimeout
            case <-finder.quitCh:
                return nil, ErrFinderQuit
            }
        }
    }

    rspMsg, err := recvHashRsp()
    if err != nil || rspMsg.BlockHash == nil {
        logger.Error().Err(err).Msg("finder failed to get remote hash")
        return false, err
    }

    if bytes.Equal(localHash, rspMsg.BlockHash) {
        logger.Debug().Uint64("no", no).Msg("exist hash")
        return true, nil
    } else {
        logger.Debug().Uint64("no", no).Msg("not exist hash")
        return false, nil
    }
}