ReactiveX/RxRuby

View on GitHub
lib/rx/subjects/subject.rb

Summary

Maintainability
A
50 mins
Test Coverage
# Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

require 'thread'
require 'rx/core/observer'
require 'rx/core/observable'
require 'rx/subscriptions/subscription'

module Rx

  # Represents an object that is both an observable sequence as well as an observer.
  # Each notification is broadcasted to all subscribed observers.
  class Subject

    include Observable
    include Observer

    def initialize
      @observers = []
      @gate = Mutex.new
      @disposed = false
      @stopped = false
      @error = nil
    end

    # Indicates whether the subject has observers subscribed to it.
    def has_observers?
      @observers && @observers.length > 0
    end

    # Notifies all subscribed observers about the end of the sequence.
    def on_completed
      os = nil
      @gate.synchronize do 
        check_disposed

        unless @stopped
          os = @observers.clone
          @observers = []
          @stopped = true
        end 
      end

      os.each {|o| o.on_completed } if os
    end

    # Notifies all subscribed observers with the error.
    def on_error(error)
      raise 'error cannot be nil' unless error

      os = nil
      @gate.synchronize do
        check_disposed

        unless @stopped
          os = @observers.clone
          @observers = []
          @stopped = true
          @error = error
        end         
      end

      os.each {|o| o.on_error error } if os
    end

    # Notifies all subscribed observers with the value.
    def on_next(value) 
      os = nil
      @gate.synchronize do 
        check_disposed
        os = @observers.clone unless @stopped
      end

      os.each {|o| o.on_next value } if os      
    end

    # Subscribes an observer to the subject.
    def subscribe(observer)
      raise 'observer cannot be nil' unless observer

      @gate.synchronize do
        check_disposed

        if !@stopped
          @observers.push(observer)
          return InnerSubscription.new(self, observer)
        elsif @error
          observer.on_error @error
          return Subscription.empty
        else
          observer.on_completed
          return Subscription.empty
        end
      end
    end

    # Unsubscribe all observers and release resources.
    def unsubscribe
      @gate.synchronize do
        @disposed = true
        @observers = nil
      end
    end

    class InnerSubscription
      def initialize(subject, observer)
        @subject = subject
        @observer = observer
      end

      def unsubscribe
        if @observer
          @subject.send(:unsubscribe_observer, @observer)
          @subject = nil
          @observer = nil
        end
      end
    end

    private

    def unsubscribe_observer(observer)
      @gate.synchronize do
        @observers.delete(observer) if @observers
      end
    end

    def check_disposed
      raise ArgumentError.new 'Subject disposed' if @disposed
    end
  end  
end