fluidtrends/carmel

View on GitHub
core/lib/Channel.js

Summary

Maintainability
A
2 hrs
Test Coverage
import debug from 'debug';
let LOG = debug("carmel:channel");
export class Channel {
    constructor(id, config, station) {
        this._id = id;
        LOG = debug(`carmel:channel:${id}`);
        this._isOpen = false;
        this._station = station;
        this._config = config || {};
        this._events = this.config.events || {};
        this._onEvent = this.onEvent.bind(this);
        this._registerEvent = this.registerEvent.bind(this);
        this._listenForEvent = this.listenForEvent.bind(this);
        this._eventHandler = this.eventHandler.bind(this);
        this._sendQueue = [];
    }
    static Id(id, event, type) {
        return `${Channel.PREFIX}:${id}:${event}@${type}`;
    }
    get events() {
        return this._events;
    }
    get config() {
        return this._config;
    }
    get isOpen() {
        return this._isOpen;
    }
    get id() {
        return this._id;
    }
    get station() {
        return this._station;
    }
    get sendQueue() {
        return this._sendQueue;
    }
    async flush() {
        this._sendQueue = await this.station.session.cache.get("session/sendqueue") || [];
        if (this.sendQueue.length === 0)
            return;
        LOG(`flushing send queue [events=${this.sendQueue.length}]`);
        await Promise.all(this.sendQueue.map((m) => this.sendEvent(m.id, m.data)));
        this._sendQueue = [];
        await this.station.session.cache.put("session/sendqueue", []);
        LOG(`send queue completely flushed`);
    }
    eventHandler(id) {
        if (!this.events || !this.events[id]) {
            return;
        }
        if (!this.station.session.functions || !this.station.session.functions[this.events[id]]) {
            return;
        }
        return this.station.session.functions[this.events[id]];
    }
    async onEvent(id, data, type, from) {
        const log = debug(`carmel:event:${this.id}:${id}`);
        log(`<- received [${id}] event (${type})`);
        const handler = this._eventHandler(id);
        if (!handler) {
            log(`   [ skipped ] unrecognized event`);
            return;
        }
        try {
            if (type === Channel.REQUEST_EVENT && handler[type]) {
                log(`Processing ...`);
                const result = await handler[type]({ log, session: this.station.session, channel: this, id, data, from });
                log(`   processed:`, result);
                if (data.sender && data.sender.id) {
                    const response = await this.sendEvent(`${id}`, result, `${Channel.RESPONSE_EVENT}:${data.sender.id}`);
                    log(`   response:`, response);
                }
            }
            else if (type === Channel.RESPONSE_EVENT && handler[type]) {
                const result = await handler[type]({ log, session: this.station.session, channel: this, id, data, from });
                log(`   processed:`, result);
            }
        }
        catch (e) {
            log(`   Error:`, e);
        }
    }
    async queueEvent(e) {
        this._sendQueue = await this.station.session.cache.get("session/sendqueue") || [];
        this.sendQueue.push(e);
        await this.station.session.cache.put("session/sendqueue", this.sendQueue);
    }
    async sendEvent(id, data, type = Channel.REQUEST_EVENT) {
        if (!id || !this.events[id])
            return;
        if (!this.station.session.isConnected) {
            LOG(`-> delaying sending [${id}] event until connection is established`);
            await this.queueEvent({ id, data });
            return { message: "event queued" };
        }
        this.station.session.gateway.ipfs.pubsub.publish(`${Channel.PREFIX}:${this.id}:${id}@${type}`, new TextEncoder().encode(JSON.stringify({
            ...data,
            sender: {
                id: this.station.session.id,
            }
        })));
        LOG(`-> sent [${id}] event`);
        return { message: "event sent" };
    }
    async listenForEvent(id, type, log) {
        log(`registered [${type}] handler`);
        this.station.session.gateway.ipfs.pubsub.subscribe(`${Channel.PREFIX}:${this.id}:${id}@${type}${type === 'response' ? ':' + this.station.session.id : ''}`, (msg) => {
            try {
                const { from, data } = msg;
                const e = Buffer.from(data).toString();
                if (from === this.station.session.gateway.cid)
                    return;
                this._onEvent(id, JSON.parse(e), type, from);
            }
            catch (err) {
                log(err);
            }
        });
    }
    async registerEvent(id) {
        if (!id || !this.events[id])
            return;
        const log = debug(`carmel:event:${this.id}:${id}`);
        LOG(`adding [${id}] event ...`);
        if (id === Channel.ACCEPT_EVENT_ID) {
            this._listenForEvent(id, "request", log);
            LOG(`added [${id}] event`);
            return;
        }
        const handler = this.eventHandler(id);
        if (!handler) {
            LOG(`   [ skipped ] no handler found`);
            return;
        }
        Object.keys(handler).map((h) => this._listenForEvent(id, h, log));
        LOG(`added [${id}] event`);
    }
    async addEvent(id, f) {
        this._events[id] = f;
        await this.registerEvent(id);
    }
    async open() {
        LOG(`opening channel ...`);
        await Promise.all(Object.keys(this.events).map(this._registerEvent));
        this._isOpen = true;
        LOG(`channel ready`);
    }
    async close() {
        LOG(`closing channel ...`);
        this._isOpen = false;
        LOG(`channel closed`);
    }
}
Channel.PREFIX = "#carmel:channel";
Channel.SYSTEM_ID = "sys";
Channel.SYSTEM_MAIN_ID = "sys:main";
Channel.SYSTEM_OPERATORS_ID = "sys:ops";
Channel.SYSTEM_PEERS_ID = "sys:peers";
Channel.ACCEPT_EVENT_ID = "accept";
Channel.RESPONSE_EVENT = "response";
Channel.RESPONSE_ALL_EVENT = "responseAll";
Channel.REQUEST_EVENT = "request";
Channel.EVENT = {
    OPERATOR_ACCEPT: Channel.Id(Channel.SYSTEM_OPERATORS_ID, Channel.ACCEPT_EVENT_ID, Channel.REQUEST_EVENT),
    PEER_ACCEPT: Channel.Id(Channel.SYSTEM_PEERS_ID, Channel.ACCEPT_EVENT_ID, Channel.REQUEST_EVENT)
};
//# sourceMappingURL=Channel.js.map