Source code for indra.explanation.reporting

from collections import namedtuple
from pysb import Monomer

from indra.sources.indra_db_rest.api import get_statements_by_hash
from indra.statements import *
from indra.assemblers.english.assembler import _assemble_agent_str, \
    SentenceBuilder
from indra.assemblers.pysb.assembler import parse_identifiers_url
from indra.assemblers.pysb.common import _n


class RefEdge(object):
    """Refinement edge representing ontological relationship between nodes.

    Parameters
    ----------
    source : indra.statements.Agent
        Source agent of the edge.
    relation : str
        'is_ref' or 'has_ref' depending on the direction.
    target : indra.statements.Agent
        Target agent of the edge.
    """
    def __init__(self, source, relation, target):
        self.source = source
        self.relation = relation
        self.target = target

    @classmethod
    def _from_json(cls, json_tuple):
        """Build an edge from a (source_json, relation, target_json) triple.

        The first and third elements are deserialized with
        ``Agent._from_json``; the second is used as the relation as is.
        """
        source = Agent._from_json(json_tuple[0])
        relation = json_tuple[1]
        target = Agent._from_json(json_tuple[2])
        # Use cls rather than the hard-coded class so subclasses round-trip.
        return cls(source, relation, target)

    def to_english(self):
        """Return an English sentence describing this refinement edge.

        Raises
        ------
        ValueError
            If the relation is neither 'is_ref' nor 'has_ref'.
        """
        # Validate the relation first so an unsupported value yields a clear
        # error instead of an unbound local variable further down.
        if self.relation == 'is_ref':
            rel_str = ' is a refinement of '
        elif self.relation == 'has_ref':
            rel_str = ' has a refinement '
        else:
            raise ValueError(
                'Unsupported refinement relation: %r' % (self.relation,))
        source_str = _assemble_agent_str(self.source)
        target_str = _assemble_agent_str(self.target)
        sb = SentenceBuilder()
        sb.append_as_sentence([source_str, rel_str, target_str])
        sb.make_sentence()
        return sb.sentence

    def __repr__(self):
        return str(self)

    def __str__(self):
        return 'RefEdge(%s %s %s)' % (self.source, self.relation,
                                      self.target)

    def __eq__(self, other):
        # Let Python fall back to its default handling for foreign types
        # instead of failing on a missing attribute.
        if not isinstance(other, RefEdge):
            return NotImplemented
        return (self.source.matches(other.source) and
                self.target.matches(other.target) and
                self.relation == other.relation)

    # Defining __eq__ already makes instances unhashable; state it explicitly.
    __hash__ = None
def stmts_from_pysb_path(path, model, stmts):
    """Return source Statements corresponding to a path in a model.

    Parameters
    ----------
    path : list[tuple[str, int]]
        A list of tuples where the first element of the tuple is the
        name of a rule, and the second is the associated polarity along
        a path. Three-element steps are interpreted as serialized
        refinement edges; steps of any other length are skipped.
    model : pysb.core.Model
        A PySB model which contains the rules along the path.
    stmts : list[indra.statements.Statement]
        A list of INDRA Statements from which the model was assembled.

    Returns
    -------
    path_stmts : list[indra.statements.Statement]
        The Statements from which the rules along the path were obtained
        (with RefEdge objects standing in for refinement steps).

    Raises
    ------
    AssertionError
        If a rule on the path cannot be traced back to a Statement.
    """
    collected = []
    for step in path:
        n_fields = len(step)
        if n_fields == 3:
            # A triple encodes a refinement edge rather than a rule
            collected.append(RefEdge._from_json(step))
        elif n_fields == 2:
            # A pair is (rule name, polarity); the polarity is not needed
            rule_name, _polarity = step
            matching_rules = [rule for rule in model.rules
                              if rule.name == rule_name]
            # One Statement is resolved per model rule carrying this name
            for _rule in matching_rules:
                source_stmt = stmt_from_rule(rule_name, model, stmts)
                assert source_stmt is not None
                collected.append(source_stmt)
    return collected
