dragonchain/dragonchain

View on GitHub
dragonchain/lib/database/redisearch.py

Summary

Maintainability
D
2 days
Test Coverage
C
75%
# Copyright 2020 Dragonchain, Inc.
# Licensed under the Apache License, Version 2.0 (the "Apache License")
# with the following modification; you may not use this file except in
# compliance with the Apache License and the following modification to it:
# Section 6. Trademarks. is deleted and replaced with:
#      6. Trademarks. This License does not grant permission to use the trade
#         names, trademarks, service marks, or product names of the Licensor
#         and its affiliates, except as required to comply with Section 4(c) of
#         the License and to reproduce the content of the NOTICE file.
# You may obtain a copy of the Apache License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License with the above modification is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the Apache License for the specific
# language governing permissions and limitations under the Apache License.

import re
import os
import enum
import json
from typing import cast, Dict, Any, Iterable, Optional, TYPE_CHECKING

import redis
import redisearch

from dragonchain.lib.database import redis as dragonchain_redis
from dragonchain.lib.interfaces import storage
from dragonchain.lib.dto import l1_block_model
from dragonchain.lib.dto import l2_block_model
from dragonchain.lib.dto import l3_block_model
from dragonchain.lib.dto import l4_block_model
from dragonchain.lib.dto import l5_block_model
from dragonchain.lib.dto import transaction_model
from dragonchain.lib.dto import smart_contract_model
from dragonchain.lib.dao import transaction_type_dao
from dragonchain.lib.dto import transaction_type_model
from dragonchain.lib import namespace
from dragonchain import logger

if TYPE_CHECKING:
    from dragonchain.lib.types import custom_index
    from dragonchain.lib.dto import model  # noqa: F401

_log = logger.get_logger()

BROADCAST_ENABLED = os.environ["BROADCAST"].lower() != "false"
LEVEL = os.environ["LEVEL"]
ENABLED = not (LEVEL != "1" and os.environ.get("USE_REDISEARCH") == "false")
if ENABLED:
    REDISEARCH_ENDPOINT = os.environ["REDISEARCH_ENDPOINT"]
    REDIS_PORT = int(os.environ["REDIS_PORT"]) or 6379

INDEX_L5_VERIFICATION_GENERATION_KEY = "dc:l5_index_generation_complete"
INDEX_GENERATION_KEY = "dc:index_generation_complete"
L5_BLOCK_MIGRATION_KEY = "dc:migrations:l5_block"
BLOCK_MIGRATION_KEY = "dc:migrations:block"
TXN_MIGRATION_KEY = "dc:migrations:txn"
L5_NODES = "dc:nodes:l5"

_escape_transformation = str.maketrans(
    {
        ",": "\\,",
        ".": "\\.",
        "<": "\\<",
        ">": "\\>",
        "{": "\\{",
        "}": "\\}",
        "[": "\\[",
        "]": "\\]",
        '"': '\\"',
        "'": "\\'",
        ":": "\\:",
        ";": "\\;",
        "!": "\\!",
        "@": "\\@",
        "#": "\\#",
        "$": "\\$",
        "%": "\\%",
        "^": "\\^",
        "&": "\\&",
        "*": "\\*",
        "(": "\\(",
        ")": "\\)",
        "-": "\\-",
        "+": "\\+",
        "=": "\\=",
        "~": "\\~",
        " ": "\\ ",
    }
)


class Indexes(enum.Enum):
    block = "bk"
    smartcontract = "sc"
    transaction = "tx"
    verification = "ver"


_redis_connection = None


def _get_redisearch_index_client(index: str) -> redisearch.Client:
    """Get an initialized redisearch client for an index
    Args:
        index: Enum for the relevant index
    Returns:
        Initialized redisearch client for given index
    """
    global _redis_connection
    if _redis_connection is None:
        if not ENABLED:
            raise RuntimeError("Redisearch was attempted to be used, but is disabled")
        _redis_connection = dragonchain_redis._initialize_redis(host=REDISEARCH_ENDPOINT, port=REDIS_PORT)
    return redisearch.Client(index, conn=_redis_connection)


