aaronjameslang/survey-monkey-streams

View on GitHub
src/SurveysResponsesBulkReader.ts

Summary

Maintainability
A
1 hr
Test Coverage
import { Readable, ReadableOptions } from "stream";
import ResponsesBulkReader from "./ResponsesBulkReader";
import SurveysReader, { RequestOptions } from "./SurveysReader";

class SurveysResponsesBulkReader extends Readable {
  private readonly readableOptions: ReadableOptions;
  private readonly requestOptions: RequestOptions;
  private readonly surveysReader: SurveysReader;
  private responsesBulkReader?: ResponsesBulkReader;
  private surveyProgressCount = 0;
  private surveyProgressTotal = 0;
  private surveysReaderEnded = false;
  /**
   * @example new SurveysResponsesBulkReader('152299598', {
   *   headers: { authorization: 'bearer xxxxx.yyyyy.zzzzz' }
   * })
   */
  public constructor(
    surveyRequestOptions: RequestOptions,
    responseRequestOptions?: RequestOptions,
    readableOptions: ReadableOptions = {},
  ) {
    super({ ...readableOptions, objectMode: true });
    this.requestOptions = responseRequestOptions || surveyRequestOptions;
    this.readableOptions = readableOptions;
    this.surveysReader = this.initSurveysReader(
      surveyRequestOptions,
      readableOptions,
    );
  }

  public initSurveysReader(
    requestOptions: RequestOptions,
    readableOptions: ReadableOptions,
  ) {
    return new SurveysReader(requestOptions, readableOptions)
      .on("data", (survey: { id: string }) => {
        this.surveysReader.pause();
        this.surveyProgressCount += 1;
        this.emit("survey", survey);
        this.initResponsesBulkReader(survey.id);
      })
      .on("page", (page) => {
        this.surveyProgressTotal = page.total;
        this.emit("page", page, "survey");
      })
      .on("end", () => {
        this.surveysReaderEnded = true;
      })
      .on("error", (error) => {
        this.emit("error", error);
      })
      .pause();
  }

  public initResponsesBulkReader(id: string) {
    if (this.responsesBulkReader) {
      throw new Error("Impossible");
    }
    this.responsesBulkReader = new ResponsesBulkReader(
      id,
      this.requestOptions,
      this.readableOptions,
    )
      .on("data", (response) => {
        const more = this.push(response);
        if (this.responsesBulkReader && !more) {
          this.responsesBulkReader.pause();
        } else if (!this.responsesBulkReader) {
          throw new Error("Impossible");
        }
      })
      .on("page", (page) => {
        this.emit("page", page, "response");
      })
      .on("progress", (progress) => {
        this.emit("progress", {
          response: progress,
          survey: {
            count: this.surveyProgressCount,
            total: this.surveyProgressTotal,
          },
        });
      })
      .on("end", () => {
        if (this.surveysReaderEnded) {
          this.emit("end");
        } else {
          delete this.responsesBulkReader;
          this.surveysReader.resume();
        }
      })
      .on("error", (error) => {
        this.emit("error", error);
      });
  }

  public _read() {
    if (this.responsesBulkReader) {
      this.responsesBulkReader.resume();
    } else if (!this.surveysReaderEnded) {
      this.surveysReader.resume();
    }
  }
}

export default SurveysResponsesBulkReader;
export { SurveysResponsesBulkReader, ReadableOptions, RequestOptions };