lib/streamer/client.js
import { EV } from './ev';
const streamerClient = ({
Meteor,
ddp
}) => {
class StreamerCentral extends EV {
constructor() {
super();
this.instances = {};
this.ddpConnections = {}; // since each Streamer instance can provide its own ddp connection, store them by streamer name
}
setupDdpConnection(name, ddpConnection) {
// make sure we only setup event listeners for each ddp connection once
if (ddpConnection.hasMeteorStreamerEventListeners) {
return;
}
ddpConnection._stream.on('message', (raw_msg) => {
const msg = DDPCommon.parseDDP(raw_msg);
if (msg && msg.msg === 'changed' && msg.collection && msg.fields && msg.fields.eventName && msg.fields.args) {
msg.fields.args.unshift(msg.fields.eventName);
msg.fields.args.unshift(msg.collection);
this.emit.apply(this, msg.fields.args);
}
});
// store ddp connection
this.storeDdpConnection(name, ddpConnection);
}
storeDdpConnection(name, ddpConnection) {
// mark the connection as setup for Streamer, and store it
ddpConnection.hasMeteorStreamerEventListeners = true;
this.ddpConnections[name] = ddpConnection;
}
}
Meteor.StreamerCentral = new StreamerCentral;
Meteor.Streamer = class Streamer extends EV {
constructor(name, { useCollection = false, ddpConnection = ddp } = {}) {
if (Meteor.StreamerCentral.instances[name]) {
console.warn('Streamer instance already exists:', name);
return Meteor.StreamerCentral.instances[name];
}
Meteor.StreamerCentral.setupDdpConnection(name, ddpConnection);
super();
this.ddpConnection = ddpConnection || ddp;
Meteor.StreamerCentral.instances[name] = this;
this.name = name;
this.useCollection = useCollection;
this.subscriptions = {};
Meteor.StreamerCentral.on(this.subscriptionName, (eventName, ...args) => {
if (this.subscriptions[eventName]) {
this.subscriptions[eventName].lastMessage = args;
super.emit.call(this, eventName, ...args);
}
});
this.ddpConnection._stream.on('reset', () => {
super.emit.call(this, '__reconnect__');
});
}
get name() {
return this._name;
}
set name(name) {
this._name = name;
}
get subscriptionName() {
return `stream-${this.name}`;
}
get useCollection() {
return this._useCollection;
}
set useCollection(useCollection) {
this._useCollection = useCollection;
}
stop(eventName) {
if (this.subscriptions[eventName] && this.subscriptions[eventName].subscription) {
this.subscriptions[eventName].subscription.stop();
}
this.unsubscribe(eventName);
}
stopAll() {
for (let eventName in this.subscriptions) {
if (this.subscriptions.hasOwnProperty(eventName)) {
this.stop(eventName);
}
}
}
unsubscribe(eventName) {
this.removeAllListeners(eventName);
delete this.subscriptions[eventName];
}
subscribe(eventName, args) {
let subscribe;
Tracker.nonreactive(() => {
subscribe = this.ddpConnection.subscribe(this.subscriptionName, eventName, { useCollection: this.useCollection, args }, {
onStop: () => {
this.unsubscribe(eventName);
}
});
});
return subscribe;
}
onReconnect(fn) {
if (typeof fn === 'function') {
super.on('__reconnect__', fn);
}
}
getLastMessageFromEvent(eventName) {
const subscription = this.subscriptions[eventName];
if (subscription && subscription.lastMessage) {
return subscription.lastMessage;
}
}
once(eventName, ...args) {
const callback = args.pop();
if (!this.subscriptions[eventName]) {
this.subscriptions[eventName] = {
subscription: this.subscribe(eventName, args)
};
}
super.once(eventName, callback);
}
on(eventName, ...args) {
const callback = args.pop();
if (!this.subscriptions[eventName]) {
this.subscriptions[eventName] = {
subscription: this.subscribe(eventName, args)
};
}
super.on(eventName, callback);
}
emit(...args) {
this.ddpConnection.call(this.subscriptionName, ...args);
}
};
};
export {
streamerClient
};