def _get_custom_field_from_input(custom_index_input: "custom_index") -> redisearch.client.Field:
    input_type = custom_index_input["type"]
    field_name = custom_index_input["field_name"]
    options = custom_index_input.get("options")
    if input_type == "text":
        weight = 1.0
        sortable = False
        no_stem = False
        no_index = False
        if options:
            sortable = bool(options.get("sortable"))
            no_stem = bool(options.get("no_stem"))
            no_index = bool(options.get("no_index"))
            cust_weight = options.get("weight")
            if isinstance(cust_weight, (int, float)) and cust_weight >= 0 and cust_weight <= 1:
                weight = float(cust_weight)
        return redisearch.TextField(field_name, weight=weight, sortable=sortable, no_stem=no_stem, no_index=no_index)
    elif input_type == "tag":
        separator = ","
        no_index = False
        if options:
            separator = options.get("separator") or ","
            no_index = bool(options.get("no_index"))
        return redisearch.TagField(field_name, separator=separator, no_index=no_index)
    elif input_type == "number":
        sortable = False
        no_index = False
        if options:
            sortable = bool(options.get("sortable"))
            no_index = bool(options.get("no_index"))
        return redisearch.NumericField(field_name, sortable=sortable, no_index=no_index)
    else:
        raise RuntimeError(f"Index type {input_type} is not supported")


def get_escaped_redisearch_string(unescaped_string: str) -> str:
    return unescaped_string.translate(_escape_transformation)


def create_transaction_index(index: str, custom_indexes: Optional[Iterable["custom_index"]] = None, force: bool = True) -> None:
    """Create (and overwrite if necessary) index for a transaction type with optional custom_indexes"""
    # Delete the index with this name if necessary
    if force:
        delete_index(index)
    client = _get_redisearch_index_client(index)
    # Set standard transaction indexes
    index_fields = [
        redisearch.TextField("tag"),
        redisearch.NumericField("timestamp", sortable=True),
        redisearch.NumericField("block_id", sortable=True),
        redisearch.TagField("invoker"),
    ]
    # Add custom indexes if they exist
    if custom_indexes:
        for idx in custom_indexes:
            index_fields.append(_get_custom_field_from_input(idx))
    # Create the actual index
    client.create_index(index_fields)


def delete_index(index: str) -> None:
    """Force delete an index (and drop all its documents)"""
    client = _get_redisearch_index_client(index)
    try:
        client.drop_index()
    except redis.exceptions.ResponseError as e:
        if not str(e).startswith("Unknown Index name"):  # Don't care if the index doesn't exist when trying to delete
            raise


# Redisearch
def search(
    index: str,
    query_str: str,
    only_id: Optional[bool] = None,
    verbatim: Optional[bool] = None,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    sort_by: Optional[str] = None,
    sort_asc: Optional[bool] = None,
) -> redisearch.Result:
    """Do a search on the redisearch indexes
    Args:
        index: The index to search
        query_str: Redisearch search query syntax: https://oss.redislabs.com/redisearch/Query_Syntax.html
        only_id: whether or not to only fetch document ids, and not their contents (default true)
        verbatim: whether or not to use stemming for query expansion in the query_str
        offset: the offset to start the query from, this can be used for pagination (defaults to 0, aka the start of the query)
        limit: the number of results to fetch from the query (can be set to 0 to simply get a count of the query results)
        sort_by: the sortable field name to sort by for this query
        sort_asc: (Only relevant if sort_by is set), sort the results in ascending order if true, descending if false
    Returns:
        redisearch result object
    """
    # Set some sensible defaults
    only_id = True if only_id is None else only_id
    verbatim = False if verbatim is None else verbatim
    offset = 0 if offset is None else offset
    limit = 10 if limit is None else limit
    sort_by = "" if sort_by is None else sort_by
    sort_asc = True if sort_asc is None else sort_asc

    client = _get_redisearch_index_client(index)
    query = redisearch.Query(query_str).paging(offset, limit)
    if only_id:
        query.no_content()
    if verbatim:
        query.verbatim()
    if sort_by:
        query.sort_by(sort_by, sort_asc)
    return client.search(query)


