
View on GitHub


6 hrs
Test Coverage
/* jshint -W084 */
 * BufferedStream - A robust stream implementation for node.js and the browser
 * https://github.com/mjackson/bufferedstream
var bodec = require('bodec');
var d = require('describe-property');
var EventEmitter = require('events').EventEmitter;

 * The default maximum buffer size.
var DEFAULT_MAX_SIZE = Math.pow(2, 16); // 64k

var BaseClass, async;
if (typeof window === 'object') {
  BaseClass = EventEmitter;
  async = window.setTimeout;
} else {
  BaseClass = require('stream').Stream;
  async = process.nextTick;

function trackSource(dest) {
  dest.on('pipe', function (source) {
    if (dest._source)
      throw new Error('BufferedStream is already piped');

    dest._source = source;

    function cleanup() {
      dest._source = null;
      source.removeListener('error', cleanup);
      source.removeListener('end', cleanup);

    source.on('error', cleanup);
    source.on('end', cleanup);

function flushSoon(stream) {
  if (stream._flushing)

  stream._flushing = true;

  async(function tryToFlush() {
    if (stream.paused) {
      stream._flushing = false;


    if (stream.empty) {
      stream._flushing = false;
    } else {

function flush(stream) {
  if (!stream._chunks)

  var chunk;
  while (chunk = stream._chunks.shift()) {
    stream.size -= chunk.length;

    if (stream.encoding) {
      stream.emit('data', bodec.toString(chunk, stream.encoding));
    } else {
      stream.emit('data', chunk);

    // If the stream was paused in a data event handler, break.
    if (stream.paused)

  if (stream.ended) {
    if (!stream.paused) {
      stream._chunks = null;
  } else if (stream._wasFull && !stream.full) {
    stream._wasFull = false;

 * A robust stream implementation for node.js and the browser based on the
 * initial version of the stream API in Node.js.
 * The maxSize determines the number of bytes the buffer can hold before it is
 * considered "full". Defaults to 64k.
 * The source and sourceEncoding arguments may be used to easily wrap this
 * stream around another, or a simple string. If the source is another stream,
 * it is piped to this stream. If it's a string or binary data, it is used as
 * the entire contents of the stream.
 * NOTE: The maxSize is a soft limit that is only used to determine when calls
 * to write will return false, indicating to streams that are writing to this
 * stream that they should pause. In any case, calls to write will still append
 * to the buffer so that no data is lost.
function BufferedStream(maxSize, source, sourceEncoding) {
  if (!(this instanceof BufferedStream))
    return new BufferedStream(maxSize, source, sourceEncoding);


  if (typeof maxSize !== 'number') {
    sourceEncoding = source;
    source = maxSize;
    maxSize = DEFAULT_MAX_SIZE;

  // Public interface.
  this.maxSize = maxSize;
  this.size = 0;
  this.encoding = null;
  this.paused = true;
  this.ended = false;
  this.readable = true;
  this.writable = true;

  this._chunks = [];
  this._flushing = false;
  this._wasFull = false;
  this._source = null;


  if (source != null) {
    if (typeof source.pipe === 'function') {
      if (typeof source.resume === 'function')
        source.resume(); // Triggers "old mode" in node v2 streams.

    } else {
      this.end(source, sourceEncoding);

BufferedStream.prototype = Object.create(BaseClass.prototype, {

  constructor: d(BufferedStream),

   * A read-only property that is true if this stream has no data to emit.
  empty: d.gs(function () {
    return this._chunks == null || this._chunks.length === 0;

   * A read-only property that is true if this stream's buffer is full.
  full: d.gs(function () {
    return this.maxSize < this.size;

   * A read-only property that is true if this stream is currently receiving
   * data from another stream via pipe().
  piped: d.gs(function () {
    return this._source != null;

   * Sets this stream's encoding. If an encoding is set, this stream will emit
   * strings using that encoding. Otherwise, it emits binary objects.
   * Valid encodings are "hex", "base64", "utf8", and "utf-8".
  setEncoding: d(function (encoding) {
    this.encoding = encoding;

   * Prevents this stream from emitting data events until resume is called.
   * Note: This does not prevent writes to this stream.
  pause: d(function () {
    this.paused = true;

   * Resumes emitting data events.
  resume: d(function () {
    if (this.paused)

    this.paused = false;

   * Pipes all data in this stream through to the given destination stream.
   * By default the destination stream is ended when this one ends. Set the
   * "end" option to `false` to disable this behavior.
   * This function was copied out of node's lib/stream.js and modified for
   * use in other JavaScript environments.
  pipe: d(function (dest, options) {
    var source = this;

    function ondata(chunk) {
      if (dest.writable && false === dest.write(chunk))

    source.on('data', ondata);

    function ondrain() {
      if (source.readable)

    dest.on('drain', ondrain);

    var didOnEnd = false;
    function onend() {
      if (didOnEnd) return;
      didOnEnd = true;


    // If the 'end' option is not supplied, dest.end() will be called when
    // source gets the 'end' or 'close' events. Only dest.end() once.
    if (!dest._isStdio && (!options || options.end !== false))
      source.on('end', onend);

    // don't leave dangling pipes when there are errors.
    function onerror(error) {
      if (EventEmitter.listenerCount(this, 'error') === 0)
        throw error; // Unhandled stream error in pipe.

    source.on('error', onerror);
    dest.on('error', onerror);

    // remove all the event listeners that were added.
    function cleanup() {
      source.removeListener('data', ondata);
      dest.removeListener('drain', ondrain);

      source.removeListener('end', onend);

      source.removeListener('error', onerror);
      dest.removeListener('error', onerror);

      source.removeListener('end', cleanup);

    source.on('end', cleanup);
    dest.on('close', cleanup);

    dest.emit('pipe', source);

    // Mimic the behavior of node v2 streams where pipe() resumes the flow.
    // This lets us avoid having to do stream.resume() all over the place.

    // Allow for unix-like usage: A.pipe(B).pipe(C)
    return dest;

   * Writes the given chunk of data to this stream. Returns false if this
   * stream is full and should not be written to further until drained, true
   * otherwise.
  write: d(function (chunk) {
    if (!this.writable)
      throw new Error('BufferedStream is not writable');

    if (this.ended)
      throw new Error('BufferedStream is already ended');

    if (typeof chunk === 'string')
      chunk = bodec.fromString(chunk, arguments[1]);

    if (!bodec.isBinary(chunk))
      throw new Error('BufferedStream only accepts binary data');

    this.size += chunk.length;


    if (this.full) {
      this._wasFull = true;
      return false;

    return true;

   * Writes the given chunk to this stream and queues the end event to be
   * called as soon as soon as the stream is empty. Calling write() after
   * end() is an error.
  end: d(function (chunk) {
    if (this.ended)
      throw new Error('BufferedStream is already ended');

    if (chunk != null)
      this.write(chunk, arguments[1]);

    this.ended = true;

    // Trigger the flush cycle one last time to emit
    // any data that was written before end was called.


module.exports = BufferedStream;