weacast/weacast-loader

View on GitHub
job-arpege.js

Summary

Maintainability
A
0 mins
Test Coverage
A
94%
import path from 'path'
import util from 'util'
import { fileURLToPath } from 'url'

// Recreate CommonJS-style __dirname — not available natively in ES modules
const __dirname = path.dirname(fileURLToPath(import.meta.url))
// Local working directory where raw downloads and converted files are written
const outputPath = path.join(__dirname, 'forecast-data')

/**
 * Produce the default job options for the ARPEGE download job.
 *
 * Defaults depend on the caller's options (e.g. `isobaric` switches the S3
 * archive layout), so this is a function rather than a static object.
 *
 * @param {Object} [options={}] - caller options; only `isobaric` is read here.
 *   Defaulting to {} makes the factory safe when the job module is invoked
 *   without any options (previously this threw a TypeError on options.isobaric).
 * @returns {Object} default option set, meant to be overridden by user options.
 */
const defaults = (options = {}) => ({
  id: 'weacast-arpege',
  model: 'arpege',
  dbUrl: process.env.DB_URL || 'mongodb://127.0.0.1:27017/weacast',
  request: {},
  subsets: {},
  nwp: {},
  // Forecast elements to download; `name` is the WCS coverage identifier
  elements: [{
    element: 'u-wind',
    name: 'U_COMPONENT_OF_WIND__SPECIFIC_HEIGHT_LEVEL_ABOVE_GROUND',
    levels: [10]
  }, {
    element: 'gust',
    name: 'WIND_SPEED_GUST__SPECIFIC_HEIGHT_LEVEL_ABOVE_GROUND',
    levels: [10]
  }, {
    element: 'v-wind',
    name: 'V_COMPONENT_OF_WIND__SPECIFIC_HEIGHT_LEVEL_ABOVE_GROUND',
    levels: [10]
  }, {
    element: 'precipitations',
    name: 'TOTAL_PRECIPITATION__GROUND_OR_WATER_SURFACE',
    accumulated: true,
    lowerLimit: 3 * 3600, // Accumulation from T to T-3H
    levels: [undefined] // Implicit surface level
  }, {
    element: 'temperature',
    name: 'TEMPERATURE__SPECIFIC_HEIGHT_LEVEL_ABOVE_GROUND',
    levels: [2]
  }],
  // By naming files locally by the number of hours from run time we reuse the same names and avoid having to purge
  filepath: '<%= element %>/<%= level ? level : \'surface\' %>/<%= runTime.format(\'HH\') %>/<%= timeOffset / 3600 %>',
  // One collection per element, suffixed by level only when several levels exist
  collection: '<% if (levels.length > 1) { %><%= model %>-<%= element %>-<%= level %><% } else { %><%= model %>-<%= element %><% } %>',
  // S3 archive key layout; isobaric data is archived under a dedicated prefix
  archiveId: (options.isobaric ? 'archive/<%= model %>-isobaric' : 'archive/<%= model %>') +
    '/<%= runTime.format(\'YYYY/MM/DD/HH\') %>/<%= element %>/<%= level ? level : \'surface\' %>/<%= forecastTime.format(\'YYYY-MM-DD-HH\') %>',
  // Generate Cloud Optimized GeoTIFF archives by default
  cog: true
})

/**
 * Build the krawler job definition that downloads ARPEGE forecasts from the
 * Météo-France WCS service, converts the GeoTIFF responses to JSON grids,
 * stores them in MongoDB (optionally as GridFS files and/or tiles) and
 * archives raw or Cloud-Optimized GeoTIFF files on S3 when configured.
 *
 * @param {Object} options - user options merged over defaults(); notable keys:
 *   bounds ([minLon, minLat, maxLon, maxLat] — required for the WCS subsets),
 *   isobaric, nwp (run/interval config), elements, dataStore, tileResolution,
 *   workersLimit, dbUrl, cog, request, subsets.
 * @returns {Object} a job descriptor consumed by the krawler job runner.
 */
