lbryio/chainquery

View on GitHub
daemon/jobs/chainsync.go

Summary

Maintainability
C
1 day
Test Coverage
package jobs

import (
    "database/sql"
    "encoding/hex"
    "encoding/json"
    "fmt"
    "strconv"
    "strings"
    "time"

    util2 "github.com/lbryio/chainquery/util"

    "github.com/lbryio/chainquery/daemon/processing"
    "github.com/lbryio/chainquery/datastore"
    "github.com/lbryio/chainquery/global"
    "github.com/lbryio/chainquery/lbrycrd"
    "github.com/lbryio/chainquery/metrics"
    "github.com/lbryio/chainquery/model"

    "github.com/lbryio/lbry.go/v2/extras/errors"
    "github.com/lbryio/lbry.go/v2/schema/stake"

    "github.com/sirupsen/logrus"
    "github.com/volatiletech/null/v8"
    "github.com/volatiletech/sqlboiler/v4/boil"
    "github.com/volatiletech/sqlboiler/v4/queries/qm"
)

// chainSyncRunning is true while a chain sync run is flagged as in progress. It is set by ChainSyncAsync and cleared
// by endChainSync.
var chainSyncRunning = false

// chainSync holds the in-memory state of the chain sync job. ChainSync allocates it on first use.
var chainSync *chainSyncStatus

// ChainSyncRunDuration specifies the duration, in seconds, the chain sync job will run at a time before stopping and
// storing state. It will get triggered periodically.
var ChainSyncRunDuration int

// ChainSyncDelay specifies the duration, in milliseconds, between each block it synchronizes. Depending on the usage of
// the database you will want to add some delay between blocks so it does not overload the db server.
var ChainSyncDelay int

// chainSyncJob is the job name under which this job's status record is looked up and created.
const chainSyncJob = "chainsync"

// ChainSyncAsync starts the chain sync job on a new goroutine and returns immediately. It does nothing when a run is
// already flagged as in progress.
func ChainSyncAsync() {
    if chainSyncRunning {
        return
    }
    chainSyncRunning = true
    go ChainSync()
}

// endChainSync is deferred by ChainSync. It clears the running flag and, when the run ended in a panic, recovers and
// logs the panic value.
func endChainSync() {
    chainSyncRunning = false
    recovered := recover()
    if recovered == nil {
        return
    }
    logrus.Error("Recovered From: ", recovered)
}

// ChainSync re-aligns the stored chain data with lbrycrd, one block at a time. A single call works for at most
// ChainSyncRunDuration seconds, sleeping ChainSyncDelay milliseconds after every block, and then stores its state in
// the job status record.
func ChainSync() {
    const metricLabel = "chain_sync"
    metrics.JobLoad.WithLabelValues(metricLabel).Inc()
    defer metrics.JobLoad.WithLabelValues(metricLabel).Dec()
    defer metrics.Job(time.Now(), metricLabel)
    defer endChainSync()

    if chainSync == nil {
        chainSync = &chainSyncStatus{}
    }

    job, err := getChainSyncJobStatus()
    if err != nil {
        logrus.Error(err)
        saveJobError(job, err)
        return
    }

    // Everything up to the known maximum has been visited: refresh the maximum (or restart from the beginning).
    if chainSync.LastHeight >= chainSync.MaxHeightStored {
        if refreshErr := chainSync.updateMaxHeightStored(); refreshErr != nil {
            saveJobError(job, refreshErr)
            logrus.Error(refreshErr)
            return
        }
    }

    deadline := time.Now().Add(time.Duration(ChainSyncRunDuration) * time.Second)
    for {
        if !time.Now().Before(deadline) || chainSync.LastHeight >= chainSync.MaxHeightStored {
            break
        }
        // A failing block is only logged; the loop carries on with the next height.
        if blockErr := chainSync.processNextBlock(); blockErr != nil {
            logrus.Debugf("FAILURE @%d: %s", chainSync.LastHeight, blockErr.Error())
        }
        time.Sleep(time.Duration(ChainSyncDelay) * time.Millisecond)
    }
    doneChainSyncJob(job)
}

