nuts-foundation/go-leia

View on GitHub
index.go

Summary

Maintainability
A
1 hr
Test Coverage
A
96%
/*
 * go-leia
 * Copyright (C) 2021 Nuts community
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 *
 */

package leia

import (
    "bytes"
    "errors"

    "go.etcd.io/bbolt"
)

// Index describes an index. An index is based on a json path and has a path.
// The path is used for storage but also as identifier in search options.
type Index interface {
    // Name returns the path of this index
    Name() string
    // Add indexes the document. It uses a sub-bucket of the given bucket.
    // It will only be indexed if the complete index matches.
    Add(bucket *bbolt.Bucket, ref Reference, doc Document) error
    // Delete document from the index
    Delete(bucket *bbolt.Bucket, ref Reference, doc Document) error
    // IsMatch determines if this index can be used for the given query. The higher the return value, the more likely it is useful.
    // return values lie between 0.0 and 1.0, where 1.0 is the most useful.
    IsMatch(query Query) float64
    // Iterate over the key/value pairs given a query. Entries that match the query are passed to the iteratorFn.
    // it will not filter out double values
    Iterate(bucket *bbolt.Bucket, query Query, fn iteratorFn) error
    // BucketName returns the bucket path for this index
    BucketName() []byte
    // QueryPartsOutsideIndex selects the queryParts that are not covered by the index.
    QueryPartsOutsideIndex(query Query) []QueryPart
    // Depth returns the number of indexed fields
    Depth() int
    // Keys returns the scalars found in the document at the location specified by the FieldIndexer
    Keys(fi FieldIndexer, document Document) ([]Scalar, error)
}

// iteratorFn defines a function that is used as a callback when an IterateIndex query finds results. The function is called for each result entry.
// the key will be the indexed value and the value will contain an Entry
type iteratorFn DocumentWalker

type index struct {
    name       string
    indexParts []FieldIndexer
    collection Collection
}

func (i *index) Name() string {
    return i.name
}

func (i *index) BucketName() []byte {
    return []byte(i.Name())
}

func (i *index) Depth() int {
    return len(i.indexParts)
}

func (i *index) Add(bucket *bbolt.Bucket, ref Reference, doc Document) error {
    cBucket, _ := bucket.CreateBucketIfNotExists(i.BucketName())
    return i.addDocumentR(cBucket, i.indexParts, Key{}, ref, doc)
}

// addDocumentR, like Add but recursive
func (i *index) addDocumentR(bucket *bbolt.Bucket, parts []FieldIndexer, cKey Key, ref Reference, doc Document) error {
    // current part
    ip := parts[0]

    matches, err := i.Keys(ip, doc)
    if err != nil {
        return err
    }

    // exit condition
    if len(parts) == 1 {
        // all matches to be added to current bucket
        for _, m := range matches {
            key := ComposeKey(cKey, m.Bytes())
            _ = addRefToBucket(bucket, key, ref)
        }
        if len(matches) == 0 {
            key := ComposeKey(cKey, []byte{})
            _ = addRefToBucket(bucket, key, ref)
        }
        return nil
    }

    // continue recursion
    for _, m := range matches {
        nKey := ComposeKey(cKey, m.Bytes())
        if err = i.addDocumentR(bucket, parts[1:], nKey, ref, doc); err != nil {
            return err
        }
    }

    // no matches for the document and this part of the index
    // add key with an empty byte slice as value
    if len(matches) == 0 {
        nKey := ComposeKey(cKey, []byte{})
        return i.addDocumentR(bucket, parts[1:], nKey, ref, doc)
    }

    return nil
}

