Source code for indra.sources.reach.processor

from __future__ import absolute_import, print_function, unicode_literals
from builtins import dict, str

import os
import re
import logging
import objectpath

from indra.statements import *
from indra.util import read_unicode_csv
from indra.databases import hgnc_client
import indra.databases.uniprot_client as up_client
from collections import namedtuple

# Module-level logger, named after this module
logger = logging.getLogger(__name__)

# A parsed modification site: an amino acid residue and a sequence position.
# Either field may be None when it could not be read from the site text.
Site = namedtuple('Site', ['residue', 'position'])


class ReachProcessor(object):
    """The ReachProcessor extracts INDRA Statements from REACH parser output.

    Parameters
    ----------
    json_dict : dict
        A JSON dictionary containing the REACH extractions.
    pmid : Optional[str]
        The PubMed ID associated with the extractions. This can be passed
        in case the PMID cannot be determined from the extractions alone.

    Attributes
    ----------
    tree : objectpath.Tree
        The objectpath Tree object representing the extractions.
    statements : list[indra.statements.Statement]
        A list of INDRA Statements that were extracted by the processor.
    citation : str
        The PubMed ID associated with the extractions.
    all_events : dict[str, list[str]]
        The frame IDs of all events by type in the REACH extraction.
    """
    def __init__(self, json_dict, pmid=None):
        self.tree = objectpath.Tree(json_dict)
        self.statements = []
        self.citation = pmid
        # Fall back to the document ID recorded in the extractions
        if pmid is None:
            self.citation = \
                self.tree.execute("$.events.object_meta.doc_id")
        self.get_all_events()

    def print_event_statistics(self):
        """Print the number of events in the REACH output by type."""
        logger.info('All events by type')
        logger.info('-------------------')
        for k, v in self.all_events.items():
            logger.info('%s, %s', k, len(v))
        logger.info('-------------------')

    def get_all_events(self):
        """Gather all event IDs in the REACH output by type.

        These IDs are stored in the self.all_events dict.
        """
        self.all_events = {}
        events = self.tree.execute("$.events.frames")
        if events is None:
            return
        for e in events:
            event_type = e.get('type')
            frame_id = e.get('frame_id')
            self.all_events.setdefault(event_type, []).append(frame_id)

    def print_regulations(self):
        """Print the subtype and the arguments of each regulation event."""
        qstr = "$.events.frames[(@.type is 'regulation')]"
        res = self.tree.execute(qstr)
        if res is None:
            return
        for r in res:
            print(r['subtype'])
            for a in r['arguments']:
                print(a['type'], '/', a['argument-type'], ':', a['text'])

    def get_modifications(self):
        """Extract Modification INDRA Statements."""
        # Find all event frames that are a type of protein modification
        qstr = "$.events.frames[(@.type is 'protein-modification')]"
        res = self.tree.execute(qstr)
        if res is None:
            return
        # Extract each of the results when possible
        for r in res:
            # The subtype of the modification
            modification_type = r.get('subtype')
            # Skip negated events (i.e. something doesn't happen)
            epistemics = self._get_epistemics(r)
            if epistemics.get('negated'):
                continue
            annotations, context = self._get_annot_context(r)
            frame_id = r['frame_id']
            site = None
            theme = None
            # Find the substrate (the "theme" agent here) and the
            # site and position it is modified on
            for a in r['arguments']:
                arg_type = self._get_arg_type(a)
                if arg_type == 'theme':
                    theme = a['arg']
                elif arg_type == 'site':
                    site = a['text']
            theme_agent, theme_coords = self._get_agent_from_entity(theme)
            if site is not None:
                mods = self._parse_site_text(site)
            else:
                mods = [Site(None, None)]

            # Look up all regulation events of this modification to get to
            # the enzymes (the "controller" here). The result does not
            # depend on the site, so the query is only run once per event.
            reg_res = self._get_regulations(frame_id)

            # Add up to one statement for each site and regulation
            for residue, pos in mods:
                for reg in reg_res:
                    controller_agent, controller_coords = None, None
                    for a in reg['arguments']:
                        if self._get_arg_type(a) == 'controller':
                            controller = a.get('arg')
                            if controller is not None:
                                controller_agent, controller_coords = \
                                    self._get_agent_from_entity(controller)
                                break
                    # Check the polarity of the regulation and if negative,
                    # flip the modification type.
                    # For instance, negative-regulation of a phosphorylation
                    # will become an (indirect) dephosphorylation.
                    # A per-statement variable is used so that the flip
                    # does not leak into other regulations or sites of the
                    # same event.
                    stmt_mod_type = modification_type
                    if reg.get('subtype') == 'negative-regulation':
                        stmt_mod_type = \
                            modtype_to_inverse.get(modification_type)
                        if not stmt_mod_type:
                            logger.warning('Unhandled modification type: %s'
                                           % modification_type)
                            continue

                    sentence = reg['verbose-text']
                    # Here ModStmt is a sub-class of Modification
                    ModStmt = modtype_to_modclass.get(stmt_mod_type)
                    if ModStmt is None:
                        logger.warning('Unhandled modification type: %s'
                                       % stmt_mod_type)
                        continue
                    ev_annotations = self._annotate_coords(
                        annotations, [controller_coords, theme_coords])
                    ev = Evidence(source_api='reach', text=sentence,
                                  annotations=ev_annotations,
                                  pmid=self.citation, context=context,
                                  epistemics=epistemics)
                    # Handle this special case here because only
                    # enzyme argument is needed
                    if stmt_mod_type == 'autophosphorylation':
                        stmt_args = [theme_agent, residue, pos, ev]
                    else:
                        stmt_args = [controller_agent, theme_agent,
                                     residue, pos, ev]
                    self.statements.append(ModStmt(*stmt_args))

    def get_regulate_amounts(self):
        """Extract RegulateAmount INDRA Statements."""
        all_res = []
        for frame_type in ('transcription', 'amount'):
            qstr = "$.events.frames[(@.type is '%s')]" % frame_type
            res = self.tree.execute(qstr)
            if res is not None:
                all_res += list(res)

        for r in all_res:
            epistemics = self._get_epistemics(r)
            if epistemics.get('negated'):
                continue
            annotations, context = self._get_annot_context(r)
            frame_id = r['frame_id']
            theme = None
            for a in r['arguments']:
                if self._get_arg_type(a) == 'theme':
                    theme = a['arg']
                    break
            if theme is None:
                continue
            theme_agent, theme_coords = self._get_agent_from_entity(theme)
            for reg in self._get_regulations(frame_id):
                controller_agent, controller_coords = None, None
                for a in reg['arguments']:
                    if self._get_arg_type(a) == 'controller':
                        controller_agent, controller_coords = \
                            self._get_controller_agent(a)
                sentence = reg['verbose-text']
                ev_annotations = self._annotate_coords(
                    annotations, [controller_coords, theme_coords])
                ev = Evidence(source_api='reach', text=sentence,
                              annotations=ev_annotations,
                              pmid=self.citation, context=context,
                              epistemics=epistemics)
                stmt_args = [controller_agent, theme_agent, ev]
                # Any subtype other than positive-regulation is treated
                # as a decrease
                if reg.get('subtype') == 'positive-regulation':
                    st = IncreaseAmount(*stmt_args)
                else:
                    st = DecreaseAmount(*stmt_args)
                self.statements.append(st)

    def get_complexes(self):
        """Extract INDRA Complex Statements."""
        qstr = "$.events.frames[@.type is 'complex-assembly']"
        res = self.tree.execute(qstr)
        if res is None:
            return

        for r in res:
            epistemics = self._get_epistemics(r)
            if epistemics.get('negated'):
                continue
            # Due to an issue with the REACH output serialization
            # (though seemingly not with the raw mentions), sometimes
            # a redundant complex-assembly event is reported which can
            # be recognized by the missing direct flag, which we can filter
            # for here
            if epistemics.get('direct') is None:
                continue
            annotations, context = self._get_annot_context(r)
            sentence = r['verbose-text']
            members = []
            agent_coordinates = []
            for a in r['arguments']:
                agent, coords = self._get_agent_from_entity(a['arg'])
                members.append(agent)
                agent_coordinates.append(coords)
            ev_annotations = self._annotate_coords(annotations,
                                                   agent_coordinates)
            ev = Evidence(source_api='reach', text=sentence,
                          annotations=ev_annotations, pmid=self.citation,
                          context=context, epistemics=epistemics)
            stmt = Complex(members, ev)
            self.statements.append(stmt)

    def get_activation(self):
        """Extract INDRA Activation Statements.

        Events for which the controller or the controlled agent cannot be
        determined are skipped.
        """
        qstr = "$.events.frames[@.type is 'activation']"
        res = self.tree.execute(qstr)
        if res is None:
            return
        for r in res:
            epistemics = self._get_epistemics(r)
            if epistemics.get('negated'):
                continue
            sentence = r['verbose-text']
            annotations, context = self._get_annot_context(r)
            # Reset for every event so that values from a previous event
            # can never be carried over
            controller_agent, controller_coords = None, None
            controlled_agent, controlled_coords = None, None
            for a in r['arguments']:
                arg_type = self._get_arg_type(a)
                if arg_type == 'controller':
                    controller_agent, controller_coords = \
                        self._get_controller_agent(a)
                if arg_type == 'controlled':
                    controlled_agent, controlled_coords = \
                        self._get_agent_from_entity(a['arg'])
            if controller_agent is None or controlled_agent is None:
                logger.debug('Skipping activation without controller or '
                             'controlled agent')
                continue
            ev_annotations = self._annotate_coords(
                annotations, [controller_coords, controlled_coords])
            ev = Evidence(source_api='reach', text=sentence,
                          pmid=self.citation, annotations=ev_annotations,
                          context=context, epistemics=epistemics)
            if r['subtype'] == 'positive-activation':
                st = Activation(controller_agent, controlled_agent,
                                evidence=ev)
            else:
                st = Inhibition(controller_agent, controlled_agent,
                                evidence=ev)
            self.statements.append(st)

    def get_translocation(self):
        """Extract INDRA Translocation Statements.

        Events for which no theme agent can be determined are skipped.
        """
        qstr = "$.events.frames[@.type is 'translocation']"
        res = self.tree.execute(qstr)
        if res is None:
            return
        for r in res:
            epistemics = self._get_epistemics(r)
            if epistemics.get('negated'):
                continue
            sentence = r['verbose-text']
            annotations, context = self._get_annot_context(r)
            # Reset for every event so that values from a previous event
            # can never be carried over
            agent, theme_coords = None, None
            from_location = None
            to_location = None
            for a in r['arguments']:
                arg_type = self._get_arg_type(a)
                if arg_type == 'theme':
                    agent, theme_coords = \
                        self._get_agent_from_entity(a['arg'])
                elif arg_type == 'source':
                    from_location = self._get_location_by_id(a['arg'])
                elif arg_type == 'destination':
                    to_location = self._get_location_by_id(a['arg'])
            if agent is None:
                continue
            ev_annotations = self._annotate_coords(annotations,
                                                   [theme_coords])
            ev = Evidence(source_api='reach', text=sentence,
                          pmid=self.citation, annotations=ev_annotations,
                          context=context, epistemics=epistemics)
            st = Translocation(agent, from_location, to_location,
                               evidence=ev)
            self.statements.append(st)

    def _get_first_result(self, qstr):
        """Return the first result of an objectpath query or None."""
        res = self.tree.execute(qstr)
        if res is None:
            return None
        return next(iter(res), None)

    def _get_regulations(self, frame_id):
        """Return a list of regulation frames whose first argument is the
        event with the given frame ID."""
        qstr = ("$.events.frames[(@.type is 'regulation') and "
                "(@.arguments[0].arg is '%s')]" % frame_id)
        res = self.tree.execute(qstr)
        if res is None:
            return []
        return list(res)

    @staticmethod
    def _annotate_coords(annotations, coords):
        """Return a copy of the annotations with agent coordinates set.

        A new dict is returned each time so that every Evidence gets its
        own annotations rather than sharing (and overwriting) one dict.
        """
        agents = dict(annotations.get('agents') or {})
        agents['coords'] = coords
        new_annotations = dict(annotations)
        new_annotations['agents'] = agents
        return new_annotations

    def _get_location_by_id(self, loc_id):
        """Return a valid location for an entity frame ID or None."""
        qstr = "$.entities.frames[(@.frame_id is \'%s\')]" % loc_id
        entity_term = self._get_first_result(qstr)
        if entity_term is None:
            logger.debug(' %s is not an entity' % loc_id)
            return None
        name = entity_term.get('text')
        go_id = None
        for xr in entity_term['xrefs']:
            if xr['namespace'] == 'go':
                go_id = xr['id']
        # Try to get valid location based on GO id
        if go_id is not None:
            try:
                return get_valid_location(go_id)
            except InvalidLocationError:
                pass
        # See if the raw name is a valid cellular component
        if name is not None:
            try:
                return get_valid_location(name.lower())
            except InvalidLocationError:
                pass
        return None

    def _get_agent_from_entity(self, entity_id):
        """Return an Agent and its sentence coordinates for an entity ID.

        Returns (None, None) if there is no entity frame with the given ID.
        """
        qstr = "$.entities.frames[(@.frame_id is \'%s\')]" % entity_id
        entity_term = self._get_first_result(qstr)
        if entity_term is None:
            logger.debug(' %s is not an entity' % entity_id)
            return None, None
        # The name defaults to the entity text and can be overwritten
        # for specific database entries
        agent_name, db_refs = self._get_db_refs(entity_term)

        mod_terms = entity_term.get('modifications')
        mods, muts = self._get_mods_and_muts_from_mod_terms(mod_terms)

        # get sentence coordinates of the entity
        coords = self._get_entity_coordinates(entity_term)

        agent = Agent(agent_name, db_refs=db_refs, mods=mods, mutations=muts)
        return agent, coords

    @staticmethod
    def _get_db_refs(entity_term):
        """Return the agent name and db_refs dict for an entity term."""
        agent_name = entity_term['text']
        db_refs = {}
        for xr in entity_term['xrefs']:
            ns = xr['namespace']
            if ns == 'uniprot':
                up_id = xr['id']
                db_refs['UP'] = up_id
                # Look up official names in UniProt
                gene_name = up_client.get_gene_name(up_id)
                if gene_name is not None:
                    agent_name = gene_name
                    # If the gene name corresponds to an HGNC ID, add it to
                    # the db_refs
                    if up_client.is_human(up_id):
                        hgnc_id = hgnc_client.get_hgnc_id(gene_name)
                        if hgnc_id:
                            db_refs['HGNC'] = hgnc_id
            elif ns == 'hgnc':
                hgnc_id = xr['id']
                db_refs['HGNC'] = hgnc_id
                # Look up the standard gene symbol and set as name
                hgnc_name = hgnc_client.get_hgnc_name(hgnc_id)
                if hgnc_name:
                    agent_name = hgnc_name
                # Look up the corresponding uniprot id
                up_id = hgnc_client.get_uniprot_id(hgnc_id)
                if up_id:
                    db_refs['UP'] = up_id
            elif ns == 'pfam':
                be_id = famplex_map.get(('PF', xr['id']))
                if be_id:
                    db_refs['FPLX'] = be_id
                    agent_name = be_id
                db_refs['PF'] = xr['id']
            elif ns == 'interpro':
                be_id = famplex_map.get(('IP', xr['id']))
                if be_id:
                    db_refs['FPLX'] = be_id
                    agent_name = be_id
                db_refs['IP'] = xr['id']
            elif ns == 'chebi':
                db_refs['CHEBI'] = xr['id']
            elif ns == 'pubchem':
                db_refs['PUBCHEM'] = xr['id']
            elif ns == 'go':
                db_refs['GO'] = xr['id']
            elif ns == 'mesh':
                db_refs['MESH'] = xr['id']
            elif ns == 'hmdb':
                db_refs['HMDB'] = xr['id']
            elif ns == 'simple_chemical':
                if xr['id'].startswith('HMDB'):
                    db_refs['HMDB'] = xr['id']
            elif ns == 'be':
                db_refs['FPLX'] = xr['id']
                agent_name = db_refs['FPLX']
            # These name spaces are ignored
            elif ns in ['uaz']:
                pass
            else:
                logger.warning('Unhandled xref namespace: %s' % ns)
        db_refs['TEXT'] = entity_term['text']
        return agent_name, db_refs

    def _get_mods_and_muts_from_mod_terms(self, mod_terms):
        """Return lists of ModConditions and MutConditions for mod terms."""
        mods = []
        muts = []
        if mod_terms is not None:
            for m in mod_terms:
                if m['type'].lower() == 'mutation':
                    # Evidence is usually something like "V600E"
                    # We could parse this to get the amino acid
                    # change that happened.
                    mutation_str = m.get('evidence')
                    # TODO: sometimes mutation_str is "mutant", "Mutant",
                    # "mutants" - this indicates that there is a mutation
                    # but not the specific type. We should encode this
                    # somehow as a "blank" mutation condition
                    mut = self._parse_mutation(mutation_str)
                    if mut is not None:
                        muts.append(mut)
                else:
                    mcs = self._get_mod_conditions(m)
                    mods.extend(mcs)
        return mods, muts

    def _get_mod_conditions(self, mod_term):
        """Return a list of ModConditions given a mod term dict."""
        site = mod_term.get('site')
        if site is not None:
            mods = self._parse_site_text(site)
        else:
            mods = [Site(None, None)]

        mcs = []
        for mod_res, mod_pos in mods:
            mod_type_str = mod_term['type'].lower()
            mod_state = agent_mod_map.get(mod_type_str)
            if mod_state is not None:
                mc = ModCondition(mod_state[0], residue=mod_res,
                                  position=mod_pos,
                                  is_modified=mod_state[1])
                mcs.append(mc)
            else:
                logger.warning('Unhandled entity modification type: %s'
                               % mod_type_str)
        return mcs

    def _get_entity_coordinates(self, entity_term):
        """Return sentence coordinates for a given entity.

        Given an entity term return the associated sentence coordinates as
        a tuple of the form (int, int). Returns None if for any reason the
        sentence coordinates cannot be found.
        """
        # The following lines get the starting coordinate of the sentence
        # containing the entity.
        sent_id = entity_term.get('sentence')
        if sent_id is None:
            return None
        qstr = "$.sentences.frames[(@.frame_id is \'%s')]" % sent_id
        sentence = self._get_first_result(qstr)
        if sentence is None:
            return None
        sent_start = sentence.get('start-pos')
        if sent_start is None:
            return None
        sent_start = sent_start.get('offset')
        if sent_start is None:
            return None
        # Get the entity coordinate in the entire text and subtract the
        # coordinate of the first character in the associated sentence to
        # get the sentence coordinate of the entity. Return None if entity
        # coordinates are missing
        entity_start = entity_term.get('start-pos')
        entity_stop = entity_term.get('end-pos')
        if entity_start is None or entity_stop is None:
            return None
        entity_start = entity_start.get('offset')
        entity_stop = entity_stop.get('offset')
        if entity_start is None or entity_stop is None:
            return None
        return (entity_start - sent_start, entity_stop - sent_start)

    def _get_annot_context(self, frame_term):
        """Return the annotations dict and the BioContext of an event frame.

        The context is None if the frame has no context, if the context
        frame cannot be found, or if the resulting BioContext is blank.
        """
        annotations = {'found_by': frame_term['found_by'],
                       'agents': {}}
        try:
            context_id = frame_term['context']
        except KeyError:
            return annotations, None
        # For backwards compatibility with older versions
        # of REACH
        if isinstance(context_id, dict):
            context_term = context_id
            species = context_term.get('Species')
            cell_type = context_term.get('CellType')
            cell_line = None
            location = None
            organ = None
        else:
            if not context_id:
                return annotations, None
            qstr = "$.entities.frames[(@.frame_id is \'%s\')]" % context_id[0]
            context_frame = self._get_first_result(qstr)
            if context_frame is None:
                return annotations, None
            facets = context_frame['facets']
            cell_line = facets.get('cell-line')
            cell_type = facets.get('cell-type')
            species = facets.get('organism')
            location = facets.get('location')
            organ = facets.get('organ')
            # NOTE: the 'tissue_type' facet can't be handled currently

        def get_ref_context(lst):
            if not lst:
                return None
            db_name, db_id = lst[0].split(':', 1)
            return RefContext(db_refs={db_name.upper(): db_id})

        context = BioContext()
        # Example: ['taxonomy:9606']
        context.species = get_ref_context(species)
        # Example: ['cl:CL:0000148']
        context.cell_type = get_ref_context(cell_type)
        # Example: ['cellosaurus:CVCL_0504']
        context.cell_line = get_ref_context(cell_line)
        # Example: ['go:GO:0005886']
        context.location = get_ref_context(location)
        # Example: ['uberon:UBERON:0000105']
        context.organ = get_ref_context(organ)
        # This is so we don't add a blank BioContext as context and rather
        # just add None
        if not context:
            context = None
        return annotations, context

    def _get_epistemics(self, event):
        """Return a dict of epistemic information for an event frame."""
        epistemics = {}
        # Check whether information is negative
        if event.get('is_negated') is True:
            epistemics['negated'] = True
        # Check if it is a hypothesis
        if event.get('is_hypothesis') is True:
            epistemics['hypothesis'] = True
        # Check if it is direct
        if 'is_direct' in event:
            epistemics['direct'] = event['is_direct']
        # Get the section of the paper it comes from
        epistemics['section_type'] = self._get_section(event)
        return epistemics

    _section_list = ['title', 'abstract', 'introduction', 'background',
                     'results', 'methods', 'discussion', 'conclusion',
                     'supplementary', 'figure']

    def _get_section(self, event):
        """Get the section of the paper that the event is from.

        Returns None if the section cannot be determined or is not one of
        the recognized section types.
        """
        sentence_id = event.get('sentence')
        section = None
        if sentence_id:
            qstr = "$.sentences.frames[(@.frame_id is \'%s\')]" % sentence_id
            sentence_frame = self._get_first_result(qstr)
            if sentence_frame:
                passage_id = sentence_frame.get('passage')
                if passage_id:
                    qstr = "$.sentences.frames[(@.frame_id is \'%s\')]" % \
                        passage_id
                    passage_frame = self._get_first_result(qstr)
                    if passage_frame:
                        section = passage_frame.get('section-id')
        # Without a section ID there is nothing to normalize
        if section is None:
            return None
        # If the section is in the standard list, return as is
        if section in self._section_list:
            return section
        # Next, handle a few special cases that come up in practice
        elif section.startswith('fig'):
            return 'figure'
        elif section.startswith('supm'):
            return 'supplementary'
        elif section == 'article-title':
            return 'title'
        elif section in ['subjects|methods', 'methods|subjects']:
            return 'methods'
        elif section == 'conclusions':
            return 'conclusion'
        elif section == 'intro':
            return 'introduction'
        else:
            return None

    def _get_controller_agent(self, arg):
        """Return a single or a complex controller agent.

        Returns a tuple of the agent and its sentence coordinates, either
        of which can be None if it cannot be determined.
        """
        controller_agent, coords = None, None
        controller = arg.get('arg')
        # There is either a single controller here
        if controller is not None:
            controller_agent, coords = self._get_agent_from_entity(controller)
        # Or the controller is a complex
        elif arg.get('argument-type') == 'complex':
            controllers = list((arg.get('args') or {}).values())
            if not controllers:
                return None, None
            controller_agent, coords = \
                self._get_agent_from_entity(controllers[0])
            if controller_agent is None:
                return None, coords
            bound_agents = [self._get_agent_from_entity(c)[0]
                            for c in controllers[1:]]
            # Members that cannot be resolved to an agent are left out
            bound_conditions = [BoundCondition(ba, True)
                                for ba in bound_agents if ba is not None]
            controller_agent.bound_conditions = bound_conditions
        return controller_agent, coords

    @staticmethod
    def _get_arg_type(arg):
        """Return the type of the argument with backwards compatibility."""
        if arg.get('argument_label') is not None:
            return arg.get('argument_label')
        else:
            return arg.get('type')

    @staticmethod
    def _parse_mutation(s):
        """Return a MutCondition parsed from a mutation string or None."""
        # The mutation term may not have any evidence text at all
        if s is None:
            return None
        m = re.match(r'([A-Z]+)([0-9]+)([A-Z]+)', s.upper())
        if m is not None:
            parts = [str(g) for g in m.groups()]
            try:
                residue_from = get_valid_residue(parts[0])
                residue_to = get_valid_residue(parts[2])
            except Exception:
                return None
            position = parts[1]
            return MutCondition(position, residue_from, residue_to)
        elif s.lower() in ('mutation', 'mutations', 'mutant', 'mutants',
                           'mutational'):
            # A mutation is indicated but its specifics are not known
            return MutCondition(None, None, None)
        else:
            logger.warning('Unhandled mutation string: %s' % s)
            return None

    @staticmethod
    def _parse_site_text(s):
        """Return a list of Sites parsed from a site text string.

        The site text may list multiple sites separated by either commas
        or slashes (but not both, in which case an empty list is returned).
        """
        has_comma = ',' in s
        has_slash = '/' in s
        if has_comma and has_slash:
            logger.error(s + ' is not a valid site text string')
            return []
        if has_comma:
            texts = s.split(',')
        else:
            texts = s.split('/')

        sites = [ReachProcessor._parse_site_text_single(t) for t in texts]

        # If the first site has a residue, and the remaining sites do not
        # explicitly give a residue (example: Tyr-577/576), then apply the
        # first site's residue to all sites in the site text.
        first_residue = sites[0].residue
        if first_residue is not None and \
                all(site.residue is None for site in sites[1:]):
            sites = [sites[0]] + [Site(first_residue, site.position)
                                  for site in sites[1:]]
        return sites

    @staticmethod
    def _parse_site_text_single(s):
        """Return a single Site parsed from a site text string.

        Returns Site(None, None) if the text cannot be parsed.
        """
        s = s.strip()
        s_upper = s.upper()
        # Residue followed by position
        for p in (_site_pattern1, _site_pattern2, _site_pattern3):
            m = re.match(p, s_upper)
            if m is not None:
                residue = get_valid_residue(m.groups()[0])
                site = m.groups()[1]
                return Site(residue, site)

        # Position followed by residue
        m = re.match(_site_pattern4, s_upper)
        if m is not None:
            site = m.groups()[0]
            residue = m.groups()[1]
            return Site(residue, site)

        # Residue only
        for p in (_site_pattern5, _site_pattern6, _site_pattern7):
            m = re.match(p, s_upper)
            if m is not None:
                residue = get_valid_residue(m.groups()[0])
                return Site(residue, None)

        # Position only
        m = re.match(_site_pattern8, s_upper)
        if m is not None:
            site = m.groups()[0]
            return Site(None, site)

        logger.warning('Could not parse site text %s' % s)
        return Site(None, None)
