import re
import json
import time
import logging
import itertools
from ndex2.nice_cx_network import NiceCXNetwork
from collections import OrderedDict
from indra.statements import *
from indra.databases import context_client, ndex_client
from indra.databases.identifiers import get_identifiers_url, url_prefixes
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class NiceCxAssembler(object):
    """Assembles a Nice CX network from a set of INDRA Statements.

    Parameters
    ----------
    stmts : Optional[list[indra.statements.Statement]]
        A list of INDRA Statements to be assembled.
    network_name : Optional[str]
        The name of the network to be assembled. Default: indra_assembled

    Attributes
    ----------
    network : ndex2.nice_cx_network.NiceCXNetwork
        A Nice CX network object that is assembled from Statements.
    """
    def __init__(self, stmts=None, network_name=None):
        self.statements = stmts if stmts else []
        self.network = NiceCXNetwork()
        self.network.set_network_attribute(
            'name', network_name if network_name else 'indra_assembled')
        # Maps an agent key (see get_agent_key) to the ID of the node that
        # was created for it, so each agent is only added once.
        self.node_keys = {}
        # Serialized into the '@context' network attribute by make_model.
        self.context_prefixes = {
            'pubmed': 'https://identifiers.org/pubmed:',
            'hgnc.symbol': 'https://identifiers.org/hgnc.symbol:'
        }

    def make_model(self, self_loops=False, network_attributes=None):
        """Return a Nice CX network object after running assembly.

        Parameters
        ----------
        self_loops : Optional[bool]
            If False, self-loops are excluded from the network. Default: False
        network_attributes : Optional[dict]
            A dictionary containing attributes to be added to the
            assembled network.

        Returns
        -------
        ndex2.nice_cx_network.NiceCXNetwork
            The assembled Nice CX network.
        """
        for stmt in self.statements:
            not_none_agents = [a for a in stmt.agent_list()
                               if a is not None]
            if len(not_none_agents) < 2:
                continue
            # Every pair of agents in the Statement becomes an edge
            for a1, a2 in itertools.combinations(not_none_agents, 2):
                a1_id = self.add_node(a1)
                a2_id = self.add_node(a2)
                if not self_loops and a1_id == a2_id:
                    continue
                self.add_edge(a1_id, a2_id, stmt)

        self.network.set_network_attribute(
            '@context', json.dumps(self.context_prefixes))
        if network_attributes:
            for k, v in network_attributes.items():
                self.network.set_network_attribute(k, v, 'string')
        return self.network

    def add_node(self, agent):
        """Add an Agent to the network as a node.

        Parameters
        ----------
        agent : indra.statements.Agent
            The Agent to add.

        Returns
        -------
        The ID of the node representing the Agent. If a node with the same
        agent key was added before, its existing ID is returned and the
        network is not changed.
        """
        agent_key = self.get_agent_key(agent)
        # If the node already exists
        if agent_key in self.node_keys:
            return self.node_keys[agent_key]

        # If the node doesn't exist yet
        db_ns, db_id = agent.get_grounding()
        # TODO: handle more represents name spaces
        if db_ns == 'HGNC':
            represents = 'hgnc.symbol:%s' % agent.name
        else:
            represents = None
        node_id = self.network.create_node(agent.name,
                                           node_represents=represents)
        self.node_keys[agent_key] = node_id

        # Add db_refs as aliases
        db_refs_list = ['%s:%s' % (db_name, db_id)
                        for db_name, db_id in agent.db_refs.items()
                        if db_name in url_prefixes]
        if db_refs_list:
            self.network.add_node_attribute(property_of=node_id,
                                            name='aliases',
                                            values=db_refs_list,
                                            type='list_of_string')

        # Add the type of the node, inferred from grounding
        if db_ns:
            mapped_type = db_ns_type_mappings.get(db_ns)
            if mapped_type:
                self.network.add_node_attribute(property_of=node_id,
                                                name='type',
                                                values=mapped_type,
                                                type='string')
        return node_id

    def add_edge(self, a1_id, a2_id, stmt):
        """Add a Statement to the network as an edge.

        The edge's interaction is the class name of the Statement, and up
        to 10 pieces of evidence with a PMID are attached to it as the
        'citation' edge attribute.

        Parameters
        ----------
        a1_id
            The ID of the source node.
        a2_id
            The ID of the target node.
        stmt : indra.statements.Statement
            The Statement that the edge represents.

        Returns
        -------
        The ID of the edge that was created.
        """
        stmt_type = stmt.__class__.__name__
        edge_id = self.network.create_edge(a1_id, a2_id, stmt_type)
        evs = []
        for ev in stmt.evidence:
            # We skip evidences with no PMID
            if not ev.pmid:
                continue
            # We take a maximum 200 character snippet of the evidence text
            if not ev.text:
                ev_txt = 'Evidence text not available.'
            elif len(ev.text) > 200:
                ev_txt = ev.text[:200] + '...'
            else:
                ev_txt = ev.text
            # Construct a clickable PMID link with the source and evidence text
            ev_str = ('<a target="_blank" '
                      'href="https://identifiers.org/pubmed:%s">'
                      'pubmed:%s</a> (%s) %s') % (ev.pmid, ev.pmid,
                                                  ev.source_api, ev_txt)
            # The flag uses the same truthiness test as the snippet above so
            # that empty text is ranked together with missing text.
            evs.append((ev_str, 1 if ev.text else 0))
        # Reorder to have ones with text first (the sort is stable, so the
        # original order is kept within each group)
        evs.sort(key=lambda x: x[1], reverse=True)
        # Cap at 10 pieces of evidence
        citations = [ev_str for ev_str, _ in evs[:10]]
        self.network.set_edge_attribute(edge_id, 'citation', citations,
                                        type='list_of_string')
        return edge_id

    def print_model(self):
        """Return the CX string of the assembled model."""
        return self.network.to_cx()

    @staticmethod
    def get_agent_key(agent):
        """Return the key identifying an Agent's node: the Agent's name."""
        return agent.name
