ReactiveX/RxRuby

View on GitHub
lib/rx/subjects/subject_extensions.rb

Summary

Maintainability
A
0 mins
Test Coverage
# Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

require 'rx/core/observer'
require 'rx/core/observable'

module Rx

  # Namespace for class-level factory helpers that build subjects.
  class Subject

    class << self
      # Builds a subject out of two separately supplied halves.
      #
      # observer   - object that receives the on_next / on_error / on_completed
      #              calls made on the resulting subject.
      # observable - object whose +subscribe+ is invoked when something
      #              subscribes to the resulting subject.
      #
      # Returns a new AnonymousSubject wrapping both arguments.
      def create(observer, observable)
        AnonymousSubject.new(observer, observable)
      end
    end

    # A subject that has no behavior of its own: every notification is handed
    # to the wrapped observer and every subscription is handed to the wrapped
    # observable.
    class AnonymousSubject
      include Observable
      include Observer

      # Stores the two delegates; neither argument is validated here.
      def initialize(observer, observable)
        @observer = observer
        @observable = observable
      end

      # Passes +value+ through to the wrapped observer and returns whatever
      # that call returns.
      def on_next(value)
        @observer.on_next(value)
      end

      # Passes +error+ through to the wrapped observer.
      #
      # Raises RuntimeError ('error cannot be nil') when +error+ is nil or
      # false, before the wrapped observer is touched.
      def on_error(error)
        raise 'error cannot be nil' unless error
        @observer.on_error(error)
      end

      # Signals completion to the wrapped observer.
      def on_completed
        @observer.on_completed
      end

      # Subscribes +observer+ to the wrapped observable and returns the result
      # of that subscribe call.
      #
      # Raises RuntimeError ('observer cannot be nil') when +observer+ is nil
      # or false.
      def subscribe(observer)
        raise 'observer cannot be nil' unless observer
        @observable.subscribe(observer)
      end
    end

  end
end