// chainSyncStatus is the working state of the chain sync job. Only LastHeight, MaxHeightStored and Errors are
// serialized to JSON; the model pointers are tagged `json:"-"` and act as per-run cursors.
type chainSyncStatus struct {
    // JobStatus is excluded from the JSON state.
    JobStatus       *model.JobStatus   `json:"-"`
    // Block is the stored block currently being aligned (set by processNextBlock).
    Block           *model.Block       `json:"-"`
    // Tx is the stored transaction currently being aligned (set by alignTxs).
    Tx              *model.Transaction `json:"-"`
    // Vin is the stored input currently being aligned (set by alignVins).
    Vin             *model.Input       `json:"-"`
    // Vout is the stored output currently being aligned (set by alignVouts).
    Vout            *model.Output      `json:"-"`
    // LastHeight is the height of the most recently visited block.
    LastHeight      int64              `json:"last_height"`
    // MaxHeightStored is the height up to which the current pass runs.
    MaxHeightStored int64              `json:"max_height_stored"`
    // Errors collects the problems recorded during the current pass.
    Errors          []syncError        `json:"z_errors"`
}

// syncError describes one problem recorded while synchronizing, as stored in the job's JSON state.
type syncError struct {
    // HeightFound lists the block heights at which the error was recorded.
    HeightFound []int64 `json:"height_found"`
    // Error is the error's message text.
    Error       string  `json:"error"`
    // Area is a short label naming the step that failed (for example "tx-alignment").
    Area        string  `json:"area"`
}

// processNextBlock advances LastHeight by one and aligns the stored block at that height, and its transactions,
// with what lbrycrd reports.
//
// The height is advanced before anything is fetched, so a failing block is not retried within the same pass. When
// the block is missing from the database it is populated through processing.ProcessBlock and no alignment is
// attempted, since there is no stored row to compare against. Every failure is recorded in c.Errors under an area
// label and returned.
func (c *chainSyncStatus) processNextBlock() error {
    c.LastHeight++
    blockHash, err := lbrycrd.LBRYcrdClient.GetBlockHash(c.LastHeight)
    if err != nil {
        return c.recordAndReturnError(c.LastHeight, "lbrycrd-getblockhash", err)
    }
    lbrycrdBlock, err := lbrycrd.GetBlock(blockHash.String())
    if err != nil {
        // This failure comes from lbrycrd, not from the database, so it gets its own area label.
        return c.recordAndReturnError(c.LastHeight, "lbrycrd-getblock", err)
    }
    recordedBlock, err := model.Blocks(model.BlockWhere.Hash.EQ(blockHash.String())).OneG()
    if err != nil {
        if !errors.Is(err, sql.ErrNoRows) {
            return c.recordAndReturnError(c.LastHeight, "mysql-getblock", err)
        }
        logrus.Warningf("Missing block %d, populating it now", c.LastHeight)
        if _, err := processing.ProcessBlock(uint64(c.LastHeight), nil, lbrycrdBlock); err != nil {
            return c.recordAndReturnError(c.LastHeight, "daemon-process-block", err)
        }
        // Populated successfully. Returning here avoids recording a nil error (which would panic on err.Error())
        // and avoids aligning against a nil recordedBlock.
        return nil
    }
    c.Block = recordedBlock
    if err := c.alignBlock(lbrycrdBlock); err != nil {
        return c.recordAndReturnError(c.LastHeight, "block-alignment", err)
    }
    if err := c.alignTxs(recordedBlock, lbrycrdBlock.Tx); err != nil {
        return c.recordAndReturnError(c.LastHeight, "tx-alignment", err)
    }
    return nil
}