export default (options) => {
  // User options win over defaults; defaults(options) is itself option-dependent
  options = Object.assign({}, defaults(options), options)
  const filepath = options.filepath
  // Task id doubles as the relative file path under the local output store
  const id = `${options.model}/${filepath}`
  const archiveId = options.archiveId
  const collection = options.collection
  // GridFS bucket shares its name with the element collection
  const bucket = collection
  const keepPastRuns = options.nwp.keepPastRuns
  // Build the MongoDB indices for an element collection
  const indices = (item) => {
    // Expiration priority: per-element ttl, global ttl, then the intervals as fallback
    let expiration = item.ttl || options.nwp.ttl || item.interval || options.nwp.interval
    // Extend the expiration period if we need to keep past data
    if (keepPastRuns) expiration += options.nwp.oldestRunInterval
    return [
      { x: 1, y: 1 },
      { geometry: 1 },
      { geometry: '2dsphere' },
      // TTL index: documents expire automatically relative to their forecast time
      [{ forecastTime: 1 }, { expireAfterSeconds: expiration }],
      { forecastTime: 1, geometry: 1 }
    ]
  }
  // Forward global data store to elements
  if (options.dataStore) options.elements.forEach(element => Object.assign(element, { dataStore: options.dataStore }))
  // Check if we archive on S3
  const stores = [{
    id: 'fs',
    options: {
      path: outputPath
    }
  }]
  if (process.env.S3_BUCKET) {
    stores.push({
      id: 's3',
      options: {
        client: {
          accessKeyId: process.env.S3_ACCESS_KEY,
          secretAccessKey: process.env.S3_SECRET_ACCESS_KEY,
          endpoint: process.env.S3_ENDPOINT
        },
        bucket: process.env.S3_BUCKET
      }
    })
  }

  return {
    id: options.id,
    // Downloads land in the local filesystem store first
    store: 'fs',
    options: {
      // Number of concurrent download tasks; env var overrides job option
      workersLimit: (process.env.WORKERS_LIMIT ? Number(process.env.WORKERS_LIMIT) : (options.workersLimit || 2)),
      // A failed task should not abort the whole job
      faultTolerant: true
    },
    taskTemplate: {
      id,
      type: 'wcs',
      // Common options for models, some will be setup on a per-model basis
      options: Object.assign({
        url: 'https://public-api.meteofrance.fr/public/arpege/1.0/wcs/MF-NWP-GLOBAL-ARPEGE-025-GLOBE-WCS/GetCoverage',
        version: '2.0.1',
        apikey: process.env.METEO_FRANCE_TOKEN,
        coverageid: '<%= name %>___<%= runTime.format() %>',
        // WCS dimension subsetting: bounding box, time and vertical level
        subsets: Object.assign({
          long: [options.bounds[0], options.bounds[2]],
          lat: [options.bounds[1], options.bounds[3]],
          time: '<%= forecastTime.format() %>'
        }, (options.isobaric ? { pressure: '<%= level %>' } : { height: '<%= level %>' }), options.subsets)
      }, options.request)
    },
    hooks: {
      tasks: {
        before: {
          // Avoid hitting rate limit by adding a delay between requests
          waitBeforeRequest: {
            hook: 'apply',
            function: async () => {
              await util.promisify(setTimeout)(process.env.REQUEST_DELAY ? Number(process.env.REQUEST_DELAY) : 3000)
            }
          },
          readMongoCollection: {
            collection,
            dataPath: 'data.previousData',
            // When keeping only the most recent forecast check if it comes from an older run time
            // When keeping all run times check if it already exist for the current run time
            query: Object.assign({ forecastTime: '<%= forecastTime.format() %>', geometry: { $exists: false } },
              keepPastRuns ? { runTime: '<%= runTime.format() %>' } : {}),
            project: { _id: 1, runTime: 1, forecastTime: 1 },
            transform: { asObject: true }
          },
          // Do not download data if already here
          discardIf: {
            predicate: (item) => item.previousData.runTime && (item.runTime.valueOf() === item.previousData.runTime.getTime())
          },
          // For surface variables the height parameter is implicit, remove it from request as the WCS service does not like it
          surface: {
            hook: 'apply',
            function: (item) => {
              if (!item.level) delete item.options.subsets.height
            }
          },
          // When the accumulation period X is less than 1 day suffix is PTXH otherwise the suffix is PXD.
          accumulation: {
            hook: 'apply',
            match: { accumulated: true },
            function: (item) => {
              const accumulationPeriod = item.lowerLimit / 3600
              if (accumulationPeriod < 24) item.options.coverageid += '_PT' + accumulationPeriod + 'H'
              else item.options.coverageid += '_P' + (accumulationPeriod / 24) + 'D'
            }
          }
        },
        after: {
          // Generate Cloud optimized GeoTIFF for archiving
          // Move from [0°, 360°] longitude range to [-180°, 180°] longitude range whenever required
          processAndSwipeRawData: {
            match: { predicate: () => process.env.S3_BUCKET && options.cog && (options.bounds[2] > 180) },
            hook: 'runCommand',
            command: [
            // Create first a replication from [0, 360] to [-360, 0] and a VRT covering [-360, 360]
            // Then extract the portion between [-180, 180] from this VRT
              `gdal_translate -a_ullr -360.125 90.125 -0.125 -90.125 ${outputPath}/<%= id %> ${outputPath}/<%= id %>_shifted`,
              `gdalbuildvrt ${outputPath}/<%= id %>.vrt ${outputPath}/<%= id %> ${outputPath}/<%= id %>_shifted`,
              `gdal_translate ${outputPath}/<%= id %>.vrt ${outputPath}/<%= id %>_180.vrt -projwin -180.125 90.125 179.875 -90.125 -of VRT`,
              `gdalwarp -overwrite -ot Float32 -wo NUM_THREADS=6 -wo SOURCE_EXTRA=100 ${outputPath}/<%= id %>_180.vrt ${outputPath}/<%= id %>_180.tif`,
              `gdal_translate ${outputPath}/<%= id %>_180.tif ${outputPath}/<%= id %>.tif -ot Float32 -co COMPRESS=DEFLATE -co NUM_THREADS=ALL_CPUS -co TILED=YES -co BLOCKXSIZE=256 -co BLOCKYSIZE=256 -co COPY_SRC_OVERVIEWS=YES`
            ]
          },
          // Same COG conversion without the longitude shift when data is already in [-180, 180]
          processRawData: {
            match: { predicate: () => process.env.S3_BUCKET && options.cog && (options.bounds[2] <= 180) },
            hook: 'runCommand',
            command: `gdal_translate ${outputPath}/<%= id %> ${outputPath}/<%= id %>.tif -ot Float32 -co COMPRESS=DEFLATE -co NUM_THREADS=ALL_CPUS -co TILED=YES -co BLOCKXSIZE=256 -co BLOCKYSIZE=256 -co COPY_SRC_OVERVIEWS=YES`
          },
          // Upload raw archive data to S3
          archiveRawData: {
            match: { predicate: () => !options.cog && process.env.S3_BUCKET },
            hook: 'copyToStore',
            input: { key: '<%= id %>', store: 'fs' },
            output: {
              key: `${archiveId}.tif`,
              store: 's3',
              params: { ACL: 'public-read' }
            }
          },
          // Upload the COG produced above to S3
          archiveProcessedData: {
            match: { predicate: () => options.cog && process.env.S3_BUCKET },
            hook: 'copyToStore',
            input: { key: '<%= id %>.tif', store: 'fs' },
            output: {
              key: `${archiveId}.cog`,
              store: 's3',
              params: { ACL: 'public-read' }
            }
          },
          // Convert the downloaded GeoTIFF to a JSON grid (precision defaults to 2 decimals)
          runCommand: {
            command: `gtiff2json ${outputPath}/<%= id %> -p <%= (element.precision || 2) %> -o ${outputPath}/<%= id %>.json`
          },
          // This will add grid data in a data field
          readJson: {
            key: '<%= id %>.json'
          },
          transformJson: {
            dataPath: 'result',
            pick: ['id', 'model', 'element', 'level', 'levels', 'runTime', 'forecastTime', 'data', 'dataStore', 'client']
          },
          // Convert temperature from K to C°
          // Although it would be required according to documentation it does not seem to be
          /*
          apply: {
            match: { element: 'temperature' },
            function: (item) => {
              for (let i = 0; i < item.data.length; i++) {
                item.data[i] = item.data[i] - 273.15
              }
            }
          },
          */
          computeStatistics: { dataPath: 'result.data', min: 'minValue', max: 'maxValue' },
          // Erase previous data if any
          deleteMongoCollection: {
            // Do not delete any previous data if keeping all run times
            match: { predicate: () => !keepPastRuns },
            collection,
            filter: { forecastTime: '<%= forecastTime.format() %>' }
          },
          // When not using GridFS, write the full grid document (including data) to the collection
          writeRawData: {
            match: { dataStore: { $ne: 'gridfs' } },
            hook: 'writeMongoCollection',
            dataPath: 'result',
            collection,
            transform: {
              omit: ['id', 'model', 'levels', 'element', 'dataStore', 'client']
            }
          },
          // When using GridFS, write only metadata to the collection (data goes to the bucket below)
          writeMetaData: {
            match: { dataStore: { $eq: 'gridfs' } },
            hook: 'writeMongoCollection',
            dataPath: 'result',
            collection,
            transform: {
              omit: ['id', 'model', 'levels', 'element', 'data', 'dataStore', 'client'],
              merge: { filePath: '<%= id %>', convertedFilePath: '<%= id %>.json' }
            }
          },
          writeRawFile: {
            match: { dataStore: { $eq: 'gridfs' } },
            hook: 'writeMongoBucket',
            key: '<%= id %>.json',
            bucket,
            metadata: { forecastTime: '<%= forecastTime.format() %>' }
          },
          // Notify listeners that new data is available for this collection
          emitEvent: { name: collection, pick: ['runTime', 'forecastTime'] },
          // Optionally re-grid the data into tiles at the requested resolution
          tileGrid: {
            match: { predicate: (item) => options.tileResolution },
            dataPath: 'result.data',
            input: { bounds: options.bounds, origin: options.origin, size: options.size, resolution: options.resolution },
            output: { resolution: options.tileResolution },
            transform: {
              merge: { forecastTime: '<%= forecastTime.format() %>', runTime: '<%= runTime.format() %>', timeseries: false }
            }
          },
          writeTiles: {
            hook: 'writeMongoCollection',
            dataPath: 'result.data',
            collection,
            match: { predicate: (item) => options.tileResolution },
            transform: {
              unitMapping: { forecastTime: { asDate: 'utc' }, runTime: { asDate: 'utc' } }
            }
          },
          clearData: {} // This will free memory for grid data
        }
      },
      jobs: {
        before: {
          createStores: stores,
          connectMongo: {
            url: options.dbUrl,
            // Required so that client is forwarded from job to tasks
            clientPath: 'taskTemplate.client'
          },
          // Create one collection per element/level combination, flattened into a single hook list
          createCollections: {
            hook: 'parallel',
            hooks: options.elements.map(item => (item.levels || [null]).map(level => ({
              hook: 'createMongoCollection',
              collection: (item.levels && item.levels.length > 1 ? `${options.model}-${item.element}-${level}` : `${options.model}-${item.element}`),
              indices: indices(item),
              // Required so that client is forwarded from job to tasks
              clientPath: 'taskTemplate.client'
            }))).reduce((hooks, hooksForLevels) => hooks.concat(hooksForLevels), [])
          },
          // GridFS buckets are only needed for elements stored through GridFS
          createBuckets: {
            hook: 'parallel',
            hooks: options.elements.filter(item => item.dataStore === 'gridfs').map(item => (item.levels || [null]).map(level => ({
              hook: 'createMongoBucket',
              bucket: (item.levels && item.levels.length > 1 ? `${options.model}-${item.element}-${level}` : `${options.model}-${item.element}`),
              // Required so that client is forwarded from job to tasks
              clientPath: 'taskTemplate.client'
            }))).reduce((hooks, hooksForLevels) => hooks.concat(hooksForLevels), [])
          },
          // Common options for models, some will be setup on a per-model basis
          generateNwpTasks: Object.assign({
            runIndex: 0, // -1 is not current run but previous one to ensure it is already available
            keepPastForecasts: true, // We'd like to keep forecast data since the start of the run for archiving
            elements: options.elements.map(element => Object.assign({ model: options.model }, element))
          }, options.nwp)
        },
        after: {
          disconnectMongo: {
            clientPath: 'taskTemplate.client'
          },
          clearOutputs: {},
          removeStores: stores.map(store => store.id)
        }
      }
    }
  }
}