haraka/email-message

View on GitHub
index.js

Summary

Maintainability
F
3 days
Test Coverage
'use strict';

const events = require('events');
const config = require('haraka-config');
const libmime = require('libmime');
const libqp = require('libqp');
const Stream = require('stream');
const Iconv = require('iconv').Iconv;

let logger;
try {
  logger = require('./logger');
} catch (ignore) {
  logger = {
    lognotice: console.log,
    logerror: console.error,
    logwarn: console.error,
  };
}

const buf_siz = config.get('mailparser.bufsize') || 65536;

// An RFC 2822 email header parser
/* eslint no-control-regex: 0 */
class Header {
  constructor(options) {
    this.headers = {};
    this.headers_decoded = {};
    this.header_list = [];
    this.options = options;
  }

  parse(lines) {
    for (const line of lines) {
      if (/^[ \t]/.test(line)) {
        // continuation
        this.header_list[this.header_list.length - 1] += line;
      } else {
        this.header_list.push(line);
      }
    }

    for (const header of this.header_list) {
      const match = header.match(/^([^\s:]*):\s*([\s\S]*)$/);
      if (match) {
        const key = match[1].toLowerCase();
        const val = match[2];

        this._add_header(key, val, 'push');
      } else {
        logger.lognotice(`Header did not look right: ${header}`);
      }
    }

    // Now add decoded versions
    for (const key of Object.keys(this.headers)) {
      for (const val of this.headers[key]) {
        this._add_header_decode(key, val, 'push');
      }
    }
  }

  decode_header(val) {
    // Fold continuations
    val = val.replace(/\r?\n/g, '');

    const rfc2231_params = {
      kv: {},
      keys: {},
      cur_key: '',
      cur_enc: '',
      cur_lang: '', // Secondary languages are ignored for our purposes
    };

    val = _decode_rfc2231(rfc2231_params, val);

    // strip 822 comments in the most basic way - does not support nested comments
    // val = val.replace(/\([^\)]*\)/, '');

    if (Iconv && !/^[\x00-\x7f]*$/.test(val)) {
      // 8 bit values in the header
      const matches = /\bcharset\s*=\s*["']?([\w_-]*)/.exec(
        this.get('content-type'),
      );
      if (matches && !/UTF-?8/i.test(matches[1])) {
        const encoding = matches[1];
        const source = Buffer.from(val, 'binary');
        val = try_convert(source, encoding).toString();
      }
    }

    if (!/=\?/.test(val)) {
      // no encoded stuff
      return val;
    }

    return (
      val
        // strip whitespace between encoded-words, rfc 2047 6.2
        .replace(/(=\?.+?\?=)\s+(?==\?.+?\?=)/g, '$1')
        // decode each encoded match
        .replace(
          /=\?([\w_-]+)(\*[\w_-]+)?\?([bqBQ])\?([\s\S]*?)\?=/g,
          _decode_header,
        )
    );
  }

  get(key) {
    return (this.headers[key.toLowerCase()] || []).join('\n');
  }

  get_all(key) {
    return Object.freeze([...(this.headers[key.toLowerCase()] || [])]);
  }

  get_decoded(key) {
    return (this.headers_decoded[key.toLowerCase()] || []).join('\n');
  }

  remove(key) {
    key = key.toLowerCase();
    delete this.headers[key];
    delete this.headers_decoded[key];

    this._remove_more(key);
  }

  _remove_more(key) {
    const key_len = key.length;
    for (let i = 0, l = this.header_list.length; i < l; i++) {
      if (
        this.header_list[i].substring(0, key_len + 1).toLowerCase() ===
        `${key}:`
      ) {
        this.header_list.splice(i, 1);
        return this._remove_more(key);
      }
    }
  }

  add(key, value) {
    if (!key) key = 'X-Haraka-Blank';
    value = value.replace(/(\r?\n)*$/, '');
    if (/[^\x00-\x7f]/.test(value)) {
      value = libmime.encodeWords(value, 'Q');
    }
    this._add_header(key.toLowerCase(), value, 'unshift');
    this._add_header_decode(key.toLowerCase(), value, 'unshift');
    this.header_list.unshift(`${key}: ${value}\n`);
  }

