src/core/linq/observable/expand.js
Function `ExpandObservable` has 46 lines of code (exceeds 25 allowed). Consider refactoring.
Function `ExpandObservable` has a Cognitive Complexity of 6 (exceeds 5 allowed). Consider refactoring. var ExpandObservable = (function(__super__) { inherits(ExpandObservable, __super__); function ExpandObservable(source, fn, scheduler) { this.source = source; this._fn = fn; this._scheduler = scheduler; __super__.call(this); } function scheduleRecursive(args, recurse) { var state = args[0], self = args[1]; var work; if (state.q.length > 0) { work = state.q.shift(); } else { state.isAcquired = false; return; } var m1 = new SingleAssignmentDisposable(); state.d.add(m1); m1.setDisposable(work.subscribe(new ExpandObserver(state, self, m1))); recurse([state, self]); } ExpandObservable.prototype._ensureActive = function (state) { var isOwner = false; if (state.q.length > 0) { isOwner = !state.isAcquired; state.isAcquired = true; } isOwner && state.m.setDisposable(this._scheduler.scheduleRecursive([state, this], scheduleRecursive)); }; ExpandObservable.prototype.subscribeCore = function (o) { var m = new SerialDisposable(), d = new CompositeDisposable(m), state = { q: [], m: m, d: d, activeCount: 0, isAcquired: false, o: o }; state.q.push(this.source); state.activeCount++; this._ensureActive(state); return d; }; return ExpandObservable; }(ObservableBase)); var ExpandObserver = (function(__super__) { inherits(ExpandObserver, __super__); function ExpandObserver(state, parent, m1) { this._s = state; this._p = parent; this._m1 = m1; __super__.call(this); } ExpandObserver.prototype.next = function (x) { this._s.o.onNext(x); var result = tryCatch(this._p._fn)(x); if (result === errorObj) { return this._s.o.onError(result.e); } this._s.q.push(result); this._s.activeCount++; this._p._ensureActive(this._s); }; ExpandObserver.prototype.error = function (e) { this._s.o.onError(e); }; ExpandObserver.prototype.completed = function () { this._s.d.remove(this._m1); this._s.activeCount--; this._s.activeCount === 0 && this._s.o.onCompleted(); }; return ExpandObserver; }(AbstractObserver)); /** * Expands an observable sequence by recursively invoking selector. * * @param {Function} selector Selector function to invoke for each produced element, resulting in another sequence to which the selector will be invoked recursively again. * @param {Scheduler} [scheduler] Scheduler on which to perform the expansion. If not provided, this defaults to the current thread scheduler. * @returns {Observable} An observable sequence containing all the elements produced by the recursive expansion. */ observableProto.expand = function (selector, scheduler) { isScheduler(scheduler) || (scheduler = currentThreadScheduler); return new ExpandObservable(this, selector, scheduler); };