ThinkDeepTech/thinkdeep

View on GitHub
packages/deep-microservice-analysis/src/analysis-service.js

Summary

Maintainability
B
6 hrs
Test Coverage
import {attachExitHandler} from '@thinkdeep/attach-exit-handler';
import {EconomicEntityFactory, validEconomicEntities} from '@thinkdeep/model';
import {validDate, validString} from '@thinkdeep/util';
import {hasReadAllAccess} from './permissions.js';

/**
 * Check whether a value is acceptable as an end date.
 *
 * `null` is accepted as-is; any other value is checked with `validDate`.
 *
 * @param {any} val Candidate end date.
 * @return {Boolean} True if valid. False otherwise.
 */
const validEndDate = (val) => {
  if (val === null) {
    return true;
  }

  return validDate(val);
};

/**
 * Service that applies business logic to data analysis operations for the application.
 *
 * It consumes TWEETS_COLLECTED kafka events, computes a sentiment for each received tweet,
 * stores the result through the injected neo4j data store and then emits a SENTIMENT_COMPUTED
 * kafka event. It also exposes sentiment reads guarded by a permission check.
 */
class AnalysisService {
  /**
   * Business layer for analysis operations.
   *
   * NOTE: The parameters below are injected because that improves testability of the codebase.
   *
   * @param {Object} neo4jDataStore - Neo4j data store to use.
   * @param {Object} sentimentLib - Library to use for sentiment analysis. This is an instance of Sentiment from 'sentiment' package.
   * @param {Object} kafkaClient - KafkaJS client to use.
   * @param {Object} logger - Logger to use.
   */
  constructor(neo4jDataStore, sentimentLib, kafkaClient, logger) {
    this._neo4jDataStore = neo4jDataStore;
    this._sentimentLib = sentimentLib;
    this._kafkaClient = kafkaClient;

    // The admin, consumer and producer are created eagerly; they are only connected in connect().
    this._admin = this._kafkaClient.admin();
    this._consumer = this._kafkaClient.consumer({
      groupId: 'deep-microservice-analysis-consumer',
    });
    this._producer = this._kafkaClient.producer();

    this._logger = logger;
  }

  /**
   * Connect the analysis service to underlying support systems.
   *
   * Registers an exit handler that disconnects the kafka clients, connects them, creates the
   * topics this service uses, subscribes to TWEETS_COLLECTED and starts the consumer. Every
   * consumed message is parsed as JSON and passed on to sentiment computation.
   */
  async connect() {
    this._logger.info(`Connecting to analysis service support systems.`);

    await attachExitHandler(async () => {
      this._logger.info('Cleaning up kafka connections.');
      await this._admin.disconnect();
      await this._consumer.disconnect();
      await this._producer.disconnect();
    });

    this._logger.info('Connecting to Kafka.');
    await this._admin.connect();
    await this._consumer.connect();
    await this._producer.connect();

    await this._topicCreation([
      {topic: 'TWEETS_COLLECTED', replicationFactor: 1},
      {topic: 'SENTIMENT_COMPUTED', replicationFactor: 1},
    ]);

    this._logger.debug(`Subscribing to TWEETS_COLLECTED topic`);
    await this._consumer.subscribe({
      topic: 'TWEETS_COLLECTED',
      fromBeginning: true,
    });

    this._logger.debug(`Running consumer handlers on each message.`);
    await this._consumer.run({
      eachMessage: async ({message}) => {
        this._logger.debug(
          `Received kafka message: ${message.value.toString()}`
        );

        const eventData = JSON.parse(message.value.toString());

        const economicEntity = EconomicEntityFactory.get(
          eventData.economicEntity
        );
        const timeSeriesItems = eventData.timeSeriesItems;

        await this._computeSentiment(economicEntity, timeSeriesItems);
      },
    });
  }

