# -*- coding: utf-8 -*-
"""Processor for PyBEL."""
import os
import re
import logging
import pybel.dsl as dsl
import pybel.constants as pc
import pybel.language
from collections import defaultdict
from pybel.struct import has_protein_modification
from pybel.canonicalize import edge_to_bel
from bel_resources import get_bel_resource
from indra.statements import *
from indra.util import read_unicode_csv
from indra.databases import (
chebi_client, go_client, hgnc_client, mesh_client,
mirbase_client, uniprot_client, taxonomy_client,
identifiers
)
from indra.resources import get_resource_path
from indra.ontology.standardize import standardize_name_db_refs
from indra.assemblers.pybel.assembler import _pybel_indra_act_map
# Names exported by a star import of this module.
__all__ = [
'PybelProcessor',
'get_agent',
]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
#: Maps protein-modification codes (the keys that are looked up in
#: ``pybel.language.pmod_mappings``) to INDRA modification type names.
_pybel_indra_pmod_map_helper = dict(
    Ph='phosphorylation',
    Hy='hydroxylation',
    Sumo='sumoylation',
    Ac='acetylation',
    Glyco='glycosylation',
    ADPRib='ribosylation',
    Ub='ubiquitination',
    Farn='farnesylation',
    Gerger='geranylgeranylation',
    Palm='palmitoylation',
    Myr='myristoylation',
    Me='methylation',
)
#: Maps the first cross-reference of each PyBEL protein modification to the
#: INDRA modification type name. Only modification codes listed in
#: ``_pybel_indra_pmod_map_helper`` are included; more INDRA statements could
#: be proposed based on the remaining entries.
_pybel_indra_pmod_map = {
    pmod_info['xrefs'][0]: _pybel_indra_pmod_map_helper[pmod_code]
    for pmod_code, pmod_info in pybel.language.pmod_mappings.items()
    if pmod_code in _pybel_indra_pmod_map_helper
}
#: Maps BEL ``TextLocation`` annotation values to the INDRA section types
#: listed in :py:data:`indra.reach.processor._section_list`. A value of
#: ``None`` means the BEL location has no INDRA counterpart.
#: See https://arty.scai.fraunhofer.de/artifactory/bel/annotation/text-location/text-location-1.0.0.belanno
_pybel_text_location_map = {
    'Abstract': 'abstract',
    'Results': 'results',
    'Legend': 'figure',
    'Review': None,
    'Introduction': 'introduction',
    'Methods': 'methods',
    'Discussion': 'discussion',
    'Conclusion': 'conclusion',
}
#: Upper-cased citation namespaces whose identifiers are stored as text refs
#: on the Evidence rather than as a PMID or a free-form citation reference.
EXT_CITATION_PREFIXES = {'DOI', 'BIORXIV', 'CHEMRXIV', 'AGRICOLA'}
class PybelProcessor(object):
    """Extract INDRA Statements from a PyBEL Graph.

    Currently does not handle non-causal relationships (positiveCorrelation,
    negativeCorrelation, hasVariant, etc.)

    Parameters
    ----------
    graph : pybel.BELGraph
        PyBEL graph containing the BEL content.

    Attributes
    ----------
    statements : list[indra.statements.Statement]
        A list of extracted INDRA Statements representing BEL Statements.
    unhandled : list[tuple]
        Edges that were not turned into Statements, each stored as a
        ``(u_data, v_data, key, edge_data)`` tuple.
    annot_manager : AnnotationManager
        Resolves annotation values using the graph's annotation URLs.
    """
    def __init__(self, graph):
        self.graph = graph
        self.statements = []
        self.unhandled = []
        self.annot_manager = AnnotationManager(self.graph.annotation_url)

    # FIXME: Handle reactions
    def get_statements(self):
        """Walk all edges of the graph and extract Statements from them.

        Extracted Statements are appended to ``self.statements``; edges
        that cannot be interpreted are appended to ``self.unhandled``.
        """
        for u_data, v_data, k, d in self.graph.edges(keys=True, data=True):
            # We only interpret causal relations, not correlations
            if d[pc.RELATION] not in pc.CAUSAL_RELATIONS:
                self.unhandled.append((u_data, v_data, k, d))
                continue
            # If the left or right-hand sides involve complex abundances,
            # add them as statements
            for node_ix, node in enumerate((u_data, v_data)):
                if isinstance(node, dsl.ComplexAbundance):
                    self._get_enum_complex(u_data, v_data, k, d, node_ix)
            subj_activity = _get_activity_condition(d.get(pc.SOURCE_MODIFIER))
            obj_activity = _get_activity_condition(d.get(pc.TARGET_MODIFIER))
            obj_to_loc = _get_translocation_target(d.get(pc.TARGET_MODIFIER))
            # If the object is a translocation, this represents a controlled
            # translocation, which we currently do not represent
            if obj_to_loc:
                self.unhandled.append((u_data, v_data, k, d))
                logger.info("Controlled translocations are currently not "
                            "handled: %s)", edge_to_bel(u_data, v_data, d))
                continue
            # Modification, e.g.
            #   x(Foo) -> p(Bar, pmod(Ph))
            #   act(x(Foo)) -> p(Bar, pmod(Ph))
            if isinstance(v_data, dsl.Protein) and \
                    has_protein_modification(v_data):
                if obj_activity:
                    logger.info("Ignoring object activity modifier in "
                                "modification statement: %s, %s, %s, %s",
                                u_data, v_data, k, d)
                else:
                    self._get_modification(u_data, v_data, k, d)
            elif obj_activity:
                # If the agents on the left and right hand sides are the same,
                # then get an active form:
                # ActiveForm
                #   p(Foo, {variants}) ->/-| act(p(Foo))
                # Also Composite active forms:
                #   compositeAbundance(p(Foo, pmod('Ph', 'T')),
                #                      p(Foo, pmod('Ph', 'Y'))) ->/-|
                #                            act(p(Foo))
                if not subj_activity and _proteins_match(u_data, v_data):
                    self._get_active_form(u_data, v_data, k, d)
                # Gef
                #   act(p(Foo)) => gtp(p(Foo))
                # Gap
                #   act(p(Foo)) =| gtp(p(Foo))
                elif subj_activity and _rel_is_direct(d) and \
                        obj_activity.activity_type == 'gtpbound':
                    self._get_gef_gap(u_data, v_data, k, d)
                # Activation/Inhibition
                #   x(Foo) -> act(x(Foo))
                #   act(x(Foo)) -> act(x(Foo))
                # GtpActivation
                #   gtp(p(Foo)) => act(p(Foo))
                else:
                    self._get_regulate_activity(u_data, v_data, k, d)
            # Activations involving biological processes or pathologies
            #   x(Foo) -> bp(Bar)
            elif isinstance(v_data, (dsl.BiologicalProcess, dsl.Pathology)):
                self._get_regulate_activity(u_data, v_data, k, d)
            # Regulate amount
            #   x(Foo) -> p(Bar)
            #   x(Foo) -> r(Bar)
            #   act(x(Foo)) -> p(Bar):
            #   x(Foo) -> deg(p(Bar))
            #   act(x(Foo)) ->/-| deg(p(Bar))
            elif (
                isinstance(v_data, (
                    dsl.Protein, dsl.Rna, dsl.Abundance, dsl.MicroRna,
                    dsl.NamedComplexAbundance,
                ))
                and not obj_activity
            ):
                self._get_regulate_amount(u_data, v_data, k, d)
            # Controlled conversions
            #   x(Foo) -> rxn(reactants(r1,...,rn), products(p1,...pn))
            #   act(x(Foo)) -> rxn(reactants(r1,...,rn), products(p1,...pn))
            # Note that we can't really handle statements where the relation
            # is decreases, as inhibition of a reaction does not match the
            # semantics of a controlled conversion
            elif (
                isinstance(v_data, dsl.Reaction)
                and d[pc.RELATION] in pc.CAUSAL_INCREASE_RELATIONS
            ):
                self._get_conversion(u_data, v_data, k, d)
            # UNHANDLED
            # rxn(reactants(r1,...,rn), products(p1,...pn))
            # Complex(a,b)
            # p(A, pmod('ph')) -> Complex(A, B)
            #            Complex(A-Ph, B)
            # Complexes
            #   complex(x(Foo), x(Bar), ...)
            else:
                self.unhandled.append((u_data, v_data, k, d))

    def _get_enum_complex(self, u_data, v_data, k, edge_data, node_ix):
        """Add a Complex statement for the complex node at ``node_ix``.

        ``node_ix`` selects the source (0) or target (1) node of the edge.
        """
        # Get an agent with bound conditions from the Complex
        assert node_ix in (0, 1)
        node = [u_data, v_data][node_ix]
        cplx_agent = get_agent(node, None)
        if cplx_agent is None:
            return
        # Flatten the main agent and its bound partners into a member list
        agents = [bc.agent for bc in cplx_agent.bound_conditions]
        cplx_agent.bound_conditions = []
        agents.append(cplx_agent)
        ev = self._get_evidence(u_data, v_data, k, edge_data)
        stmt = Complex(agents, evidence=[ev])
        self.statements.append(stmt)

    def _get_regulate_amount(self, u_data, v_data, k, edge_data):
        """Add an IncreaseAmount/DecreaseAmount statement for the edge."""
        subj_agent = get_agent(u_data, edge_data.get(pc.SOURCE_MODIFIER))
        obj_agent = get_agent(v_data, edge_data.get(pc.TARGET_MODIFIER))
        if subj_agent is None or obj_agent is None:
            # Keep the edge key so all unhandled entries have the same shape
            self.unhandled.append((u_data, v_data, k, edge_data))
            return
        obj_mod = edge_data.get(pc.TARGET_MODIFIER)
        has_deg = (obj_mod and obj_mod.get(pc.MODIFIER) == pc.DEGRADATION)
        rel = edge_data[pc.RELATION]
        if rel == pc.REGULATES:
            # TODO: once generic regulations work, we can make this an
            # actual statement
            # stmt_class = RegulateAmount
            return
        elif has_deg:
            # Regulating degradation flips the polarity of the amount change
            stmt_class = (DecreaseAmount if rel in
                          pc.CAUSAL_INCREASE_RELATIONS else IncreaseAmount)
        else:
            stmt_class = (IncreaseAmount if rel in
                          pc.CAUSAL_INCREASE_RELATIONS else DecreaseAmount)
        ev = self._get_evidence(u_data, v_data, k, edge_data)
        stmt = stmt_class(subj_agent, obj_agent, evidence=[ev])
        self.statements.append(stmt)

    def _get_modification(self, u_data, v_data, k, edge_data):
        """Add one modification statement per modification on the object."""
        subj_agent = get_agent(u_data, edge_data.get(pc.SOURCE_MODIFIER))
        mods, muts = _get_mods_and_muts(v_data)
        # The object agent is built from the parent node, i.e. without the
        # variants, since the modification is what the statement asserts
        v_data_no_mods = v_data.get_parent()
        obj_agent = get_agent(v_data_no_mods,
                              edge_data.get(pc.TARGET_MODIFIER))
        if subj_agent is None or obj_agent is None:
            self.unhandled.append((u_data, v_data, k, edge_data))
            return
        for mod in mods:
            modclass = modtype_to_modclass[mod.mod_type]
            ev = self._get_evidence(u_data, v_data, k, edge_data)
            stmt = modclass(subj_agent, obj_agent, mod.residue, mod.position,
                            evidence=[ev])
            self.statements.append(stmt)

    def _get_regulate_activity(self, u_data, v_data, k, edge_data):
        """Add an Activation, Inhibition or GtpActivation statement."""
        # Subject info
        subj_agent = get_agent(u_data, edge_data.get(pc.SOURCE_MODIFIER))
        subj_activity = \
            _get_activity_condition(edge_data.get(pc.SOURCE_MODIFIER))
        # Object info
        # Note: Don't pass the object modifier data because we don't want to
        # put an activity on the agent
        obj_agent = get_agent(v_data, None)
        # If it's a bioprocess object, we won't have an activity in the edge
        if isinstance(v_data, (dsl.BiologicalProcess, dsl.Pathology)):
            activity_type = 'activity'
        else:
            obj_activity_condition = \
                _get_activity_condition(edge_data.get(pc.TARGET_MODIFIER))
            activity_type = obj_activity_condition.activity_type
            assert obj_activity_condition.is_active is True
        # Check for valid subject/object
        if subj_agent is None or obj_agent is None:
            # Keep the edge key so all unhandled entries have the same shape
            self.unhandled.append((u_data, v_data, k, edge_data))
            return
        # Check which kind of statement we need to make
        # GtpActivation
        if subj_activity and subj_activity.activity_type == 'gtpbound' and \
                isinstance(u_data, dsl.Protein) and \
                isinstance(v_data, dsl.Protein) and \
                edge_data[pc.RELATION] == pc.DIRECTLY_INCREASES:
            stmt_class = GtpActivation
        elif edge_data[pc.RELATION] in pc.CAUSAL_INCREASE_RELATIONS:
            stmt_class = Activation
        elif edge_data[pc.RELATION] == pc.REGULATES:
            # TODO: once generic regulations work, we can make this an
            # actual statement
            # stmt_class = RegulateActivity
            return
        else:
            stmt_class = Inhibition
        ev = self._get_evidence(u_data, v_data, k, edge_data)
        stmt = stmt_class(subj_agent, obj_agent, activity_type, evidence=[ev])
        self.statements.append(stmt)

    def _get_active_form(self, u_data, v_data, k, edge_data):
        """Add an ActiveForm statement for the subject's state."""
        subj_agent = get_agent(u_data, edge_data.get(pc.SOURCE_MODIFIER))
        # Don't pass the object modifier info because we don't want an activity
        # condition applied to the agent
        obj_agent = get_agent(v_data)
        if subj_agent is None or obj_agent is None:
            # Keep the edge key so all unhandled entries have the same shape
            self.unhandled.append((u_data, v_data, k, edge_data))
            return
        obj_activity_condition = \
            _get_activity_condition(edge_data.get(pc.TARGET_MODIFIER))
        activity_type = obj_activity_condition.activity_type
        # If the relation is DECREASES, this means that this agent state
        # is inactivating
        is_active = edge_data[pc.RELATION] in pc.CAUSAL_INCREASE_RELATIONS
        ev = self._get_evidence(u_data, v_data, k, edge_data)
        stmt = ActiveForm(subj_agent, activity_type, is_active, evidence=[ev])
        self.statements.append(stmt)

    def _get_gef_gap(self, u_data, v_data, k, edge_data):
        """Add a Gef (increase) or Gap (otherwise) statement for the edge."""
        subj_agent = get_agent(u_data, edge_data.get(pc.SOURCE_MODIFIER))
        obj_agent = get_agent(v_data)
        if subj_agent is None or obj_agent is None:
            self.unhandled.append((u_data, v_data, k, edge_data))
            return
        ev = self._get_evidence(u_data, v_data, k, edge_data)
        if edge_data[pc.RELATION] in pc.CAUSAL_INCREASE_RELATIONS:
            stmt_class = Gef
        else:
            stmt_class = Gap
        stmt = stmt_class(subj_agent, obj_agent, evidence=[ev])
        self.statements.append(stmt)

    def _get_conversion(self, u_data, v_data, k, edge_data):
        """Add a Conversion statement for a controlled reaction edge."""
        subj_agent = get_agent(u_data, edge_data.get(pc.SOURCE_MODIFIER))
        # Get the nodes for the reactants and products
        reactant_agents = [get_agent(r) for r in v_data[pc.REACTANTS]]
        product_agents = [get_agent(p) for p in v_data[pc.PRODUCTS]]
        # We are not handling the following degenerate cases:
        # If there is no subject agent
        if (subj_agent is None or
                # If get_agent returned None for any of the reactants or
                # products
                any(r is None for r in reactant_agents) or
                any(p is None for p in product_agents) or
                # If there are neither reactants nor products
                # NOTE(review): a reaction missing only one of the two sides
                # is still accepted here -- confirm this is intended
                (not reactant_agents and not product_agents)):
            self.unhandled.append((u_data, v_data, k, edge_data))
            return
        ev = self._get_evidence(u_data, v_data, k, edge_data)
        # Pass the evidence as a list, like every other statement built here
        stmt = Conversion(subj_agent, obj_from=reactant_agents,
                          obj_to=product_agents, evidence=[ev])
        self.statements.append(stmt)

    def _get_evidence(self, u_data, v_data, k, edge_data):
        """Build an Evidence object from the data on an edge.

        Parameters
        ----------
        u_data, v_data : pybel.dsl.BaseEntity
            Source and target nodes of the edge.
        k : str
            The edge key, used as the Evidence ``source_id``.
        edge_data : dict
            The PyBEL edge data dictionary. It is not modified.

        Returns
        -------
        Evidence
            Evidence carrying the text, citation, epistemics, annotations
            and context found on the edge.
        """
        ev_text = edge_data.get(pc.EVIDENCE)
        ev_citation = edge_data.get(pc.CITATION)
        ev_pmid = None
        ev_ref = None
        text_refs = {}
        if ev_citation:
            cit_type = ev_citation.namespace
            cit_ref = ev_citation.identifier
            if cit_type == pc.CITATION_TYPE_PUBMED:
                ev_pmid = cit_ref
            elif cit_type.upper() in EXT_CITATION_PREFIXES:
                text_refs[cit_type.upper()] = cit_ref
            else:
                ev_ref = '%s: %s' % (cit_type, cit_ref)
        epistemics = {'direct': _rel_is_direct(edge_data)}
        # Work on a copy: the code below adds and pops keys, and the same
        # edge may yield several Evidence objects (one per extracted
        # statement). Mutating the graph's own dict would strip the context
        # annotations from every Evidence after the first and make them all
        # share a single annotations dict.
        annotations = dict(edge_data.get(pc.ANNOTATIONS) or {})
        annotations['bel'] = edge_to_bel(u_data, v_data, edge_data)
        if ev_ref:
            annotations['citation_ref'] = ev_ref
        # extract_context pops the context-related entries it consumes
        context = extract_context(annotations, self.annot_manager)
        text_location = annotations.pop('TextLocation', None)
        if text_location:
            text_location = text_location[0].identifier
            epistemics['section_type'] = \
                _pybel_text_location_map.get(text_location)
        ev = Evidence(text=ev_text, pmid=ev_pmid, source_api='bel',
                      source_id=k, epistemics=epistemics,
                      annotations=annotations, context=context,
                      text_refs=text_refs)
        return ev