# Regular expressions used to parse site texts. Patterns 1-3 match a residue
# (one-letter code, short name or full name) followed by a position, pattern
# 4 a position followed by a one-letter residue, patterns 5-7 a residue
# only, and pattern 8 a position only.
_site_pattern1 = '([%s])[-]?([0-9]+)$' % ''.join(amino_acids)
_site_pattern2 = '(%s)[- ]?([0-9]+)$' % \
    '|'.join(aa['short_name'].upper() for aa in amino_acids.values())
_site_pattern3 = '(%s)[^0-9]*([0-9]+)$' % \
    '|'.join(aa['indra_name'].upper() for aa in amino_acids.values())
_site_pattern4 = '([0-9]+)[ ]?([%s])$' % ''.join(amino_acids)
_site_pattern5 = '^([%s])$' % ''.join(amino_acids)
_site_pattern6 = '^(%s)$' % \
    '|'.join(aa['short_name'].upper() for aa in amino_acids.values())
_site_pattern7 = '.*(%s).*' % \
    '|'.join(aa['indra_name'].upper() for aa in amino_acids.values())
_site_pattern8 = '([0-9]+)$'

# Maps a lower-case entity modification type to a tuple of the modification
# type and a boolean that is passed on as the is_modified state.
# Subtypes that exist but we don't handle: hydrolysis
agent_mod_map = {
    'phosphorylation': ('phosphorylation', True),
    'phosphorylated': ('phosphorylation', True),
    'dephosphorylation': ('phosphorylation', False),
    'acetylation': ('acetylation', True),
    'deacetylation': ('acetylation', False),
    'ubiquitination': ('ubiquitination', True),
    'deubiquitination': ('ubiquitination', False),
    'hydroxylation': ('hydroxylation', True),
    'dehydroxylation': ('hydroxylation', False),
    'sumoylation': ('sumoylation', True),
    'desumoylation': ('sumoylation', False),
    'glycosylation': ('glycosylation', True),
    'deglycosylation': ('glycosylation', False),
    'farnesylation': ('farnesylation', True),
    'defarnesylation': ('farnesylation', False),
    'ribosylation': ('ribosylation', True),
    'deribosylation': ('ribosylation', False),
    'methylation': ('methylation', True),
    'demethylation': ('methylation', False),
    'unknown': ('modification', True),
}


