Source code for indra.assemblers.pybel.assembler

from __future__ import absolute_import, print_function, unicode_literals
from builtins import dict, str
import uuid
import logging
from typing import Mapping

import networkx as nx
from copy import deepcopy, copy
import pybel
import pybel.constants as pc
from pybel.dsl import *
# This is redundant but needed for documentation build
from pybel.dsl import Entity
from pybel.language import pmod_mappings, pmod_namespace, activity_mapping
try:  # this works after pybel pull request #453
    from pybel.language import citation_dict
except ImportError: # this works before pybel pull request #453
    from pybel.utils import citation_dict
from indra.statements import *
from indra.databases import hgnc_client

logger = logging.getLogger(__name__)


_indra_pybel_act_map: Mapping[str, Entity] = {
    'activity': activity_mapping['act'],
    'kinase': activity_mapping['kin'],
    'phosphatase': activity_mapping['phos'],
    'catalytic': activity_mapping['cat'],
    'gtpbound': activity_mapping['gtp'],
    'transcription': activity_mapping['tscript'],
    'gef': activity_mapping['gef'],
    'gap': activity_mapping['gap'],
}

_pybel_indra_act_map: Mapping[Entity, str] = {
    pybel_activity: indra_key
    for indra_key, pybel_activity in _indra_pybel_act_map.items()
}


