import uuid
import logging
import networkx
import itertools
from indra.util import fast_deepcopy
from indra.statements import *
from indra.ontology.bio import bio_ontology
# Module-level logger; used below to report inferred Statements and
# Statements removed because they are explained by inferred ones.
logger = logging.getLogger(__name__)
class MechLinker(object):
    """Rewrite the activation pattern of Statements and derive new Statements.

    The mechanism linker (MechLinker) traverses a corpus of Statements and
    uses various inference steps to make the activity types and active
    forms consistent among Statements.

    Parameters
    ----------
    stmts : Optional[list[indra.statements.Statement]]
        The Statements to operate on. The list is stored by reference (it is
        not copied). If None, an empty list is used.

    Attributes
    ----------
    statements : list[indra.statements.Statement]
        The Statements the linker operates on.
    base_agents : BaseAgentSet
        The BaseAgents, keyed by Agent name, that accumulate the activities,
        states and modifications gathered from the Statements.
    """
    def __init__(self, stmts=None):
        self.statements = stmts if stmts is not None else []
        self.base_agents = BaseAgentSet()

    def add_statements(self, stmts):
        """Add statements to the MechLinker.

        Parameters
        ----------
        stmts : list[indra.statements.Statement]
            A list of Statements to add.
        """
        self.statements.extend(stmts)

    def gather_explicit_activities(self):
        """Aggregate all explicit activities and active forms of Agents.

        This function iterates over self.statements and extracts explicitly
        stated activity types and active forms for Agents.
        """
        for stmt in self.statements:
            # Activity types given as ActivityConditions
            for agent in stmt.agent_list():
                if agent is not None and agent.activity is not None:
                    agent_base = self._get_base(agent)
                    agent_base.add_activity(agent.activity.activity_type)
            # Object activities given in RegulateActivity statements
            if isinstance(stmt, RegulateActivity):
                if stmt.obj is not None:
                    obj_base = self._get_base(stmt.obj)
                    obj_base.add_activity(stmt.obj_activity)
            # Activity types given in ActiveForms
            elif isinstance(stmt, ActiveForm):
                agent_base = self._get_base(stmt.agent)
                agent_base.add_activity(stmt.activity)
                if stmt.is_active:
                    agent_base.add_active_state(stmt.activity, stmt.agent,
                                                stmt.evidence)
                else:
                    agent_base.add_inactive_state(stmt.activity, stmt.agent,
                                                  stmt.evidence)

    def gather_implicit_activities(self):
        """Aggregate all implicit activities and active forms of Agents.

        Iterate over self.statements and collect the implied activities
        and active forms of Agents that appear in the Statements.

        Note that using this function to collect implied Agent activities can
        be risky. Assume, for instance, that a Statement from a reading
        system states that EGF bound to EGFR phosphorylates ERK. This would
        be interpreted as implicit evidence for the EGFR-bound form of EGF
        to have 'kinase' activity, which is clearly incorrect.

        In contrast the alternative pair of this function:
        gather_explicit_activities collects only explicitly stated activities.
        """
        for stmt in self.statements:
            # The order of the checks matters: the specific statement types
            # are tested before the generic Modification/SelfModification.
            if isinstance(stmt, (Phosphorylation, Transphosphorylation,
                                 Autophosphorylation)):
                self._add_implicit_activity(stmt.enz, 'kinase',
                                            stmt.evidence)
            elif isinstance(stmt, Dephosphorylation):
                self._add_implicit_activity(stmt.enz, 'phosphatase',
                                            stmt.evidence)
            elif isinstance(stmt, (Modification, SelfModification)):
                self._add_implicit_activity(stmt.enz, 'catalytic',
                                            stmt.evidence)
            elif isinstance(stmt, Gef):
                self._add_implicit_activity(stmt.gef, 'gef', stmt.evidence,
                                            use_stated_activity=True)
            elif isinstance(stmt, Gap):
                self._add_implicit_activity(stmt.gap, 'gap', stmt.evidence,
                                            use_stated_activity=True)
            elif isinstance(stmt, RegulateActivity):
                if stmt.subj is not None:
                    subj_base = self._get_base(stmt.subj)
                    # NOTE(review): the previous code read `stmt.j`, an
                    # attribute that is not used anywhere else in this
                    # module. The subject's own stated activity type, with
                    # the same generic fallback used for Gef/Gap, is assumed
                    # to be the intent -- confirm.
                    subj_base.add_activity(
                        self._stated_activity_type(stmt.subj))

    @staticmethod
    def _stated_activity_type(agent):
        """Return the agent's stated activity type, or generic 'activity'."""
        if agent.activity is not None:
            return agent.activity.activity_type
        return 'activity'

    def _add_implicit_activity(self, agent, activity_type, evidence,
                               use_stated_activity=False):
        """Register an implied activity and active state for an Agent.

        Parameters
        ----------
        agent : Optional[indra.statements.Agent]
            The Agent implied to be active; nothing is done if it is None.
        activity_type : str
            The activity type added to the Agent's BaseAgent.
        evidence : list
            The evidence of the Statement implying the activity; it is
            stored with the recorded active state.
        use_stated_activity : Optional[bool]
            If True, the active state is filed under the Agent's own stated
            activity type (or 'activity' if it has none) rather than under
            activity_type. Default: False
        """
        if agent is None:
            return
        agent_base = self._get_base(agent)
        agent_base.add_activity(activity_type)
        if use_stated_activity:
            state_activity = self._stated_activity_type(agent)
        else:
            state_activity = activity_type
        # add_active_state takes the Agent itself (its state is extracted
        # from it) together with the supporting evidence.
        agent_base.add_active_state(state_activity, agent, evidence)

    @staticmethod
    def _modification_to_condition(stmt):
        """Return the ModCondition a Modification implies on its substrate.

        For Statements that are not instances of AddModification, the
        modification type is translated through modtype_to_inverse and the
        resulting condition is flagged as not modified.
        """
        pol = isinstance(stmt, AddModification)
        mod_type = modclass_to_modtype[stmt.__class__]
        if not pol:
            mod_type = modtype_to_inverse[mod_type]
        return ModCondition(mod_type, stmt.residue, stmt.position, pol)

    def gather_modifications(self):
        """Collect the modification sites known for each BaseAgent.

        For every Modification Statement, the condition implied on the
        substrate is added to the substrate's BaseAgent. In addition, the
        ModConditions of all Agents of every Statement are added to the
        corresponding BaseAgents.
        """
        for stmt in self.statements:
            if isinstance(stmt, Modification):
                sub_base = self._get_base(stmt.sub)
                sub_base.add_modification(
                    self._modification_to_condition(stmt))
            for agent in stmt.agent_list():
                if agent is not None:
                    agent_base = self._get_base(agent)
                    for mc in agent.mods:
                        agent_base.add_modification(mc)

    def reduce_modifications(self):
        """Rewrite modification sites using the BaseAgents' reductions.

        The residue and position of each Modification Statement are replaced
        by those of the reduced condition of its substrate, and the
        ModConditions of all Agents are replaced, in place, by their
        reductions.
        """
        for stmt in self.statements:
            if isinstance(stmt, Modification):
                mc = self._modification_to_condition(stmt)
                sub_base = self._get_base(stmt.sub)
                mc_red = sub_base.get_modification_reduction(mc)
                stmt.residue = mc_red.residue
                stmt.position = mc_red.position
            for agent in stmt.agent_list():
                if agent is not None and agent.mods:
                    agent_base = self._get_base(agent)
                    for i, mc in enumerate(agent.mods):
                        agent.mods[i] = \
                            agent_base.get_modification_reduction(mc)

    def reduce_activities(self):
        """Rewrite the activity types referenced in Statements for consistency.

        Activity types are reduced to the most specific form whenever possible.
        For instance, if 'kinase' is the only specific activity type known
        for the BaseAgent of BRAF, its generic 'activity' forms are rewritten
        to 'kinase'.
        """
        for stmt in self.statements:
            for agent in stmt.agent_list():
                if agent is not None and agent.activity is not None:
                    agent_base = self._get_base(agent)
                    act_red = agent_base.get_activity_reduction(
                        agent.activity.activity_type)
                    if act_red is not None:
                        agent.activity.activity_type = act_red
            if isinstance(stmt, RegulateActivity):
                if stmt.obj is not None:
                    obj_base = self._get_base(stmt.obj)
                    act_red = \
                        obj_base.get_activity_reduction(stmt.obj_activity)
                    if act_red is not None:
                        stmt.obj_activity = act_red
            elif isinstance(stmt, ActiveForm):
                agent_base = self._get_base(stmt.agent)
                act_red = agent_base.get_activity_reduction(stmt.activity)
                if act_red is not None:
                    stmt.activity = act_red

    @staticmethod
    def infer_complexes(stmts):
        """Return inferred Complex from Statements implying physical interaction.

        Parameters
        ----------
        stmts : list[indra.statements.Statement]
            A list of Statements to infer Complexes from.

        Returns
        -------
        linked_stmts : list[indra.statements.Complex]
            One Complex of enzyme and substrate for each Modification
            Statement that has an enzyme. Note that these are plain Complex
            Statements and are not wrapped in LinkedStatements.
        """
        linked_stmts = []
        for mstmt in _get_statements_by_type(stmts, Modification):
            if mstmt.enz is None:
                continue
            st = Complex([mstmt.enz, mstmt.sub], evidence=mstmt.evidence)
            linked_stmts.append(st)
        return linked_stmts

    @staticmethod
    def infer_activations(stmts):
        """Return inferred RegulateActivity from Modification + ActiveForm.

        This function looks for combinations of Modification and ActiveForm
        Statements and infers Activation/Inhibition Statements from them.
        For example, if we know that A phosphorylates B, and the
        phosphorylated form of B is active, then we can infer that
        A activates B. This can also be viewed as having "explained" a given
        Activation/Inhibition Statement with a combination of more mechanistic
        Modification + ActiveForm Statements.

        Parameters
        ----------
        stmts : list[indra.statements.Statement]
            A list of Statements to infer RegulateActivity from.

        Returns
        -------
        linked_stmts : list[indra.mechlinker.LinkedStatement]
            A list of LinkedStatements representing the inferred Statements.
        """
        linked_stmts = []
        af_stmts = _get_statements_by_type(stmts, ActiveForm)
        mod_stmts = _get_statements_by_type(stmts, Modification)
        for af_stmt, mod_stmt in itertools.product(af_stmts, mod_stmts):
            # There has to be an enzyme and the substrate and the
            # agent of the active form have to match
            if mod_stmt.enz is None or \
                    (not af_stmt.agent.entity_matches(mod_stmt.sub)):
                continue
            # We now check the modifications to make sure they are
            # consistent: at least one condition of the active form has to
            # agree with the Statement on type, residue and position.
            # An active form without modifications never matches.
            mod_type = modclass_to_modtype[mod_stmt.__class__]
            found = any(mc.mod_type == mod_type and
                        mc.residue == mod_stmt.residue and
                        mc.position == mod_stmt.position
                        for mc in af_stmt.agent.mods)
            if not found:
                continue
            # Collect evidence
            ev = mod_stmt.evidence
            # Finally, check the polarity of the ActiveForm
            if af_stmt.is_active:
                st = Activation(mod_stmt.enz, mod_stmt.sub, af_stmt.activity,
                                evidence=ev)
            else:
                st = Inhibition(mod_stmt.enz, mod_stmt.sub, af_stmt.activity,
                                evidence=ev)
            linked_stmts.append(LinkedStatement([af_stmt, mod_stmt], st))
        return linked_stmts

    @staticmethod
    def infer_modifications(stmts):
        """Return inferred Modification from RegulateActivity + ActiveForm.

        This function looks for combinations of Activation/Inhibition Statements
        and ActiveForm Statements that imply a Modification Statement.
        For example, if we know that A activates B, and phosphorylated B is
        active, then we can infer that A leads to the phosphorylation of B.
        An additional requirement when making this assumption is that the
        activity of B should only be dependent on the modified state and not
        other context - otherwise the inferred Modification is not necessarily
        warranted.

        The evidence attached to the inferred Statements consists of copies
        of the source Statements' evidence flagged as not direct; the
        evidence of the source Statements themselves is left untouched.

        Parameters
        ----------
        stmts : list[indra.statements.Statement]
            A list of Statements to infer Modifications from.

        Returns
        -------
        linked_stmts : list[indra.mechlinker.LinkedStatement]
            A list of LinkedStatements representing the inferred Statements.
        """
        linked_stmts = []
        af_stmts = _get_statements_by_type(stmts, ActiveForm)
        for act_stmt in _get_statements_by_type(stmts, RegulateActivity):
            # A regulation without an object has nothing to be modified
            if act_stmt.obj is None:
                continue
            for af_stmt in af_stmts:
                af_agent = af_stmt.agent
                if not af_agent.entity_matches(act_stmt.obj):
                    continue
                # Make sure the ActiveForm only involves modified sites
                if af_agent.mutations or af_agent.bound_conditions or \
                        af_agent.location:
                    continue
                # NOTE(review): neither act_stmt.is_activation nor
                # af_stmt.is_active is consulted when choosing the inferred
                # Modification type -- confirm this is intended.
                for mod in af_agent.mods:
                    if mod.is_modified:
                        mod_type_name = mod.mod_type
                    else:
                        mod_type_name = modtype_to_inverse[mod.mod_type]
                    mod_class = modtype_to_modclass[mod_type_name]
                    if not mod_class:
                        continue
                    # Work on copies so that flagging the inferred evidence
                    # as indirect does not alter the source Statements.
                    evs = fast_deepcopy(act_stmt.evidence + af_stmt.evidence)
                    for ev in evs:
                        ev.epistemics['direct'] = False
                    st = mod_class(act_stmt.subj,
                                   act_stmt.obj,
                                   mod.residue, mod.position,
                                   evidence=evs)
                    linked_stmts.append(
                        LinkedStatement([act_stmt, af_stmt], st))
                    logger.info('inferred: %s', st)
        return linked_stmts

    def replace_complexes(self, linked_stmts=None):
        """Remove Complex Statements that can be inferred out.

        This function iterates over self.statements and looks for Complex
        Statements that either match or are refined by inferred Complex
        Statements (provided as the linked_stmts argument).
        It removes Complex Statements from self.statements that can be
        explained by the inferred statements.

        Parameters
        ----------
        linked_stmts : Optional[list[indra.statements.Complex]]
            A list of inferred Complex Statements, optionally passed from
            outside; refinement_of is called on each element directly.
            If None is passed, the MechLinker runs self.infer_complexes to
            infer Complexes that are then used for removing existing
            Complexes in self.statements.
        """
        if linked_stmts is None:
            linked_stmts = self.infer_complexes(self.statements)
        new_stmts = []
        for stmt in self.statements:
            if isinstance(stmt, Complex) and \
                    any(linked_stmt.refinement_of(stmt, bio_ontology)
                        for linked_stmt in linked_stmts):
                logger.info('Removing complex: %s', stmt)
            else:
                new_stmts.append(stmt)
        self.statements = new_stmts

    def replace_activations(self, linked_stmts=None):
        """Remove RegulateActivity Statements that can be inferred out.

        This function iterates over self.statements and looks for
        RegulateActivity Statements that match the polarity, subject and
        object of inferred RegulateActivity Statements that were linked
        (provided as the linked_stmts argument).
        It removes RegulateActivity Statements from self.statements that can be
        explained by the linked statements. Statements without a subject or
        an object are never removed.

        Parameters
        ----------
        linked_stmts : Optional[list[indra.mechlinker.LinkedStatement]]
            A list of linked statements, optionally passed from outside.
            If None is passed, the MechLinker runs self.infer_activations to
            infer RegulateActivities and obtain a list of LinkedStatements
            that are then used for removing existing RegulateActivities
            in self.statements.
        """
        if linked_stmts is None:
            linked_stmts = self.infer_activations(self.statements)
        new_stmts = []
        for stmt in self.statements:
            if isinstance(stmt, RegulateActivity) and \
                    self._regulation_is_explained(stmt, linked_stmts):
                logger.info('Removing regulate activity: %s', stmt)
            else:
                new_stmts.append(stmt)
        self.statements = new_stmts

    @staticmethod
    def _regulation_is_explained(stmt, linked_stmts):
        """Return True if an inferred Statement matches the given regulation.

        A match requires equal polarity and matching subject and object
        entities. Missing (None) subjects or objects never match.
        """
        if stmt.subj is None or stmt.obj is None:
            return False
        for linked_stmt in linked_stmts:
            inferred_stmt = linked_stmt.inferred_stmt
            if inferred_stmt.subj is None or inferred_stmt.obj is None:
                continue
            if stmt.is_activation == inferred_stmt.is_activation and \
                    stmt.subj.entity_matches(inferred_stmt.subj) and \
                    stmt.obj.entity_matches(inferred_stmt.obj):
                return True
        return False

    def _get_base(self, agent):
        """Return the BaseAgent corresponding to an Agent.

        Parameters
        ----------
        agent : indra.statements.Agent

        Returns
        -------
        base_agent : indra.mechlinker.BaseAgent
        """
        base_agent = self.base_agents.get_create_base_agent(agent)
        return base_agent
