Source code for indra.preassembler.ontology_mapper

import os
import rdflib
import logging
# Python 3
try:
    from functools import lru_cache
# Python 2
except ImportError:
    from functools32 import lru_cache


logger = logging.getLogger(__name__)


class OntologyMapper(object):
    """A class to map between ontologies in grounded arguments of Statements.

    Parameters
    ----------
    statements : list[indra.statement.Statement]
        A list of INDRA Statements to map
    mappings : Optional[list[tuple]]
        A list of tuples that map ontology entries to one another. Each
        tuple starts with a source and a target entry, both given as
        (db_name, db_id) tuples, optionally followed by a score. If not
        given, the default mappings are used. The list is copied, so the
        caller's list is never modified.
    symmetric : Optional[bool]
        If True, the mappings are interpreted as symmetric and will be applied
        in both directions
    scored : Optional[bool]
        If True, the mappings are assumed to be scored and the scores are
        propagated into the mapped groundings. If False, the scores don't
        need to be provided in the mappings and even if they are, they are
        ignored. Default: False
    """
    def __init__(self, statements, mappings=None, symmetric=True,
                 scored=False):
        self.statements = statements
        if mappings is None:
            self.mappings = _load_default_mappings()
        else:
            # Work on a copy so that adding reverse mappings below does not
            # grow the list owned by the caller
            self.mappings = list(mappings)
        self.symmetric = symmetric
        self.scored = scored
        # Per-instance memo of _map_id results keyed by (db_name, db_id).
        # It lives on the instance so that it is released together with
        # the mapper instead of pinning it in a module-level cache.
        self._map_id_cache = {}
        if self.symmetric:
            self._add_reverse_map()

    def map_statements(self):
        """Run the ontology mapping on the statements.

        New groundings are added in place to the db_refs of each Agent.
        A name space that is already present in db_refs is never
        overwritten.
        """
        for stmt in self.statements:
            for agent in stmt.agent_list():
                if agent is None:
                    continue
                all_mappings = []
                for db_name, db_id in agent.db_refs.items():
                    if isinstance(db_id, list):
                        # An empty grounding list has nothing to map
                        if not db_id:
                            continue
                        # List groundings hold (id, score) tuples; the
                        # first entry is the one that gets mapped
                        db_id = db_id[0][0]
                    all_mappings += self._map_id(db_name, db_id)
                for map_db_name, map_db_id, score, orig_db_name in \
                        all_mappings:
                    if map_db_name in agent.db_refs:
                        continue
                    if self.scored:
                        # If the original one is a scored grounding,
                        # we take that score and multiply it with the
                        # mapping score. Otherwise we assume the original
                        # score is 1.
                        try:
                            orig_score = agent.db_refs[orig_db_name][0][1]
                        except (KeyError, IndexError, TypeError):
                            orig_score = 1.0
                        agent.db_refs[map_db_name] = \
                            [(map_db_id, score * orig_score)]
                    elif map_db_name in ('UN', 'HUME'):
                        agent.db_refs[map_db_name] = [(map_db_id, 1.0)]
                    else:
                        agent.db_refs[map_db_name] = map_db_id

    def _add_reverse_map(self):
        """Append the reverse of each mapping that is not yet present.

        Only the first two elements of a mapping (source and target) are
        swapped; anything after them, such as a score, is carried over
        unchanged to the reverse mapping.
        """
        known_pairs = [tuple(mapping[:2]) for mapping in self.mappings]
        # Iterate over a snapshot since self.mappings grows in the loop
        for mapping in list(self.mappings):
            m1, m2 = mapping[:2]
            if (m2, m1) not in known_pairs:
                self.mappings.append((m2, m1) + tuple(mapping[2:]))
                known_pairs.append((m2, m1))

    def _map_id(self, db_name, db_id):
        """Return the mappings that apply to a given grounding.

        Parameters
        ----------
        db_name : str
            The name space of the grounding to map.
        db_id : str
            The identifier of the grounding to map. A mapping matches if
            its source equals (db_name, db_id) either as given or with
            db_id lower-cased.

        Returns
        -------
        list[tuple]
            One (target db_name, target db_id, score, db_name) tuple for
            each matching mapping, in the order of self.mappings. The
            score is 1.0 when the mapper is not scored. The returned list
            is cached and shared between calls, so it must not be
            modified by the caller.
        """
        key = (db_name, db_id)
        try:
            return self._map_id_cache[key]
        except KeyError:
            pass
        # Identifiers without a lower() method can only match exactly
        lower = getattr(db_id, 'lower', None)
        lowered_key = (db_name, lower()) if callable(lower) else key
        mappings = []
        # TODO: This lookup should be optimized using a dict
        for mapping in self.mappings:
            if self.scored:
                m1, m2, score = mapping
            else:
                m1, m2 = mapping[:2]
                score = 1.0
            if m1 == key or m1 == lowered_key:
                mappings.append((m2[0], m2[1], score, db_name))
        self._map_id_cache[key] = mappings
        return mappings
