src/query.js
'use strict'
var _ = require('./lodash')
module.exports = function (service) {
var reductiofy = require('./reductiofy')(service)
var filters = require('./filters')(service)
var postAggregation = require('./postAggregation')(service)
var postAggregationMethods = _.keys(postAggregation)
return function doQuery(queryObj) {
var queryHash = JSON.stringify(queryObj)
// Attempt to reuse an exact copy of this query that is present elsewhere
for (var i = 0; i < service.columns.length; i++) {
for (var j = 0; j < service.columns[i].queries.length; j++) {
if (service.columns[i].queries[j].hash === queryHash) {
return new Promise(function (resolve, reject) { // eslint-disable-line no-loop-func
try {
resolve(service.columns[i].queries[j])
} catch (err) {
reject(err)
}
})
}
}
}
var query = {
// Original query passed in to query method
original: queryObj,
hash: queryHash,
}
// Default queryObj
if (_.isUndefined(query.original)) {
query.original = {}
}
// Default select
if (_.isUndefined(query.original.select)) {
query.original.select = {
$count: true,
}
}
// Default to groupAll
query.original.groupBy = query.original.groupBy || true
// Attach the query api to the query object
query = newQueryObj(query)
return createColumn(query)
.then(makeCrossfilterGroup)
.then(buildRequiredColumns)
.then(setupDataListeners)
.then(applyQuery)
function createColumn(query) {
// Ensure column is created
return service.column({
key: query.original.groupBy,
type: _.isUndefined(query.type) ? null : query.type,
array: Boolean(query.array),
})
.then(function () {
// Attach the column to the query
var column = service.column.find(query.original.groupBy)
query.column = column
column.queries.push(query)
column.removeListeners.push(function () {
return query.clear()
})
return query
})
}
function makeCrossfilterGroup(query) {
// Create the grouping on the columns dimension
// Using Promise Resolve allows support for crossfilter async
// TODO check if query already exists, and use the same base query // if possible
return Promise.resolve(query.column.dimension.group())
.then(function (g) {
query.group = g
return query
})
}
function buildRequiredColumns(query) {
var requiredColumns = filters.scanForDynamicFilters(query.original)
// We need to scan the group for any filters that would require
// the group to be rebuilt when data is added or removed in any way.
if (requiredColumns.length) {
return Promise.all(_.map(requiredColumns, function (columnKey) {
return service.column({
key: columnKey,
dynamicReference: query.group,
})
}))
.then(function () {
return query
})
}
return query
}
function setupDataListeners(query) {
// Here, we create a listener to recreate and apply the reducer to
// the group anytime underlying data changes
var stopDataListen = service.onDataChange(function () {
return applyQuery(query)
})
query.removeListeners.push(stopDataListen)
// This is a similar listener for filtering which will (if needed)
// run any post aggregations on the data after each filter action
var stopFilterListen = service.onFilter(function () {
return postAggregate(query)
})
query.removeListeners.push(stopFilterListen)
return query
}
function applyQuery(query) {
return buildReducer(query)
.then(applyReducer)
.then(attachData)
.then(postAggregate)
}
function buildReducer(query) {
return reductiofy(query.original)
.then(function (reducer) {
query.reducer = reducer
return query
})
}
function applyReducer(query) {
return Promise.resolve(query.reducer(query.group))
.then(function () {
return query
})
}
function attachData(query) {
return Promise.resolve(query.group.all())
.then(function (data) {
query.data = data
return query
})
}
function postAggregate(query) {
if (query.postAggregations.length > 1) {
// If the query is used by 2+ post aggregations, we need to lock
// it against getting mutated by the post-aggregations
query.locked = true
}
return Promise.all(_.map(query.postAggregations, function (post) {
return post()
}))
.then(function () {
return query
})
}
function newQueryObj(q, parent) {
var locked = false
if (!parent) {
parent = q
q = {}
locked = true
}
// Assign the regular query properties
_.assign(q, {
// The Universe for continuous promise chaining
universe: service,
// Crossfilter instance
crossfilter: service.cf,
// parent Information
parent: parent,
column: parent.column,
dimension: parent.dimension,
group: parent.group,
reducer: parent.reducer,
original: parent.original,
hash: parent.hash,
// It's own removeListeners
removeListeners: [],
// It's own postAggregations
postAggregations: [],
// Data method
locked: locked,
lock: lock,
unlock: unlock,
// Disposal method
clear: clearQuery,
})
_.forEach(postAggregationMethods, function (method) {
q[method] = postAggregateMethodWrap(postAggregation[method])
})
return q
function lock(set) {
if (!_.isUndefined(set)) {
q.locked = Boolean(set)
return
}
q.locked = true
}
function unlock() {
q.locked = false
}
function clearQuery() {
_.forEach(q.removeListeners, function (l) {
l()
})
return new Promise(function (resolve, reject) {
try {
resolve(q.group.dispose())
} catch (err) {
reject(err)
}
})
.then(function () {
q.column.queries.splice(q.column.queries.indexOf(q), 1)
// Automatically recycle the column if there are no queries active on it
if (!q.column.queries.length) {
return service.clear(q.column.key)
}
})
.then(function () {
return service
})
}
function postAggregateMethodWrap(postMethod) {
return function () {
var args = Array.prototype.slice.call(arguments)
var sub = {}
newQueryObj(sub, q)
args.unshift(sub, q)
q.postAggregations.push(function () {
Promise.resolve(postMethod.apply(null, args))
.then(postAggregateChildren)
})
return Promise.resolve(postMethod.apply(null, args))
.then(postAggregateChildren)
function postAggregateChildren() {
return postAggregate(sub)
.then(function () {
return sub
})
}
}
}
}
}
}