ReactiveX/Rx.rb

View on GitHub
lib/rx/subscriptions/serial_subscription.rb

Summary

Maintainability
A
25 mins
Test Coverage
# Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

require 'thread'

module Rx

  # A subscription that holds at most one inner subscription at a time.
  #
  # Assigning a replacement through #subscription= unsubscribes the inner
  # subscription that was held before. Once this object itself has been
  # unsubscribed, any subscription assigned afterwards is unsubscribed
  # immediately instead of being stored.
  #
  # State changes are made while holding an internal Mutex; the calls to
  # +unsubscribe+ on inner subscriptions are made after the lock is released.
  class SerialSubscription
    include Subscription

    def initialize
      @gate = Mutex.new
      @current = nil
      @unsubscribed = false
    end

    # Tells whether #unsubscribe has already been called on this object.
    # The flag is read while holding the lock.
    def unsubscribed?
      @gate.synchronize { @unsubscribed }
    end

    # Returns the inner subscription currently held, or nil when there is none.
    # The value is read without taking the lock.
    def subscription
      @current
    end

    # Replaces the inner subscription with +new_subscription+.
    #
    # While this object is still active, the previously held subscription (if
    # any) is unsubscribed and +new_subscription+ is stored. When this object
    # has already been unsubscribed, nothing is stored and +new_subscription+
    # is unsubscribed right away (unless it is nil).
    def subscription=(new_subscription)
      previous = nil
      already_unsubscribed = @gate.synchronize do
        if @unsubscribed
          true
        else
          previous = @current
          @current = new_subscription
          false
        end
      end

      # Both calls below happen outside the lock.
      previous.unsubscribe if previous
      return unless already_unsubscribed

      new_subscription.unsubscribe unless new_subscription.nil?
    end

    # Marks this object as unsubscribed and unsubscribes the inner subscription
    # held at that moment, if any. Only the first call has an effect; later
    # calls find the flag already set and do nothing.
    def unsubscribe
      detached = @gate.synchronize do
        next if @unsubscribed

        @unsubscribed = true
        held, @current = @current, nil
        held
      end

      # Performed after the lock has been released.
      detached.unsubscribe if detached
    end

  end
end