import re
import copy
import logging
import itertools
import collections
from functools import lru_cache
import pybiopax.biopax as bp
from pybiopax import model_to_owl_file
from indra.statements import *
from indra.util import flatten
from indra.statements.validate import print_validation_report, \
assert_valid_db_refs, validate_id
from indra.ontology.standardize import standardize_name_db_refs
from indra.databases import hgnc_client, uniprot_client, chebi_client, \
parse_identifiers_url
logger = logging.getLogger(__name__)
# TODO:
# - Extract cellularLocation from each PhysicalEntity
# - Extract and represent FragmentFeatures
# - Extract and represent BindingFeatures
# - direct is not always appropriate, e.g., for CTD as a source
# - Look at participantStoichiometry within BiochemicalReaction
# - Check whether to use Control or only Catalysis (Control might not
# be direct)
# - Implement extracting modifications with Complex enzyme
# - Implement extracting modifications with Complex substrate
[docs]class BiopaxProcessor(object):
"""The BiopaxProcessor extracts INDRA Statements from a BioPAX model.
The BiopaxProcessor uses pattern searches in a BioPAX OWL model to
extract mechanisms from which it constructs INDRA Statements.
Parameters
----------
model : org.biopax.paxtools.model.Model
A BioPAX model object (java object)
Attributes
----------
model : org.biopax.paxtools.model.Model
A BioPAX model object (java object) which is queried using Paxtools
to extract INDRA Statements
statements : list[indra.statements.Statement]
A list of INDRA Statements that were extracted from the model.
"""
def __init__(self, model, use_conversion_level_evidence=False):
self.model = model
self.statements = []
self._mod_conditions = {}
self._activity_conditions = {}
self._agents = {}
self.use_conversion_level_evidence = use_conversion_level_evidence
def process_all(self):
self._extract_features()
self.get_modifications()
self.get_regulate_activities()
self.get_activity_modification()
self.get_regulate_amounts()
self.get_conversions()
self.get_gap_gef()
self.eliminate_exact_duplicates()
print_validation_report(self.statements)
[docs] def save_model(self, file_name):
"""Save the BioPAX model object in an OWL file.
Parameters
----------
file_name : str
The name of the OWL file to save the model in.
"""
model_to_owl_file(self.model, file_name)
[docs] def eliminate_exact_duplicates(self):
"""Eliminate Statements that were extracted multiple times.
Due to the way the patterns are implemented, they can sometimes yield
the same Statement information multiple times, in which case,
we end up with redundant Statements that aren't from independent
underlying entries. To avoid this, here, we filter out such
duplicates.
"""
# Here we use the deep hash of each Statement, and by making a dict,
# we effectively keep only one Statement with a given deep hash
self.statements = list({stmt.get_hash(shallow=False, refresh=True): stmt
for stmt in self.statements}.values())
[docs] def feature_delta(self, from_pe: bp.PhysicalEntity,
to_pe: bp.PhysicalEntity):
"""Return gained and lost modifications and any activity change."""
# First deal with activity changes
from_acts = {self._activity_conditions[f.uid] for f in from_pe.feature
if f.uid in self._activity_conditions}
to_acts = {self._activity_conditions[f.uid] for f in to_pe.feature
if f.uid in self._activity_conditions}
assert len(from_acts) <= 1
assert len(to_acts) <= 1
activity_change = None
if from_acts == {'active'}:
if not to_acts or to_acts == {'inactive'}:
activity_change = 'inactive'
elif not from_acts or from_acts == {'inactive'}:
if to_acts == {'active'}:
activity_change = 'active'
# Now look at modification changes
from_mods = {self._mod_conditions[f.uid] for f in from_pe.feature
if f.uid in self._mod_conditions}
from_mods = {mc.matches_key(): mc for mc in from_mods}
to_mods = {self._mod_conditions[f.uid] for f in to_pe.feature
if f.uid in self._mod_conditions}
to_mods = {mc.matches_key(): mc for mc in to_mods}
gained_mods = {to_mods[k] for k in
set(to_mods.keys()) - set(from_mods.keys())}
lost_mods = {from_mods[k] for k in
set(from_mods.keys()) - set(to_mods.keys())}
return gained_mods, lost_mods, activity_change
def _extract_features(self):
"""Pre-extract features before processing statements."""
for feature in self.model.get_objects_by_type(bp.EntityFeature):
# TODO: handle BindingFeatures and FragmentFeatures here
if not isinstance(feature, bp.ModificationFeature):
continue
mf_type = get_modification_type(feature)
if mf_type == 'modification':
mod = self.mod_condition_from_mod_feature(feature)
if mod:
self._mod_conditions[feature.uid] = mod
elif mf_type == 'activity':
self._activity_conditions[feature.uid] = 'active'
elif mf_type == 'inactivity':
self._activity_conditions[feature.uid] = 'inactive'
[docs] @staticmethod
def find_matching_left_right(conversion: bp.Conversion):
"""Find matching entities on the left and right of a conversion."""
left_simple = []
for pe in conversion.left:
left_simple += expand_complex(pe)
right_simple = []
for pe in conversion.right:
right_simple += expand_complex(pe)
return BiopaxProcessor.find_matching_entities(left_simple, right_simple)
[docs] @staticmethod
def find_matching_entities(left_entities, right_entities):
"""Find matching entities between two lists of entities."""
matches = []
for inp, outp in itertools.product(left_entities, right_entities):
# If these are the exact same object instances, then they are
# trivially matching. This covers 'complex' and 'complex_named'
# physical entity types as well.
if inp.uid == outp.uid:
matches.append((inp, outp))
continue
# Otherwise we need to check what types of entities we are dealing
# with and proceed depending on the type
inp_type = infer_pe_type(inp)
outp_type = infer_pe_type(outp)
if inp_type != outp_type:
continue
# Here we don't go deeper - if the complex is not handled by the
# uid-based match above, we don't consider it a match.
elif inp_type in {'complex', 'complex_named'}:
continue
elif inp_type == 'family':
input_ers = {mpe.entity_reference.uid
for mpe in expand_family(inp)
if isinstance(mpe, bp.SimplePhysicalEntity)}
output_ers = {mpe.entity_reference.uid
for mpe in expand_family(outp)
if isinstance(mpe, bp.SimplePhysicalEntity)}
if input_ers == output_ers:
matches.append((inp, outp))
# Sometimes we get a "raw" PhysicalEntity here that doesn't
# actually have an entity reference which is required here so
# we skip those.
elif not isinstance(inp, (bp.SimplePhysicalEntity, bp.Complex)) or \
not isinstance(outp, (bp.SimplePhysicalEntity, bp.Complex)):
continue
elif inp_type == 'complex_family':
inp_members = flatten([expand_complex(m)
for m in expand_family(inp)])
outp_members = flatten([expand_complex(m)
for m in expand_family(outp)])
matches += BiopaxProcessor.find_matching_entities(inp_members,
outp_members)
elif inp.entity_reference.uid == outp.entity_reference.uid:
matches.append((inp, outp))
return matches
def _control_conversion_iter(self, conversion_type, controller_logic):
"""An iterator over controlled conversions in the model."""
for control in self.model.get_objects_by_type(bp.Control):
conversion = control.controlled
# Sometimes there is nothing being controlled, we skip
# those cases. We also want to skip things like Modulation
# that don't convert anything.
if not isinstance(conversion, conversion_type):
continue
ev = self._get_evidence(control)
for controller_pe in control.controller:
# We skip e.g., Pathway controllers
if not isinstance(controller_pe, bp.PhysicalEntity):
continue
if controller_logic == 'primary':
primary_controller = \
self._get_primary_controller(controller_pe)
else:
primary_controller = \
self._get_all_protein_controllers(controller_pe)
if not primary_controller:
continue
primary_controller_agents = []
for pc in _listify(primary_controller):
primary_controller_agents += \
_listify(self._get_agents_from_entity(pc))
for primary_controller_agent in primary_controller_agents:
yield primary_controller_agent, ev, control, conversion
def _conversion_no_control_iter(self):
"""An iterator over conversions irrespective of control in the model."""
for conversion in self.model.get_objects_by_type(bp.Conversion):
ev = self._get_evidence(conversion)
for inp, outp in self.find_matching_left_right(conversion):
for inp_simple, outp_simple in \
self.find_matching_entities(expand_family(inp),
expand_family(outp)):
gained_mods, lost_mods, activity_change = \
self.feature_delta(inp_simple, outp_simple)
inp_agents = \
self._get_agents_from_singular_entity(inp_simple)
for inp_agent in inp_agents:
yield inp_agent, gained_mods, lost_mods, \
activity_change, ev
def _conversion_state_iter(self):
"""An iterator over state changed in controlled conversions
in the model."""
for primary_controller_agent, ev, control, conversion in \
self._control_conversion_iter(bp.Conversion, 'primary'):
for inp, outp in self.find_matching_left_right(conversion):
# There is sometimes activity change at the family level
# which we need to capture
_, _, overall_activity_change = self.feature_delta(inp, outp)
for inp_simple, outp_simple in \
self.find_matching_entities(expand_family(inp),
expand_family(outp)):
gained_mods, lost_mods, activity_change = \
self.feature_delta(inp_simple, outp_simple)
activity_change = activity_change if activity_change else \
overall_activity_change
inp_agents = \
self._get_agents_from_singular_entity(inp_simple)
for inp_agent in inp_agents:
yield primary_controller_agent, inp_agent, \
gained_mods, lost_mods, activity_change, ev
[docs] def get_modifications(self):
"""Extract INDRA Modification Statements from the BioPAX model."""
for enz, sub, gained_mods, lost_mods, \
activity_change, ev in self._conversion_state_iter():
for mods, is_gain in ((gained_mods, True), (lost_mods, False)):
for mod in mods:
# We skip generic / unspecified modifications
if mod.mod_type == 'modification':
continue
stmt_class = modtype_to_modclass[mod.mod_type]
if not is_gain:
stmt_class = modclass_to_inverse[stmt_class]
stmt = stmt_class(enz, sub, mod.residue,
mod.position, evidence=ev)
stmt = _remove_redundant_mods(stmt)
self.statements.append(stmt)
[docs] def get_regulate_activities(self):
"""Get Activation/Inhibition INDRA Statements from the BioPAX model."""
for subj, obj, gained_mods, lost_mods, \
activity_change, ev in self._conversion_state_iter():
# We don't want to have gained or lost modification features
if not activity_change or (gained_mods or lost_mods):
continue
stmt_class = Activation if activity_change == 'active' \
else Inhibition
stmt = stmt_class(subj, obj, 'activity',
evidence=ev)
self.statements.append(stmt)
[docs] def get_activity_modification(self):
"""Extract INDRA ActiveForm statements from the BioPAX model."""
for agent, gained_mods, lost_mods, activity_change, ev in \
self._conversion_no_control_iter():
# We have to have both a modification change and an activity
# change
if not (gained_mods or lost_mods) or not activity_change:
continue
is_active = (activity_change == 'active')
# NOTE: with the ActiveForm representation we cannot
# separate static_mods and gained_mods. We assume here
# that the static_mods are inconsequential and therefore
# are not mentioned as an Agent condition, following
# don't care don't write semantics. Therefore only the
# gained_mods are listed in the ActiveForm as Agent
# conditions.
assert isinstance(agent, Agent)
if gained_mods:
ag = copy.deepcopy(agent)
ag.mods = gained_mods
stmt = ActiveForm(ag, 'activity', is_active, evidence=ev)
self.statements.append(stmt)
if lost_mods:
ag = copy.deepcopy(agent)
ag.mods = lost_mods
stmt = ActiveForm(ag, 'activity', not is_active, evidence=ev)
self.statements.append(stmt)
[docs] def get_regulate_amounts(self):
"""Extract INDRA RegulateAmount Statements from the BioPAX model."""
for subj, ev, control, conversion in \
self._control_conversion_iter(bp.TemplateReaction, 'all'):
stmt_type = IncreaseAmount if control.control_type == 'ACTIVATION' \
else DecreaseAmount
for product in conversion.product:
product_agents = self._get_agents_from_entity(product)
for obj in _listify(product_agents):
stmt = stmt_type(subj, obj, evidence=ev)
self.statements.append(stmt)
[docs] def get_conversions(self):
"""Extract Conversion INDRA Statements from the BioPAX model."""
for subj, ev, control, conversion in \
self._control_conversion_iter(bp.Conversion, 'primary'):
# We only extract conversions for small molecules
if not all(_is_small_molecule(pe)
for pe in (conversion.left + conversion.right)):
continue
# Since we don't extract location, this produces conversions where
# the input and output is the same
if isinstance(conversion, bp.Transport):
continue
# Assemble from and to object lists
obj_from = []
obj_to = []
for participants, obj_list in ((conversion.left, obj_from),
(conversion.right, obj_to)):
for participant in participants:
agent = self._get_agents_from_entity(participant)
if isinstance(agent, list):
obj_list += agent
else:
obj_list.append(agent)
# Make statements
if not obj_from and not obj_to:
continue
st = Conversion(subj, obj_from, obj_to, evidence=ev)
self.statements.append(st)
@staticmethod
def find_gdp_gtp_complex(cplxes):
for cplx in cplxes:
members = expand_complex(cplx)
if not members or len(members) != 2:
continue
gdp_gtp_idx = None
ras_member = None
for idx, member in enumerate(members):
if _is_small_molecule(member) and \
member.display_name in {'GDP', 'GTP'}:
gdp_gtp_idx = idx
elif _is_protein(member):
ras_member = member
if gdp_gtp_idx is None or ras_member is None:
continue
return ras_member, members[gdp_gtp_idx].display_name
return None, None
[docs] def get_gap_gef(self):
"""Extract Gap and Gef INDRA Statements."""
for gap_gef, ev, control, conversion in \
self._control_conversion_iter(bp.Conversion, 'primary'):
assert isinstance(gap_gef, Agent)
# We have to make sure that we don't pick up chemicals here
if not set(gap_gef.db_refs.keys()) & {'HGNC', 'UP'}:
continue
left_complexes = [bpe for bpe in conversion.left
if _is_complex(bpe)]
right_complexes = [bpe for bpe in conversion.right
if _is_complex(bpe)]
left_ras, left_gtp_gdp = \
self.find_gdp_gtp_complex(left_complexes)
right_ras, right_gtp_gdp = \
self.find_gdp_gtp_complex(right_complexes)
if left_gtp_gdp == 'GDP' and right_gtp_gdp == 'GTP':
stmt_type = Gef
elif left_gtp_gdp == 'GTP' and right_gtp_gdp == 'GDP':
stmt_type = Gap
else:
continue
ras_agents = self._get_agents_from_entity(left_ras)
for ras in _listify(ras_agents):
st = stmt_type(gap_gef, ras, evidence=ev)
self.statements.append(st)
@staticmethod
def _get_entity_mods(bpe):
"""Get all the modifications of an entity in INDRA format"""
if _is_entity(bpe):
features = bpe.feature
else:
features = bpe.entity_feature
mods = []
for feature in features:
if not _is_modification(feature):
continue
mc = BiopaxProcessor.mod_condition_from_mod_feature(feature)
if mc is not None:
mods.append(mc)
return mods
def _get_primary_controller(self, controller_pe):
# If it's not a real complex, just return the physical entity
# as is
if infer_pe_type(controller_pe) != 'complex':
return controller_pe
# Identifying the "real" enzyme in a complex may not always be
# possible.
# One heuristic here could be to find the member which is
# active and if it is the only active member then
# set this as the enzyme to which all other members of the
# complex are bound.
# Separate out protein and non-protein members
protein_members = [p for p in expand_complex(controller_pe)
if _is_protein(p)]
# If there is only one protein member, we can assume that
# it is the enzyme, and everything else is just bound
# to it.
if len(protein_members) == 1:
return protein_members[0]
return None
def _get_all_protein_controllers(self, controller_pe):
# If it's not a real complex, just return the physical entity
# as is
if infer_pe_type(controller_pe) != 'complex':
return controller_pe
protein_members = [p for p in controller_pe.component
if _is_protein(p)]
return protein_members
def _get_agents_from_singular_entity(self, bpe: bp.PhysicalEntity):
"""This is for extracting one or more Agents from a PhysicalEntity
which doesn't have member_physical_entities."""
try:
return copy.deepcopy(self._agents[bpe.uid])
except KeyError:
pass
mcs = BiopaxProcessor._get_entity_mods(bpe) if _is_protein(bpe) else []
name = bpe.display_name
agents = []
# We first get processed xrefs
xrefs = BiopaxProcessor._get_processed_xrefs(bpe)
# We now need to harmonize UP and HGNC
nhgnc_ids = len(xrefs.get('HGNC', {}))
nup_ids = len(xrefs.get('UP', {}))
# One protein coded by many genes, including the special case where
# there aren't any UniProt IDs given
if nhgnc_ids > 1 and nup_ids <= 1:
for hgnc_id in xrefs['HGNC']:
agent = get_standard_agent(name, {'HGNC': hgnc_id}, mods=mcs)
agents.append(agent)
# One gene coding for many proteins, including the case where
# a HGNC ID is missing and we only have UniProt IDs
elif nhgnc_ids <= 1 and nup_ids > 1:
for up_id in xrefs['UP']:
agent = get_standard_agent(name, _refs_from_up_id(up_id),
mods=mcs)
agents.append(agent)
# This is secretly a family, i.e., we have more than one
# gene/protein IDs and so we can go by one of the ID sets and
# standardize from there. Usually the number of HGNC and UniProt IDs
# are the same but there are exceptions where there are e.g., 3
# UniProt IDs but only 2 of them have corresponding HGNC IDs.
elif nhgnc_ids > 1 and nup_ids > 1:
for up_id in xrefs['UP']:
agent = get_standard_agent(name, _refs_from_up_id(up_id),
mods=mcs)
agents.append(agent)
# Otherwise it's just a regular Agent
else:
agent = get_standard_agent(name, clean_up_xrefs(xrefs), mods=mcs)
agents.append(agent)
# There is a potential here that an Agent name was set to None
# if both the display name and the standard name are missing.
# We filter these out
agents = [a for a in agents if a.name is not None]
return agents
def _get_agents_from_entity(self, bpe: bp.PhysicalEntity):
# If the entity has members (like a protein family),
# we iterate over them
if bpe.member_physical_entity:
agents = []
for m in bpe.member_physical_entity:
member_agents = self._get_agents_from_entity(m)
agents += member_agents
return agents
# If it is a single entity, we get its name and database
# references
return self._get_agents_from_singular_entity(bpe)
[docs] @staticmethod
def mod_condition_from_mod_feature(mf: bp.ModificationFeature):
"""Extract the type of modification and the position from
a ModificationFeature object in the INDRA format."""
# ModificationFeature / SequenceModificationVocabulary
mf_type = mf.modification_type
if mf_type is None:
return None
for t in mf_type.term:
if t.startswith('MOD_RES '):
t = t[8:]
mf_type_indra = _mftype_dict.get(t)
if mf_type_indra is not None:
break
else:
logger.debug('Skipping modification with unknown terms: %s' %
', '.join(mf_type.term))
return None
mod_type, residue = mf_type_indra
if mf.feature_location is not None:
# If it is not a SequenceSite we can't handle it
if not isinstance(mf.feature_location, bp.SequenceSite):
mod_pos = None
else:
mf_pos_status = mf.feature_location.position_status
if mf_pos_status is None:
mod_pos = None
elif mf_pos_status and mf_pos_status != 'EQUAL':
logger.debug('Modification site position is %s' %
mf_pos_status)
mod_pos = None
else:
mod_pos = str(mf.feature_location.sequence_position)
else:
mod_pos = None
mc = ModCondition(mod_type, residue, mod_pos, True)
return mc
def _get_evidence(self, bpe: bp.PhysicalEntity):
citations = BiopaxProcessor._get_citations(bpe)
if self.use_conversion_level_evidence and hasattr(bpe, 'controller'):
citations += BiopaxProcessor._get_citations(bpe.controlled)
if not citations:
citations = [None]
epi = {'direct': True}
annotations = {}
if bpe.data_source:
if len(bpe.data_source) > 1:
logger.warning('More than one data source for %s' % bpe.uid)
db_name = bpe.data_source[0].display_name
if db_name:
annotations['source_sub_id'] = db_name.lower()
source_id = '%s%s' % (self.model.xml_base, bpe.uid) if \
not bpe.uid.startswith('http') else bpe.uid
ev = [Evidence(source_api='biopax', pmid=cit,
source_id=source_id, epistemics=epi,
annotations=annotations)
for cit in citations]
return ev
@staticmethod
def _get_citations(bpe: bp.PhysicalEntity):
refs = []
xrefs = bpe.xref + flatten([ev.xref for ev in bpe.evidence])
for xr in xrefs:
db_name = xr.db
if db_name is not None and db_name.upper() == 'PUBMED':
refs.append(xr.id)
# TODO: handle non-pubmed evidence
return refs
@staticmethod
def _get_processed_xrefs(bpe: bp.PhysicalEntity):
entref = BiopaxProcessor._get_entref(bpe)
if not entref:
return {}
primary_ns, primary_id = \
BiopaxProcessor._get_reference_primary_id(entref)
from collections import defaultdict
xrefs = defaultdict(set)
if primary_ns and primary_id:
xrefs[primary_ns].add(primary_id)
# In the case of CHEBI, we return directly, no further processing
# of xrefs since xrefs tend to contain other related CHEBI IDs
# representing other chemicals
if primary_ns == 'CHEBI':
return xrefs
for xref in entref.xref:
if not xref.db:
continue
xref_db_ns = xref_ns_map.get(xref.db.lower())
if not xref_db_ns:
continue
# This is the "see also" relation which points to related but
# not exact xrefs that we can skip here
if isinstance(xref, bp.RelationshipXref) and \
xref.relationship_type in psi_mi_see_also:
continue
xrefs[xref_db_ns].add(xref.id)
xrefs = dict(xrefs)
# We now sanitize certain key name spaces
if 'UP' in xrefs:
up_ids = sanitize_up_ids(xrefs['UP'])
if up_ids:
xrefs['UP'] = up_ids
else:
xrefs.pop('UP')
if 'HGNC' in xrefs or 'HGNC.SYMBOL' in xrefs:
hgnc_ids = xrefs.get('HGNC', set()) | \
xrefs.get('HGNC.SYMBOL', set())
hgnc_ids = sanitize_hgnc_ids(hgnc_ids)
if hgnc_ids:
xrefs['HGNC'] = hgnc_ids
else:
xrefs.pop('HGNC', None)
xrefs.pop('HGNC.SYMBOL', None)
if 'CHEBI' in xrefs:
chebi_id = sanitize_chebi_ids(xrefs['CHEBI'], bpe.display_name)
if chebi_id:
xrefs['CHEBI'] = chebi_id
else:
xrefs.pop('CHEBI')
return xrefs
@staticmethod
def _get_reference_primary_id(entref: bp.EntityReference):
# This is a simple check to see if we should treat this as a URL
if not entref.uid.startswith('http'):
return None, None
primary_ns, primary_id = parse_identifiers_url(entref.uid)
return primary_ns, primary_id
@staticmethod
def _get_entref(bpe: bp.PhysicalEntity):
"""Returns the entity reference of an entity if it exists or
return the entity reference that was passed in as argument."""
if isinstance(bpe, bp.SimplePhysicalEntity):
return bpe.entity_reference
return None
def get_coverage(self):
uids = set()
for uid, obj in self.model.objects.items():
if isinstance(obj, (bp.Catalysis, bp.TemplateReactionRegulation)):
uids.add(uid)
stmt_uids = set()
for stmt in self.statements:
for ev in stmt.evidence:
stmt_uids.add(ev.source_id)
print('Total in model: %d' % len(uids))
print('Total covered: %d' % len(uids & stmt_uids))
print('%.2f%% coverage' % (100.0*len(uids & stmt_uids)/len(uids)))
return len(uids), len(uids & stmt_uids)
_mftype_dict = {
'phosres': ('phosphorylation', None),
'phosphorylation': ('phosphorylation', None),
'phosphorylated residue': ('phosphorylation', None),
'phosphorylated': ('phosphorylation', None),
'O-phospho-L-serine': ('phosphorylation', 'S'),
'O-phosphopantetheine-L-serine': ('phosphorylation', 'S'),
'opser': ('phosphorylation', 'S'),
'O-phospho-L-threonine': ('phosphorylation', 'T'),
'opthr': ('phosphorylation', 'T'),
'O-phospho-L-tyrosine': ('phosphorylation', 'Y'),
'O4\'-phospho-L-tyrosine': ('phosphorylation', 'Y'),
'optyr': ('phosphorylation', 'Y'),
'ubiquitinated lysine': ('ubiquitination', 'K'),
'N4-glycosyl-L-asparagine': ('glycosylation', 'N'),
'n4glycoasn': ('glycosylation', 'N'),
'O-glycosyl-L-threonine': ('glycosylation', 'T'),
'S-palmitoyl-L-cysteine': ('palmitoylation', 'C'),
'N6-acetyllysine': ('acetylation', 'K'),
'N6-acetyl-L-lysine' : ('acetylation', 'K'),
'n6aclys': ('acetylation', 'K'),
'naclys': ('acetylation', 'K'),
'N-acetylated L-lysine': ('acetylation', 'K'),
'N-acetylglycine': ('acetylation', 'G'),
'N-acetylmethionine': ('acetylation', 'M'),
'Hydroxyproline': ('hydroxylation', 'P'),
'hydroxylated proline': ('hydroxylation', 'P'),
'3-hydroxyproline': ('hydroxylation', 'P'),
'4-hydroxyproline': ('hydroxylation', 'P'),
'5-hydroxylysine': ('hydroxylation', 'K'),
'N-myristoylglycine': ('myristoylation', 'G'),
'N-myristoyl-glycine': ('myristoylation', 'G'),
'sumoylated lysine': ('sumoylation', 'K'),
'mearg': ('methylation', 'R'),
'methylated L-arginine': ('methylation', 'R'),
'methylated arginine': ('methylation', 'R'),
'melys' : ('methylation', 'K'),
'methylated lysine' : ('methylation', 'K'),
'methylated L-lysine' : ('methylation', 'K'),
'ubiquitination': ('ubiquitination', None),
'ubiquitinylated lysine': ('ubiquitination', 'K'),
'ubiquitination signature tetrapeptidyl lysine': ('ubiquitination', 'K'),
'Phosphoserine': ('phosphorylation', 'S'),
'Phosphothreonine': ('phosphorylation', 'T'),
'Phosphotyrosine': ('phosphorylation', 'Y'),
'N-acetylalanine': ('acetylation', 'A'),
'N-acetylserine': ('acetylation', 'S'),
'N-acetylthreonine': ('acetylation', 'T'),
'N-acetylvaline': ('acetylation', 'V'),
'Omega-N-methylarginine': ('methylation', 'R'),
'N6-methyllysine': ('methylation', 'K'),
'Dimethylated arginine': ('methylation', 'R'),
'Asymmetric dimethylarginine': ('methylation', 'R'),
'Omega-N-methylated arginine': ('methylation', 'R'),
'N6,N6-dimethyllysine': ('methylation', 'K'),
'N6,N6,N6-trimethyllysine': ('methylation', 'K'),
'Symmetric dimethylarginine': ('methylation', 'R'),
'ADP-ribosylarginine': ('ribosylation', 'R'),
'ADP-ribosylcysteine': ('ribosylation', 'C'),
'ADP-ribosylasparagine': ('ribosylation', 'N'),
'PolyADP-ribosyl glutamic acid': ('ribosylation', 'E'),
'O-acetylserine': ('acetylation', 'S'),
'O-acetyl-L-serine': ('acetylation', 'S'),
'N-acetyl-L-alanine': ('acetylation', 'A'),
'omega-N-methyl-L-arginine': ('methylation', 'R'),
'symmetric dimethyl-L-arginine': ('methylation', 'R'),
'N-acetylproline': ('acetylation', 'P'),
'acetylated': ('acetylation', None),
'acetylation': ('acetylation', None),
'(3R)-3-hydroxyaspartate': ('hydroxylation', 'D'),
'(3R)-3-hydroxyasparagine': ('hydroxylation', 'N'),
'Tele-methylhistidine': ('methylation', 'H'),
'N-acetylglutamate': ('acetylation', 'E'),
'N-acetylaspartate': ('acetylation', 'D'),
'n6me2lys': ('methylation', 'K'),
'Phosphohistidine': ('phosphorylation', 'H'),
'S-farnesyl-L-cysteine': ('farnesylation', 'C'),
'modified glycine residue': ('modification', 'G'),
'N-acetyl-L-methionine': ('acetylation', 'M'),
'4-hydroxy-L-proline': ('hydroxylation', 'P'),
'4hypro': ('hydroxylation', 'P'),
'3-hydroxy-L-proline': ('hydroxylation', 'P'),
'5-hydroxy-L-lysine': ('hydroxylation', 'K'),
'O-palmitoyl-L-serine': ('palmitoylation', 'S')
}
def _is_complex(pe):
"""Return True if the physical entity is a complex"""
return isinstance(pe, bp.Complex)
def _is_protein(pe):
"""Return True if the element is a protein"""
return isinstance(pe, (bp.Protein, bp.ProteinReference))
def _is_small_molecule(pe):
"""Return True if the element is a small molecule"""
return isinstance(pe, (bp.SmallMolecule, bp.SmallMoleculeReference))
def _is_modification(feature):
return isinstance(feature, bp.ModificationFeature) and \
get_modification_type(feature) == 'modification'
def get_modification_type(mf: bp.ModificationFeature):
if mf.modification_type is None:
return None
mf_type_terms = set(mf.modification_type.term)
if not mf_type_terms:
return 'modification'
if mf_type_terms & inactivity_terms:
return 'inactivity'
elif mf_type_terms & activity_terms:
return 'activity'
else:
return 'modification'
def _is_entity(bpe):
"""Return True if the element is a physical entity."""
return isinstance(bpe, bp.Entity)
def _listify(lst):
if not isinstance(lst, list):
return [lst]
else:
return lst
def _remove_redundant_mods(stmt):
"""Remove redundant Agent states that are modified by statement."""
# A custom matches function is used so we also match conditions
# where the polarities are opposite
def matches(mc1, mc2):
return mc1.mod_type == mc2.mod_type and \
mc1.residue == mc2.residue and \
mc1.position == mc2.position
stmt_mc = stmt._get_mod_condition()
stmt.sub = copy.deepcopy(stmt.sub)
stmt.sub.mods = [mc for mc in stmt.sub.mods if not matches(mc, stmt_mc)]
return stmt
def expand_complex(pe: bp.PhysicalEntity):
simple_pes = []
if _is_complex(pe) and pe.component:
for component_pe in pe.component:
simple_pes += expand_complex(component_pe)
else:
simple_pes.append(pe)
return simple_pes
def expand_family(pe: bp.PhysicalEntity):
members = []
if pe.member_physical_entity:
for member in pe.member_physical_entity:
members += expand_family(member)
else:
members = [pe]
return members
def infer_pe_type(pe: bp.PhysicalEntity):
if isinstance(pe, bp.Complex):
if pe.component:
return 'complex'
elif pe.member_physical_entity:
return 'complex_family'
else:
return 'complex_named'
else:
if pe.member_physical_entity:
return 'family'
else:
return 'single'
generic_chebi_ids = {
'CHEBI:76971', # E-coli metabolite
'CHEBI:75771', # mouse metabolite
'CHEBI:77746', # human metabolite
'CHEBI:27027', # micronutrient
'CHEBI:78675', # fundamental metabolite
'CHEBI:50860', # organic molecular entity
}
manual_chebi_map = {
'H2O': 'CHEBI:15377',
'phosphate': 'CHEBI:18367',
'H+': 'CHEBI:15378',
'O2': 'CHEBI:15379'
}
@lru_cache(maxsize=5000)
def get_specific_chebi_id(chebi_ids, name):
# NOTE: this function is mainly factored out to be able to use cacheing, it
# requires a frozenset as input to work.
# First, if we have a manual override, we just do that
manual_id = manual_chebi_map.get(name)
if manual_id:
return manual_id
# The first thing we do is eliminate the secondary IDs by mapping them to
# primaries
primary_ids = {chebi_client.get_primary_id(cid)
for cid in chebi_ids}
# Occasinally, invalid ChEBI IDs are given that don't have corresponding
# primary IDs, which we can filter out
primary_ids = {pi for pi in primary_ids if pi is not None}
# We then get rid of generic IDs which are never useful for grounding
non_generic_ids = primary_ids - generic_chebi_ids
# We then try name-based grounding to see if any of the names in the list
# match the name of the entity well enough
grounding_names = [chebi_client.get_chebi_name_from_id(p) for p in
non_generic_ids]
for grounding_name, grounding_id in zip(grounding_names, non_generic_ids):
if grounding_name and (name.lower() == grounding_name.lower()):
return grounding_id
# If we still have no best grounding, we try to distill the IDs down to
# the most specific one based on the hierarchy
specific_chebi_id = chebi_client.get_specific_id(non_generic_ids)
return specific_chebi_id
def sanitize_up_ids(up_ids):
# First, we map any secondary IDs to primary IDs
up_ids = {uniprot_client.get_primary_id(up_id)
for up_id in up_ids}
# We filter out IDs that are actually mnemonics, these are just mixed
# in without any differentiation from other IDs
up_ids = {up_id for up_id in up_ids if '_' not in up_id}
# TODO: should we do anything about isoforms?
# We separate out specific sets of IDs
human_ids = [up_id for up_id in up_ids
if uniprot_client.is_human(up_id)]
reviewed_non_human_ids = [
up_id for up_id in up_ids
if not uniprot_client.is_human(up_id)
# get_mnemonic is just a quick way to see if we have this entry
and uniprot_client.get_mnemonic(up_id, web_fallback=False)]
if human_ids:
return human_ids
elif reviewed_non_human_ids:
return reviewed_non_human_ids
else:
return []
def sanitize_chebi_ids(chebi_ids, name):
chebi_ids = {chebi_id if chebi_id.startswith('CHEBI:')
else 'CHEBI:%s' % chebi_id for chebi_id in chebi_ids}
chebi_ids = {chebi_client.get_primary_id(chebi_id)
for chebi_id in chebi_ids}
# Make sure we eliminate Nones here which can appear as a result
# of failed primary ID lookups
chebi_ids = {ci for ci in chebi_ids if ci is not None}
if not chebi_ids:
return []
elif len(chebi_ids) == 1:
return list(chebi_ids)
specific_chebi_id = get_specific_chebi_id(frozenset(chebi_ids), name)
return specific_chebi_id
def sanitize_hgnc_ids(raw_hgnc_ids):
# First we get a list of primary IDs
hgnc_ids = set()
for raw_hgnc_id in raw_hgnc_ids:
# Check if it's an ID first
m1 = re.match('([0-9]+)', raw_hgnc_id)
m2 = re.match('hgnc:([0-9]+)', raw_hgnc_id.lower())
if m1:
hgnc_id = str(m1.groups()[0])
hgnc_ids.add(hgnc_id)
elif m2:
hgnc_id = str(m2.groups()[0])
hgnc_ids.add(hgnc_id)
# If not, we assume it's a symbol
else:
hgnc_id = hgnc_client.get_current_hgnc_id(raw_hgnc_id)
if isinstance(hgnc_id, list):
hgnc_ids |= set(hgnc_id)
elif hgnc_id:
hgnc_ids.add(hgnc_id)
return list(hgnc_ids)
def _refs_from_up_id(up_id):
"""Get the refs from the up_id"""
if '-' in up_id:
base_id = up_id.split('-')[0]
db_refs = {'UP': base_id, 'UPISO': up_id}
else:
db_refs = {'UP': up_id}
return db_refs
def get_standard_agent(name, db_refs, **kwargs):
standard_name, db_refs = standardize_name_db_refs(db_refs)
# We remove any invalid entries here, e.g.,
# "---" which comes up in Pathway Commons
for k, v in copy.copy(db_refs).items():
if not validate_id(k, v):
db_refs.pop(k)
if standard_name:
name = standard_name
assert_valid_db_refs(db_refs)
return Agent(name, db_refs=db_refs, **kwargs)
def clean_up_xrefs(xrefs):
db_refs = {}
for k, v in xrefs.items():
if isinstance(v, (list, set)):
if len(v) == 1:
db_refs[k] = list(v)[0]
else:
db_refs[k] = v
if k == 'UP':
db_refs.update(_refs_from_up_id(db_refs[k]))
return db_refs
activity_terms = {
'active',
'residue modification, active',
}
inactivity_terms = {
'inactive',
'residue modification, inactive',
}
xref_ns_map = {
'chebi': 'CHEBI',
'uniprot': 'UP',
'uniprot isoform': 'UP',
'uniprot knowledgebase': 'UP',
'uniprotkb': 'UP',
'ncbi gene': 'EGID',
'hgnc symbol': 'HGNC.SYMBOL',
'hgnc': 'HGNC',
'mesh': 'MESH',
'drugbank': 'DRUGBANK',
'pubchem-compound': 'PUBCHEM',
'chembl compound': 'CHEMBL',
'pubchem': 'PUBCHEM',
'cas': 'CAS',
'hmdb': 'HMDB',
'gene ontology': 'GO',
'interpro': 'IP',
'mirbase': 'MIRBASE',
'mirbase mature sequence': 'MIRBASEM',
'hugo gene nomenclature committee (hgnc)': 'HGNC',
'ensembl': 'ENSEMBL',
'taxonomy': 'TAXONOMY',
}
psi_mi_see_also = {
# This is an invalid URL but used in practice
'http://identifiers.org/psimi/MI:0361',
# This is a valid old style URL
'http://identifiers.org/mi/MI:0361',
# This is a valid new style URL
'http://identifiers.org/MI:0361',
}