ReactiveX/RxRuby

View on GitHub
lib/rx/linq/observable/delay.rb

Summary

Maintainability
C
1 day
Test Coverage
module Rx
  module Observable
    def delay(due_time, scheduler = DefaultScheduler.instance)
      if Time === due_time
        delay_date(due_time, scheduler)
      else
        delay_time_span(due_time, scheduler)
      end
    end

    private

    def delay_time_span(due_time, scheduler)
      AnonymousObservable.new do |observer|
        active = false
        cancelable = SerialSubscription.new
        exception = nil
        q = []
        running = false
        subscription = materialize.timestamp(scheduler).subscribe do |notification|
          if notification[:value].on_error?
            q = []
            q.push notification
            exception = notification[:value].error
            should_run = !running
          else
            q.push({ value: notification[:value], timestamp: notification[:timestamp] + due_time })
            should_run = !active
            active = true
          end

          if should_run
            if exception != nil
              observer.on_error exception
            else
              d = SingleAssignmentSubscription.new
              cancelable.subscription = d

              d.subscription = scheduler.schedule_recursive_relative(due_time, lambda {|this|
                return if exception != nil

                running = true
                begin
                  result = nil
                  if q.length > 0 && q[0][:timestamp] - scheduler.now <= 0
                    result = q.shift[:value]
                  end
                  if result != nil
                    result.accept observer
                  end
                end while result != nil

                should_recurse = false
                recurse_due_time = 0
                if q.length > 0
                  should_recurse = true
                  recurse_due_time = [0, q[0][:timestamp] - scheduler.now].max
                else
                  active = false
                end
                e = exception
                running = false
                if e != nil
                  observer.on_error e
                elsif should_recurse
                  this.call recurse_due_time
                end
              })
            end
          end
        end

        CompositeSubscription.new [subscription, cancelable]
      end
    end

    def delay_date(due_time, scheduler)
      delay_time_span(due_time - scheduler.now, scheduler)
    end
  end
end