def _read_famplex_map():
    """Return a dict mapping (namespace, ID) tuples to the third column of
    the famplex_map.tsv resource file."""
    here = os.path.dirname(os.path.abspath(__file__))
    fname = os.path.join(here, '../../resources/famplex_map.tsv')
    rows = read_unicode_csv(fname, delimiter='\t')
    return {(row[0], row[1]): row[2] for row in rows}


famplex_map = _read_famplex_map()


def _read_reach_rule_regexps():
    """Load in a file with the regular expressions corresponding to each
    reach rule.

    Why regular expression matching?
    The rule name in found_by has instances of some reach rules for each
    possible event type (activation, binding, etc). This makes for too many
    different types of rules for practical curation of examples.
    We use regular expressions to only match the rule used for extraction,
    independently of what the event is.

    Returns
    -------
    list[str]
        One regular expression per line of reach_rule_regexps.txt, with
        trailing whitespace stripped.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    reach_rule_filename = os.path.join(here, 'reach_rule_regexps.txt')
    with open(reach_rule_filename, 'r') as f:
        return [line.rstrip() for line in f]


reach_rule_regexps = _read_reach_rule_regexps()
def determine_reach_subtype(event_name):
    """Return the category of reach rule from the reach rule instance.

    Searches event_name with each of the regular expressions in
    reach_rule_regexps and returns the longest regular expression that
    matches (the first one in case of a tie), or None if none of them match.

    Parameters
    ----------
    event_name : str
        The string to match against the reach rule regular expressions
        (presumably the rule name from an extraction's found_by entry).

    Returns
    -------
    best_match : str
        The longest regular expression that matched event_name, or None
        if no regular expression matched.
    """
    best_match = None
    for pattern in reach_rule_regexps:
        if not re.search(pattern, event_name):
            continue
        # Only a strictly longer pattern replaces the current best one
        if best_match is None or len(pattern) > len(best_match):
            best_match = pattern
    return best_match