lbryio/chainquery

View on GitHub
daemon/processing/transaction.go

Summary

Maintainability
A
2 hrs
Test Coverage
package processing

import (
    "strconv"
    "sync"
    "time"

    "github.com/lbryio/chainquery/sockety"
    "github.com/lbryio/sockety/socketyapi"

    "github.com/sirupsen/logrus"

    "github.com/lbryio/chainquery/datastore"
    "github.com/lbryio/chainquery/lbrycrd"
    "github.com/lbryio/chainquery/metrics"
    "github.com/lbryio/chainquery/model"
    "github.com/lbryio/chainquery/util"

    "github.com/lbryio/lbry.go/v2/extras/errors"
    "github.com/lbryio/lbry.go/v2/extras/stop"

    "github.com/volatiletech/sqlboiler/v4/boil"
    "github.com/volatiletech/sqlboiler/v4/queries/qm"
)

//MaxParallelTxProcessing sets the maximum concurrent transactions to process in a block.
var MaxParallelTxProcessing int

type txToProcess struct {
    tx          *lbrycrd.TxRawResult
    blockTime   uint64
    blockHeight uint64
    failcount   int
}

type txProcessResult struct {
    tx          *lbrycrd.TxRawResult
    blockTime   uint64
    blockHeight uint64
    err         error
    failcount   int
}

func initTxWorkers(s *stop.Group, nrWorkers int, jobs <-chan txToProcess, results chan<- txProcessResult) {
    for i := 0; i < nrWorkers; i++ {
        s.Add(1)
        go func(worker int) {
            defer s.Done()
            txProcessor(s, jobs, results, worker)
            q(strconv.Itoa(worker) + " - WORKER TX - Finished all jobs")
        }(i)
    }
}

func txProcessor(s *stop.Group, jobs <-chan txToProcess, results chan<- txProcessResult, worker int) {
    for {
        select {
        case <-s.Ch():
            return
        case job := <-jobs:
            q(strconv.Itoa(worker) + " - WORKER TX - Start new job " + job.tx.Txid)
            err := ProcessTx(job.tx, job.blockTime, job.blockHeight)
            if err != nil {
                metrics.ProcessingFailures.WithLabelValues("transaction").Inc()
                logrus.Debugf("processing tx failed %d times %s: %s", job.failcount+1, job.tx.Txid, err.Error())
            } else if job.failcount > 0 {
                logrus.Debugf("processing tx success after %d times %s", job.failcount, job.tx.Txid)
            }
            result := txProcessResult{
                tx:          job.tx,
                blockTime:   job.blockTime,
                blockHeight: job.blockHeight,
                err:         err,
                failcount:   job.failcount + 1}
            q(strconv.Itoa(worker) + " - WORKER TX - Finished new job " + job.tx.Txid)
            select {
            case <-s.Ch():
                q(strconv.Itoa(worker) + " - WORKER TX - discard finished job and stop " + job.tx.Txid)
                return
            default:
                q(strconv.Itoa(worker) + " - WORKER TX - Start sending result of job " + job.tx.Txid)
                results <- result
                q(strconv.Itoa(worker) + " - WORKER TX - End sending result of job " + job.tx.Txid)
            }
        }
    }
}

type txDebitCredits struct {
    addrDCMap map[string]*addrDebitCredits
    mutex     *sync.RWMutex
}

func newTxDebitCredits() *txDebitCredits {
    t := txDebitCredits{}
    v := make(map[string]*addrDebitCredits)
    t.addrDCMap = v
    t.mutex = &sync.RWMutex{}

    return &t

}

type addrDebitCredits struct {
    debits  float64
    credits float64
}

func (addDC *addrDebitCredits) Debits() float64 {
    return addDC.debits
}

func (addDC *addrDebitCredits) Credits() float64 {
    return addDC.credits
}

func (txDC *txDebitCredits) subtract(address string, value float64) {
    txDC.mutex.Lock()
    if txDC.addrDCMap[address] == nil {
        addrDC := addrDebitCredits{}
        txDC.addrDCMap[address] = &addrDC
    }
    txDC.addrDCMap[address].debits = txDC.addrDCMap[address].debits + value
    txDC.mutex.Unlock()
}

func (txDC *txDebitCredits) add(address string, value float64) {
    txDC.mutex.Lock()
    if txDC.addrDCMap[address] == nil {
        addrDC := addrDebitCredits{}
        txDC.addrDCMap[address] = &addrDC
    }
    txDC.addrDCMap[address].credits = txDC.addrDCMap[address].credits + value
    txDC.mutex.Unlock()
}

// ProcessTx processes an individual transaction from a block.
func ProcessTx(jsonTx *lbrycrd.TxRawResult, blockTime uint64, blockHeight uint64) error {
    defer metrics.Processing(time.Now(), "transaction")
    defer util.TimeTrack(time.Now(), "processTx "+jsonTx.Txid+" -- ", "daemonprofile")

    //Save transaction before the id is used anywhere else otherwise it will be 0
    transaction, err := saveUpdateTransaction(jsonTx)
    if err != nil {
        return err
    }

    txDbCrAddrMap := newTxDebitCredits()

    _, err = createUpdateVoutAddresses(transaction, &jsonTx.Vout, blockTime)
    if err != nil {
        return errors.Prefix("Vout Address Creation Error", err)
    }
    _, err = createUpdateVinAddresses(transaction, &jsonTx.Vin, blockTime)
    if err != nil {
        return errors.Prefix("Vin Address Creation Error", err)
    }

    // Process the inputs of the tranasction
    err = saveUpdateInputs(transaction, jsonTx, txDbCrAddrMap)
    if err != nil {
        return err
    }

    // Process the outputs of the transaction
    err = saveUpdateOutputs(transaction, jsonTx, txDbCrAddrMap, blockHeight)
    if err != nil {
        return err
    }
    //Set the send and receive values for the transaction
    err = setSendReceive(transaction, txDbCrAddrMap)
    if err != nil {
        return err
    }

    go sockety.SendNotification(socketyapi.SendNotificationArgs{
        Service: socketyapi.BlockChain,
        Type:    "new_tx",
        IDs:     []string{"transactions", jsonTx.Txid},
        Data:    map[string]interface{}{"transaction": jsonTx},
    })

    return nil
}