def get_agent(node_data, node_modifier_data=None):
    """Get an INDRA agent from a PyBEL node.

    Parameters
    ----------
    node_data : pybel.dsl.BaseEntity
        The PyBEL node to convert.
    node_modifier_data : dict, optional
        The edge's modifier data for this node (e.g. activity or
        translocation), if any.

    Returns
    -------
    Agent or None
        The Agent for the node, or None if the node type is not handled,
        the node cannot be grounded, it has no name, or it carries an
        unhandled modifier.

    Raises
    ------
    TypeError
        If ``node_data`` is not a PyBEL entity.
    """
    # Check the node type/function
    if not isinstance(node_data, dsl.BaseEntity):
        raise TypeError('Non-pybel dict given: ({}) {}'.format(
            type(node_data), node_data))
    if not isinstance(node_data, (
        dsl.Protein, dsl.Rna, dsl.BiologicalProcess, dsl.ComplexAbundance,
        dsl.NamedComplexAbundance, dsl.Pathology, dsl.Abundance,
        dsl.MicroRna, dsl.Gene,
    )):
        mod_data = node_modifier_data or 'No node data'
        logger.info("Nodes of type %s not handled: %s",
                    node_data.function, mod_data)
        return None
    # Skip gene/protein fusions
    if isinstance(node_data, dsl.FusionBase):
        logger.info("Gene and protein fusions not handled: %s", node_data)
        return None
    # COMPLEXES ------------
    # First, handle complexes, which will consist recursively of other agents
    if isinstance(node_data, dsl.ComplexAbundance):
        # First, check for members: if there are no members, we assume this
        # is a named complex. An empty member list is treated the same way
        # as a missing one so that members[0] below cannot fail.
        members = node_data.members
        if not members:
            return None
        # Otherwise, get the "main" agent, to which the other members will be
        # attached as bound conditions
        main_agent = get_agent(members[0])
        # If we can't get the main agent, return None
        if main_agent is None:
            return None
        bound_conditions = [BoundCondition(get_agent(m), True)
                            for m in members[1:]]
        # Check the bound_conditions for any None agents
        if any(bc.agent is None for bc in bound_conditions):
            return None
        main_agent.bound_conditions = bound_conditions
        # Get activity of main agent
        ac = _get_activity_condition(node_modifier_data)
        main_agent.activity = ac
        return main_agent
    # OTHER NODE TYPES -----
    # Get node identifier information
    name = node_data.name
    ns = node_data.namespace.upper()
    ident = node_data.identifier
    # No ID present, get identifier using the name, namespace
    if not ident:
        assert name, "Node must have a name if lacking an identifier."
        name, db_refs = get_db_refs_by_name(ns, name, node_data)
    # We've already got an identifier, look up other identifiers if necessary
    else:
        name, db_refs = get_db_refs_by_ident(ns, ident, node_data)
    if db_refs is None:
        logger.info('Unable to get identifier information for node: %s',
                    node_data)
        return None
    # Get modification conditions
    mods, muts = _get_mods_and_muts(node_data)
    # Get activity condition
    ac = _get_activity_condition(node_modifier_data)
    to_loc = _get_translocation_target(node_modifier_data)
    # Check for unhandled node modifiers, skip if so
    if _has_unhandled_modifiers(node_modifier_data):
        return None
    # The lookups above may have replaced the name with an empty value
    if not name:
        return None
    # Make the agent
    ag = Agent(name, db_refs=db_refs, mods=mods, mutations=muts, activity=ac,
               location=to_loc)
    return ag
