apps/meteor/app/ui-utils/client/lib/RoomHistoryManager.ts
import type { IMessage, IRoom, ISubscription } from '@rocket.chat/core-typings';
import { Emitter } from '@rocket.chat/emitter';
import differenceInMilliseconds from 'date-fns/differenceInMilliseconds';
import { ReactiveVar } from 'meteor/reactive-var';
import { Tracker } from 'meteor/tracker';
import type { MutableRefObject } from 'react';
import { v4 as uuidv4 } from 'uuid';
import type { MinimongoCollection } from '../../../../client/definitions/MinimongoCollection';
import { onClientMessageReceived } from '../../../../client/lib/onClientMessageReceived';
import { callWithErrorHandling } from '../../../../client/lib/utils/callWithErrorHandling';
import { getConfig } from '../../../../client/lib/utils/getConfig';
import { waitForElement } from '../../../../client/lib/utils/waitForElement';
import { ChatMessage, ChatSubscription } from '../../../models/client';
import { getUserPreference } from '../../../utils/client';
export async function upsertMessage(
{
msg,
subscription,
}: {
msg: IMessage & { ignored?: boolean };
subscription?: ISubscription;
},
collection: MinimongoCollection<IMessage> = ChatMessage,
) {
const userId = msg.u?._id;
if (subscription?.ignored?.includes(userId)) {
msg.ignored = true;
}
if (msg.t === 'e2e' && !msg.file) {
msg.e2e = 'pending';
}
msg = (await onClientMessageReceived(msg)) || msg;
const { _id } = msg;
return collection.upsert({ _id }, msg);
}
export function upsertMessageBulk(
{ msgs, subscription }: { msgs: IMessage[]; subscription?: ISubscription },
collection: MinimongoCollection<IMessage> = ChatMessage,
) {
const { queries } = collection;
collection.queries = [];
msgs.forEach((msg, index) => {
if (index === msgs.length - 1) {
collection.queries = queries;
}
void upsertMessage({ msg, subscription }, collection);
});
}
const defaultLimit = parseInt(getConfig('roomListLimit') ?? '50') || 50;
const waitAfterFlush = (fn: () => void) => setTimeout(() => Tracker.afterFlush(fn), 10);
class RoomHistoryManagerClass extends Emitter {
private lastRequest?: Date;
private histories: Record<
IRoom['_id'],
{
hasMore: ReactiveVar<boolean>;
hasMoreNext: ReactiveVar<boolean>;
isLoading: ReactiveVar<boolean>;
unreadNotLoaded: ReactiveVar<number>;
firstUnread: ReactiveVar<IMessage | undefined>;
loaded: number | undefined;
oldestTs?: Date;
}
> = {};
private requestsList: string[] = [];
public getRoom(rid: IRoom['_id']) {
if (!this.histories[rid]) {
this.histories[rid] = {
hasMore: new ReactiveVar(true),
hasMoreNext: new ReactiveVar(false),
isLoading: new ReactiveVar(false),
unreadNotLoaded: new ReactiveVar(0),
firstUnread: new ReactiveVar(undefined),
loaded: undefined,
};
}
return this.histories[rid];
}
private async queue(): Promise<void> {
return new Promise((resolve) => {
const requestId = uuidv4();
const done = () => {
this.lastRequest = new Date();
resolve();
};
if (this.requestsList.length === 0) {
return this.run(done);
}
this.requestsList.push(requestId);
this.once(requestId, done);
});
}
private run(fn: () => void) {
const difference = this.lastRequest ? differenceInMilliseconds(new Date(), this.lastRequest) : Infinity;
if (difference > 500) {
return fn();
}
return setTimeout(fn, 500 - difference);
}
private unqueue() {
const requestId = this.requestsList.pop();
if (!requestId) {
return;
}
this.run(() => this.emit(requestId));
}
public async getMore(rid: IRoom['_id'], limit = defaultLimit): Promise<void> {
const room = this.getRoom(rid);
if (Tracker.nonreactive(() => room.hasMore.get()) !== true) {
return;
}
room.isLoading.set(true);
await this.queue();
let ls = undefined;
const subscription = ChatSubscription.findOne({ rid });
if (subscription) {
({ ls } = subscription);
}
const showThreadsInMainChannel = getUserPreference(Meteor.userId(), 'showThreadsInMainChannel', false);
const result = await callWithErrorHandling(
'loadHistory',
rid,
room.oldestTs,
limit,
ls ? String(ls) : undefined,
showThreadsInMainChannel,
);
if (!result) {
throw new Error('loadHistory returned nothing');
}
this.unqueue();
let previousHeight: number | undefined;
let scroll: number | undefined;
const { messages = [] } = result;
room.unreadNotLoaded.set(result.unreadNotLoaded);
room.firstUnread.set(result.firstUnread);
if (messages.length > 0) {
room.oldestTs = messages[messages.length - 1].ts;
}
const wrapper = await waitForElement('.messages-box .wrapper .rc-scrollbars-view');
if (wrapper) {
previousHeight = wrapper.scrollHeight;
scroll = wrapper.scrollTop;
}
upsertMessageBulk({
msgs: messages.filter((msg) => msg.t !== 'command'),
subscription,
});
if (!room.loaded) {
room.loaded = 0;
}
const visibleMessages = messages.filter((msg) => !msg.tmid || showThreadsInMainChannel || msg.tshow);
room.loaded += visibleMessages.length;
if (messages.length < limit) {
room.hasMore.set(false);
}
if (room.hasMore.get() && (visibleMessages.length === 0 || room.loaded < limit)) {
return this.getMore(rid);
}
waitAfterFlush(() => {
this.emit('loaded-messages');
const heightDiff = wrapper.scrollHeight - (previousHeight ?? NaN);
wrapper.scrollTop = (scroll ?? NaN) + heightDiff;
});
room.isLoading.set(false);
}
public async getMoreNext(rid: IRoom['_id'], atBottomRef: MutableRefObject<boolean>) {
const room = this.getRoom(rid);
if (Tracker.nonreactive(() => room.hasMoreNext.get()) !== true) {
return;
}
await this.queue();
atBottomRef.current = false;
room.isLoading.set(true);
const lastMessage = ChatMessage.findOne({ rid, _hidden: { $ne: true } }, { sort: { ts: -1 } });
const subscription = ChatSubscription.findOne({ rid });
if (lastMessage?.ts) {
const { ts } = lastMessage;
const result = await callWithErrorHandling('loadNextMessages', rid, ts, defaultLimit);
upsertMessageBulk({
msgs: Array.from(result.messages).filter((msg) => msg.t !== 'command'),
subscription,
});
room.isLoading.set(false);
if (!room.loaded) {
room.loaded = 0;
}
room.loaded += result.messages.length;
if (result.messages.length < defaultLimit) {
room.hasMoreNext.set(false);
}
}
this.unqueue();
}
public hasMore(rid: IRoom['_id']) {
const room = this.getRoom(rid);
return room.hasMore.get();
}
public hasMoreNext(rid: IRoom['_id']) {
const room = this.getRoom(rid);
return room.hasMoreNext.get();
}
public getMoreIfIsEmpty(rid: IRoom['_id']) {
const room = this.getRoom(rid);
if (room.loaded === undefined) {
return this.getMore(rid);
}
}
public isLoading(rid: IRoom['_id']) {
const room = this.getRoom(rid);
return room.isLoading.get();
}
public async clear(rid: IRoom['_id']) {
const room = this.getRoom(rid);
ChatMessage.remove({ rid });
room.isLoading.set(true);
room.hasMore.set(true);
room.hasMoreNext.set(false);
room.oldestTs = undefined;
room.loaded = undefined;
}
public async getSurroundingMessages(message?: Pick<IMessage, '_id' | 'rid'> & { ts?: Date }) {
if (!message?.rid) {
return;
}
const messageAlreadyLoaded = Boolean(ChatMessage.findOne({ _id: message._id, _hidden: { $ne: true } }));
if (messageAlreadyLoaded) {
return;
}
const room = this.getRoom(message.rid);
void this.clear(message.rid);
const subscription = ChatSubscription.findOne({ rid: message.rid });
const result = await callWithErrorHandling('loadSurroundingMessages', message, defaultLimit);
if (!result) {
return;
}
upsertMessageBulk({ msgs: Array.from(result.messages).filter((msg) => msg.t !== 'command'), subscription });
Tracker.afterFlush(async () => {
this.emit('loaded-messages');
room.isLoading.set(false);
});
if (!room.loaded) {
room.loaded = 0;
}
room.loaded += result.messages.length;
room.hasMore.set(result.moreBefore);
room.hasMoreNext.set(result.moreAfter);
}
}
export const RoomHistoryManager = new RoomHistoryManagerClass();