import os
import re
import logging
import objectpath
from collections import defaultdict
from indra.statements import *
from indra.util import read_unicode_csv
from indra.databases import go_client, uniprot_client
from indra.ontology.standardize import \
standardize_db_refs, standardize_agent_name, \
from indra.statements.validate import validate_text_refs
from collections import namedtuple
logger = logging.getLogger(__name__)
Site = namedtuple('Site', ['residue', 'position'])
[docs]class ReachProcessor(object):
"""The ReachProcessor extracts INDRA Statements from REACH parser output.
json_dict : dict
A JSON dictionary containing the REACH extractions.
pmid : Optional[str]
The PubMed ID associated with the extractions. This can be passed
in case the PMID cannot be determined from the extractions alone.`
tree : objectpath.Tree
The objectpath Tree object representing the extractions.
statements : list[indra.statements.Statement]
A list of INDRA Statements that were extracted by the processor.
citation : str
The PubMed ID associated with the extractions.
all_events : dict[str, str]
The frame IDs of all events by type in the REACH extraction.
organism_priority : list[str]
A list of Taxonomy IDs providing prioritization among organisms
when choosing protein grounding. If not given, the default behavior
takes the first match produced by Reach, which is prioritized to be
a human protein if such a match exists.
def __init__(self, json_dict, pmid=None, organism_priority=None):
self.tree = objectpath.Tree(json_dict)
self.organism_priority = organism_priority
self.statements = []
self.citation = pmid
if pmid is None:
if self.tree is not None:
self.citation =\
if not validate_text_refs({'PMID': self.citation}):
logger.debug('The citation added is not a valid '
'PMID, removing.')
self.citation = None
[docs] def print_event_statistics(self):
"""Print the number of events in the REACH output by type."""'All events by type')'-------------------')
for k, v in self.all_events.items():'%s, %s' % (k, len(v)))'-------------------')
[docs] def get_all_events(self):
"""Gather all event IDs in the REACH output by type.
These IDs are stored in the self.all_events dict.
self.all_events = {}
events = self.tree.execute("$.events.frames")
if events is None:
for e in events:
event_type = e.get('type')
frame_id = e.get('frame_id')
except KeyError:
self.all_events[event_type] = [frame_id]
[docs] def get_all_entities(self):
"""Return all entities extracted, even ones not part of events."""
return self.tree.execute("$.entities.frames")
[docs] def get_agents_from_entities(self):
"""Return INDRA Agents extracted from all entities, even ones not
part of events."""
entities = self.get_all_entities()
agents = [
for e in entities
agents = [a for a in agents if a is not None]
return agents
[docs] def get_agents_from_entities_with_coords(self, sentence_coords=False):
"""Return INDRA Agents extracted from all entities, along with
global document-level coordinates, even ones not part of events."""
entities = self.get_all_entities()
agents_coords = [
self._get_agent_from_entity(e['frame_id'], sentence_coords=sentence_coords)
for e in entities
agents_coords = [a for a in agents_coords if a[0] is not None]
return agents_coords
def print_regulations(self):
qstr = "$.events.frames[(@.type is 'regulation')]"
res = self.tree.execute(qstr)
if res is None:
for r in res:
for a in r['arguments']:
print(a['type'], '/', a['argument-type'], ':', a['text'])
[docs] def get_modifications(self):
"""Extract Modification INDRA Statements."""
# Find all event frames that are a type of protein modification
qstr = "$.events.frames[(@.type is 'protein-modification')]"
res = self.tree.execute(qstr)
if res is None:
# Extract each of the results when possible
for r in res:
# The subtype of the modification
modification_type = r.get('subtype')
# Skip negated events (i.e. something doesn't happen)
epistemics = self._get_epistemics(r)
if epistemics.get('negated'):
annotations, context = self._get_annot_context(r)
frame_id = r['frame_id']
args = r['arguments']
site = None
theme = None
# Find the substrate (the "theme" agent here) and the
# site and position it is modified on
for a in args:
if self._get_arg_type(a) == 'theme':
theme = a['arg']
elif self._get_arg_type(a) == 'site':
site = a['text']
theme_agent, theme_coords = self._get_agent_from_entity(theme)
if site is not None:
mods = self._parse_site_text(site)
mods = [(None, None)]
for mod in mods:
# Add up to one statement for each site
residue, pos = mod
# Now we need to look for all regulation event to get to the
# enzymes (the "controller" here)
qstr = "$.events.frames[(@.type is 'regulation') and " + \
"(@.arguments[0].arg is '%s')]" % frame_id
reg_res = self.tree.execute(qstr)
reg_res = list(reg_res)
for reg in reg_res:
controller_agent, controller_coords = None, None
for a in reg['arguments']:
if self._get_arg_type(a) == 'controller':
controller = a.get('arg')
if controller is not None:
controller_agent, controller_coords = \
# Check the polarity of the regulation and if negative,
# flip the modification type.
# For instance, negative-regulation of a phosphorylation
# will become an (indirect) dephosphorylation
reg_subtype = reg.get('subtype')
if reg_subtype == 'negative-regulation':
modification_type = \
if not modification_type:
logger.warning('Unhandled modification type: %s' %
sentence = reg['verbose-text']
annotations['agents']['coords'] = [controller_coords,
ev = Evidence(source_api='reach', text=sentence,
annotations=annotations, pmid=self.citation,
context=context, epistemics=epistemics)
args = [controller_agent, theme_agent, residue, pos, ev]
# Here ModStmt is a sub-class of Modification
ModStmt = modtype_to_modclass.get(modification_type)
if ModStmt is None:
logger.warning('Unhandled modification type: %s' %
# Handle this special case here because only
# enzyme argument is needed
if modification_type == 'autophosphorylation':
args = [theme_agent, residue, pos, ev]
[docs] def get_regulate_amounts(self):
"""Extract RegulateAmount INDRA Statements."""
qstr = "$.events.frames[(@.type is 'transcription')]"
res = self.tree.execute(qstr)
all_res = []
if res is not None:
all_res += list(res)
qstr = "$.events.frames[(@.type is 'amount')]"
res = self.tree.execute(qstr)
if res is not None:
all_res += list(res)
for r in all_res:
subtype = r.get('subtype')
epistemics = self._get_epistemics(r)
if epistemics.get('negated'):
annotations, context = self._get_annot_context(r)
frame_id = r['frame_id']
args = r['arguments']
theme = None
for a in args:
if self._get_arg_type(a) == 'theme':
theme = a['arg']
if theme is None:
theme_agent, theme_coords = self._get_agent_from_entity(theme)
qstr = "$.events.frames[((@.type is 'regulation') or "\
"(@.type is 'activation')) and " + \
"(@.arguments[0].arg is '%s')]" % frame_id
reg_res = self.tree.execute(qstr)
for reg in reg_res:
controller_agent, controller_coords = None, None
for a in reg['arguments']:
if self._get_arg_type(a) == 'controller':
controller_agent, controller_coords = \
sentence = reg['verbose-text']
annotations['agents']['coords'] = [controller_coords,
ev = Evidence(source_api='reach', text=sentence,
annotations=annotations, pmid=self.citation,
context=context, epistemics=epistemics)
args = [controller_agent, theme_agent, ev]
subtype = reg.get('subtype')
if subtype.startswith('positive'):
st = IncreaseAmount(*args)
st = DecreaseAmount(*args)
[docs] def get_complexes(self):
"""Extract INDRA Complex Statements."""
qstr = "$.events.frames[@.type is 'complex-assembly']"
res = self.tree.execute(qstr)
if res is None:
for r in res:
epistemics = self._get_epistemics(r)
if epistemics.get('negated'):
# Due to an issue with the REACH output serialization
# (though seemingly not with the raw mentions), sometimes
# a redundant complex-assembly event is reported which can
# be recognized by the missing direct flag, which we can filter
# for here
if epistemics.get('direct') is None:
annotations, context = self._get_annot_context(r)
args = r['arguments']
sentence = r['verbose-text']
members = []
agent_coordinates = []
for a in args:
agent, coords = self._get_agent_from_entity(a['arg'])
annotations['agents']['coords'] = agent_coordinates
ev = Evidence(source_api='reach', text=sentence,
annotations=annotations, pmid=self.citation,
context=context, epistemics=epistemics)
stmt = Complex(members, ev)
[docs] def get_activation(self):
"""Extract INDRA Activation Statements."""
qstr = "$.events.frames[@.type is 'activation']"
res = self.tree.execute(qstr)
if res is None:
for r in res:
epistemics = self._get_epistemics(r)
if epistemics.get('negated'):
sentence = r['verbose-text']
annotations, context = self._get_annot_context(r)
ev = Evidence(source_api='reach', text=sentence,
pmid=self.citation, annotations=annotations,
context=context, epistemics=epistemics)
args = r['arguments']
controller_agent = None
for a in args:
if self._get_arg_type(a) == 'controller':
controller_agent, controller_coords = \
if self._get_arg_type(a) == 'controlled':
controlled = a['arg']
controlled_agent, controlled_coords = \
if controller_agent is None or controlled_agent is None:
annotations['agents']['coords'] = [controller_coords,
positive = (r['subtype'] == 'positive-activation')
# By default, we choose Activation/Inhibition based on polarity
stmt_cls = Activation if positive else Inhibition
stmt_kwargs = {}
# Here we handle a special case where we have the activation of
# a modified form, which we transform into a modification
# statement, e.g., Phosphorylation
if controlled_agent.mods:
# NOTE: can there be more than one mods here?
mod = controlled_agent.mods[0]
# We check all modification classes here
if mod.mod_type in modtype_to_modclass:
stmt_cls = modtype_to_modclass[mod.mod_type]
# We take the residue/position information from
# the modificatio, if available
if mod.residue:
stmt_kwargs['residue'] = mod.residue
if mod.position:
stmt_kwargs['position'] = mod.position
controlled_agent.mods = []
st = stmt_cls(controller_agent, controlled_agent, **stmt_kwargs,
[docs] def get_translocation(self):
"""Extract INDRA Translocation Statements."""
qstr = "$.events.frames[@.type is 'translocation']"
res = self.tree.execute(qstr)
if res is None:
for r in res:
epistemics = self._get_epistemics(r)
if epistemics.get('negated'):
sentence = r['verbose-text']
annotations, context = self._get_annot_context(r)
args = r['arguments']
from_location = None
to_location = None
for a in args:
if self._get_arg_type(a) == 'theme':
agent, theme_coords = self._get_agent_from_entity(a['arg'])
if agent is None:
elif self._get_arg_type(a) == 'source':
from_location = self._get_location_by_id(a['arg'])
elif self._get_arg_type(a) == 'destination':
to_location = self._get_location_by_id(a['arg'])
# We skip statements that have no locations associated with them
# at all
if not from_location and not to_location:
annotations['agents']['coords'] = [theme_coords]
ev = Evidence(source_api='reach', text=sentence,
pmid=self.citation, annotations=annotations,
context=context, epistemics=epistemics)
st = Translocation(agent, from_location, to_location,
def get_conversion(self):
qstr = "$.events.frames[@.type is 'conversion']"
res = self.tree.execute(qstr)
if res is None:
for r in res:
epistemics = self._get_epistemics(r)
if epistemics.get('negated'):
sentence = r['verbose-text']
annotations, context = self._get_annot_context(r)
ev = Evidence(source_api='reach', text=sentence,
pmid=self.citation, annotations=annotations,
context=context, epistemics=epistemics)
args = r['arguments']
controller_agent = substrate_agent = product_agent = None
for a in args:
if self._get_arg_type(a) == 'controller':
controller_agent, controller_coords = \
if self._get_arg_type(a) == 'substrate':
substrate_agent, substrate_coords = \
if self._get_arg_type(a) == 'product':
product_agent, product_coords = \
if not all({controller_agent, substrate_agent, product_agent}):
annotations['agents']['coords'] = [controller_coords,
st = Conversion(controller_agent, [substrate_agent],
[product_agent], evidence=[ev])
def _get_location_by_id(self, loc_id):
qstr = "$.entities.frames[(@.frame_id is \'%s\')]" % loc_id
res = self.tree.execute(qstr)
if res is None:
return None
entity_term = next(res)
except StopIteration:
logger.debug(' %s is not an entity' % loc_id)
return None
name = entity_term.get('text')
go_id = None
for xr in entity_term['xrefs']:
ns = xr['namespace']
if ns == 'go':
go_id = xr['id']
# If there is no GO ID, we try to "ground" the name to an ID
if go_id is None:
go_id = go_client.get_go_id_from_label_or_synonym(name.lower())
# Try to get valid location based on GO id
if go_id is not None:
loc = go_client.get_go_label(go_id)
if loc:
return loc
return None
def _get_agent_from_entity(self, entity_id, sentence_coords=True):
"""Return an INDRA Agent from a Reach entity ID.
entity_id : str
The ID of the Reach entity.
sentence_coords : Optional[bool]
If True, the sentence coordinates of the entity are returned
rather than the global coordinates. Default: True
qstr = "$.entities.frames[(@.frame_id is \'%s\')]" % entity_id
res = self.tree.execute(qstr)
if res is None:
return None, None
entity_term = next(res)
except StopIteration:
logger.debug(' %s is not an entity' % entity_id)
return None, None
# This is the default name, which can be overwritten
# below for specific database entries
if 'text' not in entity_term:
return None, None
agent_name = entity_term['text']
db_refs = self._get_db_refs(entity_term, self.organism_priority)
mod_terms = entity_term.get('modifications')
mods, muts = self._get_mods_and_muts_from_mod_terms(mod_terms)
# get sentence coordinates of the entity
coords = self._get_entity_coordinates(entity_term, sentence_coords=sentence_coords)
agent = Agent(agent_name, db_refs=db_refs, mods=mods, mutations=muts)
standardize_agent_name(agent, standardize_refs=True)
return agent, coords
def _get_db_refs(entity_term, organism_priority=None):
db_refs = {}
for xr in entity_term['xrefs']:
ns = xr['namespace']
if ns == 'uniprot':
# Note: we add both full protein and protein chain
# IDs here so that we can appli organism prioritization in
# a uniform way. Later these will be separated out.
up_id = xr['id']
db_refs['UP'] = up_id
elif ns == 'hgnc':
db_refs['HGNC'] = xr['id']
elif ns == 'pfam':
fplx_id = famplex_map.get(('PF', xr['id']))
if fplx_id:
db_refs['FPLX'] = fplx_id
db_refs['PF'] = xr['id']
elif ns == 'interpro':
fplx_id = famplex_map.get(('IP', xr['id']))
if fplx_id:
db_refs['FPLX'] = fplx_id
db_refs['IP'] = xr['id']
elif ns == 'chebi':
db_refs['CHEBI'] = xr['id']
elif ns == 'pubchem':
db_refs['PUBCHEM'] = xr['id']
elif ns == 'go':
go_id = xr['id']
# Handle secondary to primary mapping if necessary
pri = go_client.get_primary_id(go_id)
if pri:
go_id = pri
db_refs['GO'] = go_id
elif ns == 'mesh':
db_refs['MESH'] = xr['id']
elif ns == 'hmdb':
db_refs['HMDB'] = xr['id']
elif ns == 'simple_chemical':
if xr['id'].startswith('HMDB'):
db_refs['HMDB'] = xr['id']
# We handle "be" here for compatibility with older versions
elif ns in ('fplx', 'be'):
db_refs['FPLX'] = xr['id']
elif ns == 'proonto':
db_refs['PR'] = xr['id']
elif ns == 'taxonomy':
db_refs['TAXONOMY'] = xr['id']
elif ns == 'cl':
db_refs['CL'] = xr['id']
elif ns == 'tissuelist':
db_refs['CALOHA'] = xr['id']
elif ns == 'uberon':
db_refs['UBERON'] = xr['id']
elif ns == 'cellosaurus':
db_refs['CVCL'] = xr['id']
elif ns in ['uo', 'ncit', 'edam.topic', 'ncbitaxon', 'vo', 'bto',
'efo', 'swo', 'iobc', 'ohpi', 'obi', 'ino',
'ido', 'pr']:
db_refs[ns.upper()] = xr['id']
# These name spaces are ignored
elif ns in ['uaz']:
logger.warning('Unhandled xref namespace: %s' % ns)
db_refs['TEXT'] = entity_term['text']
# If we have a UniProt grounding and we have a non-default
# organism priority list, we call the prioritization function
if db_refs.get('UP'):
if organism_priority:
# These are all the unique groundings in the alt-xrefs list,
# which redundantly lists the same match multiple times because
# it enumerates multiple synonyms for organisms redundantly
unique_altxrefs = \
set((axr['namespace'], axr['id'])
for axr in entity_term.get('alt-xrefs', []))
# This returns a single prioritized UniProt ID or None
prioritized_id = \
# If we got an ID, we set the UP grounding to that, otherwise
# we keep what we already got from the primary xref
if prioritized_id:
db_refs['UP'] = prioritized_id
# After all this, we need to separate protein chain grounding
# and so if we are dealing with one of those, we pop out the UP
# key, split the ID to get the chain ID and add that in the UPPRO
# namespace.
if '#' in db_refs['UP']:
up_id = db_refs.pop('UP', None)
db_refs['UPPRO'] = up_id.split('#')[1]
db_refs = standardize_db_refs(db_refs)
return db_refs
def _get_mods_and_muts_from_mod_terms(self, mod_terms):
mods = []
muts = []
if mod_terms is not None:
for m in mod_terms:
if m['type'].lower() == 'mutation':
# Evidence is usualy something like "V600E"
# We could parse this to get the amino acid
# change that happened.
mutation_str = m.get('evidence')
# TODO: sometimes mutation_str is "mutant", "Mutant",
# "mutants" - this indicates that there is a mutation
# but not the specific type. We should encode this
# somehow as a "blank" mutation condition
mut = self._parse_mutation(mutation_str)
if mut is not None:
mcs = self._get_mod_conditions(m)
return mods, muts
def _get_mod_conditions(self, mod_term):
"""Return a list of ModConditions given a mod term dict."""
site = mod_term.get('site')
if site is not None:
mods = self._parse_site_text(site)
mods = [Site(None, None)]
mcs = []
for mod in mods:
mod_res, mod_pos = mod
mod_type_str = mod_term['type'].lower()
if mod_type_str == 'unknown':
mod_state = agent_mod_map.get(mod_type_str)
# We skip unknown modifications since they are very often
# unrelated to PTMs, representing things like "expression"
# These are real PTM states
if mod_state is not None:
mc = ModCondition(mod_state[0], residue=mod_res,
position=mod_pos, is_modified=mod_state[1])
logger.warning('Unhandled entity modification type: %s'
% mod_type_str)
return mcs
def _get_entity_coordinates(self, entity_term, sentence_coords=True):
"""Return sentence coordinates for a given entity.
Given an entity term return the associated sentence coordinates as
a tuple of the form (int, int). Returns None if for any reason the
sentence coordinates cannot be found.
entity_term : dict
A dictionary representing an entity term.
sentence_coords : Optional[bool]
If True, return the sentence coordinates of the entity. If False,
the global coordinates are returned. Default: True
# Get the entity coordinate in the entire text and subtract the
# coordinate of the first character in the associated sentence to
# get the sentence coordinate of the entity. Return None if entity
# coordinates are missing
entity_start = entity_term.get('start-pos')
entity_stop = entity_term.get('end-pos')
if entity_start is None or entity_stop is None:
return None
entity_start = entity_start.get('offset')
entity_stop = entity_stop.get('offset')
if entity_start is None or entity_stop is None:
return None
# At this point we can return the global coordinates if we aren't
# interested in sentence-specific coordinates
if not sentence_coords:
return entity_start, entity_stop
# The following lines get the starting coordinate of the sentence
# containing the entity.
sent_id = entity_term.get('sentence')
if sent_id is None:
return None
qstr = "$.sentences.frames[(@.frame_id is \'%s')]" % sent_id
res = self.tree.execute(qstr)
if res is None:
return None
sentence = next(res)
except StopIteration:
return None
sent_start = sentence.get('start-pos')
if sent_start is None:
return None
sent_start = sent_start.get('offset')
if sent_start is None:
return None
return (entity_start - sent_start, entity_stop - sent_start)
def _get_annot_context(self, frame_term):
annotations = {'found_by': frame_term['found_by'],
'agents': {}}
context_id = frame_term['context']
except KeyError:
return annotations, None
# For backwards compatibility with older versions
# of REACH
if isinstance(context_id, dict):
context_term = context_id
species = context_term.get('Species')
cell_type = context_term.get('CellType')
cell_line = None
location = None
tissue = None
organ = None
qstr = "$.entities.frames[(@.frame_id is \'%s\')]" % context_id[0]
res = self.tree.execute(qstr)
if res is None:
return annotations, None
context_frame = next(res)
facets = context_frame['facets']
cell_line = facets.get('cell-line')
cell_type = facets.get('cell-type')
species = facets.get('organism')
location = facets.get('location')
tissue = facets.get('tissue_type')
organ = facets.get('organ')
def get_ref_context(lst):
if not lst:
return None
db_name, db_id = lst[0].split(':', 1)
db_name = db_name.upper()
# Here we are dealing with UniProt subcellular components
# so we use a different namespace for those
if db_name == 'UNIPROT':
db_name = 'UPLOC'
# These aren't real groundings
elif db_name == 'UAZ':
return None
standard_name, db_refs = \
standardize_name_db_refs({db_name: db_id})
return RefContext(standard_name, db_refs=db_refs)
context = BioContext()
# Example: ['taxonomy:9606']
context.species = get_ref_context(species)
# Example: ['cl:CL:0000148']
context.cell_type = get_ref_context(cell_type)
# Example: ['cellosaurus:CVCL_0504']
context.cell_line = get_ref_context(cell_line)
# Example: ['go:GO:0005886']
context.location = get_ref_context(location)
# Example: ['uberon:UBERON:0000105']
context.organ = get_ref_context(organ)
# NOTE: we can't handle tissue currently
# context['tissue'] = tissue
# This is so we don't add a blank BioContext as context and rather
# just add None
if not context:
context = None
return annotations, context
def _get_epistemics(self, event):
epistemics = {}
# Check whether information is negative
neg = event.get('is_negated')
if neg is True:
epistemics['negated'] = True
# Check if it is a hypothesis
hyp = event.get('is_hypothesis')
if hyp is True:
epistemics['hypothesis'] = True
# Check if it is direct
if 'is_direct' in event:
direct = event['is_direct']
epistemics['direct'] = direct
# Get the section of the paper it comes from
sentence_id = event.get('sentence')
sections = []
if sentence_id:
qstr = "$.sentences.frames[(@.frame_id is \'%s\')]" % sentence_id
res = self.tree.execute(qstr)
if res:
sentence_frame = list(res)[0]
sections = sentence_frame.get('sections', [])
epistemics['raw_sections'] = sections
for section in sections:
norm_section = normalize_section(section)
if norm_section:
epistemics['section_type'] = norm_section
return epistemics
def _get_controller_agent(self, arg):
"""Return a single or a complex controller agent."""
controller_agent = None
controller = arg.get('arg')
# There is either a single controller here
if controller is not None:
controller_agent, coords = self._get_agent_from_entity(controller)
# Or the controller is a complex
elif arg['argument-type'] == 'complex':
controllers = list(arg.get('args').values())
controller_agent, coords = \
bound_agents = [self._get_agent_from_entity(c)[0]
for c in controllers[1:]]
bound_conditions = [BoundCondition(ba, True) for
ba in bound_agents]
controller_agent.bound_conditions = bound_conditions
return controller_agent, coords
def _get_arg_type(arg):
"""Return the type of the argument with backwards compatibility."""
if arg.get('argument_label') is not None:
return arg.get('argument_label')
return arg.get('type')
def _parse_mutation(s):
m = re.match(r'([A-Z]+)([0-9]+)([A-Z]+)', s.upper())
if m is not None:
parts = [str(g) for g in m.groups()]
residue_from = get_valid_residue(parts[0])
except Exception as e:
return None
residue_to = get_valid_residue(parts[2])
except Exception as e:
return None
position = parts[1]
mut = MutCondition(position, residue_from, residue_to)
return mut
elif s.lower() in ('mutation', 'mutations', 'mutant', 'mutants',
mut = MutCondition(None, None, None)
return mut
logger.warning('Unhandled mutation string: %s' % s)
return None
def _parse_site_text(s):
has_comma = ',' in s
has_slash = '/' in s
has_both = has_comma and has_slash
if has_both:
logger.error(s + ' is not a valid site text string')
return []
if has_comma:
texts = s.split(',')
texts = s.split('/')
sites = [parse_amino_acid_string(t) for t in texts]
# If the first site has a residue, and the remaining sites do not
# explicitly give a residue (example: Tyr-577/576), then apply the
# first site's residue to all sites in the site text.
only_first_site_has_residue = sites[0].residue is not None
for i in range(1, len(sites)):
if sites[i].residue is not None:
only_first_site_has_residue = False
if only_first_site_has_residue:
for i in range(1, len(sites)):
sites[i] = Site(sites[0].residue, sites[i].position)
return sites
_section_list = ['title', 'abstract', 'introduction', 'background',
'results', 'methods', 'discussion', 'conclusion',
'supplementary', 'figure']
def normalize_section(section):
# Strip off any spaces, new lines
section = section.strip()
# Next, we need to deal with a common pattern of section names like
# "3. results"
section = re.sub(r'^\d+[.]?[ ]?', '', section)
# Often, section title ends with a . like "discussion."
if section.endswith('.'):
section = section[:-1]
# If the section is in the standard list, return as is
if section in _section_list:
return section
# Next, handle a few special cases that come up in practice
elif section.startswith('fig'):
return 'figure'
elif section.startswith('supm'):
return 'supplementary'
elif section == 'article-title':
return 'title'
elif section in {'subjects|methods', 'methods|subjects', 'star*methods'}:
return 'methods'
elif section == 'conclusions':
return 'conclusion'
elif section == 'intro':
return 'introduction'
return None
def parse_amino_acid_string(s):
s = s.strip()
for p in (_site_pattern1, _site_pattern2, _site_pattern3):
m = re.match(p, s.upper())
if m is not None:
residue = get_valid_residue(m.groups()[0])
site = m.groups()[1]
return Site(residue, site)
m = re.match(_site_pattern4, s.upper())
if m is not None:
site = m.groups()[0]
residue = m.groups()[1]
return Site(residue, site)
for p in (_site_pattern5, _site_pattern6, _site_pattern7):
m = re.match(p, s.upper())
if m is not None:
residue = get_valid_residue(m.groups()[0])
site = None
return Site(residue, site)
m = re.match(_site_pattern8, s.upper())
if m is not None:
site = m.groups()[0]
residue = None
return Site(residue, site)
logger.warning('Could not parse site text %s' % s)
return Site(None, None)
_site_pattern1 = '([' + ''.join(list(amino_acids.keys())) + '])[-]?([0-9]+)$'
_site_pattern2 = '(' + '|'.join([v['short_name'].upper() for
v in amino_acids.values()]) + \
')[- ]?([0-9]+)$'
_site_pattern3 = '(' + '|'.join([v['indra_name'].upper() for
v in amino_acids.values()]) + \
_site_pattern4 = '([0-9]+)[ ]?([' + ''.join(list(amino_acids.keys())) + '])$'
_site_pattern5 = '^([' + ''.join(list(amino_acids.keys())) + '])$'
_site_pattern6 = '^(' + '|'.join([v['short_name'].upper() for
v in amino_acids.values()]) + ')$'
_site_pattern7 = '.*(' + '|'.join([v['indra_name'].upper() for
v in amino_acids.values()]) + ').*'
_site_pattern8 = '([0-9]+)$'
# Subtypes that exist but we don't handle: hydrolysis
agent_mod_map = {
'phosphorylation': ('phosphorylation', True),
'phosphorylated': ('phosphorylation', True),
'dephosphorylation': ('phosphorylation', False),
'acetylation': ('acetylation', True),
'deacetylation': ('acetylation', False),
'ubiquitination': ('ubiquitination', True),
'deubiquitination': ('ubiquitination', False),
'hydroxylation': ('hydroxylation', True),
'dehydroxylation': ('hydroxylation', False),
'sumoylation': ('sumoylation', True),
'desumoylation': ('sumoylation', False),
'glycosylation': ('glycosylation', True),
'deglycosylation': ('glycosylation', False),
'farnesylation': ('farnesylation', True),
'defarnesylation': ('farnesylation', False),
'ribosylation': ('ribosylation', True),
'deribosylation': ('ribosylation', False),
'methylation': ('methylation', True),
'demethylation': ('methylation', False),
def _read_famplex_map():
fname = os.path.join(os.path.dirname(os.path.abspath(__file__)),
famplex_map = {}
csv_rows = read_unicode_csv(fname, delimiter='\t')
for row in csv_rows:
source_ns = row[0]
source_id = row[1]
be_id = row[2]
famplex_map[(source_ns, source_id)] = be_id
return famplex_map
famplex_map = _read_famplex_map()
def _read_reach_rule_regexps():
"""Load in a file with the regular expressions corresponding to each
reach rule. Why regular expression matching?
The rule name in found_by has instances of some reach rules for each
possible event type
(activation, binding, etc). This makes for too many different types of
rules for practical curation of examples.
We use regular expressions to only match the rule used for extraction,
independently of what the event is.
reach_rule_filename = \
with open(reach_rule_filename, 'r') as f:
reach_rule_regexp = []
for line in f:
return reach_rule_regexp
reach_rule_regexps = _read_reach_rule_regexps()
[docs]def determine_reach_subtype(event_name):
"""Returns the category of reach rule from the reach rule instance.
Looks at a list of regular
expressions corresponding to reach rule types, and returns the longest
regexp that matches, or None if none of them match.
evidence : indra.statements.Evidence
A reach evidence object to subtype
best_match : str
A regular expression corresponding to the reach rule that was used to
extract this evidence
best_match_length = None
best_match = None
for ss in reach_rule_regexps:
if, event_name):
if best_match is None or len(ss) > best_match_length:
best_match = ss
best_match_length = len(ss)
return best_match
[docs]def prioritize_organism_grounding(first_id, xrefs, organism_priority):
"""Pick a prioritized organism-specific UniProt ID for a protein."""
# We find the organism for the first ID picked by Reach
first_organism = uniprot_client.get_organism_id(first_id.split('#')[0])
# We take only UniProt groundings from the xrefs list
uniprot_ids = [xr[1] for xr in xrefs if xr[0] == 'uniprot']
# We group UniProt IDs by their organism ID
groundings_by_organism = defaultdict(list)
for up_id in uniprot_ids:
organism_id = uniprot_client.get_organism_id(up_id.split('#')[0])
# We then go down the list of prioritized organisms and if we find a match
# we return immediately
for organism in organism_priority:
# If the organism matches the organism of the first xref match given by
# Reach then we return that ID
if organism == first_organism:
return first_id
# Otherwiser, if we have a group of matches for the current organism,
# we sort the list of matches and return the first one (so that the
# result is deterministic, though arbitrary).
if organism in groundings_by_organism:
return sorted(groundings_by_organism[organism])[0]
# If there is no prioritized organism-specific match, we return None
# and let the upstream code handle what to do.
return None