ReactiveX/RxRuby

View on GitHub
lib/rx/testing/test_scheduler.rb

Summary

Maintainability
A
1 hr
Test Coverage
# Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

require 'rx/concurrency/virtual_time_scheduler'
require 'rx/subscriptions/subscription'
require 'rx/testing/cold_observable'
require 'rx/testing/hot_observable'
require 'rx/testing/mock_observer'
require 'rx/testing/reactive_test'

module Rx

  # Virtual time scheduler used for testing applications and libraries built using Reactive Extensions.
  class TestScheduler < VirtualTimeScheduler

    # Builds the scheduler, passing 0 to the VirtualTimeScheduler constructor.
    def initialize
      super(0)
    end

    # Queues +action+ to run at the absolute virtual time +due_time+.
    #
    # A +due_time+ that is not strictly after the current clock is replaced
    # by <tt>clock + 1</tt> before delegating to the superclass.
    #
    # Raises RuntimeError when +action+ is nil (or false).
    # Returns whatever the superclass implementation returns.
    def schedule_at_absolute_with_state(state, due_time, action)
      raise 'action cannot be nil' unless action

      effective_time = due_time <= clock ? clock + 1 : due_time
      super(state, effective_time, action)
    end

    # Combines an absolute virtual time with a relative offset using +.
    def add(absolute, relative)
      absolute + relative
    end

    # Wraps the absolute virtual time in a Time via Time.at.
    def to_time(absolute)
      Time.at(absolute)
    end

    # Relative time is represented by the time span itself, so the argument
    # is handed back untouched.
    def to_relative(time_span)
      time_span
    end

    # Runs a complete create / subscribe / dispose cycle in virtual time and
    # returns the observer that was subscribed to the sequence.
    #
    # The given block is the factory: it is invoked at the :created time and
    # its result is subscribed to at the :subscribed time; that subscription
    # is unsubscribed at the :disposed time.
    #
    # +options+ may override any of :created, :subscribed and :disposed; keys
    # that are left out fall back to ReactiveTest::CREATED,
    # ReactiveTest::SUBSCRIBED and ReactiveTest::DISPOSED respectively.
    #
    # Raises ArgumentError when +options+ contains any other key.
    def configure(options = {})
      options.each do |key, _value|
        next if [:created, :subscribed, :disposed].include?(key)

        raise ArgumentError, "Should be specified whether :created, :subscribed or :disposed, but the #{key.inspect}"
      end

      defaults = {
        :created    => ReactiveTest::CREATED,
        :subscribed => ReactiveTest::SUBSCRIBED,
        :disposed   => ReactiveTest::DISPOSED
      }
      times = defaults.merge(options)

      # Shared between the three actions below; each action fills in or
      # consumes what the previous one produced.
      source = nil
      subscription = nil
      observer = create_observer

      # `yield` inside the lambda still targets the block given to #configure.
      create_action = lambda do |_scheduler, _state|
        source = yield
        Subscription.empty
      end

      subscribe_action = lambda do |_scheduler, _state|
        subscription = source.subscribe(observer)
        Subscription.empty
      end

      dispose_action = lambda do |_scheduler, _state|
        subscription.unsubscribe
        Subscription.empty
      end

      schedule_at_absolute_with_state(nil, times[:created], create_action)
      schedule_at_absolute_with_state(nil, times[:subscribed], subscribe_action)
      schedule_at_absolute_with_state(nil, times[:disposed], dispose_action)

      start

      observer
    end

    # Builds a HotObservable bound to this scheduler; every argument is
    # forwarded to HotObservable.new after the scheduler itself.
    def create_hot_observable(*args)
      HotObservable.new(self, *args)
    end

    # Builds a ColdObservable bound to this scheduler; every argument is
    # forwarded to ColdObservable.new after the scheduler itself.
    def create_cold_observable(*args)
      ColdObservable.new(self, *args)
    end

    # Builds a MockObserver bound to this scheduler.
    def create_observer
      MockObserver.new(self)
    end

  end
end