def get_db_refs_by_name(ns, name, node_data):
    """Return standard name and grounding based on a namespace and a name.

    Each supported namespace is handled by its own section below, and every
    section returns on its own.

    Parameters
    ----------
    ns : str
        A name space in which the given name is interpreted.
    name : str
        The name in the given name space to get grounding for.
    node_data : dict
        Node data for logging purposes.

    Returns
    -------
    name : str
        The standardized name for the given entity.
    db_refs : dict
        The grounding for the given entity, or None if none was found.
    """
    if ns == 'HGNC':
        # Assumption: name is an HGNC symbol
        current_id = hgnc_client.get_current_hgnc_id(name)
        if not current_id:
            logger.info("Invalid HGNC name: %s (%s)", name, node_data)
            return name, None
        if isinstance(current_id, list):
            logger.info('More than one current HGNC ID for %s, choosing %s',
                        name, current_id[0])
            current_id = current_id[0]
        name = hgnc_client.get_hgnc_name(current_id)
        grounding = {'HGNC': current_id}
        uniprot_id = _get_up_id(current_id)
        if uniprot_id:
            grounding['UP'] = uniprot_id
        mirna_id = mirbase_client.get_mirbase_id_from_hgnc_id(current_id)
        if mirna_id:
            grounding['MIRBASE'] = mirna_id
        return name, grounding

    if ns in ('UNIPROT', 'UP'):
        # If a mnemonic can be found for the name, the name is taken to be
        # a UniProt ID; otherwise the name is tried as a mnemonic
        if uniprot_client.get_mnemonic(name, web_fallback=False):
            uniprot_id = name
        else:
            uniprot_id = uniprot_client.get_id_from_mnemonic(name)
        if not uniprot_id:
            logger.info('Couldn\'t get UP ID from %s', name)
            return name, None
        grounding = {'UP': uniprot_id}
        gene_id = uniprot_client.get_hgnc_id(uniprot_id)
        if gene_id:
            grounding['HGNC'] = gene_id
            name = hgnc_client.get_hgnc_name(gene_id)
        else:
            name = uniprot_client.get_gene_name(uniprot_id)
        return name, grounding

    if ns == 'FPLX':
        return name, {'FPLX': name}

    if ns in ('GO', 'GOBP', 'GOCC'):
        # This label is replaced before the GO lookup
        if name == 'cell proliferation':
            name = 'cell population proliferation'
        term_id = go_client.get_go_id_from_label(name)
        if not term_id:
            logger.info('Could not find GO ID for %s', name)
            return name, None
        return go_client.get_go_label(term_id), {'GO': term_id}

    if ns in ('MESHPP', 'MESHD', 'MESH'):
        descriptor_id, descriptor_name = mesh_client.get_mesh_id_name(name)
        if not descriptor_id:
            logger.info('Could not find MESH ID from %s', name)
            return name, None
        return descriptor_name, {'MESH': descriptor_id}

    # MGI/RGD names are resolved to UniProt IDs through the module-level
    # gene name lookups; no grounding is returned if the name is unknown
    # FIXME: Full implementation would look up MGI/RGD identifiers from
    # the names, and obtain corresponding Uniprot IDs
    if ns == 'MGI':
        uniprot_id = mouse_lookup.get(name)
        return name, ({'UP': uniprot_id} if uniprot_id else None)

    if ns == 'RGD':
        uniprot_id = rat_lookup.get(name)
        return name, ({'UP': uniprot_id} if uniprot_id else None)

    # Map Selventa families and complexes to FamPlex
    if ns in {'SFAM', 'SCOMP'}:
        entry = selventa_lookup.get((ns, name))
        if entry is None:
            grounding = None
            famplex_name = None
        else:
            selventa_id, _ = entry
            grounding = {ns: selventa_id}
            famplex_name = bel_to_indra.get(name)
        if famplex_name is None:
            logger.info('Could not find mapping for BEL/%s family: '
                        '%s (%s)', ns, name, node_data)
            return name, grounding
        grounding['FPLX'] = famplex_name
        return famplex_name, grounding

    # Map Entrez genes to HGNC/UP
    if ns in ('EGID', 'ENTREZ', 'NCBIGENE'):
        gene_id = hgnc_client.get_hgnc_from_entrez(name)
        grounding = {'EGID': name}
        if gene_id is None:
            logger.debug('Could not map EGID%s to HGNC.', name)
            return 'E%s' % name, grounding
        grounding['HGNC'] = gene_id
        name = hgnc_client.get_hgnc_name(gene_id)
        uniprot_id = hgnc_client.get_uniprot_id(gene_id)
        if uniprot_id:
            grounding['UP'] = uniprot_id
        else:
            logger.info('HGNC entity %s with HGNC ID %s has no '
                        'corresponding Uniprot ID.',
                        name, gene_id)
        mirna_id = mirbase_client.get_mirbase_id_from_hgnc_id(gene_id)
        if mirna_id:
            grounding['MIRBASE'] = mirna_id
        return name, grounding

    if ns == 'MIRBASE':
        mirna_id = mirbase_client.get_mirbase_id_from_mirbase_name(name)
        if not mirna_id:
            logger.info('Could not map miRBase name %s to ID', name)
            return name, None
        grounding = {'MIRBASE': mirna_id}
        gene_id = mirbase_client.get_hgnc_id_from_mirbase_id(mirna_id)
        if gene_id:
            grounding['HGNC'] = gene_id
            name = hgnc_client.get_hgnc_name(gene_id)
        return name, grounding

    # CHEBI
    if ns == 'CHEBI':
        # BEL's own ChEBI name-to-ID map takes precedence; INDRA's ChEBI
        # name lookup is the fallback
        compound_id = (chebi_name_id.get(name) or
                       chebi_client.get_chebi_id_from_name(name))
        if compound_id:
            return name, {'CHEBI': compound_id}
        logger.info('CHEBI name %s not found in map.', name)
        return name, None

    # These appear in the name slot but are actually IDs
    if ns == 'CHEBIID':
        compound_id = identifiers.ensure_chebi_prefix(name)
        return (chebi_client.get_chebi_name_from_id(compound_id),
                {'CHEBI': compound_id})

    # SDIS, SCHEM: Look up the ID and include it in the db_refs
    if ns in {'SDIS', 'SCHEM'}:
        try:
            selventa_id, cross_refs = selventa_lookup[(ns, name)]
        except KeyError:
            logger.info(
                f"Could not map Selventa name {name} to ID for {ns}."
            )
            return name, None
        grounding = cross_refs.copy()
        grounding[ns] = selventa_id
        return name, grounding

    if ns == 'TEXT':
        return name, {ns: name}

    if ns == 'TAX':
        taxon_id = taxonomy_client.get_taxonomy_id(name)
        if taxon_id:
            return name, {'TAXONOMY': taxon_id}
        logger.info('Could not get taxonomy ID for %s', name)
        return name, None

    logger.info("Unhandled namespace: %s: %s (%s)", ns, name, node_data)
    return name, None