  _add_header(key, value, method) {
    this.headers[key] = this.headers[key] || [];
    this.headers[key][method](value);
  }

  _add_header_decode(key, value, method) {
    const val = this.decode_header(value);
    // console.log(key + ': ' + val);
    this.headers_decoded[key] = this.headers_decoded[key] || [];
    this.headers_decoded[key][method](val);
  }

  add_end(key, value) {
    if (!key) key = 'X-Haraka-Blank';
    value = value.replace(/(\r?\n)*$/, '');
    if (/[^\x00-\x7f]/.test(value)) {
      value = libmime.encodeWords(value, 'Q');
    }
    this._add_header(key.toLowerCase(), value, 'push');
    this._add_header_decode(key.toLowerCase(), value, 'push');
    this.header_list.push(`${key}: ${value}\n`);
  }

  lines() {
    return Object.freeze([...this.header_list]);
  }

  toString() {
    return this.header_list.join('\n');
  }
}

function try_convert(data, encoding) {
  try {
    const converter = new Iconv(encoding, 'UTF-8');
    data = converter.convert(data);
  } catch (err) {
    // TODO: raise a flag for this for possible scoring
    logger.logwarn(
      `initial iconv conversion from ${encoding} to UTF-8 failed: ${err.message}`,
    );
    if (err.code !== 'EINVAL') {
      try {
        const converter = new Iconv(encoding, 'UTF-8//TRANSLIT//IGNORE');
        data = converter.convert(data);
      } catch (e) {
        logger.logerror(`iconv from ${encoding} to UTF-8 failed: ${e.message}`);
      }
    }
  }

  return data;
}

function _decode_header(matched, encoding, lang, cte, data) {
  cte = cte.toUpperCase();

  switch (cte) {
    case 'Q':
      data = libqp.decode(data.replace(/_/g, ' '));
      break;
    case 'B':
      data = Buffer.from(data, 'base64');
      break;
    default:
      logger.logerror(`Invalid header encoding type: ${cte}`);
  }

  // convert with iconv if encoding != UTF-8
  if (Iconv && !/UTF-?8/i.test(encoding)) {
    data = try_convert(data, encoding);
  }

  return data.toString();
}

function _decode_rfc2231(params, str) {
  _parse_rfc2231(params, str);

  for (const key in params.keys) {
    str += ` ${key}="`;
    /* eslint no-constant-condition: 0 */
    let merged = '';
    for (let i = 0; true; i++) {
      const _key = `${key}*${i}`;
      const _val = params.kv[_key];
      if (_val === undefined) break;
      merged += _val;
    }

    try {
      merged = decodeURIComponent(merged);
    } catch (e) {
      logger.logerror(`Decode header failed: ${key}: ${merged}`);
    }
    merged = params.cur_enc ? try_convert(merged, params.cur_enc) : merged;

    str += `${merged}";`;
  }

  return str;
}