// alignTxs aligns every transaction of block, identified by txHashes, with what lbrycrd reports.
//
// A transaction missing from the database is populated through processing.ProcessTx; whether that succeeds or
// fails (the failure is recorded), the loop moves on to the next transaction. A stored transaction gets its own
// columns and its inputs aligned. Any other failure is recorded in c.Errors and returned, which stops the loop.
//
// NOTE(review): outputs are not aligned here; no call to alignVouts is visible in this file. Confirm whether that
// is intentional.
func (c *chainSyncStatus) alignTxs(block *model.Block, txHashes []string) error {
    for _, txHash := range txHashes {
        lbrycrdTx, err := lbrycrd.GetRawTransactionResponse(txHash)
        if err != nil {
            return c.recordAndReturnError(c.LastHeight, "tx-hash-creation", err)
        }
        w := model.TransactionWhere
        recordedTx, err := model.Transactions(w.BlockHashID.EQ(null.StringFrom(block.Hash)), w.Hash.EQ(txHash)).OneG()
        if err != nil {
            if !errors.Is(err, sql.ErrNoRows) {
                return c.recordAndReturnError(c.LastHeight, "mysql-tx", err)
            }
            if err := processing.ProcessTx(lbrycrdTx, block.BlockTime, uint64(c.LastHeight)); err != nil {
                c.recordError(c.LastHeight, "tx-processing", err)
            }
            // There is no stored row to align. Continuing also avoids recording a nil error after a successful
            // ProcessTx, which would panic on err.Error().
            continue
        }
        c.Tx = recordedTx
        if err := c.alignTx(lbrycrdTx); err != nil {
            return c.recordAndReturnError(c.LastHeight, "tx-alignment", err)
        }
        if err := c.alignVins(lbrycrdTx.Vin); err != nil {
            return c.recordAndReturnError(c.LastHeight, "vin-alignment", err)
        }
    }
    return nil
}

// alignVouts aligns every output of the current transaction (c.Tx) with the outputs lbrycrd reports.
//
// An output missing from the database is populated through processing.ProcessVout, and a failure there is
// returned. A stored output is aligned through alignVout; a failure there is recorded in c.Errors and the loop
// continues.
//
// The receiver is a pointer, like every other method on chainSyncStatus: with a value receiver the recorded
// errors were appended to a copy and lost.
func (c *chainSyncStatus) alignVouts(vouts []lbrycrd.Vout) error {
    for i := range vouts {
        // Per-iteration copy so the pointer handed to ProcessVout is never shared between iterations.
        vout := vouts[i]
        output := datastore.GetOutput(c.Tx.Hash, uint(vout.N))
        if output == nil {
            if err := processing.ProcessVout(&vout, c.Tx, nil, uint64(i)); err != nil {
                return errors.Err(err)
            }
            continue
        }
        c.Vout = output
        if err := c.alignVout(vout); err != nil {
            c.recordError(c.LastHeight, "vout-alignment", err)
        }
    }

    return nil
}

// alignVout aligns the current stored output (c.Vout) with the lbrycrd output v.
//
// The value column is updated when it differs. After that, an output that carries a claim ID is handled in one of
// two ways: if it is unspent, its claim is aligned through alignClaim; if it is spent and of the non-standard
// type with a claim-support script, the matching stored support (when there is one) gets its SupportedByClaimID
// refreshed from the script's value.
func (c *chainSyncStatus) alignVout(v lbrycrd.Vout) error {
    var staleCols []string
    if c.Vout.Value.Float64 != v.Value {
        c.Vout.Value.SetValid(v.Value)
        staleCols = append(staleCols, model.OutputColumns.Value)
    }
    if len(staleCols) > 0 {
        logrus.Debugf("found unaligned vout @%d and Tx %s with the following columns out of alignment: %s", c.LastHeight, c.Tx.Hash, strings.Join(staleCols, ","))
        if updateErr := c.Vout.UpdateG(boil.Whitelist(staleCols...)); updateErr != nil {
            return errors.Err(updateErr)
        }
    }

    // Nothing further to do for outputs that carry no claim.
    if c.Vout.ClaimID.IsZero() {
        return nil
    }
    if !c.Vout.IsSpent {
        return c.alignClaim()
    }
    if c.Vout.Type.String != lbrycrd.NonStandard {
        return nil
    }

    scriptBytes, decodeErr := hex.DecodeString(c.Vout.ScriptPubKeyHex.String)
    if decodeErr != nil {
        return errors.Err(decodeErr)
    }
    if !lbrycrd.IsClaimSupportScript(scriptBytes) {
        return nil
    }
    support := datastore.GetSupport(c.Tx.Hash, c.Vout.Vout)
    if support == nil {
        return nil
    }
    _, _, value, _, parseErr := lbrycrd.ParseClaimSupportScript(scriptBytes)
    if parseErr != nil {
        return parseErr
    }
    if len(value) == 0 {
        return nil
    }
    s, supportErr := stake.DecodeSupportBytes(value, global.BlockChainName)
    if supportErr != nil {
        return supportErr
    }
    support.SupportedByClaimID.SetValid(hex.EncodeToString(util2.ReverseBytes(s.ClaimID)))
    if saveErr := support.UpdateG(boil.Whitelist(model.SupportColumns.SupportedByClaimID)); saveErr != nil {
        return saveErr
    }
    return nil
}

