bel/schemas/bel.py
# Standard Library
import copy
import enum
import json
import re
from typing import Any, List, Mapping, Optional, Tuple, Union
# Third Party
from loguru import logger
from pydantic import BaseModel, Field, root_validator
# Local
import bel.core.settings as settings
import bel.db.arangodb
import bel.terms.orthologs
import bel.terms.terms
from bel.core.utils import namespace_quoting, split_key_label
from bel.resources.namespace import get_namespace_metadata
from bel.schemas.constants import AnnotationTypesEnum, EntityTypesEnum
from bel.schemas.terms import Term
Key = str # Type alias for NsVal Key values
NamespacePattern = r"[\w\.]+" # Regex for Entity Namespace
class SpanTypesEnum(str, enum.Enum):
function = "function"
function_name = "function_name"
function_args = "function_args"
relation = "relation"
ns_arg = "ns_arg"
namespace = "namespace"
ns_id = "ns_id"
ns_label = "ns_label"
string_arg = "string_arg"
string = "string"
start_paren = "start_paren"
end_paren = "end_paren"
class Span(BaseModel):
"""Used for collecting string spans
The spans are collect by the index of the first char of the span and the non-inclusive
end of the last span character.
For example:
'cat' with a span of [0, 2] results in 'ca'
You can use -1 as the span end for 1 beyond the last character of the string.
"""
start: int = Field(..., title="Span Start")
end: int = Field(..., title="Span End")
span_str: str = ""
type: Optional[SpanTypesEnum]
class NsArgSpan(Span):
"""Namespace Arg Span"""
namespace: Span
id: Span
label: Optional[Span]
class FunctionSpan(Span):
name: Span # function name span
args: Optional[Span] # parentheses span
class Pair(BaseModel):
"""Paired characters
Used for collecting matched quotes and parentheses
"""
start: Union[int, None] = Field(..., description="index of first in paired chars")
end: Union[int, None] = Field(..., description="Index of second of paired chars")
class ErrorLevelEnum(str, enum.Enum):
Good = "Good"
Error = "Error"
Warning = "Warning"
Processing = "Processing"
class ValidationErrorType(str, enum.Enum):
Nanopub = "Nanopub"
Assertion = "Assertion"
Annotation = "Annotation"
class ValidationError(BaseModel):
type: ValidationErrorType
severity: ErrorLevelEnum
label: str = Field(
"",
description="Label used in search - combination of type and severity, e.g. Assertion-Warning",
)
msg: str
visual: Optional[str] = Field(
None,
description="Visualization of the location of the error in the Assertion string or Annotation using html span tags",
)
visual_pairs: Optional[List[Tuple[int, int]]] = Field(
None,
description="Used when the Assertion string isn't available. You can then post-process these pairs to create the visual field.",
)
index: int = Field(
0,
description="Index to sort validation errors - e.g. for multiple errors in Assertions - start at the beginning of the string.",
)
@root_validator(pre=True)
def set_label(cls, values):
label, type_, severity = (values.get("label"), values.get("type"), values.get("severity"))
if not label:
label = f"{type_}-{severity}"
values["label"] = label.strip()
return values
class ValidationErrors(BaseModel):
status: Optional[ErrorLevelEnum] = "Good"
errors: Optional[List[ValidationError]]
validation_target: Optional[str]
class AssertionStr(BaseModel):
"""Assertion string object - to handle either SRO format or simple string of full assertion"""
entire: str = Field(
"",
description="Will be dynamically created from the SRO fields if null/empty when initialized.",
)
subject: str = ""
relation: str = ""
object: str = ""
@root_validator(pre=True)
def set_entire(cls, values):
entire, subject, relation, object_ = (
values.get("entire"),
values.get("subject"),
values.get("relation"),
values.get("object"),
)
if subject is None:
subject = ""
if relation is None:
relation = ""
if object_ is None:
object_ = ""
if not entire:
entire = f"{subject} {relation} {object_}"
values["entire"] = entire.strip()
return values
class NsVal(object):
"""Namespaced value"""
def __init__(
self, key_label: str = "", namespace: str = "", key: str = "", id: str = "", label: str = ""
):
"""Preferentially use key_label to extract namespace:id!Optional[label]"""
self.key = key
if key_label:
(namespace, id, label) = split_key_label(key_label)
elif key:
(namespace, id) = key.split(":", 1)
self.namespace: str = namespace
self.id: str = namespace_quoting(id)
if not self.key:
self.key = f"{self.namespace}:{self.id}"
self.label = ""
if label:
self.label: str = namespace_quoting(label)
# Add key_label to NsVal
self.update_key_label()
def add_label(self):
if not self.label:
self.update_label()
return self
def update_label(self):
term = bel.terms.terms.get_term(self.key)
if term and term.label:
self.label = namespace_quoting(term.label)
return self
def db_key(self):
"""Used for arangodb key"""
return bel.db.arangodb.arango_id_to_key(self.key)
def update_key_label(self):
"""Return key with label if available"""
self.add_label()
if self.label:
self.key_label = f"{self.namespace}:{self.id}!{self.label}"
else:
self.key_label = f"{self.namespace}:{self.id}"
return self.key_label
def to_string(self):
return __str__(self)
def to_json(self):
return self.key_label
def __str__(self):
if self.label:
return f"{self.namespace}:{self.id}!{self.label}"
else:
return f"{self.namespace}:{self.id}"
__repr__ = __str__
def __len__(self):
return len(self.__str__())
class BelEntity(object):
"""BEL Term - supports original NsVal ns:id!label plus (de)canonicalization and orthologs"""
def __init__(self, term_key: Key = "", nsval: Optional[NsVal] = None):
"""Create BelEntity via a term_key or a NsVal object
You cannot provide a term_key_label string (e.g. NS:ID:LABEL) as a term_key
"""
self.term: Optional[Term] = None
self.canonical: Optional[NsVal] = None
self.decanonical: Optional[NsVal] = None
self.species_key: Key = None
self.entity_types = []
self.orthologs: Mapping[Key, dict] = {}
self.orthologized: bool = False
self.original_species_key: Optional[Key] = None
# NOTE - self.nsval is overridden when orthologized
if term_key:
self.original_term_key = term_key
self.term = bel.terms.terms.get_term(term_key)
if self.term:
self.species_key = self.term.species_key
self.original_species_key = self.species_key
self.nsval: NsVal = NsVal(
namespace=self.term.namespace, id=self.term.id, label=self.term.label
)
self.original_nsval = self.nsval
elif nsval:
self.nsval = nsval
self.original_nsval = nsval
else:
self.nsval = None
self.namespace_metadata = get_namespace_metadata().get(self.nsval.namespace, None)
if self.namespace_metadata is not None and self.namespace_metadata.entity_types:
self.entity_types = self.namespace_metadata.entity_types
self.add_term()
def add_term(self):
"""Add term info"""
if self.namespace_metadata and self.namespace_metadata.namespace_type == "complete":
self.term = bel.terms.terms.get_term(self.nsval.key)
if self.term and self.nsval.key != self.term.key:
self.nsval = NsVal(
namespace=self.term.namespace, id=self.term.id, label=self.term.label
)
if self.term and self.term.entity_types:
self.entity_types = self.term.entity_types
if self.term and self.term.species_key:
self.species_key = self.term.species_key
return self
def add_species(self):
"""Add species if not already set"""
if self.species_key:
return self
if not self.term:
self.add_term()
if self.term and self.term.species_key:
self.species_key = self.term.species_key
elif self.namespace_metadata and self.namespace_metadata.species_key:
self.species_key = self.namespace_metadata.species_key
return self
def add_entity_types(self):
"""get entity_types to BEL Entity"""
if self.entity_types:
return self
entity_types = []
if self.term:
entity_types = self.term.entity_types
elif self.namespace_metadata and self.namespace_metadata.namespace_type == "complete":
self.term = bel.terms.terms.get_term(self.nsval.key)
if self.term:
entity_types = self.term.entity_types
elif self.namespace_metadata and self.namespace_metadata.entity_types:
entity_types = self.namespace_metadata.entity_types
self.entity_types = [et.name for et in entity_types]
return self
def get_entity_types(self):
if not self.entity_types:
self.add_entity_types()
return self.entity_types
def normalize(
self,
canonical_targets: Mapping[str, List[str]] = settings.BEL_CANONICALIZE,
decanonical_targets: Mapping[str, List[str]] = settings.BEL_DECANONICALIZE,
):
"""Collect (de)canonical forms"""
if self.canonical and self.decanonical:
return self
if self.namespace_metadata and self.namespace_metadata.namespace_type != "complete":
self.canonical = self.nsval
self.decanonical = self.nsval
return self
normalized = bel.terms.terms.get_normalized_terms(
self.nsval.key,
canonical_targets=canonical_targets,
decanonical_targets=decanonical_targets,
term=self.term,
)
if normalized["original"] != normalized["normalized"]:
self.nsval = NsVal(key_label=normalized["normalized"])
if self.original_nsval.label:
self.nsval.label = self.original_nsval.label
self.canonical = copy.deepcopy(self.nsval)
if normalized["canonical"]:
self.canonical = NsVal(key_label=normalized["canonical"])
self.canonical.label = ""
self.canonical.key_label = self.canonical.key
self.decanonical = self.nsval
if normalized["decanonical"]:
self.decanonical = NsVal(key_label=normalized["decanonical"])
return self
def canonicalize(
self,
canonical_targets: Mapping[str, List[str]] = settings.BEL_CANONICALIZE,
decanonical_targets: Mapping[str, List[str]] = settings.BEL_DECANONICALIZE,
):
"""Canonicalize BEL Entity
Must set both targets if not using defaults as the underlying normalization handles
both canonical and decanonical forms in the same query
"""
if self.orthologized:
self.nsval = self.orthologs[self.species_key]["canonical"]
else:
self.normalize(
canonical_targets=settings.BEL_CANONICALIZE,
decanonical_targets=settings.BEL_DECANONICALIZE,
)
self.nsval = self.canonical
return self
def decanonicalize(
self,
canonical_targets: Mapping[str, List[str]] = settings.BEL_CANONICALIZE,
decanonical_targets: Mapping[str, List[str]] = settings.BEL_DECANONICALIZE,
):
"""Decanonicalize BEL Entity
Must set both targets if not using defaults as the underlying normalization handles
both canonical and decanonical forms in the same query
"""
if self.orthologized:
self.nsval = self.orthologs[self.species_key]["decanonical"]
else:
self.normalize(
canonical_targets=settings.BEL_CANONICALIZE,
decanonical_targets=settings.BEL_DECANONICALIZE,
)
self.nsval = self.decanonical
return self
def collect_orthologs(self, species_keys: List[Key] = settings.BEL_ORTHOLOGIZE_TARGETS):
"""Get orthologs for BelEntity is orthologizable"""
self.add_entity_types()
self.normalize()
self.add_species()
# Do not run if no species or already exists
if not self.species_key or self.orthologs:
return self
# Only collect orthologs if it's the right entity type
self.add_entity_types()
if not list(set(self.entity_types) & set(["Gene", "RNA", "Micro_RNA", "Protein", "all"])):
return self
orthologs = bel.terms.orthologs.get_orthologs(self.canonical.key, species_keys=species_keys)
for ortholog_species_key in orthologs:
ortholog_key = orthologs[ortholog_species_key]
normalized = bel.terms.terms.get_normalized_terms(ortholog_key)
ortholog_dict = {}
if normalized["canonical"]:
ortholog_dict["canonical"] = NsVal(
key=normalized["canonical"], label=normalized["label"]
)
if normalized["decanonical"]:
ortholog_dict["decanonical"] = NsVal(
key=normalized["decanonical"], label=normalized["label"]
)
self.orthologs[ortholog_species_key] = copy.copy(ortholog_dict)
return self
def orthologize(self, species_key: Key):
"""Orthologize BEL entity - results in canonical form"""
self.add_entity_types()
self.normalize()
self.add_species()
# Do not run if no species or already exists
if not self.species_key:
return self
# Only collect orthologs if it's the right entity type
self.add_entity_types()
if not list(set(self.entity_types) & set(["Gene", "RNA", "Micro_RNA", "Protein", "all"])):
return self
if not self.orthologs:
self.collect_orthologs(species_keys=[species_key])
if species_key not in self.orthologs:
self.orthologized = False
self.nsval = self.canonical
return self
self.orthologized = True
self.species_key = species_key
self.nsval = self.orthologs[species_key]["canonical"]
return self
def orthologizable(self, species_key: Key) -> bool:
"""Is this BEL Entity/NSArg orthologizable?"""
self.add_entity_types()
self.normalize()
self.add_species()
# Only collect orthologs if it's the right entity type
if not list(set(self.entity_types) & set(["Gene", "RNA", "Micro_RNA", "Protein", "all"])):
return None
# Do not run if no species or already exists
if not self.species_key:
return False
if not self.orthologs:
self.collect_orthologs()
if species_key not in self.orthologs:
return False
return True
def all(self):
"""Fully flesh out BEL Entity"""
self.add_species()
self.add_entity_types()
self.normalize()
self.collect_orthologs()
return self
def __str__(self):
return str(self.nsval)
__repr__ = __str__