activerecord/lib/active_record/connection_adapters/abstract/transaction.rb

Summary

Maintainability
D
2 days
Test Coverage
# frozen_string_literal: true

module ActiveRecord
  module ConnectionAdapters
    # = Active Record Connection Adapters Transaction State
    #
    # Records the lifecycle marker of one transaction: +nil+ until something
    # is recorded, afterwards one of +:committed+, +:fully_committed+,
    # +:rolledback+, +:fully_rolledback+ or +:invalidated+. It also keeps the
    # states registered through #add_child so that rollbacks and
    # invalidations cascade to them.
    class TransactionState
      def initialize(state = nil)
        @children = nil
        @state = state
      end

      # Registers +state+ as a child of this state and returns the list of
      # children. The list is created lazily on first use.
      def add_child(state)
        (@children ||= []) << state
      end

      # Truthy once any marker has been recorded. Returns the raw marker (or
      # +nil+) rather than a strict boolean.
      def finalized?
        @state
      end

      # True for both +:committed+ and +:fully_committed+.
      def committed?
        return true if @state == :committed

        fully_committed?
      end

      def fully_committed?
        @state == :fully_committed
      end

      # True for both +:rolledback+ and +:fully_rolledback+.
      def rolledback?
        return true if @state == :rolledback

        fully_rolledback?
      end

      def fully_rolledback?
        @state == :fully_rolledback
      end

      def invalidated?
        @state == :invalidated
      end

      # Currently a synonym of #completed?.
      def fully_completed?
        completed?
      end

      # True once the state is any committed or rolled back marker. An
      # invalidated state is not completed.
      def completed?
        return true if committed?

        rolledback?
      end

      # Marks every child, then this state, as +:rolledback+.
      def rollback!
        @children&.each(&:rollback!)
        @state = :rolledback
      end

      # Marks this state as +:fully_rolledback+. Children receive a plain
      # #rollback!, so they end up as +:rolledback+.
      def full_rollback!
        @children&.each(&:rollback!)
        @state = :fully_rolledback
      end

      # Marks every child, then this state, as +:invalidated+.
      def invalidate!
        @children&.each(&:invalidate!)
        @state = :invalidated
      end

      # Commit markers are not propagated to children.
      def commit!
        @state = :committed
      end

      def full_commit!
        @state = :fully_committed
      end

      # Clears the marker, making the state unfinalized again.
      def nullify!
        @state = nil
      end
    end

    # Wraps the "transaction.active_record" notification for one transaction.
    # #start opens an instrumentation handle with a fresh copy of the base
    # payload; #finish records the outcome in that payload and closes the
    # handle. The pair may be repeated, but calls must alternate.
    class TransactionInstrumenter
      class InstrumentationNotStartedError < ActiveRecordError; end
      class InstrumentationAlreadyStartedError < ActiveRecordError; end

      # +payload+ is the base payload; it is duplicated on every #start.
      def initialize(payload = {})
        @base_payload = payload
        @payload = nil
        @handle = nil
        @started = false
      end

      # Begins instrumentation.
      #
      # Raises InstrumentationAlreadyStartedError if already started.
      def start
        if @started
          raise InstrumentationAlreadyStartedError, "Called start on an already started transaction"
        end

        @started = true
        @payload = @base_payload.dup

        instrumenter = ActiveSupport::Notifications.instrumenter
        @handle = instrumenter.build_handle("transaction.active_record", @payload)
        @handle.start
      end

      # Ends instrumentation, storing +outcome+ under the +:outcome+ key of
      # the payload that was handed to the handle in #start.
      #
      # Raises InstrumentationNotStartedError if #start was not called first.
      def finish(outcome)
        unless @started
          raise InstrumentationNotStartedError, "Called finish on a transaction that hasn't started"
        end

        @started = false
        @payload[:outcome] = outcome
        @handle.finish
      end
    end

    # Null object answering the transaction interface when no transaction is
    # open (TransactionManager#current_transaction returns a shared frozen
    # instance when its stack is empty). Every query reports "nothing open",
    # every mutator is a no-op, and the commit hooks simply yield.
    class NullTransaction # :nodoc:
      def initialize
      end

      # There is no state object for an absent transaction.
      def state
      end

      def closed?
        true
      end

      def open?
        false
      end

      def joinable?
        false
      end

      # Records are not tracked outside of a transaction.
      def add_record(record, _ = true)
      end

      def restartable?
        false
      end

      def dirty?
        false
      end

      def dirty!
      end

      def invalidated?
        false
      end

      def invalidate!
      end

      def materialized?
        false
      end

      # Runs the given block immediately and returns its value.
      def before_commit
        yield
      end

      # Runs the given block immediately and returns its value.
      def after_commit
        yield
      end

      def after_rollback
        # noop
      end
    end

    # Base class of the transaction objects that TransactionManager keeps on
    # its stack. It tracks the records enrolled in the transaction, whether
    # the transaction has been materialized or dirtied, and drives the
    # before-commit, commit and rollback notifications on those records.
    class Transaction < ActiveRecord::Transaction # :nodoc:
      attr_reader :connection, :state, :savepoint_name, :isolation_level
      # NOTE(review): +written+ is not read or assigned by the code visible
      # here; presumably it is maintained by callers - confirm.
      attr_accessor :written

      # Invalidation is answered by the TransactionState held in @state.
      delegate :invalidate!, :invalidated?, to: :@state

      # connection::           connection the transaction belongs to; also
      #                        passed to the instrumenter as payload.
      # isolation::            stored and exposed as #isolation_level.
      # joinable::             value returned by #joinable?.
      # run_commit_callbacks:: when true this transaction runs the commit
      #                        callbacks itself; when false #commit_records
      #                        hands records and callbacks over via the
      #                        connection instead.
      def initialize(connection, isolation: nil, joinable: true, run_commit_callbacks: false)
        super()
        @connection = connection
        @state = TransactionState.new
        @records = nil
        @isolation_level = isolation
        @materialized = false
        @joinable = joinable
        @run_commit_callbacks = run_commit_callbacks
        @lazy_enrollment_records = nil
        @dirty = false
        @instrumenter = TransactionInstrumenter.new(connection: connection)
      end

      # Flags the transaction as dirty. A dirty transaction is no longer
      # #restartable?.
      def dirty!
        @dirty = true
      end

      def dirty?
        @dirty
      end

      # Enrolls +record+ in the transaction. With +ensure_finalize+ the record
      # is appended to the record list directly; otherwise it is only kept in
      # an ObjectSpace::WeakMap until #records merges it in.
      def add_record(record, ensure_finalize = true)
        @records ||= []
        if ensure_finalize
          @records << record
        else
          @lazy_enrollment_records ||= ObjectSpace::WeakMap.new
          @lazy_enrollment_records[record] = record
        end
      end

      # Returns the enrolled records, or +nil+ when none were ever added.
      # Lazily enrolled records still present in the weak map are appended
      # first and the weak map is then discarded.
      def records
        if @lazy_enrollment_records
          @records.concat @lazy_enrollment_records.values
          @lazy_enrollment_records = nil
        end
        @records
      end

      # Can this transaction's current state be recreated by
      # rollback+begin ?
      def restartable?
        joinable? && !dirty?
      end

      # Closes the instrumentation with the +:incomplete+ outcome, but only
      # if the transaction was materialized (and so instrumentation started).
      def incomplete!
        @instrumenter.finish(:incomplete) if materialized?
      end

      # Marks the transaction as materialized and starts instrumentation.
      # Subclasses issue the actual database statement before calling +super+.
      def materialize!
        @materialized = true
        @instrumenter.start
      end

      def materialized?
        @materialized
      end

      # Re-materializes an already materialized transaction: the running
      # instrumentation is finished as incomplete, the flag is reset and
      # #materialize! is invoked again. Unmaterialized transactions are left
      # untouched.
      def restore!
        if materialized?
          incomplete!
          @materialized = false
          materialize!
        end
      end

      # Notifies every enrolled record that the transaction was rolled back,
      # then invokes +after_rollback+ on each registered callback.
      def rollback_records
        if records
          begin
            ite = unique_records

            instances_to_run_callbacks_on = prepare_instances_to_run_callbacks_on(ite)

            run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
              record.rolledback!(force_restore_state: full_rollback?, should_run_callbacks: should_run_callbacks)
            end
          ensure
            # run_action_on_records shifts each record off +ite+ before
            # yielding it, so whatever is left here was never processed
            # because an exception interrupted the loop. Those records are
            # still notified, but without callbacks. +ite+ is nil if
            # unique_records itself raised.
            ite&.each do |i|
              i.rolledback!(force_restore_state: full_rollback?, should_run_callbacks: false)
            end
          end
        end

        # NOTE(review): @callbacks is not assigned in this class; presumably
        # it is maintained by the ActiveRecord::Transaction superclass.
        @callbacks&.each(&:after_rollback)
      end

      # Runs the before-commit hooks. This does nothing unless this
      # transaction is the one running commit callbacks.
      def before_commit_records
        if @run_commit_callbacks
          if records
            if ActiveRecord.before_committed_on_all_records
              ite = unique_records

              # For each group of records that are equal as hash keys, the
              # last one enrolled becomes the instance that receives
              # before_committed!.
              instances_to_run_callbacks_on = records.each_with_object({}) do |record, candidates|
                candidates[record] = record
              end

              run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
                record.before_committed! if should_run_callbacks
              end
            else
              # Array#uniq keeps the first occurrence, so here the first
              # enrolled instance of each group of equal records is called.
              records.uniq.each(&:before_committed!)
            end
          end

          @callbacks&.each(&:before_commit)
        end
        # Note: When @run_commit_callbacks is false #commit_records takes care of appending
        # remaining callbacks to the parent transaction
      end

      # Finalizes the enrolled records after a commit. When this transaction
      # runs commit callbacks, every record receives committed!; otherwise
      # the records and registered callbacks are handed over through the
      # connection.
      def commit_records
        if records
          begin
            ite = unique_records

            if @run_commit_callbacks
              instances_to_run_callbacks_on = prepare_instances_to_run_callbacks_on(ite)

              run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
                record.committed!(should_run_callbacks: should_run_callbacks)
              end
            else
              while record = ite.shift
                # if not running callbacks, only adds the record to the parent transaction
                connection.add_transaction_record(record)
              end
            end
          ensure
            # Anything still left in +ite+ was not processed because an
            # exception interrupted the loops above; mark those records as
            # committed without running their callbacks.
            ite&.each { |i| i.committed!(should_run_callbacks: false) }
          end
        end

        if @run_commit_callbacks
          @callbacks&.each(&:after_commit)
        elsif @callbacks
          # NOTE(review): append_callbacks is defined outside the code
          # visible here; presumably it queues the callbacks on the
          # transaction that is now current.
          connection.current_transaction.append_callbacks(@callbacks)
        end
      end

      # Defaults for the base class; the nested transaction subclasses
      # override full_rollback? to return false.
      def full_rollback?; true; end
      def joinable?; @joinable; end
      def closed?; false; end
      def open?; !closed?; end

      private
        # Enrolled records de-duplicated by object identity, returned as a
        # new array that callers are free to consume destructively.
        def unique_records
          records.uniq(&:__id__)
        end

        # Destructively walks +records+, yielding each record together with a
        # flag telling whether it is the very instance selected in
        # +instances_to_run_callbacks_on+. The flag is false when no instance
        # was selected for that record.
        def run_action_on_records(records, instances_to_run_callbacks_on)
          while record = records.shift
            should_run_callbacks = record.__id__ == instances_to_run_callbacks_on[record].__id__

            yield record, should_run_callbacks
          end
        end

        # Builds a hash that maps each record to the single instance whose
        # transactional callbacks should run. Several enrolled instances may
        # be equal as hash keys (presumably the same database row - verify
        # against the model's #hash / #eql?); only one of them is selected.
        def prepare_instances_to_run_callbacks_on(records)
          records.each_with_object({}) do |record, candidates|
            next unless record.trigger_transactional_callbacks?

            earlier_saved_candidate = candidates[record]

            # When the model class opts into it, the first saved instance
            # keeps the callbacks and later instances never replace it.
            next if earlier_saved_candidate && record.class.run_commit_callbacks_on_first_saved_instances_in_transaction

            # If the candidate instance destroyed itself in the database, then
            # instances which were added to the transaction afterwards, and which
            # think they updated themselves, are wrong. They should not replace
            # our candidate as an instance to run callbacks on
            next if earlier_saved_candidate&.destroyed? && !record.destroyed?

            # If the candidate instance was created inside of this transaction,
            # then instances which were subsequently loaded from the database
            # and updated need that state transferred to them so that
            # the after_create_commit callbacks are run
            record._new_record_before_last_commit = true if earlier_saved_candidate&._new_record_before_last_commit

            # The last instance to save itself is likeliest to have internal
            # state that matches what's committed to the database
            candidates[record] = record
          end
        end
    end

    # = Active Record Restart Parent \Transaction
    #
    # A nested transaction that owns no database-level construct of its own.
    # Materialization is delegated to the parent transaction, and rolling it
    # back restarts the parent. TransactionManager picks this class when the
    # current transaction reports itself as restartable.
    class RestartParentTransaction < Transaction
      delegate :materialize!, :materialized?, :restart, to: :@parent

      # Raises ActiveRecord::TransactionIsolationError when an isolation
      # level is requested, since a nested transaction cannot set one. The
      # state of the new transaction is registered as a child of the parent's
      # state only after that check has passed.
      def initialize(connection, parent_transaction, **options)
        super(connection, **options)

        @parent = parent_transaction

        raise ActiveRecord::TransactionIsolationError, "cannot set transaction isolation in a nested transaction" if isolation_level

        parent_transaction.state.add_child(@state)
      end

      # Marks this transaction as rolled back, then restarts the parent.
      def rollback
        @state.rollback!
        @parent.restart
      end

      # Only the state marker changes; nothing is sent to the database here.
      def commit
        @state.commit!
      end

      def full_rollback?
        false
      end
    end

    # = Active Record Savepoint \Transaction
    #
    # A nested transaction backed by a database savepoint. The savepoint is
    # created on #materialize!, released on #commit and rolled back to on
    # #rollback and #restart.
    class SavepointTransaction < Transaction
      # connection::         connection the savepoint statements are issued on.
      # savepoint_name::     name used for every savepoint statement.
      # parent_transaction:: enclosing transaction; the new transaction's
      #                      state is registered as a child of its state.
      #
      # Raises ActiveRecord::TransactionIsolationError when an isolation
      # level is requested, since a nested transaction cannot set one.
      def initialize(connection, savepoint_name, parent_transaction, **options)
        super(connection, **options)

        # Validate before touching the parent, so that a rejected
        # construction does not leave an orphaned child state registered on
        # the parent's state.
        if isolation_level
          raise ActiveRecord::TransactionIsolationError, "cannot set transaction isolation in a nested transaction"
        end

        @savepoint_name = savepoint_name

        parent_transaction.state.add_child(@state)
      end

      # Creates the savepoint, then marks the transaction as materialized and
      # starts instrumentation.
      def materialize!
        connection.create_savepoint(savepoint_name)
        super
      end

      # Rolls back to the savepoint while keeping the transaction open. The
      # running instrumentation is closed with the +:restart+ outcome and a
      # new one is started. Does nothing if the savepoint was never created.
      def restart
        return unless materialized?

        @instrumenter.finish(:restart)
        @instrumenter.start

        connection.rollback_to_savepoint(savepoint_name)
      end

      # Rolls back to the savepoint and marks the state as rolled back. The
      # database statement is skipped when the state was invalidated, the
      # savepoint was never created or the connection is not active.
      def rollback
        unless @state.invalidated?
          connection.rollback_to_savepoint(savepoint_name) if materialized? && connection.active?
        end
        @state.rollback!
        @instrumenter.finish(:rollback) if materialized?
      end

      # Releases the savepoint (if it was created) and marks the state as
      # committed.
      def commit
        connection.release_savepoint(savepoint_name) if materialized?
        @state.commit!
        @instrumenter.finish(:commit) if materialized?
      end

      def full_rollback?; false; end
    end

    # = Active Record Real \Transaction
    #
    # The transaction TransactionManager creates when its stack is empty. It
    # maps directly onto a database transaction on the connection.
    class RealTransaction < Transaction
      # Begins the database transaction, with the requested isolation level
      # when one was given, then marks the transaction as materialized.
      def materialize!
        level = isolation_level

        if level
          connection.begin_isolated_db_transaction(level)
        else
          connection.begin_db_transaction
        end

        super
      end

      # Discards the work done so far while keeping a transaction open.
      # Adapters that support it restart in place; for the others the
      # transaction is rolled back and begun again. Does nothing if the
      # transaction was never materialized.
      def restart
        if materialized?
          @instrumenter.finish(:restart)

          unless connection.supports_restart_db_transaction?
            connection.rollback_db_transaction
            return materialize!
          end

          @instrumenter.start
          connection.restart_db_transaction
        end
      end

      # Rolls back the database transaction (if it was begun) and marks the
      # state as fully rolled back.
      def rollback
        connection.rollback_db_transaction if materialized?
        @state.full_rollback!
        return unless materialized?

        @instrumenter.finish(:rollback)
      end

      # Commits the database transaction (if it was begun) and marks the
      # state as fully committed.
      def commit
        connection.commit_db_transaction if materialized?
        @state.full_commit!
        return unless materialized?

        @instrumenter.finish(:commit)
      end
    end

    # Maintains the stack of open transactions for one connection: it creates
    # the right kind of transaction object, materializes lazily begun
    # transactions on demand, and commits or rolls back the transaction on
    # top of the stack.
    class TransactionManager # :nodoc:
      def initialize(connection)
        @stack = []
        @connection = connection
        # True while at least one stacked transaction may still need to be
        # materialized.
        @has_unmaterialized_transactions = false
        # Re-entrancy guard for #materialize_transactions.
        @materializing_transactions = false
        @lazy_transactions_enabled = true
      end

      # Creates a transaction, pushes it on the stack and returns it.
      #
      # The class depends on the current stack: a RealTransaction when the
      # stack is empty, a RestartParentTransaction when the current
      # transaction is restartable, otherwise a SavepointTransaction named
      # after the current stack size.
      #
      # The new transaction runs commit callbacks itself only when the
      # current transaction is not joinable (which includes the null
      # transaction of an empty stack).
      #
      # Materialization is deferred when the connection supports lazy
      # transactions, lazy transactions are enabled and +_lazy+ is true;
      # otherwise the transaction is materialized right away.
      def begin_transaction(isolation: nil, joinable: true, _lazy: true)
        @connection.lock.synchronize do
          run_commit_callbacks = !current_transaction.joinable?
          transaction =
            if @stack.empty?
              RealTransaction.new(
                @connection,
                isolation: isolation,
                joinable: joinable,
                run_commit_callbacks: run_commit_callbacks
              )
            elsif current_transaction.restartable?
              RestartParentTransaction.new(
                @connection,
                current_transaction,
                isolation: isolation,
                joinable: joinable,
                run_commit_callbacks: run_commit_callbacks
              )
            else
              SavepointTransaction.new(
                @connection,
                "active_record_#{@stack.size}",
                current_transaction,
                isolation: isolation,
                joinable: joinable,
                run_commit_callbacks: run_commit_callbacks
              )
            end

          # A RestartParentTransaction delegates materialized? to its parent,
          # so a new transaction can already report itself as materialized.
          unless transaction.materialized?
            if @connection.supports_lazy_transactions? && lazy_transactions_enabled? && _lazy
              @has_unmaterialized_transactions = true
            else
              transaction.materialize!
            end
          end
          @stack.push(transaction)
          transaction
        end
      end

      # Materializes anything still pending, then makes every later
      # transaction materialize immediately.
      def disable_lazy_transactions!
        materialize_transactions
        @lazy_transactions_enabled = false
      end

      def enable_lazy_transactions!
        @lazy_transactions_enabled = true
      end

      def lazy_transactions_enabled?
        @lazy_transactions_enabled
      end

      # Marks the transaction on top of the stack as dirty (a no-op on the
      # null transaction when the stack is empty).
      def dirty_current_transaction
        current_transaction.dirty!
      end

      # Calls restore! on every stacked transaction and returns true, or
      # returns false without touching anything if any of them is dirty.
      def restore_transactions
        return false unless restorable?

        @stack.each(&:restore!)

        true
      end

      # True when no stacked transaction is dirty.
      def restorable?
        @stack.none?(&:dirty?)
      end

      # Materializes every stacked transaction that is still pending, from
      # the bottom of the stack upwards.
      def materialize_transactions
        # Bail out when called again while a materialization is in progress.
        # NOTE(review): presumably materialize! can lead back into this
        # method through the connection - confirm.
        return if @materializing_transactions

        if @has_unmaterialized_transactions
          @connection.lock.synchronize do
            begin
              @materializing_transactions = true
              @stack.each { |t| t.materialize! unless t.materialized? }
            ensure
              @materializing_transactions = false
            end
            # Reached only when every materialize! succeeded; after a failure
            # the flag stays set.
            @has_unmaterialized_transactions = false
          end
        end
      end

      # Commits the transaction on top of the stack.
      def commit_transaction
        @connection.lock.synchronize do
          transaction = @stack.last

          begin
            transaction.before_commit_records
          ensure
            # The transaction leaves the stack even if a before-commit hook
            # raised.
            @stack.pop
          end

          # After the pop the current transaction is the parent, which
          # inherits the dirty flag of the transaction being committed.
          dirty_current_transaction if transaction.dirty?

          transaction.commit
          transaction.commit_records
        end
      end

      # Rolls back +transaction+, defaulting to the one on top of the stack.
      # The stack is popped only when that transaction is still on top, and
      # the popping happens even if the rollback raised. Records are notified
      # afterwards.
      def rollback_transaction(transaction = nil)
        @connection.lock.synchronize do
          transaction ||= @stack.last
          begin
            transaction.rollback
          ensure
            @stack.pop if @stack.last == transaction
          end
          transaction.rollback_records
        end
      end

      # Runs the block inside a newly begun transaction, yielding that
      # transaction to it.
      #
      # * If the block raises, the transaction is rolled back and the
      #   exception is re-raised.
      # * If the block is left without an exception, the transaction is
      #   committed, unless the current thread is being aborted, in which
      #   case it is rolled back.
      # * If, in the end, the transaction's state is not completed, the
      #   connection is thrown away.
      def within_new_transaction(isolation: nil, joinable: true)
        @connection.lock.synchronize do
          transaction = begin_transaction(isolation: isolation, joinable: joinable)
          begin
            yield transaction
          rescue Exception => error
            rollback_transaction
            after_failure_actions(transaction, error)

            raise
          ensure
            # +error+ is only assigned by the rescue clause above, so this
            # branch runs when the block finished without raising.
            unless error
              if Thread.current.status == "aborting"
                rollback_transaction
              else
                begin
                  commit_transaction
                rescue ActiveRecord::ConnectionFailed
                  # Nothing can be sent over a failed connection; only mark
                  # the state as invalidated.
                  transaction.invalidate! unless transaction.state.completed?
                  raise
                rescue Exception
                  # The transaction is passed explicitly because
                  # commit_transaction has already popped it off the stack.
                  rollback_transaction(transaction) unless transaction.state.completed?
                  raise
                end
              end
            end
          end
        ensure
          # Reached with an uncompleted state, or with no transaction at all
          # when begin_transaction raised: discard the connection and close
          # any running instrumentation as incomplete.
          unless transaction&.state&.completed?
            @connection.throw_away!
            transaction&.incomplete!
          end
        end
      end

      # Number of transactions currently on the stack.
      def open_transactions
        @stack.size
      end

      # The transaction on top of the stack, or the shared null transaction
      # when the stack is empty.
      def current_transaction
        @stack.last || NULL_TRANSACTION
      end

      private
        # Frozen null object handed out by #current_transaction.
        NULL_TRANSACTION = NullTransaction.new.freeze

        # Deallocate invalidated prepared statements outside of the transaction
        #
        # Only acts when a RealTransaction failed with
        # ActiveRecord::PreparedStatementCacheExpired.
        def after_failure_actions(transaction, error)
          return unless transaction.is_a?(RealTransaction)
          return unless error.is_a?(ActiveRecord::PreparedStatementCacheExpired)
          @connection.clear_cache!
        end
    end
  end
end