/*
 * rescribet/link-lib - src/RDFStore.ts (view on GitHub)
 *
 * Summary
 *   Maintainability: D (2 days)
 *   Test Coverage: A (92%)
 */
import rdfFactory, {
  DataFactory,
  Feature,
  NamedNode,
  Quad,
  QuadPosition,
  Quadruple,
  Term,
} from "@ontologies/core";
import * as ld from "@ontologies/ld";
import * as rdf from "@ontologies/rdf";
import * as rdfs from "@ontologies/rdfs";

import { equals, id } from "./factoryHelpers";
import ll from "./ontology/ll";
import {
    OptionalNamedNode,
    OptionalNode,
    OptionalTerm,
} from "./rdf";
import { deltaProcessor } from "./store/deltaProcessor";
import RDFIndex from "./store/RDFIndex";
import { ChangeBuffer, DeltaProcessor, SomeNode, StoreProcessor } from "./types";
import { doc, getPropBestLang, sortByBestLang } from "./utilities";
import { addChangeBufferCallbacks } from "./utilities/monkeys";

/** Shared, frozen empty quad list returned wherever a lookup yields no results. */
const EMPTY_ST_ARR: readonly Quad[] = Object.freeze([]);

/** Options accepted by the {@link RDFStore} constructor. */
export interface RDFStoreOpts {
    /**
     * Lists of graph IRIs handed to the delta processor, keyed by option name.
     * The constructor reads `addGraphIRIS`, `replaceGraphIRIS`, `removeGraphIRIS`, `purgeGraphIRIS` and
     * `sliceGraphIRIS`, falling back to its defaults for every key that is absent.
     */
    deltaProcessorOpts?: Record<string, NamedNode[]>;
    /** Index to wrap; a new `RDFIndex` is created when omitted. */
    innerStore?: RDFIndex;
}

/**
 * Set of small helper functions used by the store.
 *
 * The RDFStore constructor builds two implementations (one comparing terms with `equals`/`id`, one comparing by
 * reference and `.id`) and picks one depending on whether the data factory supports `Feature.identity`.
 */
export interface Funlets {
    /**
     * Selects the quads from `obj` whose predicate matches `predicate`; yields an empty array when `obj` is
     * undefined. When `predicate` is `rdfs:member`, quads whose predicate IRI starts with the `rdf:_` prefix are
     * selected as well.
     */
    allRDFPropertyStatements: (obj: Quad[] | undefined, predicate: SomeNode) => Quad[];
    /**
     * Creates a predicate for use while flushing: it records `changeStamp` for the quad's subject and for the
     * subject's document, and returns whether the quad is an `rdf:type` statement.
     */
    flushFilter: (changeStamp: number) => (s: Quad) => boolean;
    /** Builds a cache of types per resource. Can be omitted when compiled against a well known service. */
    processTypeQuad: (quad: Quad) => boolean;
    /**
     * Creates a predicate which holds for quads sharing both subject and predicate with `replacement`.
     */
    replaceQuadsPicker: (replacement: Quad) => ({ subject, predicate }: Quad) => boolean;
}
/** IRI prefix of `rdf:_`; predicates starting with it are matched alongside `rdfs:member`. */
const { value: memberPrefix } = rdf.ns("_");

/**
 * Provides a clean consistent interface to stored (RDF) data.
 *
 * Wraps an {@link RDFIndex}, queues deltas until {@link RDFStore.flush} is called and keeps a per-resource cache
 * of `rdf:type` values.
 */