// removeDocumentR, like Delete but recursive
func (i *index) removeDocumentR(bucket *bbolt.Bucket, parts []FieldIndexer, cKey Key, ref Reference, doc Document) error {
    // current part
    ip := parts[0]

    matches, err := i.Keys(ip, doc)
    if err != nil {
        return err
    }

    // exit condition
    if len(parts) == 1 {
        for _, m := range matches {
            key := ComposeKey(cKey, m.Bytes())
            _ = removeRefFromBucket(bucket, key, ref)
        }
        return nil
    }

    // continue recursion
    for _, m := range matches {
        nKey := ComposeKey(cKey, m.Bytes())
        return i.removeDocumentR(bucket, parts[1:], nKey, ref, doc)
    }

    // no matches for the document and this part of the index
    return nil
}

func (i *index) Delete(bucket *bbolt.Bucket, ref Reference, doc Document) error {
    cBucket := bucket.Bucket(i.BucketName())
    if cBucket == nil {
        return nil
    }

    return i.removeDocumentR(cBucket, i.indexParts, Key{}, ref, doc)
}

// addRefToBucket adds the reference to the correct key in the bucket. It handles multiple reference on the same location
func addRefToBucket(bucket *bbolt.Bucket, key Key, ref Reference) error {
    // first check if there's a sub-bucket
    subBucket, err := bucket.CreateBucketIfNotExists(key)
    if err != nil {
        return err
    }
    return subBucket.Put(ref, []byte{})
}

// removeRefFromBucket removes the reference from the bucket. It handles multiple reference on the same location
func removeRefFromBucket(bucket *bbolt.Bucket, key Key, ref Reference) error {
    // first check if there's a sub-bucket
    subBucket := bucket.Bucket(key)
    if subBucket == nil {
        return nil
    }
    return subBucket.Delete(ref)
}

func (i *index) IsMatch(query Query) float64 {
    hitcount := 0

    parts := i.matchingParts(query)

outer:
    for thc, ip := range i.indexParts {
        for _, qp := range parts {
            if ip.Equals(qp) {
                hitcount++
            }
        }
        // if a miss is encountered, do not continue. You can't skip an index lvl
        if hitcount == thc {
            break outer
        }
    }

    return float64(hitcount) / float64(len(i.indexParts))
}

// matchingParts returns the queryParts that match the index.
// it also sorts them in the right order. If multiple matches exist a index position, the first is returned.
func (i *index) matchingParts(query Query) []QueryPart {
    var sorted = make([]QueryPart, len(i.indexParts))
outer:
    for _, qp := range query.parts {
        for j, ip := range i.indexParts {
            if ip.Equals(qp) {
                if sorted[j] == nil {
                    sorted[j] = qp
                    continue outer
                }
            }
        }
    }

    // only use till the first nil value
    for i, s := range sorted {
        if s == nil {
            sorted = sorted[:i]
            break
        }
    }
    return sorted
}

func (i *index) QueryPartsOutsideIndex(query Query) []QueryPart {
    matchingParts := i.matchingParts(query)
    resultingParts := make([]QueryPart, 0)
    visitedParts := make([]QueryPart, 0)

outer:
    for _, qp := range query.parts {
        for _, mp := range matchingParts {
            if mp.Equals(qp) {
                for _, hp := range visitedParts {
                    if hp.Equals(qp) { // already excluded once
                        resultingParts = append(resultingParts, qp)
                        continue outer
                    }
                }
                // exclude and continue
                visitedParts = append(visitedParts, mp)
                continue outer
            }
        }
        // no hit in index parts
        resultingParts = append(resultingParts, qp)
    }

    return resultingParts
}

func (i *index) Iterate(bucket *bbolt.Bucket, query Query, fn iteratorFn) error {
    var err error

    cBucket := bucket.Bucket(i.BucketName())
    if cBucket == nil {
        return err
    }

    // Sort the parts of the Query to conform to the index key building order
    sortedQueryParts := i.matchingParts(query)

    if len(sortedQueryParts) == 0 {
        return errors.New("unable to iterate over index without matching keys")
    }

    // extract tokenizer and transform to here
    matchers := i.matchers(sortedQueryParts)

    _, err = findR(cBucket.Cursor(), Key{}, matchers, fn, []byte{})
    return err
}

