ThinkDeepTech/thinkdeep

View on GitHub
packages/data-collector/src/client.js

Summary

Maintainability
A
1 hr
Test Coverage
import {attachExitHandler} from '@thinkdeep/attach-exit-handler';

/**
 * Client used to execute data collection tasks.
 */
class Client {
  /**
   * @param {Object} twitterClient - Twitter API v2 client. See: https://github.com/PLhery/node-twitter-api-v2
   * @param {Object} kafkaClient - KafkaJS client. See: https://github.com/tulios/kafkajs
   * @param {Object} logger - The logger to use.
   */
  constructor(twitterClient, kafkaClient, logger) {
    this._twitterClient = twitterClient;
    this._kafkaClient = kafkaClient;
    this._logger = logger;

    this._admin = this._kafkaClient.admin();
    this._producer = this._kafkaClient.producer();
  }

  /**
   * Connect to all the services on which the client depends.
   */
  async connect() {
    await attachExitHandler(async () => {
      this._logger.info('Disconnecting from kafka.');
      await this._admin.disconnect();
      await this._producer.disconnect();
    });

    this._logger.info('Connecting to kafka.');
    await this._admin.connect();
    await this._producer.connect();
  }

  /**
   * Fetch recent tweets from the twitter API.
   * @param {Object} query - Query to send to the API. I.e, { query: 'Google', max_results: 100}.
   */
  async fetchRecentTweets(query) {
    this._logger.debug(
      `Querying recent tweets. Query: ${JSON.stringify(query)}`
    );
    const response = await this._twitterClient.v2.get(
      'tweets/search/recent',
      query
    );
    return response.data;
  }

  /**
   * Emit the specified event into kafka.
   * @param {String} eventName - Name of the event.
   * @param {Object} data - The data associated with the event.
   */
  async emitEvent(eventName, data) {
    this._logger.debug(
      `Creating event ${eventName} in kafka if it does not already exist.`
    );
    try {
      const created = await this._admin.createTopics({
        waitForLeaders: true,
        topics: [
          {
            topic: eventName,
            replicationFactor: 1,
          },
        ],
      });
      this._logger.debug(
        created ? `Topic creation successful.` : `Topic already present.`
      );
    } catch (e) {
      this._logger.warn(
        `An error occurred while creating the topic: ${e.message.toString()}`
      );
    }

    this._logger.info(
      `Emitting event ${eventName}. Data: ${JSON.stringify(data)}`
    );
    await this._producer.send({
      topic: eventName,
      messages: [{value: JSON.stringify(data)}],
    });
  }
}

export {Client};