src/core/backpressure/controlled.js
// Static-analysis note: Similar blocks of code found in 3 locations. Consider refactoring.

/**
 * Observable whose emissions are gated by a ControlledSubject.
 *
 * The source is multicast through the controlled subject and ref-counted;
 * subscribers attach to that shared sequence, and `request` forwards to the
 * subject to ask for items.
 */
var ControlledObservable = (function (__super__) {
  inherits(ControlledObservable, __super__);

  /**
   * @param {Observable} source sequence to be placed under control
   * @param {bool} enableQueue forwarded to the ControlledSubject
   * @param {Scheduler} scheduler forwarded to the ControlledSubject
   */
  function ControlledObservable(source, enableQueue, scheduler) {
    __super__.call(this);
    var controller = new ControlledSubject(enableQueue, scheduler);
    this.subject = controller;
    this.source = source.multicast(controller).refCount();
  }

  // Alias taken after `inherits` has run, so it refers to the prototype
  // object that instances actually use.
  var proto = ControlledObservable.prototype;

  /** Subscribes the observer to the shared, ref-counted source. */
  proto._subscribe = function (observer) {
    var shared = this.source;
    return shared.subscribe(observer);
  };

  /**
   * Forwards a request to the controlling subject.
   * @param {number} [numberOfItems] item count; null/undefined is forwarded as -1
   * @returns whatever the subject's `request` returns
   */
  proto.request = function (numberOfItems) {
    if (numberOfItems == null) {
      return this.subject.request(-1);
    }
    return this.subject.request(numberOfItems);
  };

  return ControlledObservable;
}(Observable));

// Static-analysis note: Function `ControlledSubject` has a Cognitive Complexity of 31 (exceeds 5 allowed). Consider refactoring.
Function `ControlledSubject` has 84 lines of code (exceeds 25 allowed). Consider refactoring.
// Static-analysis note: Similar blocks of code found in 3 locations. Consider refactoring.

/**
 * Subject that only forwards values to its inner Subject when they have been
 * requested. With queueing enabled, unrequested notifications are buffered
 * and replayed by later requests; with queueing disabled, unrequested values
 * are discarded.
 */
var ControlledSubject = (function (__super__) {
  inherits(ControlledSubject, __super__);

  /**
   * @param {bool} [enableQueue] buffer unrequested notifications; defaults to true when null/undefined
   * @param {Scheduler} [scheduler] scheduler used to run requests; falls back to currentThreadScheduler
   */
  function ControlledSubject(enableQueue, scheduler) {
    enableQueue == null && (enableQueue = true);

    __super__.call(this);
    this.subject = new Subject();
    this.enableQueue = enableQueue;
    this.queue = enableQueue ? [] : null;
    this.requestedCount = 0;
    this.requestedDisposable = null;
    this.error = null;
    this.hasFailed = false;
    this.hasCompleted = false;
    this.scheduler = scheduler || currentThreadScheduler;
  }

  addProperties(ControlledSubject.prototype, Observer, {
    /** Subscribes the observer to the inner subject. */
    _subscribe: function (o) {
      return this.subject.subscribe(o);
    },

    /**
     * Records completion. Forwarded immediately when nothing is buffered
     * (or queueing is off); otherwise queued behind the buffered items.
     */
    onCompleted: function () {
      this.hasCompleted = true;
      if (!this.enableQueue || this.queue.length === 0) {
        this.subject.onCompleted();
        this.disposeCurrentRequest();
      } else {
        this.queue.push(Notification.createOnCompleted());
      }
    },

    /**
     * Records the failure. Forwarded immediately when nothing is buffered
     * (or queueing is off); otherwise queued behind the buffered items.
     */
    onError: function (error) {
      this.hasFailed = true;
      this.error = error;
      if (!this.enableQueue || this.queue.length === 0) {
        this.subject.onError(error);
        this.disposeCurrentRequest();
      } else {
        this.queue.push(Notification.createOnError(error));
      }
    },

    /**
     * Forwards the value if there is outstanding demand; otherwise buffers
     * it (queueing on) or drops it (queueing off).
     */
    onNext: function (value) {
      if (this.requestedCount <= 0) {
        this.enableQueue && this.queue.push(Notification.createOnNext(value));
      } else {
        // Pre-decrement: this branch only runs with requestedCount > 0, so the
        // previous post-decrement comparison (`requestedCount-- === 0`) could
        // never be true and the satisfied request was never released.
        (--this.requestedCount === 0) && this.disposeCurrentRequest();
        this.subject.onNext(value);
      }
    },

    /**
     * Replays buffered notifications into the inner subject.
     * Notifications whose kind is not 'N' are always replayed, even with no
     * demand left; replaying one ends the current request and clears the queue.
     * @param {number} numberOfItems number of 'N' notifications allowed through
     * @returns {number} demand left over after draining the queue
     */
    _processRequest: function (numberOfItems) {
      if (this.enableQueue) {
        while (this.queue.length > 0 && (numberOfItems > 0 || this.queue[0].kind !== 'N')) {
          var first = this.queue.shift();
          first.accept(this.subject);
          if (first.kind === 'N') {
            numberOfItems--;
          } else {
            this.disposeCurrentRequest();
            this.queue = [];
          }
        }
      }

      return numberOfItems;
    },

    /**
     * Cancels any outstanding request and schedules a new one.
     * The scheduled action drains the queue first; if demand remains and the
     * stream has neither completed nor failed, the remainder is stored in
     * `requestedCount` so that later `onNext` calls pass straight through.
     * @param {number} number number of items requested
     * @returns the disposable returned by the scheduler for this request
     */
    request: function (number) {
      this.disposeCurrentRequest();
      var self = this;

      this.requestedDisposable = this.scheduler.schedule(number, function (s, i) {
        var remaining = self._processRequest(i);
        var stopped = self.hasCompleted || self.hasFailed;
        if (!stopped && remaining > 0) {
          self.requestedCount = remaining;

          // Scheduled item is still in progress. Return a new
          // disposable to allow the request to be interrupted
          // via dispose.
          return disposableCreate(function () {
            self.requestedCount = 0;
          });
        }
      });

      return this.requestedDisposable;
    },

    /** Disposes the outstanding request, if any, and clears the reference. */
    disposeCurrentRequest: function () {
      if (this.requestedDisposable) {
        this.requestedDisposable.dispose();
        this.requestedDisposable = null;
      }
    }
  });

  return ControlledSubject;
}(Observable));

/**
 * Attaches a controller to the observable sequence with the ability to queue.
 * A scheduler may be passed as the first argument, in which case queueing is enabled.
 * @example
 * var source = Rx.Observable.interval(100).controlled();
 * source.request(3); // Reads 3 values
 * @param {bool} enableQueue truthy value to determine if values should be queued pending the next request
 * @param {Scheduler} scheduler determines how the requests will be scheduled
 * @returns {Observable} The observable sequence which only propagates values on request.
 */
observableProto.controlled = function (enableQueue, scheduler) {
  // Overload: controlled(scheduler)
  if (enableQueue && isScheduler(enableQueue)) {
    scheduler = enableQueue;
    enableQueue = true;
  }

  if (enableQueue == null) { enableQueue = true; }
  return new ControlledObservable(this, enableQueue, scheduler);
};