api/src/data/resolvers/mutations/conversations.ts
import * as strip from 'strip';
import * as _ from 'underscore';
import {
Configs,
Conformities,
ConversationMessages,
Conversations,
Customers,
Integrations,
Products
} from '../../../db/models';
import { getCollection } from '../../../db/models/boardUtils';
import Messages from '../../../db/models/ConversationMessages';
import {
KIND_CHOICES,
MESSAGE_TYPES,
NOTIFICATION_CONTENT_TYPES,
NOTIFICATION_TYPES
} from '../../../db/models/definitions/constants';
import { IMessageDocument } from '../../../db/models/definitions/conversationMessages';
import { IConversationDocument } from '../../../db/models/definitions/conversations';
import { IUserDocument } from '../../../db/models/definitions/users';
import { debugError } from '../../../debuggers';
import messageBroker from '../../../messageBroker';
import { graphqlPubsub } from '../../../pubsub';
import { AUTO_BOT_MESSAGES, RABBITMQ_QUEUES } from '../../constants';
import {
ACTIVITY_LOG_ACTIONS,
putActivityLog,
putUpdateLog
} from '../../logUtils';
import { checkPermission, requireLogin } from '../../permissions/wrappers';
import { IContext } from '../../types';
import utils, { splitStr } from '../../utils';
import QueryBuilder, { IListArgs } from '../queries/conversationQueryBuilder';
import { itemsAdd } from './boardUtils';
import { CONVERSATION_STATUSES } from '../../../db/models/definitions/constants';
export interface IConversationMessageAdd {
conversationId: string;
content: string;
mentionedUserIds?: string[];
internal?: boolean;
attachments?: any;
facebookMessageTag?: string;
}
interface IReplyFacebookComment {
conversationId: string;
commentId: string;
content: string;
}
interface IConversationConvert {
_id: string;
type: string;
itemId: string;
stageId: string;
itemName: string;
bookingProductId?: string;
}
/**
* Send conversation to integrations
*/
const sendConversationToIntegrations = async (
type: string,
integrationId: string,
conversationId: string,
requestName: string,
doc: IConversationMessageAdd,
dataSources: any,
action?: string,
facebookMessageTag?: string
) => {
if (type === 'facebook') {
const regex = new RegExp('<img[^>]* src="([^"]*)"', 'g');
const images: string[] = (doc.content.match(regex) || []).map(m =>
m.replace(regex, '$1')
);
images.forEach(img => {
doc.attachments.push({ type: 'image', url: img });
});
const content = strip(doc.content);
try {
await messageBroker().sendRPCMessage(
RABBITMQ_QUEUES.RPC_API_TO_INTEGRATIONS,
{
action,
type,
payload: JSON.stringify({
integrationId,
conversationId,
content: content.replace(/&/g, '&'),
attachments: doc.attachments || [],
tag: facebookMessageTag
})
}
);
} catch (e) {
throw new Error(
`Your message not sent Error: ${e.message}. Go to integrations list and fix it`
);
}
}
if (dataSources && dataSources.IntegrationsAPI && requestName) {
return dataSources.IntegrationsAPI[requestName]({
conversationId,
integrationId,
content: strip(doc.content),
attachments: doc.attachments || []
});
}
};
/**
* conversation notrification receiver ids
*/
export const conversationNotifReceivers = (
conversation: IConversationDocument,
currentUserId: string,
exclude: boolean = true
): string[] => {
let userIds: string[] = [];
// assigned user can get notifications
if (conversation.assignedUserId) {
userIds.push(conversation.assignedUserId);
}
// participated users can get notifications
if (
conversation.participatedUserIds &&
conversation.participatedUserIds.length > 0
) {
userIds = _.union(userIds, conversation.participatedUserIds);
}
// exclude current user
if (exclude) {
userIds = _.without(userIds, currentUserId);
}
return userIds;
};
/**
* Using this subscription to track conversation detail's assignee, tag, status
* changes
*/
export const publishConversationsChanged = (
_ids: string[],
type: string
): string[] => {
for (const _id of _ids) {
graphqlPubsub.publish('conversationChanged', {
conversationChanged: { conversationId: _id, type }
});
}
return _ids;
};
/**
* Publish admin's message
*/
export const publishMessage = async (
message: IMessageDocument,
customerId?: string
) => {
graphqlPubsub.publish('conversationMessageInserted', {
conversationMessageInserted: message
});
// widget is listening for this subscription to show notification
// customerId available means trying to notify to client
if (customerId) {
const unreadCount = await Messages.widgetsGetUnreadMessagesCount(
message.conversationId
);
graphqlPubsub.publish('conversationAdminMessageInserted', {
conversationAdminMessageInserted: {
customerId,
unreadCount
}
});
}
};
const sendNotifications = async ({
user,
conversations,
type,
mobile,
messageContent
}: {
user: IUserDocument;
conversations: IConversationDocument[];
type: string;
mobile?: boolean;
messageContent?: string;
}) => {
for (const conversation of conversations) {
const doc = {
createdUser: user,
link: `/inbox/index?_id=${conversation._id}`,
title: 'Conversation updated',
content: messageContent
? messageContent
: conversation.content || 'Conversation updated',
notifType: type,
receivers: conversationNotifReceivers(conversation, user._id),
action: 'updated conversation',
contentType: NOTIFICATION_CONTENT_TYPES.CONVERSATION,
contentTypeId: conversation._id
};
switch (type) {
case NOTIFICATION_TYPES.CONVERSATION_ADD_MESSAGE:
doc.action = `sent you a message`;
doc.receivers = conversationNotifReceivers(conversation, user._id);
break;
case NOTIFICATION_TYPES.CONVERSATION_ASSIGNEE_CHANGE:
doc.action = 'has assigned you to conversation ';
break;
case 'unassign':
doc.notifType = NOTIFICATION_TYPES.CONVERSATION_ASSIGNEE_CHANGE;
doc.action = 'has removed you from conversation';
break;
case NOTIFICATION_TYPES.CONVERSATION_STATE_CHANGE:
doc.action = `changed conversation status to ${(
conversation.status || ''
).toUpperCase()}`;
break;
}
await utils.sendNotification(doc);
if (mobile) {
// send mobile notification ======
try {
await utils.sendMobileNotification({
title: doc.title,
body: strip(doc.content),
receivers: conversationNotifReceivers(conversation, user._id, false),
customerId: conversation.customerId,
conversationId: conversation._id
});
} catch (e) {
debugError(`Failed to send mobile notification: ${e.message}`);
}
}
}
};
const getConversationById = async selector => {
const oldConversations = await Conversations.find(selector).lean();
const oldConversationById = {};
for (const conversation of oldConversations) {
oldConversationById[conversation._id] = conversation;
}
return { oldConversationById, oldConversations };
};
// check booking convert
const checkBookingConvert = async (productId: string) => {
const product = await Products.getProduct({ _id: productId });
let dealUOM = await Configs.find({ code: 'dealUOM' }).distinct('value');
let dealCurrency = await Configs.find({
code: 'dealCurrency'
}).distinct('value');
if (dealUOM.length > 0) {
dealUOM = dealUOM[0];
} else {
throw new Error('Please choose UNIT OF MEASUREMENT from general settings!');
}
if (dealCurrency.length > 0) {
dealCurrency = dealCurrency[0];
} else {
throw new Error('Please choose currency from general settings!');
}
return {
product,
dealUOM,
dealCurrency
};
};
const conversationMutations = {
/**
* Create new message in conversation
*/
async conversationMessageAdd(
_root,
doc: IConversationMessageAdd,
{ user, dataSources }: IContext
) {
const conversation = await Conversations.getConversation(
doc.conversationId
);
const integration = await Integrations.getIntegration({
_id: conversation.integrationId
});
await sendNotifications({
user,
conversations: [conversation],
type: NOTIFICATION_TYPES.CONVERSATION_ADD_MESSAGE,
mobile: true,
messageContent: doc.content
});
// do not send internal message to third service integrations
if (doc.internal) {
const messageObj = await ConversationMessages.addMessage(doc, user._id);
// publish new message to conversation detail
publishMessage(messageObj);
return messageObj;
}
const kind = integration.kind;
const integrationId = integration.id;
const conversationId = conversation.id;
const facebookMessageTag = doc.facebookMessageTag;
const customer = await Customers.findOne({ _id: conversation.customerId });
// if conversation's integration kind is form then send reply to
// customer's email
const email = customer ? customer.primaryEmail : '';
if (kind === KIND_CHOICES.LEAD && email) {
utils.sendEmail({
toEmails: [email],
title: 'Reply',
template: {
data: doc.content
}
});
}
let requestName;
let type;
let action;
if (kind === KIND_CHOICES.FACEBOOK_POST) {
type = 'facebook';
action = 'reply-post';
return sendConversationToIntegrations(
type,
integrationId,
conversationId,
requestName,
doc,
dataSources,
action
);
}
const message = await ConversationMessages.addMessage(doc, user._id);
/**
* Send SMS only when:
* - integration is of kind telnyx
* - customer has primary phone filled
* - customer's primary phone is valid
*/
if (
kind === KIND_CHOICES.TELNYX &&
customer &&
customer.primaryPhone &&
customer.phoneValidationStatus === 'valid'
) {
/**
* SMS part is limited to 160 characters, so we split long content by 160 characters.
* See below for details.
* https://developers.telnyx.com/docs/v2/messaging/configuration-and-limitations/character-and-rate-limits
*/
const chunks =
doc.content.length > 160 ? splitStr(doc.content, 160) : [doc.content];
for (let i = 0; i < chunks.length; i++) {
await messageBroker().sendMessage(
'erxes-api:integrations-notification',
{
action: 'sendConversationSms',
payload: JSON.stringify({
conversationMessageId: `${message._id}-part${i + 1}`,
conversationId,
integrationId,
toPhone: customer.primaryPhone,
content: strip(chunks[i])
})
}
);
}
}
// send reply to facebook
if (kind === KIND_CHOICES.FACEBOOK_MESSENGER) {
type = 'facebook';
action = 'reply-messenger';
}
// send reply to chatfuel
if (kind === KIND_CHOICES.CHATFUEL) {
requestName = 'replyChatfuel';
}
if (kind === KIND_CHOICES.TWITTER_DM) {
requestName = 'replyTwitterDm';
}
if (kind.includes('smooch')) {
requestName = 'replySmooch';
}
// send reply to whatsapp
if (kind === KIND_CHOICES.WHATSAPP) {
requestName = 'replyWhatsApp';
}
await sendConversationToIntegrations(
type,
integrationId,
conversationId,
requestName,
doc,
dataSources,
action,
facebookMessageTag
);
const dbMessage = await ConversationMessages.getMessage(message._id);
await utils.sendToWebhook('create', 'userMessages', dbMessage);
// Publishing both admin & client
publishMessage(dbMessage, conversation.customerId);
return dbMessage;
},
async conversationsReplyFacebookComment(
_root,
doc: IReplyFacebookComment,
{ user, dataSources }: IContext
) {
const conversation = await Conversations.getConversation(
doc.conversationId
);
const integration = await Integrations.getIntegration({
_id: conversation.integrationId
});
await sendNotifications({
user,
conversations: [conversation],
type: NOTIFICATION_TYPES.CONVERSATION_ADD_MESSAGE,
mobile: true,
messageContent: doc.content
});
const requestName = 'replyFacebookPost';
const integrationId = integration.id;
const conversationId = doc.commentId;
const type = 'facebook';
const action = 'reply-post';
await sendConversationToIntegrations(
type,
integrationId,
conversationId,
requestName,
doc,
dataSources,
action
);
},
async conversationsChangeStatusFacebookComment(
_root,
doc: IReplyFacebookComment,
{ dataSources }: IContext
) {
const requestName = 'replyFacebookPost';
const type = 'facebook';
const action = 'change-status-comment';
const conversationId = doc.commentId;
doc.content = '';
return sendConversationToIntegrations(
type,
'',
conversationId,
requestName,
doc,
dataSources,
action
);
},
/**
* Assign employee to conversation
*/
async conversationsAssign(
_root,
{
conversationIds,
assignedUserId
}: { conversationIds: string[]; assignedUserId: string },
{ user }: IContext
) {
const { oldConversationById } = await getConversationById({
_id: { $in: conversationIds }
});
const conversations: IConversationDocument[] = await Conversations.assignUserConversation(
conversationIds,
assignedUserId
);
// notify graphl subscription
publishConversationsChanged(conversationIds, 'assigneeChanged');
await sendNotifications({
user,
conversations,
type: NOTIFICATION_TYPES.CONVERSATION_ASSIGNEE_CHANGE
});
for (const conversation of conversations) {
await putUpdateLog(
{
type: 'conversation',
description: 'assignee Changed',
object: oldConversationById[conversation._id],
newData: { assignedUserId },
updatedDocument: conversation
},
user
);
}
return conversations;
},
/**
* Unassign employee from conversation
*/
async conversationsUnassign(
_root,
{ _ids }: { _ids: string[] },
{ user }: IContext
) {
const {
oldConversations,
oldConversationById
} = await getConversationById({ _id: { $in: _ids } });
const updatedConversations = await Conversations.unassignUserConversation(
_ids
);
await sendNotifications({
user,
conversations: oldConversations,
type: 'unassign'
});
// notify graphl subscription
publishConversationsChanged(_ids, 'assigneeChanged');
for (const conversation of updatedConversations) {
await putUpdateLog(
{
type: 'conversation',
description: 'unassignee',
object: oldConversationById[conversation._id],
newData: { assignedUserId: '' },
updatedDocument: conversation
},
user
);
}
return updatedConversations;
},
/**
* Change conversation status
*/
async conversationsChangeStatus(
_root,
{ _ids, status }: { _ids: string[]; status: string },
{ user }: IContext
) {
const { oldConversationById } = await getConversationById({
_id: { $in: _ids }
});
await Conversations.changeStatusConversation(_ids, status, user._id);
// notify graphl subscription
publishConversationsChanged(_ids, status);
const updatedConversations = await Conversations.find({
_id: { $in: _ids }
});
await sendNotifications({
user,
conversations: updatedConversations,
type: NOTIFICATION_TYPES.CONVERSATION_STATE_CHANGE
});
for (const conversation of updatedConversations) {
await putUpdateLog(
{
type: 'conversation',
description: 'change status',
object: oldConversationById[conversation._id],
newData: { status },
updatedDocument: conversation
},
user
);
}
return updatedConversations;
},
/**
* Resolve all conversations
*/
async conversationResolveAll(_root, params: IListArgs, { user }: IContext) {
// initiate query builder
const qb = new QueryBuilder(params, { _id: user._id });
await qb.buildAllQueries();
const query = qb.mainQuery();
const { oldConversationById } = await getConversationById(query);
const param = {
status: CONVERSATION_STATUSES.CLOSED,
closedUserId: user._id,
closedAt: new Date()
};
const updated = await Conversations.resolveAllConversation(query, param);
const updatedConversations = await Conversations.find({
_id: { $in: Object.keys(oldConversationById) }
}).lean();
for (const conversation of updatedConversations) {
await putUpdateLog(
{
type: 'conversation',
description: 'resolve all',
object: oldConversationById[conversation._id],
newData: param,
updatedDocument: conversation
},
user
);
}
return updated.nModified || 0;
},
/**
* Conversation mark as read
*/
async conversationMarkAsRead(
_root,
{ _id }: { _id: string },
{ user }: IContext
) {
return Conversations.markAsReadConversation(_id, user._id);
},
async conversationDeleteVideoChatRoom(
_root,
{ name },
{ dataSources }: IContext
) {
try {
return await dataSources.IntegrationsAPI.deleteDailyVideoChatRoom(name);
} catch (e) {
debugError(e.message);
throw new Error(e.message);
}
},
async conversationCreateVideoChatRoom(
_root,
{ _id },
{ dataSources, user }: IContext
) {
let message;
try {
const doc = {
conversationId: _id,
internal: false,
contentType: MESSAGE_TYPES.VIDEO_CALL
};
message = await ConversationMessages.addMessage(doc, user._id);
const videoCallData = await dataSources.IntegrationsAPI.createDailyVideoChatRoom(
{
erxesApiConversationId: _id,
erxesApiMessageId: message._id
}
);
const updatedMessage = { ...message._doc, videoCallData };
// publish new message to conversation detail
publishMessage(updatedMessage);
return videoCallData;
} catch (e) {
debugError(e.message);
await ConversationMessages.deleteOne({ _id: message._id });
throw new Error(e.message);
}
},
async changeConversationOperator(
_root,
{ _id, operatorStatus }: { _id: string; operatorStatus: string }
) {
const message = await Messages.createMessage({
conversationId: _id,
botData: [
{
type: 'text',
text: AUTO_BOT_MESSAGES.CHANGE_OPERATOR
}
]
});
graphqlPubsub.publish('conversationMessageInserted', {
conversationMessageInserted: message
});
return Conversations.updateOne({ _id }, { $set: { operatorStatus } });
},
async conversationsSaveVideoRecordingInfo(
_root,
{
conversationId,
recordingId
}: { conversationId: string; recordingId: string },
{ dataSources }: IContext
) {
try {
const response = await dataSources.IntegrationsAPI.saveDailyRecordingInfo(
{
erxesApiConversationId: conversationId,
recordingId
}
);
return response.status;
} catch (e) {
debugError(e);
throw new Error(e.message);
}
},
async conversationConvertToCard(
_root,
params: IConversationConvert,
{ user, docModifier }: IContext
) {
const { _id, type, itemId, itemName, stageId, bookingProductId } = params;
const conversation = await Conversations.getConversation(_id);
const { collection, update, create } = getCollection(type);
if (itemId) {
const oldItem = await collection.findOne({ _id: itemId }).lean();
if (bookingProductId) {
const { product, dealUOM, dealCurrency } = await checkBookingConvert(
bookingProductId
);
oldItem.productsData.push({
productId: product._id,
unitPrice: product.unitPrice,
uom: dealUOM,
currency: dealCurrency,
quantity: product.productCount
});
}
const doc = oldItem;
if (conversation.assignedUserId) {
const assignedUserIds = oldItem.assignedUserIds || [];
assignedUserIds.push(conversation.assignedUserId);
doc.assignedUserIds = assignedUserIds;
}
const sourceConversationIds: string[] =
oldItem.sourceConversationIds || [];
sourceConversationIds.push(conversation._id);
doc.sourceConversationIds = sourceConversationIds;
const item = await update(oldItem._id, doc);
item.userId = user._id;
await putActivityLog({
action: ACTIVITY_LOG_ACTIONS.CREATE_BOARD_ITEM,
data: { item, contentType: type }
});
const relTypeIds: string[] = [];
sourceConversationIds.forEach(async conversationId => {
const con = await Conversations.getConversation(conversationId);
if (con.customerId) {
relTypeIds.push(con.customerId);
}
});
if (conversation.customerId) {
await Conformities.addConformity({
mainType: type,
mainTypeId: item._id,
relType: 'customer',
relTypeId: conversation.customerId
});
}
return item._id;
} else {
const doc: any = {};
doc.name = itemName;
doc.stageId = stageId;
doc.sourceConversationIds = [_id];
doc.customerIds = [conversation.customerId];
doc.assignedUserIds = [conversation.assignedUserId];
if (bookingProductId) {
const { product, dealUOM, dealCurrency } = await checkBookingConvert(
bookingProductId
);
doc.productsData = [
{
productId: product._id,
unitPrice: product.unitPrice,
uom: dealUOM,
currency: dealCurrency,
quantity: product.productCount
}
];
}
const item = await itemsAdd(doc, type, create, user, docModifier);
return item._id;
}
},
async conversationEditCustomFields(
_root,
{ _id, customFieldsData }: { _id: string; customFieldsData: any }
) {
await Conversations.updateConversation(_id, { customFieldsData });
return Conversations.getConversation(_id);
}
};
requireLogin(conversationMutations, 'conversationMarkAsRead');
requireLogin(conversationMutations, 'conversationDeleteVideoChatRoom');
requireLogin(conversationMutations, 'conversationCreateVideoChatRoom');
requireLogin(conversationMutations, 'conversationsSaveVideoRecordingInfo');
requireLogin(conversationMutations, 'conversationConvertToCard');
checkPermission(
conversationMutations,
'conversationMessageAdd',
'conversationMessageAdd'
);
checkPermission(
conversationMutations,
'conversationsAssign',
'assignConversation'
);
checkPermission(
conversationMutations,
'conversationsUnassign',
'assignConversation'
);
checkPermission(
conversationMutations,
'conversationsChangeStatus',
'changeConversationStatus'
);
checkPermission(
conversationMutations,
'conversationResolveAll',
'conversationResolveAll'
);
export default conversationMutations;