ReactiveX/RxRuby

View on GitHub
lib/rx/core/notification.rb

Summary

Maintainability
A
0 mins
Test Coverage
# Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

require 'rx/concurrency/immediate_scheduler'
require 'rx/core/observable'

module Rx

  module Observer

    # Builds an observer whose three callbacks all funnel into one block.
    #
    # Every on_next, on_error and on_completed call received by the observer
    # is wrapped in the matching Notification object and yielded to the block
    # passed to this method. The observer itself is whatever +configure+
    # returns.
    #
    # Raises ArgumentError ('Block required') when no block is given.
    def self.from_notifier
      raise ArgumentError, 'Block required' unless block_given?

      configure do |builder|
        builder.on_next do |item|
          yield Notification.create_on_next(item)
        end

        builder.on_error do |failure|
          yield Notification.create_on_error(failure)
        end

        builder.on_completed do
          yield Notification.create_on_completed
        end
      end
    end

  end

  # Represents a notification to an observer.
  module Notification

    # Factory: wraps +value+ in an OnNextNotification.
    def self.create_on_next(value)
      OnNextNotification.new(value)
    end

    # Factory: wraps +error+ in an OnErrorNotification.
    def self.create_on_error(error)
      OnErrorNotification.new(error)
    end

    # Factory: builds an OnCompletedNotification (it carries no payload).
    def self.create_on_completed
      OnCompletedNotification.new
    end

    # True when the including object's @kind is :on_next.
    def on_next?
      @kind == :on_next
    end

    # True when the including object's @kind is :on_error.
    def on_error?
      @kind == :on_error
    end

    # True when the including object's @kind is :on_completed.
    def on_completed?
      @kind == :on_completed
    end

    # Whether a value is attached. The mixin default is false; including
    # classes override this when they do carry a value.
    def has_value?
      false
    end

    # Wraps this notification in an AnonymousObservable.
    #
    # The block handed to AnonymousObservable receives an observer and
    # schedules, on +scheduler+ (ImmediateScheduler.instance by default), an
    # action that replays this notification through +accept+. An on_next
    # notification is followed by on_completed; the other kinds are replayed
    # on their own.
    def to_observable(scheduler = ImmediateScheduler.instance)
      AnonymousObservable.new do |observer|
        replay = lambda do
          accept observer
          observer.on_completed if on_next?
        end

        scheduler.schedule replay
      end
    end

  end

  # Represents an on_next notification to an observer.
  class OnNextNotification
    include Notification

    # The element carried by this notification.
    attr_reader :value

    # Stores +value+ and tags the notification as :on_next.
    def initialize(value)
      @value = value
      @kind = :on_next
    end

    # Determines whether this notification has a value (always true here).
    def has_value?
      true
    end

    # Equal when +other+ is of the same class, is an on_next notification and
    # its value compares == to this one's.
    def ==(other)
      other.class == self.class && other.on_next? && value == other.value
    end
    alias_method :eql?, :==

    # Hash code built from the class and the wrapped value.
    #
    # eql? is overridden above, so hash must be overridden as well; otherwise
    # equal notifications fall back to identity-based hashing and never match
    # as Hash keys, in Set, or under Array#uniq.
    # NOTE(review): eql? is an alias of ==, so this is consistent with eql?
    # only for values whose own == and hash agree (1 and 1.0 still hash apart).
    def hash
      [self.class, value].hash
    end

    # Renders as "on_next(<value>)".
    def to_s
      "on_next(#{value})"
    end

    # Invokes the observer's method corresponding to the notification,
    # i.e. passes the value to observer.on_next.
    def accept(observer)
      observer.on_next value
    end

  end

  # Represents an on_error notification to an observer.
  class OnErrorNotification
    include Notification

    # The error carried by this notification.
    attr_reader :error

    # Stores +error+ and tags the notification as :on_error.
    def initialize(error)
      @error = error
      @kind = :on_error
    end

    # Equal when +other+ is of the same class, is an on_error notification and
    # its error compares == to this one's.
    def ==(other)
      other.class == self.class && other.on_error? && error == other.error
    end
    alias_method :eql?, :==

    # Hash code built from the class and the wrapped error.
    #
    # eql? is overridden above, so hash must be overridden as well; otherwise
    # equal notifications fall back to identity-based hashing and never match
    # as Hash keys, in Set, or under Array#uniq.
    # NOTE(review): eql? is an alias of ==, so this is consistent with eql?
    # only for errors whose own == and hash agree.
    def hash
      [self.class, error].hash
    end

    # Renders as "on_error(<error>)".
    def to_s
      "on_error(#{error})"
    end

    # Invokes the observer's method corresponding to the notification,
    # i.e. passes the error to observer.on_error.
    def accept(observer)
      observer.on_error error
    end

  end

  # Represents an on_completed notification to an observer.
  class OnCompletedNotification
    include Notification

    # Tags the notification as :on_completed; there is no payload to store.
    def initialize
      @kind = :on_completed
    end

    # Equal when +other+ is of the same class and is an on_completed
    # notification.
    def ==(other)
      other.class == self.class && other.on_completed?
    end
    alias_method :eql?, :==

    # Hash code shared by every instance of this class.
    #
    # eql? is overridden above, so hash must be overridden as well; otherwise
    # equal notifications fall back to identity-based hashing and never match
    # as Hash keys, in Set, or under Array#uniq. Equality here depends only on
    # the class, so the class's hash is sufficient.
    def hash
      self.class.hash
    end

    # Renders as "on_completed()".
    def to_s
      "on_completed()"
    end

    # Invokes the observer's method corresponding to the notification,
    # i.e. calls observer.on_completed.
    def accept(observer)
      observer.on_completed
    end

  end

end