export class RDFStore implements ChangeBuffer, DeltaProcessor {
    /** Changes collected since the last {@link RDFStore.flush}; swapped for a fresh array on every flush. */
    public changeBuffer: Quad[] = new Array(100);
    /** Number of changes recorded since the last flush; the buffer is only handed out when this is non-zero. */
    public changeBufferCount: number = 0;
    /**
     * Record of the last time a resource was flushed.
     *
     * @note Not to be confused with the last change in the store, which might be later than the flush time.
     */
    public changeTimestamps: number[] = [];
    /** Cache of the `rdf:type` objects per resource, indexed by the id of the canonical subject. */
    public typeCache: NamedNode[][] = [];
    /** Language preferences used when sorting and picking values; `navigator` languages if present, else `en`. */
    public langPrefs: string[] = Array.from(typeof navigator !== "undefined"
        ? (navigator.languages || [navigator.language])
        : ["en"]);

    /** Deltas queued through {@link RDFStore.queueDelta}, processed on the next flush. */
    private deltas: Quadruple[][] = [];
    private deltaProcessor: StoreProcessor;
    private store: RDFIndex = new RDFIndex();
    private funlets: Funlets;

    /** The global data factory from `@ontologies/core`. */
    public get rdfFactory(): DataFactory {
        return rdfFactory;
    }
    /** @throws Error Always; the factory is global and cannot be replaced per store. */
    public set rdfFactory(_: DataFactory) {
        throw new Error("Factory is global (see @ontologies/core)");
    }

    /**
     * @param deltaProcessorOpts Graph IRI lists per delta operation, overriding the defaults per key.
     * @param innerStore Index to wrap; a new one is created when omitted.
     */
    constructor({ deltaProcessorOpts, innerStore }: RDFStoreOpts = {}) {
        this.processDelta = this.processDelta.bind(this);

        const g = innerStore || new RDFIndex();
        this.store = addChangeBufferCallbacks(g, this);

        const defaults =  {
            addGraphIRIS: [ll.add, ld.add],
            purgeGraphIRIS: [ll.purge, ld.purge],
            removeGraphIRIS: [ll.remove, ld.remove],
            replaceGraphIRIS: [
                ll.replace,
                ld.replace,
                rdfFactory.defaultGraph(),
            ],
            sliceGraphIRIS: [ll.slice, ld.slice],
        };

        this.deltaProcessor = deltaProcessor(
            deltaProcessorOpts?.addGraphIRIS || defaults.addGraphIRIS,
            deltaProcessorOpts?.replaceGraphIRIS || defaults.replaceGraphIRIS,
            deltaProcessorOpts?.removeGraphIRIS || defaults.removeGraphIRIS,
            deltaProcessorOpts?.purgeGraphIRIS || defaults.purgeGraphIRIS,
            deltaProcessorOpts?.sliceGraphIRIS || defaults.sliceGraphIRIS,
        )(this.store);

        // Implementation which compares terms through the `equals` and `id` helpers.
        const defaultFunlets: Funlets = Object.freeze({
            allRDFPropertyStatements: (obj: Quad[] | undefined, predicate: SomeNode): Quad[] => {
                if (typeof obj === "undefined") {
                    return [];
                }

                if (equals(predicate, rdfs.member)) {
                    // Container membership properties (rdf:_*) count as rdfs:member too.
                    return obj.filter((s) =>
                        equals(s.predicate, rdfs.member) || s.predicate.value.startsWith(memberPrefix));
                }

                return obj.filter((s) => equals(predicate, s.predicate));
            },
            flushFilter: (changeStamp: number): ((s: Quad) => boolean) => (s: Quad): boolean => {
                this.changeTimestamps[id(s.subject)] = changeStamp;
                this.changeTimestamps[id(doc(s.subject))] = changeStamp;
                return equals(s.predicate, rdf.type);
            },
            processTypeQuad: (quad: Quad): boolean => {
                if (!equals(quad.predicate, rdf.type)) {
                    return false;
                }
                // Rebuild the entry from the store, so it reflects all current types of the subject.
                this.typeCache[id(this.canon(quad.subject))] = this.quadsFor((quad.subject as NamedNode))
                    .filter((s) => equals(s.predicate, rdf.type))
                    .map((s) => s.object as NamedNode);
                return false;
            },
            replaceQuadsPicker: (replacement: Quad): ((q: Quad) => boolean) =>
                ({ subject, predicate }: Quad): boolean =>
                    equals(subject, replacement.subject) && equals(predicate, replacement.predicate),
        });

        // Implementation which compares terms by reference and reads `.id` directly.
        const idFunlets: Funlets = Object.freeze({
            allRDFPropertyStatements: (obj: Quad[] | undefined, predicate: SomeNode): Quad[] => {
                if (typeof obj === "undefined") {
                    return [];
                }

                if (predicate === rdfs.member) {
                    // Container membership properties (rdf:_*) count as rdfs:member too.
                    return obj.filter((s) => s.predicate === rdfs.member || s.predicate.value.startsWith(memberPrefix));
                }

                return obj.filter((s) => equals(predicate, s.predicate));
            },
            flushFilter: (changeStamp: number): ((s: Quad) => boolean) => (s: Quad): boolean => {
                this.changeTimestamps[s.subject.id as number] = changeStamp;
                this.changeTimestamps[doc(s.subject).id as number] = changeStamp;
                return s.predicate === rdf.type;
            },
            processTypeQuad: (quad: Quad): boolean => {
                if (quad.predicate !== rdf.type) {
                    return false;
                }
                // Rebuild the entry from the store, so it reflects all current types of the subject.
                this.typeCache[this.canon(quad.subject).id as number] = this.quadsFor((quad.subject as NamedNode))
                    .filter((s) => s.predicate === rdf.type)
                    .map((s) => s.object as NamedNode);
                return false;
            },
            replaceQuadsPicker: (replacement: Quad): ((q: Quad) => boolean) =>
                ({ subject, predicate }: Quad): boolean =>
                    subject === replacement.subject && predicate === replacement.predicate,
        });

        // The by-reference variant is only used when the factory reports support for identity.
        this.funlets = rdfFactory.supports[Feature.identity] ? idFunlets : defaultFunlets;

        g.addDataCallback(this.funlets.processTypeQuad.bind(this));
    }

