modules/rx-lite-backpressure-compat/rx.lite.backpressure.compat.js
// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information. Similar blocks of code found in 2 locations. Consider refactoring.;(function (factory) { var objectTypes = { 'function': true, 'object': true }; function checkGlobal(value) { return (value && value.Object === Object) ? value : null; } var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null; var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null; var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global); var freeSelf = checkGlobal(objectTypes[typeof self] && self); var freeWindow = checkGlobal(objectTypes[typeof window] && window); var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null; var thisGlobal = checkGlobal(objectTypes[typeof this] && this); var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')(); // Because of build optimizers if (typeof define === 'function' && define.amd) { define(['./rx.lite.compat'], function (Rx, exports) { return factory(root, exports, Rx); }); } else if (typeof module === 'object' && module && module.exports === freeExports) { module.exports = factory(root, module.exports, require('rx-lite-compat')); } else { root.Rx = factory(root, {}, root.Rx); }}.call(this, function (root, exp, Rx, undefined) { // References var Observable = Rx.Observable, observableProto = Observable.prototype, AnonymousObservable = Rx.AnonymousObservable, AbstractObserver = Rx.internals.AbstractObserver, CompositeDisposable = Rx.CompositeDisposable, BinaryDisposable = Rx.BinaryDisposable, NAryDisposable = Rx.NAryDisposable, Notification = Rx.Notification, Subject = Rx.Subject, Observer = Rx.Observer, disposableEmpty = Rx.Disposable.empty, disposableCreate = Rx.Disposable.create, inherits = Rx.internals.inherits, addProperties = Rx.internals.addProperties, defaultScheduler = Rx.Scheduler['default'], currentThreadScheduler = Rx.Scheduler.currentThread, identity = Rx.helpers.identity, isScheduler = Rx.Scheduler.isScheduler, isFunction = Rx.helpers.isFunction, checkDisposed = Rx.Disposable.checkDisposed; var errorObj = {e: {}}; function tryCatcherGen(tryCatchTarget) { return function tryCatcher() { try { return tryCatchTarget.apply(this, arguments); } catch (e) { errorObj.e = e; return errorObj; } }; } var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) { if (!isFunction(fn)) { throw new TypeError('fn must be a function'); } return tryCatcherGen(fn); }; function thrower(e) { throw e; } Function `StopAndWaitObservable` has 54 lines of code (exceeds 25 allowed). Consider refactoring. var StopAndWaitObservable = (function (__super__) { inherits(StopAndWaitObservable, __super__); function StopAndWaitObservable (source) { __super__.call(this); this.source = source; } function scheduleMethod(s, self) { return self.source.request(1); } StopAndWaitObservable.prototype._subscribe = function (o) { this.subscription = this.source.subscribe(new StopAndWaitObserver(o, this, this.subscription)); return new BinaryDisposable( this.subscription, defaultScheduler.schedule(this, scheduleMethod) ); }; Function `StopAndWaitObserver` has 36 lines of code (exceeds 25 allowed). Consider refactoring. var StopAndWaitObserver = (function (__sub__) { inherits(StopAndWaitObserver, __sub__); function StopAndWaitObserver (observer, observable, cancel) { __sub__.call(this); this.observer = observer; this.observable = observable; this.cancel = cancel; this.scheduleDisposable = null; } StopAndWaitObserver.prototype.completed = function () { this.observer.onCompleted(); this.dispose(); }; StopAndWaitObserver.prototype.error = function (error) { this.observer.onError(error); this.dispose(); }; function innerScheduleMethod(s, self) { return self.observable.source.request(1); } StopAndWaitObserver.prototype.next = function (value) { this.observer.onNext(value); this.scheduleDisposable = defaultScheduler.schedule(this, innerScheduleMethod); }; StopAndWaitObserver.dispose = function () { this.observer = null; if (this.cancel) { this.cancel.dispose(); this.cancel = null; } if (this.scheduleDisposable) { this.scheduleDisposable.dispose(); this.scheduleDisposable = null; } __sub__.prototype.dispose.call(this); }; return StopAndWaitObserver; }(AbstractObserver)); return StopAndWaitObservable; }(Observable)); /** * Attaches a stop and wait observable to the current observable. * @returns {Observable} A stop and wait observable. */ ControlledObservable.prototype.stopAndWait = function () { return new StopAndWaitObservable(this); }; Function `WindowedObservable` has 57 lines of code (exceeds 25 allowed). Consider refactoring. var WindowedObservable = (function (__super__) { inherits(WindowedObservable, __super__); function WindowedObservable(source, windowSize) { __super__.call(this); this.source = source; this.windowSize = windowSize; } function scheduleMethod(s, self) { return self.source.request(self.windowSize); } WindowedObservable.prototype._subscribe = function (o) { this.subscription = this.source.subscribe(new WindowedObserver(o, this, this.subscription)); return new BinaryDisposable( this.subscription, defaultScheduler.schedule(this, scheduleMethod) ); }; Function `WindowedObserver` has 38 lines of code (exceeds 25 allowed). Consider refactoring. var WindowedObserver = (function (__sub__) { inherits(WindowedObserver, __sub__); function WindowedObserver(observer, observable, cancel) { this.observer = observer; this.observable = observable; this.cancel = cancel; this.received = 0; this.scheduleDisposable = null; __sub__.call(this); } WindowedObserver.prototype.completed = function () { this.observer.onCompleted(); this.dispose(); }; WindowedObserver.prototype.error = function (error) { this.observer.onError(error); this.dispose(); }; function innerScheduleMethod(s, self) { return self.observable.source.request(self.observable.windowSize); } WindowedObserver.prototype.next = function (value) { this.observer.onNext(value); this.received = ++this.received % this.observable.windowSize; this.received === 0 && (this.scheduleDisposable = defaultScheduler.schedule(this, innerScheduleMethod)); }; WindowedObserver.prototype.dispose = function () { this.observer = null; if (this.cancel) { this.cancel.dispose(); this.cancel = null; } if (this.scheduleDisposable) { this.scheduleDisposable.dispose(); this.scheduleDisposable = null; } __sub__.prototype.dispose.call(this); }; return WindowedObserver; }(AbstractObserver)); return WindowedObservable; }(Observable)); /** * Creates a sliding windowed observable based upon the window size. * @param {Number} windowSize The number of items in the window * @returns {Observable} A windowed observable based upon the window size. */ ControlledObservable.prototype.windowed = function (windowSize) { return new WindowedObservable(this, windowSize); }; /** * Pipes the existing Observable sequence into a Node.js Stream. * @param {Stream} dest The destination Node.js stream. * @returns {Stream} The destination stream. */ observableProto.pipe = function (dest) { var source = this.pausableBuffered(); function onDrain() { source.resume(); } dest.addListener('drain', onDrain); source.subscribe( function (x) { !dest.write(x) && source.pause(); }, function (err) { dest.emit('error', err); }, function () { // Hack check because STDIO is not closable !dest._isStdio && dest.end(); dest.removeListener('drain', onDrain); }); source.resume(); return dest; }; return Rx;}));