class BaseAgentSet(object):
    """Container for a set of BaseAgents.

    The BaseAgent instances are held in a dict keyed by agent name; the
    class offers dict-like read access and a get-or-create accessor.
    """
    def __init__(self):
        self.agents = {}

    def get_create_base_agent(self, agent):
        """Return BaseAgent from an Agent, creating it if needed.

        Parameters
        ----------
        agent : indra.statements.Agent
            The Agent whose name is used as the lookup key.

        Returns
        -------
        base_agent : indra.mechlinker.BaseAgent
            The BaseAgent registered under the Agent's name; a new one is
            created and registered if none exists yet.
        """
        key = agent.name
        if key not in self.agents:
            self.agents[key] = BaseAgent(key)
        return self.agents[key]

    def keys(self):
        """Return the names of the registered BaseAgents."""
        return self.agents.keys()

    def items(self):
        """Return the (name, BaseAgent) pairs of the registered BaseAgents."""
        return self.agents.items()

    def __getitem__(self, name):
        """Return the BaseAgent registered under the given name."""
        return self.agents[name]
class BaseAgent(object):
    """Represents all activity types and active forms of an Agent.

    Parameters
    ----------
    name : str
        The name of the BaseAgent

    Attributes
    ----------
    activity_types : list[str]
        A list of activity types that the Agent has
    active_states : dict
        A dict of activity types and their associated active Agent states
    inactive_states : dict
        A dict of activity types and their associated inactive Agent states
    modifications : list
        The distinct modification conditions registered for the Agent
    activity_reductions : dict or None
        A dict of activity types and the type they are reduced to by
        inference. Built lazily; reset to None when a new activity type is
        added.
    modification_reductions : dict or None
        A dict of modification keys and the key they are reduced to.
        Built lazily; reset to None when a new modification is added.
    """
    def __init__(self, name):
        self.name = name
        self.activity_types = []
        self.active_states = {}
        self.inactive_states = {}
        self.activity_graph = None
        self.activity_reductions = None
        self.modification_graph = None
        self.modification_reductions = None
        self.modifications = []

    def get_activity_reduction(self, activity):
        """Return the activity type that activity reduces to, or None."""
        if self.activity_reductions is None:
            self._make_activity_reductions()
        return self.activity_reductions.get(activity)

    def _make_activity_reductions(self):
        """Build the activity graph and derive the reductions from it."""
        self._make_activity_graph()
        self.activity_reductions = _get_graph_reductions(self.activity_graph)

    def _make_activity_graph(self):
        """Build a graph with edges pointing from an activity type to the
        activity types that are isa-children of it in the ontology."""
        self.activity_graph = networkx.DiGraph()
        for a1, a2 in itertools.combinations(self.activity_types, 2):
            if bio_ontology.isa('INDRA_ACTIVITIES', a1,
                                'INDRA_ACTIVITIES', a2):
                self.activity_graph.add_edge(a2, a1)
            if bio_ontology.isa('INDRA_ACTIVITIES', a2,
                                'INDRA_ACTIVITIES', a1):
                self.activity_graph.add_edge(a1, a2)

    def get_modification_reduction(self, mc):
        """Return the reduced form of a modification condition.

        If no reduction is known, the condition passed in is returned
        itself; otherwise a new ModCondition is created that keeps the
        is_modified flag of the original.
        """
        if self.modification_reductions is None:
            self._make_modification_reductions()
        mc_red_tuple = self.modification_reductions.get(_mc_tuple(mc))
        # This handles the case where there was no reduction
        if not mc_red_tuple:
            return mc
        mod_type, residue, position = mc_red_tuple
        return ModCondition(mod_type, residue, position, mc.is_modified)

    def _make_modification_reductions(self):
        """Build the modification graph and derive the reductions from it."""
        self._make_modification_graph()
        self.modification_reductions = \
            _get_graph_reductions(self.modification_graph)

    def _make_modification_graph(self):
        """Build a graph with edges pointing from a modification to the
        modifications that are refinements of it."""
        self.modification_graph = networkx.DiGraph()
        for m1, m2 in itertools.combinations(self.modifications, 2):
            if m1.refinement_of(m2, bio_ontology):
                self.modification_graph.add_edge(_mc_tuple(m2), _mc_tuple(m1))
            elif m2.refinement_of(m1, bio_ontology):
                self.modification_graph.add_edge(_mc_tuple(m1), _mc_tuple(m2))

    def add_activity(self, activity_type):
        """Register an activity type unless it is already known."""
        if activity_type not in self.activity_types:
            self.activity_types.append(activity_type)
            # Reductions computed earlier did not take this type into
            # account, so force them to be rebuilt on next use.
            self.activity_reductions = None

    def add_active_state(self, activity_type, agent, evidence):
        """Record the state of agent as an active form for activity_type."""
        agent_state = AgentState(agent, evidence)
        self.active_states.setdefault(activity_type, []).append(agent_state)

    def add_inactive_state(self, activity_type, agent, evidence):
        """Record the state of agent as an inactive form for activity_type."""
        agent_state = AgentState(agent, evidence)
        self.inactive_states.setdefault(activity_type, []).append(agent_state)

    @staticmethod
    def _flatten_states(states_by_activity):
        """Return all states of a dict in one list, or None if it is empty."""
        if not states_by_activity:
            return None
        states = []
        for activity_states in states_by_activity.values():
            states += activity_states
        return states

    def get_active_forms(self):
        """Return all active states regardless of activity type, or None."""
        # TODO: handle activity types
        return self._flatten_states(self.active_states)

    def get_inactive_forms(self):
        """Return all inactive states regardless of activity type, or None."""
        # TODO: handle activity types
        return self._flatten_states(self.inactive_states)

    def add_modification(self, mc):
        """Register a modification site unless a matching one is known.

        The site is stored as a new ModCondition with its is_modified flag
        set to True, irrespective of the flag of the condition passed in.
        """
        mcc = ModCondition(mc.mod_type, mc.residue, mc.position, True)
        if not any(mcc.matches(mod) for mod in self.modifications):
            self.modifications.append(mcc)
            # Reductions computed earlier did not take this site into
            # account, so force them to be rebuilt on next use.
            self.modification_reductions = None

    def __str__(self):
        s = '%s(' % self.name
        if self.activity_types:
            s += 'activity_types: %s, ' % self.activity_types
        s += ''.join('%s: %s' % (k, v)
                     for k, v in self.active_states.items())
        s += ')'
        return s

    def __repr__(self):
        return str(self)