// alignClaim re-derives the claim referenced by the current output (c.Vout) from its stored ValueAsHex and
// persists the columns found out of alignment.
//
// It returns an error when the claim cannot be found, when its value cannot be decoded into a helper, when
// processing.UpdateClaimData fails, or when the database update fails.
func (c *chainSyncStatus) alignClaim() error {
    claimID := c.Vout.ClaimID.String
    storedClaim := datastore.GetClaim(claimID)
    if storedClaim == nil {
        return errors.Err("could not find claim with id %s", claimID)
    }
    helper, err := stake.DecodeClaimHex(storedClaim.ValueAsHex, global.BlockChainName)
    if err != nil {
        return err
    }
    if helper == nil {
        // Format the ID string, not the nullable wrapper: %s applied to the struct prints its fields.
        return errors.Err("could not create help for claim %s from ValueAsHex", claimID)
    }
    // Snapshot taken before UpdateClaimData mutates storedClaim, so deltas can be detected below.
    original := *storedClaim
    colsToUpdate := make([]string, 0)
    err = processing.UpdateClaimData(helper, storedClaim)
    if err != nil {
        return err
    }

    if storedClaim.VoutUpdate.IsZero() {
        storedClaim.VoutUpdate.SetValid(c.Vout.Vout)
        colsToUpdate = append(colsToUpdate, model.ClaimColumns.VoutUpdate)
    }

    if storedClaim.TransactionHashUpdate.IsZero() {
        storedClaim.TransactionHashUpdate.SetValid(storedClaim.TransactionHashID.String)
        colsToUpdate = append(colsToUpdate, model.ClaimColumns.TransactionHashUpdate)
    }

    // Check for deltas here to update for. Only the license is compared at present.
    if original.License.String != storedClaim.License.String {
        colsToUpdate = append(colsToUpdate, model.ClaimColumns.License)
    }

    // Update the claim only when something actually changed.
    if len(colsToUpdate) > 0 {
        logrus.Debugf("found unaligned claim @%s and Tx %s with the following columns out of alignment: %s", claimID, c.Tx.Hash, strings.Join(colsToUpdate, ","))
        if err := storedClaim.UpdateG(boil.Whitelist(colsToUpdate...)); err != nil {
            return errors.Err(err)
        }
    }

    return nil
}

// alignVins aligns every input of the current transaction (c.Tx) with the inputs lbrycrd reports.
//
// Each input is first looked up as a regular input and, when that finds nothing and the input has coinbase data,
// as a coinbase input. An input still missing is populated through processing.ProcessVin, whose failure is
// returned. A stored input is aligned through alignVin; a failure there is recorded and the loop continues.
func (c *chainSyncStatus) alignVins(vins []lbrycrd.Vin) error {
    for idx, vin := range vins {
        stored := datastore.GetInput(c.Tx.Hash, false, vin.TxID, uint(vin.Vout))
        if stored == nil && len(vin.Coinbase) > 0 {
            stored = datastore.GetInput(c.Tx.Hash, true, vin.TxID, uint(vin.Vout))
        }

        if stored != nil {
            c.Vin = stored
            if alignErr := c.alignVin(vin); alignErr != nil {
                c.recordError(c.LastHeight, "vin-alignment", alignErr)
            }
            continue
        }

        if processErr := processing.ProcessVin(&vin, c.Tx, nil, uint64(idx)); processErr != nil {
            return processErr
        }
    }
    return nil
}