# Value of the 'type' node attribute set by NiceCxAssembler.add_node, keyed
# by the name space of the Agent's grounding. Name spaces that are not
# listed here get no 'type' attribute.
db_ns_type_mappings = {
    'HGNC': 'gene',
    'UP': 'protein',
    'FPLX': 'proteinfamily',
    'CHEBI': 'chemical',
    'GO': 'biological_process',
}
class CxAssembler(object):
    """This class assembles a CX network from a set of INDRA Statements.

    The CX format is an aspect oriented data model for networks.
    The format is defined at http://www.home.ndexbio.org/data-model/.
    The CX format is the standard for NDEx and is compatible with
    CytoScape via the CyNDEx plugin.

    Parameters
    ----------
    stmts : Optional[list[indra.statements.Statement]]
        A list of INDRA Statements to be assembled.
    network_name : Optional[str]
        The name of the network to be assembled. Default: indra_assembled

    Attributes
    ----------
    statements : list[indra.statements.Statement]
        A list of INDRA Statements to be assembled.
    network_name : str
        The name of the network to be assembled.
    cx : dict
        The structure of the CX network that is assembled.
    """
    # Display names used for the node attributes that link out to a
    # database entry; name spaces not listed here are used as they are.
    _DB_NAME_MAP = {
        'UP': 'UniProt', 'PUBCHEM': 'PubChem',
        'IP': 'InterPro', 'NXPFA': 'NextProtFamily',
        'PF': 'Pfam', 'CHEBI': 'ChEBI'}

    def __init__(self, stmts=None, network_name=None):
        self.statements = [] if stmts is None else stmts
        self.network_name = ('indra_assembled' if network_name is None
                             else network_name)
        self.cx = {'nodes': [], 'edges': [],
                   'nodeAttributes': [], 'edgeAttributes': [],
                   'citations': [], 'edgeCitations': [],
                   'supports': [], 'edgeSupports': [],
                   'networkAttributes': []}
        # Agent name -> node ID
        self._existing_nodes = {}
        # (source ID, target ID, interaction) -> edge ID
        self._existing_edges = {}
        # Nodes, edges, citations and supports all draw IDs from this counter
        self._id_counter = 0
        # Overwritten by make_model; set here so that the private helpers
        # also work if they are called before make_model.
        self.add_indra_json = True

    def add_statements(self, stmts):
        """Add INDRA Statements to the assembler's list of statements.

        Parameters
        ----------
        stmts : list[indra.statements.Statement]
            A list of :py:class:`indra.statements.Statement`
            to be added to the statement list of the assembler.
        """
        self.statements.extend(stmts)

    def make_model(self, add_indra_json=True):
        """Assemble the CX network from the collected INDRA Statements.

        This method assembles a CX network from the set of INDRA Statements.
        The assembled network is set as the assembler's cx argument.

        Parameters
        ----------
        add_indra_json : Optional[bool]
            If True, the INDRA Statement JSON annotation is added to each
            edge in the network. Default: True

        Returns
        -------
        cx_str : str
            The json serialized CX model.
        """
        self.add_indra_json = add_indra_json
        for stmt in self.statements:
            # Statements of any other type are silently skipped
            if isinstance(stmt, Modification):
                self._add_modification(stmt)
            elif isinstance(stmt, SelfModification):
                self._add_self_modification(stmt)
            elif isinstance(stmt, (RegulateActivity, RegulateAmount)):
                self._add_regulation(stmt)
            elif isinstance(stmt, Complex):
                self._add_complex(stmt)
            elif isinstance(stmt, Gef):
                self._add_gef(stmt)
            elif isinstance(stmt, Gap):
                self._add_gap(stmt)
            elif isinstance(stmt, Influence):
                self._add_influence(stmt)
        network_description = ''
        self.cx['networkAttributes'].append({'n': 'name',
                                             'v': self.network_name})
        self.cx['networkAttributes'].append({'n': 'description',
                                             'v': network_description})
        return self.print_cx()

    def print_cx(self, pretty=True):
        """Return the assembled CX network as a json string.

        Parameters
        ----------
        pretty : bool
            If True, the CX string is formatted with indentation (for human
            viewing) otherwise no indentation is used.

        Returns
        -------
        json_str : str
            A json formatted string representation of the CX network.
        """
        def _get_aspect_metadata(aspect):
            # Aspects that are empty or absent get no metadata entry
            count = len(self.cx.get(aspect) or [])
            if not count:
                return None
            return {'name': aspect,
                    'idCounter': self._id_counter,
                    'consistencyGroup': 1,
                    'elementCount': count}

        full_cx = OrderedDict()
        full_cx['numberVerification'] = [{'longNumber': 281474976710655}]
        aspects = ['nodes', 'edges', 'supports', 'citations', 'edgeAttributes',
                   'edgeCitations', 'edgeSupports', 'networkAttributes',
                   'nodeAttributes', 'cartesianLayout']
        full_cx['metaData'] = []
        for aspect in aspects:
            metadata = _get_aspect_metadata(aspect)
            if metadata:
                full_cx['metaData'].append(metadata)
        for k, v in self.cx.items():
            full_cx[k] = v
        full_cx['status'] = [{'error': '', 'success': True}]
        # The serialized form is a list of single-key objects, one per aspect
        full_cx = [{k: v} for k, v in full_cx.items()]
        return json.dumps(full_cx, indent=2 if pretty else None)

    def save_model(self, file_name='model.cx'):
        """Save the assembled CX network in a file.

        Parameters
        ----------
        file_name : Optional[str]
            The name of the file to save the CX network to. Default: model.cx
        """
        # Serialize first so that a failure doesn't leave an empty file behind
        cx_str = self.print_cx()
        with open(file_name, 'wt') as fh:
            fh.write(cx_str)

    def upload_model(self, ndex_cred=None, private=True, style='default'):
        """Creates a new NDEx network of the assembled CX model.

        To upload the assembled CX model to NDEx, you need to have
        a registered account on NDEx (http://ndexbio.org/) and have
        the `ndex` python package installed. The uploaded network
        is private by default.

        Parameters
        ----------
        ndex_cred : Optional[dict]
            A dictionary with the following entries:
            'user': NDEx user name
            'password': NDEx password
        private : Optional[bool]
            Whether or not the created network will be private on NDEX.
        style : Optional[str]
            This optional parameter can either be (1)
            The UUID of an existing NDEx network whose style should be applied
            to the new network. (2) Unspecified or 'default' to use
            the default INDRA-assembled network style. (3) None to
            not set a network style.

        Returns
        -------
        network_id : str
            The UUID of the NDEx network that was created by uploading
            the assembled CX model.
        """
        cx_str = self.print_cx()
        if not ndex_cred:
            username, password = ndex_client.get_default_ndex_cred({})
            ndex_cred = {'user': username,
                         'password': password}
        network_id = ndex_client.create_network(cx_str, ndex_cred, private)
        if network_id and style:
            template_id = None if style == 'default' else style
            nretries = 3
            for retry_idx in range(nretries):
                # Wait before each attempt, including the first one
                time.sleep(3)
                try:
                    ndex_client.set_style(network_id, ndex_cred, template_id)
                    break
                except Exception:
                    # Styling is best-effort: the upload itself succeeded,
                    # so a failure here is logged rather than raised.
                    msg = 'Style setting failed, '
                    if retry_idx + 1 < nretries:
                        logger.info(msg + 'retrying %d more times' %
                                    (nretries - (retry_idx+1)))
                    else:
                        logger.info(msg + 'the network will be missing style '
                                    'information.')
        return network_id

    def set_context(self, cell_type):
        """Set protein expression data and mutational status as node attribute

        This method uses :py:mod:`indra.databases.context_client` to get
        protein expression levels and mutational status for a given cell type
        and set a node attribute for proteins accordingly.

        Parameters
        ----------
        cell_type : str
            Cell type name for which expression levels are queried.
            The cell type name follows the CCLE database conventions.
            Example: LOXIMVI_SKIN, BT20_BREAST
        """
        node_names = [node['n'] for node in self.cx['nodes']]
        res_expr = context_client.get_protein_expression(node_names,
                                                         [cell_type])
        res_mut = context_client.get_mutations(node_names,
                                               [cell_type])
        res_expr = res_expr.get(cell_type)
        res_mut = res_mut.get(cell_type)
        if not res_expr:
            msg = 'Could not get protein expression for %s cell type.' % \
                  cell_type
            logger.warning(msg)

        if not res_mut:
            msg = 'Could not get mutational status for %s cell type.' % \
                  cell_type
            logger.warning(msg)

        if not res_expr and not res_mut:
            return

        # Only one of the two may be available; treat the missing one as
        # empty so that the lookups below don't fail on None.
        res_expr = res_expr or {}
        res_mut = res_mut or {}

        self.cx['networkAttributes'].append({'n': 'cellular_context',
                                             'v': cell_type})
        counter = 0
        for node in self.cx['nodes']:
            amount = res_expr.get(node['n'])
            mut = res_mut.get(node['n'])
            if amount is not None:
                node_attribute = {'po': node['@id'],
                                  'n': 'expression_amount',
                                  'v': int(amount)}
                self.cx['nodeAttributes'].append(node_attribute)
            if mut is not None:
                is_mutated = 1 if mut else 0
                node_attribute = {'po': node['@id'],
                                  'n': 'is_mutated',
                                  'v': is_mutated}
                self.cx['nodeAttributes'].append(node_attribute)
            if mut is not None or amount is not None:
                counter += 1
        logger.info('Set context for %d nodes.' % counter)

    def _get_new_id(self):
        """Return the next unused element ID and advance the counter."""
        ret = self._id_counter
        self._id_counter += 1
        return ret

    def _add_modification(self, stmt):
        """Add an enzyme -> substrate edge; skipped if there is no enzyme."""
        if stmt.enz is None:
            return
        enz_id = self._add_node(stmt.enz)
        sub_id = self._add_node(stmt.sub)
        stmt_type = stmt.__class__.__name__
        self._add_edge(enz_id, sub_id, stmt_type, stmt)

    def _add_self_modification(self, stmt):
        """Add a self-loop edge on the enzyme of a self-modification."""
        enz_id = self._add_node(stmt.enz)
        stmt_type = stmt.__class__.__name__
        self._add_edge(enz_id, enz_id, stmt_type, stmt)

    def _add_complex(self, stmt):
        """Add a 'Complex' edge between each pair of complex members."""
        # Here we do some bookkeeping to handle the special case where
        # a member appears twice in a complex e.g.
        # Complex(CDK12(), RECQL4(), RECQL4(), Ku())
        # and we don't want to have duplicate edges.
        added_edges = set()
        for m1, m2 in itertools.combinations(stmt.members, 2):
            m1_id = self._add_node(m1)
            m2_id = self._add_node(m2)
            if (m1_id, m2_id) not in added_edges:
                self._add_edge(m1_id, m2_id, 'Complex', stmt)
                added_edges.add((m1_id, m2_id))

    def _add_regulation(self, stmt):
        """Add a subject -> object edge; skipped if there is no subject."""
        if stmt.subj is None:
            return
        subj_id = self._add_node(stmt.subj)
        obj_id = self._add_node(stmt.obj)
        stmt_type = stmt.__class__.__name__
        self._add_edge(subj_id, obj_id, stmt_type, stmt)

    def _add_influence(self, stmt):
        """Add an edge between the subject and object concepts."""
        subj_id = self._add_node(stmt.subj.concept)
        obj_id = self._add_node(stmt.obj.concept)
        stmt_type = stmt.__class__.__name__
        self._add_edge(subj_id, obj_id, stmt_type, stmt)

    def _add_gef(self, stmt):
        """Add a gef -> ras edge."""
        gef_id = self._add_node(stmt.gef)
        ras_id = self._add_node(stmt.ras)
        stmt_type = stmt.__class__.__name__
        self._add_edge(gef_id, ras_id, stmt_type, stmt)

    def _add_gap(self, stmt):
        """Add a gap -> ras edge."""
        gap_id = self._add_node(stmt.gap)
        ras_id = self._add_node(stmt.ras)
        stmt_type = stmt.__class__.__name__
        self._add_edge(gap_id, ras_id, stmt_type, stmt)

    def _add_node(self, agent):
        """Return the node ID for an agent, creating the node if needed.

        Nodes are keyed by the agent's name, so agents with the same name
        share a single node.
        """
        node_key = agent.name
        node_id = self._existing_nodes.get(node_key)
        if node_id is not None:
            return node_id
        node_id = self._get_new_id()
        self._existing_nodes[node_key] = node_id
        node = {'@id': node_id,
                'n': agent.name}
        self.cx['nodes'].append(node)
        self._add_node_metadata(node_id, agent)
        return node_id

    def _add_node_metadata(self, node_id, agent):
        """Add the type and the database link-outs of an agent to its node."""
        agent_type = _get_agent_type(agent)
        node_attribute = {'po': node_id,
                          'n': 'type',
                          'v': agent_type}
        self.cx['nodeAttributes'].append(node_attribute)
        for db_name, db_ids in agent.db_refs.items():
            if not db_ids:
                logger.warning('Missing db_id for %s' % agent)
                continue
            elif isinstance(db_ids, int):
                db_id = str(db_ids)
            elif isinstance(db_ids, list):
                # Only the first element of the first entry is used
                db_id = db_ids[0][0]
            else:
                db_id = db_ids
            url = get_identifiers_url(db_name, db_id)
            if not url:
                continue
            name = self._DB_NAME_MAP.get(db_name) or db_name
            node_attribute = {'po': node_id,
                              'n': name,
                              'v': url}
            self.cx['nodeAttributes'].append(node_attribute)

    def _add_edge(self, source, target, interaction, stmt):
        """Return the ID of the edge for a Statement, creating it if needed.

        Edges are keyed by (source, target, interaction). If an edge with
        the same key was added before, its ID is returned and neither a new
        edge nor new edge metadata is added.
        """
        edge_key = (source, target, interaction)
        try:
            return self._existing_edges[edge_key]
        except KeyError:
            pass
        edge_id = self._get_new_id()
        # Register the edge in the edge index (not the node index), since
        # that is what the lookup above consults.
        self._existing_edges[edge_key] = edge_id
        edge = {'@id': edge_id,
                's': source,
                't': target,
                'i': interaction.lower()}
        self.cx['edges'].append(edge)
        self._add_edge_metadata(edge_id, stmt)
        return edge_id

    def _add_edge_attribute(self, edge_id, name, value):
        """Append a single name/value attribute for the given edge."""
        self.cx['edgeAttributes'].append({'po': edge_id,
                                          'n': name,
                                          'v': value})

    def _add_edge_metadata(self, edge_id, stmt):
        """Add all annotations for a Statement as aspects of its edge."""
        # Add the string of the statement itself
        self._add_edge_attribute(edge_id, 'INDRA statement', '%s' % stmt)

        # Add INDRA JSON
        if self.add_indra_json:
            self._add_edge_attribute(edge_id, '__INDRA json',
                                     json.dumps(stmt.to_json()))

        # Add the type of statement as the edge type
        stmt_type, stmt_polarity = _get_stmt_type(stmt)
        self._add_edge_attribute(edge_id, 'type', stmt_type)
        self._add_edge_attribute(edge_id, 'polarity', stmt_polarity)

        # Add the citations for the edge
        pmids = [e.pmid for e in stmt.evidence if e.pmid]
        edge_citations = []
        pmids_added = set()
        for pmid in pmids:
            # Only PMIDs starting with digits become citations, so that no
            # citation with a missing identifier is emitted.
            if not re.match('[0-9]+', pmid):
                continue
            pmid_txt = 'pmid:' + pmid
            if pmid_txt in pmids_added:
                continue
            citation_id = self._get_new_id()
            citation = {'@id': citation_id,
                        'dc:identifier': pmid_txt}
            self.cx['citations'].append(citation)
            edge_citations.append(citation_id)
            pmids_added.add(pmid_txt)
        if edge_citations:
            edge_citation = {'citations': edge_citations,
                             'po': [edge_id]}
            self.cx['edgeCitations'].append(edge_citation)

        # Add the textual supports for the edge
        texts = [_fix_evidence_text(e.text) for e in stmt.evidence if e.text]
        edge_supports = []
        for text in texts:
            support_id = self._get_new_id()
            support = {'@id': support_id,
                       'text': text}
            self.cx['supports'].append(support)
            edge_supports.append(support_id)
        if edge_supports:
            edge_support = {'supports': edge_supports,
                            'po': [edge_id]}
            self.cx['edgeSupports'].append(edge_support)

        self._add_edge_attribute(edge_id, 'belief', '%.2f' % stmt.belief)

        # NOTE: supports and edgeSupports are currently
        # not shown on NDEx therefore we add text evidence as a generic
        # edgeAttribute
        if texts:
            self._add_edge_attribute(edge_id, 'text', texts[0])

        # Add the serialized JSON INDRA Statement
        self._add_edge_attribute(edge_id, 'indra', stmt.to_json())

        # Add support type
        self._add_edge_attribute(edge_id, 'supportType',
                                 _get_support_type(stmt))