def _load_default_mappings():
    """Return a new list holding the built-in default ontology mappings."""
    return [(('UN', 'entities/x'), ('HUME', 'entities/y'))]


def _load_wm_map(exclude_auto=None):
    """Load an ontology map for world models.

    Parameters
    ----------
    exclude_auto : None or list[tuple]
        A list of ontology mappings for which automated mappings should be
        excluded, e.g. [(HUME, UN)] would result in not using mappings
        from HUME to UN.

    Returns
    -------
    list[tuple]
        A list of ((source name space, source entry),
        (target name space, target entry), score) tuples.
    """
    exclude_auto = [] if not exclude_auto else exclude_auto
    path_here = os.path.dirname(os.path.abspath(__file__))
    ontomap_file = os.path.join(path_here, '../resources/wm_ontomap.tsv')
    # Keyed by (source name space, source entry, target name space) with
    # ((target name space, target entry), score) as value
    mappings = {}

    def make_hume_prefix_map():
        """Map each HUME entry without its first path part to that part."""
        hume_ont = os.path.join(path_here,
                                '../sources/hume/hume_ontology.rdf')
        graph = rdflib.Graph()
        graph.parse(os.path.abspath(hume_ont), format='nt')
        entry_map = {}
        for node in graph.all_nodes():
            entry = node.split('#')[1]
            # Handle "event" and other top-level entries
            if '/' not in entry:
                entry_map[entry] = None
                continue
            prefix, real_entry = entry.split('/', 1)
            entry_map[real_entry] = prefix
        return entry_map

    hume_prefix_map = make_hume_prefix_map()

    def add_hume_prefix(hume_entry):
        """We need to do this because the HUME prefixes are missing"""
        # NOTE(review): top-level entries are stored with a None prefix and
        # would be rendered as "None/<entry>" here - confirm this is wanted
        prefix = hume_prefix_map[hume_entry]
        return '%s/%s' % (prefix, hume_entry)

    def map_entry(reader, entry):
        """Remap the readers and entries to match our internal standards."""
        if reader == 'eidos':
            namespace = 'UN'
            entry_id = entry.replace(' ', '_')
        elif reader == 'BBN':
            namespace = 'HUME'
            entry_id = add_hume_prefix(entry.replace(' ', '_'))
        elif reader == 'sofia':
            namespace = 'SOFIA'
            # First chop off the Event/Entity prefix
            parts = entry.split('/')[1:]
            # Now we split each part by underscore and capitalize
            # each piece of each part
            parts = ['_'.join([p.capitalize() for p in part.split('_')])
                     for part in parts]
            # Finally we stick the entry back together separated by slashes
            entry_id = '/'.join(parts)
        else:
            return reader, entry
        return namespace, entry_id

    def keep_best(key, target, score):
        """Store the mapping unless one with at least this score exists."""
        current = mappings.get(key)
        if current is None or current[1] < score:
            mappings[key] = (target, score)

    with open(ontomap_file, 'r') as fh:
        for line in fh:
            line = line.strip()
            # Blank lines (e.g. at the end of the file) carry no mapping
            if not line:
                continue
            # Get each entry from the line
            s, se, t, te, score = line.split('\t')
            score = float(score)
            # Map the entries to our internal naming standards
            s, se = map_entry(s, se)
            t, te = map_entry(t, te)
            # Skip automated mappings when they should be excluded
            if (s, t) not in exclude_auto:
                # We first do the forward mapping
                keep_best((s, se, t), (t, te), score)
            # Then we add the reverse mapping
            if (t, s) not in exclude_auto:
                keep_best((t, te, s), (s, se), score)

    ontomap = [((key[0], key[1]), target, score)
               for key, (target, score) in mappings.items()]
    # Position of each (source name space, source entry, target name space)
    # in ontomap so that overrides don't have to scan the whole list
    positions = {(src[0], src[1], target[0]): idx
                 for idx, (src, target, _) in enumerate(ontomap)}

    # Now apply the Hume -> Eidos override
    override_file = os.path.join(path_here, '../resources/wm_ontomap.bbn.tsv')
    with open(override_file, 'r') as fh:
        for row in fh:
            if 'BBN' not in row:
                continue
            # Order is target first, source second
            _, te, _, se = row.strip().split('\t')
            # Map the entries to our internal naming standards
            s = 'HUME'
            t = 'UN'
            se = se.replace(' ', '_')
            te = te.replace(' ', '_')
            if se.startswith('/'):
                se = se[1:]
            override = ((s, se), (t, te), 1.0)
            idx = positions.get((s, se, t))
            if idx is None:
                # No automated mapping to override, so add a new one
                positions[(s, se, t)] = len(ontomap)
                ontomap.append(override)
            else:
                # Override when a match is found
                ontomap[idx] = override
    return ontomap


try:
    wm_ontomap = _load_wm_map()
except Exception as e:
    # Loading is best-effort: fall back to an empty map but leave a trace
    # of the failure instead of hiding it completely
    logging.getLogger(__name__).warning(
        'Could not load the world model ontology map: %s', e)
    wm_ontomap = []