StellarCN/py-stellar-base

View on GitHub
stellar_sdk/sep/txrep.py

Summary

Maintainability
A
2 hrs
Test Coverage
"""
SEP: 0011
Title: Txrep: human-readable low-level representation of Stellar transactions
Author: David Mazières
Status: Active
Created: 2018-08-31
"""

import json
from datetime import datetime
from decimal import Decimal
from enum import Enum
from typing import Dict, List, Optional, Sequence, Union

from .. import xdr as stellar_xdr
from ..asset import Asset
from ..decorated_signature import DecoratedSignature
from ..fee_bump_transaction import FeeBumpTransaction
from ..fee_bump_transaction_envelope import FeeBumpTransactionEnvelope
from ..ledger_bounds import LedgerBounds
from ..liquidity_pool_asset import LiquidityPoolAsset
from ..liquidity_pool_id import LiquidityPoolId
from ..memo import *
from ..muxed_account import MuxedAccount
from ..operation import *
from ..operation.create_claimable_balance import ClaimPredicateType
from ..operation.revoke_sponsorship import RevokeSponsorshipType
from ..preconditions import Preconditions
from ..price import Price
from ..signer import Signer
from ..signer_key import SignerKey, SignerKeyType
from ..strkey import StrKey
from ..time_bounds import TimeBounds
from ..transaction import Transaction
from ..transaction_envelope import TransactionEnvelope

__all__ = ["to_txrep", "from_txrep"]

_true = "true"
_false = "false"


class _EnvelopeType(Enum):
    ENVELOPE_TYPE_TX_V0 = "ENVELOPE_TYPE_TX_V0"
    ENVELOPE_TYPE_TX = "ENVELOPE_TYPE_TX"
    ENVELOPE_TYPE_TX_FEE_BUMP = "ENVELOPE_TYPE_TX_FEE_BUMP"


def to_txrep(
    transaction_envelope: Union[TransactionEnvelope, FeeBumpTransactionEnvelope],
) -> str:
    """Generate a human-readable format for Stellar transactions.

    MuxAccount is currently not supported.

    Txrep is a human-readable representation of Stellar transactions that functions like an assembly language for XDR.

    See `SEP-0011 <https://github.com/stellar/stellar-protocol/blob/master/ecosystem/sep-0011.md>`_

    :param transaction_envelope: Transaction envelope object.
    :return: A human-readable format for Stellar transactions.
    """

    is_fee_bump = isinstance(transaction_envelope, FeeBumpTransactionEnvelope)
    tx_type = _EnvelopeType.ENVELOPE_TYPE_TX_FEE_BUMP.value
    if not is_fee_bump:
        assert isinstance(transaction_envelope, TransactionEnvelope)
        if transaction_envelope.transaction.v1:
            tx_type = _EnvelopeType.ENVELOPE_TYPE_TX.value
        else:
            tx_type = _EnvelopeType.ENVELOPE_TYPE_TX_V0.value

    prefix = "feeBump.tx.innerTx.tx." if is_fee_bump else "tx."

    transaction = transaction_envelope.transaction
    if is_fee_bump:
        assert isinstance(transaction_envelope, FeeBumpTransactionEnvelope)
        fee_bump_transaction_envelope = transaction_envelope
        fee_bump_transaction = fee_bump_transaction_envelope.transaction
        transaction_envelope = fee_bump_transaction.inner_transaction_envelope
        transaction = transaction_envelope.transaction

    lines: List[str] = []

    _add_line("type", tx_type, lines)
    if is_fee_bump:
        assert isinstance(fee_bump_transaction, FeeBumpTransaction)
        assert isinstance(transaction, Transaction)
        _add_line(
            "feeBump.tx.feeSource",
            _to_muxed_account(fee_bump_transaction.fee_source),
            lines,
            comment=_to_muxed_account_comment(fee_bump_transaction.fee_source),
        )
        _add_line(
            "feeBump.tx.fee",
            fee_bump_transaction.base_fee * (len(transaction.operations) + 1),
            lines,
        )
        _add_line(
            "feeBump.tx.innerTx.type", _EnvelopeType.ENVELOPE_TYPE_TX.value, lines
        )
    assert isinstance(transaction, Transaction)
    _add_line(
        f"{prefix}sourceAccount",
        _to_muxed_account(transaction.source),
        lines,
        comment=_to_muxed_account_comment(transaction.source),
    )
    _add_line(f"{prefix}fee", transaction.fee, lines)
    _add_line(f"{prefix}seqNum", transaction.sequence, lines)
    _add_preconditions(transaction.preconditions, f"{prefix}cond.", lines)
    _add_memo(transaction.memo, prefix, lines)
    _add_operations(transaction.operations, prefix, lines)
    _add_line(f"{prefix}ext.v", 0, lines)
    _add_signatures(
        transaction_envelope.signatures,
        "feeBump.tx.innerTx." if is_fee_bump else "",
        lines,
    )
    if is_fee_bump:
        _add_line("feeBump.tx.ext.v", 0, lines)
        _add_signatures(fee_bump_transaction_envelope.signatures, "feeBump.", lines)
    return "\n".join(lines)


# Setting to ignore pass in .coveragerc will cause this function to not be counted by pytest.


def from_txrep(
    txrep: str, network_passphrase: str
) -> Union[TransactionEnvelope, FeeBumpTransactionEnvelope]:
    """Parse txrep and generate transaction envelope object.

    MuxAccount is currently not supported.

    Txrep is a human-readable representation of Stellar transactions that functions like an assembly language for XDR.

    See `SEP-0011 <https://github.com/stellar/stellar-protocol/blob/master/ecosystem/sep-0011.md>`_

    :param txrep: a human-readable format for Stellar transactions.
    :param network_passphrase: The network to connect, you do not need to set this value at this
        time, it is reserved for future use.
    :return: A human-readable format for Stellar transactions.
    """
    raw_data_map = _get_raw_data_map(txrep)
    tx_type = _EnvelopeType(_get_value(raw_data_map, "type"))
    is_fee_bump = True if tx_type == _EnvelopeType.ENVELOPE_TYPE_TX_FEE_BUMP else False

    prefix = "feeBump.tx.innerTx.tx." if is_fee_bump else "tx."

    source = _get_value(raw_data_map, f"{prefix}sourceAccount")
    fee = _get_int_value(raw_data_map, f"{prefix}fee")
    sequence = _get_int_value(raw_data_map, f"{prefix}seqNum")
    preconditions = _get_preconditions(raw_data_map, f"{prefix}cond.")
    memo = _get_memo(raw_data_map, prefix)
    operations = _get_operations(raw_data_map, prefix)

    prefix = "feeBump.tx.innerTx." if is_fee_bump else ""
    transaction_signatures = _get_signatures(raw_data_map, prefix)

    v1 = (
        False
        if not is_fee_bump and tx_type == _EnvelopeType.ENVELOPE_TYPE_TX_V0
        else True
    )
    transaction = Transaction(
        source=source,
        sequence=sequence,
        fee=fee,
        operations=operations,
        memo=memo,
        preconditions=preconditions,
        v1=v1,
    )
    transaction_envelope = TransactionEnvelope(
        transaction=transaction,
        signatures=transaction_signatures,
        network_passphrase=network_passphrase,
    )

    if is_fee_bump:
        fee_bump_fee_source = _get_value(raw_data_map, "feeBump.tx.feeSource")
        fee_bump_fee = _get_int_value(raw_data_map, "feeBump.tx.fee")
        fee_bump_base_fee = int(fee_bump_fee / (len(operations) + 1))

        fee_bump_transaction_signatures = _get_signatures(raw_data_map, "feeBump.")

        fee_bump_transaction = FeeBumpTransaction(
            fee_source=fee_bump_fee_source,
            base_fee=fee_bump_base_fee,
            inner_transaction_envelope=transaction_envelope,
        )
        fee_bump_transaction_envelope = FeeBumpTransactionEnvelope(
            transaction=fee_bump_transaction,
            signatures=fee_bump_transaction_signatures,
            network_passphrase=network_passphrase,
        )
        return fee_bump_transaction_envelope
    return transaction_envelope


def _to_muxed_account(account: MuxedAccount) -> str:
    if account.account_muxed_id is None:
        return account.account_id
    assert account.account_muxed is not None
    return account.account_muxed


def _get_operations(raw_data_map: Dict[str, str], prefix: str) -> List[Operation]:
    operations = []
    operation_length = _get_int_value(raw_data_map, f"{prefix}operations.len")
    for i in range(operation_length):
        operation = _get_operation(i, raw_data_map, prefix)
        operations.append(operation)
    return operations


def _get_raw_data_map(txrep: str) -> Dict[str, str]:
    lines = txrep.strip().split("\n")
    raw_data_map = {}
    for line in lines:
        if line.startswith(":") or len(line.strip()) == 0:
            # remove full-line comment and blank line
            continue
        parts = line.split(":", 1)
        if len(parts) == 2:
            key = parts[0].strip()
            value = _remove_comment(parts[1])
            raw_data_map[key] = value
    return raw_data_map