// alignVin aligns the current stored input (c.Vin) with the lbrycrd input v.
//
// The coinbase, witness (joined with ","), script signature hex/asm and value columns are compared. The value is
// taken from the stored output the input spends, when that output can be found. Columns that differ are updated
// in a single whitelisted update; a failure of that update is returned.
func (c *chainSyncStatus) alignVin(v lbrycrd.Vin) error {
    stored := c.Vin
    var staleCols []string

    if stored.Coinbase.String != v.Coinbase {
        stored.Coinbase.String = v.Coinbase
        stored.Coinbase.Valid = v.Coinbase != ""
        staleCols = append(staleCols, model.InputColumns.Coinbase)
    }

    witness := strings.Join(v.Witness, ",")
    if stored.Witness.String != witness {
        stored.Witness.String = witness
        stored.Witness.Valid = witness != ""
        staleCols = append(staleCols, model.InputColumns.Witness)
    }

    if sig := v.ScriptSig; sig != nil {
        if stored.ScriptSigHex.String != sig.Hex {
            stored.ScriptSigHex.String = sig.Hex
            stored.ScriptSigHex.Valid = sig.Hex != ""
            staleCols = append(staleCols, model.InputColumns.ScriptSigHex)
        }
        if stored.ScriptSigAsm.String != sig.Asm {
            stored.ScriptSigAsm.String = sig.Asm
            stored.ScriptSigAsm.Valid = sig.Asm != ""
            staleCols = append(staleCols, model.InputColumns.ScriptSigAsm)
        }
    } else {
        // No script signature upstream: mark any stored one as null.
        if stored.ScriptSigHex.Valid {
            stored.ScriptSigHex.Valid = false
            staleCols = append(staleCols, model.InputColumns.ScriptSigHex)
        }
        if stored.ScriptSigAsm.Valid {
            stored.ScriptSigAsm.Valid = false
            staleCols = append(staleCols, model.InputColumns.ScriptSigAsm)
        }
    }

    if spent := datastore.GetOutput(stored.PrevoutHash.String, stored.PrevoutN.Uint); spent != nil {
        if stored.Value.Float64 != spent.Value.Float64 {
            stored.Value.SetValid(spent.Value.Float64)
            staleCols = append(staleCols, model.InputColumns.Value)
        }
    }

    if len(staleCols) == 0 {
        return nil
    }
    logrus.Debugf("found unaligned vin @%d and Tx %s with the following columns out of alignment: %s", c.LastHeight, c.Tx.Hash, strings.Join(staleCols, ","))
    if updateErr := stored.UpdateG(boil.Whitelist(staleCols...)); updateErr != nil {
        return errors.Err(updateErr)
    }
    return nil
}

// alignTx aligns the current stored transaction (c.Tx) with the lbrycrd transaction l.
//
// The version, transaction time, size, lock time and input/output counts are compared. Columns that differ are
// updated in a single whitelisted update; a failure of that update is returned.
func (c *chainSyncStatus) alignTx(l *lbrycrd.TxRawResult) error {
    colsToUpdate := make([]string, 0)
    if c.Tx.Version != int(l.Version) {
        c.Tx.Version = int(l.Version)
        colsToUpdate = append(colsToUpdate, model.TransactionColumns.Version)
    }
    if c.Tx.TransactionTime.Uint64 != uint64(l.Time) {
        // SetValid also marks the nullable value as valid. Assigning only the Uint64 field would leave a
        // previously-null time null, so the column would never align.
        c.Tx.TransactionTime.SetValid(uint64(l.Time))
        colsToUpdate = append(colsToUpdate, model.TransactionColumns.TransactionTime)
    }
    if c.Tx.TransactionSize != uint64(l.Size) {
        c.Tx.TransactionSize = uint64(l.Size)
        colsToUpdate = append(colsToUpdate, model.TransactionColumns.TransactionSize)
    }
    if c.Tx.LockTime != uint(l.LockTime) {
        c.Tx.LockTime = uint(l.LockTime)
        colsToUpdate = append(colsToUpdate, model.TransactionColumns.LockTime)
    }
    if c.Tx.InputCount != uint(len(l.Vin)) {
        c.Tx.InputCount = uint(len(l.Vin))
        colsToUpdate = append(colsToUpdate, model.TransactionColumns.InputCount)
    }
    if c.Tx.OutputCount != uint(len(l.Vout)) {
        c.Tx.OutputCount = uint(len(l.Vout))
        colsToUpdate = append(colsToUpdate, model.TransactionColumns.OutputCount)
    }
    if len(colsToUpdate) > 0 {
        logrus.Debugf("found unaligned tx @%d and hash %s with the following columns out of alignment: %s", c.LastHeight, c.Tx.Hash, strings.Join(colsToUpdate, ","))
        if err := c.Tx.UpdateG(boil.Whitelist(colsToUpdate...)); err != nil {
            return errors.Err(err)
        }
    }
    return nil
}

