NodeBB/NodeBB

View on GitHub
src/database/mongo/sorted/intersect.js

Summary

Maintainability
D
1 day
Test Coverage
'use strict';

module.exports = function (module) {
    module.sortedSetIntersectCard = async function (keys) {
        if (!Array.isArray(keys) || !keys.length) {
            return 0;
        }
        const objects = module.client.collection('objects');
        const counts = await countSets(keys, 50000);
        if (counts.minCount === 0) {
            return 0;
        }
        let items = await objects.find({ _key: counts.smallestSet }, {
            projection: { _id: 0, value: 1 },
        }).batchSize(counts.minCount + 1).toArray();

        const otherSets = keys.filter(s => s !== counts.smallestSet);
        for (let i = 0; i < otherSets.length; i++) {
            /* eslint-disable no-await-in-loop */
            const query = { _key: otherSets[i], value: { $in: items.map(i => i.value) } };
            if (i === otherSets.length - 1) {
                return await objects.countDocuments(query);
            }
            items = await objects.find(query, { projection: { _id: 0, value: 1 } }).batchSize(items.length + 1).toArray();
        }
    };

    async function countSets(sets, limit) {
        const objects = module.client.collection('objects');
        const counts = await Promise.all(
            sets.map(s => objects.countDocuments({ _key: s }, {
                limit: limit || 25000,
            }))
        );
        const minCount = Math.min(...counts);
        const index = counts.indexOf(minCount);
        const smallestSet = sets[index];
        return {
            minCount: minCount,
            smallestSet: smallestSet,
        };
    }

    module.getSortedSetIntersect = async function (params) {
        params.sort = 1;
        return await getSortedSetRevIntersect(params);
    };

    module.getSortedSetRevIntersect = async function (params) {
        params.sort = -1;
        return await getSortedSetRevIntersect(params);
    };

    async function getSortedSetRevIntersect(params) {
        params.start = params.hasOwnProperty('start') ? params.start : 0;
        params.stop = params.hasOwnProperty('stop') ? params.stop : -1;
        params.weights = params.weights || [];

        params.limit = params.stop - params.start + 1;
        if (params.limit <= 0) {
            params.limit = 0;
        }
        params.counts = await countSets(params.sets);
        if (params.counts.minCount === 0) {
            return [];
        }

        const simple = params.weights.filter(w => w === 1).length === 1 && params.limit !== 0;
        if (params.counts.minCount < 25000 && simple) {
            return await intersectSingle(params);
        } else if (simple) {
            return await intersectBatch(params);
        }
        return await intersectAggregate(params);
    }

    async function intersectSingle(params) {
        const objects = module.client.collection('objects');
        const sortSet = params.sets[params.weights.indexOf(1)];
        if (sortSet === params.counts.smallestSet) {
            return await intersectBatch(params);
        }

        const cursorSmall = objects.find({ _key: params.counts.smallestSet }, {
            projection: { _id: 0, value: 1 },
        });
        if (params.counts.minCount > 1) {
            cursorSmall.batchSize(params.counts.minCount + 1);
        }
        let items = await cursorSmall.toArray();
        const project = { _id: 0, value: 1 };
        if (params.withScores) {
            project.score = 1;
        }
        const otherSets = params.sets.filter(s => s !== params.counts.smallestSet);
        // move sortSet to the end of array
        otherSets.push(otherSets.splice(otherSets.indexOf(sortSet), 1)[0]);
        for (let i = 0; i < otherSets.length; i++) {
            /* eslint-disable no-await-in-loop */
            const cursor = objects.find({ _key: otherSets[i], value: { $in: items.map(i => i.value) } });
            cursor.batchSize(items.length + 1);
            // at the last step sort by sortSet
            if (i === otherSets.length - 1) {
                cursor.project(project).sort({ score: params.sort }).skip(params.start).limit(params.limit);
            } else {
                cursor.project({ _id: 0, value: 1 });
            }
            items = await cursor.toArray();
        }
        if (!params.withScores) {
            items = items.map(i => i.value);
        }
        return items;
    }

    async function intersectBatch(params) {
        const project = { _id: 0, value: 1 };
        if (params.withScores) {
            project.score = 1;
        }
        const sortSet = params.sets[params.weights.indexOf(1)];
        const batchSize = 10000;
        const cursor = await module.client.collection('objects')
            .find({ _key: sortSet }, { projection: project })
            .sort({ score: params.sort })
            .batchSize(batchSize);

        const otherSets = params.sets.filter(s => s !== sortSet);
        let inters = [];
        let done = false;
        while (!done) {
            /* eslint-disable no-await-in-loop */
            const items = [];
            while (items.length < batchSize) {
                const nextItem = await cursor.next();
                if (!nextItem) {
                    done = true;
                    break;
                }
                items.push(nextItem);
            }

            const members = await Promise.all(otherSets.map(async (s) => {
                const data = await module.client.collection('objects').find({
                    _key: s, value: { $in: items.map(i => i.value) },
                }, {
                    projection: { _id: 0, value: 1 },
                }).batchSize(items.length + 1).toArray();
                return new Set(data.map(i => i.value));
            }));
            inters = inters.concat(items.filter(item => members.every(arr => arr.has(item.value))));
            if (inters.length >= params.stop) {
                done = true;
                inters = inters.slice(params.start, params.stop + 1);
            }
        }
        if (!params.withScores) {
            inters = inters.map(item => item.value);
        }
        return inters;
    }

    async function intersectAggregate(params) {
        const aggregate = {};

        if (params.aggregate) {
            aggregate[`$${params.aggregate.toLowerCase()}`] = '$score';
        } else {
            aggregate.$sum = '$score';
        }
        const pipeline = [{ $match: { _key: { $in: params.sets } } }];

        params.weights.forEach((weight, index) => {
            if (weight !== 1) {
                pipeline.push({
                    $project: {
                        value: 1,
                        score: {
                            $cond: {
                                if: {
                                    $eq: ['$_key', params.sets[index]],
                                },
                                then: {
                                    $multiply: ['$score', weight],
                                },
                                else: '$score',
                            },
                        },
                    },
                });
            }
        });

        pipeline.push({ $group: { _id: { value: '$value' }, totalScore: aggregate, count: { $sum: 1 } } });
        pipeline.push({ $match: { count: params.sets.length } });
        pipeline.push({ $sort: { totalScore: params.sort } });

        if (params.start) {
            pipeline.push({ $skip: params.start });
        }

        if (params.limit > 0) {
            pipeline.push({ $limit: params.limit });
        }

        const project = { _id: 0, value: '$_id.value' };
        if (params.withScores) {
            project.score = '$totalScore';
        }
        pipeline.push({ $project: project });

        let data = await module.client.collection('objects').aggregate(pipeline).toArray();
        if (!params.withScores) {
            data = data.map(item => item.value);
        }
        return data;
    }
};