lib/index.js
import { promisify } from 'util'
import errors from '@feathersjs/errors'
import makeCote from 'cote'
import makeDebug from 'debug'
import portfinder from 'portfinder'
import { v4 as uuid } from 'uuid'
import { DEFAULT_METHODS, DEFAULT_EVENTS, COMPONENTS, HealthcheckService } from './utils.js'
import { publishService, unpublishService, publishServices } from './publish.js'
import { registerService, unregisterService, registerApplication, unregisterApplication } from './register.js'
// Error class thrown by the healthcheck responder when the requested local service cannot be found
const { Unavailable } = errors
// Main debug channel of the module
const debug = makeDebug('feathers-distributed')
// Separate debug channel for cote discovery events that are deliberately ignored
const debugIgnore = makeDebug('feathers-distributed:ignore')
// Build the params handed to a local service for a remote request, flagging its remote origin
function remoteParams (req) {
  return Object.assign({ fromRemote: true }, req.params)
}

// How each standard service method maps a remote request onto the local service call,
// listed in the order the corresponding handlers are registered
const STANDARD_METHOD_CALLS = [
  ['find', (service, req) => service.find(remoteParams(req))],
  ['get', (service, req) => service.get(req.id, remoteParams(req))],
  ['create', (service, req) => service.create(req.data, remoteParams(req))],
  ['update', (service, req) => service.update(req.id, req.data, remoteParams(req))],
  ['patch', (service, req) => service.patch(req.id, req.data, remoteParams(req))],
  ['remove', (service, req) => service.remove(req.id, remoteParams(req))]
]

// Register on the local responder a handler forwarding `method` requests to the targeted local service
function respondTo (app, method, invoke) {
  app.serviceResponder.on(method, async (req) => {
    const service = app.service(req.path)
    debug(`Responding ${method}() local service on path ${req.path} with key ${req.key}`, req)
    const result = await invoke(service, req)
    debug(`Successfully ${method}() local service on path ${req.path} with key ${req.key}`)
    return result
  })
}

// Extract the { uuid, shortUuid, key } descriptor of the app owning a services subscriber from a cote
// discovery event, or return null when the event targets another component or lacks key/uuid
function getSubscriberApplication (eventName, data) {
  const advertisement = data.advertisement
  // As this event is emitted for all cote components filtering one should be sufficient
  if (!advertisement || (advertisement.name !== COMPONENTS.SERVICES_SUBSCRIBER)) {
    debugIgnore(`Ignoring ${eventName} event for other components than services subscriber`, advertisement)
    return null
  }
  const key = advertisement.appDistributionKey
  const uuid = advertisement.appUuid
  if (!key || !uuid) {
    debugIgnore(`Ignoring ${eventName} event for services subscriber without key/uuid`, advertisement)
    return null
  }
  return { uuid, shortUuid: uuid.split('-')[0], key }
}

/**
 * Create the cote components of the local app and wire them to the distribution logic.
 *
 * In order this sets up: the services subscriber (remote service registration/removal),
 * the services publisher, the optional service events publisher (when `publishEvents` is set),
 * the responder answering remote requests for the distributed methods plus `healthcheck`,
 * the discovery listeners tracking remote apps, and finally publishes the local services
 * (once, then on every heartbeat when `heartbeatInterval` is positive).
 * Component creations are separated by `componentDelay` ms.
 *
 * @param {Object} app - App to initialize; `coteOptions`, `distributionOptions`, `uuid` and
 *   `shortUuid` are read from it, the created components and placeholders are stored on it.
 * @returns {Promise<void>} Resolved once all components exist and local services have been published.
 */