// alignBlock aligns the current stored block (c.Block) with the lbrycrd block l.
//
// Each header column is compared in turn; difficulty is compared after formatting both sides to six decimal
// places. Columns that differ are updated in a single whitelisted update; a failure of that update is returned.
func (c *chainSyncStatus) alignBlock(l *lbrycrd.GetBlockResponse) error {
    stored := c.Block
    var staleCols []string
    markStale := func(column string) {
        staleCols = append(staleCols, column)
    }

    if stored.Hash != l.Hash {
        stored.Hash = l.Hash
        markStale(model.BlockColumns.Hash)
    }
    if stored.BlockTime != uint64(l.Time) {
        stored.BlockTime = uint64(l.Time)
        markStale(model.BlockColumns.BlockTime)
    }
    if stored.Version != uint64(l.Version) {
        stored.Version = uint64(l.Version)
        markStale(model.BlockColumns.Version)
    }
    if stored.Bits != l.Bits {
        stored.Bits = l.Bits
        markStale(model.BlockColumns.Bits)
    }
    if stored.BlockSize != uint64(l.Size) {
        stored.BlockSize = uint64(l.Size)
        markStale(model.BlockColumns.BlockSize)
    }
    if stored.Chainwork != l.ChainWork {
        stored.Chainwork = l.ChainWork
        markStale(model.BlockColumns.Chainwork)
    }

    // Compare difficulties at six decimal places; the parse errors are ignored, as the input is freshly formatted.
    upstreamDifficulty, _ := strconv.ParseFloat(fmt.Sprintf("%.6f", l.Difficulty), 64)
    storedDifficulty, _ := strconv.ParseFloat(fmt.Sprintf("%.6f", stored.Difficulty), 64)
    if storedDifficulty != upstreamDifficulty {
        stored.Difficulty = upstreamDifficulty
        markStale(model.BlockColumns.Difficulty)
    }

    if stored.MerkleRoot != l.MerkleRoot {
        stored.MerkleRoot = l.MerkleRoot
        markStale(model.BlockColumns.MerkleRoot)
    }
    if stored.NameClaimRoot != l.NameClaimRoot {
        stored.NameClaimRoot = l.NameClaimRoot
        markStale(model.BlockColumns.NameClaimRoot)
    }
    if stored.PreviousBlockHash.String != l.PreviousBlockHash {
        stored.PreviousBlockHash.SetValid(l.PreviousBlockHash)
        markStale(model.BlockColumns.PreviousBlockHash)
    }
    if stored.Nonce != l.Nonce {
        stored.Nonce = l.Nonce
        markStale(model.BlockColumns.Nonce)
    }
    if stored.VersionHex != l.VersionHex {
        stored.VersionHex = l.VersionHex
        markStale(model.BlockColumns.VersionHex)
    }

    if len(staleCols) == 0 {
        return nil
    }
    logrus.Debugf("found unaligned block @%d with the following columns out of alignment: %s", c.LastHeight, strings.Join(staleCols, ","))
    if updateErr := stored.UpdateG(boil.Whitelist(staleCols...)); updateErr != nil {
        return errors.Err(updateErr)
    }
    return nil
}