def _get_preconditions(raw_data_map: Dict[str, str], prefix: str) -> Preconditions:
    preconditions_type = _get_value(raw_data_map, f"{prefix}type")
    if preconditions_type == stellar_xdr.PreconditionType.PRECOND_TIME.name:
        time_bounds = _get_time_bounds(raw_data_map, prefix)
        return Preconditions(time_bounds=time_bounds)
    elif preconditions_type == stellar_xdr.PreconditionType.PRECOND_V2.name:
        time_bounds_optional = _get_time_bounds_optional(raw_data_map, prefix)
        ledger_bounds = _get_ledger_bounds_optional(raw_data_map, prefix)
        min_sequence_number_present = _get_bool_value(
            raw_data_map, f"{prefix}minSeqNum._present"
        )
        min_sequence_number = None
        if min_sequence_number_present:
            min_sequence_number = _get_int_value(raw_data_map, f"{prefix}minSeqNum")
        min_seq_age = _get_int_value(raw_data_map, f"{prefix}minSeqAge")
        min_sequence_ledger_gap = _get_int_value(
            raw_data_map, f"{prefix}minSeqLedgerGap"
        )
        extra_signers = _get_extra_signers(raw_data_map, f"{prefix}extraSigners")
        return Preconditions(
            time_bounds_optional,
            ledger_bounds,
            min_sequence_number,
            min_seq_age,
            min_sequence_ledger_gap,
            extra_signers,
        )
    elif preconditions_type == stellar_xdr.PreconditionType.PRECOND_NONE.name:
        return Preconditions()
    else:
        raise ValueError(
            f"This preconditions type has not been implemented yet, "
            f"preconditions type: {preconditions_type}."
        )


def _get_signer_key(raw_data_map: Dict[str, str], prefix: str) -> SignerKey:
    signer_key_type = _get_value(raw_data_map, f"{prefix}.type")
    if signer_key_type == SignerKeyType.SIGNER_KEY_TYPE_ED25519.name:
        key = _get_value(raw_data_map, f"{prefix}.ed25519")
    elif signer_key_type == SignerKeyType.SIGNER_KEY_TYPE_HASH_X.name:
        key = _get_value(raw_data_map, f"{prefix}.hashX")
    elif signer_key_type == SignerKeyType.SIGNER_KEY_TYPE_PRE_AUTH_TX.name:
        key = _get_value(raw_data_map, f"{prefix}.preAuthTx")
    elif signer_key_type == SignerKeyType.SIGNER_KEY_TYPE_ED25519_SIGNED_PAYLOAD.name:
        key = _get_value(raw_data_map, f"{prefix}.ed25519SignedPayload")
    else:
        raise ValueError(
            f"This signer key type has not been implemented yet, "
            f"signer key type: {signer_key_type}."
        )
    return SignerKey.from_encoded_signer_key(key)


def _get_extra_signers(raw_data_map: Dict[str, str], prefix: str) -> List[SignerKey]:
    extra_signers = []
    extra_signers_length = _get_int_value(raw_data_map, f"{prefix}.len")
    for i in range(extra_signers_length):
        key = _get_signer_key(raw_data_map, f"{prefix}[{i}]")
        extra_signers.append(key)
    return extra_signers


def _get_time_bounds_optional(
    raw_data_map: Dict[str, str], prefix: str
) -> Optional[TimeBounds]:
    time_bounds_present = _get_bool_value(raw_data_map, f"{prefix}timeBounds._present")
    time_bounds = None
    if time_bounds_present:
        time_bounds = _get_time_bounds(raw_data_map, prefix)
    return time_bounds


def _get_time_bounds(raw_data_map: Dict[str, str], prefix: str) -> TimeBounds:
    min_time = _get_int_value(raw_data_map, f"{prefix}timeBounds.minTime")
    max_time = _get_int_value(raw_data_map, f"{prefix}timeBounds.maxTime")
    return TimeBounds(min_time=min_time, max_time=max_time)


def _get_ledger_bounds_optional(
    raw_data_map: Dict[str, str], prefix: str
) -> Optional[LedgerBounds]:
    ledger_bounds_present = _get_bool_value(
        raw_data_map, f"{prefix}ledgerBounds._present"
    )
    ledger_bounds = None
    if ledger_bounds_present:
        ledger_bounds = _get_ledger_bounds(raw_data_map, prefix)
    return ledger_bounds


def _get_ledger_bounds(raw_data_map: Dict[str, str], prefix: str) -> LedgerBounds:
    min_ledger = _get_int_value(raw_data_map, f"{prefix}ledgerBounds.minLedger")
    max_ledger = _get_int_value(raw_data_map, f"{prefix}ledgerBounds.maxLedger")
    return LedgerBounds(min_ledger=min_ledger, max_ledger=max_ledger)


def _get_memo(raw_data_map: Dict[str, str], prefix: str) -> Memo:
    memo_type = _get_value(raw_data_map, f"{prefix}memo.type")
    if memo_type == "MEMO_TEXT":
        return TextMemo(_get_bytes_value(raw_data_map, f"{prefix}memo.text"))
    elif memo_type == "MEMO_ID":
        return IdMemo(_get_int_value(raw_data_map, f"{prefix}memo.id"))
    elif memo_type == "MEMO_HASH":
        return HashMemo(_get_bytes_value(raw_data_map, f"{prefix}memo.hash"))
    elif memo_type == "MEMO_RETURN":
        return ReturnHashMemo(_get_bytes_value(raw_data_map, f"{prefix}memo.retHash"))
    elif memo_type == "MEMO_NONE":
        return NoneMemo()
    else:
        raise ValueError(
            f"`{memo_type}` is not a valid memo type, expected one of `MEMO_TEXT`, `MEMO_ID`, "
            f"`MEMO_HASH`, `MEMO_RETURN`, `MEMO_NONE`."
        )


def _remove_comment(value: str) -> str:
    value = value.strip()
    if len(value) == 0:
        return value
    if value[0] == '"':
        return _remove_string_value_comment(value)
    return _remove_non_string_value_comment(value)


def _remove_non_string_value_comment(value: str) -> str:
    parts = value.split(" ")
    return parts[0]


def _remove_string_value_comment(value: str) -> str:
    v = '"'
    in_escape_sequence = False
    for char in value[1:]:
        if in_escape_sequence:
            if char == "n":
                v += "\n"
            else:
                v += char
            in_escape_sequence = False
        elif char == "\\":
            in_escape_sequence = True
        elif char == '"':
            v += char
            break
        else:
            v += char
    return v


def _get_signature(
    index: int, raw_data_map: Dict[str, str], prefix: str
) -> DecoratedSignature:
    hint = _get_bytes_value(raw_data_map, f"{prefix}signatures[{index}].hint")
    signature = _get_bytes_value(raw_data_map, f"{prefix}signatures[{index}].signature")
    return DecoratedSignature(hint, signature)


def _get_signatures(
    raw_data_map: Dict[str, str], prefix: str
) -> List[DecoratedSignature]:
    signatures: List[DecoratedSignature] = []
    signature_length = _get_int_value(raw_data_map, f"{prefix}signatures.len")
    for i in range(signature_length):
        signature = _get_signature(i, raw_data_map, prefix)
        signatures.append(signature)
    return signatures