export async function initialize (app) {
  debug('Initializing cote with options', app.coteOptions)
  const options = app.distributionOptions
  const sleep = promisify(setTimeout)
  // Wait before instantiating a new component to avoid too much concurrency on port allocation
  const waitBeforeNextComponent = () => sleep(options.componentDelay)

  // Setup cote with options
  app.cote = makeCote(app.coteOptions)
  app.distributionKey = options.key || 'default'
  // Placeholder for request/events managers for remote services
  app.serviceRequesters = {}
  app.serviceEventsSubscribers = {}
  // Placeholder for remote app replicas
  app.remoteApps = {}

  // This subscriber listens to an event each time a remote app service has been registered
  app.serviceSubscriber = new app.cote.Subscriber({
    name: COMPONENTS.SERVICES_SUBSCRIBER,
    namespace: 'services',
    key: 'services',
    subscribesTo: ['service', 'service-removed'],
    appUuid: app.uuid,
    appDistributionKey: app.distributionKey
  }, app.coteOptions)
  debug(`Services subscriber ready for app with uuid ${app.shortUuid} and key ${app.distributionKey}`)
  // When a remote service is declared create the local proxy interface to it
  app.serviceSubscriber.on('service', async serviceDescriptor => {
    try {
      // When a new app pops up ensure the required proxy to it is created first
      // Indeed this should be done by new component detection but as it is based on a check interval it might occur later
      await registerApplication(app, serviceDescriptor)
      registerService(app, serviceDescriptor)
    } catch (error) {
      // Nobody awaits an event listener: report the failure instead of leaving the rejection unhandled
      debug(`Failed to register remote service in app with uuid ${app.shortUuid} and key ${app.distributionKey}`, serviceDescriptor, error)
    }
  })
  // When a remote service is removed remove the local proxy interface to it
  app.serviceSubscriber.on('service-removed', serviceDescriptor => {
    unregisterService(app, serviceDescriptor)
  })

  await waitBeforeNextComponent()
  // This publisher publishes an event each time a local app or service is registered
  app.servicePublisher = new app.cote.Publisher({
    name: COMPONENTS.SERVICES_PUBLISHER,
    namespace: 'services',
    key: 'services',
    broadcasts: ['service', 'service-removed'],
    appUuid: app.uuid,
    appDistributionKey: app.distributionKey
  }, app.coteOptions)
  debug(`Services publisher ready for app with uuid ${app.shortUuid} and key ${app.distributionKey}`)

  // Dispatcher of service events to other nodes
  if (options.publishEvents) {
    await waitBeforeNextComponent()
    app.serviceEventsPublisher = new app.cote.Publisher({
      name: COMPONENTS.SERVICES_EVENTS_PUBLISHER,
      namespace: app.distributionKey,
      key: app.distributionKey,
      broadcasts: options.distributedEvents || DEFAULT_EVENTS,
      appUuid: app.uuid,
      appDistributionKey: app.distributionKey
    }, app.coteOptions)
    debug(`Service events publisher ready for local app with uuid ${app.shortUuid} and key ${app.distributionKey}`)
  }

  await waitBeforeNextComponent()
  // Create the response manager for local services
  const methods = options.distributedMethods || DEFAULT_METHODS
  app.serviceResponder = new app.cote.Responder({
    name: COMPONENTS.SERVICES_RESPONDER,
    namespace: app.distributionKey,
    key: app.distributionKey,
    requests: methods.concat(['healthcheck']),
    appUuid: app.uuid,
    appDistributionKey: app.distributionKey
  }, app.coteOptions)
  debug(`Service responder ready for local app with uuid ${app.shortUuid} and key ${app.distributionKey}`)
  debug('Registering listeners for ', methods.concat(['healthcheck']))
  // Answer requests from other nodes for the standard methods that are distributed
  STANDARD_METHOD_CALLS.forEach(([method, invoke]) => {
    if (methods.includes(method)) respondTo(app, method, invoke)
  })
  // Healthcheck is always used
  app.serviceResponder.on('healthcheck', async (req) => {
    debug(`Responding healthcheck() local service on path ${req.path} with key ${req.key}`, req)
    const service = app.service(req.path)
    if (!service) throw new Unavailable('Unavailable distributed service on path ' + req.path + ' with key ' + req.key)
    debug(`Successfully healthcheck() local service on path ${req.path} with key ${req.key}`)
    return true
  })
  // Process custom methods, which are all called with (data, params)
  methods.forEach(method => {
    if (!DEFAULT_METHODS.includes(method)) {
      respondTo(app, method, (service, req) => service[method](req.data, remoteParams(req)))
    }
  })

  // Each time a new app pops up we republish local services so that
  // service distribution does not depend on the initialization order of the apps
  app.servicePublisher.on('cote:added', async (data) => {
    const remoteApp = getSubscriberApplication('cote:added', data)
    if (!remoteApp) return
    debug(`New services subscriber detected for app with uuid ${remoteApp.shortUuid} and key ${remoteApp.key} from app with uuid ${app.shortUuid} and key ${app.distributionKey}`, data.advertisement)
    try {
      // When a new app pops up create the required proxy to it first
      await registerApplication(app, remoteApp)
    } catch (error) {
      // Nobody awaits an event listener: report the failure instead of leaving the rejection unhandled
      debug(`Failed to register app with uuid ${remoteApp.shortUuid} and key ${remoteApp.key} in app with uuid ${app.shortUuid} and key ${app.distributionKey}`, error)
    }
  })
  // Manage app going offline
  app.servicePublisher.on('cote:removed', (data) => {
    const remoteApp = getSubscriberApplication('cote:removed', data)
    if (!remoteApp) return
    debug(`Services subscriber loss detected for app with uuid ${remoteApp.shortUuid} and key ${remoteApp.key} from app with uuid ${app.shortUuid} and key ${app.distributionKey}`, data.advertisement)
    // When an app goes offline check if we need to keep cote components alive for remaining replicas
    unregisterApplication(app, remoteApp)
  })

  // Tell others apps I'm here
  publishServices(app)
  // Add an interval so that we regularly publish services to others nodes
  if (options.heartbeatInterval > 0) {
    app.heartbeatInterval = setInterval(() => {
      Object.getOwnPropertyNames(app.services).forEach(path => {
        publishService(app, path)
      })
    }, options.heartbeatInterval)
    debug(`Scheduled heartbeat local services publishing for app with uuid ${app.shortUuid} and key ${app.distributionKey}`)
  }
}
/**
 * Release everything `initialize` set up on the app: closes the cote components
 * (per remote app requesters and events subscribers, then the local subscriber,
 * publisher, events publisher and responder), removes their references from the app
 * and clears the pending timers. Safe to call on an app that was never initialized
 * or more than once, as every step is guarded by the presence of the related property.
 *
 * @param {Object} app - App holding the components created during initialization.
 */
