mesg-foundation/core

View on GitHub
cosmos/rpc.go

Summary

Maintainability
A
30 mins
Test Coverage
package cosmos

import (
    "context"
    "errors"
    "fmt"
    "sync"
    "time"

    "github.com/cosmos/cosmos-sdk/client/flags"
    "github.com/cosmos/cosmos-sdk/codec"
    "github.com/cosmos/cosmos-sdk/crypto/keys"
    sdktypes "github.com/cosmos/cosmos-sdk/types"
    "github.com/cosmos/cosmos-sdk/x/auth"
    authutils "github.com/cosmos/cosmos-sdk/x/auth/client/utils"
    authExported "github.com/cosmos/cosmos-sdk/x/auth/exported"
    authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
    "github.com/mesg-foundation/engine/ext/xreflect"
    abci "github.com/tendermint/tendermint/abci/types"
    rpcclient "github.com/tendermint/tendermint/rpc/client"
    tenderminttypes "github.com/tendermint/tendermint/types"
)

// RPC is a tendermint rpc client with helper functions.
type RPC struct {
    rpcclient.Client
    cdc         *codec.Codec
    kb          keys.Keybase
    chainID     string
    accName     string
    accPassword string
    gasPrices   sdktypes.DecCoins

    // Local state
    acc            authExported.Account
    accountMutex   sync.Mutex
    broadcastMutex sync.Mutex
}

// NewRPC returns a rpc tendermint client.
func NewRPC(client rpcclient.Client, cdc *codec.Codec, kb keys.Keybase, chainID, accName, accPassword, gasPrices string) (*RPC, error) {
    gasPricesDecoded, err := sdktypes.ParseDecCoins(gasPrices)
    if err != nil {
        return nil, err
    }
    return &RPC{
        Client:      client,
        cdc:         cdc,
        kb:          kb,
        chainID:     chainID,
        accName:     accName,
        accPassword: accPassword,
        gasPrices:   gasPricesDecoded,
    }, nil
}

// Codec returns the codec used by RPC.
func (c *RPC) Codec() *codec.Codec {
    return c.cdc
}

// QueryJSON is abci.query wrapper with errors check and decode data.
func (c *RPC) QueryJSON(path string, qdata, ptr interface{}) error {
    var data []byte
    if !xreflect.IsNil(qdata) {
        b, err := c.cdc.MarshalJSON(qdata)
        if err != nil {
            return err
        }
        data = b
    }

    result, _, err := c.QueryWithData(path, data)
    if err != nil {
        return err
    }
    return c.cdc.UnmarshalJSON(result, ptr)
}

// QueryWithData performs a query to a Tendermint node with the provided path
// and a data payload. It returns the result and height of the query upon success
// or an error if the query fails.
func (c *RPC) QueryWithData(path string, data []byte) ([]byte, int64, error) {
    result, err := c.ABCIQuery(path, data)
    if err != nil {
        return nil, 0, err
    }
    resp := result.Response
    if !resp.IsOK() {
        return nil, resp.Height, errors.New(resp.Log)
    }
    return resp.Value, resp.Height, nil
}

// BuildAndBroadcastMsg builds and signs message and broadcast it to node.
func (c *RPC) BuildAndBroadcastMsg(msg sdktypes.Msg) (*abci.ResponseDeliverTx, error) {
    signedTx, err := c.buildAndBroadcastMsgNoResult(msg)
    if err != nil {
        return nil, err
    }
    return c.waitForTxResult(signedTx)
}

func (c *RPC) buildAndBroadcastMsgNoResult(msg sdktypes.Msg) (tenderminttypes.Tx, error) {
    // Lock the getAccount + create and sign tx + broadcast
    c.broadcastMutex.Lock()
    defer c.broadcastMutex.Unlock()

    acc, err := c.GetAccount()
    if err != nil {
        return nil, err
    }

    // create and sign the tx
    signedTx, err := c.createAndSignTx([]sdktypes.Msg{msg}, acc)
    if err != nil {
        return nil, err
    }
    txres, err := c.BroadcastTxSync(signedTx)
    if err != nil {
        return nil, err
    }
    if txres.Code != abci.CodeTypeOK {
        return nil, fmt.Errorf("transaction returned with invalid code %d: %s", txres.Code, txres.Log)
    }

    // only increase sequence if no error during broadcast of tx
    if err := c.setAccountSequence(acc.GetSequence() + 1); err != nil {
        return nil, err
    }

    return signedTx, nil
}