class AgentState(object):
    """A class representing Agent state without identifying a specific Agent.

    Parameters
    ----------
    agent : indra.statements.Agent
        The Agent whose state attributes are taken over (by reference).
    evidence : Optional[list]
        Evidence associated with the state; an empty list is used if a
        falsy value is given.

    Attributes
    ----------
    bound_conditions : list[indra.statements.BoundCondition]
    mods : list[indra.statements.ModCondition]
    mutations : list[indra.statements.Mutation]
    location : indra.statements.location
    evidence : list
    """
    def __init__(self, agent, evidence=None):
        self.bound_conditions = agent.bound_conditions
        self.mods = agent.mods
        self.mutations = agent.mutations
        self.location = agent.location
        self.evidence = evidence if evidence else []

    def apply_to(self, agent):
        """Apply this object's state to an Agent.

        Parameters
        ----------
        agent : indra.statements.Agent
            The agent to which the state should be applied

        Returns
        -------
        evidence : list
            The evidence stored with this state.
        """
        agent.bound_conditions = self.bound_conditions
        agent.mods = self.mods
        agent.mutations = self.mutations
        agent.location = self.location
        return self.evidence

    def __repr__(self):
        state = (self.bound_conditions, self.mods,
                 self.mutations, self.location)
        return 'AgentState(%s, %s, %s, %s)' % state