def stmts_from_indranet_path(path, model, signed, from_db=True, stmts=None):
    """Return source Statements corresponding to a path in an IndraNet model
    (found by SignedGraphModelChecker or UnsignedGraphModelChecker).

    Parameters
    ----------
    path : list[tuple[str, int]]
        A list of tuples where the first element of the tuple is the
        name of an agent, and the second is the associated polarity along
        a path. Three-element entries are interpreted as serialized
        refinement edges.
    model : nx.Digraph or nx.MultiDiGraph
        An IndraNet model flattened into an unsigned DiGraph or signed
        MultiDiGraph.
    signed : bool
        Whether the model and path are signed.
    from_db : bool
        If True, uses statement hashes to query the database. Otherwise, looks
        for path statements in provided stmts.
    stmts : Optional[list[indra.statements.Statement]]
        A list of INDRA Statements from which the model was assembled. Required
        if from_db is set to False.

    Returns
    -------
    path_stmts : list[[indra.statements.Statement]]
        A list of lists of INDRA statements explaining the path (each inner
        corresponds to one step in the path because the flattened model can
        have multiple statements per edge).

    Raises
    ------
    ValueError
        If from_db is False, stmts is not given, and a step needs to be
        resolved from the provided statements.
    """
    steps = []
    # Walk consecutive (source, target) pairs along the path
    for source, target in zip(path, path[1:]):
        # A three-element entry on either end is a refinement edge
        if len(source) == 3:
            steps.append([RefEdge._from_json(source)])
            continue
        if len(target) == 3:
            steps.append([RefEdge._from_json(target)])
            continue
        if signed:
            # Edge key is 0 when both nodes carry the same polarity, else 1
            sign = 0 if source[1] == target[1] else 1
            stmt_data = model[source[0]][target[0]][sign]['statements']
        else:
            stmt_data = model[source[0]][target[0]]['statements']
        hashes = [stmt['stmt_hash'] for stmt in stmt_data]
        if from_db:
            p = get_statements_by_hash(hashes)
            statements = p.statements
        else:
            if stmts is None:
                raise ValueError(
                    'stmts must be provided when from_db is False')
            # Set membership keeps the filter linear in len(stmts)
            wanted = set(hashes)
            statements = [
                stmt for stmt in stmts if stmt.get_hash() in wanted]
        steps.append(statements)
    return steps
# Lightweight stand-in for PyBEL edges that carry no statement hash.
PybelEdge = namedtuple(
    'PybelEdge', ['source', 'target', 'relation', 'reverse'])


def pybel_edge_to_english(pybel_edge):
    """Return an English sentence describing a PybelEdge.

    Parameters
    ----------
    pybel_edge : PybelEdge
        An edge whose relation is 'partOf' or 'hasVariant'. The ``reverse``
        flag selects the inverse phrasing of the relation.

    Returns
    -------
    str
        The sentence assembled from the source agent, the relation phrase
        and the target agent.

    Raises
    ------
    ValueError
        If the relation is neither 'partOf' nor 'hasVariant'.
    """
    relation = pybel_edge.relation
    # Resolve the phrase first so an unsupported relation fails with a clear
    # error instead of an unbound local variable.
    if relation == 'partOf':
        if pybel_edge.reverse:
            rel_str = ' has a component '
        else:
            rel_str = ' is a part of '
    elif relation == 'hasVariant':
        if pybel_edge.reverse:
            rel_str = ' is a variant of '
        else:
            rel_str = ' has a variant '
    else:
        raise ValueError('Unsupported PyBEL edge relation: %r' % (relation,))
    source_str = _assemble_agent_str(pybel_edge.source)
    target_str = _assemble_agent_str(pybel_edge.target)
    sb = SentenceBuilder()
    sb.append_as_sentence([source_str, rel_str, target_str])
    sb.make_sentence()
    return sb.sentence