def _get_operation(index, raw_data_map, tx_prefix):
    prefix = f"{tx_prefix}operations[{index}].body."
    source_account_id = None
    if _get_bool_value(
        raw_data_map, f"{tx_prefix}operations[{index}].sourceAccount._present"
    ):
        source_account_id = _get_value(
            raw_data_map, f"{tx_prefix}operations[{index}].sourceAccount"
        )
    operation_type = _get_value(raw_data_map, f"{prefix}type")
    if operation_type == _to_caps_with_under(AccountMerge.__name__):
        return _get_account_merge_op(source_account_id, tx_prefix, raw_data_map, index)
    elif operation_type == _to_caps_with_under(AllowTrust.__name__):
        operation_prefix = prefix + "allowTrustOp."
        return _get_allow_trust_op(source_account_id, operation_prefix, raw_data_map)
    elif operation_type == _to_caps_with_under(BeginSponsoringFutureReserves.__name__):
        operation_prefix = prefix + "beginSponsoringFutureReservesOp."
        return _get_begin_sponsoring_future_reserves_op(
            source_account_id, operation_prefix, raw_data_map
        )
    elif operation_type == _to_caps_with_under(BumpSequence.__name__):
        operation_prefix = prefix + "bumpSequenceOp."
        return _get_bump_sequence_op(source_account_id, operation_prefix, raw_data_map)
    elif operation_type == _to_caps_with_under(ChangeTrust.__name__):
        operation_prefix = prefix + "changeTrustOp."
        return _get_change_trust_op(source_account_id, operation_prefix, raw_data_map)
    elif operation_type == _to_caps_with_under(ClaimClaimableBalance.__name__):
        operation_prefix = prefix + "claimClaimableBalanceOp."
        return _get_claim_claimable_balance_op(
            source_account_id, operation_prefix, raw_data_map
        )
    elif operation_type == _to_caps_with_under(Clawback.__name__):
        operation_prefix = prefix + "clawbackOp."
        return _get_clawback_op(source_account_id, operation_prefix, raw_data_map)
    elif operation_type == _to_caps_with_under(ClawbackClaimableBalance.__name__):
        operation_prefix = prefix + "clawbackClaimableBalanceOp."
        return _get_clawback_claimable_balance_op(
            source_account_id, operation_prefix, raw_data_map
        )
    elif operation_type == _to_caps_with_under(CreateAccount.__name__):
        operation_prefix = prefix + "createAccountOp."
        return _get_create_account_op(source_account_id, operation_prefix, raw_data_map)
    elif operation_type == _to_caps_with_under(CreateClaimableBalance.__name__):
        operation_prefix = prefix + "createClaimableBalanceOp."
        return _get_create_claimable_balance_op(
            source_account_id, operation_prefix, raw_data_map
        )
    elif operation_type == _to_caps_with_under(CreatePassiveSellOffer.__name__):
        operation_prefix = prefix + "createPassiveSellOfferOp."
        return _get_create_passive_sell_offer_op(
            source_account_id, operation_prefix, raw_data_map
        )
    elif operation_type == _to_caps_with_under(EndSponsoringFutureReserves.__name__):
        return _get_end_sponsoring_future_reserves_op(source_account_id)
    elif operation_type == _to_caps_with_under(Inflation.__name__):
        return _get_inflation_op(source_account_id)
    elif operation_type == _to_caps_with_under(ManageBuyOffer.__name__):
        operation_prefix = prefix + "manageBuyOfferOp."
        return _get_manage_buy_offer_op(
            source_account_id, operation_prefix, raw_data_map
        )
    elif operation_type == _to_caps_with_under(ManageData.__name__):
        operation_prefix = prefix + "manageDataOp."
        return _get_manage_data_op(source_account_id, operation_prefix, raw_data_map)
    elif operation_type == _to_caps_with_under(ManageSellOffer.__name__):
        operation_prefix = prefix + "manageSellOfferOp."
        return _get_manage_sell_offer_op(
            source_account_id, operation_prefix, raw_data_map
        )
    elif operation_type == _to_caps_with_under(PathPaymentStrictReceive.__name__):
        operation_prefix = prefix + "pathPaymentStrictReceiveOp."
        return _get_path_payment_strict_receive_op(
            source_account_id, operation_prefix, raw_data_map
        )
    elif operation_type == _to_caps_with_under(PathPaymentStrictSend.__name__):
        operation_prefix = prefix + "pathPaymentStrictSendOp."
        return _get_path_payment_strict_send_op(
            source_account_id, operation_prefix, raw_data_map
        )
    elif operation_type == _to_caps_with_under(Payment.__name__):
        operation_prefix = prefix + "paymentOp."
        return _get_payment_op(source_account_id, operation_prefix, raw_data_map)
    elif operation_type == _to_caps_with_under(RevokeSponsorship.__name__):
        operation_prefix = prefix + "revokeSponsorshipOp."
        return _get_revoke_sponsorship_op(
            source_account_id, operation_prefix, raw_data_map
        )
    elif operation_type == _to_caps_with_under(SetOptions.__name__):
        operation_prefix = prefix + "setOptionsOp."
        return _get_set_options_op(source_account_id, operation_prefix, raw_data_map)
    elif operation_type == _to_caps_with_under(SetTrustLineFlags.__name__):
        operation_prefix = prefix + "setTrustLineFlagsOp."
        return _get_set_trust_line_flags_op(
            source_account_id, operation_prefix, raw_data_map
        )
    elif operation_type == _to_caps_with_under(LiquidityPoolDeposit.__name__):
        operation_prefix = prefix + "liquidityPoolDepositOp."
        return _get_liquidity_pool_deposit_op(
            source_account_id, operation_prefix, raw_data_map
        )
    elif operation_type == _to_caps_with_under(LiquidityPoolWithdraw.__name__):
        operation_prefix = prefix + "liquidityPoolWithdrawOp."
        return _get_liquidity_pool_withdraw_op(
            source_account_id, operation_prefix, raw_data_map
        )
    else:
        raise ValueError(
            f"This operation has not been implemented yet, "
            f"operation type: {operation_type}."
        )


def _get_set_options_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
) -> SetOptions:
    inflation_dest = None
    clear_flags = None
    set_flags = None
    master_weight = None
    low_threshold = None
    med_threshold = None
    high_threshold = None
    signer = None
    home_domain = None
    if _get_bool_value(raw_data_map, f"{operation_prefix}inflationDest._present"):
        inflation_dest = _get_value(raw_data_map, f"{operation_prefix}inflationDest")
    if _get_bool_value(raw_data_map, f"{operation_prefix}clearFlags._present"):
        clear_flags = _get_int_value(raw_data_map, f"{operation_prefix}clearFlags")
    if _get_bool_value(raw_data_map, f"{operation_prefix}setFlags._present"):
        set_flags = _get_int_value(raw_data_map, f"{operation_prefix}setFlags")
    if _get_bool_value(raw_data_map, f"{operation_prefix}masterWeight._present"):
        master_weight = _get_int_value(raw_data_map, f"{operation_prefix}masterWeight")
    if _get_bool_value(raw_data_map, f"{operation_prefix}lowThreshold._present"):
        low_threshold = _get_int_value(raw_data_map, f"{operation_prefix}lowThreshold")
    if _get_bool_value(raw_data_map, f"{operation_prefix}medThreshold._present"):
        med_threshold = _get_int_value(raw_data_map, f"{operation_prefix}medThreshold")
    if _get_bool_value(raw_data_map, f"{operation_prefix}highThreshold._present"):
        high_threshold = _get_int_value(
            raw_data_map, f"{operation_prefix}highThreshold"
        )
    if _get_bool_value(raw_data_map, f"{operation_prefix}signer._present"):
        weight = _get_int_value(raw_data_map, f"{operation_prefix}signer.weight")
        key = _get_value(raw_data_map, f"{operation_prefix}signer.key")
        if key.startswith("G"):
            signer = Signer.ed25519_public_key(key, weight)
        elif key.startswith("X"):
            sha256_hash = StrKey.decode_sha256_hash(key)
            signer = Signer.sha256_hash(sha256_hash, weight)
        elif key.startswith("T"):
            pre_auth_tx_hash = StrKey.decode_pre_auth_tx(key)
            signer = Signer.pre_auth_tx(pre_auth_tx_hash, weight)
        else:
            raise ValueError("Signer key should start with `G`, `X` or `T`.")

    if _get_bool_value(raw_data_map, f"{operation_prefix}homeDomain._present"):
        home_domain = _get_string_value(raw_data_map, f"{operation_prefix}homeDomain")

    return SetOptions(
        inflation_dest=inflation_dest,
        clear_flags=clear_flags,
        set_flags=set_flags,
        master_weight=master_weight,
        low_threshold=low_threshold,
        med_threshold=med_threshold,
        high_threshold=high_threshold,
        signer=signer,
        home_domain=home_domain,
        source=source,
    )


def _get_path_payment_strict_receive_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
) -> PathPaymentStrictReceive:
    send_asset = _get_asset(raw_data_map, f"{operation_prefix}sendAsset")
    send_max = _get_amount_value(raw_data_map, f"{operation_prefix}sendMax")
    destination = _get_value(raw_data_map, f"{operation_prefix}destination")
    dest_asset = _get_asset(raw_data_map, f"{operation_prefix}destAsset")
    dest_amount = _get_amount_value(raw_data_map, f"{operation_prefix}destAmount")
    path_length = _get_int_value(raw_data_map, f"{operation_prefix}path.len")
    path = []
    for i in range(path_length):
        asset = _get_asset(raw_data_map, f"{operation_prefix}path[{i}]")
        path.append(asset)
    return PathPaymentStrictReceive(
        destination=destination,
        send_asset=send_asset,
        send_max=send_max,
        dest_asset=dest_asset,
        dest_amount=dest_amount,
        path=path,
        source=source,
    )


def _get_path_payment_strict_send_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
) -> PathPaymentStrictSend:
    send_asset = _get_asset(raw_data_map, f"{operation_prefix}sendAsset")
    send_amount = _get_amount_value(raw_data_map, f"{operation_prefix}sendAmount")
    destination = _get_value(raw_data_map, f"{operation_prefix}destination")
    dest_asset = _get_asset(raw_data_map, f"{operation_prefix}destAsset")
    dest_min = _get_amount_value(raw_data_map, f"{operation_prefix}destMin")
    path_length = _get_int_value(raw_data_map, f"{operation_prefix}path.len")
    path = []
    for i in range(path_length):
        asset = _get_asset(raw_data_map, f"{operation_prefix}path[{i}]")
        path.append(asset)
    return PathPaymentStrictSend(
        destination=destination,
        send_asset=send_asset,
        send_amount=send_amount,
        dest_asset=dest_asset,
        dest_min=dest_min,
        path=path,
        source=source,
    )