// recordAndReturnError records err in c.Errors under the given area and returns err unchanged, so it can be used
// directly in a return statement.
//
// Errors are grouped by area and message: when an entry with the same area and message already exists, height is
// appended to that entry's HeightFound; otherwise a new entry is added. A nil err records nothing and returns nil.
func (c *chainSyncStatus) recordAndReturnError(height int64, area string, err error) error {
    if err == nil {
        return nil
    }
    msg := err.Error()
    // Iterate by index: ranging by value would hand out copies, and the appended height would be discarded.
    for i := range c.Errors {
        known := &c.Errors[i]
        if known.Area == area && known.Error == msg {
            known.HeightFound = append(known.HeightFound, height)
            return err
        }
    }
    c.Errors = append(c.Errors, syncError{
        HeightFound: []int64{height},
        Error:       msg,
        Area:        area,
    })

    return err
}

// recordError records err in c.Errors under the given area and height, for callers that carry on after the
// failure. It delegates to recordAndReturnError and discards the returned error.
func (c *chainSyncStatus) recordError(height int64, area string, err error) {
    _ = c.recordAndReturnError(height, area, err)
}

// updateMaxHeightStored looks up the highest stored block and adjusts the sync window.
//
// When LastHeight has already reached that block's height, the pass starts over: LastHeight is reset to 0 and the
// recorded errors are cleared, while MaxHeightStored is left as it was. Otherwise MaxHeightStored is raised to
// that block's height. A failing query is returned as is.
func (c *chainSyncStatus) updateMaxHeightStored() error {
    newestFirst := qm.OrderBy(model.BlockColumns.Height + " DESC")
    tip, err := model.Blocks(newestFirst, qm.Limit(1)).OneG()
    if err != nil {
        return err
    }
    if tip == nil {
        return nil
    }

    tipHeight := int64(tip.Height)
    if c.LastHeight < tipHeight {
        c.MaxHeightStored = tipHeight
        return nil
    }

    c.LastHeight = 0 // Reset
    c.Errors = make([]syncError, 0)
    return nil
}

// getChainSyncJobStatus loads the job status record for the chain sync job, creating it with an initial state
// when it does not exist yet, and decodes the stored JSON state into the package-level chainSync.
//
// chainSync must be non-nil when this is called; ChainSync allocates it beforehand. When the decoded state has no
// MaxHeightStored yet, it is initialized through updateMaxHeightStored, and in that case the job status is
// returned alongside any error from that call. All other failures return a nil job status.
func getChainSyncJobStatus() (*model.JobStatus, error) {
    jobStatus, err := model.FindJobStatusG(chainSyncJob)
    if errors.Is(err, sql.ErrNoRows) {
        initialState, err := json.Marshal(chainSyncStatus{LastHeight: 0})
        if err != nil {
            return nil, errors.Err(err)
        }
        jobStatus = &model.JobStatus{JobName: chainSyncJob, LastSync: time.Time{}, State: null.JSONFrom(initialState)}
        if err := jobStatus.InsertG(boil.Infer()); err != nil {
            // Return the failure, with its cause, instead of panicking with a message that drops it.
            return nil, errors.Err("cannot create JobStatus for %s: %s", chainSyncJob, err.Error())
        }
    } else if err != nil {
        return nil, errors.Err(err)
    }

    err = json.Unmarshal(jobStatus.State.JSON, chainSync)
    if err != nil {
        return nil, errors.Err(err)
    }
    if chainSync.MaxHeightStored == 0 {
        return jobStatus, chainSync.updateMaxHeightStored()
    }

    return jobStatus, nil
}

// doneChainSyncJob marks the run as successful and stores the current chainSync state, encoded as JSON, in the
// job status record. An encoding failure is logged and nothing is saved; a failing database update panics.
func doneChainSyncJob(jobStatus *model.JobStatus) {
    jobStatus.LastSync = time.Now()
    jobStatus.IsSuccess = true

    state, marshalErr := json.Marshal(&chainSync)
    if marshalErr != nil {
        logrus.Error(marshalErr)
        return
    }
    jobStatus.State.SetValid(state)

    updateErr := jobStatus.UpdateG(boil.Infer())
    if updateErr != nil {
        logrus.Panic(updateErr)
    }
}