api/src/data/modules/segments/queryBuilder.ts
import * as _ from 'underscore';
import {
Boards,
Conformities,
Pipelines,
Segments,
Stages
} from '../../../db/models';
import {
SEGMENT_DATE_OPERATORS,
SEGMENT_NUMBER_OPERATORS
} from '../../../db/models/definitions/constants';
import { ICondition, ISegment } from '../../../db/models/definitions/segments';
import { fetchElk, fetchElkScroll } from '../../../elasticsearch';
import { getEsTypes } from '../coc/utils';
type IOptions = {
associatedCustomers?: boolean;
returnFields?: string[];
returnFullDoc?: boolean;
returnSelector?: boolean;
returnCount?: boolean;
defaultMustSelector?: any[];
pipelineId?: string;
page?: number;
perPage?: number;
sortField?: string;
sortDirection?: number;
scroll?: boolean;
};
export const isInSegment = async (
segmentId: string,
idToCheck: string,
options: IOptions = {}
): Promise<boolean> => {
options.returnCount = true;
options.defaultMustSelector = [
{
match: {
_id: idToCheck
}
}
];
const segment = await Segments.getSegment(segmentId);
const count = await fetchSegment(segment, options);
return count > 0;
};
export const fetchSegment = async (
segment,
options: IOptions = {}
): Promise<any> => {
const { contentType } = segment;
let index = getIndexByContentType(contentType);
let selector = { bool: {} };
await generateQueryBySegment({
segment,
selector: selector.bool,
options,
isInitialCall: true
});
if (
['company', 'deal', 'task', 'ticket'].includes(contentType) &&
options.associatedCustomers
) {
index = 'customers';
const itemsResponse = await fetchElk({
action: 'search',
index: getIndexByContentType(segment.contentType),
body: {
query: selector,
_source: '_id'
},
defaultValue: { hits: { hits: [] } }
});
const items = itemsResponse.hits.hits;
const itemIds = items.map(i => i._id);
const customerIds = await Conformities.filterConformity({
mainType: segment.contentType,
mainTypeIds: itemIds,
relType: 'customer'
});
selector = {
bool: {
must: [
{
terms: {
_id: customerIds
}
}
]
}
};
}
if (options.returnSelector) {
return selector;
}
// count entries
if (options.returnCount) {
const countResponse = await fetchElk({
action: 'count',
index,
body: {
query: selector
},
defaultValue: { count: -1 }
});
return countResponse.count;
}
const { sortField, sortDirection, page, perPage } = options;
let pagination = {};
if (page && perPage) {
pagination = {
from: (page - 1) * perPage,
size: perPage
};
}
if (sortField && sortDirection) {
pagination = {
...pagination,
sort: {
[sortField]: {
order: sortDirection
? sortDirection === -1
? 'desc'
: 'asc'
: 'desc'
}
}
};
}
const fetchOptions: any = {
action: 'search',
index,
body: {
_source: options.returnFields || options.returnFullDoc || false,
query: selector,
...pagination
},
defaultValue: { hits: { hits: [] } }
};
if (options.scroll && options.perPage) {
// keep the search results "scrollable" for 1 minute
fetchOptions.scroll = '1m';
fetchOptions.size = perPage;
const results: any[] = [];
const resp: any[] = [];
const initialResponse = await fetchElk(fetchOptions);
resp.push(initialResponse);
while (resp.length) {
const { hits = {} } = resp.shift();
if (hits.hits) {
hits.hits.forEach(hit => {
results.push(hit._id);
});
}
/* istanbul ignore next */
if (hits.total && hits.total.value === results.length) {
// check to see if we have collected all the documents
break;
}
/* istanbul ignore next */
if (initialResponse._scroll_id) {
// get the next response if there are more to fetch
resp.push(await fetchElkScroll(initialResponse._scroll_id));
}
}
return results;
}
const response = await fetchElk(fetchOptions);
if (options.returnFullDoc || options.returnFields) {
return response.hits.hits.map(hit => ({ _id: hit._id, ...hit._source }));
}
return response.hits.hits.map(hit => hit._id);
};
export const generateQueryBySegment = async (args: {
segment: ISegment;
selector: any;
options?: IOptions;
isInitialCall?: boolean;
}) => {
const { segment, selector, options = {}, isInitialCall } = args;
const { contentType } = segment;
const { defaultMustSelector } = options;
const typesMap = getEsTypes(contentType);
const defaultSelector =
isInitialCall && defaultMustSelector
? defaultMustSelector.map(s => ({ ...s }))
: [];
const cj = segment.conditionsConjunction || 'and';
if (cj === 'and') {
selector.must = defaultSelector;
selector.must_not = [];
} else {
selector.must = [
{
bool: {
should: [],
must_not: []
}
},
...defaultSelector
];
}
const selectorPositiveList =
cj === 'and' ? selector.must : selector.must[0].bool.should;
const selectorNegativeList =
cj === 'and' ? selector.must_not : selector.must[0].bool.must_not;
const parentSegment = await Segments.findOne({ _id: segment.subOf });
if (parentSegment && (!segment._id || segment._id !== parentSegment._id)) {
selectorPositiveList.push({ bool: {} });
await generateQueryBySegment({
...args,
selector: selectorPositiveList[selectorPositiveList.length - 1].bool,
segment: parentSegment,
isInitialCall: false
});
}
const eventPositive: any = [];
const eventNegative: any = [];
const propertiesPositive: any = [];
const propertiesNegative: any = [];
if (['customer', 'lead', 'visitor'].includes(contentType)) {
propertiesNegative.push({
term: {
status: 'deleted'
}
});
}
if (['deal', 'task', 'ticket'].includes(contentType)) {
const stageIds = await generateConditionStageIds({
boardId: segment.boardId,
pipelineId: segment.pipelineId,
options
});
if (stageIds.length > 0) {
propertiesPositive.push({ terms: { stageId: stageIds } });
}
}
const propertyConditions: ICondition[] = [];
const eventConditions: ICondition[] = [];
const conditions = segment.conditions || [];
for (const condition of conditions) {
if (condition.type === 'property') {
if (condition.propertyType !== 'form_submission') {
propertyConditions.push(condition);
} else {
const formFieldCondition = { ...condition };
formFieldCondition.propertyName = 'formFieldId';
formFieldCondition.propertyValue = condition.propertyName;
if (
condition.propertyOperator &&
['is', 'ins'].indexOf(condition.propertyOperator) <= 0
) {
formFieldCondition.propertyOperator = 'e';
condition.propertyName = 'value';
propertyConditions.push(condition);
}
propertyConditions.push(formFieldCondition);
}
}
if (condition.type === 'event') {
eventConditions.push(condition);
}
if (
condition.type === 'subSegment' &&
(condition.subSegmentId || condition.subSegmentForPreview)
) {
let subSegment = condition.subSegmentForPreview;
if (condition.subSegmentId) {
subSegment = await Segments.getSegment(condition.subSegmentId);
}
selectorPositiveList.push({ bool: {} });
await generateQueryBySegment({
...args,
segment: subSegment || ({} as ISegment),
selector: selectorPositiveList[selectorPositiveList.length - 1].bool,
isInitialCall: false
});
}
}
for (const condition of propertyConditions) {
const field = condition.propertyName;
if (field && condition.propertyType) {
let [positiveQuery, negativeQuery] = elkConvertConditionToQuery({
field,
type: typesMap[field],
operator: condition.propertyOperator || '',
value: condition.propertyValue || ''
});
negativeQuery = negativeQuery;
if (isCardTyped(condition.propertyType)) {
const stageIds = await generateConditionStageIds({
boardId: condition.boardId,
pipelineId: condition.pipelineId
});
if (stageIds.length > 0) {
positiveQuery = {
bool: {
must: [
positiveQuery,
{
terms: {
stageId: stageIds
}
}
]
}
};
}
}
if (contentType === condition.propertyType) {
if (positiveQuery) {
propertiesPositive.push(positiveQuery);
}
if (negativeQuery) {
propertiesNegative.push(negativeQuery);
}
} else {
const ids = await associationPropertyFilter({
mainType: contentType,
propertyType: condition.propertyType,
positiveQuery,
negativeQuery
});
propertiesPositive.push({
terms: {
_id: ids
}
});
}
}
}
for (const condition of eventConditions) {
const {
eventOccurence,
eventName,
eventOccurenceValue,
eventAttributeFilters = []
} = condition;
eventPositive.push({
term: {
name: eventName
}
});
if (eventOccurence === 'exactly') {
eventPositive.push({
term: {
count: eventOccurenceValue
}
});
}
if (eventOccurence === 'atleast') {
eventPositive.push({
range: {
count: {
gte: eventOccurenceValue
}
}
});
}
if (eventOccurence === 'atmost') {
eventPositive.push({
range: {
count: {
lte: eventOccurenceValue
}
}
});
}
for (const filter of eventAttributeFilters) {
const [positiveQuery, negativeQuery] = elkConvertConditionToQuery({
field: `attributes.${filter.name}`,
operator: filter.operator,
value: filter.value
});
if (positiveQuery) {
eventPositive.push(positiveQuery);
}
if (negativeQuery) {
eventNegative.push(negativeQuery);
}
}
if (eventPositive.length > 0 || eventNegative.length > 0) {
const idsByEvents = await fetchByQuery({
index: 'events',
_source: contentType === 'company' ? 'companyId' : 'customerId',
positiveQuery: eventPositive,
negativeQuery: eventNegative
});
propertiesPositive.push({
terms: {
_id: idsByEvents
}
});
}
}
selectorPositiveList.push(...propertiesPositive);
selectorNegativeList.push(...propertiesNegative);
};
export const generateNestedQuery = (
kind: string,
field: string,
operator: string,
query: any
) => {
const fieldKey = field.replace(`${kind}.`, '');
let fieldValue = 'value';
if (SEGMENT_NUMBER_OPERATORS.includes(operator)) {
fieldValue = 'numberValue';
}
if (SEGMENT_DATE_OPERATORS.includes(operator)) {
fieldValue = 'dateValue';
}
let updatedQuery = query;
updatedQuery = JSON.stringify(updatedQuery).replace(
`${kind}.${fieldKey}`,
`${kind}.${fieldValue}`
);
updatedQuery = JSON.parse(updatedQuery);
return {
nested: {
path: kind,
query: {
bool: {
must: [
{
term: {
[`${kind}.field`]: fieldKey
}
},
updatedQuery
]
}
}
}
};
};
export function elkConvertConditionToQuery(args: {
field: string;
type?: any;
operator: string;
value: string;
}) {
const { field, type, operator, value } = args;
const fixedValue = (value || '').includes('now')
? value
: value.toLocaleLowerCase();
let positiveQuery;
let negativeQuery;
// equal
if (['e', 'numbere'].includes(operator)) {
if (['keyword', 'email'].includes(type) || operator === 'numbere') {
positiveQuery = {
term: { [field]: value }
};
} else {
positiveQuery = {
match_phrase: { [field]: value }
};
}
}
// does not equal
if (['dne', 'numberdne'].includes(operator)) {
if (['keyword', 'email'].includes(type) || operator === 'numberdne') {
negativeQuery = {
term: { [field]: value }
};
} else {
negativeQuery = {
match_phrase: { [field]: value }
};
}
}
// contains
if (operator === 'c') {
positiveQuery = {
wildcard: {
[field]: `*${fixedValue}*`
}
};
}
// does not contains
if (operator === 'dnc') {
negativeQuery = {
wildcard: {
[field]: `*${fixedValue}*`
}
};
}
// greater than equal
if (['igt', 'numberigt', 'dateigt'].includes(operator)) {
positiveQuery = {
range: {
[field]: {
gte: fixedValue
}
}
};
}
// less then equal
if (['ilt', 'numberilt', 'dateilt'].includes(operator)) {
positiveQuery = {
range: {
[field]: {
lte: fixedValue
}
}
};
}
// is true
if (operator === 'it') {
positiveQuery = {
term: {
[field]: true
}
};
}
// is false
if (operator === 'if') {
positiveQuery = {
term: {
[field]: false
}
};
}
// is set
if (operator === 'is') {
positiveQuery = {
exists: {
field
}
};
}
// is not set
if (operator === 'ins') {
negativeQuery = {
exists: {
field
}
};
}
if (['woam', 'wobm', 'woad', 'wobd'].includes(operator)) {
let gte = '';
let lte = '';
// will occur after on following n-th minute
if (operator === 'woam') {
gte = `now-${fixedValue}m/m`;
lte = `now-${fixedValue}m/m`;
}
// will occur before on following n-th minute
if (operator === 'wobm') {
gte = `now+${fixedValue}m/m`;
lte = `now+${fixedValue}m/m`;
}
// will occur after on following n-th day
if (operator === 'woad') {
gte = `now-${fixedValue}d/d`;
lte = `now-${fixedValue}d/d`;
}
// will occur before on following n-th day
if (operator === 'wobd') {
gte = `now+${fixedValue}d/d`;
lte = `now+${fixedValue}d/d`;
}
positiveQuery = { range: { [field]: { gte, lte } } };
}
// date relative less than
if (operator === 'drlt') {
positiveQuery = { range: { [field]: { lte: fixedValue } } };
}
// date relative greater than
if (operator === 'drgt') {
positiveQuery = { range: { [field]: { gte: fixedValue } } };
}
for (const nestedType of ['customFieldsData', 'trackedData', 'attributes']) {
if (field.includes(nestedType)) {
if (positiveQuery) {
positiveQuery = generateNestedQuery(
nestedType,
field,
operator,
positiveQuery
);
}
if (negativeQuery) {
negativeQuery = generateNestedQuery(
nestedType,
field,
operator,
negativeQuery
);
}
}
}
return [positiveQuery, negativeQuery];
}
export const getIndexByContentType = (contentType: string) => {
let index = 'customers';
if (contentType === 'company') {
index = 'companies';
}
if (contentType === 'deal') {
index = 'deals';
}
if (contentType === 'task') {
index = 'tasks';
}
if (contentType === 'ticket') {
index = 'tickets';
}
if (contentType === 'conversation') {
index = 'conversations';
}
if (contentType === 'user') {
index = 'users';
}
return index;
};
const fetchByQuery = async ({
index,
positiveQuery,
negativeQuery,
_source = '_id'
}: {
index: string;
_source?: string;
positiveQuery: any;
negativeQuery: any;
}) => {
const response = await fetchElk({
action: 'search',
index,
body: {
_source,
query: {
bool: {
must: positiveQuery,
must_not: negativeQuery
}
}
},
defaultValue: { hits: { hits: [] } }
});
return response.hits.hits.map(hit =>
_source === '_id' ? hit._id : hit._source[_source]
);
};
const associationPropertyFilter = async ({
mainType,
propertyType,
positiveQuery,
negativeQuery
}: {
mainType: string;
propertyType: string;
positiveQuery: any;
negativeQuery: any;
}) => {
let associatedTypes: string[] = [];
if (['customer', 'lead'].includes(mainType)) {
associatedTypes = ['company', 'deal', 'ticket', 'task'];
}
if (mainType === 'company') {
associatedTypes = ['customer', 'deal', 'ticket', 'task'];
}
if (mainType === 'deal') {
associatedTypes = ['customer', 'company', 'ticket', 'task'];
}
if (mainType === 'task') {
associatedTypes = ['customer', 'company', 'ticket', 'deal'];
}
if (mainType === 'ticket') {
associatedTypes = ['customer', 'company', 'deal', 'task'];
}
if (associatedTypes.includes(propertyType)) {
const mainTypeIds = await fetchByQuery({
index: getIndexByContentType(propertyType),
positiveQuery,
negativeQuery
});
return Conformities.filterConformity({
mainType: propertyType,
mainTypeIds,
relType: mainType === 'lead' ? 'customer' : mainType
});
}
if (propertyType === 'form_submission') {
return fetchByQuery({
index: 'form_submissions',
_source: 'customerId',
positiveQuery,
negativeQuery
});
}
};
const generateConditionStageIds = async ({
boardId,
pipelineId,
options
}: {
boardId?: string;
pipelineId?: string;
options?: IOptions;
}) => {
let pipelineIds: string[] = [];
if (options && options.pipelineId) {
pipelineIds = [options.pipelineId];
}
if (boardId && (!options || !options.pipelineId)) {
const board = await Boards.getBoard(boardId);
const pipelines = await Pipelines.find(
{
_id: {
$in: pipelineId ? [pipelineId] : board.pipelines || []
}
},
{ _id: 1 }
);
pipelineIds = pipelines.map(p => p._id);
}
const stages = await Stages.find({ pipelineId: pipelineIds }, { _id: 1 });
return stages.map(s => s._id);
};
const isCardTyped = (type: string) => ['deal', 'task', 'ticket'].includes(type);