def get_db_refs_by_ident(ns, ident, node_data):
    """Return standard name and grounding based on a namespace and an ID.

    Parameters
    ----------
    ns : str
        A name space in which the given identifier is interpreted.
    ident : str
        The identifier in the given name space to get grounding for.
    node_data : pybel.dsl.BaseAbundance
        Node data, used for its name and for logging purposes.

    Returns
    -------
    name : str
        The standardized name for the given entity. Falls back to the
        node's own name if no standard name is available.
    db_refs : dict
        The grounding for the given entity, or None if the namespace is
        not handled.
    """
    # Namespaces accepted here. Each entry must be its own string literal:
    # a missing comma silently concatenates two neighbours into one entry.
    # NOTE(review): 'CHEBIID' has an entry in ns_mappings below but is not
    # accepted here, so that mapping is never used -- confirm the intent.
    ns_list = ['HGNC', 'UNIPROT', 'UP', 'FPLX', 'GO', 'GOBP', 'GOCC',
               'MESHPP', 'MESHD', 'MESH', 'MGI', 'RGD', 'SFAM', 'EGID',
               'ENTREZ', 'NCBIGENE', 'MIRBASE', 'CHEBI', 'ECCODE', 'SDIS',
               'SCHEM', 'TEXT', 'DOID', 'EFO', 'HP', 'PFAM',
               'HGNC.GENEFAMILY', 'HGNC_GROUP', 'NCBITAXON', 'PUBCHEM']
    # Namespaces that are renamed before being used as db_refs keys
    ns_mappings = {'UNIPROT': 'UP',
                   'GOBP': 'GO',
                   'GOCC': 'GO',
                   'MESHPP': 'MESH',
                   'MESHD': 'MESH',
                   'ENTREZ': 'EGID',
                   'NCBIGENE': 'EGID',
                   'NCBITAXON': 'TAXONOMY',
                   'HGNC.GENEFAMILY': 'HGNC_GROUP',
                   'CHEBIID': 'CHEBI'}
    raw_name = node_data.name
    if ns in ns_list:
        mapped_ns = ns_mappings.get(ns, ns)
        raw_db_refs = {mapped_ns: ident}
        std_name, std_db_refs = standardize_name_db_refs(raw_db_refs)
        # Fall back to the raw values where standardization gave nothing
        if std_name is None:
            std_name = raw_name
        if std_db_refs is None:
            std_db_refs = raw_db_refs
    else:
        logger.info("Unhandled namespace %s with name %s and "
                    "identifier %s (%s)." % (ns, raw_name, ident, node_data))
        std_name = raw_name
        std_db_refs = None
    return std_name, std_db_refs