    /**
     * Add statements to the store.
     * @param data Data to parse and add to the store.
     * @return The result of adding each quad to the inner store, in input order.
     * @throws TypeError When `data` is not an array.
     */
    public addQuads(data: Quad[]): Quad[] {
        if (!Array.isArray(data)) {
            throw new TypeError("An array of quads must be passed to addQuads");
        }

        return data.map((q) => this.store.add(q.subject, q.predicate, q.object, q.graph));
    }

    /**
     * Add quadruples (subject, predicate, object, graph tuples) to the store.
     * @param data The quadruples to add.
     * @return The result of adding each quadruple to the inner store, in input order.
     */
    public addQuadruples(data: Quadruple[]): Quad[] {
        const statements: Quad[] = new Array(data.length);
        for (let i = 0, len = data.length; i < len; i++) {
            statements[i] = this.store.add(data[i][0], data[i][1], data[i][2], data[i][3]);
        }

        return statements;
    }

    /** Resolves `term` through the inner store's `canon`. */
    public canon<T = Term>(term: T): T {
        return this.store.canon(term);
    }

    /** The default graph of the global data factory. */
    public defaultGraph(): SomeNode {
        return rdfFactory.defaultGraph();
    }

    /**
     * Find the first quad matching the given arguments.
     * Use null or undefined as a wild-card.
     * @return The first match, or undefined when nothing matches.
     */
    public find(subj: OptionalNode,
                pred: OptionalNamedNode,
                obj: OptionalTerm,
                graph: OptionalNode): Quad | undefined {
        return this.match(subj, pred, obj, graph, true)[0];
    }