def _get_support_type(stmt):
dbs = ['bel', 'biopax', 'phosphosite', 'biogrid', 'signor', 'tas', 'hprd',
'trrust', 'ctd', 'virhostnet', 'phosphoelm', 'drugbank', 'omnipath']
readers = ['reach', 'trips', 'sparser', 'r3', 'eidos', 'geneways', 'tees',
'rlimsp', 'medscan']
has_db = False
has_reading = False
for ev in stmt.evidence:
if ev.source_api in dbs:
has_db = True
if ev.source_api in readers:
has_reading = True
if has_db and not has_reading:
return 'database'
elif has_db and has_db:
return 'database and literature'
elif not has_db and has_reading:
return 'literature'
def _get_stmt_type(stmt):
    """Return the edge type and edge polarity for a Statement.

    Parameters
    ----------
    stmt : indra.statements.Statement
        The Statement to classify.

    Returns
    -------
    tuple[str, str]
        The edge type and the polarity, which is one of 'positive',
        'negative' or 'none'. Statements of a type that is not handled
        explicitly get their class name as the type and 'none' as polarity.
    """
    # Checked in order; the first matching class determines the result
    fixed_types = (
        (AddModification, 'Modification', 'positive'),
        (RemoveModification, 'Modification', 'negative'),
        (SelfModification, 'SelfModification', 'positive'),
        (Complex, 'Complex', 'none'),
        (Activation, 'Activation', 'positive'),
        (Inhibition, 'Inhibition', 'negative'),
        (DecreaseAmount, 'DecreaseAmount', 'negative'),
        (IncreaseAmount, 'IncreaseAmount', 'positive'),
        (Gef, 'Gef', 'positive'),
        (Gap, 'Gap', 'negative'),
    )
    for stmt_class, edge_type, edge_polarity in fixed_types:
        if isinstance(stmt, stmt_class):
            return edge_type, edge_polarity

    if isinstance(stmt, Influence):
        # The polarity of an Influence depends on the Statement itself
        overall_polarity = stmt.overall_polarity()
        if overall_polarity == -1:
            edge_polarity = 'negative'
        elif overall_polarity == 1:
            edge_polarity = 'positive'
        else:
            edge_polarity = 'none'
        return 'Influence', edge_polarity

    # Fall back to the class name. Calling __str__ on the class without an
    # instance, as was done here before, raises a TypeError.
    return type(stmt).__name__, 'none'
