Source code for indra.ontology.ontology_graph

import logging
import networkx
import functools
from collections import deque
from typing import Optional, Tuple

# Module-level logger; used below for debug messages during graph traversal
# and for info messages (transitive closure build, graph statistics).
logger = logging.getLogger(__name__)


def with_initialize(func):
    """Decorate a method so that its object is initialized before use.

    The returned wrapper checks the ``_initialized`` attribute of its first
    positional argument and, if it is falsy, calls that object's
    ``initialize()`` method before delegating to the wrapped function.

    Parameters
    ----------
    func : callable
        The function to wrap. Its first positional argument must expose
        ``_initialized`` and ``initialize()``.

    Returns
    -------
    callable
        A wrapper carrying the metadata of ``func`` which forwards all
        arguments to ``func`` and returns its result.
    """
    def _ensure_ready(obj, *args, **kwargs):
        # Fast path: nothing to do if the object is already initialized.
        if obj._initialized:
            return func(obj, *args, **kwargs)
        obj.initialize()
        return func(obj, *args, **kwargs)

    return functools.wraps(func)(_ensure_ready)


class IndraOntology(networkx.DiGraph):
    """A directed graph representing entities and their properties as nodes
    and ontological relationships between the entities as edges.

    Attributes
    ----------
    name : str
        A prefix/name for the ontology, used for the purposes of caching.
    version : str
        A version for the ontology, used for the purposes of caching.
    """
    version = None
    name = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Set to True by subclasses once nodes and edges have been added;
        # checked by the with_initialize decorator.
        self._initialized = False
        # Lazily built lookup from (name space, standard name) to
        # (name space, ID); see _build_name_lookup.
        self.name_to_grounding = {}
        # Optional set of (child label, parent label) pairs; see
        # _build_transitive_closure.
        self.transitive_closure = set()
        # Counters incremented on each isa_or_partof / isrel call.
        self._isa_counter = 0
        self._isrel_counter = 0

    def initialize(self):
        """Initialize the ontology by adding nodes and edges.

        By convention, ontologies are implemented such that the constructor
        does not add all the nodes and edges, which can take a long time.
        This function is called automatically when any of the user-facing
        methods of IndraOntology is called. This way, the ontology is only
        fully constructed if it is used.

        Raises
        ------
        NotImplementedError
            Always, in this base class; subclasses must override it.
        """
        raise NotImplementedError('The initialize method needs to be '
                                  'implemented when subclassing '
                                  'IndraOntology')

    @with_initialize
    def _check_path(self, ns1, id1, ns2, id2, edge_types):
        """Return True if a directed path of given edge types exists.

        Parameters
        ----------
        ns1 : str
            The first entity's name space.
        id1 : str
            The first entity's ID.
        ns2 : str
            The second entity's name space.
        id2 : str
            The second entity's ID.
        edge_types : iterable of str
            The edge types that may be traversed.

        Returns
        -------
        bool
            True if the second entity is reachable from the first one via
            edges whose type is in `edge_types`, otherwise False.
        """
        target = (ns2, id2)
        try:
            return target in self._transitive_rel(ns1, id1, self.child_rel,
                                                  edge_types, target)
        # This typically happens if the node is missing from
        # the graph. Is there a more specific error type?
        except networkx.NetworkXError:
            return False

    @staticmethod
    def get_ns_id(node):
        """Return the name space and ID of a given node from its label.

        Parameters
        ----------
        node : str
            A node's label.

        Returns
        -------
        tuple(str, str)
            A tuple of the node's name space and ID.
        """
        return IndraOntology.reverse_label(node)

    @staticmethod
    def get_ns(node):
        """Return the name space of a given node from its label.

        Parameters
        ----------
        node : str
            A node's label.

        Returns
        -------
        str
            The node's name space.
        """
        return IndraOntology.get_ns_id(node)[0]

    @staticmethod
    def get_id(node):
        """Return the ID of a given node from its label.

        Parameters
        ----------
        node : str
            A node's label.

        Returns
        -------
        str
            The node's ID within its name space.
        """
        return IndraOntology.get_ns_id(node)[1]

    @with_initialize
    def isrel(self, ns1, id1, ns2, id2, rels):
        """Return True if the two entities are related with a given rel.

        Parameters
        ----------
        ns1 : str
            The first entity's name space.
        id1 : str
            The first entity's ID.
        ns2 : str
            The second entity's name space.
        id2 : str
            The second entity's ID.
        rels : iterable of str
            A set of edge types to traverse when determining if the first
            entity is related to the second entity.

        Returns
        -------
        bool
            True if the first entity is related to the second with a
            directed path containing edges with types in `rels`.
            Otherwise False.
        """
        self._isrel_counter += 1
        return self._check_path(ns1, id1, ns2, id2, rels)

    @with_initialize
    def isa(self, ns1, id1, ns2, id2):
        """Return True if the first entity is related to the second as 'isa'.

        Parameters
        ----------
        ns1 : str
            The first entity's name space.
        id1 : str
            The first entity's ID.
        ns2 : str
            The second entity's name space.
        id2 : str
            The second entity's ID.

        Returns
        -------
        bool
            True if the first entity is related to the second with a
            directed path containing edges with type `isa`. Otherwise
            False.
        """
        return self.isrel(ns1, id1, ns2, id2, rels={'isa'})

    @with_initialize
    def partof(self, ns1, id1, ns2, id2):
        """Return True if the first entity is related to the second as
        'partof'.

        Parameters
        ----------
        ns1 : str
            The first entity's name space.
        id1 : str
            The first entity's ID.
        ns2 : str
            The second entity's name space.
        id2 : str
            The second entity's ID.

        Returns
        -------
        bool
            True if the first entity is related to the second with a
            directed path containing edges with type `partof`. Otherwise
            False.
        """
        return self.isrel(ns1, id1, ns2, id2, rels={'partof'})

    @with_initialize
    def isa_or_partof(self, ns1, id1, ns2, id2):
        """Return True if the first entity is related to the second as 'isa'
        or `partof`.

        If a transitive closure has been built, the answer is looked up in
        it directly; otherwise the graph is traversed.

        Parameters
        ----------
        ns1 : str
            The first entity's name space.
        id1 : str
            The first entity's ID.
        ns2 : str
            The second entity's name space.
        id2 : str
            The second entity's ID.

        Returns
        -------
        bool
            True if the first entity is related to the second with a
            directed path containing edges with type `isa` or `partof`.
            Otherwise False.
        """
        self._isa_counter += 1
        if self.transitive_closure:
            pair = (self.label(ns1, id1), self.label(ns2, id2))
            return pair in self.transitive_closure
        return self.isrel(ns1, id1, ns2, id2, rels={'isa', 'partof'})

    @with_initialize
    def maps_to(self, ns1, id1, ns2, id2):
        """Return True if the first entity has an xref to the second.

        Parameters
        ----------
        ns1 : str
            The first entity's name space.
        id1 : str
            The first entity's ID.
        ns2 : str
            The second entity's name space.
        id2 : str
            The second entity's ID.

        Returns
        -------
        bool
            True if the first entity is related to the second with a
            directed path containing edges with type `xref`. Otherwise
            False.
        """
        return self._check_path(ns1, id1, ns2, id2, {'xref'})

    @with_initialize
    def map_to(self, ns1, id1, ns2):
        """Return an entity that is a unique xref of an entity in a given
        name space.

        This function first finds all mappings via `xref` edges from the
        given first entity to the given second name space. If exactly one
        such mapping target is found, the target is returned. Otherwise,
        None is returned.

        Parameters
        ----------
        ns1 : str
            The first entity's name space.
        id1 : str
            The first entity's ID.
        ns2 : str
            The second entity's name space.

        Returns
        -------
        tuple(str, str) or None
            The (name space, ID) pair of the unique mapping target in the
            given name space, or None if there is no target or more than
            one target.
        """
        targets = [target for target
                   in self.descendants_rel(ns1, id1, {'xref'})
                   if target[0] == ns2]
        if len(targets) == 1:
            return targets[0]
        return None

    @with_initialize
    def _transitive_rel(self, ns, id, rel_fun, rel_types, target=None):
        """Collect all entities transitively reachable via a relation.

        Parameters
        ----------
        ns : str
            The name space of the starting entity.
        id : str
            The ID of the starting entity.
        rel_fun : callable
            A function called as ``rel_fun(ns, id, rel_types)`` which
            returns an iterator over directly related (name space, ID)
            pairs, e.g., `child_rel` or `parent_rel`.
        rel_types : iterable of str
            The edge types that may be traversed.
        target : Optional[tuple(str, str)]
            If given, the traversal stops as soon as this entity is
            encountered.

        Returns
        -------
        list
            ``[target]`` if a target was given and reached; an empty list
            if a NetworkXError was raised during traversal; otherwise all
            reached (name space, ID) pairs excluding the starting entity.
        """
        source = (ns, id)
        visited = {source}
        # Each queue entry pairs an entity with the (partially consumed)
        # iterator over its direct relatives.
        queue = deque([(source, rel_fun(*source, rel_types))])
        while queue:
            _, children = queue[0]
            try:
                child = next(children)
                if target and child == target:
                    return [target]
                if child not in visited:
                    visited.add(child)
                    queue.append((child, rel_fun(*child, rel_types)))
            except networkx.NetworkXError as e:
                logger.debug(e)
                return []
            except StopIteration:
                # The entity at the head of the queue is exhausted.
                queue.popleft()
        return list(visited - {source})

    @with_initialize
    def descendants_rel(self, ns, id, rel_types):
        """Return all entities reachable by following outgoing edges.

        Parameters
        ----------
        ns : str
            The name space of an entity.
        id : str
            The ID of an entity.
        rel_types : iterable of str
            The edge types that may be traversed.

        Returns
        -------
        list
            A list of (name space, ID) pairs reachable from the given
            entity along edges of the given types.
        """
        return self._transitive_rel(ns, id, self.child_rel, rel_types)

    @with_initialize
    def ancestors_rel(self, ns, id, rel_types):
        """Return all entities reachable by following incoming edges.

        Parameters
        ----------
        ns : str
            The name space of an entity.
        id : str
            The ID of an entity.
        rel_types : iterable of str
            The edge types that may be traversed.

        Returns
        -------
        list
            A list of (name space, ID) pairs from which the given entity
            is reachable along edges of the given types.
        """
        return self._transitive_rel(ns, id, self.parent_rel, rel_types)

    @with_initialize
    def child_rel(self, ns, id, rel_types):
        """Yield the direct successors of an entity for given edge types.

        Parameters
        ----------
        ns : str
            The name space of an entity.
        id : str
            The ID of an entity.
        rel_types : iterable of str
            The edge types to consider.

        Yields
        ------
        tuple(str, str)
            The (name space, ID) pair of each successor connected by an
            edge whose type is in `rel_types`. Nothing is yielded if
            looking up the successors raises a NetworkXError.
        """
        source = self.label(ns, id)
        # This is to handle the case where the node is not in the
        # graph
        try:
            succ_iter = self.successors(source)
        except networkx.NetworkXError:
            return
        for target in succ_iter:
            if self.edges[source, target]['type'] in rel_types:
                yield self.get_ns_id(target)

    @with_initialize
    def parent_rel(self, ns, id, rel_types):
        """Yield the direct predecessors of an entity for given edge types.

        Parameters
        ----------
        ns : str
            The name space of an entity.
        id : str
            The ID of an entity.
        rel_types : iterable of str
            The edge types to consider.

        Yields
        ------
        tuple(str, str)
            The (name space, ID) pair of each predecessor connected by an
            edge whose type is in `rel_types`. Nothing is yielded if
            looking up the predecessors raises a NetworkXError.
        """
        target = self.label(ns, id)
        # This is to handle the case where the node is not in the
        # graph
        try:
            pred_iter = self.predecessors(target)
        except networkx.NetworkXError:
            return
        for source in pred_iter:
            if self.edges[source, target]['type'] in rel_types:
                yield self.get_ns_id(source)

    @with_initialize
    def get_children(self, ns, id, ns_filter=None):
        """Return all `isa` or `partof` children of a given entity.

        Importantly, `isa` and `partof` edges always point towards
        higher-level entities in the ontology but here "child" means
        lower-level entity i.e., ancestors in the graph.

        Parameters
        ----------
        ns : str
            The name space of an entity.
        id : str
            The ID of an entity.
        ns_filter : Optional[set]
            If provided, only entities within the set of given
            name spaces are returned.

        Returns
        -------
        list
            A list of entities (name space, ID pairs) that are the
            children of the given entity.
        """
        children = self.ancestors_rel(ns, id, {'isa', 'partof'})
        return [(cns, cid) for cns, cid in children
                if ns_filter is None or cns in ns_filter]

    @with_initialize
    def get_parents(self, ns, id):
        """Return all `isa` or `partof` parents of a given entity.

        Importantly, `isa` and `partof` edges always point towards
        higher-level entities in the ontology but here "parent" means
        higher-level entity i.e., descendants in the graph.

        Parameters
        ----------
        ns : str
            The name space of an entity.
        id : str
            The ID of an entity.

        Returns
        -------
        list
            A list of entities (name space, ID pairs) that are the
            parents of the given entity.
        """
        return self.descendants_rel(ns, id, {'isa', 'partof'})

    @with_initialize
    def get_top_level_parents(self, ns, id):
        """Return all top-level `isa` or `partof` parents of a given entity.

        Top level means that this function only returns parents which don't
        have any further `isa` or `partof` parents above them.
        Importantly, `isa` and `partof` edges always point towards
        higher-level entities in the ontology but here "parent" means
        higher-level entity i.e., descendants in the graph.

        Parameters
        ----------
        ns : str
            The name space of an entity.
        id : str
            The ID of an entity.

        Returns
        -------
        list
            A list of entities (name space, ID pairs) that are the
            top-level parents of the given entity.
        """
        parents = self.get_parents(ns, id)
        return [p for p in parents if not self.get_parents(*p)]

    @with_initialize
    def get_mappings(self, ns, id):
        """Return entities that are xrefs of a given entity.

        This function returns all mappings via `xref` edges from the given
        entity.

        Parameters
        ----------
        ns : str
            An entity's name space.
        id : str
            An entity's ID.

        Returns
        -------
        list
            A list of entities (name space, ID pairs) that are direct or
            indirect xrefs of the given entity.
        """
        return self.descendants_rel(ns, id, {'xref'})

    @with_initialize
    def get_replacement(self, ns, id):
        """Return a replacement for a given entity or None if no replacement.

        A replacement is typically necessary if the given entity is
        obsolete and has been replaced by another entry.

        Parameters
        ----------
        ns : str
            An entity's name space.
        id : str
            An entity's ID.

        Returns
        -------
        tuple or None
            A tuple of the form (ns, id) of the replacement entity or
            None if no replacement.
        """
        rep = list(self.child_rel(ns, id, {'replaced_by'}))
        if rep:
            return rep[0]
        return None

    @with_initialize
    def get_name(self, ns, id):
        """Return the standard name of a given entity.

        Parameters
        ----------
        ns : str
            An entity's name space.
        id : str
            An entity's ID.

        Returns
        -------
        str or None
            The name associated with the given entity or None if the node
            is not in the ontology or doesn't have a standard name.
        """
        return self.get_node_property(ns, id, property='name')

    @with_initialize
    def get_type(self, ns, id):
        """Return the type of a given entity.

        Parameters
        ----------
        ns : str
            An entity's name space.
        id : str
            An entity's ID.

        Returns
        -------
        str or None
            The type associated with the given entity or None if the node
            is not in the ontology or doesn't have a type annotation.
        """
        return self.get_node_property(ns, id, 'type')

    @with_initialize
    def get_polarity(self, ns, id):
        """Return the polarity of a given entity.

        Parameters
        ----------
        ns : str
            An entity's name space.
        id : str
            An entity's ID.

        Returns
        -------
        str or None
            The polarity associated with the given entity or None if the
            node is not in the ontology or doesn't have a polarity.
        """
        return self.get_node_property(ns, id, property='polarity')

    @with_initialize
    def get_node_property(self, ns, id, property):
        """Return a given property of a given entity.

        Parameters
        ----------
        ns : str
            An entity's name space.
        id : str
            An entity's ID.
        property : str
            The property to look for on the given node.

        Returns
        -------
        object or None
            The value of the given property on the given entity or None
            if the node is not in the ontology or doesn't have the given
            property.
        """
        try:
            return self.nodes[self.label(ns, id)][property]
        except KeyError:
            return None

    @with_initialize
    def is_opposite(self, ns1, id1, ns2, id2):
        """Return True if the two entities are opposites of each other.

        Parameters
        ----------
        ns1 : str
            The first entity's name space.
        id1 : str
            The first entity's ID.
        ns2 : str
            The second entity's name space.
        id2 : str
            The second entity's ID.

        Returns
        -------
        bool
            True if the first entity is in an `is_opposite` relationship
            with the second. False otherwise.
        """
        # FIXME: this assumes, as is the case in practice with our
        # ontologies that we have disjunct pairs of is_opposite entities
        # more generally, we may need to allow other edge types and
        # look at the overall "polarity" of the path.
        return self._check_path(ns1, id1, ns2, id2, {'is_opposite'})

    @with_initialize
    def get_id_from_name(self, ns, name) -> Optional[Tuple[str, str]]:
        """Return an entity's ID given its name space and standard name.

        The name lookup is built on first use, i.e., whenever
        `name_to_grounding` is empty.

        Parameters
        ----------
        ns : str
            The name space in which the standard name is defined.
        name : str
            The standard name defined in the name space.

        Returns
        -------
        :
            The pair of namespace and ID corresponding to the given
            standard name in the given name space or None if it's not
            available.
        """
        if not self.name_to_grounding:
            self._build_name_lookup()
        return self.name_to_grounding.get((ns, name))

    @with_initialize
    def _build_name_lookup(self):
        """Build the lookup from (name space, name) to (name space, ID).

        Nodes without a `name` property and nodes whose `obsolete`
        property is truthy are left out of the lookup.
        """
        self.name_to_grounding = {
            (self.get_ns(node), data['name']): self.get_ns_id(node)
            for node, data in self.nodes(data=True)
            if 'name' in data and not data.get('obsolete', False)
        }

    @with_initialize
    def nodes_from_suffix(self, suffix):
        """Return all node labels which have a given suffix.

        This is useful for finding entities in ontologies where the IDs
        consist of paths like a/b/c/...

        Parameters
        ----------
        suffix : str
            A label suffix.

        Returns
        -------
        list
            A list of node labels that have the given suffix.
        """
        return [node for node in self.nodes if node.endswith(suffix)]

    @staticmethod
    def label(ns, id):
        """Return the label corresponding to a given entity.

        This is mostly useful for constructing the ontology or when adding
        new nodes/edges. It can be overridden in subclasses to change the
        default mapping from ns / id to a label.

        Parameters
        ----------
        ns : str
            An entity's name space.
        id : str
            An entity's ID.

        Returns
        -------
        str
            The label corresponding to the given entity.
        """
        return '%s:%s' % (ns, id)

    @staticmethod
    def reverse_label(label):
        """Return the name space and ID from a given label.

        This is the complement of the `label` method which reverses a
        label into a name space and ID. The label is split on its first
        colon only, so IDs may themselves contain colons.

        Parameters
        ----------
        label
            A node label.

        Returns
        -------
        tuple(str, str)
            The name space and the ID corresponding to the label.
        """
        return tuple(label.split(':', maxsplit=1))

    def _build_transitive_closure(self):
        """Precompute all `isa`/`partof` pairs for faster lookups.

        Fills `transitive_closure` with (child label, parent label) pairs
        for every node and each of its `isa`/`partof` parents. Does
        nothing if the closure is already non-empty.

        NOTE(review): unlike most methods this one is not wrapped in
        with_initialize; presumably it is meant to be called once the
        graph has been populated -- confirm against callers.
        """
        if self.transitive_closure:
            return
        logger.info('Building transitive closure for faster '
                    'isa/partof lookups...')
        self.transitive_closure = set()
        for node in self.nodes():
            ns, id = self.get_ns_id(node)
            # The child label is the same for all parents of this node.
            child_label = self.label(ns, id)
            for pns, pid in self.descendants_rel(ns, id,
                                                 rel_types={'isa',
                                                            'partof'}):
                self.transitive_closure.add((child_label,
                                             self.label(pns, pid)))

    @with_initialize
    def print_stats(self):
        """Log the number of nodes and edges of the ontology graph."""
        logger.info('Number of nodes: %d', len(self.nodes))
        logger.info('Number of edges: %d', len(self.edges))