    /**
     * Flushes the change buffer to the return value.
     *
     * Queued deltas are processed first. The change time of every buffered subject (and its document) is then
     * updated and the type cache is rebuilt for subjects with buffered `rdf:type` statements.
     *
     * @return Statements held in memory since the last flush. This is the raw buffer, which is pre-sized and can
     *  therefore contain empty slots.
     */
    public flush(): Quad[] {
        const deltas = this.deltas;
        this.deltas = [];

        for (let i = 0; i < deltas.length; i++) {
            this.processDelta(deltas[i]);
        }

        if (this.changeBufferCount === 0) {
            return EMPTY_ST_ARR as Quad[];
        }
        const processingBuffer = this.changeBuffer;
        this.changeBuffer = new Array(100);
        this.changeBufferCount = 0;

        const stampAndTestType = this.funlets.flushFilter(Date.now());
        // `forEach` skips the empty slots of the pre-sized buffer.
        processingBuffer.forEach((quad) => {
            if (stampAndTestType(quad)) {
                this.funlets.processTypeQuad(quad);
            }
        });

        return processingBuffer;
    }

    /** @private */
    public getInternalStore(): RDFIndex {
        return this.store;
    }

    /**
     * Find all quads matching the given arguments.
     * Use null or undefined as a wild-card.
     * @param justOne Passed on to the inner store's `match`.
     * @return The matches, or an empty array when the inner store returns a falsy value.
     */
    public match(subj: OptionalNode,
                 pred: OptionalNamedNode,
                 obj: OptionalTerm,
                 graph: OptionalNode,
                 justOne: boolean = false): Quad[] {
        return this.store.match(subj, pred, obj, graph, justOne) || [];
    }

    /**
     * Applies a delta to the store immediately.
     *
     * The delta processor splits the delta into additions, replacements and removals; the removals are applied
     * first, then the replacements, then the additions.
     *
     * @return The replaced statements followed by the added statements.
     */
    public processDelta(delta: Quadruple[]): Quad[] {
        const [
            addables,
            replacables,
            removables,
        ] = this.deltaProcessor(delta);

        this.removeQuads(removables);

        return this.replaceMatches(replacables).concat(this.addQuadruples(addables));
    }

    /**
     * Removes all quads of `subject` from the store, clears its cached types and marks it as changed.
     */
    public removeResource(subject: SomeNode): void {
        const canSubj = this.canon(subject);
        this.touch(canSubj);
        this.typeCache[id(canSubj)] = [];
        this.removeQuads(this.quadsFor(subject));
    }

    /** Removes the given statements from the inner store. */
    public removeQuads(statements: Quad[]): void {
        this.store.removeQuads(statements);
    }

    /**
     * Removes an array of statements and inserts another.
     * Note: Be sure that the replacement contains the same subjects as the original to let the
     *  broadcast work correctly.
     * @access private This is in conflict with the typescript declaration due to the development of some experimental
     *                  features, but this method shouldn't be used nevertheless.
     * @param original The statements to remove from the store.
     * @param replacements The statements to add to the store.
     * @return The result of adding `replacements` to the store.
     */
    public replaceQuads(original: Quad[], replacements: Quad[]): Quad[] {
        const uniqueStatements: Quad[] = [];
        for (const replacement of replacements) {
            // The picker is a factory; it has to be applied to the replacement to obtain the actual predicate.
            const sharesSubjectAndPredicate = this.funlets.replaceQuadsPicker(replacement);
            if (!original.some(sharesSubjectAndPredicate)) {
                uniqueStatements.push(replacement);
            }
        }

        this.removeQuads(original);
        // Remove statements not in the old object. Useful for replacing data loosely related to the main resource.
        for (const { subject, predicate } of uniqueStatements) {
            this.store.removeMatches(subject, predicate, null, null);
        }

        return this.addQuads(replacements);
    }

    /**
     * For every given quadruple, removes all stored quads with the same subject and predicate, then adds the
     * quadruples.
     * @return The result of adding `statements` to the store.
     */
    public replaceMatches(statements: Quadruple[]): Quad[] {
        for (let i = 0; i < statements.length; i++) {
            this.removeQuads(this.match(
                statements[i][0],
                statements[i][1],
                null,
                null,
            ));
        }

        return this.addQuadruples(statements);
    }