[docs]class PybelAssembler(object): """Assembles a PyBEL graph from a set of INDRA Statements. PyBEL tools can subsequently be used to export the PyBEL graph into BEL script files, SIF files, and other related output formats. Parameters ---------- stmts : list[:py:class:`indra.statement.Statement`] The list of Statements to assemble. name : str Name of the assembled PyBEL network. description : str Description of the assembled PyBEL network. version : str Version of the assembled PyBEL network. authors : str Author(s) of the network. contact : str Contact information (email) of the responsible author. license : str License information for the network. copyright : str Copyright information for the network. disclaimer : str Any disclaimers for the network. annotations_to_include : Optional[list[str]] A list of evidence annotation keys that should be added to the assembled PyBEL graph. Default: None Examples -------- >>> from indra.statements import * >>> map2k1 = Agent('MAP2K1', db_refs={'HGNC': '6840'}) >>> mapk1 = Agent('MAPK1', db_refs={'HGNC': '6871'}) >>> stmt = Phosphorylation(map2k1, mapk1, 'T', '185') >>> pba = PybelAssembler([stmt]) >>> belgraph = pba.make_model() >>> sorted(node.as_bel() for node in belgraph) ['p(HGNC:6840 ! MAP2K1)', 'p(HGNC:6871 ! MAPK1)', 'p(HGNC:6871 ! MAPK1, pmod(go:0006468 ! "protein phosphorylation", Thr, 185))'] >>> len(belgraph) 3 >>> belgraph.number_of_edges() 2 """ def __init__(self, stmts=None, name=None, description=None, version=None, authors=None, contact=None, license=None, copyright=None, disclaimer=None, annotations_to_include=None): if stmts is None: self.statements = [] else: self.statements = stmts if name is None: name = 'indra' if version is None: version = str(uuid.uuid4()) # Create the model and assign metadata self.model = pybel.BELGraph( name=name, description=description, version=version, authors=authors, contact=contact, license=license, copyright=copyright, disclaimer=disclaimer, ) ns_dict = { 'HGNC': 'https://arty.scai.fraunhofer.de/artifactory/bel/' 'namespace/hgnc-human-genes/hgnc-human-genes-20170725.belns', 'UP': 'https://arty.scai.fraunhofer.de/artifactory/bel/' 'namespace/swissprot/swissprot-20170725.belns', 'IP': 'https://arty.scai.fraunhofer.de/artifactory/bel/' 'namespace/interpro/interpro-20170731.belns', 'FPLX': 'https://raw.githubusercontent.com/sorgerlab/famplex/' '5f5b573fe26d7405dbccb711ae8e5697b6a3ec7e/export/famplex.belns', #'PFAM': #'NXPFA': 'CHEBI': 'https://arty.scai.fraunhofer.de/artifactory/bel/' 'namespace/chebi-ids/chebi-ids-20170725.belns', 'GO': 'https://arty.scai.fraunhofer.de/artifactory/bel/' 'namespace/go/go-20180109.belns', 'MESH': 'https://arty.scai.fraunhofer.de/artifactory/bel/' 'namespace/mesh-processes/mesh-processes-20170725.belns' } self.model.namespace_url.update(ns_dict) self.model.namespace_pattern['PUBCHEM'] = '\d+' self.annotations_to_include = annotations_to_include def add_statements(self, stmts_to_add): self.statements += stmts_to_add def make_model(self): for stmt in self.statements: # Skip statements with no subject if stmt.agent_list()[0] is None and \ not isinstance(stmt, Conversion): continue # Assemble statements if isinstance(stmt, Modification): self._assemble_modification(stmt) elif isinstance(stmt, RegulateActivity): self._assemble_regulate_activity(stmt) elif isinstance(stmt, RegulateAmount): self._assemble_regulate_amount(stmt) elif isinstance(stmt, Gef): self._assemble_gef(stmt) elif isinstance(stmt, Gap): self._assemble_gap(stmt) elif isinstance(stmt, ActiveForm): self._assemble_active_form(stmt) elif isinstance(stmt, Complex): self._assemble_complex(stmt) elif isinstance(stmt, Conversion): self._assemble_conversion(stmt) elif isinstance(stmt, Autophosphorylation): self._assemble_autophosphorylation(stmt) elif isinstance(stmt, Transphosphorylation): self._assemble_transphosphorylation(stmt) else: logger.info('Unhandled statement: %s' % stmt) return self.model
[docs] def to_database(self, manager=None): """Send the model to the PyBEL database This function wraps :py:func:`pybel.to_database`. Parameters ---------- manager : Optional[pybel.manager.Manager] A PyBEL database manager. If none, first checks the PyBEL configuration for ``PYBEL_CONNECTION`` then checks the environment variable ``PYBEL_REMOTE_HOST``. Finally, defaults to using SQLite database in PyBEL data directory (automatically configured by PyBEL) Returns ------- network : Optional[pybel.manager.models.Network] The SQLAlchemy model representing the network that was uploaded. Returns None if upload fails. """ network = pybel.to_database(self.model, manager=manager) return network
[docs] def to_web(self, host=None, user=None, password=None): """Send the model to BEL Commons by wrapping :py:func:`pybel.to_web` The parameters ``host``, ``user``, and ``password`` all check the PyBEL configuration, which is located at ``~/.config/pybel/config.json`` by default Parameters ---------- host : Optional[str] The host name to use. If none, first checks the PyBEL configuration entry ``PYBEL_REMOTE_HOST``, then the environment variable ``PYBEL_REMOTE_HOST``. Finally, defaults to https://bel-commons.scai.fraunhofer.de. user : Optional[str] The username (email) to use. If none, first checks the PyBEL configuration entry ``PYBEL_REMOTE_USER``, then the environment variable ``PYBEL_REMOTE_USER``. password : Optional[str] The password to use. If none, first checks the PyBEL configuration entry ``PYBEL_REMOTE_PASSWORD``, then the environment variable ``PYBEL_REMOTE_PASSWORD``. Returns ------- response : requests.Response The response from the BEL Commons network upload endpoint. """ response = pybel.to_web(self.model, host=host, user=user, password=password) return response
[docs] def save_model(self, path, output_format=None): """Save the :class:`pybel.BELGraph` using one of the outputs from :py:mod:`pybel` Parameters ---------- path : str The path to output to output_format : Optional[str] Output format as ``cx``, ``pickle``, ``json`` or defaults to ``bel`` """ if output_format == 'pickle': pybel.to_pickle(self.model, path) else: with open(path, 'w') as fh: if output_format == 'json': pybel.to_nodelink_file(self.model, fh) elif output_format == 'cx': pybel.to_cx_file(self.model, fh) else: # output_format == 'bel': pybel.to_bel_script(self.model, fh)
def _add_nodes_edges(self, subj_agent, obj_agent, relation, stmt): """Given subj/obj agents, relation, and evidence, add nodes/edges.""" subj_data, subj_edge = _get_agent_node(subj_agent) obj_data, obj_edge = _get_agent_node(obj_agent) # If we failed to create nodes for subject or object, skip it if subj_data is None or obj_data is None: return self.model.add_node_from_data(subj_data) self.model.add_node_from_data(obj_data) edge_data_list = _combine_edge_data( relation=relation, subj_edge=subj_edge, obj_edge=obj_edge, stmt=stmt, annotations_to_include=self.annotations_to_include, ) for edge_data in edge_data_list: self.model.add_edge(subj_data, obj_data, **edge_data) def _assemble_regulate_activity(self, stmt): """Example: p(HGNC:MAP2K1) => act(p(HGNC:MAPK1))""" act_obj = deepcopy(stmt.obj) act_obj.activity = stmt._get_activity_condition() # We set is_active to True here since the polarity is encoded # in the edge (decreases/increases) act_obj.activity.is_active = True activates = isinstance(stmt, Activation) relation = get_causal_edge(stmt, activates) self._add_nodes_edges(stmt.subj, act_obj, relation, stmt) def _assemble_modification(self, stmt): """Example: p(HGNC:MAP2K1) => p(HGNC:MAPK1, pmod(Ph, Thr, 185))""" sub_agent = deepcopy(stmt.sub) sub_agent.mods.append(stmt._get_mod_condition()) activates = isinstance(stmt, AddModification) relation = get_causal_edge(stmt, activates) self._add_nodes_edges(stmt.enz, sub_agent, relation, stmt) def _assemble_regulate_amount(self, stmt): """Example: p(HGNC:ELK1) => p(HGNC:FOS)""" activates = isinstance(stmt, IncreaseAmount) relation = get_causal_edge(stmt, activates) self._add_nodes_edges(stmt.subj, stmt.obj, relation, stmt) def _assemble_gef(self, stmt): """Example: act(p(HGNC:SOS1), ma(gef)) => act(p(HGNC:KRAS), ma(gtp))""" gef = deepcopy(stmt.gef) gef.activity = ActivityCondition('gef', True) ras = deepcopy(stmt.ras) ras.activity = ActivityCondition('gtpbound', True) self._add_nodes_edges(gef, ras, pc.DIRECTLY_INCREASES, stmt) def _assemble_gap(self, stmt): """Example: act(p(HGNC:RASA1), ma(gap)) =| act(p(HGNC:KRAS), ma(gtp))""" gap = deepcopy(stmt.gap) gap.activity = ActivityCondition('gap', True) ras = deepcopy(stmt.ras) ras.activity = ActivityCondition('gtpbound', True) self._add_nodes_edges(gap, ras, pc.DIRECTLY_DECREASES, stmt) def _assemble_active_form(self, stmt): """Example: p(HGNC:ELK1, pmod(Ph)) => act(p(HGNC:ELK1), ma(tscript))""" act_agent = Agent(stmt.agent.name, db_refs=stmt.agent.db_refs) act_agent.activity = ActivityCondition(stmt.activity, True) activates = stmt.is_active relation = get_causal_edge(stmt, activates) if not stmt.agent.mods and not stmt.agent.bound_conditions and \ not stmt.agent.mutations: self._add_nodes_edges(stmt.agent, act_agent, relation, stmt) else: for mod in stmt.agent.mods: mod_agent = Agent( stmt.agent.name, db_refs=stmt.agent.db_refs, mods=[mod]) self._add_nodes_edges(mod_agent, act_agent, relation, stmt) for bc in stmt.agent.bound_conditions: bound_agent = Agent( stmt.agent.name, db_refs=stmt.agent.db_refs, bound_conditions=[bc]) self._add_nodes_edges(bound_agent, act_agent, relation, stmt) for mut in stmt.agent.mutations: mut_agent = Agent( stmt.agent.name, db_refs=stmt.agent.db_refs, mutations=[mut]) self._add_nodes_edges(mut_agent, act_agent, relation, stmt) def _assemble_complex(self, stmt): """Example: complex(p(HGNC:MAPK14), p(HGNC:TAB1))""" complex_data, _ = _get_complex_node(stmt.members) if complex_data is None: logger.info('skip adding complex with no members: %s', stmt.members) return self.model.add_node_from_data(complex_data) def _assemble_conversion(self, stmt): """Example: p(HGNC:HK1) => rxn(reactants(a(CHEBI:"CHEBI:17634")), products(a(CHEBI:"CHEBI:4170")))""" pybel_lists = ([], []) for pybel_list, agent_list in \ zip(pybel_lists, (stmt.obj_from, stmt.obj_to)): for agent in agent_list: node = _get_agent_grounding(agent) if node is not None: pybel_list.append(node) rxn_node_data = reaction( reactants=pybel_lists[0], products=pybel_lists[1], ) self.model.add_node_from_data(rxn_node_data) obj_edge = None # TODO: Any edge information possible here? # Add node for controller, if there is one if stmt.subj is not None: subj_attr, subj_edge = _get_agent_node(stmt.subj) self.model.add_node_from_data(subj_attr) edge_data_list = _combine_edge_data( relation=pc.DIRECTLY_INCREASES, subj_edge=subj_edge, obj_edge=obj_edge, stmt=stmt, annotations_to_include=self.annotations_to_include, ) for edge_data in edge_data_list: self.model.add_edge(subj_attr, rxn_node_data, **edge_data) def _assemble_autophosphorylation(self, stmt): """Example: complex(p(HGNC:MAPK14), p(HGNC:TAB1)) => p(HGNC:MAPK14, pmod(Ph, Tyr, 100))""" sub_agent = deepcopy(stmt.enz) mc = stmt._get_mod_condition() sub_agent.mods.append(mc) # FIXME Ignore any bound conditions on the substrate!!! # This is because if they are included, a complex node will be returned, # which (at least currently) won't incorporate any protein # modifications. sub_agent.bound_conditions = [] # FIXME self._add_nodes_edges(stmt.enz, sub_agent, pc.DIRECTLY_INCREASES, stmt) def _assemble_transphosphorylation(self, stmt): """Example: complex(p(HGNC:EGFR)) => p(HGNC:EGFR, pmod(Ph, Tyr, 1173))""" # Check our assumptions about the bound condition of the enzyme assert len(stmt.enz.bound_conditions) == 1 assert stmt.enz.bound_conditions[0].is_bound # Create a modified protein node for the bound target sub_agent = deepcopy(stmt.enz.bound_conditions[0].agent) sub_agent.mods.append(stmt._get_mod_condition()) self._add_nodes_edges(stmt.enz, sub_agent, pc.DIRECTLY_INCREASES, stmt) def _assemble_translocation(self, stmt): pass
def belgraph_to_signed_graph( belgraph, include_variants=True, symmetric_variant_links=False, include_components=True, symmetric_component_links=False, propagate_annotations=False): def get_ns(n): # For nodes containing several agents (complex abundance or reaction) # return namespace of the first member if isinstance(n, complex_abundance): return get_ns(n.members[0]) if isinstance(n, reaction): return get_ns(n.products[0]) return n.namespace graph = nx.MultiDiGraph() for n in belgraph.nodes: graph.add_node(n, ns=get_ns(n)) edge_set = set() for u, v, edge_data in belgraph.edges(data=True): rel = edge_data.get('relation') pos_edge = \ (u, v, ('sign', 0)) + \ tuple((key, tuple(entry)) if len(entry) > 1 else tuple( (key, *tuple(entry))) for key, entry in edge_data.get('annotations', {}).items()) \ if propagate_annotations else (u, v, ('sign', 0)) # Unpack tuple pairs at indices >1 or they'll be in nested tuples rev_pos_edge = (pos_edge[1], pos_edge[0], *pos_edge[2:]) if rel in pc.CAUSAL_INCREASE_RELATIONS: edge_set.add(pos_edge) elif rel in pc.HAS_VARIANT and include_variants: edge_set.add(pos_edge) if symmetric_variant_links: edge_set.add(rev_pos_edge) elif rel in pc.PART_OF and include_components: edge_set.add(pos_edge) if symmetric_component_links: edge_set.add(rev_pos_edge) elif rel in pc.CAUSAL_DECREASE_RELATIONS: # Unpack tuples edge_set.add((pos_edge[0], pos_edge[1], ('sign', 1), *pos_edge[3:])) else: continue graph.add_edges_from((t[0], t[1], dict(t[2:])) for t in edge_set) return graph def _combine_edge_data(relation, subj_edge, obj_edge, stmt, annotations_to_include=None): edge_data = { pc.RELATION: relation, pc.ANNOTATIONS: _get_annotations_from_stmt(stmt), } if subj_edge: edge_data[pc.SUBJECT] = subj_edge if obj_edge: edge_data[pc.OBJECT] = obj_edge if not stmt.evidence: return [edge_data] return [ _update_edge_data_from_evidence(evidence, edge_data, annotations_to_include=annotations_to_include) for evidence in stmt.evidence ] def _update_edge_data_from_evidence(evidence, edge_data, annotations_to_include=None): edge_data_one = copy(edge_data) citation, evidence, annotations = \ _get_evidence(evidence, annotations_to_include=annotations_to_include) edge_data_one.update({ pc.CITATION: citation, pc.EVIDENCE: evidence, }) edge_data_one[pc.ANNOTATIONS].update(annotations) return edge_data_one def _get_annotations_from_stmt(stmt): return { 'stmt_hash': {stmt.get_hash(refresh=True): True}, 'uuid': {stmt.uuid: True}, 'belief': {stmt.belief: True}, } def _get_agent_node(agent): if not agent.bound_conditions: return _get_agent_node_no_bcs(agent) # Check if bound conditions are bound to agent bound_conditions = [ bc.agent for bc in agent.bound_conditions if bc.is_bound] if not bound_conditions: return _get_agent_node_no_bcs(agent) # "Flatten" the bound conditions for the agent at this level agent_no_bc = deepcopy(agent) agent_no_bc.bound_conditions = [] members = [agent_no_bc] + bound_conditions return _get_complex_node(members) def _get_complex_node(members): members_list = [] for member in members: member_data, member_edge = _get_agent_node(member) if member_data: members_list.append(member_data) if members_list: complex_node_data = complex_abundance(members=members_list) return complex_node_data, None return None, None def _get_agent_node_no_bcs(agent): node_data = _get_agent_grounding(agent) if node_data is None: logger.warning('Agent %s has no grounding.', agent) return None, None variants = [] for mod in agent.mods: pybel_mod = pmod_namespace.get(mod.mod_type) if not pybel_mod: logger.info('Skipping modification of type %s on agent %s', mod.mod_type, agent) continue pmod_entity = pmod_mappings[pybel_mod]['xrefs'][0] var = ProteinModification( namespace=pmod_entity.namespace, name=pmod_entity.name, identifier=pmod_entity.identifier, ) if mod.residue is not None: res = amino_acids[mod.residue]['short_name'].capitalize() var[pc.PMOD_CODE] = res if mod.position is not None: var[pc.PMOD_POSITION] = int(mod.position) variants.append(var) for mut in agent.mutations: var = hgvs(mut.to_hgvs()) variants.append(var) if variants and not isinstance(node_data, CentralDogma): logger.warning('Node should not have variants: %s, %s', node_data, variants) elif variants: node_data = node_data.with_variants(variants) if isinstance(node_data, (bioprocess, pathology)): return node_data, None # Also get edge data for the agent edge_data = _get_agent_activity(agent) return node_data, edge_data def _get_agent_grounding(agent): """Convert an agent to the corresponding PyBEL DSL object (to be filled with variants later).""" def _get_id(_agent, key): _id = _agent.db_refs.get(key) if isinstance(_id, list): _id = _id[0] return _id hgnc_id = _get_id(agent, 'HGNC') if hgnc_id: hgnc_name = hgnc_client.get_hgnc_name(hgnc_id) if not hgnc_name: logger.warning('Agent %s with HGNC ID %s has no HGNC name.', agent, hgnc_id) return return protein('HGNC', name=hgnc_name, identifier=hgnc_id) uniprot_id = _get_id(agent, 'UP') if uniprot_id: return protein('UP', name=uniprot_id, identifier=uniprot_id) fplx_id = _get_id(agent, 'FPLX') if fplx_id: return protein('FPLX', name=fplx_id, identifier=fplx_id) pfam_id = _get_id(agent, 'PF') if pfam_id: return protein('PFAM', name=agent.name, identifier=pfam_id) ip_id = _get_id(agent, 'IP') if ip_id: return protein('IP', ip_id) fa_id = _get_id(agent, 'FA') if fa_id: return protein('NXPFA', fa_id) chebi_id = _get_id(agent, 'CHEBI') if chebi_id: if chebi_id.startswith('CHEBI:'): chebi_id = chebi_id[len('CHEBI:'):] return abundance('CHEBI', name=agent.name, identifier=chebi_id) pubchem_id = _get_id(agent, 'PUBCHEM') if pubchem_id: return abundance('PUBCHEM', name=pubchem_id, identifier=pubchem_id) go_id = _get_id(agent, 'GO') if go_id: return bioprocess('GO', name=agent.name, identifier=go_id) mesh_id = _get_id(agent, 'MESH') if mesh_id: return bioprocess('MESH', name=agent.name, identifier=mesh_id) return abundance('TEXT', name=agent.name) def _get_agent_activity(agent): ac = agent.activity if not ac: return None if not ac.is_active: logger.warning('Cannot represent negative activity in PyBEL: %s' % agent) if ac.activity_type == 'activity': return activity() return activity(**_indra_pybel_act_map[ac.activity_type]) def _get_evidence(evidence, annotations_to_include=None): text = evidence.text if evidence.text else 'No evidence text.' # If there is a PMID, use it as the citation if evidence.pmid: citation = citation_dict( namespace=pc.CITATION_TYPE_PUBMED, identifier=evidence.pmid, ) # If no PMID, include the interface and source_api for now-- # in general this should probably be in the annotations for all evidence else: cit_source = evidence.source_api or 'Unknown' cit_id = evidence.source_id or 'Unknown' cit_ref_str = '%s:%s' % (cit_source, cit_id) citation = citation_dict( namespace=pc.CITATION_TYPE_OTHER, identifier=cit_ref_str, ) annotations = { 'source_hash': {evidence.get_source_hash(): True}, } if evidence.source_api: annotations['source_api'] = {evidence.source_api: True} if evidence.source_id: annotations['source_id'] = {evidence.source_id: True} for key, value in evidence.epistemics.items(): if key == 'direct' or value is None: continue if isinstance(value, (list, set, tuple)): annotations[key] = {v: True for v in value} else: annotations[key] = {value: True} if evidence.context: context_annotations = [] attrs = ['location', 'cell_line', 'cell_type', 'organ', 'disease', 'species'] for attr_key in attrs: attr_val = evidence.context.__dict__.get(attr_key) if attr_val: for key, value in attr_val.db_refs.items(): context_annotations.append((attr_key, f'{key}:{value}')) # Add the annotations for key, value in context_annotations: annotations[key] = {value: True} if annotations_to_include and evidence.annotations: # Add the annotations for annotation_key in annotations_to_include: value = evidence.annotations.get(annotation_key) # Skip if not in annotations or value is None if not value: continue # For simple lists, sets, tuples, we add all the elements # as annotations if isinstance(value, (list, set, tuple)): annotations[annotation_key] = {v: True for v in value} # We don't handle dicts and skip them elif isinstance(value, dict): continue # Otherwise, the value is a simple type like str and we use # it directly else: annotations[annotation_key] = {value: True} return citation, text, annotations
[docs]def get_causal_edge(stmt, activates): """Returns the causal, polar edge with the correct "contact".""" any_contact = any( evidence.epistemics.get('direct', False) for evidence in stmt.evidence ) if any_contact: return pc.DIRECTLY_INCREASES if activates else pc.DIRECTLY_DECREASES return pc.INCREASES if activates else pc.DECREASES