def stmts_from_pybel_path(path, model, from_db=True, stmts=None):
    """Return source Statements corresponding to a path in a PyBEL model.

    Parameters
    ----------
    path : list[tuple[str, int]]
        A list of tuples where the first element of the tuple is the
        name of an agent, and the second is the associated polarity along
        a path. Three-element entries are interpreted as serialized
        refinement edges.
    model : pybel.BELGraph
        A PyBEL BELGraph model.
    from_db : bool
        If True, uses statement hashes to query the database. Otherwise, looks
        for path statements in provided stmts.
    stmts : Optional[list[indra.statements.Statement]]
        A list of INDRA Statements from which the model was assembled. Required
        if from_db is set to False.

    Returns
    -------
    path_stmts : list[[indra.statements.Statement]]
        A list of lists of INDRA statements explaining the path (each inner
        corresponds to one step in the path because PyBEL model can have
        multiple edges representing multiple statements and evidences between
        two nodes).
    """
    import pybel.constants as pc
    from indra.sources.bel.processor import get_agent
    steps = []
    for source, target in zip(path[:-1], path[1:]):
        # A three-element entry on either end is a refinement edge
        if len(source) == 3:
            steps.append([RefEdge._from_json(source)])
            continue
        elif len(target) == 3:
            steps.append([RefEdge._from_json(target)])
            continue
        # The step is positive when both nodes carry the same sign
        same_sign = (source[1] == target[1])
        try:
            candidates = model[source[0]][target[0]]
            reverse = False
        except KeyError:
            # May be a symmetric edge stored in the opposite direction
            candidates = model[target[0]][source[0]]
            reverse = True
        # Only keep the edges with correct sign or non-causal
        kept = []
        for data in candidates.values():
            rel = data['relation']
            if rel not in pc.CAUSAL_RELATIONS:
                kept.append(data)
            if same_sign:
                if rel in pc.CAUSAL_INCREASE_RELATIONS:
                    kept.append(data)
            elif rel in pc.CAUSAL_DECREASE_RELATIONS:
                kept.append(data)
        hashes = set()
        for data in kept:
            try:
                hashes.add(list(data['annotations']['stmt_hash'])[0])
            # partOf and hasVariant edges don't have hashes
            except KeyError:
                continue
        if hashes:
            # With hashes available, retrieve the statements from them
            if from_db:
                statements = get_statements_by_hash(list(hashes)).statements
            else:
                statements = [
                    stmt for stmt in stmts if stmt.get_hash() in hashes]
        else:
            # Can't get statements without hash from db, so represent the
            # step with a PybelEdge built from the first kept edge only
            # (a single one, to avoid duplicates)
            statements = [
                PybelEdge(get_agent(source[0]), get_agent(target[0]),
                          data['relation'], reverse)
                for data in kept[:1]]
        steps.append(statements)
    return steps
def stmt_from_rule(rule_name, model, stmts):
    """Return the source INDRA Statement corresponding to a rule in a model.

    Parameters
    ----------
    rule_name : str
        The name of a rule in the given PySB model.
    model : pysb.core.Model
        A PySB model which contains the given rule.
    stmts : list[indra.statements.Statement]
        A list of INDRA Statements from which the model was assembled.

    Returns
    -------
    stmt : indra.statements.Statement
        The Statement from which the given rule in the model was obtained,
        or None if the rule has no 'from_indra_statement' annotation or no
        Statement with the annotated UUID is in stmts.
    """
    # The first matching annotation carries the UUID of the source Statement
    wanted_uuid = next(
        (annotation.object for annotation in model.annotations
         if annotation.subject == rule_name
         and annotation.predicate == 'from_indra_statement'),
        None)
    if not wanted_uuid:
        return None
    return next(
        (candidate for candidate in stmts if candidate.uuid == wanted_uuid),
        None)
def agent_from_obs(obs_name, model):
    """Return the Agent that an observable in a PySB model was built from.

    Parameters
    ----------
    obs_name : str
        The name of an observable, used to find the annotation with the
        'from_indra_agent' predicate whose subject is this name.
    model : pysb.core.Model
        A PySB model whose annotations are searched.

    Returns
    -------
    ag : indra.statements.Agent
        An Agent named after the annotated agent, with db_refs collected
        from the 'is' annotations on the Monomer whose name equals
        ``_n(agent name)``. None if no agent name is annotated for the
        observable.
    """
    # The first matching annotation gives the name of the agent
    agent_name = next(
        (annotation.object for annotation in model.annotations
         if annotation.subject == obs_name
         and annotation.predicate == 'from_indra_agent'),
        None)
    if not agent_name:
        return None
    monomer_name = _n(agent_name)
    groundings = {}
    for annotation in model.annotations:
        subject = annotation.subject
        if not isinstance(subject, Monomer):
            continue
        if subject.name != monomer_name or annotation.predicate != 'is':
            continue
        # Each 'is' annotation holds a URL that parses into a
        # (namespace, identifier) pair
        namespace, identifier = parse_identifiers_url(annotation.object)
        groundings[namespace] = identifier
    return Agent(agent_name, db_refs=groundings)