export function finalize (app) {
  debug('Finalizing cote')
  delete app.remoteApps
  // Components created per remote app are stored in maps indexed by key
  if (app.serviceRequesters) {
    Object.getOwnPropertyNames(app.serviceRequesters).forEach(key => {
      debug(`Finalizing service requester for remote app with key ${key} in local app with uuid ${app.shortUuid} and key ${app.distributionKey}`)
      app.serviceRequesters[key].close()
    })
    delete app.serviceRequesters
  }
  if (app.serviceEventsSubscribers) {
    Object.getOwnPropertyNames(app.serviceEventsSubscribers).forEach(key => {
      debug(`Finalizing service event subscriber for remote app with key ${key} in local app with uuid ${app.shortUuid} and key ${app.distributionKey}`)
      app.serviceEventsSubscribers[key].close()
    })
    delete app.serviceEventsSubscribers
  }
  // Components owned by the local app, as [property on the app, label used in logs].
  // The events publisher only exists when events publishing is enabled but must be
  // closed like the others, otherwise its cote component outlives the finalization.
  const localComponents = [
    ['serviceSubscriber', 'service subscriber'],
    ['servicePublisher', 'service publisher'],
    ['serviceEventsPublisher', 'service events publisher'],
    ['serviceResponder', 'service responder']
  ]
  localComponents.forEach(([property, label]) => {
    if (!app[property]) return
    debug(`Finalizing ${label} for local app with uuid ${app.shortUuid} and key ${app.distributionKey}`)
    app[property].close()
    delete app[property]
  })
  // Cancel whatever is still scheduled
  if (app.applicationPublicationTimeout) clearTimeout(app.applicationPublicationTimeout)
  if (app.coteInitializationTimeout) clearTimeout(app.coteInitializationTimeout)
  if (app.heartbeatInterval) clearInterval(app.heartbeatInterval)
}
// Read a numeric setting from the environment, using the fallback when the variable is unset or empty
function numberFromEnv (name, fallback) {
  const raw = process.env[name]
  return raw ? Number(raw) : fallback
}

