snowplow/snowplow-javascript-tracker

View on GitHub
trackers/node-tracker/src/got_emitter.ts

Summary

Maintainability
C
1 day
Test Coverage
import util from 'util';
import got, { Response, RequestError, Agents, RequiredRetryOptions, ToughCookieJar, PromiseCookieJar } from 'got';
import { Payload, version } from '@snowplow/tracker-core';

import { Emitter, HttpProtocol, HttpMethod, preparePayload } from './emitter';

/**
 * Create an emitter object, which uses the `got` library, that will send events to a collector
 *
 * @param endpoint - The collector to which events will be sent
 * @param protocol - http or https
 * @param port - The port for requests to use
 * @param method - get or post
 * @param bufferSize - Number of events which can be queued before flush is called
 * @param retry - Configure the retry policy for `got` - https://github.com/sindresorhus/got/blob/v11.5.2/readme.md#retry
 * @param cookieJar - Add a cookieJar to `got` - https://github.com/sindresorhus/got/blob/v11.5.2/readme.md#cookiejar
 * @param callback - Callback called after a `got` request following retries - called with ErrorRequest (https://github.com/sindresorhus/got/blob/v11.5.2/readme.md#errors) and Response (https://github.com/sindresorhus/got/blob/v11.5.2/readme.md#response)
 * @param agents - Set new http.Agent and https.Agent objects on `got` requests - https://github.com/sindresorhus/got/blob/v11.5.2/readme.md#agent
 * @param serverAnonymization - If the request should undergo server anonymization.
 */
export function gotEmitter(
  endpoint: string,
  protocol: HttpProtocol = HttpProtocol.HTTPS,
  port?: number,
  method?: HttpMethod,
  bufferSize?: number,
  retry?: number | Partial<RequiredRetryOptions>,
  cookieJar?: PromiseCookieJar | ToughCookieJar,
  callback?: (error?: RequestError, response?: Response<string>) => void,
  agents?: Agents,
  serverAnonymization: boolean = false
): Emitter {
  const maxBufferLength = bufferSize ?? (method === HttpMethod.GET ? 0 : 10);
  const path = method === HttpMethod.GET ? '/i' : '/com.snowplowanalytics.snowplow/tp2';
  const targetUrl = protocol + '://' + endpoint + (port ? ':' + port : '') + path;
  const debuglog = util.debuglog('snowplow');

  let buffer: Array<Payload> = [];

  /**
   * Handles the callback on a successful response if the callback is present
   * @param response - The got response object
   */
  const handleSuccess = (response: Response<string>) => {
    if (callback) {
      try {
        callback(undefined, response);
      } catch (e) {
        debuglog('Error in callback after failure', e);
      }
    }
  };

  /**
   * Handles the callback on a failed request if the callback is present
   * @param error - The got error object
   */
  const handleFailure = (error: RequestError) => {
    if (callback) {
      try {
        callback(error);
      } catch (e) {
        debuglog('Error in callback after failure', e);
      }
    }
  };

  /**
   * Flushes all events currently stored in buffer
   */
  const flush = (): void => {
    const bufferCopy = buffer;
    buffer = [];
    if (bufferCopy.length === 0) {
      return;
    }

    const headers = {
      'user-agent': `snowplow-nodejs-tracker/${version}`,
      ...(serverAnonymization && { 'SP-Anonymous': '*' }),
      ...(method === HttpMethod.POST && { 'content-type': 'application/json; charset=utf-8' }),
    };

    if (method === HttpMethod.POST) {
      const postJson = {
        schema: 'iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4',
        data: bufferCopy.map(preparePayload),
      };
      got
        .post(targetUrl, {
          json: postJson,
          agent: agents,
          headers,
          retry,
          cookieJar,
        })
        .then(handleSuccess, handleFailure);
    } else {
      for (let i = 0; i < bufferCopy.length; i++) {
        got
          .get(targetUrl, {
            searchParams: preparePayload(bufferCopy[i]),
            agent: agents,
            headers,
            retry,
            cookieJar,
          })
          .then(handleSuccess, handleFailure);
      }
    }
  };

  /**
   * Adds a payload to the internal buffer and sends if buffer >= bufferSize
   * @param payload - Payload to add to buffer
   */
  const input = (payload: Payload): void => {
    buffer.push(payload);
    if (buffer.length >= maxBufferLength) {
      flush();
    }
  };

  const setAnonymization = (shouldAnonymize: boolean) => {
    serverAnonymization = shouldAnonymize;
  };

  return {
    /**
     * Send all events queued in the buffer to the collector
     */
    flush,
    input,
    setAnonymization,
  };
}