ReactiveX/RxRuby

View on GitHub
lib/rx/core/auto_detach_observer.rb

Summary

Maintainability
A
0 mins
Test Coverage
# Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

require 'rx/core/observer'
require 'rx/subscriptions/single_assignment_subscription'

module Rx

  # Observer wrapper that forwards notifications to another observer and
  # releases its own subscription once the stream ends or a forwarded
  # +on_next+ call does not complete normally.
  #
  # The three +*_core+ methods are handed to an ObserverConfiguration which
  # is passed to the superclass constructor; presumably Rx::ObserverBase
  # dispatches incoming notifications through them (confirm in
  # rx/core/observer).
  class AutoDetachObserver < Rx::ObserverBase

    # @param observer the downstream observer that receives every
    #   forwarded notification.
    def initialize(observer)
      @observer = observer
      # Holds the subscription handed in later through #subscription=.
      @m = SingleAssignmentSubscription.new

      callbacks = ObserverConfiguration.new
      callbacks.on_next(&method(:on_next_core))
      callbacks.on_error(&method(:on_error_core))
      callbacks.on_completed(&method(:on_completed_core))

      super(callbacks)
    end

    # Stores +new_subscription+ in the internal SingleAssignmentSubscription
    # so that #unsubscribe can release it.
    def subscription=(new_subscription)
      @m.subscription = new_subscription
    end

    # Runs the superclass unsubscribe logic first, then unsubscribes the
    # held subscription.
    def unsubscribe
      super
      @m.unsubscribe
    end

    # Forwards +value+ to the wrapped observer. If the forwarded call exits
    # without finishing (for example because it raised), this observer
    # unsubscribes; the exception itself is not swallowed.
    def on_next_core(value)
      delivered = false
      @observer.on_next(value)
      delivered = true
    ensure
      # The flag stays false whenever on_next did not run to completion.
      unsubscribe unless delivered
    end

    # Forwards +error+ to the wrapped observer and always unsubscribes
    # afterwards, whether or not the forwarded call raised.
    def on_error_core(error)
      @observer.on_error(error)
    ensure
      unsubscribe
    end

    # Forwards completion to the wrapped observer and always unsubscribes
    # afterwards, whether or not the forwarded call raised.
    def on_completed_core
      @observer.on_completed
    ensure
      unsubscribe
    end

  end
end