weacast/weacast-core

View on GitHub
src/mixins/mixin.refresh.js

Summary

Maintainability
A
0 mins
Test Coverage
D
63%
import path from 'path'
import fs from 'fs-extra'
import _ from 'lodash'
import moment from 'moment'
import request from 'request'
import errors from '@feathersjs/errors'
import logger from 'winston'
import makeDebug from 'debug'
import { Grid } from '../common/grid'
const debug = makeDebug('weacast:weacast-core')

export default {

  // Retrieve the path where downloaded/persited data are
  getDataDirectory () {
    return path.join(this.app.get('forecastPath'), this.forecast.name, this.element.name)
  },

  // Generate file name to store temporary output (i.e. converted) data, assume by default a similar name than getForecastTimeFilePath() with a json extension
  getForecastTimeConvertedFilePath (runTime, forecastTime) {
    let filePath = this.getForecastTimeFilePath(runTime, forecastTime)
    return path.join(path.dirname(filePath), path.basename(filePath, path.extname(filePath)) + '.json')
  },

  downloadForecastTime (runTime, forecastTime) {
    let promise = new Promise((resolve, reject) => {
      const filePath = this.getForecastTimeFilePath(runTime, forecastTime)
      if (fs.existsSync(filePath)) {
        logger.verbose('Already downloaded ' + this.forecast.name + '/' + this.element.name + ' forecast at ' + forecastTime.format() + ' for run ' + runTime.format())
        resolve(filePath)
        return
      }
      // Get request options
      logger.verbose('Downloading ' + this.forecast.name + '/' + this.element.name + ' forecast at ' + forecastTime.format() + ' for run ' + runTime.format())
      let errorMessage = 'Could not download ' + this.forecast.name + '/' + this.element.name + ' forecast at ' + forecastTime.format() + ' for run ' + runTime.format()
      // Get request options
      request.get(this.getForecastTimeRequest(runTime, forecastTime))
      .on('error', err => {
        logger.error(errorMessage, err)
        reject(err)
      })
      .on('timeout', err => {
        logger.error(errorMessage + ', provider timed out')
        reject(err)
      })
      .on('response', response => {
        if (response.statusCode !== 200) {
          errorMessage += ', provider responded with HTTP code ' + response.statusCode
          reject(errors.convert({
            name: response.statusCode,
            message: errorMessage
          }))
        } else {
          let file = fs.createWriteStream(filePath)
          response.pipe(file)
          .on('finish', _ => {
            file.close()
            logger.verbose('Written ' + this.forecast.name + '/' + this.element.name + ' forecast at ' + forecastTime.format() + ' for run ' + runTime.format())
            resolve(filePath)
          })
          .on('error', err => {
            logger.error(errorMessage + ', unable to write temporary file', err)
            debug('Output TIFF file was : ' + filePath)
            reject(err)
          })
        }
      })
    })

    return promise
  },

  async processForecastTime (runTime, forecastTime) {
    await this.downloadForecastTime(runTime, forecastTime)
    let grid = await this.convertForecastTime(runTime, forecastTime)
    if (this.element.dataStore === 'gridfs') {
      await this.saveToGridFS(this.getForecastTimeConvertedFilePath(runTime, forecastTime),
        { forecastTime: forecastTime.toDate() })
    }
    // Check for processing function
    if (typeof this.element.transform === 'function') {
      for (let i = 0; i < grid.length; i++) {
        grid[i] = this.element.transform({
          runTime, forecastTime, value: grid[i], index: i, data: grid, forecast: this.forecast, element: this.element
        })
      }
    }
    // Compute min/max values
    let forecast = Object.assign({
      runTime: runTime,
      forecastTime: forecastTime
    }, Grid.getMinMax(grid))
    // Depending on data storage if we keep files include a link to files in addition to data directly in the object
    if (this.isExternalDataStorage()) {
      Object.assign(forecast, {
        filePath: this.getForecastTimeFilePath(runTime, forecastTime),
        convertedFilePath: this.getForecastTimeConvertedFilePath(runTime, forecastTime)
      })
    }
    return Object.assign(forecast, { data: grid })
  },

  async updateForecastTimeInDatabase (forecast, previousForecast) {
    // Test if we have to remove existing data first, except if keeping all run times
    if (previousForecast && !this.forecast.keepPastRuns && !this.element.keepPastRuns) {
      await this.remove(null, {
        query: {
          forecastTime: forecast.forecastTime,
          geometry: { $exists: false } // Raw data doesn't have a geometry
        }
      })
      // Remove persistent file associated with data if any
      if (this.element.dataStore === 'fs') {
        fs.remove(previousForecast.convertedFilePath)
      } else if (this.element.dataStore === 'gridfs') {
        this.removeFromGridFS(previousForecast.convertedFilePath)
      }
    }

    // Do not store in-memory data if delegated to external storage
    const data = forecast.data
    if (this.isExternalDataStorage()) delete forecast.data
    let result = await this.create(forecast)
    // Save tiles if tiling is enabled
    if (this.forecast.tileResolution) {
      let grid = new Grid({
        bounds: this.forecast.bounds,
        origin: this.forecast.origin,
        size: this.forecast.size,
        resolution: this.forecast.resolution,
        data
      })
      let tiles = grid.tileset(this.forecast.tileResolution)
      tiles = tiles.map(tile =>
        Object.assign(Grid.toGeometry(tile.bounds),
                      tile,
                      Grid.getMinMax(tile.data),
                      _.pick(forecast, ['runTime', 'forecastTime']),
                      { timeseries: false }) // Tag this is not an aggregated tile
      )
      // Test if we have to remove existing data first, except if keeping all run times
      if (previousForecast && !this.forecast.keepPastRuns && !this.element.keepPastRuns) {
        await this.remove(null, {
          query: {
            forecastTime: forecast.forecastTime,
            geometry: { $exists: true } // Only tiles have a geometry
          }
        })
      }
      await this.create(tiles)
      // Do not keep track of all in-memory data
      tiles.forEach(tile => delete tile.data)
    }
    return result
  },

  async aggregateTiles () {
    const collection = this.Model
    const grid = new Grid({
      bounds: this.forecast.bounds,
      origin: this.forecast.origin,
      size: this.forecast.size,
      resolution: this.forecast.resolution
    })
    const { tilesetSize } = grid.getTiling(this.forecast.tileResolution)
    logger.verbose('Aggregating tiles for ' + this.forecast.name + '/' + this.element.name + ' forecast')
    // Iterate over tiles
    for (let j = 0; j < tilesetSize[1]; j++) {
      for (let i = 0; i < tilesetSize[0]; i++) {
        // Delete previous aggregated tile if any
        await this.remove(null, { query: { x: i, y: j, timeseries: true } })
        // Aggregate data over time for current tile
        let tiles = await collection.aggregate([{
          // Select only single and available data for current tile
          $match: { x: i, y: j, timeseries: false, data: { $exists: true } }
        }, {
          $group: {
            _id: { x: '$x', y: '$y' }, // Group by tile so that we get a single merged result
            forecastTime: { $push: '$forecastTime' }, // Keep track of all forecast times
            runTime: { $push: '$runTime' }, // Keep track of all run times
            data: { $push: '$data' }, // Accumulate data
            minValue: { $push: '$minValue' }, // Accumulate min
            maxValue: { $push: '$maxValue' }, // Accumulate max
            geometry: { $last: '$geometry' } // geometry is similar for all results, keep last
          }
        }]).toArray()
        if (tiles.length !== 1) {
          logger.error('Could not aggregate tiles for ' + this.forecast.name + '/' + this.element.name + ' forecast')
        } else {
          const tile = tiles[0]
          // Delete temporary tiles with a single forecast time
          await this.remove(null, { query: { x: i, y: j } })
          // Then save aggregated tile extracting x/y from group by ID and tagging as timeseries aggregation
          Object.assign(tile, tile._id, { timeseries: true })
          delete tile._id
          await this.create(tile)
        }
      }
    }
  },

  async refreshForecastTime (datetime, runTime, forecastTime) {
    // Retrieve last available forecast if any
    let query = {
      $select: ['_id', 'runTime', 'forecastTime'], // We only need object ID
      forecastTime
    }
    if (this.forecast.keepPastRuns || this.element.keepPastRuns) {
      query.runTime = runTime
    }
    let result = await this.find({
      query,
      paginate: false
    })

    let previousForecast = (result.length > 0 ? result[0] : null)
    // Check if we are already up-to-date
    if (previousForecast && runTime.isSameOrBefore(previousForecast.runTime)) {
      logger.verbose('Up-to-date ' + this.forecast.name + '/' + this.element.name + ' forecast at ' + forecastTime.format() + ' for run ' + runTime.format() + ', not looking further')
      return previousForecast
    }
    // Otherwise download and process data
    let forecast = await this.processForecastTime(runTime, forecastTime)
    forecast = await this.updateForecastTimeInDatabase(forecast, previousForecast)
    logger.verbose((previousForecast ? 'Updated ' : 'Created ') + this.forecast.name + '/' + this.element.name + ' forecast at ' + forecastTime.format() + ' for run ' + runTime.format())
    // Remove temporary file associated with data except when using fs data store
    // FIXME: trying to remove temporary files as soon as possible raises "EBUSY: resource busy or locked" because there is probably some async operation still running
    // For now we remove temporary files as a whole by removing the data dir on each update process of the element
    /*
    if (this.element.dataStore !== 'fs') {
      const filePath = this.getForecastTimeFilePath(runTime, forecastTime)
      const convertedFilePath = this.getForecastTimeConvertedFilePath(runTime, forecastTime)
      if (fs.existsSync(filePath)) fs.remove(filePath)
      if (fs.existsSync(convertedFilePath)) fs.remove(convertedFilePath)
    }
    */
    return forecast
  },

  async harvestForecastTime (datetime, runTime, forecastTime) {
    try {
      let result = await this.refreshForecastTime(datetime, runTime, forecastTime)
      // Do not keep track of all in-memory data
      delete result.data
      return result
    } catch (error) {
      // 404 might be 'normal' errors because some data are not available at the planned run time from meteo providers
      // or some might vary the time steps available in the forecast depending on the run
      if (!error || !error.code || error.code !== 404) {
        logger.error('Could not update ' + this.forecast.name + '/' + this.element.name + ' forecast at ' + forecastTime.format() + ' for run ' + runTime.format())
        logger.error(error.message)
      } else {
        logger.verbose('Could not update ' + this.forecast.name + '/' + this.element.name + ' forecast at ' + forecastTime.format() + ' for run ' + runTime.format())
        logger.verbose(error.message)
      }
      let previousRunTime = runTime.clone().subtract({ seconds: this.forecast.runInterval })
      // When data for current time is not available we might try previous data
      // check here that we go back until the configured limit
      // because otherwise this means there is a real problem with the provider and/or we will have outdated data
      if (datetime.diff(previousRunTime, 'seconds') > this.forecast.oldestRunInterval) {
        logger.verbose('Hit oldest run time limit ' + runTime.format() + ' on ' + this.forecast.name + '/' + this.element.name + ', there is a too much big gap in data from the provider')
        throw error
      } else {
        logger.verbose('Harvesting further run time ' + previousRunTime.format() + ' on ' + this.forecast.name + '/' + this.element.name)
        await this.harvestForecastTime(datetime, previousRunTime, forecastTime)
      }
    }
  },

  async refreshForecastData (datetime) {
    // Retrieve forecast or overriden element update options
    const interval = this.element.interval || this.forecast.interval
    // These ones can be 0 take care the way the test is written
    const lowerLimit = (_.has(this.element, 'lowerLimit') ? this.element.lowerLimit : this.forecast.lowerLimit)
    const upperLimit = (_.has(this.element, 'upperLimit') ? this.element.upperLimit : this.forecast.upperLimit)
    // Compute nearest run T0
    let runTime = this.getNearestRunTime(datetime)
    // We don't care about the past, however a forecast is still potentially valid at least until we reach the next one
    let lowerTime = datetime.clone().subtract({ seconds: interval })
    let times = []
    // Check for each forecast step if update is required
    for (let timeOffset = lowerLimit; timeOffset <= upperLimit; timeOffset += interval) {
      let forecastTime = runTime.clone().add({ seconds: timeOffset })
      let discard = false
      if (!this.forecast.keepPastForecasts) {
        discard = forecastTime.isBefore(lowerTime)
      }
      if (!discard) {
        try {
          times.push(await this.harvestForecastTime(datetime, runTime, forecastTime))
        } catch (error) {
          // This catch does not rethrow the error so that the update process will not stop and we try the next time
        }
      }
    }
    // Aggregate tiles to generate timeseries if anabled and tiling as well
    if (this.forecast.tileResolution && this.forecast.timeseries) {
      this.aggregateTiles()
    }
    return times
  },

  async updateForecastData () {
    // Avoid stacking updates
    if (this.updateRunning) {
      logger.info('Skipping forecast data update on ' + this.forecast.name + '/' + this.element.name + ' as previous one is not yet finished')
      return
    }
    this.updateRunning = true
    const now = moment.utc()
    logger.info('Checking for up-to-date forecast data on ' + this.forecast.name + '/' + this.element.name)
    // Make sure we've got somewhere to put data and clean it up if we only use file as a temporary data store
    let dataDir = this.getDataDirectory()
    if (this.element.dataStore === 'fs') {
      fs.ensureDirSync(dataDir)
    } else {
      fs.emptyDirSync(dataDir)
    }
    // Try data refresh for current time
    try {
      let times = await this.refreshForecastData(now)
      logger.info('Completed forecast data update on ' + this.forecast.name + '/' + this.element.name)
      this.updateRunning = false
      return times
    } catch (error) {
      logger.error('Forecast data update on ' + this.forecast.name + '/' + this.element.name + ' failed')
      this.updateRunning = false
      throw error
    }
  },

  async cleanForecastData () {
    // Avoid stacking cleanups
    if (this.cleanupRunning) {
      logger.info('Skipping forecast data cleanup on ' + this.forecast.name + '/' + this.element.name + ' as previous one is not yet finished')
      return
    }
    this.cleanupRunning = true
    const now = moment.utc()
    logger.info('Cleaning up forecast data on ' + this.forecast.name + '/' + this.element.name)
    // Try data cleanup for current time
    try {
      await this.cleanGridFS(now)
      logger.info('Completed forecast data cleanup on ' + this.forecast.name + '/' + this.element.name)
      this.cleanupRunning = false
    } catch (error) {
      logger.error('Forecast data cleanup on ' + this.forecast.name + '/' + this.element.name + ' failed')
      this.cleanupRunning = false
      throw error
    }
  }
}