function _parse_rfc2231(params, str) {
  /*
    To explain the regexp below, the params are:

    parameter := attribute "=" value

    attribute := token
                 ; Matching of attributes
                 ; is ALWAYS case-insensitive.

    token := 1*<any (US-ASCII) CHAR except SPACE, CTLs,
                or tspecials>

    tspecials :=  "(" / ")" / "<" / ">" / "@" /
                  "," / ";" / ":" / "\" / <">
                  "/" / "[" / "]" / "?" / "="
                  ; Must be in quoted-string,
                  ; to use within parameter values
    */
  const sub_matches =
    /(([!#$%&'*+.0-9A-Zdiff^_`a-z{|}~-]*)\*)(\d*)=(\s*".*?[^\\]";?|\S*)/.exec(
      str,
    );
  if (!sub_matches) {
    return;
  }
  const key = sub_matches[1];
  let key_actual = sub_matches[2];
  let key_id = sub_matches[3] || '0';
  let value = sub_matches[4].replace(/;$/, '');

  str = str.replace(sub_matches[0], ''); // strip it out, so we move to next

  const key_extract = /^(.*?)(\*(\d+)\*)$/.exec(key);
  if (key_extract) {
    key_actual = key_extract[1];
    key_id = key_extract[3];
  }

  const quote = /^\s*"(.*)"$/.exec(value);
  if (quote) {
    value = quote[1];
  }

  const lang_match = /^(.*?)'(.*?)'(.*)/.exec(value);
  if (lang_match) {
    if (key_actual == params.cur_key && lang_match[2] != params.cur_lang) {
      return _parse_rfc2231(params, str); // same key, different lang, throw it away
    }
    params.cur_enc = lang_match[1];
    params.cur_lang = lang_match[2];
    value = lang_match[3];
  } else if (key_actual != params.cur_key) {
    params.cur_lang = '';
    params.cur_enc = '';
  }

  params.cur_key = key_actual;
  params.keys[key_actual] = '';
  params.kv[`${key_actual}*${key_id}`] = value;
  return _parse_rfc2231(params, str); // Get next one
}

// Mail Body Parser
class Body extends events.EventEmitter {
  constructor(header, options) {
    super();
    this.header = header || new Header();
    this.header_lines = [];
    this.is_html = false;
    this.options = options || {};
    this.filters = [];
    this.bodytext = '';

    // Caution: slice before using!  We build up data in this buffer, and
    // it always has extra space at the end.  Use
    // this.body_text_encoded.slice(0, this.body_text_encoded_pos).
    this.body_text_encoded = Buffer.alloc(buf_siz);
    this.body_text_encoded_pos = 0;

    this.body_encoding = null;
    this.boundary = null;
    this.ct = null;
    this.decode_function = null;
    this.children = []; // if multipart
    this.state = 'start';
    this.buf = Buffer.alloc(buf_siz);
    this.buf_fill = 0;
    this.decode_accumulator = '';
    this.decode_qp = (line) => libqp.decode(line.toString());
    this.decode_7bit = this.decode_8bit;
  }

  add_filter(filter) {
    this.filters.push(filter);
  }

  set_banner(banners) {
    this.add_filter((ct, enc, buf, cd) =>
      insert_banner(ct, enc, buf, cd, banners),
    );
  }

  parse_more(line) {
    // Ensure we're working in buffers, for the tests (transaction should
    // always pass buffers).
    if (!Buffer.isBuffer(line)) line = Buffer.from(line);

    return this[`parse_${this.state}`](line);
  }

  parse_child(line) {
    const line_string = line.toString();

    // check for MIME boundary
    if (
      line_string.substr(0, this.boundary.length + 2) === `--${this.boundary}`
    ) {
      line = this.children[this.children.length - 1].parse_end(line);

      if (line_string.substr(this.boundary.length + 2, 2) === '--') {
        // end
        this.state = 'end';
      } else {
        this.emit('mime_boundary', line_string);
        const bod = new Body(new Header(), this.options);
        for (const ln of [
          'attachment_start',
          'attachment_data',
          'attachment_end',
          'mime_boundary',
        ]) {
          for (const cb of this.listeners(ln)) {
            bod.on(ln, cb);
          }
        }
        for (const f of this.filters) {
          bod.add_filter(f);
        }
        this.children.push(bod);
        bod.state = 'headers';
      }
      return line;
    }
    // Pass data into last child
    return this.children[this.children.length - 1].parse_more(line);
  }

  parse_headers(line) {
    const line_string = line.toString();

    if (/^\s*$/.test(line_string)) {
      // end of headers
      this.header.parse(this.header_lines);
      delete this.header_lines;
      this.state = 'start';
    } else {
      this.header_lines.push(line_string);
    }
    return line;
  }

  parse_start(line) {
    const ct = this.header.get_decoded('content-type') || 'text/plain';
    let enc = this.header.get_decoded('content-transfer-encoding') || '8bit';
    const cd = this.header.get_decoded('content-disposition') || '';

    if (/text\/html/i.test(ct)) {
      this.is_html = true;
    }

    enc = enc.toLowerCase().split('\n').pop().trim();
    if (!enc.match(/^base64|quoted-printable|[78]bit$/i)) {
      logger.logwarn(`Invalid CTE on email: ${enc}, using 8bit`);
      enc = '8bit';
    }
    enc = enc.replace(/^quoted-printable$/i, 'qp');

    this.decode_function = this[`decode_${enc}`];
    if (!this.decode_function) {
      logger.logerror(`No decode function found for: ${enc}`);
      this.decode_function = this.decode_8bit;
    }
    this.ct = ct;

    const buildRegex = (key) => new RegExp(`${key}\\s*=\\s*"([^"]+)"|${key}\\s*=\\s*"?([^";]+)"?`, 'i');
    const matchKey = (test, key) => test.match(buildRegex(key))?.filter(item => item);

    let match;
    if (/^(?:text|message)\//i.test(ct) && !/^attachment/i.test(cd)) {
      this.state = 'body';
    } else if (/^multipart\//i.test(ct)) {
      match = matchKey(ct, 'boundary');
      this.boundary = match ? match[1] : '';
      this.state = 'multipart_preamble';
    } else {
      match = matchKey(cd, 'name');
      if (!match) {
        match = matchKey(ct, 'name');
      }
      const filename = match ? match[1] : '';
      this.attachment_stream = exports.createAttachmentStream(this.header);
      this.emit('attachment_start', ct, filename, this, this.attachment_stream);
      this.buf_fill = 0;
      this.state = 'attachment';
    }

    return this[`parse_${this.state}`](line);
  }

  _empty_filter(ct, enc, cd) {
    let new_buf = Buffer.from('');
    for (const filter of this.filters) {
      new_buf = filter(ct, enc, new_buf, cd) || new_buf;
    }

    return new_buf;
  }

  _emit_buf_fill() {
    if (this.buf_fill > 0) {
      // see below for why we create a new buffer here.
      const to_emit = Buffer.alloc(this.buf_fill);
      this.buf.copy(to_emit, 0, 0, this.buf_fill);
      this.attachment_stream.emit_data(to_emit);
      this.buf_fill = 0;
    }
  }

  force_end() {
    if (this.state === 'attachment') {
      this._emit_buf_fill();
      this.attachment_stream.emit_end(true);
    }
  }

  parse_end(line) {
    if (!line) {
      line = Buffer.from('');
    }

    if (this.state === 'attachment') {
      this._emit_buf_fill();
      this.attachment_stream.emit_end();
    }

    const ct = this.header.get_decoded('content-type') || 'text/plain';
    let enc = 'UTF-8';
    let pre_enc = '';
    const matches = /\bcharset\s*=\s*(?:"|3D|')?([\w_-]*)(?:"|3D|')?/.exec(ct);
    if (matches) {
      pre_enc = matches[1].trim();
      if (pre_enc.length > 0) {
        enc = pre_enc;
      }
    }
    this.body_encoding = enc;

    const cd = this.header.get_decoded('content-disposition') || '';
    if (!this.body_text_encoded_pos) {
      // nothing to decode
      return Buffer.concat([
        this._empty_filter(ct, enc, cd) || Buffer.from(''),
        line,
      ]);
    }
    if (this.bodytext.length !== 0) return line; // already decoded?

    let buf = this.decode_function(
      this.body_text_encoded.slice(0, this.body_text_encoded_pos),
    );

    if (this.filters.length) {
      // up until this point we've returned '' for line, so now we run
      // the filters and return the whole lot as one line, re-encoded using
      // whatever encoding scheme we used to decode it.

      let new_buf = buf;
      for (const filter of this.filters) {
        new_buf = filter(ct, enc, new_buf) || new_buf;
      }

      // convert back to base_64 or QP if required:
      if (this.decode_function === this.decode_qp) {
        line = Buffer.from(`${libqp.wrap(libqp.encode(new_buf))}\n${line}`);
      } else if (this.decode_function === this.decode_base64) {
        line = Buffer.from(
          new_buf.toString('base64').replace(/(.{1,76})/g, '$1\n') + line,
        );
      } else {
        line = Buffer.concat([new_buf, line]);
      }

      buf = new_buf;
    }

    // convert the buffer to UTF-8, stored in this.bodytext
    this.try_iconv(buf, enc);

    // delete this.body_text_encoded;
    return line;
  }

  try_iconv(buf, enc) {
    if (!Iconv) {
      this.body_encoding = 'no_iconv';
      this.bodytext = buf.toString();
      return;
    }

    if (/UTF-?8/i.test(enc)) {
      this.bodytext = buf.toString();
      return;
    }

    try {
      const converter = new Iconv(enc, 'UTF-8');
      this.bodytext = converter.convert(buf).toString();
    } catch (err) {
      logger.logwarn(
        `initial iconv conversion from ${enc} to UTF-8 failed: ${err.message}`,
      );
      this.body_encoding = `broken//${enc}`;
      // EINVAL is returned when the encoding type is not recognized/supported (e.g. ANSI_X3)
      if (err.code !== 'EINVAL') {
        // Perform the conversion again, but ignore any errors
        try {
          const converter = new Iconv(enc, 'UTF-8//TRANSLIT//IGNORE');
          this.bodytext = converter.convert(buf).toString();
        } catch (e) {
          logger.logwarn(
            `iconv conversion from ${enc} to UTF-8 failed: ${e.message}`,
          );
          this.bodytext = buf.toString();
        }
      }
    }
  }

  parse_body(line) {
    if (!Buffer.isBuffer(line)) line = Buffer.from(line);

    // Grow the body_text_encoded buffer if we need more space.  Doing this
    // instead of constant Buffer.concat()s means we allocate/copy way less
    // often.
    if (
      this.body_text_encoded_pos + line.length >
      this.body_text_encoded.length
    ) {
      let new_size = this.body_text_encoded.length * 2;
      while (this.body_text_encoded_pos + line.length > new_size) new_size *= 2;

      this.body_text_encoded = Buffer.alloc(
        new_size,
        this.body_text_encoded.slice(0, this.body_text_encoded_pos),
      );
    }

    line.copy(this.body_text_encoded, this.body_text_encoded_pos);
    this.body_text_encoded_pos += line.length;

    if (this.filters.length) return '';
    return line;
  }

  parse_multipart_preamble(line) {
    if (!this.boundary) return line;
    const line_string = line.toString();

    if (
      line_string.substr(0, this.boundary.length + 2) === `--${this.boundary}`
    ) {
      if (line_string.substr(this.boundary.length + 2, 2) === '--') {
        // end
      } else {
        // next section
        this.emit('mime_boundary', line_string);
        const bod = new Body(new Header(), this.options);
        for (const ln of ['attachment_start', 'mime_boundary']) {
          for (const cb of this.listeners(ln)) {
            bod.on(ln, cb);
          }
        }
        for (const f of this.filters) {
          bod.add_filter(f);
        }
        this.children.push(bod);
        bod.state = 'headers';
        this.state = 'child';
      }
      return line;
    }

    return line;
  }

  parse_attachment(line) {
    const line_string = line.toString();

    if (this.boundary) {
      if (
        line_string.substr(0, this.boundary.length + 2) === `--${this.boundary}`
      ) {
        if (line_string.substr(this.boundary.length + 2, 2) === '--') {
          // end
        } else {
          // next section
          this.state = 'headers';
        }
        return line;
      }
    }

    const buf = this.decode_function(line);
    if (buf.length + this.buf_fill > buf_siz) {
      // now we have to create a new buffer, because if we write this out
      // using async code, it will get overwritten under us. Creating a new
      // buffer eliminates that problem (at the expense of a malloc and a
      // memcpy())
      const to_emit = Buffer.alloc(this.buf_fill);
      this.buf.copy(to_emit, 0, 0, this.buf_fill);
      this.attachment_stream.emit_data(to_emit);
      if (buf.length > buf_siz) {
        // this is an unusual case - the base64/whatever data is larger
        // than our buffer size, so we just emit it and reset the counter.
        this.attachment_stream.emit_data(buf);
        this.buf_fill = 0;
      } else {
        buf.copy(this.buf);
        this.buf_fill = buf.length;
      }
    } else {
      buf.copy(this.buf, this.buf_fill);
      this.buf_fill += buf.length;
    }
    return line;
  }

  decode_base64(line) {
    // Remove all whitespace (such as newlines and errant spaces) from base64
    // before combining it with any previously unprocessed data.
    let to_process =
      this.decode_accumulator + line.toString().trim().replace(/[\s]+/g, '');

    // Sometimes base64 data lines will not be aligned with
    // byte boundaries. This is because each char in base64
    // represents 6 bits. 24 is the LCM between 6 and 8 bits.
    // (i.e. 4 * 6-bit chars === 3 * bytes)

    // As a result, 24 bits is our word boundary for base64.
    // Failure to align here will result in truncated/incorrect
    // node Buffers later on.

    // Walk back from the toProcess.length to the first
    // position that aligns with a 24-bit boundary.
    const emit_length = to_process.length - (to_process.length % 4);

    if (emit_length > 0) {
      const emit_now = to_process.substring(0, emit_length);
      this.decode_accumulator = to_process.substring(emit_length);
      return Buffer.from(emit_now, 'base64');
    } else {
      this.decode_accumulator = '';
      // This is the end of the base64 data, we don't really have enough bits
      // to fill up the bytes, but that's because we're on the last line, and ==
      // might have been elided.

      // In order to prevent any weird boundary issues, we'll re-pad
      // the string if there's any data left. As above, our target
      // is a 24-bit boundary, pad to 4 characters.
      while (to_process.length > 0 && to_process.length < 4) {
        to_process += '=';
      }
      return Buffer.from(to_process, 'base64');
    }
  }

  decode_8bit(line) {
    return Buffer.from(line, 'binary');
  }
}

exports.Body = Body;
exports.Header = Header;
exports.stream = require('haraka-message-stream');

function _get_html_insert_position(buf) {
  // otherwise, if we return -1 then the buf.copy will die with
  // RangeError: out of range index
  if (buf.length === 0) return 0;

  // TODO: consider re-writing this to go backwards from the end
  for (let i = 0, l = buf.length; i < l; i++) {
    if (buf[i] === 60 && buf[i + 1] === 47) {
      // found: "</"
      if (
        (buf[i + 2] === 98 || buf[i + 2] === 66) && // "b" or "B"
        (buf[i + 3] === 111 || buf[i + 3] === 79) && // "o" or "O"
        (buf[i + 4] === 100 || buf[i + 4] === 68) && // "d" or "D"
        (buf[i + 5] === 121 || buf[i + 5] === 89) && // "y" or "Y"
        buf[i + 6] === 62
      ) {
        // matched </body>
        return i;
      }
      if (
        (buf[i + 2] === 104 || buf[i + 2] === 72) && // "h" or "H"
        (buf[i + 3] === 116 || buf[i + 3] === 84) && // "t" or "T"
        (buf[i + 4] === 109 || buf[i + 4] === 77) && // "m" or "M"
        (buf[i + 5] === 108 || buf[i + 5] === 76) && // "l" or "L"
        buf[i + 6] === 62
      ) {
        // matched </html>
        return i;
      }
    }
  }
  return buf.length - 1; // default is at the end
}

function insert_banner(ct, enc, buf, cd, banners) {
  if (!banners || !/^text\//i.test(ct) || /^attachment/i.test(cd)) {
    return;
  }
  const is_html = /text\/html/i.test(ct);

  // First we convert the banner to the same encoding as the buf
  const banner_str = banners[is_html ? 1 : 0];
  let banner_buf = null;
  if (Iconv) {
    try {
      const converter = new Iconv('UTF-8', `${enc}//IGNORE`);
      banner_buf = converter.convert(banner_str);
    } catch (err) {
      logger.logerror(`iconv conversion of banner to ${enc} failed: ${err}`);
    }
  }

  if (!banner_buf) {
    banner_buf = Buffer.from(banner_str);
  }

  // Allocate a new buffer: (7 or 2 is <P>...</P> vs \n...\n - correct that if you change those!)
  const new_buf = Buffer.alloc(
    buf.length + banner_buf.length + (is_html ? 7 : 2),
  );

  // Now we find where to insert it and combine it with the original buf:
  if (is_html) {
    let insert_pos = _get_html_insert_position(buf);

    // copy start of buf into new_buf
    buf.copy(new_buf, 0, 0, insert_pos);

    // add in <P>
    new_buf[insert_pos++] = 60;
    new_buf[insert_pos++] = 80;
    new_buf[insert_pos++] = 62;

    // copy all of banner into new_buf
    banner_buf.copy(new_buf, insert_pos);

    // add in </P>
    new_buf[banner_buf.length + insert_pos++] = 60;
    new_buf[banner_buf.length + insert_pos++] = 47;
    new_buf[banner_buf.length + insert_pos++] = 80;
    new_buf[banner_buf.length + insert_pos++] = 62;

    // copy remainder of buf into new_buf, if there is buf remaining
    if (buf.length > insert_pos - 7) {
      buf.copy(new_buf, insert_pos + banner_buf.length, insert_pos - 7);
    }
  } else {
    buf.copy(new_buf);
    new_buf[buf.length] = 10; // \n
    banner_buf.copy(new_buf, buf.length + 1);
    new_buf[buf.length + banner_buf.length + 1] = 10; // \n
  }

  return new_buf;
}

class AttachmentStream extends Stream {
  constructor(header) {
    super();
    this.header = header;
    this.encoding = null;
    this.paused = false;
    this.end_emitted = false;
    this.connection = null;
    this.buffer = [];
  }

  emit_data(data) {
    // console.log("YYY: DATA emit");
    if (this.paused) {
      return this.buffer.push(data);
    }

    if (this.encoding) {
      this.emit('data', data.toString(this.encoding));
    } else {
      this.emit('data', data);
    }
  }

  emit_end(force) {
    if (this.paused && !force) {
      // console.log("YYY: end emit (cache)");
      this.end_emitted = true;
    } else {
      // console.log("YYY: end emit");
      if (this.buffer.length > 0) {
        while (this.buffer.length > 0) {
          // Don't use this.emit_data() here because we don't want to
          // re-buffer the data we're trying to emit, when we're
          // paused and forcing the end.
          const data = this.buffer.shift();
          if (this.encoding) {
            this.emit('data', data.toString(this.encoding));
          } else {
            this.emit('data', data);
          }
        }
      }
      this.emit('end');
    }
  }

  pipe(dest, options) {
    this.paused = false;

    const pipe = Stream.prototype.pipe.call(this, dest, options);

    dest.on('drain', () => {
      // console.log("YYY: DRAIN!!!");
      if (this.paused) this.resume();
    });
    dest.on('end', () => {
      // console.log("YYY: END!!");
      if (this.paused) this.resume();
    });
    dest.on('close', () => {
      // console.log("YYY: CLOSE!!");
      if (this.paused) this.resume();
    });

    return pipe;
  }

  setEncoding(enc) {
    if (enc !== 'binary') {
      throw 'Unable to set encoding to anything other than binary';
    }
    this.encoding = enc;
  }

  pause() {
    // console.log("YYY: PAUSE!!");
    this.paused = true;
    if (this.connection) {
      // console.log("YYYY: Backpressure pause");
      this.connection.pause();
    }
  }

  resume() {
    // console.log("YYY: RESUME!!");
    if (this.connection) {
      // console.log("YYYY: Backpressure resume");
      this.connection.resume();
    }
    this.paused = false;
    if (this.buffer.length) {
      while (this.paused === false && this.buffer.length > 0) {
        this.emit_data(this.buffer.shift());
      }
      if (this.buffer.length === 0 && this.end_emitted) {
        this.emit('end');
      }
    } else if (this.end_emitted) {
      this.emit('end');
    }
  }

  destroy() {
    // console.log("YYYY: Stream destroyed");
  }
}

exports.createAttachmentStream = (header) => new AttachmentStream(header);