def extract_context(annotations, annot_manager):
    """Return a BioContext object extracted from the annotations.

    The entries that are extracted into the BioContext are popped from the
    annotations.

    Parameters
    ----------
    annotations : dict
        PyBEL annotations dict
    annot_manager : AnnotationManager
        An annotation manager to get name/db reference mappings for each of
        the annotation types.

    Returns
    -------
    bc : BioContext
        An INDRA BioContext object, or None if the resulting context
        evaluates as false.
    """
    def get_annot(annotations, key):
        """Pop ``key`` and return the identifier of its first value."""
        values = annotations.pop(key, None)
        if not values:
            return None
        if len(values) > 1:
            logger.warning('More than one "%s" in annotations', key)
        return values[0].identifier

    context = BioContext()
    taxonomy_id = get_annot(annotations, 'Species')
    if taxonomy_id:
        species_name = annot_manager.get_mapping('Species', taxonomy_id)
        context.species = RefContext(name=species_name,
                                     db_refs={'TAXONOMY': taxonomy_id})

    # (BEL annotation name, BioContext attribute, fixed namespace or None)
    attribute_table = (
        ('CellLine', 'cell_line', None),
        ('Disease', 'disease', None),
        ('Anatomy', 'organ', None),
        ('Cell', 'cell_type', None),
        ('CellStructure', 'location', 'MESH'),
    )
    for bel_name, indra_name, fixed_ns in attribute_table:
        ann = get_annot(annotations, bel_name)
        if not ann:
            continue
        ref = annot_manager.get_mapping(bel_name, ann)
        if ref is None:
            continue
        if fixed_ns:
            db_ns, db_id = fixed_ns, ref
        else:
            # Without a fixed namespace the reference is split at its first
            # underscore into namespace and ID
            db_ns, db_id = ref.split('_', 1)
        if db_ns == 'CLO':
            db_ns = 'CL'
        if db_ns in {'CL', 'UBERON', 'DOID'}:
            db_id = identifiers.ensure_prefix(db_ns, db_id)
        setattr(context, indra_name,
                RefContext(name=ann, db_refs={db_ns: db_id}))

    # Overwrite blank BioContext
    return context if context else None