def get_document(index: str, doc_name: str) -> redisearch.Document:
    """Get a document by id explicitly
    Args:
        index: The index to search
        doc_name: The document to fetch
    Returns:
        A redisearch document of the fetch
    NOTE:
        This will NEVER raise any sort of not found, the returned document will simply not have any fields besides id (and .payload = None)
        If using this function to check for document existence, you MUST check yourself
    """
    client = _get_redisearch_index_client(index)
    return client.load_document(doc_name)


def get_document_count(index: str) -> int:
    """Get the number of documents for an index
    Args:
        index: The index to search
    Returns:
        Number of documents in the index
    """
    client = _get_redisearch_index_client(index)
    return int(client.info()["num_docs"])


def put_document(index: str, doc_name: str, fields: Dict[str, Any], upsert: bool = False, partial_update: bool = False) -> None:
    """Add a document to an index
    Args:
        index: The index to add the document to
        doc_name: the name of the document to add. NOTE: This must be GLOBALLY unique for all indexes
        fields: Dictionary of fields to add (according to the index). i.e. {'block_id':1234,'timestamp':54321,'tag':'sometag'} for transactions
        upsert: If false, an exception will be thrown if the document already exists. If true, existing documents will be completely overwritten
        partial_update: If true and the document already exists, only update the provided fields, merging with the index's existing fields
            Note: upsert and partial_update are mutually exclusive
    """
    client = _get_redisearch_index_client(index)
    if upsert and partial_update:
        raise RuntimeError("Upsert and partial_update are mutually exclusive")
    client.add_document(doc_name, replace=upsert or partial_update, partial=partial_update, **fields)


def put_many_documents(index: str, documents: Dict[str, Dict[str, Any]], upsert: bool = False, partial_update: bool = False) -> None:
    """Add many documents to an index at once. This should be used for efficiency when many documents will be created at once
    Args:
        index: The index to add the documents to
        documents: dictionary of document names, with a value of their field/value dictionaries.
            i.e. {'bad04998-e028-4cde-b807-4feaea4efdb8':{'block_id':1234,'timestamp':54321,'tag':'sometag'}} for a transaction
        upsert: If false, an exception will be thrown if the document already exists. If true, existing documents will be completely overwritten
        partial_update: If true and the document already exists, only update the provided fields, merging with the index's existing fields
            Note: upsert and partial_update are mutually exclusive
    """
    client = _get_redisearch_index_client(index)
    batch_indexer = client.batch_indexer(chunk_size=1000)
    if upsert and partial_update:
        raise RuntimeError("Upsert and partial_update are mutually exclusive")
    for key, value in documents.items():
        batch_indexer.add_document(key, replace=upsert or partial_update, partial=partial_update, **value)
    batch_indexer.commit()


def delete_document(index: str, doc_name: str) -> None:
    """Remove an existing document from an index
    Args:
        index: The index to remove the document from
        doc_name: The document to remove
    """
    client = _get_redisearch_index_client(index)
    client.delete_document(doc_name)