  /**
   * Get the sentiments associated with the specified economic entity and type.
   *
   * Arguments are validated before the permission check, so invalid input throws even for
   * callers without read access.
   *
   * @param {Array<Object>} economicEntities Array of objects of the form { name: <some business name>, type: <some entity type> }.
   * @param {String} startDate UTC date time.
   * @param {String} endDate UTC date time. `null` is also accepted.
   * @param {Object} permissions Permissions for the user making the request.
   * @return {Array} One data store result per economic entity (in input order), or [] when the permissions lack read-all access.
   * @throws {Error} If the economic entities, the start date or the end date are invalid.
   */
  async sentiments(economicEntities, startDate, endDate, permissions) {
    if (!validEconomicEntities(economicEntities)) {
      throw new Error(`An invalid economic entity was received.`);
    }

    if (!validDate(startDate)) {
      throw new Error(`The start date ${startDate} is invalid.`);
    }

    if (!validEndDate(endDate)) {
      throw new Error(`The end date ${endDate} is invalid.`);
    }

    if (!hasReadAllAccess(permissions)) return [];

    const results = [];
    for (const economicEntity of economicEntities) {
      this._logger.info(
        `Querying sentiments for ${economicEntity.type} ${economicEntity.name}.`
      );

      results.push(
        await this._sentimentData(economicEntity, startDate, endDate)
      );
    }

    return results;
  }

  /**
   * Get the sentiment.
   * @param {Object} economicEntity Object of the form { name: <entity name>, type: <entity type> }.
   * @param {String} startDate UTC date time.
   * @param {String | null} endDate UTC date time.
   * @return {Object} Sentiment data as returned by the data store's readSentiments.
   * @throws {Error} If the economic entity, the start date or the end date are invalid.
   */
  async _sentimentData(economicEntity, startDate, endDate) {
    if (!validEconomicEntities([economicEntity])) {
      throw new Error(`An invalid economic entity was received.`);
    }

    if (!validDate(startDate)) {
      throw new Error(`The start date ${startDate} is invalid.`);
    }

    if (!validEndDate(endDate)) {
      throw new Error(`The end date ${endDate} is invalid.`);
    }

    return this._neo4jDataStore.readSentiments(
      economicEntity,
      startDate,
      endDate
    );
  }

  /**
   * Read the most recent sentiments.
   * @param {Array<Object>} economicEntities
   * @return {Array} One readMostRecentSentiment result per economic entity, in input order.
   * @throws {Error} If the economic entities are invalid.
   */
  async _mostRecentSentiments(economicEntities) {
    if (!validEconomicEntities(economicEntities)) {
      throw new Error(`An invalid economic entity was received.`);
    }

    const results = [];
    for (const economicEntity of economicEntities) {
      this._logger.debug(
        `Reading most recent sentiment for ${economicEntity.type} ${economicEntity.name}.`
      );

      results.push(
        await this._neo4jDataStore.readMostRecentSentiment(economicEntity)
      );
    }

    return results;
  }

  /**
   * Compute sentiment.
   *
   * NOTE: This sends a kafka event after sentiment computation.
   *
   * Invalid sentiment data is logged and skipped without throwing; an invalid economic entity
   * throws.
   *
   * @param {Object} economicEntity Economic entity object.
   * @param {Array} datas Consists of objects of the form [{ utcDateTime: <Number>, tweets: [{ text: 'tweet text' }]}]
   * @throws {Error} If the economic entity is invalid.
   */
  async _computeSentiment(economicEntity, datas) {
    if (!validEconomicEntities([economicEntity])) {
      throw new Error(`An invalid economic entity was received.`);
    }

    if (!this._validSentimentDatas(datas)) {
      this._logger.debug(
        `Invalid sentiment data received: ${JSON.stringify(datas)}`
      );
      return;
    }

    // Validation above guarantees every entry has a non-empty tweets array and that every
    // tweet has a truthy text, so no fallbacks are needed inside the loops.
    for (const data of datas) {
      for (const tweet of data.tweets) {
        const text = tweet.text;

        const sentiment = this._sentiment(text);

        this._logger.info(
          `
          Adding sentiment to graph for ${economicEntity.type} ${economicEntity.name}
          tweet ${text}
          comparative sentament ${sentiment.comparative}
          received ${data.utcDateTime}
          `
        );

        await this._neo4jDataStore.addSentiments(economicEntity, [
          {
            utcDateTime: data.utcDateTime,
            tweet: text,
            sentiment,
          },
        ]);
      }
    }

    // One result per requested entity; the first element of that result is used as event data.
    const mostRecentSentiment = (
      await this._mostRecentSentiments([economicEntity])
    )[0][0];

    const event = {
      economicEntity,
      data: mostRecentSentiment,
    };

    const eventName = 'SENTIMENT_COMPUTED';

    await this._emit(eventName, event);
  }

