weacast/weacast

View on GitHub
packages/probe/src/services/probes/probes.service.js

Summary

Maintainability
C
1 day
Test Coverage
B
88%
import makeDebug from 'debug'
import _ from 'lodash'
import dot from 'dot-object'
import moment from 'moment'
import errors from '@feathersjs/errors'
import { Grid } from '@weacast/core'

const debug = makeDebug('weacast:weacast-probe:service')
const debugResults = makeDebug('weacast:weacast-probe:results')
const uComponentPrefix = 'u-'
const vComponentPrefix = 'v-'

function isDirectionElement (elementName) {
  const isUComponentOfDirection = elementName.startsWith(uComponentPrefix) // e.g. 'u-wind'
  const isVComponentOfDirection = elementName.startsWith(vComponentPrefix) // e.g. 'v-wind'
  return (isUComponentOfDirection || isVComponentOfDirection)
}

function getElementPrefix (elementName) {
  const isUComponentOfDirection = elementName.startsWith(uComponentPrefix) // e.g. 'u-wind'
  const isVComponentOfDirection = elementName.startsWith(vComponentPrefix) // e.g. 'v-wind'
  return isUComponentOfDirection ? uComponentPrefix : (isVComponentOfDirection ? vComponentPrefix : '') // e.g. 'u-' for u-wind'
}

function getElementSuffix (elementName) {
  // e.g. will generate 'wind-1000' for 'u-wind-1000'/'v-wind-1000'
  const suffix = elementName.replace(getElementPrefix(elementName), '')
  const index = suffix.indexOf('-')
  // e.g. will generate '-1000' for 'wind-1000'
  return (index >= 0 ? suffix.substring(index) : '')
}

function getDirectionElement (elementName) {
  // e.g. will generate 'wind' for 'u-wind'/'v-wind' or 'u-wind-1000'/'v-wind-1000'
  return elementName.replace(getElementPrefix(elementName), '').replace(getElementSuffix(elementName), '')
}