func (c *RPC) waitForTxResult(signedTx tenderminttypes.Tx) (*abci.ResponseDeliverTx, error) {
    // TODO: 20*time.Second should not be hardcoded here
    ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
    defer cancel()

    subscriber := "engine"
    query := tenderminttypes.EventQueryTxFor(signedTx).String()
    out, err := c.Subscribe(ctx, subscriber, query)
    if err != nil {
        return nil, err
    }
    defer c.Unsubscribe(ctx, subscriber, query)

    select {
    case result := <-out:
        data, ok := result.Data.(tenderminttypes.EventDataTx)
        if !ok {
            return nil, errors.New("result data is not the right type")
        }
        if data.TxResult.Result.IsErr() {
            return nil, fmt.Errorf("an error occurred in transaction: %s", data.TxResult.Result.Log)
        }
        return &data.TxResult.Result, nil
    case <-ctx.Done():
        return nil, fmt.Errorf("reach timeout for listening for transaction result: %w", ctx.Err())
    }
}

// GetAccount returns the local account.
func (c *RPC) GetAccount() (authExported.Account, error) {
    c.accountMutex.Lock()
    defer c.accountMutex.Unlock()
    if c.acc == nil {
        accKb, err := c.kb.Get(c.accName)
        if err != nil {
            return nil, err
        }
        c.acc = auth.NewBaseAccount(
            accKb.GetAddress(),
            nil,
            accKb.GetPubKey(),
            0,
            0,
        )
    }
    accR, err := auth.NewAccountRetriever(c).GetAccount(c.acc.GetAddress())
    if err != nil {
        return nil, err
    }
    // replace seq if sup
    if c.acc.GetSequence() > accR.GetSequence() {
        accR.SetSequence(c.acc.GetSequence())
    }
    c.acc = accR
    return c.acc, nil
}

// setAccountSequence sets the sequence on the local account.
func (c *RPC) setAccountSequence(seq uint64) error {
    c.accountMutex.Lock()
    defer c.accountMutex.Unlock()
    if c.acc == nil {
        return fmt.Errorf("c.acc should not be nil. use GetAccount first")
    }
    return c.acc.SetSequence(seq)
}

// createAndSignTx build and sign a msg with client account.
func (c *RPC) createAndSignTx(msgs []sdktypes.Msg, acc authExported.Account) (tenderminttypes.Tx, error) {
    // Create TxBuilder
    txBuilder := authtypes.NewTxBuilder(
        authutils.GetTxEncoder(c.cdc),
        acc.GetAccountNumber(),
        acc.GetSequence(),
        flags.DefaultGasLimit,
        GasAdjustment,
        true,
        c.chainID,
        "",
        nil,
        c.gasPrices,
    ).WithKeybase(c.kb)

    // calculate gas
    if txBuilder.SimulateAndExecute() {
        txBytes, err := txBuilder.BuildTxForSim(msgs)
        if err != nil {
            return nil, err
        }
        _, adjusted, err := authutils.CalculateGas(c.QueryWithData, c.cdc, txBytes, txBuilder.GasAdjustment())
        if err != nil {
            return nil, err
        }
        txBuilder = txBuilder.WithGas(adjusted)
    }

    // create StdSignMsg
    stdSignMsg, err := txBuilder.BuildSignMsg(msgs)
    if err != nil {
        return nil, err
    }

    // create StdTx
    stdTx := authtypes.NewStdTx(stdSignMsg.Msgs, stdSignMsg.Fee, nil, stdSignMsg.Memo)

    // sign StdTx
    signedTx, err := txBuilder.SignStdTx(c.accName, c.accPassword, stdTx, false)
    if err != nil {
        return nil, err
    }

    return txBuilder.TxEncoder()(signedTx)
}