src/flow-deployer.coffee
_ = require 'lodash'
async = require 'async'
request = require 'request'
MeshbluConfig = require 'meshblu-config'
debug = require('debug')('nanocyte-deployer:flow-deployer')
FlowStatusMessenger = require './flow-status-messenger'
SimpleBenchmark = require 'simple-benchmark'
FLOW_START_NODE = 'engine-start'
FLOW_STOP_NODE = 'engine-stop'
INTERVAL_SERVICE_UUID = '765bd3a4-546d-45e6-a62f-1157281083f0'
class FlowDeployer
constructor: (options, dependencies={}) ->
{
@flowUuid
@instanceId
@flowToken
@forwardUrl
@userUuid
@userToken
@octobluUrl
@deploymentUuid
@flowLoggerUuid
@client
@intervalServiceUri
} = options
{
@configurationSaver
@configurationGenerator
MeshbluHttp
} = dependencies
@benchmark = new SimpleBenchmark label: "nanocyte-deployer-#{@flowUuid}-#{@deploymentUuid}"
MeshbluHttp ?= require 'meshblu-http'
meshbluConfig = new MeshbluConfig
meshbluJSON = _.defaults {uuid: @flowUuid, token: @flowToken}, meshbluConfig.toJSON()
@meshbluHttp = new MeshbluHttp meshbluJSON
throw new Error 'NanocyteDeployer requires client' unless @client?
throw new Error 'NanocyteDeployer requires intervalServiceUri' unless @intervalServiceUri?
@flowStatusMessenger = new FlowStatusMessenger @meshbluHttp,
userUuid: @userUuid
flowUuid: @flowUuid
workflow: 'flow-start'
deploymentUuid: @deploymentUuid
flowLoggerUuid: @flowLoggerUuid
deploy: (callback=->) =>
debug 'deploy', @benchmark.toString()
@flowStatusMessenger.message 'begin'
@getFlowDevice (error) =>
return @_handleError error, callback if error?
flowData = @flowDevice.flow
@registerIntervalDevices flowData.nodes, (error, nodes) =>
debug 'registerIntervalDevices', @benchmark.toString()
return @_handleError error, callback if error?
flowData.nodes = nodes
@configurationGenerator.configure {flowData, @flowToken, @deploymentUuid}, (error, config, stopConfig) =>
debug 'configurationGenerator.configure', @benchmark.toString()
return @_handleError error, callback if error?
@clearAndSaveConfig {config, stopConfig}, (error) =>
debug 'clearAndSaveConfig', @benchmark.toString()
return @_handleError error, callback if error?
@setupDevice {flowData, config}, (error) =>
debug 'setupDevice', @benchmark.toString()
return @_handleError error, callback if error?
@flowStatusMessenger.message 'end'
callback()
destroy: (callback=->) =>
@_stop {flowId: @flowUuid}, callback
_stop: ({flowId}, callback) =>
@configurationSaver.stop {flowId}, (error, records) =>
debug 'configurationSaver.stop', @benchmark.toString()
return callback error if error?
async.each records, @_unregisterInstance, (error) =>
@client.del flowId, callback
_unregisterInstance: (record, callback) =>
flowData = JSON.parse(record.flowData)
nodes = _.map _.values(flowData), 'config'
intervals = _.uniqBy @_filterIntervalNodes(nodes), 'id'
async.eachSeries intervals, @_unregisterIntervalDevice, (error) =>
return callback error if error?
deviceIds = _.map intervals, 'deviceId'
_.pull deviceIds, INTERVAL_SERVICE_UUID
if _.isEmpty deviceIds
return callback null, nodes
updateSendWhitelist =
$pullAll:
sendWhitelist: deviceIds
@meshbluHttp.updateDangerously @flowUuid, updateSendWhitelist, (error) =>
callback error, nodes
_unregisterIntervalDevice: (node, callback) =>
options =
baseUrl: @intervalServiceUri
uri: "/nodes/#{node.id}/intervals/#{node.deviceId}"
auth:
username: @flowUuid
password: @flowToken
json: true
if node.deviceId == INTERVAL_SERVICE_UUID
return callback()
request.delete options, (error, response, body) =>
return callback error if error?
callback()
clearAndSaveConfig: (options, callback) =>
{config, stopConfig} = options
saveOptions =
flowId: @flowUuid
instanceId: @instanceId
flowData: config
saveStopOptions =
flowId: "#{@flowUuid}-stop"
instanceId: @instanceId
flowData: stopConfig
async.series [
async.apply @_stop, flowId: @flowUuid
async.apply @configurationSaver.save, saveOptions
async.apply @configurationSaver.save, saveStopOptions
], callback
getFlowDevice: (callback) =>
return callback() if @flowDevice?
query =
uuid: @flowUuid
projection =
uuid: true
flow: true
'meshblu.forwarders.broadcast': true
@meshbluHttp.search query, {projection}, (error, devices) =>
return callback error if error?
@flowDevice = _.first devices
unless @flowDevice?
error = new Error 'Device Not Found'
error.code = 404
return callback error
unless @flowDevice?.flow
error = new Error 'Device is missing flow property'
error.code = 400
return callback error
callback null, @flowDevice
setupDevice: ({flowData, config}, callback=->) =>
async.series [
async.apply @createSelfSubscriptions
async.apply @createSubscriptions, config
async.apply @setupDeviceForwarding
async.apply @setupMessageSchema, flowData.nodes
], callback
setupDeviceForwarding: (callback=->) =>
messageHook =
url: @forwardUrl
method: 'POST'
signRequest: true
name: 'nanocyte-flow-deploy'
type: 'webhook'
@getFlowDevice (error) =>
return callback error if error?
pullMessageHooks =
$pull:
'meshblu.forwarders.received': {name: messageHook.name}
'meshblu.messageHooks': {name: messageHook.name}
'meshblu.forwarders.broadcast.received': {name: messageHook.name}
'meshblu.forwarders.message.received': {name: messageHook.name}
'meshblu.forwarders.configure.received': {name: messageHook.name}
addNewMessageHooks =
$addToSet:
'meshblu.forwarders.broadcast.received': messageHook
'meshblu.forwarders.message.received': messageHook
'meshblu.forwarders.configure.received': messageHook
tasks = [
async.apply @meshbluHttp.updateDangerously, @flowUuid, pullMessageHooks
async.apply @meshbluHttp.updateDangerously, @flowUuid, addNewMessageHooks
]
if _.isArray @flowDevice?.meshblu?.forwarders?.broadcast
removeOldMessageHooks =
$unset:
'meshblu.forwarders.broadcast': ''
tasks.unshift async.apply @meshbluHttp.updateDangerously, @flowUuid, removeOldMessageHooks
async.series tasks, (error) =>
debug 'setupDeviceForwarding', @benchmark.toString()
callback error
setupMessageSchema: (nodes, callback=->) =>
triggers = _.filter nodes, class: 'trigger'
messageSchema =
type: 'object'
properties:
from:
type: 'string'
title: 'Trigger'
required: true
enum: _.map(triggers, 'id')
payload:
title: "payload"
description: "Use {{msg}} to send the entire message"
replacePayload:
type: 'string'
default: 'payload'
messageFormSchema = [
{ key: 'from', titleMap: @buildFormTitleMap triggers }
{ key: 'payload', 'type': 'input', title: "Payload", description: "Use {{msg}} to send the entire message"}
]
setMessageSchema =
$set : {messageSchema, messageFormSchema, @instanceId}
@meshbluHttp.updateDangerously @flowUuid, setMessageSchema, (error) =>
debug 'setupMessageSchema', @benchmark.toString()
callback error
_filterIntervalNodes: (nodes) =>
_.filter nodes, (node) =>
return _.includes ['interval', 'schedule', 'throttle', 'debounce', 'delay', 'leading-edge-debounce'], node?.class
registerIntervalDevices: (nodes, callback=->) =>
nodes = _.cloneDeep nodes
intervals = @_filterIntervalNodes nodes
async.eachSeries intervals, @_registerIntervalDevice, (error) =>
return callback error if error?
deviceIds = _.map intervals, 'deviceId'
if _.isEmpty deviceIds
return callback null, nodes
updateSendWhitelist =
$addToSet:
sendWhitelist:
$each: deviceIds
@meshbluHttp.updateDangerously @flowUuid, updateSendWhitelist, (error) =>
callback error, nodes
_registerIntervalDevice: (node, callback) =>
options =
baseUrl: @intervalServiceUri
uri: "/nodes/#{node.id}/intervals"
auth:
username: @flowUuid
password: @flowToken
json: true
request.post options, (error, response, body) =>
return callback error if error?
return callback new Error 'Unable to create interval' unless body?.uuid?
node.deviceId = body.uuid
callback()
buildFormTitleMap: (triggers) =>
_.transform triggers, (result, trigger) ->
triggerId = _.first trigger.id.split /-/
result[trigger.id] = "#{trigger.name} (#{triggerId})"
, {}
createSelfSubscriptions: (callback) =>
subscriptions =
'broadcast.received': [@flowUuid]
'message.received': [@flowUuid]
'configure.received': [@flowUuid]
async.forEachOf subscriptions, @createSubscriptionsForType, callback
createSubscriptions: (flowConfig, callback) =>
async.forEachOf flowConfig['subscribe-devices'].config, @createSubscriptionsForType, (error) =>
debug 'createSubscriptions', @benchmark.toString()
callback error
createSubscriptionsForType: (uuids, type, callback) =>
debug 'createSubscriptions', {uuids, type}
async.each uuids, ((uuid, cb) => @createSubscriptionForType uuid, type, cb), callback
createSubscriptionForType: (emitterUuid, type, callback) =>
subscriberUuid = @flowUuid
debug '@meshbluHttp.createSubscription', {subscriberUuid, emitterUuid, type}
@meshbluHttp.createSubscription {subscriberUuid, emitterUuid, type}, callback
startFlow: (callback=->) =>
onStartMessage =
devices: [@flowUuid]
payload:
from: FLOW_START_NODE
subscribePulseMessage =
devices: [@flowUuid]
topic: 'subscribe:pulse'
async.parallel [
async.apply @meshbluHttp.message, subscribePulseMessage
async.apply @meshbluHttp.message, onStartMessage
async.apply @meshbluHttp.updateDangerously, @flowUuid, $set: {online: true, deploying: false, stopping: false}
async.apply @flowStatusMessenger.message, 'begin', undefined, application: 'flow-runner'
], callback
stopFlow: (callback=->) =>
async.parallel [
async.apply @sendStopFlowMessage
async.apply @meshbluHttp.updateDangerously, @flowUuid, $set: {online: false, deploying: false, stopping: false}
], callback
sendStopFlowMessage: (callback) =>
message =
devices: [@flowUuid]
payload:
from: FLOW_STOP_NODE
@meshbluHttp.message message, callback
_handleError: (error, callback) =>
@flowStatusMessenger.message 'error', error.message
callback error
module.exports = FlowDeployer