export default {

  // Update the given probe results (given as features)
  async updateFeaturesInDatabase (features, probe, elementService, forecast) {
    const { runTime, forecastTime } = forecast
    // Get the service to store results in
    const resultService = this.app.getService('probe-results')
    const operations = []
    const elementName = elementService.element.name
    const propertyName = 'properties.' + elementName
    const isComponentOfDirection = isDirectionElement(elementName)
    const directionElement = getDirectionElement(elementName) // e.g. will generate 'wind' for 'u-wind'/'v-wind'
    const suffix = getElementSuffix(elementName) // e.g. will generate '-1000' for 'u-wind-1000'/'v-wind-1000'
    // e.g. will generate '-1000' for 'u-wind-1000'/'v-wind-1000'
    const speedPropertyName = 'properties.' + directionElement + 'Speed' + suffix
    const directionPropertyName = 'properties.' + directionElement + 'Direction' + suffix
    const bearingPropertyName = 'properties.' + directionElement + 'BearingDirection' + suffix
    // We don't use service level operations like update to avoid concurrency issues,
    // eg see https://github.com/weacast/weacast-probe/issues/2.
    // We also want good performances so we use a bulkWrite
    for (let i = 0; i < features.length; i++) {
      const feature = features[i]
      // Check if something to store for the element
      if (_.has(feature, propertyName)) {
        const data = {
          [propertyName]: _.get(feature, propertyName)
        }
        // Update derived direction values as well in this case
        if (isComponentOfDirection) {
          if (_.has(feature, speedPropertyName)) data[speedPropertyName] = _.get(feature, speedPropertyName)
          if (_.has(feature, directionPropertyName)) data[directionPropertyName] = _.get(feature, directionPropertyName)
          if (_.has(feature, bearingPropertyName)) data[bearingPropertyName] = _.get(feature, bearingPropertyName)
        }

        // Already stored in DB ?
        if (feature._id) {
          debugResults('Updating probe result for probe ' + feature.probeId + ' at ' + forecastTime.format() +
                       ' on run ' + runTime.format(), feature)
          // Because we will not go through service hooks in this case we have to format dates to basic object types manually
          data.runTime = new Date(runTime.format())
          // Call service hooks (DB update is skipped when bulk param is used)
          // This can actually cause weird bugs like https://github.com/weacast/weacast-probe/issues/53
          // because it updates data in place to eg convert times to moment objects,
          // probably better to call hooks manually whenever required
          // await resultService.update(feature._id, data, { bulk: true })
          // Create bulk operation for update
          operations.push({
            updateOne: {
              filter: { _id: feature._id }, // In this case we query by ID for update
              update: { $set: data } // and indicate we'd like to patch some fields
            }
          })
        } else {
          debugResults('Inserting probe result for probe ' + probe._id + ' at ' + forecastTime.format() +
                       ' on run ' + runTime.format(), feature)
          // The base feature to insert is the one without the computed elements
          let baseFeature = _.omit(feature, Object.keys(data))
          // Because we will not go through service hooks in this case we have to format dates to basic object types manually
          Object.assign(baseFeature, {
            runTime: new Date(runTime.format()),
            forecastTime: new Date(forecastTime.format())
          })
          // MongoDB requires the dot notation here so we perform conversion
          // Take care that ObjectIDs are not basic types so we remove it before
          delete baseFeature.probeId
          // The same for coordinates array
          _.unset(baseFeature, 'geometry.coordinates')
          baseFeature = dot.dot(baseFeature)
          // And add it back after conversion
          baseFeature.probeId = probe._id
          baseFeature['geometry.coordinates'] = _.get(feature, 'geometry.coordinates')
          // Create bulk operation for insert or update
          const filter = { // In this case we query by forecastTime/runTime/probeId
            runTime: new Date(runTime.format()),
            forecastTime: new Date(forecastTime.format()),
            probeId: probe._id
          }
          // And by feature unique ID, unify as array because unique Id can be obtained by combining multiple properties
          let ids = probe.featureId
          if (!Array.isArray(ids)) {
            ids = [ids]
          }
          ids.forEach(id => {
            filter[id] = _.get(feature, id)
          })
          // Call service hooks (DB update is skipped when bulk param is used)
          // This can actually cause weird bugs like https://github.com/weacast/weacast-probe/issues/53
          // because it updates data in place to eg convert times to moment objects,
          // probably better to call hooks manually whenever required
          // await resultService.create(data, { bulk: true })
          operations.push({
            updateOne: {
              filter,
              upsert: true, // and indicate we'd like to create it if it does not already exist
              update: {
                $set: data, // and indicate we'd like to patch some fields if the probe already exists
                $setOnInsert: baseFeature // and indicate we will use the whole feature data on creation
              }
            }
          })
        }
      }
    }
    // Run DB updates
    try {
      const response = await resultService.Model.bulkWrite(operations)

      this.app.logger.verbose(`Produced ${response.upsertedCount + response.modifiedCount} results (${response.upsertedCount} creates - ${response.modifiedCount} updates)
        for probe ${probe._id} on element ${elementService.forecast.name + '/' + elementService.element.name}
        at ${forecastTime.format()} for run ${runTime.format()}`)

      if (response.hasWriteErrors()) response.getWriteErrors().forEach(error => console.log(error))
      return features
    } catch (error) {
      console.log(error)
      return []
    }
  },

  pushTime (feature, timeName, elementName, time, value) {
    // For each element we store the list of available times
    const times = _.get(feature, timeName + '.' + elementName)
    // Initialize
    if (!times) _.set(feature, timeName + '.' + elementName, [time.format()])
    else times.push(time.format())
    // And we store associated value using a map to avoid any operation order-dependence
    if (!_.isNil(value)) _.set(feature.properties, elementName + '.' + time.format(), value)
  },

  getValueAtTime (feature, elementName, time) {
    return _.get(feature.properties, elementName + '.' + (moment.isMoment(time) ? time.format() : time))
  },

  // Update the given features, for given probe, with interpolated values according to given forecast grid, run/forecast time
  async updateFeatures (features, probe, elementService, forecast) {
    const { runTime, forecastTime, grid } = forecast

    // Check if we have to manage a direction composed from two axis components
    const elementName = elementService.element.name
    const isComponentOfDirection = isDirectionElement(elementName)
    const directionElement = getDirectionElement(elementName) // e.g. will generate 'wind' for 'u-wind'/'v-wind'
    const suffix = getElementSuffix(elementName) // e.g. will generate '-1000' for 'u-wind-1000'/'v-wind-1000'
    // e.g. will generate '-1000' for 'u-wind-1000'/'v-wind-1000'
    const speedProperty = directionElement + 'Speed' + suffix
    const directionProperty = directionElement + 'Direction' + suffix
    // Check if a bearing property is given to compute direction relatively to
    const bearingProperty = directionElement + 'BearingProperty'
    const bearingPropertyName = _.has(probe, bearingProperty) ? _.get(probe, bearingProperty) : undefined
    const bearingDirectionProperty = directionElement + 'BearingDirection' + suffix

    features.forEach(feature => {
      // Check if we process on-demand probing for a time range
      const isTimeRange = (feature.forecastTime && !moment.isMoment(feature.forecastTime))
      if (probe._id) feature.probeId = probe._id
      const value = grid.interpolate(feature.geometry.coordinates[0], feature.geometry.coordinates[1])
      if (!_.isNil(value) && isFinite(value)) { // Prevent values outside grid bbox
        // Store interpolated element value
        if (isTimeRange) {
          this.pushTime(feature, 'forecastTime', elementName, forecastTime, value)
          this.pushTime(feature, 'runTime', elementName, runTime)
        } else {
          feature.forecastTime = forecastTime
          feature.runTime = runTime
          feature.properties[elementName] = value
        }
        // Update derived direction values as well in this case
        if (isComponentOfDirection) {
          let u, v
          if (isTimeRange) {
            u = this.getValueAtTime(feature, uComponentPrefix + directionElement + suffix, forecastTime)
            v = this.getValueAtTime(feature, vComponentPrefix + directionElement + suffix, forecastTime)
          } else {
            u = feature.properties[uComponentPrefix + directionElement + suffix]
            v = feature.properties[vComponentPrefix + directionElement + suffix]
          }
          // Only possible if both elements are already computed
          if (!_.isNil(u) && !_.isNil(v) && isFinite(u) && isFinite(v)) {
            // Compute direction expressed in meteorological convention, i.e. angle from which the flow comes
            const norm = Math.sqrt(u * u + v * v)
            let direction = 180.0 + Math.atan2(u, v) * 180.0 / Math.PI
            // Then store it
            // Check if we process on-demand probing for a time range
            if (isTimeRange) {
              this.pushTime(feature, 'forecastTime', speedProperty, forecastTime, norm)
              this.pushTime(feature, 'runTime', speedProperty, runTime)
              this.pushTime(feature, 'forecastTime', directionProperty, forecastTime, direction)
              this.pushTime(feature, 'runTime', directionProperty, runTime)
            } else {
              // Forecast/run time already set on feature for previous element values
              feature.properties[speedProperty] = norm
              feature.properties[directionProperty] = direction
            }
            // Compute bearing relatively to a bearing property if given
            if (bearingPropertyName) {
              const bearing = _.toNumber(feature.properties[bearingPropertyName])
              if (isFinite(bearing)) {
                // Take care that bearing uses the geographical convention, i.e. angle toward which the element goes,
                // we need to convert from meteorological convention, i.e. angle from which the flow comes
                direction += 180
                if (direction >= 360) direction -= 360
                direction -= bearing
                if (direction < 0) direction += 360
                // Then store it
                if (isTimeRange) {
                  this.pushTime(feature, 'forecastTime', bearingDirectionProperty, forecastTime, direction)
                  this.pushTime(feature, 'runTime', bearingDirectionProperty, runTime)
                } else {
                  // Forecast/run time already set on feature for previous element values
                  feature.properties[bearingDirectionProperty] = direction
                }
              }
            }
          }
        }
      }
    })
  },

  // Update the given features, for given probe, with interpolated values according to given forecast data, run/forecast time
  // If forecast data are not given they are retrieved from the existing data in DB
  async probeForecastTime (features, probe, elementService, forecast) {
    const { _id, x, y, runTime, forecastTime, data } = forecast
    // Retrieve forecast data if required
    let forecastData = data
    if (!forecastData) {
      const query = {
        $select: ['data']
      }
      debug('No forecast data provided for probe ' + (probe._id ? probe._id : 'on-demand') + ' on element ' + elementService.forecast.name + '/' + elementService.element.name +
            ' at ' + forecastTime.format() + ' on run ' + runTime.format() + ', querying existing one', query)
      // If we have an ID we will use it, otherwise request by forecast time
      if (_id) {
        const response = await elementService.get(_id.toString(), { query })
        forecastData = response.data
      } else {
        query.forecastTime = forecastTime
        if (elementService.forecast.keepPastRuns || elementService.element.keepPastRuns) {
          query.runTime = forecast.runTime
        }
        // Take care that we need tile ID for tiles
        if (!_.isNil(x) && !_.isNil(y)) {
          Object.assign(query, {
            x,
            y,
            timeseries: false // Probe single time not timeseries
          })
        }
        const response = await elementService.find({ query })
        forecastData = (response.data.length > 0 ? response.data[0].data : null)
      }
    }

    if (!forecastData) {
      throw new Error('Cannot retrieve forecast data for probe ' + (probe._id ? probe._id : 'on-demand') + ' on element ' + elementService.forecast.name + '/' + elementService.element.name +
                ' at ' + forecastTime.format() + ' for run ' + runTime.format())
    }
    // Check if we process a tile or raw data
    let grid
    if (!_.isNil(x) && !_.isNil(y)) {
      grid = new Grid({
        bounds: forecast.bounds,
        origin: forecast.origin,
        size: forecast.size,
        resolution: forecast.resolution,
        data: forecastData
      })
    } else {
      grid = new Grid({
        bounds: elementService.forecast.bounds,
        origin: elementService.forecast.origin,
        size: elementService.forecast.size,
        resolution: elementService.forecast.resolution,
        data: forecastData
      })
    }
    this.updateFeatures(features, probe, elementService, { runTime, forecastTime, grid })
  },

  async getResultsForProbe (probe, elementService, forecast) {
    // Get the service to read results in
    const resultService = this.app.getService('probe-results')
    const query = {
      forecastTime: forecast.forecastTime,
      probeId: probe._id
    }
    if (elementService.forecast.keepPastRuns || elementService.element.keepPastRuns) {
      query.runTime = forecast.runTime
    }
    const results = await resultService.find({
      paginate: false,
      query
    })
    return results
  },

  // Retrieve all element services required to update the given probe
  getElementServicesForProbe (probe) {
    // Retrieve target elements for all models or specified one
    let services = this.app.getElementServices(probe.forecast)
    services = services.filter(service => {
      return probe.elements.reduce((contains, element) => contains || (service.name === probe.forecast + '/' + element), false)
    })
    return services
  },

  getRefreshCallbackName (probe) {
    return 'refresh_probe_' + probe._id.toString()
  },

  // Register to updates on all element services required to update the given probe
  registerForecastUpdates (probe) {
    const refreshCallbackName = this.getRefreshCallbackName(probe)
    // Retrieve target elements
    const services = this.getElementServicesForProbe(probe)
    services.forEach(service => {
      const app = service.app
      const forecastName = service.forecast.name
      const elementName = service.element.name
      // Callback to be called (if not already registered)
      if (!_.has(service, refreshCallbackName)) {
        debug('No existing refresh callback for probe ' + probe._id.toString() + ' on element ' + forecastName + '/' + elementName + ', registering')
        // Internal callback
        const refreshCallback = async forecast => {
          // Do not process tiles but only raw data
          if (forecast.geometry) return
          // Find probe results associated to this forecast data set
          debug('Looking for existing results with probe ' + probe._id + ' for element ' + forecastName + '/' + elementName +
                ' at ' + forecast.forecastTime.format() + ' on run ' + forecast.runTime.format())
          try {
            let features = await this.getResultsForProbe(probe, service, forecast)
            // Possible on first probing
            if (features.length === 0) {
              const result = await this.get(probe._id, { query: { $select: ['forecast', 'elements', 'features'] } })
              features = result.features
            }
            this.app.logger.verbose('Probing forecast data for element ' + forecastName + '/' + elementName + ' at ' + forecast.forecastTime.format() + ' on run ' + forecast.runTime.format())
            await this.probeForecastTime(features, probe, service, forecast)
            await this.updateFeaturesInDatabase(features, probe, service, forecast)
            // Send a message so that clients know there are new results, indeed for performance reasons standard events have been disabled on results
            // Take care to not forward forecast data
            delete forecast.data
            this.emit('results', { probe, forecast })
          } catch (error) {
            this.app.logger.error(error.message)
          }
        }
        // External callback
        const syncRefreshCallback = async (forecast) => {
          // Need to convert from string to in-memory date objects
          forecast.runTime = moment.utc(forecast.runTime)
          forecast.forecastTime = moment.utc(forecast.forecastTime)
          // In this case we don't already have data in memory so it will be fetched
          await service[refreshCallbackName](forecast)
        }

        // Register for forecast data updates when using internal event systems
        service[refreshCallbackName] = refreshCallback
        service.on('created', service[refreshCallbackName])
        // Or external loaders if any
        if (app.sync) {
          app.sync.on(forecastName + '-' + elementName, syncRefreshCallback)
        }
      }
    })
  },

  // Unregister from updates on all element services required to update the given probe
  unregisterForecastUpdates (probe) {
    const refreshCallbackName = this.getRefreshCallbackName(probe)
    // Retrieve target elements
    const services = this.getElementServicesForProbe(probe)
    services.forEach(service => {
      // Unregister for forecast data update
      debug('Removing existing refresh callback for probe ' + probe._id.toString() + ' on element ' + service.forecast.name + '/' + service.element.name + ', registering')
      service.removeListener('created', service[refreshCallbackName])
    })
  },

  // Perform probing on the input features if any (on-demand probe), in this case the probing time(s) must be given
  // Otherwise features are retrieved from the existing probe in DB, probing performed for each available forecast time,
  // and result features updated back in DB (probing stream)
  // This also registers the probe to perform updates on results when new forecast data are coming
  async probe (probe, query = {}) {
    const forecastTime = query.forecastTime
    const isTimeRange = (forecastTime && (forecastTime.$lt || forecastTime.$lte || forecastTime.$gt || forecastTime.$gte))
    const aggregate = _.get(query, 'aggregate', true)
    const geometry = _.get(query, 'geometry.$geoIntersects.$geometry')
    // If querying at a specific location automatically generate the GeoJSON feature
    if (geometry) {
      Object.assign(probe, {
        type: 'FeatureCollection',
        features: [{
          type: 'Feature',
          properties: {},
          geometry
        }]
      })
    }
    if (!probe.type || probe.type !== 'FeatureCollection') {
      throw new errors.BadRequest('Only GeoJSON FeatureCollection layers are supported to create probes')
    }
    if (!probe.forecast) {
      throw new errors.BadRequest('Target forecast model not specified')
    }
    if (!probe.elements || probe.elements.length === 0) {
      throw new errors.BadRequest('Target forecast element(s) not specified')
    }
    // Probe streaming request a unique feature ID
    if (!forecastTime && !probe.featureId) {
      throw new errors.BadRequest('Unique identifier for probe features not specified')
    }

    // Register for forecast data updates on probing streams
    if (!forecastTime) {
      this.registerForecastUpdates(probe)
    } else {
      // Retrieve target elements
      const services = this.getElementServicesForProbe(probe)
      debug('Probing following services for on-demand probe', services.map(service => service.name))
      // When probing a location we use tiles, take care to use only single time tiles not aggregated if any
      const forecastQuery = (geometry ? { timeseries: false } : {})
      Object.assign(forecastQuery, _.omit(query, ['aggregate']))
      debug('Probing query', forecastQuery)
      // Then run all probes
      try {
        // Initialize features data
        const features = probe.features
        features.forEach(feature => {
          // If we have a time range for on-demand probing tag features using an array
          if (isTimeRange && !feature.forecastTime) feature.forecastTime = {}
          // Take care to initialize properties holder if not given in input feature
          if (!feature.properties) feature.properties = {}
        })
        for (const service of services) {
          // Will get all available forecast times (probing stream) or selected one(s) (on-demand probe)
          const forecasts = await service.find({ paginate: false, query: forecastQuery })
          debug('Probing following forecasts for probe ' + (probe._id ? probe._id : 'on-demand '), forecasts)
          for (const forecast of forecasts) {
            // Ask to retrieve forecast data and perform probing
            await this.probeForecastTime(features, probe, service, forecast)
          }
        }
      } catch (error) {
        this.app.logger.error(error.message)
      }
    }

    if (forecastTime && isTimeRange) {
      // If we do not aggregate and generate a separated feature per time
      const features = {}
      // Rearrange data so that we get ordered arrays indexed by element instead of maps
      probe.features.forEach(feature => {
        // Split data according to time if required
        if (!aggregate) {
          _.forOwn(feature.forecastTime, (times, element) => {
            times.forEach((time, index) => {
              const featureId = (Array.isArray(probe.featureId)
                ? probe.featureId.map(id => _.get(feature, id)).join('-')
                : _.get(feature, probe.featureId))
              const featuresForId = features[featureId]
              let featureForTime
              if (featuresForId) featureForTime = featuresForId[time]
              if (!featureForTime) {
                featureForTime = Object.assign({}, _.pick(feature, ['type', 'geometry']))
                featureForTime.forecastTime = moment.utc(time)
                featureForTime.runTime = moment.utc(feature.runTime[element][index])
                featureForTime.properties = _.omit(feature.properties, probe.elements)
                if (featuresForId) features[featureId][time] = featureForTime
                else features[featureId] = { [time]: featureForTime }
              }
              featureForTime.properties[element] = this.getValueAtTime(feature, element, time)
            })
          })
        } else {
          // Create the arrays of ordered times
          feature.forecastTime = _.mapValues(feature.forecastTime,
            (times, element) => times.map(time => moment.utc(time)).sort((a, b) => a - b))
          feature.runTime = _.mapValues(feature.runTime,
            (times, element) => times.map(time => moment.utc(time)).sort((a, b) => a - b))
          // Then build the associated array of interpolated values
          const properties = {}
          _.forOwn(feature.forecastTime, (times, element) => {
            properties[element] = []
            times.forEach(time => properties[element].push(this.getValueAtTime(feature, element, time)))
          })
          // Update feature
          Object.assign(feature.properties, properties)
        }
      })
      // If we do not aggregate update output with a separated feature per time
      if (!aggregate) {
        probe.features = []
        _.forOwn(features, (times, id) => {
          probe.features = probe.features.concat(_.values(times).sort((a, b) => a.forecastTime - b.forecastTime))
        })
      }
    }
  }
}