func (i *index) matchers(sortedQueryParts []QueryPart) []matcher {
    // extract tokenizer and transform to here
    matchers := make([]matcher, len(sortedQueryParts))
    for j, cPart := range sortedQueryParts {
        terms := make([]Scalar, 0)
        for _, token := range i.indexParts[j].Tokenize(cPart.Seek()) {
            seek := i.indexParts[j].Transform(token)
            terms = append(terms, seek)
        }
        matchers[j] = matcher{
            queryPart: cPart,
            terms:     terms,
            transform: i.indexParts[j].Transform,
        }
    }
    return matchers
}

func (i *index) Keys(j FieldIndexer, document Document) ([]Scalar, error) {
    // first get the raw values from the query path
    rawKeys, err := i.collection.ValuesAtPath(document, j.QueryPath())
    if err != nil {
        return nil, err
    }

    // run the tokenizer
    tokenized := make([]Scalar, 0)
    for _, rawKey := range rawKeys {
        tokens := j.Tokenize(rawKey)
        tokenized = append(tokenized, tokens...)
    }

    // run the transformer
    transformed := make([]Scalar, len(tokenized))
    for i, rawKey := range tokenized {
        transformed[i] = j.Transform(rawKey)
    }

    return transformed, nil
}

type matcher struct {
    queryPart QueryPart
    terms     []Scalar
    transform Transform
}

func findR(cursor *bbolt.Cursor, sKey Key, matchers []matcher, fn iteratorFn, lastCursorPosition []byte) ([]byte, error) {
    var err error
    returnKey := lastCursorPosition
    cPart := matchers[0].queryPart
    //outer:
    for _, seekTerm := range matchers[0].terms {
        // new location in cursor to skip to
        seek := ComposeKey(sKey, seekTerm.Bytes())
        condition := true

        // do not go back to prevent infinite loops. The cursor may only go forward.
        if bytes.Compare(seek, lastCursorPosition) < 0 {
            seek = lastCursorPosition
        }

        for cKey, _ := cursor.Seek(seek); cKey != nil && bytes.HasPrefix(cKey, sKey) && condition; {
            // remove prefix (+1), Split and take first
            pf := cKey[len(sKey)+1:]
            if len(sKey) == 0 {
                pf = cKey
            }
            pfk := Key(pf)
            newp := pfk.Split()[0]

            // check of current (partial) key still matches with query
            condition = cPart.Condition(newp, matchers[0].transform)
            if condition {
                if len(matchers) > 1 {
                    // (partial) key still matches, continue to next index part
                    nKey := ComposeKey(sKey, newp)
                    // on success the cursor is moved forward, the latest key is returned, continue with that key
                    // if keys haven't changed: break
                    var subKey []byte
                    subKey, err = findR(cursor, nKey, matchers[1:], fn, cKey)
                    if bytes.Equal(subKey, cKey) {
                        // the nested search could not advance the cursor, so we do it here before continuing the loop
                        cKey, _ = cursor.Next()
                        returnKey = cKey
                        continue
                    }
                    cKey = subKey
                } else {
                    // all index parts applied to key construction, retrieve results.
                    err = iterateOverDocuments(cursor, cKey, fn)
                    // this position was a success, hopefully the next as well
                    cKey, _ = cursor.Next()
                }
                if err != nil {
                    return nil, err
                }
            }
            returnKey = cKey
        }
    }
    return returnKey, nil
}

func iterateOverDocuments(cursor *bbolt.Cursor, cKey []byte, fn iteratorFn) error {
    subBucket := cursor.Bucket().Bucket(cKey)
    if subBucket != nil {
        subCursor := subBucket.Cursor()
        for k, _ := subCursor.Seek([]byte{}); k != nil; k, _ = subCursor.Next() {
            if err := fn(cKey, k); err != nil {
                return err
            }
        }
    }
    return nil
}