Source code for indra.preassembler.hierarchy_manager

from __future__ import absolute_import, print_function, unicode_literals
from builtins import dict, str
import os
import rdflib
import logging
try:
    from functools import lru_cache
except ImportError:
    from functools32 import lru_cache

from indra.preassembler.make_entity_hierarchy import ns_map

logger = logging.getLogger(__name__)


class HierarchyManager(object):
    """Store hierarchical relationships between different types of entities.

    Used to store, e.g., entity hierarchies (proteins and protein families)
    and modification hierarchies (serine phosphorylation vs. phosphorylation).

    Parameters
    ----------
    rdf_file : Optional[str]
        Path to the RDF file containing the hierarchy. If not given, the
        graph is left as None until one of the ``load_from_*`` methods is
        called.
    build_closure : Optional[bool]
        If True, the transitive closure of the hierarchy is generated
        up front to speed up processing. Default: True
    uri_as_name : Optional[bool]
        If True, entries are accessed directly by their URIs. If False
        entries are accessed by finding their name through the
        hasName relationship. Default: True

    Attributes
    ----------
    graph : instance of `rdflib.Graph`
        The RDF graph containing the hierarchy.
    """
    prefixes = """
        PREFIX rn: <http://sorger.med.harvard.edu/indra/relations/>
        """

    def __init__(self, rdf_file=None, build_closure=True, uri_as_name=True):
        """Initialize with the path to an RDF file"""
        self.build_closure = build_closure
        self.uri_as_name = uri_as_name
        self.relations_prefix = \
            'http://sorger.med.harvard.edu/indra/relations/'
        self.isa_closure = {}
        self.partof_closure = {}
        self.isa_or_partof_closure = {}
        self.components = {}
        self._children = {}
        self.component_counter = 0
        # If an RDF file was given, we build up the internal data structures.
        # Otherwise we defer initialization until later.
        if rdf_file:
            self.load_from_rdf_file(rdf_file)
        else:
            self.graph = None

    def load_from_rdf_file(self, rdf_file):
        """Initialize given an RDF input file representing the hierarchy.

        Parameters
        ----------
        rdf_file : str
            Path to an RDF file (parsed in N-Triples format).
        """
        self.graph = rdflib.Graph()
        self.graph.parse(os.path.abspath(rdf_file), format='nt')
        self.initialize()

    def load_from_rdf_string(self, rdf_str):
        """Initialize given an RDF string representing the hierarchy.

        Parameters
        ----------
        rdf_str : str
            An RDF string (parsed in N-Triples format).
        """
        self.graph = rdflib.Graph()
        self.graph.parse(data=rdf_str, format='nt')
        self.initialize()

    def load_from_rdf_graph(self, rdf_graph):
        """Initialize given an RDF Graph representing the hierarchy.

        Parameters
        ----------
        rdf_graph : rdflib.Graph
            An rdflib Graph representing the hierarchy.
        """
        self.graph = rdf_graph
        self.initialize()

    def initialize(self):
        """(Re)build all data structures derived from the current graph.

        This drops cached lookup results, rebuilds the transitive closures
        (if `build_closure` is set) and rebuilds the reverse (parent to
        children) lookup dict.
        """
        # The lookup caches are shared at the class level and hold results
        # computed against the previous content of the graph, so they must
        # be dropped whenever the graph changes.
        self.find_entity.cache_clear()
        self.query_rdf.cache_clear()

        if self.build_closure:
            self.build_transitive_closures()
        # Build reverse lookup dict from the hierarchy
        # First get all URIs that correspond to parents
        all_parents = {parent
                       for parents in self.isa_or_partof_closure.values()
                       for parent in parents}
        # We use the inverse relation here
        rel_fun = lambda node, graph: \
            self.isa_or_partof_objects(node, inverse=True)
        # Now for each parent we get the inverse transitive closure to
        # get all its children nodes
        self._children = {}
        for parent in all_parents:
            children = self.graph.transitiveClosure(
                rel_fun, rdflib.term.URIRef(parent))
            self._children[parent] = list(set(c.toPython()
                                              for c in children))

    def extend_with(self, rdf_file):
        """Extend the RDF graph of this HierarchyManager with another RDF file.

        Parameters
        ----------
        rdf_file : str
            An RDF file which is parsed such that the current graph and
            the graph described by the file are merged.
        """
        # If initialization was deferred there is no graph to extend yet
        if self.graph is None:
            self.graph = rdflib.Graph()
        self.graph.parse(os.path.abspath(rdf_file), format='nt')
        self.initialize()

    def build_transitive_closures(self):
        """Build the transitive closures of the hierarchy.

        This method constructs dictionaries which contain terms in the
        hierarchy as keys and either all the "isa+" or "partof+" related terms
        as values. Any previously built closures and components are discarded
        first, so calling this again after the graph has changed does not
        produce duplicate entries or colliding component identifiers.
        """
        self.isa_closure = {}
        self.partof_closure = {}
        self.isa_or_partof_closure = {}
        self.components = {}
        self.component_counter = 0
        for rel, tc_dict in ((self.isa_objects, self.isa_closure),
                             (self.partof_objects, self.partof_closure),
                             (self.isa_or_partof_objects,
                              self.isa_or_partof_closure)):
            self.build_transitive_closure(rel, tc_dict)

    def build_transitive_closure(self, rel, tc_dict):
        """Build a transitive closure for a given relation in a given dict.

        Parameters
        ----------
        rel : function
            One of the `*_objects` methods of this object; called with a
            node, it yields the nodes directly related to it.
        tc_dict : dict
            The dict to fill. Keys are nodes (as Python values) and values
            are lists of all nodes reachable from the key through `rel`.
        """
        # Make a function with the right argument structure
        rel_fun = lambda node, graph: rel(node)
        # Connected components are only tracked for the combined relation
        track_components = (rel == self.isa_or_partof_objects)
        for x in self.graph.all_nodes():
            rel_closure = self.graph.transitiveClosure(rel_fun, x)
            xs = x.toPython()
            for y in rel_closure:
                ys = y.toPython()
                if xs == ys:
                    continue
                tc_dict.setdefault(xs, []).append(ys)
                if track_components:
                    self._add_component(xs, ys)

    def _add_component(self, xs, ys):
        """Record that two related terms belong to the same component.

        Parameters
        ----------
        xs : str
            A term in the hierarchy.
        ys : str
            A term related to `xs`.
        """
        xcomp = self.components.get(xs)
        ycomp = self.components.get(ys)
        if xcomp is None and ycomp is None:
            # Neither x nor y are in a component so we start a new
            # component and assign x and y to the same component
            self.components[xs] = self.component_counter
            self.components[ys] = self.component_counter
            self.component_counter += 1
        elif xcomp is None:
            # Because y is already part of an existing component
            # we assign its component to x
            self.components[xs] = ycomp
        elif ycomp is None:
            # Because x is already part of an existing component
            # we assign its component to y
            self.components[ys] = xcomp
        elif xcomp != ycomp:
            # Both x and y are parts of different components, so the
            # two components are merged under the smaller identifier.
            # Only values of existing keys are reassigned here, which is
            # safe while iterating over the dict.
            remove_component = max(xcomp, ycomp)
            joint_component = min(xcomp, ycomp)
            for k, v in self.components.items():
                if v == remove_component:
                    self.components[k] = joint_component

    # NOTE: this cache is shared by all instances and holds a reference to
    # each instance it has seen; it is cleared in `initialize`.
    @lru_cache(maxsize=100000)
    def find_entity(self, x):
        """
        Get the entity that has the specified name (or synonym).

        Parameters
        ----------
        x : string
            Name or synonym for the target entity.

        Returns
        -------
        str or None
            The first matching entity, or None if there is no match.
        """
        # Escape characters that would otherwise terminate or corrupt the
        # quoted SPARQL string literal
        name = '{0}'.format(x).replace('\\', '\\\\').replace('"', '\\"')
        qstr = self.prefixes + """
            SELECT ?x WHERE {{
                ?x rn:hasName "{0}" .
            }}
            """.format(name)
        res = list(self.graph.query(qstr))
        if not res:
            return None
        return res[0][0].toPython()

    def _related_nodes(self, rel_name, node, inverse):
        """Yield the nodes related to a node through a named relation."""
        # Normally we look for objects of the relation, but if inverted,
        # we look for the subject
        predicate = rdflib.term.URIRef(self.relations_prefix + rel_name)
        if inverse:
            partner_list = self.graph.subjects(predicate, node)
        else:
            partner_list = self.graph.objects(node, predicate)
        for o in partner_list:
            yield o

    def isa_objects(self, node, inverse=False):
        """Yield nodes that `node` "isa" (or, if inverse, that are `node`)."""
        return self._related_nodes('isa', node, inverse)

    def partof_objects(self, node, inverse=False):
        """Yield nodes `node` is "partof" (or, if inverse, its parts)."""
        return self._related_nodes('partof', node, inverse)

    def isa_or_partof_objects(self, node, inverse=False):
        """Yield the "isa" partners of `node` followed by the "partof" ones."""
        for o in self.isa_objects(node, inverse):
            yield o
        for o in self.partof_objects(node, inverse):
            yield o

    def directly_or_indirectly_related(self, ns1, id1, ns2, id2, closure_dict,
                                       relation_func):
        """Return True if two entities have the specified relationship.

        Parameters
        ----------
        ns1 : str
            Namespace code for an entity.
        id1 : str
            URI for an entity.
        ns2 : str
            Namespace code for an entity.
        id2 : str
            URI for an entity.
        closure_dict : dict
            A pre-built transitive closure for the relation. If it is empty,
            the graph is searched instead.
        relation_func : function
            Function with arguments (node, graph) that generates the nodes
            directly related to the given node; used for the graph search.

        Returns
        -------
        bool
            True if t1 has the specified relationship with t2, either
            directly or through a series of intermediates; False otherwise.
        """
        # NOTE(review): the handling of missing identifiers follows the
        # upstream INDRA convention - an unspecified second entity is
        # treated as the most general one, an unspecified first entity
        # cannot be related to a specific one. Confirm against callers.
        if id2 is None:
            return True
        if id1 is None:
            return False

        # Fast path: look the pair up in the pre-built closure
        if closure_dict:
            term1 = self.get_uri(ns1, id1)
            term2 = self.get_uri(ns2, id2)
            return term2 in closure_dict.get(term1, ())

        # Slow path: walk the graph
        if self.uri_as_name:
            u1 = self.get_uri(ns1, id1)
            u2 = self.get_uri(ns2, id2)
        else:
            u1 = self.find_entity(id1)
            u2 = self.find_entity(id2)
            if u1 is None or u2 is None:
                return False
        t1 = rdflib.term.URIRef(u1)
        t2 = rdflib.term.URIRef(u2)
        return t2 in self.graph.transitiveClosure(relation_func, t1)

    def isa(self, ns1, id1, ns2, id2):
        """Return True if one entity has an "isa" relationship to another.

        Parameters
        ----------
        ns1 : str
            Namespace code for an entity.
        id1 : string
            URI for an entity.
        ns2 : str
            Namespace code for an entity.
        id2 : str
            URI for an entity.

        Returns
        -------
        bool
            True if t1 has an "isa" relationship with t2, either directly or
            through a series of intermediates; False otherwise.
        """
        rel_fun = lambda node, graph: self.isa_objects(node)
        return self.directly_or_indirectly_related(ns1, id1, ns2, id2,
                                                   self.isa_closure,
                                                   rel_fun)

    def partof(self, ns1, id1, ns2, id2):
        """Return True if one entity is "partof" another.

        Parameters
        ----------
        ns1 : str
            Namespace code for an entity.
        id1 : str
            URI for an entity.
        ns2 : str
            Namespace code for an entity.
        id2 : str
            URI for an entity.

        Returns
        -------
        bool
            True if t1 has a "partof" relationship with t2, either directly or
            through a series of intermediates; False otherwise.
        """
        rel_fun = lambda node, graph: self.partof_objects(node)
        return self.directly_or_indirectly_related(ns1, id1, ns2, id2,
                                                   self.partof_closure,
                                                   rel_fun)

    def isa_or_partof(self, ns1, id1, ns2, id2):
        """Return True if two entities are in an "isa" or "partof" relationship

        Parameters
        ----------
        ns1 : str
            Namespace code for an entity.
        id1 : str
            URI for an entity.
        ns2 : str
            Namespace code for an entity.
        id2 : str
            URI for an entity.

        Returns
        -------
        bool
            True if t1 has a "isa" or "partof" relationship with t2, either
            directly or through a series of intermediates; False otherwise.
        """
        rel_fun = lambda node, graph: self.isa_or_partof_objects(node)
        return self.directly_or_indirectly_related(ns1, id1, ns2, id2,
                                                   self.isa_or_partof_closure,
                                                   rel_fun)

    def is_opposite(self, ns1, id1, ns2, id2):
        """Return True if two entities are in an "is_opposite" relationship

        Parameters
        ----------
        ns1 : str
            Namespace code for an entity.
        id1 : str
            URI for an entity.
        ns2 : str
            Namespace code for an entity.
        id2 : str
            URI for an entity.

        Returns
        -------
        bool
            True if t1 has an "is_opposite" relationship with t2.
        """
        t1 = rdflib.term.URIRef(self.get_uri(ns1, id1))
        t2 = rdflib.term.URIRef(self.get_uri(ns2, id2))
        rel = rdflib.term.URIRef(self.relations_prefix + 'is_opposite')
        return t2 in self.graph.objects(t1, rel)

    def get_parents(self, uri, type='all'):
        """Return parents of a given entry.

        Parameters
        ----------
        uri : str
            The URI of the entry whose parents are to be returned. See the
            get_uri method to construct this URI from a name space and id.
        type : str
            'all': return all parents irrespective of level;
            'immediate': return only the immediate parents;
            'top': return only the highest level parents

        Returns
        -------
        set or list
            A set of URIs for 'all' (and whenever the entry has no parents),
            a list of URIs for 'immediate' and 'top'. None is returned for
            an unrecognized `type` when the entry has parents.
        """
        # First do a quick dict lookup to see if there are any parents
        all_parents = set(self.isa_or_partof_closure.get(uri, []))
        # If there are no parents or we are looking for all, we can return here
        if not all_parents or type == 'all':
            return all_parents

        # If we need immediate parents, we search again, this time knowing that
        # the uri is definitely in the graph since it has some parents
        if type == 'immediate':
            node = rdflib.term.URIRef(uri)
            return [p.toPython()
                    for p in set(self.isa_or_partof_objects(node))]
        if type == 'top':
            # Top level parents are the ones that have no parents themselves
            return [p for p in all_parents
                    if not self.isa_or_partof_closure.get(p)]
        return None

    def get_children(self, uri):
        """Return all (not just immediate) children of a given entry.

        Parameters
        ----------
        uri : str
            The URI of the entry whose children are to be returned. See the
            get_uri method to construct this URI from a name space and id.

        Returns
        -------
        list
            The URIs of the children; empty if the entry has none.
        """
        return self._children.get(uri, [])

    # NOTE: this cache is shared by all instances and holds a reference to
    # each instance it has seen; it is cleared in `initialize`.
    @lru_cache(maxsize=100000)
    def query_rdf(self, id1, rel, id2):
        """Return True if two named entities are directly related by `rel`.

        Parameters
        ----------
        id1 : str
            Name of the first entity, resolved with `find_entity`.
        rel : str
            The relation as it should appear in the SPARQL query.
        id2 : str
            Name of the second entity, resolved with `find_entity`.

        Returns
        -------
        bool
            True if exactly one matching triple is in the graph.
        """
        term1 = self.find_entity(id1)
        term2 = self.find_entity(id2)
        # An entity that cannot be resolved cannot be related to anything
        if term1 is None or term2 is None:
            return False
        qstr = self.prefixes + """
            SELECT (COUNT(*) as ?s) WHERE {{
                <{}> {} <{}> .
                }}
            """.format(term1, rel, term2)
        res = self.graph.query(qstr)
        count = [r[0] for r in res][0]
        return count.toPython() == 1

    @staticmethod
    def get_uri(ns, id):
        """Return the URI of an entry given its name space and identifier.

        Parameters
        ----------
        ns : str
            A name space code, e.g. 'HGNC'. An unrecognized name space is
            used as the URI prefix as is.
        id : str
            The identifier of the entry within the name space.

        Returns
        -------
        str
            The URI corresponding to the entry.
        """
        if ns == 'HGNC':
            return 'http://identifiers.org/hgnc.symbol/' + id
        elif ns == 'UP':
            return 'http://identifiers.org/uniprot/' + id
        elif ns == 'FPLX':
            return 'http://identifiers.org/fplx/' + id
        elif ns in ['UN', 'WDI', 'FAO', 'HUME']:
            return \
                'https://github.com/clulab/eidos/wiki/JSON-LD/Grounding#' + id
        elif ns == 'SOFIA':
            return \
                'http://cs.cmu.edu/sofia/' + id
        elif ns == 'CWMS':
            # Drop the leading "ONT::" prefix (5 characters), if any
            if id.lower().startswith('ont::'):
                id = id[5:]
            return 'http://trips.ihmc.us/concepts/' + id.lower()
        elif ns == 'INDRA_ACTIVITIES':
            return 'http://sorger.med.harvard.edu/indra/activities/' + id
        elif ns == 'INDRA_MODS':
            return 'http://sorger.med.harvard.edu/indra/modifications/' + id
        elif ns == 'INDRA_LOCATIONS':
            return 'http://sorger.med.harvard.edu/indra/locations/' + id
        else:
            return ns + id

    @staticmethod
    def ns_id_from_uri(uri):
        """Return the name space and identifier corresponding to a URI.

        Parameters
        ----------
        uri : str
            The URI to split after its last '/' character.

        Returns
        -------
        tuple
            The (name space, identifier) pair, where the name space is
            looked up in `ns_map` from the part of the URI up to and
            including the last '/'.

        Raises
        ------
        UnknownNamespaceException
            If the URI prefix is not a key of `ns_map`.
        """
        sep_ix = uri.rfind('/') + 1
        ag_ns = uri[0:sep_ix]
        ag_id = uri[sep_ix:]
        ag_ns_name = ns_map.get(ag_ns)
        if ag_ns_name is None:
            raise UnknownNamespaceException('Unknown namespace %s' % ag_ns)
        return (ag_ns_name, ag_id)