def _get_create_passive_sell_offer_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
) -> CreatePassiveSellOffer:
    selling = _get_asset(raw_data_map, f"{operation_prefix}selling")
    buying = _get_asset(raw_data_map, f"{operation_prefix}buying")
    amount = _get_amount_value(raw_data_map, f"{operation_prefix}amount")
    price_n = _get_int_value(raw_data_map, f"{operation_prefix}price.n")
    price_d = _get_int_value(raw_data_map, f"{operation_prefix}price.d")
    price = Price(n=price_n, d=price_d)
    return CreatePassiveSellOffer(
        selling=selling, buying=buying, amount=amount, price=price, source=source
    )


def _get_manage_buy_offer_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
) -> ManageBuyOffer:
    selling = _get_asset(raw_data_map, f"{operation_prefix}selling")
    buying = _get_asset(raw_data_map, f"{operation_prefix}buying")
    amount = _get_amount_value(raw_data_map, f"{operation_prefix}buyAmount")
    offer_id = _get_int_value(raw_data_map, f"{operation_prefix}offerID")
    price_n = _get_int_value(raw_data_map, f"{operation_prefix}price.n")
    price_d = _get_int_value(raw_data_map, f"{operation_prefix}price.d")
    price = Price(n=price_n, d=price_d)
    return ManageBuyOffer(
        selling=selling,
        buying=buying,
        amount=amount,
        price=price,
        offer_id=offer_id,
        source=source,
    )


def _get_manage_sell_offer_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
) -> ManageSellOffer:
    selling = _get_asset(raw_data_map, f"{operation_prefix}selling")
    buying = _get_asset(raw_data_map, f"{operation_prefix}buying")
    amount = _get_amount_value(raw_data_map, f"{operation_prefix}amount")
    offer_id = _get_int_value(raw_data_map, f"{operation_prefix}offerID")
    price_n = _get_int_value(raw_data_map, f"{operation_prefix}price.n")
    price_d = _get_int_value(raw_data_map, f"{operation_prefix}price.d")
    price = Price(n=price_n, d=price_d)
    return ManageSellOffer(
        selling=selling,
        buying=buying,
        amount=amount,
        price=price,
        offer_id=offer_id,
        source=source,
    )


def _get_payment_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
) -> Payment:
    destination = _get_value(raw_data_map, f"{operation_prefix}destination")
    asset = _get_asset(raw_data_map, f"{operation_prefix}asset")
    amount = _get_amount_value(raw_data_map, f"{operation_prefix}amount")
    return Payment(destination=destination, asset=asset, amount=amount, source=source)


def _get_manage_data_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
) -> ManageData:
    data_name = _get_string_value(raw_data_map, f"{operation_prefix}dataName")
    data_value = None
    if _get_bool_value(raw_data_map, f"{operation_prefix}dataValue._present"):
        data_value = _get_bytes_value(raw_data_map, f"{operation_prefix}dataValue")
    return ManageData(data_name=data_name, data_value=data_value, source=source)


def _get_bump_sequence_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
) -> BumpSequence:
    bump_to = _get_int_value(raw_data_map, f"{operation_prefix}bumpTo")
    return BumpSequence(bump_to=bump_to, source=source)


def _get_allow_trust_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
) -> AllowTrust:
    trustor = _get_value(raw_data_map, f"{operation_prefix}trustor")
    asset_code = _get_value(raw_data_map, f"{operation_prefix}asset")
    authorize = _get_bool_value(raw_data_map, f"{operation_prefix}authorize")
    return AllowTrust(
        trustor=trustor, asset_code=asset_code, authorize=authorize, source=source
    )


def _get_change_trust_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
) -> ChangeTrust:
    # Keep compatibility
    line_type = raw_data_map.get(f"{operation_prefix}line.type", None)
    if line_type == stellar_xdr.AssetType.ASSET_TYPE_POOL_SHARE.name:
        asset_a = _get_asset(
            raw_data_map, f"{operation_prefix}line.liquidityPool.constantProduct.assetA"
        )
        asset_b = _get_asset(
            raw_data_map, f"{operation_prefix}line.liquidityPool.constantProduct.assetB"
        )
        fee = _get_int_value(
            raw_data_map, f"{operation_prefix}line.liquidityPool.constantProduct.fee"
        )
        line: Union[Asset, LiquidityPoolAsset] = LiquidityPoolAsset(
            asset_a, asset_b, fee
        )
    else:
        line = _get_asset(raw_data_map, f"{operation_prefix}line")
    limit = _get_amount_value(raw_data_map, f"{operation_prefix}limit")
    return ChangeTrust(asset=line, limit=limit, source=source)


def _get_inflation_op(source: str) -> Inflation:
    return Inflation(source=source)


def _get_account_merge_op(
    source: str, transaction_prefix: str, raw_data_map: Dict[str, str], index: int
) -> AccountMerge:
    destination = _get_value(
        raw_data_map, f"{transaction_prefix}operations[{index}].body.destination"
    )
    return AccountMerge(destination=destination, source=source)


def _get_create_account_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
) -> CreateAccount:
    destination = _get_value(raw_data_map, f"{operation_prefix}destination")
    starting_balance = _get_amount_value(
        raw_data_map, f"{operation_prefix}startingBalance"
    )
    return CreateAccount(
        destination=destination, starting_balance=starting_balance, source=source
    )


def _get_begin_sponsoring_future_reserves_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
) -> BeginSponsoringFutureReserves:
    sponsored_id = _get_value(raw_data_map, f"{operation_prefix}sponsoredID")
    return BeginSponsoringFutureReserves(sponsored_id=sponsored_id, source=source)


def _get_claim_claimable_balance_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
) -> ClaimClaimableBalance:
    balance_id = _get_value(raw_data_map, f"{operation_prefix}balanceID")
    return ClaimClaimableBalance(balance_id=balance_id, source=source)


def _get_clawback_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
) -> Clawback:
    asset = _get_asset(raw_data_map, f"{operation_prefix}asset")
    from_ = _get_value(raw_data_map, f"{operation_prefix}from")
    amount = _get_amount_value(raw_data_map, f"{operation_prefix}amount")
    return Clawback(asset=asset, from_=from_, amount=amount, source=source)


def _get_clawback_claimable_balance_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
) -> ClawbackClaimableBalance:
    balance_id = _get_value(raw_data_map, f"{operation_prefix}balanceID")
    return ClawbackClaimableBalance(balance_id=balance_id, source=source)


def _get_create_claimable_balance_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
):
    def parse_claimant_predicate(
        prefix: str, raw_data_map: Dict[str, str]
    ) -> ClaimPredicate:
        claimant_predicate_type = _get_value(raw_data_map, f"{prefix}.type")
        if claimant_predicate_type == ClaimPredicateType.CLAIM_PREDICATE_AND.name:
            left = parse_claimant_predicate(f"{prefix}.andPredicates[0]", raw_data_map)
            right = parse_claimant_predicate(f"{prefix}.andPredicates[1]", raw_data_map)
            return ClaimPredicate.predicate_and(left, right)
        elif claimant_predicate_type == ClaimPredicateType.CLAIM_PREDICATE_OR.name:
            left = parse_claimant_predicate(f"{prefix}.orPredicates[0]", raw_data_map)
            right = parse_claimant_predicate(f"{prefix}.orPredicates[1]", raw_data_map)
            return ClaimPredicate.predicate_or(left, right)
        elif claimant_predicate_type == ClaimPredicateType.CLAIM_PREDICATE_NOT.name:
            predicate = parse_claimant_predicate(f"{prefix}.notPredicate", raw_data_map)
            return ClaimPredicate.predicate_not(predicate)
        elif (
            claimant_predicate_type
            == ClaimPredicateType.CLAIM_PREDICATE_BEFORE_ABSOLUTE_TIME.name
        ):
            before_absolute_time = _get_int_value(raw_data_map, f"{prefix}.absBefore")
            return ClaimPredicate.predicate_before_absolute_time(before_absolute_time)
        elif (
            claimant_predicate_type
            == ClaimPredicateType.CLAIM_PREDICATE_BEFORE_RELATIVE_TIME.name
        ):
            before_relative_time = _get_int_value(raw_data_map, f"{prefix}.relBefore")
            return ClaimPredicate.predicate_before_relative_time(before_relative_time)
        else:
            raise ValueError(
                f"This claim predicate type has not been implemented yet, "
                f"claim predicate type: {claimant_predicate_type}."
            )

    asset = _get_asset(raw_data_map, f"{operation_prefix}asset")
    amount = _get_amount_value(raw_data_map, f"{operation_prefix}amount")
    claimants: List[Claimant] = []
    claimants_len = _get_int_value(raw_data_map, f"{operation_prefix}claimants.len")

    for index in range(claimants_len):
        destination = _get_value(
            raw_data_map, f"{operation_prefix}claimants[{index}].v0.destination"
        )
        claimant_predicate = parse_claimant_predicate(
            f"{operation_prefix}claimants[{index}].v0.predicate", raw_data_map
        )
        claimant = Claimant(destination=destination, predicate=claimant_predicate)
        claimants.append(claimant)
    return CreateClaimableBalance(
        asset=asset, amount=amount, claimants=claimants, source=source
    )


def _get_end_sponsoring_future_reserves_op(source: str) -> EndSponsoringFutureReserves:
    return EndSponsoringFutureReserves(source=source)