class LinkedStatement(object):
    """A pairing of source Statements with the Statement inferred from them.

    The list of source Statements is the basis for the inferred Statement.

    Parameters
    ----------
    source_stmts : list[indra.statements.Statement]
        A list of source Statements
    inferred_stmt : indra.statements.Statement
        A Statement that was inferred from the source Statements.
    """
    def __init__(self, source_stmts, inferred_stmt):
        self.source_stmts = source_stmts
        self.inferred_stmt = inferred_stmt

    def __str__(self):
        sources = ', '.join(map(str, self.source_stmts))
        return 'LinkedStatement((%s), %s)' % (sources,
                                              str(self.inferred_stmt))

    def __repr__(self):
        return str(self)
def _get_statements_by_type(stmts, stmt_type):
    """Return the elements of stmts that are instances of stmt_type.

    The order of the input is preserved in the returned list.
    """
    matching = []
    for stmt in stmts:
        if isinstance(stmt, stmt_type):
            matching.append(stmt)
    return matching
def _get_graph_reductions(graph):
    """Return transitive reductions on a DAG.

    This is used to reduce the set of activities of a BaseAgent to the most
    specific one(s) possible. For instance, if a BaseAgent is known to have
    'activity', 'catalytic' and 'kinase' activity, then this function will
    return {'activity': 'kinase', 'catalytic': 'kinase', 'kinase': 'kinase'}
    as the set of reductions.

    Parameters
    ----------
    graph : networkx.DiGraph
        A directed acyclic graph whose edges point from a node to its more
        specific nodes.

    Returns
    -------
    reductions : dict
        A dict mapping each node of the graph to the node it reduces to:
        the last node of the topological sort that has the same frontier
        as the node itself (possibly the node itself).
    """
    nodes_sort = list(networkx.algorithms.dag.topological_sort(graph))
    # The frontier of a node is the set of nodes without successors that
    # are reachable from it (the node itself if it has no successors).
    # Walking the topological sort backwards guarantees that the frontiers
    # of all successors are known when a node is processed, so each
    # frontier is computed exactly once and without recursion.
    frontiers = {}
    for node in reversed(nodes_sort):
        successors = list(graph.successors(node))
        if not successors:
            frontiers[node] = frozenset([node])
        else:
            frontiers[node] = frozenset().union(
                *(frontiers[succ] for succ in successors))
    # A node n1 can be reduced to a node n2 that comes after it in the
    # topological sort only if their frontiers are identical; among the
    # candidates the last one in the sort is chosen. Since n1 has its own
    # frontier, that is simply the last node overall with this frontier.
    last_with_frontier = {}
    for node in nodes_sort:
        last_with_frontier[frontiers[node]] = node
    reductions = {}
    for node in nodes_sort:
        reductions[node] = last_with_frontier[frontiers[node]]
    return reductions
def _mc_tuple(mc):
    """Return the (mod_type, residue, position) key of a condition.

    The is_modified flag is deliberately not part of the key.
    """
    key = mc.mod_type, mc.residue, mc.position
    return key