NodeBB/NodeBB

View on GitHub
src/database/postgres/main.js

Summary

Maintainability
C
1 day
Test Coverage
'use strict';

module.exports = function (module) {
    const helpers = require('./helpers');

    module.flushdb = async function () {
        await module.pool.query(`DROP SCHEMA "public" CASCADE`);
        await module.pool.query(`CREATE SCHEMA "public"`);
    };

    module.emptydb = async function () {
        await module.pool.query(`DELETE FROM "legacy_object"`);
    };

    module.exists = async function (key) {
        if (!key) {
            return;
        }

        // Redis/Mongo consider empty zsets as non-existent, match that behaviour
        const type = await module.type(key);
        if (type === 'zset') {
            if (Array.isArray(key)) {
                const members = await Promise.all(key.map(key => module.getSortedSetRange(key, 0, 0)));
                return members.map(member => member.length > 0);
            }
            const members = await module.getSortedSetRange(key, 0, 0);
            return members.length > 0;
        }

        if (Array.isArray(key)) {
            const res = await module.pool.query({
                name: 'existsArray',
                text: `
                SELECT o."_key" k
                  FROM "legacy_object_live" o
                 WHERE o."_key" = ANY($1::TEXT[])`,
                values: [key],
            });
            return key.map(k => res.rows.some(r => r.k === k));
        }
        const res = await module.pool.query({
            name: 'exists',
            text: `
            SELECT EXISTS(SELECT *
                    FROM "legacy_object_live"
                   WHERE "_key" = $1::TEXT
                   LIMIT 1) e`,
            values: [key],
        });
        return res.rows[0].e;
    };

    module.scan = async function (params) {
        let { match } = params;
        if (match.startsWith('*')) {
            match = `%${match.substring(1)}`;
        }
        if (match.endsWith('*')) {
            match = `${match.substring(0, match.length - 1)}%`;
        }

        const res = await module.pool.query({
            text: `
        SELECT o."_key"
        FROM "legacy_object_live" o
        WHERE o."_key" LIKE '${match}'`,
        });

        return res.rows.map(r => r._key);
    };

    module.delete = async function (key) {
        if (!key) {
            return;
        }

        await module.pool.query({
            name: 'delete',
            text: `
DELETE FROM "legacy_object"
 WHERE "_key" = $1::TEXT`,
            values: [key],
        });
    };

    module.deleteAll = async function (keys) {
        if (!Array.isArray(keys) || !keys.length) {
            return;
        }

        await module.pool.query({
            name: 'deleteAll',
            text: `
DELETE FROM "legacy_object"
 WHERE "_key" = ANY($1::TEXT[])`,
            values: [keys],
        });
    };

    module.get = async function (key) {
        if (!key) {
            return;
        }

        const res = await module.pool.query({
            name: 'get',
            text: `
SELECT s."data" t
  FROM "legacy_object_live" o
 INNER JOIN "legacy_string" s
         ON o."_key" = s."_key"
        AND o."type" = s."type"
 WHERE o."_key" = $1::TEXT
 LIMIT 1`,
            values: [key],
        });

        return res.rows.length ? res.rows[0].t : null;
    };

    module.mget = async function (keys) {
        if (!keys || !Array.isArray(keys) || !keys.length) {
            return [];
        }

        const res = await module.pool.query({
            name: 'mget',
            text: `
SELECT s."data", s."_key"
  FROM "legacy_object_live" o
 INNER JOIN "legacy_string" s
         ON o."_key" = s."_key"
        AND o."type" = s."type"
 WHERE o."_key" = ANY($1::TEXT[])
 LIMIT 1`,
            values: [keys],
        });
        const map = {};
        res.rows.forEach((d) => {
            map[d._key] = d.data;
        });
        return keys.map(k => (map.hasOwnProperty(k) ? map[k] : null));
    };


    module.set = async function (key, value) {
        if (!key) {
            return;
        }

        await module.transaction(async (client) => {
            await helpers.ensureLegacyObjectType(client, key, 'string');
            await client.query({
                name: 'set',
                text: `
INSERT INTO "legacy_string" ("_key", "data")
VALUES ($1::TEXT, $2::TEXT)
ON CONFLICT ("_key")
DO UPDATE SET "data" = $2::TEXT`,
                values: [key, value],
            });
        });
    };

    module.increment = async function (key) {
        if (!key) {
            return;
        }

        return await module.transaction(async (client) => {
            await helpers.ensureLegacyObjectType(client, key, 'string');
            const res = await client.query({
                name: 'increment',
                text: `
INSERT INTO "legacy_string" ("_key", "data")
VALUES ($1::TEXT, '1')
ON CONFLICT ("_key")
DO UPDATE SET "data" = ("legacy_string"."data"::NUMERIC + 1)::TEXT
RETURNING "data" d`,
                values: [key],
            });
            return parseFloat(res.rows[0].d);
        });
    };

    module.rename = async function (oldKey, newKey) {
        await module.transaction(async (client) => {
            await client.query({
                name: 'deleteRename',
                text: `
    DELETE FROM "legacy_object"
     WHERE "_key" = $1::TEXT`,
                values: [newKey],
            });
            await client.query({
                name: 'rename',
                text: `
UPDATE "legacy_object"
SET "_key" = $2::TEXT
WHERE "_key" = $1::TEXT`,
                values: [oldKey, newKey],
            });
        });
    };

    module.type = async function (key) {
        const res = await module.pool.query({
            name: 'type',
            text: `
SELECT "type"::TEXT t
  FROM "legacy_object_live"
 WHERE "_key" = $1::TEXT
 LIMIT 1`,
            values: [key],
        });

        return res.rows.length ? res.rows[0].t : null;
    };

    async function doExpire(key, date) {
        await module.pool.query({
            name: 'expire',
            text: `
UPDATE "legacy_object"
   SET "expireAt" = $2::TIMESTAMPTZ
 WHERE "_key" = $1::TEXT`,
            values: [key, date],
        });
    }

    module.expire = async function (key, seconds) {
        await doExpire(key, new Date(((Date.now() / 1000) + seconds) * 1000));
    };

    module.expireAt = async function (key, timestamp) {
        await doExpire(key, new Date(timestamp * 1000));
    };

    module.pexpire = async function (key, ms) {
        await doExpire(key, new Date(Date.now() + parseInt(ms, 10)));
    };

    module.pexpireAt = async function (key, timestamp) {
        await doExpire(key, new Date(timestamp));
    };

    async function getExpire(key) {
        const res = await module.pool.query({
            name: 'ttl',
            text: `
SELECT "expireAt"::TEXT
  FROM "legacy_object"
 WHERE "_key" = $1::TEXT
 LIMIT 1`,
            values: [key],
        });

        return res.rows.length ? new Date(res.rows[0].expireAt).getTime() : null;
    }

    module.ttl = async function (key) {
        return Math.round((await getExpire(key) - Date.now()) / 1000);
    };

    module.pttl = async function (key) {
        return await getExpire(key) - Date.now();
    };
};