def _rel_is_direct(d):
    """Return True if the edge data's relation is one of the direct ones."""
    direct_relations = (pc.DIRECTLY_INCREASES, pc.DIRECTLY_DECREASES)
    relation = d[pc.RELATION]
    return relation in direct_relations
def _get_up_id(hgnc_id):
    """Return the UniProt ID for an HGNC ID, or None.

    None is returned when no UniProt ID is found, or when the looked-up
    value contains a comma.
    """
    hgnc_id = str(hgnc_id)
    uniprot_id = hgnc_client.get_uniprot_id(hgnc_id)
    if not uniprot_id:
        logger.info("No Uniprot ID for HGNC ID %s", hgnc_id)
        return None
    # A comma means the value is not a single ID; it is not used
    return None if ',' in uniprot_id else uniprot_id
class AnnotationManager(object):
    """Look up annotation values in BEL annotation resources.

    Parameters
    ----------
    annotation_urls : dict
        Maps annotation keys to the URLs of their BEL resources. Each
        resource is loaded with ``get_bel_resource`` on construction.

    Attributes
    ----------
    resources : dict
        The loaded resource for each annotation key.
    failures : defaultdict(set)
        For each annotation key, the values that could not be mapped.
    """
    def __init__(self, annotation_urls):
        self.resources = {
            key: get_bel_resource(url)
            for key, url in annotation_urls.items()
        }
        self.failures = defaultdict(set)

    def get_mapping(self, key, value):
        """Return the term mapped to ``value`` for annotation ``key``.

        Returns None if there is no resource for the key, or if the value
        is not among the resource's values; the latter case is logged and
        recorded in ``failures``.
        """
        resource = self.resources.get(key)
        if resource is None:
            return None
        term = resource['Values'].get(value)
        if term is None:
            logger.warning('unhandled annotation: %s:%s',
                           key, value)
            self.failures[key].add(value)
            return None
        return term