/**
 * Build the plugin function to be given to `app.configure()`.
 *
 * The plugin tags the app with a unique id, computes `app.coteOptions` (defaults overridden by
 * `options.cote`) and `app.distributionOptions` (defaults overridden by `options`), triggers the
 * cote initialization according to `coteDelay`, registers the healthcheck routes and finally wraps
 * `app.use`/`app.unuse` so that services registered or removed later on are published/unpublished.
 *
 * @param {Object} [options={}] - Distribution options; `options.cote` holds the cote specific ones
 *   and `options.healthcheckPath` the base route of the healthcheck endpoints.
 * @returns {Function} Function taking the app to set up.
 */
export default function init (options = {}) {
  return function (app) {
    // We need to uniquely identify the app to avoid infinite loop by registering our own services
    app.uuid = uuid()
    // For display purpose
    app.shortUuid = app.uuid.split('-')[0]

    const coteDefaults = {
      helloInterval: 10000,
      checkInterval: 20000,
      nodeTimeout: 30000,
      masterTimeout: 60000,
      log: Boolean(process.env.COTE_LOG),
      basePort: numberFromEnv('BASE_PORT', 10000),
      highestPort: numberFromEnv('HIGHEST_PORT', 20000)
    }
    app.coteOptions = Object.assign(coteDefaults, options.cote)

    const distributionDefaults = {
      publicationDelay: numberFromEnv('PUBLICATION_DELAY', 10000),
      componentDelay: numberFromEnv('COMPONENT_DELAY', 1000),
      coteDelay: numberFromEnv('COTE_DELAY', undefined),
      heartbeatInterval: numberFromEnv('HEARTBEAT_INTERVAL', undefined),
      middlewares: {},
      publishEvents: true,
      distributedEvents: DEFAULT_EVENTS,
      distributedMethods: DEFAULT_METHODS
    }
    app.distributionOptions = Object.assign(distributionDefaults, options)
    debug('Initializing feathers-distributed with options', app.distributionOptions)

    // Change default base/highest port for automated port finding
    portfinder.basePort = app.coteOptions.basePort
    portfinder.highestPort = app.coteOptions.highestPort

    // Setup cote with options and required delay:
    // no delay means right now, a positive one means later on,
    // anything else (e.g. -1) means the caller wants to initialize by itself
    const coteDelay = app.distributionOptions.coteDelay
    if (!coteDelay) {
      initialize(app)
    } else if (coteDelay > 0) {
      app.coteInitializationTimeout = setTimeout(() => { initialize(app) }, coteDelay)
    }

    // Healthcheck endpoint(s), registered through the app.use in place before it gets wrapped below
    const healthcheckRoute = (options.healthcheckPath || '/distribution/healthcheck/')
    debug('Initializing feathers-distributed healthcheck route', healthcheckRoute)
    // Route for specific app
    app.use(healthcheckRoute + ':key', new HealthcheckService(app))
    // Route for all registered apps
    app.use(healthcheckRoute, new HealthcheckService(app))

    // We replace the use/unuse method to inject service publisher/responder
    const superUse = app.use
    app.use = function (...args) {
      // Register the service normally first
      const superReturn = superUse.apply(app, args)
      const path = args[0]
      // Publish only once cote has been initialized and when a service path is given:
      // with express apps we can directly register middlewares, which is not supported
      if (app.cote && typeof path === 'string') publishService(app, path)
      return superReturn
    }
    const superUnuse = app.unuse
    app.unuse = function (...args) {
      // Unregister the remote services first, provided cote has already been initialized
      if (app.cote) unpublishService(app, args[0])
      // Then local service
      return superUnuse.apply(app, args)
    }
  }
}