  /**
   * Validate sentiment datas.
   *
   * The data is valid when it is a non-empty array in which every entry has a valid
   * utcDateTime and a non-empty tweets array whose tweets all carry a truthy text. Missing
   * (null / undefined) entries or tweets make the data invalid instead of raising a TypeError.
   *
   * @param {Array} datas Consists of objects of the form [{ utcDateTime: <Number>, tweets: [{ text: 'tweet text' }]}]
   * @return {Boolean} True if valid. False otherwise.
   */
  _validSentimentDatas(datas) {
    if (!Array.isArray(datas) || datas.length <= 0) {
      return false;
    }

    for (const data of datas) {
      // The entries originate from parsed kafka payloads, so an entry may be null.
      if (!data || !validDate(data.utcDateTime)) {
        return false;
      }

      if (!Array.isArray(data.tweets) || data.tweets.length <= 0) {
        return false;
      }

      for (const tweet of data.tweets) {
        if (!tweet || !tweet.text) {
          return false;
        }
      }
    }

    return true;
  }

  /**
   * Compute the sentiment.
   *
   * The text is lower-cased before it is handed to the sentiment library.
   *
   * @param {String} text Subject of the computation.
   * @return {Object} Sentiment object of the form { score: <number>, comparative: <number>, ...}.
   * @throws {Error} If the text is not a valid string.
   */
  _sentiment(text) {
    if (!validString(text)) {
      throw new Error(`The value ${text} is invalid.`);
    }

    return this._sentimentLib.analyze(text.toLowerCase());
  }

  /**
   * Create the specified topics.
   *
   * Creation is best effort: a failure reported by the admin client is logged as a warning and
   * not rethrown.
   *
   * @param {Array<Object>} topics - Array consisting of topic objects from kafkajs.
   * @throws {Error} If topics is not a non-empty array.
   */
  async _topicCreation(topics) {
    if (!Array.isArray(topics) || topics.length <= 0) {
      throw new Error(`Valid topics are required.`);
    }

    try {
      this._logger.debug(`Creating topics ${JSON.stringify(topics)}`);
      await this._admin.createTopics({
        /**
         * NOTE: If you don't wait for leaders the system throws an error when trying to write to the topic if a leader
         * hasn't been selected.
         */
        waitForLeaders: true,
        topics,
      });
    } catch (error) {
      /** An error is thrown when the topic has already been created */
      // Not every thrown value carries a message; fall back to its string form so that the
      // handler itself can never throw.
      const hasMessage =
        error !== null &&
        error !== undefined &&
        error.message !== null &&
        error.message !== undefined;
      const message = hasMessage ? error.message.toString() : String(error);

      this._logger.warn(
        `Creation of topics ${JSON.stringify(
          topics
        )} exited with error: ${JSON.stringify(error)}, message: ${message}`
      );
    }
  }

  /**
   * Emit a particular kafka event.
   *
   * The data is serialized with JSON.stringify and sent as a single message to the topic
   * named after the event.
   *
   * @param {String} eventName - Name of the event.
   * @param {Object} data - Event data to transmit.
   * @throws {Error} If the event name is not a valid string.
   */
  async _emit(eventName, data) {
    if (!validString(eventName)) {
      throw new Error(`The event name ${eventName} is invalid.`);
    }

    this._logger.info(
      `Emitting event ${eventName} with data ${JSON.stringify(data)}`
    );
    await this._producer.send({
      topic: eventName,
      messages: [{value: JSON.stringify(data)}],
    });
  }
}

export {AnalysisService};