Source code for indra.assemblers.kami.assembler

from __future__ import absolute_import, print_function, unicode_literals
from builtins import dict, str
import logging
import itertools
from collections import defaultdict
import indra.statements as ist
from indra.assemblers.pysb.assembler import \
        PysbAssembler, _is_whitelisted, \
        UnknownPolicyError, \
        get_binding_site_name, PysbPreassembler, \
        get_agent_rule_str, abbrevs, states, get_mod_site_name

# Python 2
try:
    basestring
# Python 3
except:
    basestring = str

logger = logging.getLogger(__name__)


[docs]class KamiAssembler(PysbAssembler):
[docs] def make_model(self, policies=None, initial_conditions=True, reverse_effects=False): """Assemble the Kami model from the collected INDRA Statements. This method assembles a Kami model from the set of INDRA Statements. The assembled model is both returned and set as the assembler's model argument. Parameters ---------- policies : Optional[Union[str, dict]] A string or dictionary of policies, as defined in :py:class:`indra.assemblers.KamiAssembler`. This set of policies locally supersedes the default setting in the assembler. This is useful when this function is called multiple times with different policies. initial_conditions : Optional[bool] If True, default initial conditions are generated for the agents in the model. Returns ------- model : dict The assembled Kami model. """ self.processed_policies = self.process_policies(policies) ppa = PysbPreassembler(self.statements) ppa.replace_activities() if reverse_effects: ppa.add_reverse_effects() self.statements = ppa.statements # Set local policies for this make_model call that overwrite # the global policies of the Kami assembler if policies is not None: global_policies = self.policies if isinstance(policies, basestring): local_policies = {'other': policies} else: local_policies = {'other': 'default'} local_policies.update(policies) self.policies = local_policies self.model = {} graphs = [] self.model['graphs'] = graphs self.model['typing'] = [] # Action graph generated here action_graph = {'id': 'action_graph', 'attrs': {'name': 'action_graph'}} action_graph['graph'] = {'nodes': [], 'edges': []} graphs.append(action_graph) # Iterate over the statements to generate rules self._assemble() # Add initial conditions #if initial_conditions: # self.add_default_initial_conditions() # If local policies were applied, revert to the global one if policies is not None: self.policies = global_policies return self.model
def _assemble(self): """Calls the appropriate assemble method based on policies.""" for stmt in self.statements: if _is_whitelisted(stmt): self._dispatch(stmt, 'assemble', self.model) def _dispatch(self, stmt, stage, *args): """Construct and call an assembly function. This function constructs the name of the assembly function based on the type of statement, the corresponding policy and the stage of assembly. It then calls that function to perform the assembly task.""" class_name = stmt.__class__.__name__ policy = self.processed_policies[stmt.uuid] func_name = '%s_%s_%s' % (class_name.lower(), stage, policy) func = globals().get(func_name) if func is None: # The specific policy is not implemented for the # given statement type. # We try to apply a default policy next. func_name = '%s_%s_default' % (class_name.lower(), stage) func = globals().get(func_name) if func is None: # The given statement type doesn't have a default # policy. #raise UnknownPolicyError('%s function %s not defined' % # (stage, func_name)) logger.warning('%s function %s not defined' % (stage, func_name)) return return func(stmt, *args)
[docs]class Nugget(object): """Represents a Kami Nugget.""" def __init__(self, id, name, rate): self.counters = defaultdict(int) self.id = id self.name = name self.rate = rate self.nodes = [] self.edges = [] self.typings = {}
[docs] def add_agent(self, agent): """Add an INDRA Agent and its conditions to the Nugget.""" agent_id = self.add_node(agent.name) self.add_typing(agent_id, 'agent') # Handle bound conditions for bc in agent.bound_conditions: # Here we make the assumption that the binding site # is simply named after the binding partner if bc.is_bound: test_type = 'is_bnd' else: test_type = 'is_free' bound_name = bc.agent.name agent_bs = get_binding_site_name(bc.agent) test_name = '%s_bound_to_%s_test' % (agent_id, bound_name) agent_bs_id = self.add_node(agent_bs) test_id = self.add_node(test_name) self.add_edge(agent_bs_id, agent_id) self.add_edge(agent_bs_id, test_id) self.add_typing(agent_bs_id, 'locus') self.add_typing(test_id, test_type) for mod in agent.mods: mod_site_str = abbrevs[mod.mod_type] if mod.residue is not None: mod_site_str = mod.residue mod_pos_str = mod.position if mod.position is not None else '' mod_site = ('%s%s' % (mod_site_str, mod_pos_str)) site_states = states[mod.mod_type] if mod.is_modified: val = site_states[1] else: val = site_states[0] mod_site_id = self.add_node(mod_site, {'val': val}) self.add_edge(mod_site_id, agent_id) self.add_typing(mod_site_id, 'state') return agent_id
[docs] def add_node(self, name_base, attrs=None): """Add a node with a given base name to the Nugget and return ID.""" if name_base not in self.counters: node_id = name_base else: node_id = '%s_%d' % (name_base, self.counters[name_base]) node = {'id': node_id} if attrs: node['attrs'] = attrs self.nodes.append(node) self.counters[node_id] += 1 return node_id
[docs] def add_edge(self, from_node, to_node): """Add an edge between two nodes to the Nugget.""" self.edges.append({'from': from_node, 'to': to_node})
[docs] def add_typing(self, node_id, typing): """Add typing information to a node in the Nugget.""" self.typings[node_id] = typing
[docs] def get_nugget_dict(self): """Return the Nugget as a dictionary.""" nugget_dict = \ {'id': self.id, 'graph': { 'nodes': self.nodes, 'edges': self.edges }, 'attrs': { 'name': self.name, 'rate': self.rate } } return nugget_dict
[docs] def get_typing_dict(self): """Return the Nugget's typing information as a dictionary.""" return self.typings
# COMPLEX ############################################################ def complex_assemble_one_step(stmt, model): pairs = itertools.combinations(stmt.members, 2) for pair in pairs: # Make a rule name nugget_name = '_'.join([get_agent_rule_str(m) for m in pair]) nugget_name += '_bind' action_name = nugget_name + '_act' kf_bind = 1e-6 nugget = Nugget(nugget_name, nugget_name, kf_bind) action_id = nugget.add_node(action_name) # Initialize dicts/lists for this nugget nugget.add_typing(action_id, 'bnd') for agent in pair: agent_id = nugget.add_agent(agent) agent_bs = get_binding_site_name(agent) agent_bs_id = nugget.add_node(agent_bs) nugget.add_edge(agent_bs_id, agent_id) nugget.add_edge(agent_bs_id, action_id) # Add to the Kami typing dict nugget.add_typing(agent_bs_id, 'locus') # Typing dicts linking the nugget to the Action Graph and to the # Kami graph typing_dict_ag = {'from': nugget_name, 'to': 'action_graph', 'mapping': {}, 'total': False, 'ignore_attrs': False} typing_dict_kami = {'from': nugget_name, 'to': 'kami', 'mapping': nugget.get_typing_dict(), 'total': True, 'ignore_attrs': True} # Add the graphs for this nugget to the graphs and typing lists model['typing'] += [typing_dict_ag, typing_dict_kami] model['graphs'].append(nugget.get_nugget_dict()) # In reverse reaction, assume that dissocition is unconditional nugget_name = '_'.join([get_agent_rule_str(m) for m in pair]) nugget_name += '_dissociate' action_name = nugget_name + '_act' kr_bind = 1e-1 nugget = Nugget(nugget_name, nugget_name, kr_bind) action_id = nugget.add_node(action_name) nugget.add_typing(action_id, 'brk') for agent in pair: agent_bs = get_binding_site_name(agent) agent_id = nugget.add_node(agent.name) agent_bs_id = nugget.add_node(agent_bs) nugget.add_edge(agent_bs_id, agent_id) nugget.add_edge(agent_bs_id, action_id) nugget.add_typing(agent_id, 'agent') nugget.add_typing(agent_bs_id, 'locus') # Typing dicts linking the nugget to the Action Graph and to the # Kami graph typing_dict_ag = {'from': nugget_name, 'to': 'action_graph', 'mapping': {}, 'total': False, 'ignore_attrs': False} typing_dict_kami = {'from': nugget_name, 'to': 'kami', 'mapping': nugget.get_typing_dict(), 'total': True, 'ignore_attrs': True} # Add the graphs for this nugget to the graphs and typing lists model['typing'] += [typing_dict_ag, typing_dict_kami] model['graphs'].append(nugget.get_nugget_dict()) complex_assemble_default = complex_assemble_one_step def _mod_demod_assemble_one_step(stmt, model, is_mod): # Define some basic parameters for the modification mc = stmt._get_mod_condition() mod_condition_name = mc.mod_type mod_site = get_mod_site_name(mc) rule_enz_str = get_agent_rule_str(stmt.enz) rule_sub_str = get_agent_rule_str(stmt.sub) nugget_name = '%s_%s_%s_%s' % \ (rule_enz_str, mod_condition_name, rule_sub_str, mod_site) action_name = nugget_name + '_act' kf_mod = 1e-6 nugget = Nugget(nugget_name, nugget_name, kf_mod) enz_id = nugget.add_agent(stmt.enz) sub_id = nugget.add_agent(stmt.sub) st = states[mod_condition_name] from_state, to_state = (st[0], st[1]) if is_mod else (st[1], st[0]) mod_site_id = nugget.add_node(mod_site, {'val': from_state}) action_id = nugget.add_node(action_name, {'val': to_state}) nugget.add_typing(mod_site_id, 'state') nugget.add_typing(action_id, 'mod') nugget.add_edge(mod_site_id, sub_id) nugget.add_edge(action_id, mod_site_id) nugget.add_edge(enz_id, action_id) # Typing dicts linking the nugget to the Action Graph and to the # Kami graph typing_dict_ag = {'from': nugget_name, 'to': 'action_graph', 'mapping': {}, 'total': False, 'ignore_attrs': False} typing_dict_kami = {'from': nugget_name, 'to': 'kami', 'mapping': nugget.get_typing_dict(), 'total': True, 'ignore_attrs': True} # Add the graphs for this nugget to the graphs and typing lists model['typing'] += [typing_dict_ag, typing_dict_kami] model['graphs'].append(nugget.get_nugget_dict()) def modification_assemble_one_step(stmt, model): if stmt.enz is None: return _mod_demod_assemble_one_step(stmt, model, True) def demodification_assemble_one_step(stmt, model): if stmt.enz is None: return _mod_demod_assemble_one_step(stmt, model, False) modification_assemble_default = modification_assemble_one_step demodification_assemble_default = demodification_assemble_one_step # Map specific modification monomer/assembly functions to the generic # Modification assembly function policies = ['one_step', 'default'] mod_classes = [cls for cls in ist.AddModification.__subclasses__()] for mc, func_type, pol in itertools.product(mod_classes, ('assemble', ), policies): code = '{mc}_{func_type}_{pol} = ' \ 'modification_{func_type}_{pol}'.format( mc=ist.modclass_to_modtype[mc], func_type=func_type, pol=pol) exec(code) demod_classes = [cls for cls in ist.RemoveModification.__subclasses__()] for mc, func_type, pol in itertools.product(demod_classes, ('assemble', ), policies): code = '{mc}_{func_type}_{pol} = ' \ 'demodification_{func_type}_{pol}'.format( mc=ist.modclass_to_modtype[mc], func_type=func_type, pol=pol) exec(code)