def _get_revoke_sponsorship_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
) -> RevokeSponsorship:
    revoke_sponsorship_type = _get_value(raw_data_map, f"{operation_prefix}type")
    if (
        revoke_sponsorship_type
        == stellar_xdr.RevokeSponsorshipType.REVOKE_SPONSORSHIP_LEDGER_ENTRY.name
    ):
        ledger_entry_type = _get_value(
            raw_data_map, f"{operation_prefix}ledgerKey.type"
        )
        if ledger_entry_type == stellar_xdr.LedgerEntryType.ACCOUNT.name:
            account = _get_value(raw_data_map, f"{operation_prefix}ledgerKey.account")
            return RevokeSponsorship.revoke_account_sponsorship(
                account_id=account, source=source
            )
        elif ledger_entry_type == stellar_xdr.LedgerEntryType.TRUSTLINE.name:
            account = _get_value(
                raw_data_map, f"{operation_prefix}ledgerKey.trustLine.accountID"
            )
            asset = _get_asset(
                raw_data_map, f"{operation_prefix}ledgerKey.trustLine.asset"
            )
            return RevokeSponsorship.revoke_trustline_sponsorship(
                account_id=account, asset=asset, source=source
            )
        elif ledger_entry_type == stellar_xdr.LedgerEntryType.OFFER.name:
            sell_id = _get_value(
                raw_data_map, f"{operation_prefix}ledgerKey.offer.sellerID"
            )
            offer_id = _get_int_value(
                raw_data_map, f"{operation_prefix}ledgerKey.offer.offerID"
            )
            return RevokeSponsorship.revoke_offer_sponsorship(
                seller_id=sell_id, offer_id=offer_id, source=source
            )
        elif ledger_entry_type == stellar_xdr.LedgerEntryType.DATA.name:
            account_id = _get_value(
                raw_data_map, f"{operation_prefix}ledgerKey.data.accountID"
            )
            data_name = _get_value(
                raw_data_map, f"{operation_prefix}ledgerKey.data.dataName"
            )
            return RevokeSponsorship.revoke_data_sponsorship(
                account_id=account_id, data_name=data_name, source=source
            )
        elif ledger_entry_type == stellar_xdr.LedgerEntryType.CLAIMABLE_BALANCE.name:
            claimable_balance_id = _get_value(
                raw_data_map, f"{operation_prefix}ledgerKey.claimableBalance.balanceID"
            )
            return RevokeSponsorship.revoke_claimable_balance_sponsorship(
                claimable_balance_id=claimable_balance_id, source=source
            )
        elif ledger_entry_type == stellar_xdr.LedgerEntryType.LIQUIDITY_POOL.name:
            liquidity_pool_id = _get_value(
                raw_data_map,
                f"{operation_prefix}ledgerKey.liquidityPool.liquidityPoolID",
            )
            return RevokeSponsorship.revoke_liquidity_pool_sponsorship(
                liquidity_pool_id=liquidity_pool_id, source=source
            )
        else:
            raise ValueError(
                f"This ledger entry type has not been implemented yet, "
                f"ledger entry type: {ledger_entry_type}."
            )
    elif (
        revoke_sponsorship_type
        == stellar_xdr.RevokeSponsorshipType.REVOKE_SPONSORSHIP_SIGNER.name
    ):
        signer_key_type = _get_value(
            raw_data_map, f"{operation_prefix}signer.signerKey.type"
        )
        account_id = _get_value(raw_data_map, f"{operation_prefix}signer.accountID")
        if (
            signer_key_type
            == stellar_xdr.SignerKeyType.SIGNER_KEY_TYPE_PRE_AUTH_TX.name
        ):
            key = _get_value(
                raw_data_map, f"{operation_prefix}signer.signerKey.preAuthTx"
            )
            signer_key = SignerKey.pre_auth_tx(StrKey.decode_pre_auth_tx(key))
        elif signer_key_type == stellar_xdr.SignerKeyType.SIGNER_KEY_TYPE_ED25519.name:
            key = _get_value(
                raw_data_map, f"{operation_prefix}signer.signerKey.ed25519"
            )
            signer_key = SignerKey.ed25519_public_key(key)
        elif signer_key_type == stellar_xdr.SignerKeyType.SIGNER_KEY_TYPE_HASH_X.name:
            key = _get_value(raw_data_map, f"{operation_prefix}signer.signerKey.hashX")
            signer_key = SignerKey.sha256_hash(StrKey.decode_sha256_hash(key))
        else:
            raise ValueError(
                f"This signer key type has not been implemented yet, "
                f"signer key type: {signer_key_type}."
            )
        return RevokeSponsorship.revoke_signer_sponsorship(
            account_id=account_id, signer_key=signer_key, source=source
        )
    else:
        raise ValueError(
            f"This revoke sponsorship type has not been implemented yet, "
            f"revoke sponsorship type: {revoke_sponsorship_type}."
        )


def _get_set_trust_line_flags_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
) -> SetTrustLineFlags:
    trustor = _get_value(raw_data_map, f"{operation_prefix}trustor")
    asset = _get_asset(raw_data_map, f"{operation_prefix}asset")
    clear_flags_raw = _get_int_value(raw_data_map, f"{operation_prefix}clearFlags")
    set_flags_raw = _get_int_value(raw_data_map, f"{operation_prefix}setFlags")
    if clear_flags_raw == 0:
        clear_flags = None
    else:
        clear_flags = TrustLineFlags(clear_flags_raw)
    if set_flags_raw == 0:
        set_flags = None
    else:
        set_flags = TrustLineFlags(set_flags_raw)
    return SetTrustLineFlags(
        trustor=trustor,
        asset=asset,
        clear_flags=clear_flags,
        set_flags=set_flags,
        source=source,
    )


def _get_liquidity_pool_deposit_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
):
    liquidity_pool_id = _get_value(raw_data_map, f"{operation_prefix}liquidityPoolID")
    max_amount_a = _get_amount_value(raw_data_map, f"{operation_prefix}maxAmountA")
    max_amount_b = _get_amount_value(raw_data_map, f"{operation_prefix}maxAmountB")
    min_price_n = _get_int_value(raw_data_map, f"{operation_prefix}minPrice.n")
    min_price_d = _get_int_value(raw_data_map, f"{operation_prefix}minPrice.d")
    min_price = Price(min_price_n, min_price_d)
    max_price_n = _get_int_value(raw_data_map, f"{operation_prefix}maxPrice.n")
    max_price_d = _get_int_value(raw_data_map, f"{operation_prefix}maxPrice.d")
    max_price = Price(max_price_n, max_price_d)
    return LiquidityPoolDeposit(
        liquidity_pool_id=liquidity_pool_id,
        max_amount_a=max_amount_a,
        max_amount_b=max_amount_b,
        min_price=min_price,
        max_price=max_price,
        source=source,
    )


def _get_liquidity_pool_withdraw_op(
    source: str, operation_prefix: str, raw_data_map: Dict[str, str]
):
    liquidity_pool_id = _get_value(raw_data_map, f"{operation_prefix}liquidityPoolID")
    amount = _get_amount_value(raw_data_map, f"{operation_prefix}amount")
    min_amount_a = _get_amount_value(raw_data_map, f"{operation_prefix}minAmountA")
    min_amount_b = _get_amount_value(raw_data_map, f"{operation_prefix}minAmountB")
    return LiquidityPoolWithdraw(
        liquidity_pool_id=liquidity_pool_id,
        amount=amount,
        min_amount_a=min_amount_a,
        min_amount_b=min_amount_b,
        source=source,
    )


def _get_asset(raw_data_map: Dict[str, str], key: str) -> Asset:
    return _decode_asset(_get_value(raw_data_map, key))


def _get_amount_value(raw_data_map: Dict[str, str], key: str) -> str:
    value = _get_int_value(raw_data_map, key)
    return Operation.from_xdr_amount(value)


def _get_int_value(raw_data_map: Dict[str, str], key: str) -> int:
    value = _get_value(raw_data_map, key)
    try:
        return int(value)
    except ValueError as e:
        raise ValueError(f"Failed to convert `{value}` to int type.") from e


def _get_bool_value(raw_data_map: Dict[str, str], key: str) -> bool:
    value = _get_value(raw_data_map, key)
    if value == _true:
        return True
    elif value == _false:
        return False
    else:
        raise ValueError(f"Failed to convert `{value}` to bool type.")


def _get_bytes_value(raw_data_map: Dict[str, str], key: str) -> bytes:
    value = _get_value(raw_data_map, key)
    if value[0] == '"':
        # for text memo.
        return _get_string_value(raw_data_map, key).encode()
    try:
        return bytes.fromhex(value)
    except ValueError as e:
        raise ValueError(f"Failed to convert `{value}` to bytes type.") from e


def _get_string_value(raw_data_map: Dict[str, str], key: str) -> str:
    value = _get_value(raw_data_map, key)
    if len(value) == 0:
        return value
    return value[1:-1]


def _get_value(raw_data_map: Dict[str, str], key: str) -> str:
    try:
        return raw_data_map[key]
    except KeyError as e:
        raise ValueError(f"`{key}` is missing from txrep.") from e