class YamlHierarchyManager(HierarchyManager):
    """A HierarchyManager whose graph is generated from a YAML object.

    Parameters
    ----------
    root : list
        The root of the YAML object; a list of elements, each of which is
        either a dict with an 'OntologyNode' key (and a 'name' entry) or a
        dict with a single key mapping to a nested list of elements.
    yaml_to_rdf : function
        A function which takes the YAML root and returns an RDF graph that
        can be loaded with `load_from_rdf_graph`.
    """
    def __init__(self, root, yaml_to_rdf):
        self.yaml_root = root
        self.yaml_to_rdf = yaml_to_rdf
        super(YamlHierarchyManager, self).__init__(None, True, True)
        self._reload_graph()

    def _reload_graph(self):
        """Regenerate the RDF graph from the YAML object and load it."""
        self.load_from_rdf_graph(self.yaml_to_rdf(self.yaml_root))

    @staticmethod
    def _find_child(level, part):
        """Return the entry called `part` at a level of the YAML object.

        For an element with an 'OntologyNode' key the element itself is
        returned, otherwise the value under the element's single key.
        None is returned if no element matches.
        """
        for element in level:
            # If this is an OntologyNode
            if 'OntologyNode' in element:
                if element['name'] == part:
                    return element
            else:
                assert len(element) == 1
                key = list(element.keys())[0]
                if key == part:
                    return element[key]
        return None

    def add_entry(self, entry, examples=None):
        """Add an entry to the YAML object and rebuild the hierarchy.

        Parameters
        ----------
        entry : str
            The path of the entry, with levels separated by '/'. Levels
            which don't exist yet are created along the way.
        examples : Optional[list]
            Examples stored with the entry if a new node is created for
            its last level. Default: empty list
        """
        # TODO: Add the entry by finding the right place in the YAML object
        if not examples:
            examples = []
        parts = entry.split('/')
        last_idx = len(parts) - 1
        level = self.yaml_root
        for idx, part in enumerate(parts):
            child = self._find_child(level, part)
            if child is None:
                if idx == last_idx:
                    # The last part of the path becomes a new node
                    level.append({'OntologyNode': None, 'name': part,
                                  'examples': examples})
                    break
                # An intermediate part becomes a new, empty level
                level.append({part: []})
                child = level[-1][part]
            level = child
        self._reload_graph()
