from __future__ import unicode_literals
import copy
import logging
from indra.statements.validate import validate_text_refs
from indra.ontology.standardize import standardize_agent_name
from indra.statements import modtype_to_modclass, Agent, Evidence, Complex, \
get_statement_by_name as stmt_by_name, BoundCondition
logger = logging.getLogger(__name__)
ignore_srcs = [db.lower() for db in ['NetPath', 'SIGNOR', 'ProtMapper',
'BioGRID', 'HPRD-phos', 'phosphoELM']]
[docs]class OmniPathProcessor(object):
"""Class to process OmniPath JSON into INDRA Statements."""
def __init__(self, ptm_json=None, ligrec_json=None):
self.statements = []
self.ptm_json = ptm_json
self.ligrec_json = ligrec_json
[docs] def process_ptm_mods(self):
"""Process ptm json if present"""
if self.ptm_json:
self.statements += self._stmts_from_op_mods(self.ptm_json)
[docs] def process_ligrec_interactions(self):
"""Process ligand-receptor json if present"""
if self.ligrec_json:
self.statements += self._stmt_from_op_lr(self.ligrec_json)
def _stmts_from_op_mods(self, ptm_json):
"""Build Modification Statements from a list of Omnipath PTM entries
"""
ptm_stmts = []
unhandled_mod_types = []
annot_ignore = {'enzyme', 'substrate', 'residue_type',
'residue_offset', 'references', 'modification'}
if ptm_json is None:
return []
for mod_entry in ptm_json:
# Skip entries without references
if not mod_entry['references']:
continue
enz = self._agent_from_up_id(mod_entry['enzyme'])
sub = self._agent_from_up_id(mod_entry['substrate'])
res = mod_entry['residue_type']
pos = mod_entry['residue_offset']
evidence = []
for source_pmid in mod_entry['references']:
source_db, pmid_ref = source_pmid.split(':', 1)
# Skip evidence from already known sources
if source_db.lower() in ignore_srcs:
continue
if 'pmc' in pmid_ref.lower():
text_refs = {'PMCID': pmid_ref.split('/')[-1]}
pmid = None
elif not validate_text_refs({'PMID': pmid_ref}):
pmid = None
text_refs = None
else:
pmid = pmid_ref
text_refs = {'PMID': pmid}
evidence.append(Evidence(
source_api='omnipath',
source_id=source_db,
pmid=pmid,
text_refs=text_refs,
annotations={k: v for k, v in mod_entry.items() if k not
in annot_ignore}
))
mod_type = mod_entry['modification']
modclass = modtype_to_modclass.get(mod_type)
if modclass is None:
unhandled_mod_types.append(mod_type)
continue
else:
# All evidences filtered out
if not evidence:
continue
stmt = modclass(enz, sub, res, pos, evidence)
ptm_stmts.append(stmt)
return ptm_stmts
def _stmt_from_op_lr(self, ligrec_json):
"""Make ligand-receptor Complexes from Omnipath API interactions db"""
ligrec_stmts = []
ign_annot = {'source_sub_id', 'source', 'target', 'references'}
no_refs = 0
bad_pmid = 0
no_consensus = 0
if ligrec_json is None:
return ligrec_stmts
for lr_entry in ligrec_json:
if not lr_entry['references']:
no_refs += 1
continue
if len(lr_entry['sources']) == 1 and \
lr_entry['sources'][0].lower() in ignore_srcs:
continue
# Assemble evidence
evidence = []
for source_pmid in lr_entry['references']:
source_db, pmid = source_pmid.split(':')
# Skip evidence from already known sources
if source_db.lower() in ignore_srcs:
continue
if len(pmid) > 8:
bad_pmid += 1
continue
annot = {k: v for k, v in lr_entry.items() if k not in
ign_annot}
annot['source_sub_id'] = source_db
evidence.append(Evidence(source_api='omnipath', pmid=pmid,
annotations=annot))
# Get statements if we have evidences
if evidence:
# Get complexes
ligrec_stmts.append(self._get_op_complex(lr_entry['source'],
lr_entry['target'],
evidence))
# On consensus, make Activations or Inhibitions as well
if bool(lr_entry['consensus_stimulation']) ^ \
bool(lr_entry['consensus_inhibition']):
activation = True if lr_entry['consensus_stimulation'] else \
False
ligrec_stmts.append(self._get_ligrec_regs(
lr_entry['source'], lr_entry['target'],
# Make sure we decouple evidences from the above
copy.deepcopy(evidence),
activation=activation))
elif lr_entry['consensus_stimulation'] and \
lr_entry['consensus_inhibition']:
no_consensus += 1
# All evidences were filtered out
else:
no_refs += 1
if no_refs:
logger.warning(f'{no_refs} entries without references were '
f'skipped')
if bad_pmid:
logger.warning(f'{bad_pmid} references with bad pmids were '
f'skipped')
if no_consensus:
logger.warning(f'{no_consensus} entries with conflicting '
f'regulation were skipped')
return ligrec_stmts
@staticmethod
def _agent_from_up_id(up_id):
"""Build an Agent object from a Uniprot ID. Adds db_refs for both
Uniprot and HGNC where available."""
db_refs = {'UP': up_id}
ag = Agent(up_id, db_refs=db_refs)
standardize_agent_name(ag)
return ag
def _bc_agent_from_up_list(self, up_id_list):
# Return the first agent with the remaining agents as a bound condition
agents_list = [self._agent_from_up_id(up_id) for up_id in up_id_list]
agent = agents_list[0]
agent.bound_conditions = \
[BoundCondition(a, True) for a in agents_list[1:]]
return agent
def _complex_agents_from_op_complex(self, up_id_str):
"""Return a list of agents from a string containing multiple UP ids
"""
# Get agents
if 'complex' in up_id_str.lower():
up_id_list = [up for up in up_id_str.split(':')[1].split('_')]
else:
up_id_list = [up_id_str]
return [self._agent_from_up_id(up_id) for up_id in up_id_list]
def _get_op_complex(self, source, target, evidence_list):
ag_list = self._complex_agents_from_op_complex(source) + \
self._complex_agents_from_op_complex(target)
return Complex(members=ag_list,
evidence=evidence_list)
def _get_ligrec_regs(self, source, target, evidence_list, activation=True):
# Check if any of the agents is a complex
# Source
if 'complex' in source.lower():
# Make bound condition agent
up_id_list = [up for up in source.split(':')[1].split('_')]
subj = self._bc_agent_from_up_list(up_id_list)
else:
subj = self._agent_from_up_id(source)
# Target
if 'complex' in target.lower():
# Make bound condition agent
up_id_list = [up for up in target.split(':')[1].split('_')]
obj = self._bc_agent_from_up_list(up_id_list)
else:
obj = self._agent_from_up_id(target)
# Regular case:
Regulation = stmt_by_name('activation') if activation else \
stmt_by_name('inhibition')
regulation = Regulation(subj=subj, obj=obj, evidence=evidence_list)
return regulation