ReactiveX/RxRuby

View on GitHub
lib/rx/linq/observable/from.rb

Summary

Maintainability
A
2 hrs
Test Coverage
module Rx
  class << Observable
    # Builds an observable that emits the elements of +iterable+ in order and
    # then completes.
    #
    # The enumerator and the index counter are created inside the block handed
    # to +AnonymousObservable.new+, so every invocation of that block
    # (presumably one per subscriber -- confirm against AnonymousObservable)
    # iterates from the start instead of sharing one already-advanced cursor.
    #
    # @param iterable [#to_enum] source of the values to emit.
    # @param map_fn [Proc, nil] optional transform invoked as
    #   +map_fn.call(value, index)+; anything that is not a Proc is ignored.
    # @param scheduler object whose +schedule_recursive+ drives the iteration;
    #   defaults to +CurrentThreadScheduler.instance+.
    # @return the object produced by +AnonymousObservable.new+.
    def from(iterable, map_fn = nil, scheduler = CurrentThreadScheduler.instance)
      AnonymousObservable.new do |observer|
        # Per-invocation state: a fresh enumerator and a zero-based index.
        enumerator = iterable.to_enum
        index = 0

        # Braces (not do/end) keep the block attached to +lambda+ rather than
        # to +schedule_recursive+. Inside a lambda, +return+ only leaves the
        # lambda itself, which ends the recursion.
        scheduler.schedule_recursive lambda { |this|
          begin
            value = enumerator.next
          rescue StopIteration
            # Source exhausted: signal completion and stop rescheduling.
            observer.on_completed
            return
          rescue => e
            # The underlying iteration raised: forward the error and stop.
            observer.on_error e
            return
          end

          if Proc === map_fn
            begin
              value = map_fn.call(value, index)
            rescue => e
              # A failing transform terminates the sequence with that error.
              observer.on_error e
              return
            end
          end

          observer.on_next value
          index += 1
          # Ask the scheduler to run this lambda again for the next element.
          this.call
        }
      end
    end
  end
end