def get_bio_hierarchies(from_pickle=True):
    """Return the default hierarchies for biology.

    Parameters
    ----------
    from_pickle : Optional[bool]
        If True, the hierarchies are loaded from the pickle file at
        `resources/bio_hierarchies.pkl`. Otherwise they are built from the
        RDF files in the `resources` folder. Default: True

    Returns
    -------
    dict
        When built from the RDF files, a dict of HierarchyManagers with
        keys 'entity', 'modification', 'activity' and 'cellular_component'.
        When loaded from the pickle file, whatever object that file holds.
    """
    if from_pickle:
        import pickle
        hierarchy_file = os.path.dirname(os.path.abspath(__file__)) + \
            '/../resources/bio_hierarchies.pkl'
        with open(hierarchy_file, 'rb') as fh:
            return pickle.load(fh)

    here = os.path.dirname(__file__)
    # Default entity hierarchy loaded from the RDF file at
    # `resources/entity_hierarchy.rdf`.
    entity_hierarchy = HierarchyManager(
        os.path.join(here, '../resources/entity_hierarchy.rdf'),
        build_closure=True, uri_as_name=True)
    # Default modification hierarchy loaded from the RDF file at
    # `resources/modification_hierarchy.rdf`.
    modification_hierarchy = HierarchyManager(
        os.path.join(here, '../resources/modification_hierarchy.rdf'),
        build_closure=True, uri_as_name=True)
    # Default activity hierarchy loaded from the RDF file at
    # `resources/activity_hierarchy.rdf`.
    activity_hierarchy = HierarchyManager(
        os.path.join(here, '../resources/activity_hierarchy.rdf'),
        build_closure=True, uri_as_name=True)
    # Default cellular_component hierarchy loaded from the RDF file at
    # `resources/cellular_component_hierarchy.rdf`. Unlike the others, it
    # is built without closures and is accessed by name.
    ccomp_hierarchy = HierarchyManager(
        os.path.join(here, '../resources/cellular_component_hierarchy.rdf'),
        build_closure=False, uri_as_name=False)
    return {'entity': entity_hierarchy,
            'modification': modification_hierarchy,
            'activity': activity_hierarchy,
            'cellular_component': ccomp_hierarchy}


# The default biology hierarchies, loaded when the module is imported
hierarchies = get_bio_hierarchies()


def get_wm_hierarchies():
    """Return the default hierarchies for world modeling.

    Returns
    -------
    dict
        A dict with the single key 'entity' whose value is a
        HierarchyManager built from the Eidos ontology and extended with
        the Hume, TRIPS and Sofia ontologies, in this order.
    """
    here = os.path.dirname(__file__)
    eidos_ont = os.path.join(here, '../sources/eidos/eidos_ontology.rdf')
    hume_ont = os.path.join(here, '../sources/hume/hume_ontology.rdf')
    trips_ont = os.path.join(here, '../sources/cwms/trips_ontology.rdf')
    sofia_ont = os.path.join(here, '../sources/sofia/sofia_ontology.rdf')
    hm = HierarchyManager(eidos_ont, build_closure=True, uri_as_name=True)
    for extra_ont in (hume_ont, trips_ont, sofia_ont):
        hm.extend_with(extra_ont)
    return {'entity': hm}
class UnknownNamespaceException(Exception):
    """Raised when a URI prefix has no entry in the name space map."""