func saveUpdateTransaction(jsonTx *lbrycrd.TxRawResult) (*model.Transaction, error) {
    transaction := &model.Transaction{}
    // Error is not helpful. It returns an error if there is nothing in the database.
    foundTx, _ := model.Transactions(qm.Where(model.TransactionColumns.Hash+"=?", jsonTx.Txid)).OneG()
    if foundTx != nil {
        transaction = foundTx
    }
    transaction.BlockHashID.SetValid(jsonTx.BlockHash)
    transaction.InputCount = uint(len(jsonTx.Vin))
    transaction.OutputCount = uint(len(jsonTx.Vout))
    transaction.TransactionTime.SetValid(uint64(jsonTx.Time))
    transaction.TransactionSize = uint64(jsonTx.Size)
    transaction.Hash = jsonTx.Txid
    transaction.Version = int(jsonTx.Version)
    transaction.LockTime = uint(jsonTx.LockTime)
    transaction.CreatedTime = time.Unix(jsonTx.Blocktime, 0)
    transactionAmount := 0.0
    for _, vout := range jsonTx.Vout {
        transactionAmount += vout.Value
    }
    transaction.Value = transactionAmount

    if foundTx != nil {
        if err := transaction.UpdateG(boil.Infer()); err != nil {
            return transaction, err
        }
    } else {
        if err := transaction.InsertG(boil.Infer()); err != nil {
            return nil, err
        }
    }

    return transaction, nil
}

func saveUpdateInputs(transaction *model.Transaction, jsonTx *lbrycrd.TxRawResult, txDbCrAddrMap *txDebitCredits) error {
    vins := jsonTx.Vin
    vinjobs := make(chan vinToProcess)
    errorchan := make(chan error)
    workers := util.Min(len(vins), MaxParallelVinProcessing)
    sQ := stop.New(nil)
    initVinWorkers(sQ, workers, vinjobs, errorchan)
    // Queue
    q("VIN SYNC started")
    sQ.Add(1)
    go func() {
        defer sQ.Done()
        //q("VIN start queueing")
        for i := range vins {
            select {
            case <-sQ.Ch():
                return
            default:
                //q("VIN start passing new job")
                vinjobs <- vinToProcess{jsonVin: &vins[i], tx: transaction, txDC: txDbCrAddrMap, vin: uint64(i)}
                //q("VIN end pass new job")
            }
        }
        //q("VIN end queueing")
        close(vinjobs)
    }()

    //Error check
    leftToProcess := len(vins)
    for err := range errorchan {
        leftToProcess--
        if err != nil {
            q("VIN error..stopping")
            sQ.StopAndWait()
            q("VIN error..stopped")
            return errors.Prefix("Vin Error->", err)
        }
        q("VIN processing..." + strconv.Itoa(leftToProcess))
        if leftToProcess == 0 {
            q("VIN stopping...")
            sQ.StopAndWait()
            q("VIN stopped...")
            q("VIN returning")
            return nil
        }
        continue
    }
    q("VIN SYNC ended")
    return nil
}

func saveUpdateOutputs(transaction *model.Transaction, jsonTx *lbrycrd.TxRawResult, txDbCrAddrMap *txDebitCredits, blockHeight uint64) error {
    vouts := jsonTx.Vout
    workers := util.Min(len(vouts), MaxParallelVoutProcessing)
    voutjobs := make(chan voutToProcess)
    errorchan := make(chan error)
    sQ := stop.New(nil)
    initVoutWorkers(sQ, workers, voutjobs, errorchan)
    // Queue
    q("VOUT SYNC started")
    sQ.Add(1)
    go func() {
        defer sQ.Done()
        q("VOUT start queueing")
        for i := range vouts {
            select {
            case <-sQ.Ch():
                return
            default:
                q("VOUT start passing new job")
                voutjobs <- voutToProcess{jsonVout: &vouts[i], tx: transaction, txDC: txDbCrAddrMap, blockHeight: blockHeight}
                q("VOUT end pass new job")
            }
        }
        q("VOUT SYNC finished")
        close(voutjobs)
    }()

    //Error check
    leftToProcess := len(vouts)
    var voutErr error
    for err := range errorchan {
        leftToProcess--
        if err != nil {
            q("VOUT error found...")
            if voutErr == nil {
                voutErr = errors.Prefix("Vout Error->", err)
            }
        }
        if leftToProcess == 0 {
            q("VOUT stopping...")
            sQ.StopAndWait()
            q("VOUT stopped")
            q("VOUT returning")
            return voutErr
        }
        continue
    }
    q("VOUT SYNC ended")
    return voutErr
}

func setSendReceive(transaction *model.Transaction, txDbCrAddrMap *txDebitCredits) error {
    for addr, DC := range txDbCrAddrMap.addrDCMap {

        address := datastore.GetAddress(addr)
        if address == nil {
            return errors.Err("missing address for setting amounts!")
        }

        txAddr := datastore.GetTxAddress(transaction.ID, address.ID)

        txAddr.CreditAmount = DC.Credits()
        txAddr.DebitAmount = DC.Debits()
        err := datastore.UpdateTxAddressAmounts(txAddr)
        if err != nil {
            return err //Should never happen or something is wrong
        }
    }
    return nil
}