T4rk1n/tarkjs

View on GitHub
src/persistance/mem-stores.js

Summary

Maintainability
A
1 hr
Test Coverage
/**
* Created by T4rk on 7/12/2017.
*/
 
import { EventBus, valueChanged, changeNotifier } from '../event-bus/event-bus'
import { toCancelable } from '../extensions/prom-extensions'
import { objMapReducer } from '../extensions/obj-extensions'
import { Deque } from '../containers/deque'
 
const fulfilled = (action) => `${action}_fulfilled`
const rejected = (action) => `${action}_rejected`
const pending = (action) => `${action}_pending`
 
// eslint-disable-next-line no-console
const onDispatchError = console.log
 
/**
* Create an object with all the states of a promise.
* @param {string} action
* @return {{base: string, fulfilled: string, rejected: string, pending: string}}
*/
export const createPromiseStates = (action) => ({
base: action,
fulfilled: fulfilled(action),
rejected: rejected(action),
pending: pending(action)
})
 
/**
* Wrap a promise to send events on success or failure.
*
* Events sent:
* - `${action}_pending` no payload
* - `${action}_fulfilled` payload {value}
* - `${action}_rejected` payload {error}
* @param {!EventBus} eventBus
* @param {!string} action
* @param {!Promise} promise
*/
export const promiseNotifier = (eventBus, action, promise) => {
eventBus.dispatch({event: pending(action)}).promise.catch(onDispatchError)
promise.then(
value => eventBus.dispatch({event: fulfilled(action), payload: value}).promise.catch(onDispatchError)
).catch(
error => eventBus.dispatch({event: rejected(action), payload: error}).promise.catch(onDispatchError)
)
}
 
/**
* A function that returns a promise
* @typedef {function} promiseAction
* @param {?*} options
* @return {!Promise}
*/
 
/**
* @example
* import { PromiseStore, EventBus, fetchRequest } from 'tarkjs'
*
* const eventBus = new EventBus()
* const store = new PromiseStore({fetch_text: (url) => fetchRequest(url)}, eventBus)
* const elem = document.getElementById('text-result')
*
* // using subscribe
* store.subscribe('fetch_text', (e) => {
* if (e.event === 'fetch_text_result_value_changed'))
* elem.innerHTML = e.payload.newValue
* })
*
* // using the promise state events
* eventBus.addEventHandler('fetch_text_fulfilled', (e) => elem.innerHTML = e.payload)
*
* store.actions.fetch_text('text.txt')
*
*/
export class PromiseStore {
/**
* @param {Object} actions
* @param {?EventBus} eventBus
*/
Function `constructor` has 42 lines of code (exceeds 25 allowed). Consider refactoring.
constructor(actions, eventBus=null) {
this._eventBus = eventBus || new EventBus()
const actionKeys = Object.keys(actions)
/**
* Centralized stores
* @type {Object}
*/
this.actionStore = actionKeys.map(k => [k, {
store: changeNotifier({
result: null,
pending: false,
rejected: false,
fulfilled: false,
error: null
}, this._eventBus, {prefix: k}),
subscribe: (sub) => [
'result', 'pending', 'rejected', 'fulfilled', 'error'
].forEach(i => this._eventBus.addEventHandler(valueChanged(i, k), sub)),
STATES: createPromiseStates(k)
}]).reduce(objMapReducer, {})
 
/**
* The actions as direct function calls.
* Each action key will have a function with one optional param that returns a CancelablePromise.
* @type {Object}
*/
this.actions = actionKeys.reduce((m, k) => {
/**
* Wrapped promise call.
* @param [options]
* @return {CancelablePromise}
*/
m[k] = (options={}) => {
const prom = actions[k](options)
promiseNotifier(this._eventBus, k, prom)
return toCancelable(prom)
}
return m
}, {})
 
// Handle the states changes.
actionKeys.forEach(k => {
const store = this.actionStore[k].store
this._eventBus.addEventHandler(fulfilled(k), (event) => {
store.fulfilled = true
store.result = event.payload
store.pending = false
})
this._eventBus.addEventHandler(pending(k), () => {
store.pending = true
store.rejected = false
store.fulfilled = false
store.error = null
})
this._eventBus.addEventHandler(rejected(k), (event) => {
store.pending = false
store.rejected = true
store.error = event.payload
})
})
}
 
/**
* Subscribe to a store value change event.
* @param {string} action
* @param {function} sub
*/
subscribe(action, sub) {
this.actionStore[action].subscribe(sub)
const states = [fulfilled(action), pending(action), rejected(action)]
states.forEach(s => this._eventBus.addEventHandler(s, sub))
}
 
/**
* Remove the handler from the internal {@link EventBus}.
* @param {!string} action
* @param {!function} sub
*/
unsubscribe(action, sub) {
[
'result', 'pending', 'rejected', 'fulfilled', 'error'
].forEach(s => this._eventBus.removeEventHandler(valueChanged(s, action), sub))
const states = [fulfilled(action), pending(action), rejected(action)]
states.forEach(s => this._eventBus.removeEventHandler(s, sub))
}
}
 
