stellar/horizon-importer

View on GitHub
app/jobs/history/ledger_importer_job.rb

Summary

Maintainability
F
4 days
Test Coverage
require 'bigdecimal'

#
# Takes the ledger header and transaction set of the requested sequence from the
# stellar_core database and imports them into the history database.
#
class History::LedgerImporterJob < ApplicationJob

  # To allow for updated importer code, we version every history_ledger imported into the horizon by recording the
  # constant below with the new record.
  #
  # IMPORTANT: bump this number up if you ever change the behavior of the importer, so that the reimport system
  # can detect the change and update older imported ledgers.
  VERSION = 5


  EMPTY_HASH            = "0" * 64
  DEFAULT_SIGNER_WEIGHT = 1

  def perform(ledger_sequence, rebuild_allowed=false)
    stellar_core_ledger, stellar_core_transactions = load_stellar_core_data(ledger_sequence)

    if stellar_core_ledger.blank?
      raise ActiveRecord::RecordNotFound,
        "Couldn't find ledger #{ledger_sequence}"
    end

    with_db(:history) do
      first_ledger = stellar_core_ledger.ledgerseq == 1

      create_master_history_account! if first_ledger

      History::Base.transaction do
        # ensure we've imported the previous header
        unless first_ledger
          History::Ledger.validate_previous_ledger_hash!(stellar_core_ledger.prevhash, stellar_core_ledger.ledgerseq)
        end

        # clear out any existing imported data for this ledger, allowing us to re-import the data if necessary
        found = History::Ledger.where(sequence: stellar_core_ledger.ledgerseq).first

        if found.present?
          return unless rebuild_allowed
          found.transactions.each(&:destroy)
          found.accounts.each(&:destroy)
          found.destroy
        end

        result = History::Ledger.create!({
          sequence:             stellar_core_ledger.ledgerseq,
          ledger_hash:          stellar_core_ledger.ledgerhash,
          previous_ledger_hash: (stellar_core_ledger.prevhash unless first_ledger),
          closed_at:            Time.at(stellar_core_ledger.closetime),
          transaction_count:    stellar_core_transactions.length,
          operation_count:      stellar_core_transactions.map(&:operation_count).sum,
          importer_version:     VERSION,
          total_coins:          stellar_core_ledger.total_coins,
          fee_pool:             stellar_core_ledger.fee_pool,
          base_fee:             stellar_core_ledger.base_fee,
          base_reserve:         stellar_core_ledger.base_reserve,
          max_tx_set_size:      stellar_core_ledger.max_tx_set_size,
        })

        stellar_core_transactions.each do |sctx|
          next unless sctx.success?

          htx   = import_history_transaction sctx
          haccs = import_history_accounts sctx
          hops  = import_history_operations sctx, htx
          heffs = import_history_effects sctx, hops
        end

        result
      end
    end
  end

  private

  def load_stellar_core_data(ledger_sequence)
    with_db(:stellar_core) do
      ledger = StellarCore::LedgerHeader.at_sequence(ledger_sequence)

      [ledger, (ledger.transactions.to_a if ledger)]
    end
  end

  def import_history_transaction(sctx)
    htx = History::Transaction.create!({
      transaction_hash:   sctx.txid,
      ledger_sequence:    sctx.ledgerseq,
      application_order:  sctx.txindex,
      account:            sctx.submitting_address,
      account_sequence:   sctx.submitting_sequence,
      fee_paid:           sctx.fee_paid,
      operation_count:    sctx.operations.size,
      tx_envelope:        sctx.txbody,
      tx_result:          sctx.txresult_without_pair,
      tx_meta:            sctx.txmeta,
      tx_fee_meta:        sctx.fee_meta.xdr,
      signatures:         sctx.signatures,
      time_bounds:        sctx.time_bounds,
      memo_type:          sctx.memo_type,
      memo:               sctx.memo,
    })

    sctx.participant_addresses.each do |addr|
      History::TransactionParticipant.create!({
        transaction_hash:  sctx.txid,
        account: addr
      })
    end

    htx
  end

  def import_history_accounts(sctx)
    haccs = []

    sctx.operations.each_with_index do |op, i|
      next unless op.body.type == Stellar::OperationType.create_account

      pop                 = op.body.value
      destination_pk      = pop.destination
      destination_address = Stellar::Convert.pk_to_address(destination_pk)
      id                  = TotalOrderId.make(sctx.ledgerseq, sctx.txindex, i+1)

      unless History::Account.where(address: destination_address).any?
        haccs << History::Account.create!(address: destination_address, id: id)
      end
    end

    haccs
  end

  def import_history_operations(sctx, htx)
    hops = []

    sctx.operations_with_results.each_with_index do |op_and_r, i|
      application_order = i + 1
      op, result = *op_and_r

      source_account = op.source_account || sctx.source_account
      source_address = Stellar::Convert.pk_to_address(source_account)
      participant_addresses = [source_address]

      hop = History::Operation.new({
        transaction_id:     htx.id,
        application_order:  application_order,
        type:               op.body.type.value,
        source_account:     source_address,
        details:            {},
      })


      case op.body.type
      when Stellar::OperationType.create_account
        op = op.body.create_account_op!
        participant_addresses << Stellar::Convert.pk_to_address(op.destination)

        hop.details = {
          "funder"           => Stellar::Convert.pk_to_address(source_account),
          "account"          => Stellar::Convert.pk_to_address(op.destination),
          "starting_balance" => as_amount(op.starting_balance),
        }
      when Stellar::OperationType.payment
        payment = op.body.payment_op!

        hop.details = {
          "from"   => Stellar::Convert.pk_to_address(source_account),
          "to"     => Stellar::Convert.pk_to_address(payment.destination),
          "amount" => as_amount(payment.amount),
        }
        hop.details.merge! asset_details(payment.asset)

        participant_addresses << hop.details["to"]

      when Stellar::OperationType.path_payment
        payment = op.body.path_payment_op!
        result = result.tr!.path_payment_result!

        hop.details = {
          "from"          => Stellar::Convert.pk_to_address(source_account),
          "to"            => Stellar::Convert.pk_to_address(payment.destination),
          "amount"        => as_amount(payment.dest_amount),
          "source_amount" => as_amount(result.send_amount),
          "source_max"    => as_amount(payment.send_max)
        }

        hop.details.merge! asset_details(payment.dest_asset)
        hop.details.merge! asset_details(payment.send_asset, "source_")
        hop.details["path"] = payment.path.map{|a| asset_details(a)}

        participant_addresses << hop.details["to"]
      when Stellar::OperationType.manage_offer
        offer = op.body.manage_offer_op!

        hop.details = {
          "offer_id" => offer.offer_id,
          "amount"   => as_amount(offer.amount),
          "price" => price_string(offer.price),
          "price_r"    => {
            "n" => offer.price.n,
            "d" => offer.price.d,
          },
        }

        hop.details.merge!(asset_details(offer.selling, "selling_"))
        hop.details.merge!(asset_details(offer.buying, "buying_"))
      when Stellar::OperationType.create_passive_offer
        offer = op.body.create_passive_offer_op!

        hop.details = {
          "amount"    => as_amount(offer.amount),
          "price" => price_string(offer.price),
          "price_r"     => {
            "n" => offer.price.n,
            "d" => offer.price.d,
          }
        }

        hop.details.merge!(asset_details(offer.selling, "selling_"))
        hop.details.merge!(asset_details(offer.buying, "buying_"))
      when Stellar::OperationType.set_options
        sop = op.body.set_options_op!
        hop.details = {}

        if sop.inflation_dest.present?
          hop.details["inflation_dest"] = Stellar::Convert.pk_to_address(sop.inflation_dest)
        end

        parsed = Stellar::AccountFlags.parse_mask(sop.set_flags || 0)
        if parsed.any?
          hop.details["set_flags"] = parsed.map(&:value)
          hop.details["set_flags_s"] = parsed.map(&:name)
        end

        parsed = Stellar::AccountFlags.parse_mask(sop.clear_flags || 0)
        if parsed.any?
          hop.details["clear_flags"] = parsed.map(&:value)
          hop.details["clear_flags_s"] = parsed.map(&:name)
        end

        if sop.master_weight.present?
          hop.details["master_key_weight"] = sop.master_weight
        end

        if sop.low_threshold.present?
          hop.details["low_threshold"]     = sop.low_threshold
        end

        if sop.med_threshold.present?
          hop.details["med_threshold"]  = sop.med_threshold
        end

        if sop.high_threshold.present?
          hop.details["high_threshold"]    = sop.high_threshold
        end

        if sop.home_domain.present?
          hop.details["home_domain"] = sop.home_domain
        end

        if sop.signer.present?
          hop.details.merge!({
            "signer_key"    => Stellar::Convert.pk_to_address(sop.signer.pub_key),
            "signer_weight" => sop.signer.weight,
          })
        end

      when Stellar::OperationType.change_trust
        ctop        = op.body.change_trust_op!
        asset    = ctop.line

        hop.details = {
          "trustor" => Stellar::Convert.pk_to_address(source_account),
          "limit"   => as_amount(ctop.limit),
        }
        hop.details.merge! asset_details(asset)
        hop.details["trustee"] = hop.details["asset_issuer"]

        if asset.type == Stellar::AssetType.asset_type_native
          raise "native asset in change_trust_op"
        end

      when Stellar::OperationType.allow_trust
        atop  = op.body.allow_trust_op!
        asset = atop.asset

        hop.details = {
          "trustee"         => Stellar::Convert.pk_to_address(source_account),
          "trustor"         => Stellar::Convert.pk_to_address(atop.trustor),
          "authorize"       => atop.authorize
        }

        case asset.type
        when Stellar::AssetType.asset_type_native
          raise "native asset in allow_trust_op"
        when Stellar::AssetType.asset_type_credit_alphanum4
          hop.details["asset_type"]   = "credit_alphanum4"
          hop.details["asset_code"]   = asset.asset_code4!.strip
          hop.details["asset_issuer"] = Stellar::Convert.pk_to_address source_account
        when Stellar::AssetType.asset_type_credit_alphanum12
          hop.details["asset_type"]   = "credit_alphanum12"
          hop.details["asset_code"]   = asset.asset_code12!.strip
          hop.details["asset_issuer"] = Stellar::Convert.pk_to_address source_account
        else
          raise "Unknown asset type: #{asset.type}"
        end

      when Stellar::OperationType.account_merge
        destination  = op.body.destination!
        hop.details = {
          "account"   => Stellar::Convert.pk_to_address(source_account),
          "into"     => Stellar::Convert.pk_to_address(destination)
        }
        participant_addresses << hop.details["into"]
      when Stellar::OperationType.inflation
        #Inflation has no details, presently.
      end


      hop.save!
      hops << hop

      participant_addresses.uniq!
      # now import the participants from this operation
      participants = History::Account.where(address:participant_addresses).all

      unless participants.length == participant_addresses.length
        raise "Could not find all participants"
      end

      participants.each do |account|
        History::OperationParticipant.create!({
          history_account:   account,
          history_operation: hop,
        })
      end
    end

    hops
  end

  def import_history_effects(sctx, hops)
    heffs = []

    sctx.operations_with_results.each_with_index do |op_and_r, application_order|
      scop, scresult = *op_and_r
      hop = hops[application_order]

      heffs.concat import_history_effects_for_operation(sctx, scop, scresult, hop)
    end

    heffs
  end

  def import_history_effects_for_operation(sctx, scop, scresult, hop)
    effects = History::EffectFactory.new(hop)
    source_account = scop.source_account || sctx.source_account
    op_index = sctx.operations.index(scop)
    scopm = sctx.meta.operations![op_index]

    case hop.type_as_enum
    when Stellar::OperationType.create_account
      scop = scop.body.create_account_op!

      effects.create!("account_created", scop.destination, {
        starting_balance: as_amount(scop.starting_balance),
      })

      effects.create!("account_debited", source_account, {
        asset_type: "native",
        amount: as_amount(scop.starting_balance)
      })

      effects.create!("signer_created", scop.destination, {
        public_key: Stellar::Convert.pk_to_address(scop.destination),
        weight: DEFAULT_SIGNER_WEIGHT,
      })
    when Stellar::OperationType.payment
      scop = scop.body.payment_op!
      details = { amount: as_amount(scop.amount) }
      details.merge!  asset_details(scop.asset)
      effects.create!("account_credited", scop.destination, details)
      effects.create!("account_debited", source_account, details)
    when Stellar::OperationType.path_payment
      scop = scop.body.path_payment_op!

      dest_details = { amount: as_amount(scop.dest_amount) }
      dest_details.merge!  asset_details(scop.dest_asset)

      scresult = scresult.tr!.path_payment_result!
      source_details = { amount: as_amount(scresult.send_amount) }
      source_details.merge!  asset_details(scop.send_asset)

      effects.create!("account_credited", scop.destination, dest_details)
      effects.create!("account_debited", source_account, source_details)

      make_trades effects, source_account, scresult.success!.offers
    when Stellar::OperationType.manage_offer
      scresult = scresult.tr!.manage_offer_result!.success!
      make_trades effects, source_account, scresult.offers_claimed
    when Stellar::OperationType.create_passive_offer
      scresult = scresult.tr!.manage_offer_result!.success!
      make_trades effects, source_account, scresult.offers_claimed
    when Stellar::OperationType.set_options
      scop = scop.body.set_options_op!

      unless scop.home_domain.nil?
        effects.create!("account_home_domain_updated", source_account, {
          "home_domain" => scop.home_domain
        })
      end

      thresholds_changed = scop.low_threshold.present? ||
                           scop.med_threshold.present? ||
                           scop.high_threshold.present?


      if thresholds_changed
        details = {}
        details["low_threshold"]  = scop.low_threshold  if scop.low_threshold.present?
        details["med_threshold"]  = scop.med_threshold  if scop.med_threshold.present?
        details["high_threshold"] = scop.high_threshold if scop.high_threshold.present?
        effects.create!("account_thresholds_updated", source_account, details)
      end

      flag_changes = {}
      Stellar::AccountFlags.parse_mask(scop.set_flags || 0).each do |af|
        flag_changes [af.name] = true
      end
      Stellar::AccountFlags.parse_mask(scop.clear_flags || 0).each do |af|
        flag_changes [af.name] = false
      end

      if flag_changes.any?
        effects.create!("account_flags_updated", source_account, flag_changes)
      end

      if scop.master_weight.present?
        #TODO: BLOCKED stellar-core: differentiate signer_updated and signer_added
        #for master signer
        effect = scop.master_weight == 0 ? "signer_removed" : "signer_updated"

        effects.create!(effect, source_account, {
          public_key: Stellar::Convert.pk_to_address(source_account),
          weight: scop.master_weight,
        })
      end

      if scop.signer.present?
        effect = if scop.signer.weight == 0
                   "signer_removed"
                 else
                   #TODO: BLOCKED stellar-core: distinguish between new signers and updated signers
                   "signer_created"
                 end

        effects.create!(effect, source_account, {
          public_key: Stellar::Convert.pk_to_address(scop.signer.pub_key),
          weight: scop.signer.weight,
        })
      end

    when Stellar::OperationType.change_trust
      scop = scop.body.change_trust_op!
      effect = if scop.limit == 0
                 'trustline_removed'
               else
                 tlm = scopm.changes.first #TODO: add a less brittle method of finding the trustline entry in the meta
                 if tlm.blank?
                   'trustline_updated'
                 elsif tlm.type == Stellar::LedgerEntryChangeType.ledger_entry_created
                   'trustline_created'
                 else
                   'trustline_updated'
                 end
               end

      details = asset_details(scop.line)
      details["limit"] = as_amount(scop.limit)

      effects.create!(effect, source_account, details)
    when Stellar::OperationType.allow_trust
      scop = scop.body.allow_trust_op!
      asset = scop.asset
      effect = scop.authorize ? "trustline_authorized" : "trustline_deauthorized"
      details = {
        "trustor" => Stellar::Convert.pk_to_address(scop.trustor),
      }

      case asset.type
      when Stellar::AssetType.asset_type_native
        raise "native asset in allow_trust_op"
      when Stellar::AssetType.asset_type_credit_alphanum4
        details["asset_type"]   = "credit_alphanum4"
        details["asset_code"]   = asset.asset_code4!.strip
      when Stellar::AssetType.asset_type_credit_alphanum12
        details["asset_type"]   = "credit_alphanum12"
        details["asset_code"]   = asset.asset_code12!.strip
      else
        raise "Unknown asset type: #{asset.type}"
      end


      effects.create!(effect, source_account, details)
    when Stellar::OperationType.account_merge
      destination = scop.body.destination!
      scresult = scresult.tr!.account_merge_result!
      details = { amount: as_amount(scresult.source_account_balance!), asset_type: "native" }
      effects.create!("account_debited", source_account, details)
      effects.create!("account_credited", destination, details)
      effects.create!("account_removed", source_account, {})
    when Stellar::OperationType.inflation
      payouts = scresult.tr!.inflation_result!.payouts!

      payouts.each do |payout|
        details = { amount: as_amount(payout.amount), asset_type: "native" }
        effects.create!("account_credited", payout.destination, details)
      end
    else
      Rails.logger.info "Unknown type: #{hop.type_as_enum.name}.  skipping effects import"
    end

    effects.results
  end

  def asset_details(asset, prefix="")
    case asset.type
    when Stellar::AssetType.asset_type_native
      { "#{prefix}asset_type" => "native" }
    when Stellar::AssetType.asset_type_credit_alphanum4
      coded_asset_details(asset, prefix, "credit_alphanum4")
    when Stellar::AssetType.asset_type_credit_alphanum12
      coded_asset_details(asset, prefix, "credit_alphanum12")
    else
      raise "Unknown asset type: #{asset.type}"
    end
  end

  def coded_asset_details(asset, prefix, type)
    {
      "#{prefix}asset_type"   => type,
      "#{prefix}asset_code"   => asset.code.strip,
      "#{prefix}asset_issuer" => Stellar::Convert.pk_to_address(asset.issuer),
    }
  end

  #
  # This method ensures that we create the history_account record for the
  # master account, which is a special case because it never shows up as
  # a new account in some transaction's metadata.
  #
  def create_master_history_account!
    return if History::Account.where(id:1).any?
    History::Account.create!(address: Stellar::KeyPair.master.address, id: 1)
  end

  # given the provided account and a set of claim_offer_atoms, produce 2 trade
  # effects (one for the buyer, one for the sellar) for each claim_offer_atom
  def make_trades(effects, buyer, claim_offer_atoms)
    claim_offer_atoms.each{|coa| make_trade effects, buyer, coa}
  end

  def make_trade(effects, buyer, claimed_offer)
    seller = claimed_offer.seller_id

    buyer_details = {
      "offer_id"      => claimed_offer.offer_id,
      "seller"        => Stellar::Convert.pk_to_address(seller),
      "bought_amount" => as_amount(claimed_offer.amount_sold),
      "sold_amount"   => as_amount(claimed_offer.amount_bought),
    }
    buyer_details.merge! asset_details(claimed_offer.asset_sold, "bought_")
    buyer_details.merge! asset_details(claimed_offer.asset_bought, "sold_")

    seller_details = {
      "offer_id"      => claimed_offer.offer_id,
      "seller"        => Stellar::Convert.pk_to_address(buyer),
      "bought_amount" => as_amount(claimed_offer.amount_bought),
      "sold_amount"   => as_amount(claimed_offer.amount_sold),
    }
    seller_details.merge! asset_details(claimed_offer.asset_bought, "bought_")
    seller_details.merge! asset_details(claimed_offer.asset_sold, "sold_")

    effects.create!("trade", buyer, buyer_details)
    effects.create!("trade", seller, seller_details)
  end

  def as_amount(raw_amount)
    raw = (BigDecimal.new(raw_amount) / BigDecimal.new(Stellar::ONE)).round(7, :truncate).to_s("F")
    l, r = *raw.split(".", 2)
    r = r.ljust(7, '0')
    "#{l}.#{r}"
  end

  def price_string(price)
    raw = (BigDecimal.new(price.n) / BigDecimal.new(price.d)).round(7, :truncate).to_s("F")
    l, r = *raw.split(".", 2)
    r = r.ljust(7, '0')
    "#{l}.#{r}"
  end
end