def generate_indexes_if_necessary() -> None:
    """Initialize redisearch with necessary indexes and fill them from storage if migration has not been marked as complete"""
    redisearch_redis_client = _get_redisearch_index_client("").redis
    needs_generation = not bool(redisearch_redis_client.get(INDEX_GENERATION_KEY))
    needs_l5_generation = not bool(redisearch_redis_client.get(INDEX_L5_VERIFICATION_GENERATION_KEY))
    # No-op if indexes are marked as already generated
    if not needs_generation and not needs_l5_generation:
        return

    if needs_l5_generation:
        # Create L5 verification indexes
        _generate_l5_verification_indexes()
        # Mark index generation as complete
        redisearch_redis_client.delete(L5_BLOCK_MIGRATION_KEY)
        redisearch_redis_client.set(INDEX_L5_VERIFICATION_GENERATION_KEY, "a")

    if needs_generation:
        # Create block index
        _log.info("Creating block indexes")
        _generate_block_indexes()
        # Create indexes for transactions
        _log.info("Creating transaction indexes")
        _generate_transaction_indexes()
        # Create smart contract index
        _log.info("Creating smart contract indexes")
        _generate_smart_contract_indexes()
        # Mark index generation as complete
        _log.info("Marking redisearch index generation complete")
        redisearch_redis_client.delete(BLOCK_MIGRATION_KEY)
        redisearch_redis_client.delete(TXN_MIGRATION_KEY)
        redisearch_redis_client.set(INDEX_GENERATION_KEY, "a")


def _generate_l5_verification_indexes() -> None:
    client = _get_redisearch_index_client(Indexes.verification.value)
    try:
        client.create_index(
            [
                redisearch.NumericField("block_id", sortable=True),
                redisearch.NumericField("prev_id", sortable=True),
                redisearch.NumericField("timestamp", sortable=True),
                redisearch.TagField("dc_id"),
            ]
        )
    except redis.exceptions.ResponseError as e:
        if not str(e).startswith("Index already exists"):  # We don't care if index already exists
            raise
    _log.info("Listing all blocks in storage")
    block_paths = storage.list_objects("BLOCK/")
    pattern = re.compile(r"BLOCK\/([0-9]+)-([Ll])5(.*)$")
    for block_path in block_paths:
        if LEVEL == "1" and BROADCAST_ENABLED and re.search(pattern, block_path):
            if not client.redis.sismember(L5_BLOCK_MIGRATION_KEY, block_path):
                raw_block = storage.get_json_from_object(block_path)
                block = l5_block_model.new_from_at_rest(raw_block)
                storage_location = block_path.split("/")[1]
                try:
                    put_document(Indexes.verification.value, storage_location, block.export_as_search_index())
                except redis.exceptions.ResponseError as e:
                    if not str(e).startswith("Document already exists"):
                        raise
                    else:
                        _log.info(f"Document {storage_location} already exists")
                client.redis.sadd(L5_NODES, block.dc_id)
                client.redis.sadd(L5_BLOCK_MIGRATION_KEY, block_path)
            else:
                _log.info(f"Skipping already indexed L5 block {block_path}")


def _generate_block_indexes() -> None:
    client = _get_redisearch_index_client(Indexes.block.value)
    try:
        client.create_index(
            [
                redisearch.NumericField("block_id", sortable=True),
                redisearch.NumericField("prev_id", sortable=True),
                redisearch.NumericField("timestamp", sortable=True),
            ]
        )
    except redis.exceptions.ResponseError as e:
        if not str(e).startswith("Index already exists"):  # We don't care if index already exists
            raise
    _log.info("Listing all blocks in storage")
    block_paths = storage.list_objects("BLOCK/")
    pattern = re.compile(r"BLOCK\/[0-9]+$")
    for block_path in block_paths:
        if re.search(pattern, block_path):
            # do a check to see if this block was already marked as indexed
            if not client.redis.sismember(BLOCK_MIGRATION_KEY, block_path):
                _log.info(f"Adding index for {block_path}")
                raw_block = storage.get_json_from_object(block_path)
                block = cast("model.BlockModel", None)
                if LEVEL == "1":
                    block = l1_block_model.new_from_stripped_block(raw_block)
                elif LEVEL == "2":
                    block = l2_block_model.new_from_at_rest(raw_block)
                elif LEVEL == "3":
                    block = l3_block_model.new_from_at_rest(raw_block)
                elif LEVEL == "4":
                    block = l4_block_model.new_from_at_rest(raw_block)
                elif LEVEL == "5":
                    block = l5_block_model.new_from_at_rest(raw_block)
                put_document(Indexes.block.value, block.block_id, block.export_as_search_index())
                client.redis.sadd(BLOCK_MIGRATION_KEY, block_path)
            else:
                _log.info(f"Skipping already indexed block {block_path}")