def _get_agent_type(agent):
hgnc_id = agent.db_refs.get('HGNC')
uniprot_id = agent.db_refs.get('UP')
pfam_id = agent.db_refs.get('PF')
fa_id = agent.db_refs.get('FA')
chebi_id = agent.db_refs.get('CHEBI')
pubchem_id = agent.db_refs.get('PUBCHEM')
be_id = agent.db_refs.get('FPLX')
go_id = agent.db_refs.get('GO')
mir_id = agent.db_refs.get('MIRBASEM') or agent.db_refs.get('MIRBASE')
if hgnc_id or uniprot_id:
agent_type = 'protein'
elif pfam_id or fa_id or be_id:
agent_type = 'proteinfamily'
elif chebi_id or pubchem_id:
agent_type = 'chemical'
elif go_id:
agent_type = 'bioprocess'
elif mir_id:
agent_type = 'microrna'
else:
agent_type = 'other'
return agent_type
def _fix_evidence_text(txt):
"""Eliminate some symbols to have cleaner supporting text."""
txt = re.sub('[ ]?\( xref \)', '', txt)
# This is to make [ xref ] become [] to match the two readers
txt = re.sub('\[ xref \]', '[]', txt)
txt = re.sub('[\(]?XREF_BIBR[\)]?[,]?', '', txt)
txt = re.sub('[\(]?XREF_FIG[\)]?[,]?', '', txt)
txt = re.sub('[\(]?XREF_SUPPLEMENT[\)]?[,]?', '', txt)
txt = txt.strip()
return txt