src/persistance/mem-stores.js
/** * Created by T4rk on 7/12/2017. */ import { EventBus, valueChanged, changeNotifier } from '../event-bus/event-bus'import { toCancelable } from '../extensions/prom-extensions'import { objMapReducer } from '../extensions/obj-extensions'import { Deque } from '../containers/deque' const fulfilled = (action) => `${action}_fulfilled`const rejected = (action) => `${action}_rejected`const pending = (action) => `${action}_pending` // eslint-disable-next-line no-consoleconst onDispatchError = console.log /** * Create an object with all the states of a promise. * @param {string} action * @return {{base: string, fulfilled: string, rejected: string, pending: string}} */export const createPromiseStates = (action) => ({ base: action, fulfilled: fulfilled(action), rejected: rejected(action), pending: pending(action)}) /** * Wrap a promise to send events on success or failure. * * Events sent: * - `${action}_pending` no payload * - `${action}_fulfilled` payload {value} * - `${action}_rejected` payload {error} * @param {!EventBus} eventBus * @param {!string} action * @param {!Promise} promise */export const promiseNotifier = (eventBus, action, promise) => { eventBus.dispatch({event: pending(action)}).promise.catch(onDispatchError) promise.then( value => eventBus.dispatch({event: fulfilled(action), payload: value}).promise.catch(onDispatchError) ).catch( error => eventBus.dispatch({event: rejected(action), payload: error}).promise.catch(onDispatchError) )} /** * A function that returns a promise * @typedef {function} promiseAction * @param {?*} options * @return {!Promise} */ /** * @example * import { PromiseStore, EventBus, fetchRequest } from 'tarkjs' * * const eventBus = new EventBus() * const store = new PromiseStore({fetch_text: (url) => fetchRequest(url)}, eventBus) * const elem = document.getElementById('text-result') * * // using subscribe * store.subscribe('fetch_text', (e) => { * if (e.event === 'fetch_text_result_value_changed')) * elem.innerHTML = e.payload.newValue * }) * * // using the promise state events * eventBus.addEventHandler('fetch_text_fulfilled', (e) => elem.innerHTML = e.payload) * * store.actions.fetch_text('text.txt') * */export class PromiseStore { /** * @param {Object} actions * @param {?EventBus} eventBus */Function `constructor` has 42 lines of code (exceeds 25 allowed). Consider refactoring. constructor(actions, eventBus=null) { this._eventBus = eventBus || new EventBus() const actionKeys = Object.keys(actions) /** * Centralized stores * @type {Object} */ this.actionStore = actionKeys.map(k => [k, { store: changeNotifier({ result: null, pending: false, rejected: false, fulfilled: false, error: null }, this._eventBus, {prefix: k}), subscribe: (sub) => [ 'result', 'pending', 'rejected', 'fulfilled', 'error' ].forEach(i => this._eventBus.addEventHandler(valueChanged(i, k), sub)), STATES: createPromiseStates(k) }]).reduce(objMapReducer, {}) /** * The actions as direct function calls. * Each action key will have a function with one optional param that returns a CancelablePromise. * @type {Object} */ this.actions = actionKeys.reduce((m, k) => { /** * Wrapped promise call. * @param [options] * @return {CancelablePromise} */ m[k] = (options={}) => { const prom = actions[k](options) promiseNotifier(this._eventBus, k, prom) return toCancelable(prom) } return m }, {}) // Handle the states changes. actionKeys.forEach(k => { const store = this.actionStore[k].store this._eventBus.addEventHandler(fulfilled(k), (event) => { store.fulfilled = true store.result = event.payload store.pending = false }) this._eventBus.addEventHandler(pending(k), () => { store.pending = true store.rejected = false store.fulfilled = false store.error = null }) this._eventBus.addEventHandler(rejected(k), (event) => { store.pending = false store.rejected = true store.error = event.payload }) }) } /** * Subscribe to a store value change event. * @param {string} action * @param {function} sub */ subscribe(action, sub) { this.actionStore[action].subscribe(sub) const states = [fulfilled(action), pending(action), rejected(action)] states.forEach(s => this._eventBus.addEventHandler(s, sub)) } /** * Remove the handler from the internal {@link EventBus}. * @param {!string} action * @param {!function} sub */ unsubscribe(action, sub) { [ 'result', 'pending', 'rejected', 'fulfilled', 'error' ].forEach(s => this._eventBus.removeEventHandler(valueChanged(s, action), sub)) const states = [fulfilled(action), pending(action), rejected(action)] states.forEach(s => this._eventBus.removeEventHandler(s, sub)) }} /** * Options for the {@link SocketStore#constructor}. * @typedef {Object} SocketStoreOptions * @property {?EventBus} [eventBus=null] The eventBus to dispatch events to, if null instantiate a new one. * @property {?string} [socketName=null] The name of the socket, if null take the url * @property {Array} [protocols=[]] socket protocols * @property {boolean} [start=false] Start the socket on initialization. * @property {int} [capacity=100] The number of message to keep in the store. * @property {boolean} [insertFront=false] Insert the elements at the front of the deque store. * @property {function(err:*)} [onError] socket event handler * @property {function(e:*)} [onOpen] socket event handler * @property {function(e:*)} [onClose] socket event handler * @property {function(data:*)} [transformMessage] Transform the message before inserting the data in the store */ /** * @type {SocketStoreOptions} */const defaultSocketStoreOptions = { eventBus: null, protocols: [], start: false, capacity: 100, socketName: null, insertFront: false, onOpen: () => {}, onError: () => {}, onClose: () => {}, transformMessage: (data) => data} /** * Store the messages received by a socket and send events to handlers. */export class SocketStore { /** * @param {!string} url * @param {SocketStoreOptions} [options] */ constructor(url, options=defaultSocketStoreOptions) { const { eventBus, protocols, start, onOpen, capacity, onError, onClose, transformMessage, socketName, insertFront } = {...defaultSocketStoreOptions, ...options} this._eventBus = eventBus || new EventBus() /** * The name of the socket, if null take the url and events will have strange name but still valid. * @type {string} */ this.socketName = socketName || url /** * Notifies of changes with event `${socket}_messages_value_changed` * @type {{messages: Deque}} */ this.store = changeNotifier({messages: new Deque({capacity})}, this._eventBus, {prefix: this.socketName}) /** * @type {string} */ this.url = url /** * @type {Array<string>} */ this.protocols = protocols /** * @type {WebSocket} * @private */ this._socket = null /** * Socket event handler, must set before start. * @type {function(e: *)} */ this.onOpen = onOpen /** * Socket event handler, must set before start. * @type {function(e: *)} */ this.onError = onError /** * Socket event handler, must set before start. * @type {function(e: *)} */ this.onClose = onClose /** * Transform the message before inserting the data in the store * @type {function(data: *)} */ this.transformMessage = transformMessage this._front = insertFront if (start) this.start() } /** * Initialize the socket and its handlers. */ start() { //noinspection JSCheckFunctionSignatures this._socket = new WebSocket(this.url, this.protocols) const socketEvent = (e, func, event) => { func(e) this._eventBus.dispatch({event, payload: e}).promise.catch(onDispatchError) } this._socket.onopen = (e) => socketEvent(e, this.onOpen, this.socket_open) this._socket.onerror = (e) => socketEvent(e, this.onError, this.socket_error) this._socket.onclose = (e) => socketEvent(e, this.onClose, this.socket_close) this._socket.onmessage = (event) => { const data = this.transformMessage(event.data) // emits change events - remove the notifier as it is redundant ? this.store.messages = this.store.messages.insert(data, this._front) this._eventBus.dispatch({ event: this.socket_message_received, payload: {data, store: this.store.messages} }) } } /** * Send a message to the server. * @param {string} message */ send(message) { if (!this._socket || this._socket.readyState !== WebSocket.OPEN) throw new Error('Socket not open') this._socket.send(message) } /** * Close the internal socket. * @param {{code: number, reason: string}} [options={code: 1000, reason: ''}] optional reason */ close(options={code: 1000, reason: ''}) { if (this._socket && this._socket.readyState === WebSocket.OPEN) this._socket.close(options.code, options.reason) } /** * Subscribe to change in the store. * Only one event is subscribed to `${socketName}_message_received` * @param {function(e: TEvent)} sub */ subscribe(sub) { this._eventBus.addEventHandler(this.socket_message_received, sub) } /** * Remove the handler from the internal {@link EventBus}. * @param {!function} sub */ unsubscribe(sub) { this._eventBus.removeEventHandler(this.socket_message_received, sub) } /** * @return {string} */ get socket_close() { return `${this.socketName}_close` } /** * @return {string} */ get socket_open() { return `${this.socketName}_open` } /** * @return {string} */ get socket_error() { return `${this.socketName}_error` } /** * @return {string} */ get socket_message_received() { return `${this.socketName}_message_received` }}