def _get_mods_and_muts(node_data: dsl.CentralDogma):
    """Get all modifications and mutations on the PyBEL node.

    Parameters
    ----------
    node_data : pybel.dsl.CentralDogma
        A PyBEL node

    Returns
    -------
    mods : List[ModCondition]
        A list of modifications to the given abundance
    muts : List[MutCondition]
        A list of mutations to the given abundance
    """
    mods = []
    muts = []
    for variant in node_data.get(pc.VARIANTS) or ():
        if isinstance(variant, dsl.Hgvs):
            # HGVS variants on genes are not interpreted
            if isinstance(node_data, dsl.Gene):
                logger.debug('Unhandled genetic variant: %s', node_data)
                continue
            hgvs_str = variant.variant
            position, res_from, res_to = _parse_mutation(hgvs_str)
            if (position, res_from, res_to) == (None, None, None):
                logger.info("Could not parse HGVS string %s", hgvs_str)
                continue
            muts.append(MutCondition(position, res_from, res_to))
        elif isinstance(variant, dsl.ProteinModification):
            # Only modifications in the 'go' namespace are mapped
            if variant.entity.namespace != 'go':
                continue
            mod_type = _pybel_indra_pmod_map.get(variant.entity)
            if mod_type is None:
                logger.info("Unhandled modification type %s (%s)",
                            variant.entity.name, node_data)
                continue
            mods.append(ModCondition(mod_type,
                                     variant.get(pc.PMOD_CODE),
                                     variant.get(pc.PMOD_POSITION)))
        # FIXME These unhandled mod types should result in throwing out
        # the node (raise, or return None)
        elif isinstance(variant, dsl.GeneModification):
            logger.debug('Unhandled node variant GMOD: %s', node_data)
        elif isinstance(variant, dsl.Fragment):
            logger.debug('Unhandled node variant FRAG: %s', node_data)
        else:
            logger.debug('Unknown node variant type: %s', node_data)
    return mods, muts
def _get_activity_condition(node_modifier_data):
    """Return an ActivityCondition for an activity modifier, else None.

    None is returned if there is no modifier data or the modifier is not
    an activity. The activity type falls back to the generic 'activity'
    when no effect is given, when the effect's namespace is not 'go', or
    when the effect has no entry in the PyBEL-to-INDRA activity map.
    """
    if node_modifier_data is None or node_modifier_data == {}:
        return None
    if node_modifier_data.get(pc.MODIFIER) != pc.ACTIVITY:
        return None
    activity_type = 'activity'
    effect = node_modifier_data.get(pc.EFFECT)
    if effect and effect.namespace == 'go':
        mapped_type = _pybel_indra_act_map.get(effect)
        if mapped_type is not None:
            activity_type = mapped_type
    return ActivityCondition(activity_type, True)