def _decode_asset(asset: str) -> Asset:
    # native (or any string up to 12 characters not containing an unescaped colon) for the native asset
    if ":" not in asset and len(asset) <= 12:
        return Asset.native()
    parts = asset.split(":")
    if len(parts) != 2:
        raise ValueError("Failed to decode asset string.")
    return Asset(parts[0], parts[1])


def _add_line(
    key: str,
    value: Union[str, int],
    lines: List[str],
    comment: Union[str, int, Decimal] = None,
) -> None:
    lines.append(f"{key}: {value}{' (' + str(comment) + ')' if comment else ''}")


def _add_preconditions(
    cond: Optional[Preconditions], prefix: str, lines: List[str]
) -> None:
    if cond is None:
        cond_xdr = stellar_xdr.Preconditions(stellar_xdr.PreconditionType.PRECOND_NONE)
    else:
        cond_xdr = cond.to_xdr_object()
    _add_line(f"{prefix}type", cond_xdr.type.name, lines)
    if cond_xdr.type == stellar_xdr.PreconditionType.PRECOND_TIME:
        assert cond is not None
        assert cond.time_bounds is not None
        _add_time_bounds(cond.time_bounds, prefix, lines)
    elif cond_xdr.type == stellar_xdr.PreconditionType.PRECOND_V2:
        assert cond is not None
        _add_time_bounds_optional(cond.time_bounds, prefix, lines)
        _add_ledger_bounds_optional(cond.ledger_bounds, prefix, lines)
        if cond.min_sequence_number is None:
            _add_line(f"{prefix}minSeqNum._present", _false, lines)
        else:
            _add_line(f"{prefix}minSeqNum._present", _true, lines)
            _add_line(f"{prefix}minSeqNum", cond.min_sequence_number, lines)
        _add_line(f"{prefix}minSeqAge", cond.min_sequence_age or 0, lines)
        _add_line(f"{prefix}minSeqLedgerGap", cond.min_sequence_ledger_gap or 0, lines)
        _add_extra_signers(f"{prefix}extraSigners", cond.extra_signers, lines)
    elif cond_xdr.type == stellar_xdr.PreconditionType.PRECOND_NONE:
        pass
    else:
        raise ValueError(
            f"This preconditions type has not been implemented yet, "
            f"preconditions type: {cond_xdr.type}."
        )


def _add_signer_key(prefix: str, signer_key: SignerKey, lines: List[str]) -> None:
    _add_line(f"{prefix}.type", signer_key.signer_key_type.name, lines)
    key = signer_key.encoded_signer_key
    if signer_key.signer_key_type == SignerKeyType.SIGNER_KEY_TYPE_ED25519:
        _add_line(f"{prefix}.ed25519", key, lines)
    elif signer_key.signer_key_type == SignerKeyType.SIGNER_KEY_TYPE_HASH_X:
        _add_line(f"{prefix}.hashX", key, lines)
    elif signer_key.signer_key_type == SignerKeyType.SIGNER_KEY_TYPE_PRE_AUTH_TX:
        _add_line(f"{prefix}.preAuthTx", key, lines)
    elif (
        signer_key.signer_key_type
        == SignerKeyType.SIGNER_KEY_TYPE_ED25519_SIGNED_PAYLOAD
    ):
        _add_line(f"{prefix}.ed25519SignedPayload", key, lines)
    else:
        raise ValueError(
            f"This signer key type has not been implemented yet, "
            f"signer key type: {signer_key.signer_key_type}."
        )


def _add_extra_signers(
    prefix: str, extra_signers: Optional[Sequence[SignerKey]], lines: List[str]
) -> None:
    if extra_signers is None:
        extra_signers = []
    _add_line(f"{prefix}.len", len(extra_signers), lines)
    for index, extra_signer in enumerate(extra_signers):
        _add_signer_key(f"{prefix}[{index}]", extra_signer, lines)


def _add_time_bounds_optional(
    time_bounds: Optional[TimeBounds], prefix: str, lines: List[str]
) -> None:
    if time_bounds is None:
        _add_line(f"{prefix}timeBounds._present", _false, lines)
    else:
        _add_line(f"{prefix}timeBounds._present", _true, lines)
        _add_time_bounds(time_bounds, prefix, lines)


def _add_time_bounds(time_bounds: TimeBounds, prefix: str, lines: List[str]) -> None:
    _add_line(
        f"{prefix}timeBounds.minTime",
        time_bounds.min_time,
        lines,
        _to_readable_utc_time_comment(time_bounds.min_time),
    )
    _add_line(
        f"{prefix}timeBounds.maxTime",
        time_bounds.max_time,
        lines,
        _to_readable_utc_time_comment(time_bounds.max_time),
    )


def _add_ledger_bounds_optional(
    ledger_bounds: Optional[LedgerBounds], prefix: str, lines: List[str]
) -> None:
    if ledger_bounds is None:
        _add_line(f"{prefix}ledgerBounds._present", _false, lines)
    else:
        _add_line(f"{prefix}ledgerBounds._present", _true, lines)
        _add_ledger_bounds(ledger_bounds, prefix, lines)


def _add_ledger_bounds(
    ledger_bounds: LedgerBounds, prefix: str, lines: List[str]
) -> None:
    _add_line(f"{prefix}ledgerBounds.minLedger", ledger_bounds.min_ledger, lines, None)
    _add_line(f"{prefix}ledgerBounds.maxLedger", ledger_bounds.max_ledger, lines, None)


def _add_memo(memo: Memo, prefix: str, lines: List[str]) -> None:
    if isinstance(memo, NoneMemo):
        _add_line(f"{prefix}memo.type", "MEMO_NONE", lines)
    if isinstance(memo, TextMemo):
        _add_line(f"{prefix}memo.type", "MEMO_TEXT", lines)
        # I don't think we should decode it.
        _add_line(f"{prefix}memo.text", _to_string(memo.memo_text), lines)
    if isinstance(memo, IdMemo):
        _add_line(f"{prefix}memo.type", "MEMO_ID", lines)
        _add_line(f"{prefix}memo.id", memo.memo_id, lines)
    if isinstance(memo, HashMemo):
        _add_line(f"{prefix}memo.type", "MEMO_HASH", lines)
        _add_line(f"{prefix}memo.hash", _to_opaque(memo.memo_hash), lines)
    if isinstance(memo, ReturnHashMemo):
        _add_line(f"{prefix}memo.type", "MEMO_RETURN", lines)
        _add_line(f"{prefix}memo.retHash", _to_opaque(memo.memo_return), lines)


def _add_operations(
    operations: Sequence[Operation], prefix: str, lines: List[str]
) -> None:
    _add_line(f"{prefix}operations.len", len(operations), lines)
    for index, operation in enumerate(operations):
        _add_operation(index, operation, prefix, lines)


