GladysProject/Gladys

View on GitHub
server/lib/device/device.calculateAggregate.js

Summary

Maintainability
A
3 hrs
Test Coverage
const Promise = require('bluebird');
const path = require('path');
const { spawn } = require('child_process');
const logger = require('../../utils/logger');

const {
  DEVICE_FEATURE_STATE_AGGREGATE_TYPES,
  SYSTEM_VARIABLE_NAMES,
  DEFAULT_AGGREGATES_POLICY_IN_DAYS,
} = require('../../utils/constants');

const LAST_AGGREGATE_ATTRIBUTES = {
  [DEVICE_FEATURE_STATE_AGGREGATE_TYPES.HOURLY]: 'last_hourly_aggregate',
  [DEVICE_FEATURE_STATE_AGGREGATE_TYPES.DAILY]: 'last_daily_aggregate',
  [DEVICE_FEATURE_STATE_AGGREGATE_TYPES.MONTHLY]: 'last_monthly_aggregate',
};

const AGGREGATES_POLICY_RETENTION_VARIABLES = {
  [DEVICE_FEATURE_STATE_AGGREGATE_TYPES.HOURLY]: SYSTEM_VARIABLE_NAMES.DEVICE_STATE_HOURLY_AGGREGATES_RETENTION_IN_DAYS,
  [DEVICE_FEATURE_STATE_AGGREGATE_TYPES.DAILY]: SYSTEM_VARIABLE_NAMES.DEVICE_STATE_DAILY_AGGREGATES_RETENTION_IN_DAYS,
  [DEVICE_FEATURE_STATE_AGGREGATE_TYPES.MONTHLY]:
    SYSTEM_VARIABLE_NAMES.DEVICE_STATE_MONTHLY_AGGREGATES_RETENTION_IN_DAYS,
};

const AGGREGATE_STATES_PER_INTERVAL = 100;

const SCRIPT_PATH = path.join(__dirname, 'device.calculcateAggregateChildProcess.js');

/**
 * @description Calculate Aggregates.
 * @param {string} [type] - Type of the aggregate.
 * @param {string} [jobId] - Id of the job in db.
 * @returns {Promise} - Resolve when finished.
 * @example
 * await calculateAggregate('monthly');
 */
async function calculateAggregate(type, jobId) {
  logger.info(`Calculating aggregates device feature state for interval ${type}`);
  // First we get the retention policy of this aggregates type
  let retentionPolicyInDays = await this.variable.getValue(AGGREGATES_POLICY_RETENTION_VARIABLES[type]);

  // if the setting exist, we parse it
  // otherwise, we take the default value
  if (retentionPolicyInDays) {
    retentionPolicyInDays = parseInt(retentionPolicyInDays, 10);
  } else {
    retentionPolicyInDays = DEFAULT_AGGREGATES_POLICY_IN_DAYS[type];
  }

  logger.debug(`Aggregates device feature state policy = ${retentionPolicyInDays} days`);

  const now = new Date();
  // the aggregate should be from this date at most
  const minStartFrom = new Date(new Date().setDate(now.getDate() - retentionPolicyInDays));

  let endAt;

  if (type === DEVICE_FEATURE_STATE_AGGREGATE_TYPES.MONTHLY) {
    endAt = new Date(now.getFullYear(), now.getMonth(), 1);
  } else if (type === DEVICE_FEATURE_STATE_AGGREGATE_TYPES.DAILY) {
    endAt = new Date(now.getFullYear(), now.getMonth(), now.getDate(), 0);
  } else if (type === DEVICE_FEATURE_STATE_AGGREGATE_TYPES.HOURLY) {
    endAt = new Date(now.getFullYear(), now.getMonth(), now.getDate(), now.getHours(), 0);
  }

  const params = {
    AGGREGATE_STATES_PER_INTERVAL,
    DEVICE_FEATURE_STATE_AGGREGATE_TYPES,
    LAST_AGGREGATE_ATTRIBUTES,
    type,
    minStartFrom,
    endAt,
    jobId,
  };

  const promise = new Promise((resolve, reject) => {
    let err = '';
    const childProcess = spawn('node', [SCRIPT_PATH, JSON.stringify(params)]);

    childProcess.stdout.on('data', async (data) => {
      const text = data.toString();
      logger.debug(`device.calculateAggregate stdout: ${data}`);
      if (text && text.indexOf('updateProgress:') !== -1) {
        const position = text.indexOf('updateProgress:');
        const before = text.substr(position + 15);
        const splitted = before.split(':');
        const progress = parseInt(splitted[0], 10);
        if (!Number.isNaN(progress)) {
          await this.job.updateProgress(jobId, progress);
        }
      }
    });

    childProcess.stderr.on('data', (data) => {
      logger.warn(`device.calculateAggregate stderr: ${data}`);
      err += data;
    });

    childProcess.on('close', (code) => {
      if (code !== 0) {
        logger.warn(`device.calculateAggregate: Exiting child process with code ${code}`);
        const error = new Error(err);
        reject(error);
      } else {
        logger.info(`device.calculateAggregate: Finishing processing for interval ${type} `);
        resolve();
      }
    });
  });

  await promise;

  return null;
}

module.exports = {
  calculateAggregate,
};