def _get_translocation_target(node_modifier_data):
    """Return the target location of a translocation modifier, else None.

    None is returned unless the modifier is a translocation whose target
    location is in one of the accepted GO namespaces and has a name or an
    identifier. The location is resolved with
    ``go_client.get_valid_location``, preferring the identifier.
    """
    # First check if there is a translocation modifier
    if node_modifier_data is None or node_modifier_data == {}:
        return None
    if node_modifier_data.get(pc.MODIFIER) != pc.TRANSLOCATION:
        return None
    # Next, make sure there is information on the translocation target
    effect = node_modifier_data.get(pc.EFFECT)
    if effect is None:
        return None
    target = effect.get(pc.TO_LOC)
    if not target:
        return None
    target_ns = target.namespace
    target_id = target.identifier
    target_name = target.name
    # Only use GO Cellular Component locations
    # NOTE(review): this comparison is case-sensitive, while other checks in
    # this file compare against lowercase 'go' -- confirm which spelling
    # location namespaces actually use
    if target_ns not in ('GO', 'GOCC', 'GOCCID'):
        return None
    if not target_name and not target_id:
        return None
    return go_client.get_valid_location(target_id or target_name)
def _has_unhandled_modifiers(node_modifier_data):
    """Return True if the node modifier is one that is not handled.

    The unhandled modifiers are cell secretion and cell surface
    expression. False is returned for missing modifier data and for any
    other modifier.
    """
    # No modifier data at all means there is nothing unhandled
    if node_modifier_data is None or node_modifier_data == {}:
        return False
    mod = node_modifier_data.get(pc.MODIFIER)
    if mod is None:
        return False
    if mod in (pc.CELL_SECRETION, pc.CELL_SURFACE_EXPRESSION):
        logger.info("Unhandled node modifier data: %s", node_modifier_data)
        return True
    # Return an explicit bool rather than falling off the end with None
    return False
def _proteins_match(u_data, v_data):
    """Return a truthy value if both nodes are the same named protein.

    Both nodes must be proteins with a non-empty namespace and name, and
    the namespaces and names must be equal. The result is meant to be used
    for its truth value.
    """
    if not isinstance(u_data, dsl.Protein):
        return False
    if not isinstance(v_data, dsl.Protein):
        return False
    return (
        u_data.namespace and
        v_data.namespace and
        u_data.name and
        v_data.name and
        u_data.namespace == v_data.namespace and
        u_data.name == v_data.name
    )
# Matches the start of a protein-level HGVS substitution such as
# "p.Val600Glu": the literal prefix "p.", a three-letter residue, a
# position, and another three-letter residue. The dot is escaped so that
# only a literal "." is accepted after the "p".
_hgvs_protein_mutation = re.compile(
    r'^p\.([A-Z][a-z]{2})(\d+)([A-Z][a-z]{2})')


def _parse_mutation(s):
    """Parse a protein HGVS substitution string.

    Parameters
    ----------
    s : str
        An HGVS string, e.g. "p.Val600Glu".

    Returns
    -------
    tuple
        ``(position, from_residue, to_residue)`` as strings, or
        ``(None, None, None)`` if the string does not match.
    """
    m = _hgvs_protein_mutation.match(s)
    if not m:
        return None, None, None
    from_aa, position, to_aa = m.groups()
    return position, from_aa, to_aa
def _build_famplex_map():
    """Return a dict from BEL entry names to INDRA names.

    Reads the tab-separated ``famplex_map.tsv`` resource and keeps the rows
    whose first column is 'BEL', mapping the second column to the third.
    """
    rows = read_unicode_csv(get_resource_path('famplex_map.tsv'),
                            delimiter='\t')
    # All three columns are read for every row before filtering
    triples = ((row[0], row[1], row[2]) for row in rows)
    return {
        entry: indra_name
        for namespace, entry, indra_name in triples
        if namespace == 'BEL'
    }
def _build_chebi_map():
    """Return a dict from ChEBI names to ChEBI IDs.

    Reads the tab-separated ``bel_chebi_map.tsv`` resource, mapping the
    first column of each row to the second.
    """
    rows = read_unicode_csv(get_resource_path('bel_chebi_map.tsv'),
                            delimiter='\t')
    return {row[0]: row[1] for row in rows}
def read_selventa_resources():
    """Return a lookup of Selventa entries read from the resource file.

    Returns
    -------
    dict
        Maps ``(namespace, name)`` to ``(ID, xrefs)``, where ``xrefs`` is a
        dict built from the '|'-separated ``prefix:value`` pairs in the
        row's last column (empty if that column is empty).
    """
    lookup = {}
    rows = read_unicode_csv(get_resource_path('selventa_entries.tsv'))
    for namespace, sid, name, xrefs_str in rows:
        xrefs = {}
        if xrefs_str:
            for xref in xrefs_str.split('|'):
                # Split only at the first colon; the value may contain more
                pieces = xref.split(':', 1)
                xrefs[pieces[0]] = pieces[1]
        # namespace, name -> ID, xrefs
        lookup[(namespace, name)] = (sid, xrefs)
    return lookup
# Lookup tables built once at import time and used by the grounding
# functions in this module.
bel_to_indra = _build_famplex_map()
chebi_name_id = _build_chebi_map()
selventa_lookup = read_selventa_resources()

# Gene name -> UniProt ID, restricted to mouse and rat entries respectively
mouse_lookup = {
    symbol: uniprot_id
    for uniprot_id, symbol in uniprot_client.um.uniprot_gene_name.items()
    if uniprot_client.is_mouse(uniprot_id)
}
rat_lookup = {
    symbol: uniprot_id
    for uniprot_id, symbol in uniprot_client.um.uniprot_gene_name.items()
    if uniprot_client.is_rat(uniprot_id)
}