/**
* Options for the {@link SocketStore#constructor}.
* @typedef {Object} SocketStoreOptions
* @property {?EventBus} [eventBus=null] The eventBus to dispatch events to, if null instantiate a new one.
* @property {?string} [socketName=null] The name of the socket, if null take the url
* @property {Array} [protocols=[]] socket protocols
* @property {boolean} [start=false] Start the socket on initialization.
* @property {int} [capacity=100] The number of message to keep in the store.
* @property {boolean} [insertFront=false] Insert the elements at the front of the deque store.
* @property {function(err:*)} [onError] socket event handler
* @property {function(e:*)} [onOpen] socket event handler
* @property {function(e:*)} [onClose] socket event handler
* @property {function(data:*)} [transformMessage] Transform the message before inserting the data in the store
*/
 
/**
* @type {SocketStoreOptions}
*/
const defaultSocketStoreOptions = {
eventBus: null, protocols: [], start: false, capacity: 100, socketName: null, insertFront: false,
onOpen: () => {},
onError: () => {},
onClose: () => {},
transformMessage: (data) => data
}
 
/**
* Store the messages received by a socket and send events to handlers.
*/
export class SocketStore {
/**
* @param {!string} url
* @param {SocketStoreOptions} [options]
*/
constructor(url, options=defaultSocketStoreOptions) {
const {
eventBus, protocols, start, onOpen, capacity, onError, onClose, transformMessage, socketName, insertFront
} = {...defaultSocketStoreOptions, ...options}
this._eventBus = eventBus || new EventBus()
/**
* The name of the socket, if null take the url and events will have strange name but still valid.
* @type {string}
*/
this.socketName = socketName || url
/**
* Notifies of changes with event `${socket}_messages_value_changed`
* @type {{messages: Deque}}
*/
this.store = changeNotifier({messages: new Deque({capacity})}, this._eventBus, {prefix: this.socketName})
/**
* @type {string}
*/
this.url = url
/**
* @type {Array<string>}
*/
this.protocols = protocols
/**
* @type {WebSocket}
* @private
*/
this._socket = null
/**
* Socket event handler, must set before start.
* @type {function(e: *)}
*/
this.onOpen = onOpen
/**
* Socket event handler, must set before start.
* @type {function(e: *)}
*/
this.onError = onError
/**
* Socket event handler, must set before start.
* @type {function(e: *)}
*/
this.onClose = onClose
/**
* Transform the message before inserting the data in the store
* @type {function(data: *)}
*/
this.transformMessage = transformMessage
this._front = insertFront
if (start) this.start()
}
 
/**
* Initialize the socket and its handlers.
*/
start() {
//noinspection JSCheckFunctionSignatures
this._socket = new WebSocket(this.url, this.protocols)
const socketEvent = (e, func, event) => {
func(e)
this._eventBus.dispatch({event, payload: e}).promise.catch(onDispatchError)
}
this._socket.onopen = (e) => socketEvent(e, this.onOpen, this.socket_open)
this._socket.onerror = (e) => socketEvent(e, this.onError, this.socket_error)
this._socket.onclose = (e) => socketEvent(e, this.onClose, this.socket_close)
this._socket.onmessage = (event) => {
const data = this.transformMessage(event.data)
// emits change events - remove the notifier as it is redundant ?
this.store.messages = this.store.messages.insert(data, this._front)
this._eventBus.dispatch({
event: this.socket_message_received,
payload: {data, store: this.store.messages}
})
}
}
 
/**
* Send a message to the server.
* @param {string} message
*/
send(message) {
if (!this._socket || this._socket.readyState !== WebSocket.OPEN) throw new Error('Socket not open')
this._socket.send(message)
}
 
/**
* Close the internal socket.
* @param {{code: number, reason: string}} [options={code: 1000, reason: ''}] optional reason
*/
close(options={code: 1000, reason: ''}) {
if (this._socket && this._socket.readyState === WebSocket.OPEN) this._socket.close(options.code, options.reason)
}
 
/**
* Subscribe to change in the store.
* Only one event is subscribed to `${socketName}_message_received`
* @param {function(e: TEvent)} sub
*/
subscribe(sub) {
this._eventBus.addEventHandler(this.socket_message_received, sub)
}
 
/**
* Remove the handler from the internal {@link EventBus}.
* @param {!function} sub
*/
unsubscribe(sub) {
this._eventBus.removeEventHandler(this.socket_message_received, sub)
}
 
/**
* @return {string}
*/
get socket_close() { return `${this.socketName}_close` }
/**
* @return {string}
*/
get socket_open() { return `${this.socketName}_open` }
/**
* @return {string}
*/
get socket_error() { return `${this.socketName}_error` }
/**
* @return {string}
*/
get socket_message_received() { return `${this.socketName}_message_received` }
}