def _add_operation(
    index: int, operation: Operation, prefix: str, lines: List[str]
) -> None:
    prefix = f"{prefix}operations[{index}]."
    operation_type = operation.__class__.__name__

    def add_operation_line(
        key: str, value: Union[str, int], comment: Union[str, int, Decimal] = None
    ) -> None:
        _add_line(f"{prefix}{key}", value, lines, comment)

    if operation.source is not None:
        add_operation_line("sourceAccount._present", _true)
        add_operation_line(
            "sourceAccount",
            _to_muxed_account(operation.source),
            comment=_to_muxed_account_comment(operation.source),
        )
    else:
        add_operation_line("sourceAccount._present", _false)

    add_operation_line("body.type", _to_caps_with_under(operation_type))

    def add_body_line(
        key: str,
        value: Union[str, int, None],
        optional: bool = False,
        comment: Union[str, int, Decimal] = None,
    ) -> None:
        operation_type = operation.__class__.__name__
        key = f"body.{_to_camel_case(operation_type)}Op.{key}"
        if optional:
            present = True if value is not None else False
            add_operation_line(f"{key}._present", _true if present else _false)
            if present:
                assert value is not None
                add_operation_line(key, value, comment=comment)
        else:
            assert value is not None
            add_operation_line(key, value, comment=comment)

    def add_signer(signer: Optional[Signer]) -> None:
        add_body_line("signer._present", _false if signer is None else _true)
        if signer is None:
            return
        add_body_line("signer.key", signer.signer_key.encoded_signer_key)
        add_body_line("signer.weight", signer.weight)

    def add_price(price: Union[Price, str, Decimal]) -> None:
        price = _to_price(price)
        add_body_line("price.n", price.n)
        add_body_line("price.d", price.d)

    def add_home_domain(home_domain: Optional[str]) -> None:
        if home_domain is None:
            add_body_line("homeDomain", None, True)
        else:
            add_body_line("homeDomain", _to_string(home_domain), True)

    def add_claim_predicate(prefix: str, claimant_predicate: ClaimPredicate):
        add_body_line(f"{prefix}.type", claimant_predicate.claim_predicate_type.name)
        if (
            claimant_predicate.claim_predicate_type
            == ClaimPredicateType.CLAIM_PREDICATE_UNCONDITIONAL
        ):
            pass
        elif (
            claimant_predicate.claim_predicate_type
            == ClaimPredicateType.CLAIM_PREDICATE_NOT
        ):
            assert claimant_predicate.not_predicate is not None
            add_claim_predicate(
                f"{prefix}.notPredicate", claimant_predicate.not_predicate
            )
        elif (
            claimant_predicate.claim_predicate_type
            == ClaimPredicateType.CLAIM_PREDICATE_AND
        ):
            add_body_line(f"{prefix}.andPredicates.len", 2)
            assert claimant_predicate.and_predicates is not None
            add_claim_predicate(
                f"{prefix}.andPredicates[0]", claimant_predicate.and_predicates.left
            )
            add_claim_predicate(
                f"{prefix}.andPredicates[1]", claimant_predicate.and_predicates.right
            )
        elif (
            claimant_predicate.claim_predicate_type
            == ClaimPredicateType.CLAIM_PREDICATE_OR
        ):
            add_body_line(f"{prefix}.orPredicates.len", 2)
            assert claimant_predicate.or_predicates is not None
            add_claim_predicate(
                f"{prefix}.orPredicates[0]", claimant_predicate.or_predicates.left
            )
            add_claim_predicate(
                f"{prefix}.orPredicates[1]", claimant_predicate.or_predicates.right
            )
        elif (
            claimant_predicate.claim_predicate_type
            == ClaimPredicateType.CLAIM_PREDICATE_BEFORE_ABSOLUTE_TIME
        ):
            assert claimant_predicate.abs_before is not None
            add_body_line(
                f"{prefix}.absBefore",
                claimant_predicate.abs_before,
                comment=_to_readable_utc_time_comment(claimant_predicate.abs_before),
            )
        elif (
            claimant_predicate.claim_predicate_type
            == ClaimPredicateType.CLAIM_PREDICATE_BEFORE_RELATIVE_TIME
        ):
            assert claimant_predicate.rel_before is not None
            add_body_line(
                f"{prefix}.relBefore",
                claimant_predicate.rel_before,
                comment=_to_readable_utc_time_comment(claimant_predicate.rel_before),
            )
        else:
            raise ValueError(
                f"This claim predicate type has not been implemented yet, "
                f"claim predicate type: {claimant_predicate.claim_predicate_type}."
            )

    if isinstance(operation, CreateAccount):
        add_body_line("destination", operation.destination)
        add_body_line(
            "startingBalance",
            _to_amount(operation.starting_balance),
            comment=operation.starting_balance,
        )
    elif isinstance(operation, Payment):
        add_body_line(
            "destination",
            _to_muxed_account(operation.destination),
            comment=_to_muxed_account_comment(operation.destination),
        )
        add_body_line("asset", _to_asset(operation.asset))
        add_body_line("amount", _to_amount(operation.amount), comment=operation.amount)
    elif isinstance(operation, PathPaymentStrictReceive):
        add_body_line("sendAsset", _to_asset(operation.send_asset))
        add_body_line(
            "sendMax", _to_amount(operation.send_max), comment=operation.send_max
        )
        add_body_line(
            "destination",
            _to_muxed_account(operation.destination),
            comment=_to_muxed_account_comment(operation.destination),
        )
        add_body_line("destAsset", _to_asset(operation.dest_asset))
        add_body_line(
            "destAmount",
            _to_amount(operation.dest_amount),
            comment=operation.dest_amount,
        )
        add_body_line("path.len", len(operation.path))
        for index, asset in enumerate(operation.path):
            add_body_line(f"path[{index}]", _to_asset(asset))
    elif isinstance(operation, ManageSellOffer):
        add_body_line("selling", _to_asset(operation.selling))
        add_body_line("buying", _to_asset(operation.buying))
        add_body_line("amount", _to_amount(operation.amount), comment=operation.amount)
        add_price(operation.price)
        add_body_line("offerID", operation.offer_id)
    elif isinstance(operation, CreatePassiveSellOffer):
        add_body_line("selling", _to_asset(operation.selling))
        add_body_line("buying", _to_asset(operation.buying))
        add_body_line("amount", _to_amount(operation.amount), comment=operation.amount)
        add_price(operation.price)
    elif isinstance(operation, SetOptions):
        add_body_line("inflationDest", operation.inflation_dest, True)
        add_body_line(
            "clearFlags",
            operation.clear_flags.value if operation.clear_flags else None,
            True,
        )
        add_body_line(
            "setFlags", operation.set_flags.value if operation.set_flags else None, True
        )
        add_body_line("masterWeight", operation.master_weight, True)
        add_body_line("lowThreshold", operation.low_threshold, True)
        add_body_line("medThreshold", operation.med_threshold, True)
        add_body_line("highThreshold", operation.high_threshold, True)
        add_home_domain(operation.home_domain)
        add_signer(operation.signer)
    elif isinstance(operation, ChangeTrust):
        asset_xdr = operation.asset.to_change_trust_asset_xdr_object()
        add_body_line("line.type", asset_xdr.type.name)
        if asset_xdr.type == stellar_xdr.AssetType.ASSET_TYPE_POOL_SHARE:
            assert isinstance(operation.asset, LiquidityPoolAsset)
            add_body_line(
                "line.liquidityPool.constantProduct.assetA",
                _to_asset(operation.asset.asset_a),
            )
            add_body_line(
                "line.liquidityPool.constantProduct.assetB",
                _to_asset(operation.asset.asset_b),
            )
            add_body_line("line.liquidityPool.constantProduct.fee", operation.asset.fee)
        else:
            assert isinstance(operation.asset, Asset)
            add_body_line("line", _to_asset(operation.asset))
        add_body_line("limit", _to_amount(operation.limit), comment=operation.limit)
    elif isinstance(operation, AllowTrust):
        add_body_line("trustor", operation.trustor)
        add_body_line("asset", operation.asset_code)
        add_body_line("authorize", _true if operation.authorize else _false)
    elif isinstance(operation, AccountMerge):
        # AccountMerge does not include 'accountMergeOp' prefix
        # see https://github.com/StellarCN/py-stellar-base/blob/master/.xdr/Stellar-transaction.x#L282
        add_operation_line(
            "body.destination",
            _to_muxed_account(operation.destination),
            comment=_to_muxed_account_comment(operation.destination),
        )
    elif isinstance(operation, ManageData):
        add_body_line("dataName", _to_string(operation.data_name))
        if operation.data_value is None:
            add_body_line("dataValue._present", _false)
        else:
            add_body_line("dataValue._present", _true)
            add_body_line("dataValue", _to_opaque(operation.data_value))
    elif isinstance(operation, BumpSequence):
        add_body_line("bumpTo", operation.bump_to)
    elif isinstance(operation, ManageBuyOffer):
        add_body_line("selling", _to_asset(operation.selling))
        add_body_line("buying", _to_asset(operation.buying))
        add_body_line(
            "buyAmount", _to_amount(operation.amount), comment=operation.amount
        )
        add_price(operation.price)
        add_body_line("offerID", operation.offer_id)
    elif isinstance(operation, PathPaymentStrictSend):
        add_body_line("sendAsset", _to_asset(operation.send_asset))
        add_body_line(
            "sendAmount",
            _to_amount(operation.send_amount),
            comment=operation.send_amount,
        )
        add_body_line(
            "destination",
            _to_muxed_account(operation.destination),
            comment=_to_muxed_account_comment(operation.destination),
        )
        add_body_line("destAsset", _to_asset(operation.dest_asset))
        add_body_line(
            "destMin", _to_amount(operation.dest_min), comment=operation.dest_min
        )
        add_body_line("path.len", len(operation.path))
        for index, asset in enumerate(operation.path):
            add_body_line(f"path[{index}]", _to_asset(asset))
    elif isinstance(operation, Inflation):
        # no body
        pass
    elif isinstance(operation, CreateClaimableBalance):
        add_body_line("asset", _to_asset(operation.asset))
        add_body_line("amount", _to_amount(operation.amount), comment=operation.amount)
        add_body_line("claimants.len", len(operation.claimants))
        for index, claimant in enumerate(operation.claimants):
            # current CLAIMANT_TYPE is CLAIMANT_TYPE_V0
            add_body_line(
                f"claimants[{index}].type",
                stellar_xdr.ClaimantType.CLAIMANT_TYPE_V0.name,
            )
            add_body_line(f"claimants[{index}].v0.destination", claimant.destination)
            add_claim_predicate(f"claimants[{index}].v0.predicate", claimant.predicate)
    elif isinstance(operation, ClaimClaimableBalance):
        add_body_line("balanceID", operation.balance_id)
    elif isinstance(operation, BeginSponsoringFutureReserves):
        add_body_line("sponsoredID", operation.sponsored_id)
    elif isinstance(operation, EndSponsoringFutureReserves):
        # no body
        pass
    elif isinstance(operation, RevokeSponsorship):
        if operation.revoke_sponsorship_type == RevokeSponsorshipType.ACCOUNT:
            add_body_line(
                "type",
                stellar_xdr.revoke_sponsorship_type.RevokeSponsorshipType.REVOKE_SPONSORSHIP_LEDGER_ENTRY.name,
            )
            add_body_line("ledgerKey.type", stellar_xdr.LedgerEntryType.ACCOUNT.name)
            add_body_line("ledgerKey.account", operation.account_id)
        elif operation.revoke_sponsorship_type == RevokeSponsorshipType.TRUSTLINE:
            add_body_line(
                "type",
                stellar_xdr.revoke_sponsorship_type.RevokeSponsorshipType.REVOKE_SPONSORSHIP_LEDGER_ENTRY.name,
            )
            add_body_line("ledgerKey.type", stellar_xdr.LedgerEntryType.TRUSTLINE.name)
            assert operation.trustline is not None
            add_body_line(
                "ledgerKey.trustLine.accountID", operation.trustline.account_id
            )
            add_body_line(
                "ledgerKey.trustLine.asset", _to_asset(operation.trustline.asset)
            )
        elif operation.revoke_sponsorship_type == RevokeSponsorshipType.OFFER:
            add_body_line(
                "type",
                stellar_xdr.revoke_sponsorship_type.RevokeSponsorshipType.REVOKE_SPONSORSHIP_LEDGER_ENTRY.name,
            )
            add_body_line("ledgerKey.type", stellar_xdr.LedgerEntryType.OFFER.name)
            assert operation.offer is not None
            add_body_line("ledgerKey.offer.sellerID", operation.offer.seller_id)
            add_body_line("ledgerKey.offer.offerID", operation.offer.offer_id)
        elif operation.revoke_sponsorship_type == RevokeSponsorshipType.DATA:
            add_body_line(
                "type",
                stellar_xdr.revoke_sponsorship_type.RevokeSponsorshipType.REVOKE_SPONSORSHIP_LEDGER_ENTRY.name,
            )
            add_body_line("ledgerKey.type", stellar_xdr.LedgerEntryType.DATA.name)
            assert operation.data is not None
            add_body_line("ledgerKey.data.accountID", operation.data.account_id)
            add_body_line("ledgerKey.data.dataName", operation.data.data_name)
        elif (
            operation.revoke_sponsorship_type == RevokeSponsorshipType.CLAIMABLE_BALANCE
        ):
            add_body_line(
                "type",
                stellar_xdr.revoke_sponsorship_type.RevokeSponsorshipType.REVOKE_SPONSORSHIP_LEDGER_ENTRY.name,
            )
            add_body_line(
                "ledgerKey.type", stellar_xdr.LedgerEntryType.CLAIMABLE_BALANCE.name
            )
            add_body_line(
                "ledgerKey.claimableBalance.balanceID", operation.claimable_balance_id
            )
        elif operation.revoke_sponsorship_type == RevokeSponsorshipType.LIQUIDITY_POOL:
            add_body_line(
                "type",
                stellar_xdr.revoke_sponsorship_type.RevokeSponsorshipType.REVOKE_SPONSORSHIP_LEDGER_ENTRY.name,
            )
            add_body_line(
                "ledgerKey.type", stellar_xdr.LedgerEntryType.LIQUIDITY_POOL.name
            )
            add_body_line(
                "ledgerKey.liquidityPool.liquidityPoolID", operation.liquidity_pool_id
            )
        elif operation.revoke_sponsorship_type == RevokeSponsorshipType.SIGNER:
            assert operation.signer is not None
            add_body_line(
                "type",
                stellar_xdr.revoke_sponsorship_type.RevokeSponsorshipType.REVOKE_SPONSORSHIP_SIGNER.name,
            )
            add_body_line("signer.accountID", operation.signer.account_id)
            signer_key_xdr = operation.signer.signer_key.to_xdr_object()
            add_body_line(
                "signer.signerKey.type",
                signer_key_xdr.type.name,
            )
            key = operation.signer.signer_key.encoded_signer_key
            if signer_key_xdr.type == stellar_xdr.SignerKeyType.SIGNER_KEY_TYPE_HASH_X:
                add_body_line("signer.signerKey.hashX", key)
            elif (
                signer_key_xdr.type == stellar_xdr.SignerKeyType.SIGNER_KEY_TYPE_ED25519
            ):
                add_body_line("signer.signerKey.ed25519", key)
            elif (
                signer_key_xdr.type
                == stellar_xdr.SignerKeyType.SIGNER_KEY_TYPE_PRE_AUTH_TX
            ):
                add_body_line("signer.signerKey.preAuthTx", key)
            else:
                raise ValueError(
                    f"This signer key type has not been implemented yet, "
                    f"signer key type: {signer_key_xdr.type}."
                )
        else:
            raise ValueError(
                f"This revoke sponsorship type has not been implemented yet, "
                f"revoke sponsorship type: {operation.revoke_sponsorship_type}."
            )
    elif isinstance(operation, Clawback):
        add_body_line("asset", _to_asset(operation.asset))
        add_body_line(
            "from",
            _to_muxed_account(operation.from_),
            comment=_to_muxed_account_comment(operation.from_),
        )
        add_body_line("amount", _to_amount(operation.amount), comment=operation.amount)
    elif isinstance(operation, ClawbackClaimableBalance):
        add_body_line("balanceID", operation.balance_id)
    elif isinstance(operation, SetTrustLineFlags):
        add_body_line("trustor", operation.trustor)
        add_body_line("asset", _to_asset(operation.asset))
        if operation.clear_flags is None:
            add_body_line("clearFlags", 0)
        else:
            add_body_line("clearFlags", operation.clear_flags.value)
        if operation.set_flags is None:
            add_body_line("setFlags", 0)
        else:
            add_body_line("setFlags", operation.set_flags.value)
    elif isinstance(operation, LiquidityPoolDeposit):
        add_body_line("liquidityPoolID", operation.liquidity_pool_id)
        add_body_line(
            "maxAmountA",
            _to_amount(operation.max_amount_a),
            comment=operation.max_amount_a,
        )
        add_body_line(
            "maxAmountB",
            _to_amount(operation.max_amount_b),
            comment=operation.max_amount_b,
        )
        add_body_line("minPrice.n", operation.min_price.n)
        add_body_line("minPrice.d", operation.min_price.d)
        add_body_line("maxPrice.n", operation.max_price.n)
        add_body_line("maxPrice.d", operation.max_price.d)
    elif isinstance(operation, LiquidityPoolWithdraw):
        add_body_line("liquidityPoolID", operation.liquidity_pool_id)
        add_body_line("amount", _to_amount(operation.amount), comment=operation.amount)
        add_body_line(
            "minAmountA",
            _to_amount(operation.min_amount_a),
            comment=operation.min_amount_a,
        )
        add_body_line(
            "minAmountB",
            _to_amount(operation.min_amount_b),
            comment=operation.min_amount_b,
        )
    else:
        raise ValueError(
            f"This operation has not been implemented yet, "
            f"operation type: {operation}."
        )