    /**
     * Collects the quads of `subject` for one or more properties.
     * @param subject The resource to read from.
     * @param property A single property, or a list of properties whose matches are concatenated in list order.
     * @return The matching quads passed through `sortByBestLang` with the store's language preferences, or an
     *  empty (frozen) array when there are none.
     */
    public getResourcePropertyRaw(subject: SomeNode, property: SomeNode | SomeNode[]): Quad[] {
        const props = this.quadsFor(subject);
        const allProps = this.funlets.allRDFPropertyStatements;
        let matched: Quad[] = [];

        if (Array.isArray(property)) {
            for (let i = 0; i < property.length; i++) {
                matched = matched.concat(allProps(props, property[i]));
            }
        } else {
            matched = allProps(props, property);
        }

        if (matched.length === 0) {
            return EMPTY_ST_ARR as Quad[];
        }

        return sortByBestLang(matched, this.langPrefs);
    }

    /**
     * Resolves all values of one or more properties of `subject`.
     *
     * A lookup of `rdf:type` is answered from the type cache.
     *
     * @return The objects of the matching quads; empty when there are none.
     */
    public getResourceProperties<TT extends Term = Term>(subject: SomeNode, property: SomeNode | SomeNode[]): TT[] {
        // Same check as in getResourceProperty, so the cache is also used by factories without identity support.
        if (!Array.isArray(property) && equals(property, rdf.type)) {
            return (this.typeCache[id(this.canon(subject))] || []) as TT[];
        }

        return this
            .getResourcePropertyRaw(subject, property)
            .map((s) => s.object as TT);
    }

    /**
     * Resolves a single value of one or more properties of `subject`.
     *
     * A lookup of `rdf:type` is answered with the first entry in the type cache; other lookups are resolved
     * through `getPropBestLang` with the store's language preferences.
     *
     * @return The value, or undefined when the resource has none.
     */
    public getResourceProperty<T extends Term = Term>(
        subject: SomeNode,
        property: SomeNode | SomeNode[],
    ): T | undefined {

        if (!Array.isArray(property) && equals(property, rdf.type)) {
            const entry = this.typeCache[id(this.canon(subject))];

            return entry ? entry[0] as T : undefined;
        }
        const rawProp = this.getResourcePropertyRaw(subject, property);
        if (rawProp.length === 0) {
            return undefined;
        }

        return getPropBestLang<T>(rawProp, this.langPrefs);
    }

    /** Queues a delta to be processed on the next {@link RDFStore.flush}. */
    public queueDelta(delta: Quadruple[]): void {
        this.deltas.push(delta);
    }

    /**
     * Searches the store for all the quads on {iri} (so not all statements relating to {iri}).
     * @param subject The identifier of the resource.
     * @return The entry of the inner store's subject index, or an empty (frozen) array when there is none.
     */
    public quadsFor(subject: SomeNode): Quad[] {
        const sId = id(this.store.canon(subject));
        const quads = this.store.indices[QuadPosition.subject][sId];

        return typeof quads !== "undefined"
            ? quads
            : EMPTY_ST_ARR as Quad[];
    }

    /**
     * Marks `iri` as changed without changing its data, by updating its change time and buffering a
     * `ll:nop` statement about it.
     */
    public touch(iri: SomeNode): void {
        this.changeTimestamps[id(iri)] = Date.now();
        // NOTE(review): `push` appends after the pre-sized slots rather than at `changeBufferCount`. Verify this
        //  is in line with how the store callbacks write to the buffer.
        this.changeBuffer.push(rdfFactory.quad(iri, ll.nop, ll.nop));
        this.changeBufferCount++;
    }

    /** The amount of queued deltas plus buffered changes; zero when a flush would have nothing to do. */
    public workAvailable(): number {
        return this.deltas.length + this.changeBufferCount;
    }
}