lbryio/chainquery

daemon/jobs/claimtriesync.go

package jobs

import (
    "database/sql"
    "encoding/json"
    "fmt"
    "runtime"
    "strconv"
    "sync"
    "sync/atomic"
    "time"

    "github.com/lbryio/chainquery/datastore"
    "github.com/lbryio/chainquery/lbrycrd"
    "github.com/lbryio/chainquery/metrics"
    "github.com/lbryio/chainquery/model"

    "github.com/lbryio/lbry.go/v2/extras/errors"
    "github.com/lbryio/lbry.go/v2/extras/query"

    "github.com/sirupsen/logrus"
    "github.com/volatiletech/null/v8"
    "github.com/volatiletech/sqlboiler/v4/boil"
    "github.com/volatiletech/sqlboiler/v4/queries/qm"
)

const claimTrieSyncJob = "claimtriesyncjob"
const debugClaimTrieSync = false

var expirationHardForkHeight uint = 400155    // https://github.com/lbryio/lbrycrd/pull/137
var hardForkBlocksToExpiration uint = 2102400 // https://github.com/lbryio/lbrycrd/pull/137
var blockHeight uint64
var blocksToExpiration uint = 262974 //Hardcoded! https://lbry.com/faq/claimtrie-implementation
// claimTrieSyncRunning indicates whether the job is already running.
var claimTrieSyncRunning = false

var lastSync *claimTrieSyncStatus

type claimTrieSyncStatus struct {
    JobStatus        *model.JobStatus `json:"-"`
    PreviousSyncTime time.Time        `json:"previous_sync"`
    LastHeight       int64            `json:"last_height"`
}

// ClaimTrieSyncAsync synchronizes claimtrie information that is calculated and enforced by lbrycrd.
func ClaimTrieSyncAsync() {
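    // Best-effort guard: skip starting a new sync while one is already running.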
    if !claimTrieSyncRunning {
        claimTrieSyncRunning = true
        //Run in background so the application can shutdown properly.
        go ClaimTrieSync()
    }
}

// ClaimTrieSync syncs the claim trie bid state, effective amount and valid-at-height with lbrycrd.
func ClaimTrieSync() {
    metrics.JobLoad.WithLabelValues("claimtrie_sync").Inc()
    defer metrics.JobLoad.WithLabelValues("claimtrie_sync").Dec()
    defer metrics.Job(time.Now(), "claimtrie_sync")
    defer func() {
        claimTrieSyncRunning = false
    }()
    //defer util.TimeTrack(time.Now(), "ClaimTrieSync", "always")
    printDebug("ClaimTrieSync: started... ")
    if lastSync == nil {
        lastSync = &claimTrieSyncStatus{}
    }
    jobStatus, err := getClaimTrieSyncJobStatus()
    if err != nil {
        logrus.Error(err)
        return
    }
    isFirstClaimTrieSync := jobStatus.LastSync.IsZero()
    printDebug("ClaimTrieSync: updating spent claims")
    //For updating claims that are spent (no longer in the claimtrie)
    if err := updateSpentClaims(); err != nil {
        logrus.Error("ClaimTrieSync:", err)
        saveJobError(jobStatus, err)
        return
    }

    started := time.Now()
    printDebug("ClaimTrieSync: getting block height")
    //Get blockheight for calculating expired status
    count, err := lbrycrd.GetBlockCount()
    if err != nil {
        logrus.Error("ClaimTrieSync: Error getting block height", err)
        return
    }
    blockHeight = *count

    lastSync.PreviousSyncTime = jobStatus.LastSync
    lastSync.LastHeight = int64(blockHeight)

    if isFirstClaimTrieSync {
        logrus.Infof("first claimtriesync run detected. Boosters equipped for faster processing!")
    }
    claimsChan := make(chan *model.Claim, 50000)
    success := false
    processedClaims := int64(0)
    wg := &sync.WaitGroup{}
    wg.Add(1)
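    // Consumer goroutine: drains claimsChan and reprocesses the queued claims
    // in batches while the producers below keep filling the channel.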
    go func(claimsChan chan *model.Claim, currentHeight uint64, wg *sync.WaitGroup, success *bool) {
        defer wg.Done()
        err := reprocessUpdatedClaims(claimsChan, currentHeight, &processedClaims)
        if err != nil {
            logrus.Error("ClaimTrieSync:", err)
            saveJobError(jobStatus, err)
            *success = false
            return
        }
        *success = true
    }(claimsChan, blockHeight, wg, &success)

    printDebug("ClaimTrieSync: getting modified claims since " + jobStatus.LastSync.String())
    err = getModifiedClaims(jobStatus.LastSync, claimsChan)
    if err != nil {
        logrus.Error("ClaimTrieSync:", err)
        saveJobError(jobStatus, err)
        return
    }
    if !isFirstClaimTrieSync {
        printDebug("ClaimTrieSync: getting newly supported claims since " + jobStatus.LastSync.String())
        err = getSupportedClaims(jobStatus.LastSync, claimsChan)
        if err != nil {
            logrus.Error("ClaimTrieSync:", err)
            saveJobError(jobStatus, err)
            return
        }
        printDebug("ClaimTrieSync: getting new valid claims up to block height " + strconv.Itoa(int(lastSync.LastHeight)))
        err = getNewValidClaims(uint(lastSync.LastHeight), claimsChan)
        if err != nil {
            logrus.Error("ClaimTrieSync:", err)
            saveJobError(jobStatus, err)
            return
        }
    }
    close(claimsChan)
    logrus.Infof("ClaimTrieSync: finished getting claims to reprocess. Now waiting on consumer")
    wg.Wait()
    if success {
        jobStatus.LastSync = started
        jobStatus.IsSuccess = true
        jobStatus.ErrorMessage.Valid = false
        bytes, err := json.Marshal(&lastSync)
        if err != nil {
            logrus.Error(err)
            return
        }
        jobStatus.State.SetValid(bytes)
        if err := jobStatus.UpdateG(boil.Infer()); err != nil {
            logrus.Panic(err)
        }
        printDebug("ClaimTrieSync: Processed " + strconv.Itoa(int(atomic.LoadInt64(&processedClaims))) + " claims.")
    }
}

func reprocessUpdatedClaims(claimsChan chan *model.Claim, currentHeight uint64, processedClaims *int64) error {
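    // Receive claims from claimsChan, de-duplicating by claim name, and flush
    // them in batches: each flush syncs the batch against lbrycrd and then
    // recomputes the controlling claim for every name in the batch.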
    const BatchSize = 5000
    reprocessedNamesMap := make(map[string]bool, 500000)
    claimsBatch := make(model.ClaimSlice, 0, BatchSize)
    for {
        c, hasMore := <-claimsChan
        if hasMore && !reprocessedNamesMap[c.Name] {
            claimsBatch = append(claimsBatch, c)
            reprocessedNamesMap[c.Name] = true
        }
        if len(claimsBatch) > BatchSize || !hasMore {
            printDebug("ClaimTrieSync: Claims to update " + strconv.Itoa(len(claimsBatch)))
            //For syncing the claims
            err := SyncClaims(claimsBatch)
            if err != nil {
                return err
            }

            //For Setting Controlling Claims
            err = SetControllingClaimForNames(claimsBatch, currentHeight)
            if err != nil {
                return err
            }
            atomic.AddInt64(processedClaims, int64(len(claimsBatch)))
            claimsBatch = make(model.ClaimSlice, 0, BatchSize)
        }
        if !hasMore {
            return nil
        }
    }
}

func initSyncWorkers(nrWorkers int, jobs <-chan lbrycrd.Claim, wg *sync.WaitGroup) {

    for i := 0; i < nrWorkers; i++ {
        wg.Add(1)
        go syncProcessor(jobs, wg)
    }
}

func initControllingWorkers(nrWorkers int, jobs <-chan string, wg *sync.WaitGroup, atHeight uint64) {

    for i := 0; i < nrWorkers; i++ {
        wg.Add(1)
        go controllingProcessor(jobs, wg, atHeight)
    }
}

func syncProcessor(jobs <-chan lbrycrd.Claim, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        syncClaim(&job)
    }
}

func controllingProcessor(names <-chan string, wg *sync.WaitGroup, atHeight uint64) {
    defer wg.Done()
    for name := range names {
        setBidStateOfClaimsForName(name, atHeight)
    }
}

// SetControllingClaimForNames sets the bid state for claims with these names.
func SetControllingClaimForNames(claims model.ClaimSlice, atHeight uint64) error {
    printDebug("ClaimTrieSync: controlling claim status update started... ")
    controlWg := sync.WaitGroup{}
    names := make(map[string]string)
    printDebug("ClaimTrieSync: Making name map...")
    for _, claim := range claims {
        names[claim.Name] = claim.Name
    }
    printDebug("ClaimTrieSync: Finished making name map...[", len(names), "]")
    setControllingQueue := make(chan string, 1000)
    initControllingWorkers(runtime.NumCPU()-1, setControllingQueue, &controlWg, atHeight)
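    // Feed each unique name to the worker pool; workers recompute the bid
    // state of every claim for that name at the given height.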
    for _, name := range names {
        setControllingQueue <- name
    }
    close(setControllingQueue)
    controlWg.Wait()
    printDebug("ClaimTrieSync: controlling claim status update complete... ")

    return nil
}

func setBidStateOfClaimsForName(name string, atHeight uint64) {
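    // Claims for the name are loaded in descending order of effective amount;
    // the highest-ranked active claim becomes Controlling and every other
    // claim is set to the status reported by getClaimStatus.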
    claims, err := model.Claims(
        qm.Where(model.ClaimColumns.Name+"=?", name),
        qm.Where(model.ClaimColumns.ValidAtHeight+"<=?", atHeight),
        qm.OrderBy(model.ClaimColumns.EffectiveAmount+" DESC")).AllG()
    if err != nil {
        logrus.Error("ClaimTrieSync: could not get claims for name ", name, ": ", err)
        return
    }
    printDebug("ClaimTrieSync: found ", len(claims), " claims matching the name ", name)
    foundControlling := false
    for _, claim := range claims {
        if !foundControlling && getClaimStatus(claim, atHeight) == "Active" {
            if claim.BidState != "Controlling" {
                claim.BidState = "Controlling"
                err := datastore.PutClaim(claim)
                if err != nil {
                    panic(err)
                }
            }
            foundControlling = true
        } else {
            status := getClaimStatus(claim, atHeight)
            if status != claim.BidState {
                claim.BidState = status
                err := datastore.PutClaim(claim)
                if err != nil {
                    panic(err)
                }
            }
        }
    }
}

// SyncClaims syncs the effective amount and valid-at-height of the claims with these names against the lbrycrd claimtrie.
func SyncClaims(claims model.ClaimSlice) error {
    printDebug("ClaimTrieSync: claim update started... ")
    claimNameMap := make(map[string]bool)
    for _, claim := range claims {
        claimNameMap[claim.Name] = true
    }
    var names []string
    for name := range claimNameMap {
        names = append(names, name)
    }
    printDebug("ClaimTrieSync: ", len(names), " names to sync from lbrycrd...")
    syncwg := sync.WaitGroup{}
    processingQueue := make(chan lbrycrd.Claim, 1000)
    initSyncWorkers(runtime.NumCPU()-1, processingQueue, &syncwg)
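    // Query lbrycrd for each unique name and push every claim it returns onto
    // the processing queue consumed by the sync workers.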
    for i, name := range names {
        if i%1000 == 0 {
            printDebug("ClaimTrieSync: syncing ", i, " of ", len(names), " queued - queue size: ", len(processingQueue))
        }
        claims, err := lbrycrd.GetClaimsForName(name)
        if err != nil {
            printDebug("ClaimTrieSync: Could not get claims for name: ", name, " Error: ", err)
            continue
        }
        for _, claimJSON := range claims.Claims {
            processingQueue <- claimJSON
        }
    }
    close(processingQueue)
    syncwg.Wait()

    printDebug("ClaimTrieSync: claim update complete... ")

    return nil
}

func syncClaim(claimJSON *lbrycrd.Claim) {
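    // Look up the stored claim by claim ID and update its valid-at-height and
    // effective amount from the lbrycrd claimtrie entry when they differ.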
    hasChanges := false
    c := model.ClaimColumns
    claim, err := model.Claims(qm.Select(c.ID, c.ValidAtHeight, c.EffectiveAmount), model.ClaimWhere.ClaimID.EQ(claimJSON.ClaimID)).OneG()
    if err == sql.ErrNoRows {
        unknown, _ := model.AbnormalClaims(qm.Where(model.AbnormalClaimColumns.ClaimID+"=?", claimJSON.ClaimID)).OneG()
        if unknown == nil {
            printDebug("ClaimTrieSync: Missing Claim ", claimJSON.ClaimID, " ", claimJSON.TxID, " ", claimJSON.N)
        }
        return
    }
    if err != nil {
        logrus.Error("ClaimTrieSync: ", err)
        return
    }
    if claim.ValidAtHeight != uint(claimJSON.ValidAtHeight) {
        claim.ValidAtHeight = uint(claimJSON.ValidAtHeight)
        hasChanges = true
    }
    if claim.EffectiveAmount != claimJSON.EffectiveAmount {
        if claimJSON.PendingAmount != 0 && claim.EffectiveAmount != claimJSON.PendingAmount {
            claim.EffectiveAmount = claimJSON.PendingAmount
        } else {
            claim.EffectiveAmount = claimJSON.EffectiveAmount
        }
        hasChanges = true
    }
    if hasChanges {
        err := claim.UpdateG(boil.Whitelist(c.ValidAtHeight, c.EffectiveAmount))
        if err != nil {
            logrus.Error("ClaimTrieSync: unable to sync claim ", claim.ClaimID, ". JSON-", claimJSON)
            printDebug("Error: ", err)
        }
    }
}

func getClaimStatus(claim *model.Claim, atHeight uint64) string {
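    // Status is derived from the claim's most recent output: Spent if that
    // output is spent, Expired if the claim height has passed expiration
    // (expiration takes precedence), otherwise Active.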
    status := "Accepted"
    var transaction *model.Transaction
    var err error
    t := model.TransactionColumns
    if !claim.TransactionHashUpdate.IsZero() {
        transaction, err = model.Transactions(qm.Select(t.ID), model.TransactionWhere.Hash.EQ(claim.TransactionHashUpdate.String)).OneG()
    } else { //Transaction and output should never be missing if the claim exists.
        transaction, err = claim.TransactionHash(qm.Select(t.ID)).OneG()
    }

    if err != nil {
        logrus.Errorf("could not find transaction %s for claim id %d at height %d: %s", claim.TransactionHashID.String, claim.ID, atHeight, err.Error())
        return status
    }
    o := model.OutputColumns
    output, err := transaction.Outputs(qm.Select(o.ID, o.IsSpent), qm.Where(model.OutputColumns.Vout+"=?", claim.VoutUpdate)).OneG()
    if err != nil {
        logrus.Errorf("could not find output %s - %d: %s", claim.TransactionHashID.String, claim.Vout, err)
        return "ERROR"
    }

    if output.IsSpent {
        status = "Spent"
    }
    height := claim.Height
    if GetIsExpiredAtHeight(height, uint(atHeight)) {
        status = "Expired"
    }

    //Neither Spent nor Expired = Active
    if status == "Accepted" {
        status = "Active"
    }

    return status
}

// GetIsExpiredAtHeight checks the claim height against the given block height to determine whether the claim has expired.
func GetIsExpiredAtHeight(height, blockHeight uint) bool {
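    // Three cases: claims created at or after the expiration hard fork use the
    // extended expiration window, claims whose original expiration would fall
    // at or after the fork height are also extended, and all older claims use
    // the original window.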
    if height == 0 {
        return false
    }
    if height >= expirationHardForkHeight {
        // https://github.com/lbryio/lbrycrd/pull/137 - HardFork extends claim expiration.
        if height+hardForkBlocksToExpiration < blockHeight {
            return true
        }
    } else if height+blocksToExpiration >= expirationHardForkHeight {
        // https://github.com/lbryio/lbrycrd/pull/137 - HardFork extends claim expiration.
        if height+hardForkBlocksToExpiration < blockHeight {
            return true
        }
    } else {
        if height+blocksToExpiration < blockHeight {
            return true
        }
    }
    return false
}
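
// getSupportedClaims queues claims whose supports were created or modified
// since the last sync, resolving the supported claim IDs to claims in batches
// of 15000.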
func getSupportedClaims(since time.Time, claimsChan chan *model.Claim) error {
    // CLAIMS THAT HAVE SUPPORTS THAT WERE MODIFIED [SELECT DISTINCT support.supported_claim_id FROM support WHERE support.modified_at >= '2019-11-03 19:48:58';]
    s := model.SupportColumns
    supports, err := model.Supports(qm.Select("DISTINCT "+s.SupportedClaimID), model.SupportWhere.ModifiedAt.GTE(since)).AllG()
    if err != nil {
        return errors.Err(err)
    }
    var claimIds []interface{}
    for _, support := range supports {
        claimIds = append(claimIds, support.SupportedClaimID)
    }
    c := model.ClaimColumns

    batch := 15000
    for i := 0; i < len(claimIds); i += batch {
        j := i + batch
        if j > len(claimIds) {
            j = len(claimIds)
        }
        claims, err := model.Claims(qm.Select("DISTINCT "+c.Name), qm.WhereIn(c.ClaimID+" IN ?", claimIds[i:j]...)).AllG()
        if err != nil {
            return errors.Err(err)
        }
        logrus.Debugf("sending %d claim for reprocessing - batch %d-%d/%d", len(claims), i, j, len(claimIds))
        for _, c := range claims {
            claimsChan <- c
        }
        logrus.Debugf("%d claimIds left to process", len(claimIds)-i)
    }
    return nil
}
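
// getModifiedClaims queues claims whose modified_at is at or after the last
// sync (all claims when since is zero), paging through the claim table by ID
// in chunks of 15000.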
func getModifiedClaims(since time.Time, claimsChan chan *model.Claim) error {
    // CLAIMS THAT WERE MODIFIED [SELECT DISTINCT claim.name FROM claim WHERE claim.modified_at >= '2019-11-03 19:48:58';]
    c := model.ClaimColumns
    prevID := -1
    clauses := make([]qm.QueryMod, 0)
    if !since.IsZero() {
        clauses = append(clauses, model.ClaimWhere.ModifiedAt.GTE(since))
    }
    clauses = append(clauses, qm.Select(c.ID, c.Name), qm.Limit(15000))
    for {
        finalClauses := append(clauses, qm.Where(c.ID+">?", prevID))
        claims, err := model.Claims(finalClauses...).AllG()
        if err != nil {
            return errors.Err(err)
        }
        oldPrevID := prevID
        logrus.Debugf("[getModifiedClaims] sending claim %d claims for reprocessing - claim id batch: %d", len(claims), prevID)
        for _, c := range claims {
            claimsChan <- c
            prevID = int(c.ID)
        }
        if oldPrevID == prevID {
            break
        }
    }
    return nil
}
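
// getNewValidClaims queues claims whose valid_at_height is at or above the
// given block height, paging through the claim table by ID in chunks of 15000.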
func getNewValidClaims(lastHeight uint, claimsChan chan *model.Claim) error {
    // CLAIMS THAT BECAME VALID SINCE [SELECT DISTINCT claim.name FROM claim WHERE claim.valid_at_height >= 852512;]
    c := model.ClaimColumns
    prevID := -1
    for {
        claims, err := model.Claims(qm.Select(c.ID, c.Name), qm.Where(c.ID+">?", prevID), model.ClaimWhere.ValidAtHeight.GTE(lastHeight), qm.Limit(15000)).AllG()
        if err != nil {
            return errors.Err(err)
        }
        oldPrevID := prevID
        logrus.Debugf("[getNewValidClaims] sending claim %d claims for reprocessing - claim id batch: %d", len(claims), prevID)
        for _, c := range claims {
            claimsChan <- c
            prevID = int(c.ID)
        }
        if oldPrevID == prevID {
            break
        }
    }
    return nil
}

func getSpentClaimsToUpdate(hasUpdate bool, lastProcessed uint64) (model.ClaimSlice, uint64, error) {
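    // Page through spent outputs modified since the previous sync (15000 at a
    // time, keyed by output ID) and return the claims that reference those
    // outputs, matching on the update transaction hash when hasUpdate is true
    // and on the original transaction hash otherwise.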
    w := model.OutputWhere
    o := model.OutputColumns
    outputMods := []qm.QueryMod{
        qm.Select(o.ID, o.IsSpent, o.TransactionHash),
        w.ModifiedAt.GTE(lastSync.PreviousSyncTime),
        w.IsSpent.EQ(true),
        w.ID.GT(lastProcessed),
        qm.Limit(15000),
    }
    var outputs model.OutputSlice
    var err error
    outputs, err = model.Outputs(outputMods...).AllG()
    if err != nil {
        return nil, 0, errors.Err(err)
    }
    if len(outputs) == 0 {
        return nil, lastProcessed, nil
    }

    var txHashList []interface{}
    for _, o := range outputs {
        txHashList = append(txHashList, o.TransactionHash)
    }
    txHashCol := model.ClaimColumns.TransactionHashID
    if hasUpdate {
        txHashCol = model.ClaimColumns.TransactionHashUpdate
    }
    c := model.ClaimColumns
    claims, err := model.Claims(qm.Select(c.ID, c.ClaimID, txHashCol), qm.WhereIn(txHashCol+" IN ?", txHashList...)).AllG()
    if err != nil {
        return nil, 0, errors.Err(err)
    }
    if len(claims) > 0 {
        logrus.Debugf("found %d outputs, %d claims - last claim id: %d", len(outputs), len(claims), claims[len(claims)-1].ID)
    }
    lastProcessed = outputs[len(outputs)-1].ID

    return claims, lastProcessed, nil
}

func updateSpentClaims() error {
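    // Two passes: first mark claims whose original output was spent and that
    // have no later update, then bulk-update claims whose update output was
    // spent.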
    var lastProcessed uint64
    for {
        //Claims without updates
        claims, newLastProcessed, err := getSpentClaimsToUpdate(false, lastProcessed)
        if err != nil {
            return err
        }
        for _, claim := range claims {
            if !claim.TransactionHashUpdate.IsZero() {
                continue
            }
            claim.BidState = "Spent"
            claim.ModifiedAt = time.Now()
            if err := claim.UpdateG(boil.Whitelist(model.ClaimColumns.BidState, model.ClaimColumns.ModifiedAt)); err != nil {
                return err
            }
        }
        if lastProcessed == newLastProcessed {
            break
        }
        lastProcessed = newLastProcessed
    }
    lastProcessed = 0
    for {
        //Claims with updates
        claims, newLastProcessed, err := getSpentClaimsToUpdate(true, lastProcessed)
        if err != nil {
            return err
        }
        for _, claim := range claims {
            claim.BidState = "Spent"
            claim.ModifiedAt = time.Now()
        }

        batch := 15000
        for i := 0; i < len(claims); i += batch {
            j := i + batch
            if j > len(claims) {
                j = len(claims)
            }
            args := []interface{}{time.Now()}
            for _, c := range claims[i:j] {
                args = append(args, c.ID)
            }
            updateQuery := fmt.Sprintf(`UPDATE claim use index(id) SET bid_state="Spent", modified_at = ? WHERE id IN (%s)`, query.Qs(len(claims[i:j])))
            if _, err := boil.GetDB().Exec(updateQuery, args...); err != nil {
                return err
            }
            logrus.Debugf("%d claims left to update", len(claims)-i)
        }

        if lastProcessed == newLastProcessed {
            break
        }
        lastProcessed = newLastProcessed
    }
    return nil
}

func getClaimTrieSyncJobStatus() (*model.JobStatus, error) {
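    // Load the persisted job status, creating a fresh row with a far-past
    // previous sync time on the first run, then restore lastSync from the
    // stored JSON state.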
    jobStatus, err := model.FindJobStatusG(claimTrieSyncJob)
    if errors.Is(sql.ErrNoRows, err) {
        syncState := claimTrieSyncStatus{PreviousSyncTime: time.Unix(458265600, 0), LastHeight: 0}
        bytes, err := json.Marshal(syncState)
        if err != nil {
            return nil, errors.Err(err)
        }
        jobStatus = &model.JobStatus{JobName: claimTrieSyncJob, LastSync: time.Time{}, State: null.JSONFrom(bytes)}
        if err := jobStatus.InsertG(boil.Infer()); err != nil {
            logrus.Panic("Cannot Retrieve/Create JobStatus for " + claimTrieSyncJob)
        }
    } else if err != nil {
        return nil, errors.Err(err)
    }

    err = json.Unmarshal(jobStatus.State.JSON, lastSync)
    if err != nil {
        return nil, errors.Err(err)
    }

    return jobStatus, nil
}

func saveJobError(jobStatus *model.JobStatus, jobError error) {
    jobStatus.ErrorMessage.SetValid(jobError.Error())
    jobStatus.IsSuccess = false
    if err := jobStatus.UpsertG(boil.Infer(), boil.Infer()); err != nil {
        logrus.Error(errors.Prefix("Saving Job Error Message "+jobError.Error(), err))
    }
}

func printDebug(args ...interface{}) {
    if debugClaimTrieSync {
        logrus.Info(args...)
    } else {
        logrus.Debug(args...)
    }
}