def _add_signatures(
    signatures: Sequence[DecoratedSignature], prefix: str, lines: List[str]
) -> None:
    _add_line(f"{prefix}signatures.len", len(signatures), lines)
    for index, signature in enumerate(signatures):
        _add_signature(index, signature, prefix, lines)


def _add_signature(
    index: int, signature: DecoratedSignature, prefix: str, lines: List[str]
) -> None:
    prefix = f"{prefix}signatures[{index}]."
    _add_line(f"{prefix}hint", _to_opaque(signature.signature_hint), lines)
    _add_line(f"{prefix}signature", _to_opaque(signature.signature), lines)


def _to_asset(asset: Union[Asset, LiquidityPoolAsset, LiquidityPoolId]) -> str:
    if not isinstance(asset, Asset):
        raise ValueError("Unexpected asset type.")
    if asset.is_native():
        return "native"
    return f"{asset.code}:{asset.issuer}"


def _to_readable_utc_time_comment(timestamp: int) -> str:
    utc_time = datetime.utcfromtimestamp(timestamp)
    return utc_time.strftime("%Y-%m-%d %H:%M:%S.%f+00:00 (UTC)")


def _to_muxed_account_comment(account: MuxedAccount) -> Optional[str]:
    if account.account_muxed_id is None:
        return None
    return (
        f"accountID: {account.account_id}, accountMuxedID: {account.account_muxed_id}"
    )


def _to_amount(amount: Union[Decimal, str]) -> int:
    return Operation.to_xdr_amount(amount)


def _to_price(price: Union[Price, str, Decimal]) -> Price:
    if isinstance(price, Price):
        price_fraction = price
    else:
        price_fraction = Price.from_raw_price(price)
    return price_fraction


def _to_camel_case(cap_words: str) -> str:
    return cap_words[0].lower() + cap_words[1:]


def _to_caps_with_under(word):
    return "".join(["_" + c if c.isupper() else c for c in word]).lstrip("_").upper()


def _to_string(value: Union[str, bytes]) -> str:
    if isinstance(value, str):
        return json.dumps(value)
    # We are not following the standard here, it needs more discussion.
    try:
        value = value.decode("utf-8")
    except UnicodeDecodeError:
        assert isinstance(value, bytes)
        return _to_opaque(value)
    return json.dumps(value)


def _to_opaque(value: bytes) -> str:
    return value.hex()