Reactive-Extensions/RxJS

View on GitHub
modules/rx-lite-time/rx.lite.time.js

Summary

Maintainability
F
1 mo
Test Coverage
File `rx.lite.time.js` has 563 lines of code (exceeds 250 allowed). Consider refactoring.
// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.
 
Similar blocks of code found in 2 locations. Consider refactoring.
;(function (factory) {
// `typeof` results that may denote a usable host object (module, exports, window, ...).
var objectTypes = {
  'function': true,
  'object': true
};

/**
 * Returns `value` when it carries the same `Object` constructor as this realm,
 * which is how the wrapper recognises a global object; otherwise returns null.
 * @param {*} value Candidate global object (may be falsy).
 * @returns {*} `value` itself, or null when it is falsy or its `Object` differs.
 */
function checkGlobal(value) {
  if (value && value.Object === Object) {
    return value;
  }
  return null;
}
 
// Environment sniffing: work out which object should be treated as the global `root`.
// Each candidate is only accepted if it exists, has an object-like `typeof`, and
// (for exports/module) is not a DOM node carrying a `nodeType` property.
var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null;
var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null;
// `global` is only considered when both `exports` and `module` were detected above.
var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);
var freeSelf = checkGlobal(objectTypes[typeof self] && self);
var freeWindow = checkGlobal(objectTypes[typeof window] && window);
// Non-null only when `module.exports` is the very same object as `exports`.
var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null;
var thisGlobal = checkGlobal(objectTypes[typeof this] && this);
// Preference order: `global`, then `window` (only when it differs from `this.window`),
// then `self`, then `this`, and finally whatever `Function('return this')()` yields.
var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')();
 
// Because of build optimizers
// Hand the factory to whichever module system is present:
//   1. AMD      - register via `define`, depending on './rx.lite'.
//   2. CommonJS - assign the factory result to `module.exports`, requiring 'rx-lite'.
//   3. Neither  - extend the `Rx` object already attached to `root`.
if (typeof define === 'function' && define.amd) {
// NOTE(review): only one dependency is requested, so the callback's second
// parameter `exports` presumably arrives undefined - confirm with the AMD loader in use.
define(['./rx.lite'], function (Rx, exports) {
return factory(root, exports, Rx);
});
} else if (typeof module === 'object' && module && module.exports === freeExports) {
module.exports = factory(root, module.exports, require('rx-lite'));
} else {
root.Rx = factory(root, {}, root.Rx);
}
}.call(this, function (root, exp, Rx, undefined) {
 
// References: local aliases for the pieces of the core `Rx` object this module uses.
// Everything below is read from the `Rx` instance passed to the factory.
var inherits = Rx.internals.inherits,
AbstractObserver = Rx.internals.AbstractObserver,
Observable = Rx.Observable,
observableProto = Observable.prototype,
AnonymousObservable = Rx.AnonymousObservable,
ObservableBase = Rx.ObservableBase,
observableDefer = Observable.defer,
observableEmpty = Observable.empty,
observableNever = Observable.never,
observableThrow = Observable['throw'],
observableFromArray = Observable.fromArray,
// Scheduler used by every operator in this file when the caller supplies none.
defaultScheduler = Rx.Scheduler['default'],
SingleAssignmentDisposable = Rx.SingleAssignmentDisposable,
SerialDisposable = Rx.SerialDisposable,
CompositeDisposable = Rx.CompositeDisposable,
BinaryDisposable = Rx.BinaryDisposable,
RefCountDisposable = Rx.RefCountDisposable,
Subject = Rx.Subject,
addRef = Rx.internals.addRef,
normalizeTime = Rx.Scheduler.normalize,
helpers = Rx.helpers,
isPromise = helpers.isPromise,
isFunction = helpers.isFunction,
isScheduler = Rx.Scheduler.isScheduler,
observableFromPromise = Observable.fromPromise;
 
// Shared sentinel: a wrapped call that throws returns this object, with the
// thrown value stored on its `e` property.
var errorObj = { e: {} };

/**
 * Wraps `tryCatchTarget` so that exceptions are captured instead of propagated.
 * @param {Function} tryCatchTarget Function to guard.
 * @returns {Function} A function forwarding `this` and all arguments to the target;
 *   it returns the target's result, or `errorObj` (with `errorObj.e` set) on a throw.
 */
function tryCatcherGen(tryCatchTarget) {
  function tryCatcher() {
    var outcome;
    try {
      outcome = tryCatchTarget.apply(this, arguments);
    } catch (caught) {
      errorObj.e = caught;
      outcome = errorObj;
    }
    return outcome;
  }
  return tryCatcher;
}
 
/**
 * Produces an exception-capturing wrapper around `fn` and publishes this helper
 * as `Rx.internals.tryCatch`.
 * @param {Function} fn Function to guard.
 * @returns {Function} Wrapper that returns `errorObj` instead of throwing.
 * @throws {TypeError} When `fn` is not a function.
 */
var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) {
  if (isFunction(fn)) {
    return tryCatcherGen(fn);
  }
  throw new TypeError('fn must be a function');
};
 
/**
 * Rethrows the supplied value unchanged.
 * @param {*} e Value to throw.
 */
function thrower(e) {
  var failure = e;
  throw failure;
}
 
/**
 * Projects each element of an observable sequence into zero or more windows which are produced based on timing information.
 * @param {Number} timeSpan Length of each window (specified as an integer denoting milliseconds).
 * @param {Mixed} [timeShiftOrScheduler] Interval between creation of consecutive windows (specified as an integer denoting milliseconds), or a scheduler. When it is not a number, the time shift equals timeSpan, resulting in non-overlapping adjacent windows.
 * @param {Scheduler} [scheduler] Scheduler to run windowing timers on. If not specified, Rx.Scheduler.default is used; a scheduler passed as the second argument takes precedence.
 * @returns {Observable} An observable sequence of windows.
 */
observableProto.windowWithTime = observableProto.windowTime = function (timeSpan, timeShiftOrScheduler, scheduler) {
  var source = this,
      // Start from the non-overlapping default so that an unrecognised second
      // argument can never leave the shift undefined and poison the timer math.
      timeShift = timeSpan;
  isScheduler(scheduler) || (scheduler = defaultScheduler);
  if (typeof timeShiftOrScheduler === 'number') {
    timeShift = timeShiftOrScheduler;
  } else if (isScheduler(timeShiftOrScheduler)) {
    scheduler = timeShiftOrScheduler;
  }
  return new AnonymousObservable(function (observer) {
    var nextShift = timeShift,   // virtual time at which the next window opens
        nextSpan = timeSpan,     // virtual time at which the oldest window closes
        q = [],                  // currently open windows, oldest first
        timerD = new SerialDisposable(),
        totalTime = 0,           // virtual time already covered by fired timers
        groupDisposable = new CompositeDisposable(timerD),
        refCountDisposable = new RefCountDisposable(groupDisposable);

    // Schedules the next boundary event (window open, window close, or both)
    // and re-arms itself from inside the timer callback.
    function createTimer() {
      var m = new SingleAssignmentDisposable(),
          isSpan = false,
          isShift = false;
      timerD.setDisposable(m);
      if (nextSpan === nextShift) {
        isSpan = true;
        isShift = true;
      } else if (nextSpan < nextShift) {
        isSpan = true;
      } else {
        isShift = true;
      }
      var newTotalTime = isSpan ? nextSpan : nextShift,
          ts = newTotalTime - totalTime;
      totalTime = newTotalTime;
      if (isSpan) {
        nextSpan += timeShift;
      }
      if (isShift) {
        nextShift += timeShift;
      }
      m.setDisposable(scheduler.scheduleFuture(null, ts, function () {
        if (isShift) {
          var s = new Subject();
          q.push(s);
          observer.onNext(addRef(s, refCountDisposable));
        }
        isSpan && q.shift().onCompleted();
        createTimer();
      }));
    }

    // The first window opens immediately on subscription.
    q.push(new Subject());
    observer.onNext(addRef(q[0], refCountDisposable));
    createTimer();
    groupDisposable.add(source.subscribe(
      function (x) {
        for (var i = 0, len = q.length; i < len; i++) { q[i].onNext(x); }
      },
      function (e) {
        for (var i = 0, len = q.length; i < len; i++) { q[i].onError(e); }
        observer.onError(e);
      },
      function () {
        for (var i = 0, len = q.length; i < len; i++) { q[i].onCompleted(); }
        observer.onCompleted();
      }
    ));
    return refCountDisposable;
  }, source);
};
 
/**
 * Projects each element of an observable sequence into a window that is completed when either it's full or a given amount of time has elapsed.
 * @param {Number} timeSpan Maximum time length of a window.
 * @param {Number} count Maximum element count of a window.
 * @param {Scheduler} [scheduler] Scheduler to run windowing timers on. If not specified, Rx.Scheduler.default is used.
 * @returns {Observable} An observable sequence of windows.
 */
observableProto.windowWithTimeOrCount = observableProto.windowTimeOrCount = function (timeSpan, count, scheduler) {
  var source = this;
  if (!isScheduler(scheduler)) { scheduler = defaultScheduler; }

  return new AnonymousObservable(function (observer) {
    var timerD = new SerialDisposable(),
        groupDisposable = new CompositeDisposable(timerD),
        refCountDisposable = new RefCountDisposable(groupDisposable),
        itemsInWindow = 0,
        currentId = 0,
        current = new Subject();

    // Hands the current window to the downstream observer.
    function publishCurrent() {
      observer.onNext(addRef(current, refCountDisposable));
    }

    // Closes the current window, opens and publishes a fresh one, and restarts the timer for it.
    function rollOver() {
      itemsInWindow = 0;
      var nextId = ++currentId;
      current.onCompleted();
      current = new Subject();
      publishCurrent();
      armTimer(nextId);
    }

    // Starts the time limit for window `id`; a timer whose window was already
    // replaced (the id no longer matches) does nothing when it fires.
    function armTimer(id) {
      var slot = new SingleAssignmentDisposable();
      timerD.setDisposable(slot);
      slot.setDisposable(scheduler.scheduleFuture(null, timeSpan, function () {
        if (id === currentId) { rollOver(); }
      }));
    }

    publishCurrent();
    armTimer(0);

    groupDisposable.add(source.subscribe(
      function (x) {
        current.onNext(x);
        if (++itemsInWindow === count) { rollOver(); }
      },
      function (e) {
        current.onError(e);
        observer.onError(e);
      },
      function () {
        current.onCompleted();
        observer.onCompleted();
      }
    ));
    return refCountDisposable;
  }, source);
};
 
/**
 * Selector that delegates to the argument's own `toArray()` method.
 * @param {Object} x Object exposing a `toArray` method (here: a window observable).
 * @returns {*} Whatever `x.toArray()` returns.
 */
function toArray(x) {
  var collected = x.toArray();
  return collected;
}
 
/**
 * Projects each element of an observable sequence into zero or more buffers which are produced based on timing information.
 * Implemented as `windowWithTime` with every window flattened through `toArray`.
 * @param {Number} timeSpan Length of each buffer (specified as an integer denoting milliseconds).
 * @param {Mixed} [timeShiftOrScheduler] Interval between creation of consecutive buffers (specified as an integer denoting milliseconds), or an optional scheduler parameter. If not specified, the time shift corresponds to the timeSpan parameter, resulting in non-overlapping adjacent buffers.
 * @param {Scheduler} [scheduler] Scheduler to run buffer timers on; forwarded unchanged to `windowWithTime`.
 * @returns {Observable} An observable sequence of buffers.
 */
observableProto.bufferWithTime = observableProto.bufferTime = function (timeSpan, timeShiftOrScheduler, scheduler) {
  var windows = this.windowWithTime(timeSpan, timeShiftOrScheduler, scheduler);
  return windows.flatMap(toArray);
};
 
/**
 * Projects each element of an observable sequence into a buffer that is completed when either it's full or a given amount of time has elapsed.
 * Implemented as `windowWithTimeOrCount` with every window flattened through the
 * file's hoisted `toArray` selector (declared once, earlier in this factory).
 * @param {Number} timeSpan Maximum time length of a buffer.
 * @param {Number} count Maximum element count of a buffer.
 * @param {Scheduler} [scheduler] Scheduler to run buffering timers on; forwarded unchanged to `windowWithTimeOrCount`.
 * @returns {Observable} An observable sequence of buffers.
 */
observableProto.bufferWithTimeOrCount = observableProto.bufferTimeOrCount = function (timeSpan, count, scheduler) {
  return this.windowWithTimeOrCount(timeSpan, count, scheduler).flatMap(toArray);
};
 
/**
 * Observable that subscribes a TimeIntervalObserver to its source so that each
 * value is reported together with the time elapsed since the previous one.
 */
var TimeIntervalObservable = (function (__super__) {
  inherits(TimeIntervalObservable, __super__);

  /**
   * @param {Observable} source Sequence to measure.
   * @param {Scheduler} scheduler Scheduler whose `now()` supplies timestamps.
   */
  function TimeIntervalObservable(source, scheduler) {
    this.source = source;
    this._s = scheduler;
    __super__.call(this);
  }

  TimeIntervalObservable.prototype.subscribeCore = function (observer) {
    var measuring = new TimeIntervalObserver(observer, this._s);
    return this.source.subscribe(measuring);
  };

  return TimeIntervalObservable;
}(ObservableBase));
 
/**
 * Observer that forwards `{ value, interval }` pairs, where `interval` is the
 * difference between the scheduler's clock now and at the previous value
 * (or at construction, for the first value).
 */
var TimeIntervalObserver = (function (__super__) {
  inherits(TimeIntervalObserver, __super__);

  /**
   * @param {Observer} downstream Receiver of the annotated values.
   * @param {Scheduler} scheduler Scheduler whose `now()` supplies timestamps.
   */
  function TimeIntervalObserver(downstream, scheduler) {
    this._o = downstream;
    this._s = scheduler;
    this._l = scheduler.now();   // timestamp of the last observed event
    __super__.call(this);
  }

  TimeIntervalObserver.prototype.next = function (x) {
    var current = this._s.now();
    var elapsed = current - this._l;
    this._l = current;
    this._o.onNext({ value: x, interval: elapsed });
  };

  TimeIntervalObserver.prototype.error = function (e) {
    this._o.onError(e);
  };

  TimeIntervalObserver.prototype.completed = function () {
    this._o.onCompleted();
  };

  return TimeIntervalObserver;
}(AbstractObserver));
 
/**
 * Records the time interval between consecutive values in an observable sequence.
 *
 * @example
 * 1 - res = source.timeInterval();
 * 2 - res = source.timeInterval(Rx.Scheduler.timeout);
 *
 * @param [scheduler] Scheduler used to compute time intervals. If not specified, Rx.Scheduler.default is used.
 * @returns {Observable} An observable sequence of `{ value, interval }` objects.
 */
observableProto.timeInterval = function (scheduler) {
  var clock = isScheduler(scheduler) ? scheduler : defaultScheduler;
  return new TimeIntervalObservable(this, clock);
};
 
/**
 * Observable that runs a generator loop (condition / iterate / result / time
 * selectors) on a scheduler, using the time selector's return value as the due
 * time of each following step.
 */
var GenerateAbsoluteObservable = (function (__super__) {
  inherits(GenerateAbsoluteObservable, __super__);

  function GenerateAbsoluteObservable(state, cndFn, itrFn, resFn, timeFn, s) {
    this._state = state;
    this._cndFn = cndFn;
    this._itrFn = itrFn;
    this._resFn = resFn;
    this._timeFn = timeFn;
    this._s = s;
    __super__.call(this);
  }

  // One step of the loop: emit the result computed by the previous step, advance
  // the state (except on the very first step), then either schedule the next
  // step or complete. Any selector that throws ends the sequence with onError.
  function scheduleRecursive(state, recurse) {
    var observer = state.o,
        generator = state.self;

    if (state.hasResult) { observer.onNext(state.result); }

    if (state.first) {
      state.first = false;
    } else {
      state.newState = tryCatch(generator._itrFn)(state.newState);
      if (state.newState === errorObj) { return observer.onError(state.newState.e); }
    }

    state.hasResult = tryCatch(generator._cndFn)(state.newState);
    if (state.hasResult === errorObj) { return observer.onError(state.hasResult.e); }

    if (!state.hasResult) {
      observer.onCompleted();
      return;
    }

    state.result = tryCatch(generator._resFn)(state.newState);
    if (state.result === errorObj) { return observer.onError(state.result.e); }

    var dueTime = tryCatch(generator._timeFn)(state.newState);
    if (dueTime === errorObj) { return observer.onError(dueTime.e); }

    recurse(state, dueTime);
  }

  GenerateAbsoluteObservable.prototype.subscribeCore = function (o) {
    var loopState = {
      o: o,
      self: this,
      newState: this._state,
      first: true,
      hasResult: false
    };
    // The first step is due at the scheduler's current time, expressed as a Date.
    var startAt = new Date(this._s.now());
    return this._s.scheduleRecursiveFuture(loopState, startAt, scheduleRecursive);
  };

  return GenerateAbsoluteObservable;
}(ObservableBase));
 
/**
 * Generates an observable sequence by iterating a state from an initial state until the condition fails,
 * with the time selector supplying the due time of each following step.
 *
 * @example
 * res = Rx.Observable.generateWithAbsoluteTime(0,
 *   function (x) { return true; },
 *   function (x) { return x + 1; },
 *   function (x) { return x; },
 *   function (x) { return new Date(); }
 * );
 *
 * @param {Mixed} initialState Initial state.
 * @param {Function} condition Condition to terminate generation (upon returning false).
 * @param {Function} iterate Iteration step function.
 * @param {Function} resultSelector Selector function for results produced in the sequence.
 * @param {Function} timeSelector Time selector function to control the speed of values being produced each iteration, returning Date values.
 * @param {Scheduler} [scheduler] Scheduler on which to run the generator loop. If not specified, Rx.Scheduler.default is used.
 * @returns {Observable} The generated sequence.
 */
Observable.generateWithAbsoluteTime = function (initialState, condition, iterate, resultSelector, timeSelector, scheduler) {
  var loopScheduler = isScheduler(scheduler) ? scheduler : defaultScheduler;
  return new GenerateAbsoluteObservable(initialState, condition, iterate, resultSelector, timeSelector, loopScheduler);
};
 
/**
 * Observable that runs a generator loop (condition / iterate / result / time
 * selectors) on a scheduler, using the time selector's return value as the due
 * time of each following step; the first step is scheduled with a due time of 0.
 */
var GenerateRelativeObservable = (function (__super__) {
  inherits(GenerateRelativeObservable, __super__);

  function GenerateRelativeObservable(state, cndFn, itrFn, resFn, timeFn, s) {
    this._state = state;
    this._cndFn = cndFn;
    this._itrFn = itrFn;
    this._resFn = resFn;
    this._timeFn = timeFn;
    this._s = s;
    __super__.call(this);
  }

  // One step of the loop: emit the result computed by the previous step, advance
  // the state (except on the very first step), then either schedule the next
  // step or complete. Any selector that throws ends the sequence with onError.
  function scheduleRecursive(state, recurse) {
    var observer = state.o,
        generator = state.self;

    if (state.hasResult) { observer.onNext(state.result); }

    if (state.first) {
      state.first = false;
    } else {
      state.newState = tryCatch(generator._itrFn)(state.newState);
      if (state.newState === errorObj) { return observer.onError(state.newState.e); }
    }

    state.hasResult = tryCatch(generator._cndFn)(state.newState);
    if (state.hasResult === errorObj) { return observer.onError(state.hasResult.e); }

    if (!state.hasResult) {
      observer.onCompleted();
      return;
    }

    state.result = tryCatch(generator._resFn)(state.newState);
    if (state.result === errorObj) { return observer.onError(state.result.e); }

    var delay = tryCatch(generator._timeFn)(state.newState);
    if (delay === errorObj) { return observer.onError(delay.e); }

    recurse(state, delay);
  }

  GenerateRelativeObservable.prototype.subscribeCore = function (o) {
    var loopState = {
      o: o,
      self: this,
      newState: this._state,
      first: true,
      hasResult: false
    };
    return this._s.scheduleRecursiveFuture(loopState, 0, scheduleRecursive);
  };

  return GenerateRelativeObservable;
}(ObservableBase));
 
/**
 * Generates an observable sequence by iterating a state from an initial state until the condition fails,
 * with the time selector supplying the delay before each following step.
 *
 * @example
 * res = Rx.Observable.generateWithRelativeTime(0,
 *   function (x) { return true; },
 *   function (x) { return x + 1; },
 *   function (x) { return x; },
 *   function (x) { return 500; }
 * );
 *
 * @param {Mixed} initialState Initial state.
 * @param {Function} condition Condition to terminate generation (upon returning false).
 * @param {Function} iterate Iteration step function.
 * @param {Function} resultSelector Selector function for results produced in the sequence.
 * @param {Function} timeSelector Time selector function to control the speed of values being produced each iteration, returning integer values denoting milliseconds.
 * @param {Scheduler} [scheduler] Scheduler on which to run the generator loop. If not specified, Rx.Scheduler.default is used.
 * @returns {Observable} The generated sequence.
 */
Observable.generateWithRelativeTime = function (initialState, condition, iterate, resultSelector, timeSelector, scheduler) {
  var loopScheduler = isScheduler(scheduler) ? scheduler : defaultScheduler;
  return new GenerateRelativeObservable(initialState, condition, iterate, resultSelector, timeSelector, loopScheduler);
};
 
/**
 * Observable that postpones subscribing to its source until a scheduled timer fires.
 */
var DelaySubscription = (function (__super__) {
  inherits(DelaySubscription, __super__);

  /**
   * @param {Observable} source Sequence to subscribe to later.
   * @param {Number|Date} dueTime Due time handed to the scheduler.
   * @param {Scheduler} scheduler Scheduler that runs the timer.
   */
  function DelaySubscription(source, dueTime, scheduler) {
    this.source = source;
    this._dt = dueTime;
    this._s = scheduler;
    __super__.call(this);
  }

  // Timer action. `state` is the [source, observer, serialDisposable] triple
  // built in subscribeCore; the real subscription replaces the timer in the slot.
  function subscribeNow(s, state) {
    var slot = state[2];
    slot.setDisposable(state[0].subscribe(state[1]));
  }

  DelaySubscription.prototype.subscribeCore = function (o) {
    var slot = new SerialDisposable();
    var timer = this._s.scheduleFuture([this.source, o, slot], this._dt, subscribeNow);
    slot.setDisposable(timer);
    return slot;
  };

  return DelaySubscription;
}(ObservableBase));
 
/**
 * Time shifts the observable sequence by delaying the subscription with the specified due time, using the specified scheduler to run timers.
 *
 * @example
 * 1 - res = source.delaySubscription(5000); // 5s
 * 2 - res = source.delaySubscription(5000, Rx.Scheduler.default); // 5 seconds
 *
 * @param {Number} dueTime Relative or absolute time shift of the subscription.
 * @param {Scheduler} [scheduler] Scheduler to run the subscription delay timer on. If not specified, Rx.Scheduler.default is used.
 * @returns {Observable} Time-shifted sequence.
 */
observableProto.delaySubscription = function (dueTime, scheduler) {
  var timerScheduler = isScheduler(scheduler) ? scheduler : defaultScheduler;
  return new DelaySubscription(this, dueTime, timerScheduler);
};
 
/**
 * Observable that subscribes a SkipLastWithTimeObserver to its source.
 */
var SkipLastWithTimeObservable = (function (__super__) {
  inherits(SkipLastWithTimeObservable, __super__);

  /**
   * @param {Observable} source Sequence to filter.
   * @param {Number} duration Duration to hold values back for.
   * @param {Scheduler} scheduler Scheduler whose `now()` supplies timestamps.
   */
  function SkipLastWithTimeObservable(source, duration, scheduler) {
    this.source = source;
    this._d = duration;
    this._s = scheduler;
    __super__.call(this);
  }

  SkipLastWithTimeObservable.prototype.subscribeCore = function (observer) {
    // The observer copies `_s` and `_d` from this instance.
    var holder = new SkipLastWithTimeObserver(observer, this);
    return this.source.subscribe(holder);
  };

  return SkipLastWithTimeObservable;
}(ObservableBase));
 
/**
 * Observer that queues every value with its arrival time and only forwards a
 * value once at least `_d` has elapsed since it arrived. Values still too young
 * when the source completes are never forwarded.
 */
var SkipLastWithTimeObserver = (function (__super__) {
  inherits(SkipLastWithTimeObserver, __super__);

  /**
   * @param {Observer} downstream Receiver of the released values.
   * @param {Object} parent Object providing `_s` (scheduler) and `_d` (duration).
   */
  function SkipLastWithTimeObserver(downstream, parent) {
    this._o = downstream;
    this._s = parent._s;
    this._d = parent._d;
    this._q = [];
    __super__.call(this);
  }

  // Forwards, oldest first, every queued value whose age at `now` is at least the duration.
  function releaseAged(self, now) {
    var held = self._q;
    while (held.length > 0 && now - held[0].interval >= self._d) {
      self._o.onNext(held.shift().value);
    }
  }

  SkipLastWithTimeObserver.prototype.next = function (x) {
    var now = this._s.now();
    this._q.push({ interval: now, value: x });
    releaseAged(this, now);
  };

  SkipLastWithTimeObserver.prototype.error = function (e) {
    this._o.onError(e);
  };

  SkipLastWithTimeObserver.prototype.completed = function () {
    releaseAged(this, this._s.now());
    this._o.onCompleted();
  };

  return SkipLastWithTimeObserver;
}(AbstractObserver));
 
/**
 * Skips elements for the specified duration from the end of the observable source sequence, using the specified scheduler to read the time.
 * @description
 * This operator accumulates a queue with a length enough to store elements received during the initial duration window.
 * As more elements are received, elements older than the specified duration are taken from the queue and produced on the
 * result sequence. This causes elements to be delayed with duration.
 * @param {Number} duration Duration for skipping elements from the end of the sequence.
 * @param {Scheduler} [scheduler] Scheduler used to read the current time. If not specified, Rx.Scheduler.default is used.
 * @returns {Observable} An observable sequence with the elements skipped during the specified duration from the end of the source sequence.
 */
observableProto.skipLastWithTime = function (duration, scheduler) {
  var clock = isScheduler(scheduler) ? scheduler : defaultScheduler;
  return new SkipLastWithTimeObservable(this, duration, clock);
};
 
/**
 * Observable that subscribes a TakeLastWithTimeObserver to its source.
 */
var TakeLastWithTimeObservable = (function (__super__) {
  inherits(TakeLastWithTimeObservable, __super__);

  /**
   * @param {Observable} source Sequence to filter.
   * @param {Number} duration Length of the trailing time window to keep.
   * @param {Scheduler} scheduler Scheduler whose `now()` supplies timestamps.
   */
  function TakeLastWithTimeObservable(source, duration, scheduler) {
    this.source = source;
    this._d = duration;
    this._s = scheduler;
    __super__.call(this);
  }

  TakeLastWithTimeObservable.prototype.subscribeCore = function (observer) {
    var collector = new TakeLastWithTimeObserver(observer, this._d, this._s);
    return this.source.subscribe(collector);
  };

  return TakeLastWithTimeObservable;
}(ObservableBase));
 
/**
 * Observer that keeps a queue of recent values with their arrival times,
 * trimming entries that have aged out as new values arrive, and emits the
 * surviving entries only when the source completes.
 */
var TakeLastWithTimeObserver = (function (__super__) {
  inherits(TakeLastWithTimeObserver, __super__);

  /**
   * @param {Observer} downstream Receiver of the retained values.
   * @param {Number} duration Length of the trailing time window to keep.
   * @param {Scheduler} scheduler Scheduler whose `now()` supplies timestamps.
   */
  function TakeLastWithTimeObserver(downstream, duration, scheduler) {
    this._o = downstream;
    this._d = duration;
    this._s = scheduler;
    this._q = [];
    __super__.call(this);
  }

  TakeLastWithTimeObserver.prototype.next = function (x) {
    var now = this._s.now(),
        recent = this._q;
    recent.push({ interval: now, value: x });
    // Drop entries whose age has reached the duration.
    while (recent.length > 0 && now - recent[0].interval >= this._d) {
      recent.shift();
    }
  };

  TakeLastWithTimeObserver.prototype.error = function (e) {
    this._o.onError(e);
  };

  TakeLastWithTimeObserver.prototype.completed = function () {
    var now = this._s.now(),
        recent = this._q,
        entry;
    // Empty the queue, forwarding only entries no older than the duration.
    while (recent.length > 0) {
      entry = recent.shift();
      if (now - entry.interval <= this._d) {
        this._o.onNext(entry.value);
      }
    }
    this._o.onCompleted();
  };

  return TakeLastWithTimeObserver;
}(AbstractObserver));
 
/**
 * Returns elements within the specified duration from the end of the observable source sequence, using the specified scheduler to read the time.
 * @description
 * Values are queued together with their arrival time; entries that have aged past the duration are dropped as
 * newer values arrive. Nothing is produced until the source completes, at which point the queued values that
 * are still within the duration are emitted in order, followed by completion.
 * @param {Number} duration Duration for taking elements from the end of the sequence.
 * @param {Scheduler} [scheduler] Scheduler used to read the current time. If not specified, Rx.Scheduler.default is used.
 * @returns {Observable} An observable sequence with the elements taken during the specified duration from the end of the source sequence.
 */
observableProto.takeLastWithTime = function (duration, scheduler) {
  var clock = isScheduler(scheduler) ? scheduler : defaultScheduler;
  return new TakeLastWithTimeObservable(this, duration, clock);
};
 
/**
 * Returns an array with the elements within the specified duration from the end of the observable source sequence, using the specified scheduler to read the time.
 * @description
 * Values are queued together with their arrival time; entries that have aged past the duration are dropped as
 * newer values arrive. When the source completes, the queued values still within the duration are collected
 * into a single array, which is emitted before completion.
 * @param {Number} duration Duration for taking elements from the end of the sequence.
 * @param {Scheduler} scheduler Scheduler used to read the current time. If not specified, Rx.Scheduler.default is used.
 * @returns {Observable} An observable sequence containing a single array with the elements taken during the specified duration from the end of the source sequence.
 */
observableProto.takeLastBufferWithTime = function (duration, scheduler) {
  var source = this;
  if (!isScheduler(scheduler)) { scheduler = defaultScheduler; }

  return new AnonymousObservable(function (o) {
    var recent = [];

    function onValue(x) {
      var now = scheduler.now();
      recent.push({ interval: now, value: x });
      // Drop entries whose age has reached the duration.
      while (recent.length > 0 && now - recent[0].interval >= duration) {
        recent.shift();
      }
    }

    function onFailure(e) {
      o.onError(e);
    }

    function onDone() {
      var now = scheduler.now(),
          kept = [],
          entry;
      while (recent.length > 0) {
        entry = recent.shift();
        if (now - entry.interval <= duration) {
          kept.push(entry.value);
        }
      }
      o.onNext(kept);
      o.onCompleted();
    }

    return source.subscribe(onValue, onFailure, onDone);
  }, source);
};
 
/**
 * Observable that forwards its source to the observer while a timer, scheduled
 * at subscription, completes that same observer when it fires.
 */
var TakeWithTimeObservable = (function (__super__) {
  inherits(TakeWithTimeObservable, __super__);

  /**
   * @param {Observable} source Sequence to take from.
   * @param {Number} duration Due time handed to the scheduler.
   * @param {Scheduler} scheduler Scheduler that runs the timer.
   */
  function TakeWithTimeObservable(source, duration, scheduler) {
    this.source = source;
    this._d = duration;
    this._s = scheduler;
    __super__.call(this);
  }

  // Timer action: the scheduled state is the downstream observer itself.
  function completeObserver(s, observer) {
    observer.onCompleted();
  }

  TakeWithTimeObservable.prototype.subscribeCore = function (o) {
    // The timer is scheduled before the source is subscribed.
    var timer = this._s.scheduleFuture(o, this._d, completeObserver);
    var subscription = this.source.subscribe(o);
    return new BinaryDisposable(timer, subscription);
  };

  return TakeWithTimeObservable;
}(ObservableBase));
 
/**
 * Takes elements for the specified duration from the start of the observable source sequence, using the specified scheduler to run timers.
 *
 * @example
 * 1 - res = source.takeWithTime(5000, [optional scheduler]);
 * @description
 * A timer for the given duration is scheduled at subscription time; when it fires the result sequence is
 * completed. Values from the source are passed through unchanged until then.
 * @param {Number} duration Duration for taking elements from the start of the sequence.
 * @param {Scheduler} scheduler Scheduler to run the timer on. If not specified, Rx.Scheduler.default is used.
 * @returns {Observable} An observable sequence with the elements taken during the specified duration from the start of the source sequence.
 */
observableProto.takeWithTime = function (duration, scheduler) {
  var timerScheduler = isScheduler(scheduler) ? scheduler : defaultScheduler;
  return new TakeWithTimeObservable(this, duration, timerScheduler);
};
 
/**
 * Observable that drops values from its source until a timer, scheduled at
 * subscription, fires. Errors and completion are forwarded by the observer
 * regardless of the gate.
 */
var SkipWithTimeObservable = (function (__super__) {
  inherits(SkipWithTimeObservable, __super__);

  /**
   * @param {Observable} source Sequence to skip from.
   * @param {Number} d Due time handed to the scheduler.
   * @param {Scheduler} s Scheduler that runs the timer.
   */
  function SkipWithTimeObservable(source, d, s) {
    this.source = source;
    this._d = d;
    this._s = s;
    __super__.call(this);
  }

  // Timer action: opens the gate belonging to one subscription.
  function scheduleMethod(s, gate) {
    gate._open = true;
  }

  SkipWithTimeObservable.prototype.subscribeCore = function (o) {
    // Each subscription gets its own gate. Keeping the flag on the shared
    // observable meant that once the first subscription's timer had fired,
    // every later subscription started already open and skipped nothing.
    var gate = { _open: false };
    return new BinaryDisposable(
      this._s.scheduleFuture(gate, this._d, scheduleMethod),
      this.source.subscribe(new SkipWithTimeObserver(o, gate))
    );
  };

  return SkipWithTimeObservable;
}(ObservableBase));
 
/**
 * Observer that forwards values only while its gate object reports `_open`;
 * errors and completion are always forwarded.
 */
var SkipWithTimeObserver = (function (__super__) {
  inherits(SkipWithTimeObserver, __super__);

  /**
   * @param {Observer} downstream Receiver of the values let through.
   * @param {Object} gate Object whose `_open` property is read on every value.
   */
  function SkipWithTimeObserver(downstream, gate) {
    this._o = downstream;
    this._p = gate;
    __super__.call(this);
  }

  SkipWithTimeObserver.prototype.next = function (x) {
    if (this._p._open) { this._o.onNext(x); }
  };

  SkipWithTimeObserver.prototype.error = function (e) {
    this._o.onError(e);
  };

  SkipWithTimeObserver.prototype.completed = function () {
    this._o.onCompleted();
  };

  return SkipWithTimeObserver;
}(AbstractObserver));
 
/**
 * Skips elements for the specified duration from the start of the observable source sequence, using the specified scheduler to run timers.
 * @description
 * Specifying a zero value for duration doesn't guarantee no elements will be dropped from the start of the source sequence.
 * This is a side-effect of the asynchrony introduced by the scheduler, where the action that causes callbacks from the source sequence to be forwarded
 * may not execute immediately, despite the zero due time.
 *
 * Errors produced by the source sequence are always forwarded to the result sequence, even if the error occurs before the duration.
 * @param {Number} duration Duration for skipping elements from the start of the sequence.
 * @param {Scheduler} scheduler Scheduler to run the timer on. If not specified, Rx.Scheduler.default is used.
 * @returns {Observable} An observable sequence with the elements skipped during the specified duration from the start of the source sequence.
 */
observableProto.skipWithTime = function (duration, scheduler) {
  var timerScheduler = isScheduler(scheduler) ? scheduler : defaultScheduler;
  return new SkipWithTimeObservable(this, duration, timerScheduler);
};
 
/**
 * Observable that drops values from its source until a timer for the given
 * start time, scheduled at subscription, fires. Errors and completion are
 * forwarded by the observer regardless of the gate.
 */
var SkipUntilWithTimeObservable = (function (__super__) {
  inherits(SkipUntilWithTimeObservable, __super__);

  /**
   * @param {Observable} source Sequence to skip from.
   * @param {Date|Number} startTime Due time handed to the scheduler.
   * @param {Scheduler} scheduler Scheduler that runs the timer.
   */
  function SkipUntilWithTimeObservable(source, startTime, scheduler) {
    this.source = source;
    this._st = startTime;
    this._s = scheduler;
    __super__.call(this);
  }

  // Timer action: opens the gate belonging to one subscription.
  function scheduleMethod(s, gate) {
    gate._open = true;
  }

  SkipUntilWithTimeObservable.prototype.subscribeCore = function (o) {
    // Each subscription gets its own gate. Resetting a flag on the shared
    // observable here used to re-close the gate of every earlier subscription
    // whenever a new subscriber arrived.
    var gate = { _open: false };
    return new BinaryDisposable(
      this._s.scheduleFuture(gate, this._st, scheduleMethod),
      this.source.subscribe(new SkipUntilWithTimeObserver(o, gate))
    );
  };

  return SkipUntilWithTimeObservable;
}(ObservableBase));
 
/**
 * Observer that forwards values only while its gate object reports `_open`;
 * errors and completion are always forwarded.
 */
var SkipUntilWithTimeObserver = (function (__super__) {
  inherits(SkipUntilWithTimeObserver, __super__);

  /**
   * @param {Observer} downstream Receiver of the values let through.
   * @param {Object} gate Object whose `_open` property is read on every value.
   */
  function SkipUntilWithTimeObserver(downstream, gate) {
    this._o = downstream;
    this._p = gate;
    __super__.call(this);
  }

  SkipUntilWithTimeObserver.prototype.next = function (x) {
    if (this._p._open) { this._o.onNext(x); }
  };

  SkipUntilWithTimeObserver.prototype.error = function (e) {
    this._o.onError(e);
  };

  SkipUntilWithTimeObserver.prototype.completed = function () {
    this._o.onCompleted();
  };

  return SkipUntilWithTimeObserver;
}(AbstractObserver));
 
 
/**
 * Skips elements from the observable source sequence until the specified start time, using the specified scheduler to run timers.
 * Errors produced by the source sequence are always forwarded to the result sequence, even if the error occurs before the start time.
 *
 * @examples
 * 1 - res = source.skipUntilWithTime(new Date(), [scheduler]);
 * 2 - res = source.skipUntilWithTime(5000, [scheduler]);
 * @param {Date|Number} startTime Time to start taking elements from the source sequence. If this value is less than or equal to Date(), no elements will be skipped.
 * @param {Scheduler} [scheduler] Scheduler to run the timer on. If not specified, Rx.Scheduler.default is used.
 * @returns {Observable} An observable sequence with the elements skipped until the specified start time.
 */
observableProto.skipUntilWithTime = function (startTime, scheduler) {
  var timerScheduler = isScheduler(scheduler) ? scheduler : defaultScheduler;
  return new SkipUntilWithTimeObservable(this, startTime, timerScheduler);
};
 
/**
 * Takes elements from the observable source sequence until the specified end time, using the specified scheduler to run timers.
 * @param {Number | Date} endTime Time to stop taking elements from the source sequence. If this value is less than or equal to new Date(), the result stream will complete immediately.
 * @param {Scheduler} [scheduler] Scheduler to run the timer on. If not specified, Rx.Scheduler.default is used.
 * @returns {Observable} An observable sequence with the elements taken until the specified end time.
 */
observableProto.takeUntilWithTime = function (endTime, scheduler) {
  var source = this,
      timerScheduler = isScheduler(scheduler) ? scheduler : defaultScheduler;

  // Timer action: the scheduled state is the downstream observer itself.
  function completeObserver(_, observer) {
    observer.onCompleted();
  }

  return new AnonymousObservable(function (o) {
    // The timer is scheduled before the source is subscribed.
    var timer = timerScheduler.scheduleFuture(o, endTime, completeObserver);
    var subscription = source.subscribe(o);
    return new BinaryDisposable(timer, subscription);
  }, source);
};
 
return Rx;
}));