def _generate_smart_contract_indexes() -> None:
    delete_index(Indexes.smartcontract.value)  # Always generate smart contract indexes from scratch by dropping existing ones
    client = _get_redisearch_index_client(Indexes.smartcontract.value)
    client.create_index([redisearch.TagField("sc_name")])
    # Find what smart contracts exist in storage
    _log.info("Listing all smart contracts in storage")
    sc_object_paths = storage.list_objects("SMARTCONTRACT/")
    pattern = re.compile(r"SMARTCONTRACT\/.{36}\/metadata\.json$")
    for sc in sc_object_paths:
        if re.search(pattern, sc):
            sc_model = smart_contract_model.new_from_at_rest(storage.get_json_from_object(sc))
            _log.info(f"Adding index for smart contract {sc_model.id} ({sc_model.txn_type})")
            put_document(Indexes.smartcontract.value, sc_model.id, sc_model.export_as_search_index())


def _generate_transaction_indexes() -> None:  # noqa: C901
    # -- CREATE INDEXES FOR TRANSACTIONS --
    client = _get_redisearch_index_client(Indexes.transaction.value)
    try:
        client.create_index([redisearch.TagField("block_id")])  # Used for reverse-lookup of transactions by id (with no txn_type)
    except redis.exceptions.ResponseError as e:
        if not str(e).startswith("Index already exists"):  # We don't care if index already exists
            raise
    try:
        create_transaction_index(namespace.Namespaces.Contract.value, force=False)  # Create the reserved txn type index
    except redis.exceptions.ResponseError as e:
        if not str(e).startswith("Index already exists"):  # We don't care if index already exists
            raise
    txn_types_to_watch = {namespace.Namespaces.Contract.value: 1}  # Will be use when going through all stored transactions
    txn_type_models = {
        namespace.Namespaces.Contract.value: transaction_type_model.TransactionTypeModel(namespace.Namespaces.Contract.value, active_since_block="1")
    }
    for txn_type in transaction_type_dao.list_registered_transaction_types():
        txn_type_model = transaction_type_model.new_from_at_rest(txn_type)
        txn_type_models[txn_type_model.txn_type] = txn_type_model
        _log.info(f"Adding index for {txn_type_model.txn_type}")
        try:
            create_transaction_index(txn_type_model.txn_type, txn_type_model.custom_indexes, force=False)
        except redis.exceptions.ResponseError as e:
            if not str(e).startswith("Index already exists"):  # We don't care if index already exists
                raise
        txn_types_to_watch[txn_type_model.txn_type] = int(txn_type_model.active_since_block)

    # -- LIST AND INDEX ACTUAL TRANSACTIONS FROM STORAGE
    _log.info("Listing all full transactions")
    transaction_blocks = storage.list_objects("TRANSACTION/")
    for txn_path in transaction_blocks:
        # do a check to see if this block's transactions were already marked as indexed
        if not client.redis.sismember(TXN_MIGRATION_KEY, txn_path):
            _log.info(f"Indexing transactions for {txn_path}")
            for txn in storage.get(txn_path).split(b"\n"):
                if txn:
                    txn_model = transaction_model.new_from_at_rest_full(json.loads(txn)["txn"])
                    # Add general transaction index
                    put_document(Indexes.transaction.value, f"txn-{txn_model.txn_id}", {"block_id": txn_model.block_id}, upsert=True)
                    watch_block = txn_types_to_watch.get(txn_model.txn_type)
                    # Extract custom indexes if necessary
                    if watch_block and int(txn_model.block_id) >= watch_block:
                        txn_model.extract_custom_indexes(txn_type_models[txn_model.txn_type])
                        put_document(txn_model.txn_type, txn_model.txn_id, txn_model.export_as_search_index(), upsert=True)
            client.redis.sadd(TXN_MIGRATION_KEY, txn_path)
        else:
            _log.info(f"Skipping already indexed transaction {txn_path}")