from __future__ import absolute_import, print_function, unicode_literals
from builtins import dict, str
import os
import sys
try:
# Python 2
import cPickle as pickle
except ImportError:
# Python 3
import pickle
import logging
from typing import List
from collections import defaultdict
from copy import deepcopy, copy
from indra.statements import *
from indra.belief import BeliefEngine
from indra.util import read_unicode_csv
from indra.pipeline import register_pipeline
from indra.mechlinker import MechLinker
from indra.databases import hgnc_client
from indra.ontology.bio import bio_ontology
from indra.preassembler import Preassembler, flatten_evidence
from indra.resources import get_resource_path
from indra.statements.validate import print_validation_report
from indra.literature.pubmed_client import is_retracted
import indra.tools.fix_invalidities
logger = logging.getLogger(__name__)
def _filter(kwargs, arg_list):
return dict(filter(lambda x: x[0] in arg_list, kwargs.items()))
@register_pipeline
def dump_statements(stmts_in, fname, protocol=4):
"""Dump a list of statements into a pickle file.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to dump into a pickle file.
fname : str
The name of the pickle file to dump statements into.
protocol : Optional[int]
The pickle protocol to use (use 2 for Python 2 compatibility).
Default: 4
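Examples
--------
A minimal usage sketch (assuming this module is importable as
``indra.tools.assemble_corpus`` and ``stmts`` is a list of INDRA
Statements; the file name is illustrative):
>>> from indra.tools import assemble_corpus as ac
>>> stmts = ac.dump_statements(stmts, 'stmts.pkl')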
"""
logger.info('Dumping %d statements into %s...' % (len(stmts_in), fname))
with open(fname, 'wb') as fh:
pickle.dump(stmts_in, fh, protocol=protocol)
return stmts_in
def load_statements(fname, as_dict=False):
"""Load statements from a pickle file.
Parameters
----------
fname : str
The name of the pickle file to load statements from.
as_dict : Optional[bool]
If True and the pickle file contains a dictionary of statements, it
is returned as a dictionary. If False, the statements are always
returned in a list. Default: False
Returns
-------
stmts : list
A list or dict of statements that were loaded.
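Examples
--------
A minimal sketch (the file name is illustrative and is assumed to contain
a pickled list or dict of statements):
>>> from indra.tools import assemble_corpus as ac
>>> stmts = ac.load_statements('stmts.pkl')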
"""
logger.info('Loading %s...' % fname)
with open(fname, 'rb') as fh:
# Encoding argument not available in pickle for Python 2
if sys.version_info[0] < 3:
stmts = pickle.load(fh)
# Encoding argument specified here to enable compatibility with
# pickle files created with Python 2
else:
stmts = pickle.load(fh, encoding='latin1')
if isinstance(stmts, dict):
if as_dict:
return stmts
st = []
for pmid, st_list in stmts.items():
st += st_list
stmts = st
logger.info('Loaded %d statements' % len(stmts))
return stmts
@register_pipeline
def map_grounding(stmts_in, do_rename=True, grounding_map=None,
misgrounding_map=None, agent_map=None, ignores=None, use_adeft=True,
gilda_mode=None, grounding_map_policy='replace', **kwargs):
"""Map grounding using the GroundingMapper.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to map.
do_rename : Optional[bool]
If True, Agents are renamed based on their mapped grounding.
grounding_map : Optional[dict]
A user supplied grounding map which maps a string to a
dictionary of database IDs (in the format used by Agents'
db_refs).
misgrounding_map : Optional[dict]
A user supplied misgrounding map which maps a string to a known
misgrounding which can be eliminated by the grounding mapper.
agent_map : Optional[dict]
A user supplied agent map which maps a string to a replacement Agent
specification (in the format used by the GroundingMapper).
ignores : Optional[list]
A user supplied list of strings to ignore: if a string appears as an
Agent's text in a Statement, that Statement is filtered out.
use_adeft : Optional[bool]
If True, Adeft is used, where available, for acronym disambiguation.
Default: True
gilda_mode : Optional[str]
If None, Gilda will not be used for disambiguation. If 'web', the address
set in the GILDA_URL configuration or environmental variable is
used as a Gilda web service. If 'local', the gilda package is
imported and used locally.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
grounding_map_policy : Optional[str]
If a grounding map is provided, use the policy to extend or replace
a default grounding map. Default: 'replace'.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of mapped statements.
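Examples
--------
A minimal sketch, assuming ``stmts`` is a list of INDRA Statements; the
user-supplied grounding map entry is illustrative:
>>> from indra.tools import assemble_corpus as ac
>>> stmts_out = ac.map_grounding(stmts)
>>> stmts_out = ac.map_grounding(stmts,
...                              grounding_map={'ERK': {'FPLX': 'ERK'}},
...                              grounding_map_policy='extend')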
"""
from indra.preassembler.grounding_mapper import GroundingMapper,\
default_agent_map, default_grounding_map, default_ignores, \
default_misgrounding_map
logger.info('Mapping grounding on %d statements...' % len(stmts_in))
ignores = ignores if ignores else default_ignores
gm = grounding_map
if not gm:
gm = default_grounding_map
elif grounding_map_policy == 'extend':
default_gm = {k: v for (k, v) in default_grounding_map.items()}
default_gm.update(gm)
gm = default_gm
misgm = misgrounding_map if misgrounding_map else default_misgrounding_map
agent_map = agent_map if agent_map else default_agent_map
gm = GroundingMapper(gm, agent_map=agent_map,
misgrounding_map=misgm, ignores=ignores,
use_adeft=use_adeft, gilda_mode=gilda_mode)
stmts_out = gm.map_stmts(stmts_in, do_rename=do_rename)
# Normalize missing/empty locations in Translocation statements to None
for stmt in stmts_out:
if isinstance(stmt, Translocation):
if not stmt.from_location:
stmt.from_location = None
if not stmt.to_location:
stmt.to_location = None
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def merge_groundings(stmts_in):
"""Gather and merge original grounding information from evidences.
Each Statement's evidences are traversed to find original grounding
information. These groundings are then merged into an overall consensus
grounding dict with as much detail as possible.
The current implementation is only applicable to Statements whose
concept/agent roles are fixed. Complexes, Associations and Conversions
cannot be handled correctly.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of INDRA Statements whose groundings should be merged. These
Statements are meant to have been preassembled and potentially have
multiple pieces of evidence.
Returns
-------
stmts_out : list[indra.statements.Statement]
The list of Statements now with groundings merged at the Statement
level.
"""
def surface_grounding(stmt):
# Find the "best" grounding for a given concept and its evidences
# and surface that
for idx, concept in enumerate(stmt.agent_list()):
if concept is None:
continue
aggregate_groundings = {}
for ev in stmt.evidence:
if 'agents' in ev.annotations:
groundings = ev.annotations['agents']['raw_grounding'][idx]
for ns, value in groundings.items():
if ns not in aggregate_groundings:
aggregate_groundings[ns] = []
if isinstance(value, list):
aggregate_groundings[ns] += value
else:
aggregate_groundings[ns].append(value)
best_groundings = get_best_groundings(aggregate_groundings)
concept.db_refs = best_groundings
def get_best_groundings(aggregate_groundings):
best_groundings = {}
for ns, values in aggregate_groundings.items():
# There are 3 possibilities here
# 1. All the entries in the list are scored in which case we
# get unique entries and sort them by score
if all([isinstance(v, (tuple, list)) for v in values]):
best_groundings[ns] = []
for unique_value in {v[0] for v in values}:
scores = [v[1] for v in values if v[0] == unique_value]
best_groundings[ns].append((unique_value, max(scores)))
best_groundings[ns] = \
sorted(best_groundings[ns], key=lambda x: x[1],
reverse=True)
# 2. All the entries in the list are unscored in which case we
# get the highest frequency entry
elif all([not isinstance(v, (tuple, list)) for v in values]):
best_groundings[ns] = max(set(values), key=values.count)
# 3. There is a mixture, which can happen when some entries were
# mapped with scores and others had no scores to begin with.
# In this case, we again pick the highest frequency non-scored
# entry assuming that the unmapped version is more reliable.
else:
unscored_vals = [v for v in values
if not isinstance(v, (tuple, list))]
best_groundings[ns] = max(set(unscored_vals),
key=unscored_vals.count)
return best_groundings
stmts_out = []
for stmt in stmts_in:
if not isinstance(stmt, (Complex, Conversion)):
surface_grounding(stmt)
stmts_out.append(stmt)
return stmts_out
@register_pipeline
def map_sequence(stmts_in, do_methionine_offset=True,
do_orthology_mapping=True, do_isoform_mapping=True, **kwargs):
"""Map sequences using the SiteMapper.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to map.
do_methionine_offset : boolean
Whether to check for off-by-one errors in site position (possibly)
attributable to site numbering from mature proteins after
cleavage of the initial methionine. If True, checks the reference
sequence for a known modification at 1 site position greater
than the given one; if there exists such a site, creates the
mapping. Default is True.
do_orthology_mapping : boolean
Whether to check sequence positions for known modification sites
in mouse or rat sequences (based on PhosphoSitePlus data). If a
mouse/rat site is found that is linked to a site in the human
reference sequence, a mapping is created. Default is True.
do_isoform_mapping : boolean
Whether to check sequence positions for known modifications
in other human isoforms of the protein (based on PhosphoSitePlus
data). If a site is found that is linked to a site in the human
reference sequence, a mapping is created. Default is True.
use_cache : boolean
If True, a cache will be created/used at the location specified by
SITEMAPPER_CACHE_PATH, defined in your INDRA config or the environment.
If False, no cache is used. For more details on the cache, see the
SiteMapper class definition.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of mapped statements.
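Examples
--------
A minimal sketch, assuming ``stmts`` is a list of INDRA Statements whose
modification sites should be mapped:
>>> from indra.tools import assemble_corpus as ac
>>> stmts_out = ac.map_sequence(stmts)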
"""
from indra.preassembler.sitemapper import SiteMapper, default_site_map
logger.info('Mapping sites on %d statements...' % len(stmts_in))
sm = SiteMapper(default_site_map,
use_cache=kwargs.pop('use_cache', False),
do_methionine_offset=do_methionine_offset,
do_orthology_mapping=do_orthology_mapping,
do_isoform_mapping=do_isoform_mapping)
valid, mapped = sm.map_sites(stmts_in)
correctly_mapped_stmts = []
for ms in mapped:
correctly_mapped = all([mm.has_mapping() for mm in ms.mapped_mods])
if correctly_mapped:
correctly_mapped_stmts.append(ms.mapped_stmt)
stmts_out = valid + correctly_mapped_stmts
logger.info('%d statements with valid sites' % len(stmts_out))
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
del sm
return stmts_out
@register_pipeline
def run_preassembly(stmts_in, return_toplevel=True, poolsize=None,
size_cutoff=None, belief_scorer=None, ontology=None,
matches_fun=None, refinement_fun=None,
flatten_evidence=False, flatten_evidence_collect_from=None,
normalize_equivalences=False, normalize_opposites=False,
normalize_ns='WM', run_refinement=True, filters=None,
**kwargs):
"""Run preassembly on a list of statements.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to preassemble.
return_toplevel : Optional[bool]
If True, only the top-level statements are returned. If False,
all statements are returned irrespective of level of specificity.
Default: True
poolsize : Optional[int]
The number of worker processes to use to parallelize the
comparisons performed by the function. If None (default), no
parallelization is performed. NOTE: Parallelization is only
available on Python 3.4 and above.
size_cutoff : Optional[int]
Groups with size_cutoff or more statements are sent to worker
processes, while smaller groups are compared in the parent process.
Default value is 100. Not relevant when parallelization is not
used.
belief_scorer : Optional[indra.belief.BeliefScorer]
Instance of BeliefScorer class to use in calculating Statement
probabilities. If None is provided (default), then the default
scorer is used.
ontology : Optional[IndraOntology]
IndraOntology object to use for preassembly
matches_fun : Optional[function]
A function to override the built-in matches_key function of statements.
refinement_fun : Optional[function]
A function to override the built-in refinement_of function of
statements.
flatten_evidence : Optional[bool]
If True, evidences are collected and flattened via supports/supported_by
links. Default: False
flatten_evidence_collect_from : Optional[str]
String indicating whether to collect and flatten evidence from the
`supports` attribute of each statement or the `supported_by` attribute.
If not set, defaults to 'supported_by'.
Only relevant when flatten_evidence is True.
normalize_equivalences : Optional[bool]
If True, equivalent groundings are rewritten to a single standard one.
Default: False
normalize_opposites : Optional[bool]
If True, groundings that have opposites in the ontology are rewritten
to a single standard one.
normalize_ns : Optional[str]
The name space with respect to which equivalences and opposites are
normalized.
filters : Optional[list[:py:class:indra.preassembler.refinement.RefinementFilter]]
A list of RefinementFilter classes that implement filters on
possible statement refinements. For details on how to
construct such a filter, see the documentation of
:py:class:`indra.preassembler.refinement.RefinementFilter`.
If no user-supplied filters are provided, the default ontology-based
filter is applied. If a list of filters is provided here, the
:py:class:`indra.preassembler.refinement.OntologyRefinementFilter`
isn't appended by default, and should be added by the user, if
necessary. Default: None
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
save_unique : Optional[str]
The name of a pickle file to save the unique statements into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of preassembled top-level statements.
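Examples
--------
A minimal sketch of a typical assembly sequence, assuming ``stmts`` is a
list of INDRA Statements (grounding and site mapping followed by
preassembly):
>>> from indra.tools import assemble_corpus as ac
>>> stmts = ac.map_grounding(stmts)
>>> stmts = ac.map_sequence(stmts)
>>> stmts = ac.run_preassembly(stmts, return_toplevel=True)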
"""
dump_pkl_unique = kwargs.get('save_unique')
use_ontology = ontology if ontology is not None else bio_ontology
be = BeliefEngine(scorer=belief_scorer, matches_fun=matches_fun)
pa = Preassembler(use_ontology, stmts_in, matches_fun=matches_fun,
refinement_fun=refinement_fun)
if normalize_equivalences:
logger.info('Normalizing equals on %d statements' % len(pa.stmts))
pa.normalize_equivalences(normalize_ns)
if normalize_opposites:
logger.info('Normalizing opposites on %d statements' % len(pa.stmts))
pa.normalize_opposites(normalize_ns)
dedupl_stmts = run_preassembly_duplicate(pa, be, save=dump_pkl_unique)
if not run_refinement:
return dedupl_stmts
dump_pkl = kwargs.get('save')
size_cutoff = size_cutoff if size_cutoff else 100
if not flatten_evidence_collect_from:
flatten_evidence_collect_from = 'supported_by'
options = {'save': dump_pkl, 'return_toplevel': return_toplevel,
'poolsize': poolsize, 'size_cutoff': size_cutoff,
'flatten_evidence': flatten_evidence,
'flatten_evidence_collect_from': flatten_evidence_collect_from,
'filters': filters
}
stmts_out = run_preassembly_related(pa, be, **options)
return stmts_out
def run_preassembly_duplicate(preassembler, beliefengine, **kwargs):
"""Run deduplication stage of preassembly on a list of statements.
Parameters
----------
preassembler : indra.preassembler.Preassembler
A Preassembler instance
beliefengine : indra.belief.BeliefEngine
A BeliefEngine instance.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of unique statements.
"""
logger.info('Combining duplicates on %d statements...' %
len(preassembler.stmts))
dump_pkl = kwargs.get('save')
stmts_out = preassembler.combine_duplicates()
beliefengine.set_prior_probs(stmts_out)
logger.info('%d unique statements' % len(stmts_out))
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def filter_by_type(stmts_in, stmt_type, invert=False, **kwargs):
"""Filter to a given statement type.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to filter.
stmt_type : str or indra.statements.Statement
The class of the statement type to filter for. Alternatively,
a string matching the name of the statement class, e.g.,
"Activation" can be used.
Example: indra.statements.Modification or "Modification"
invert : Optional[bool]
If True, the statements that are not of the given type
are returned. Default: False
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered statements.
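Examples
--------
A small sketch constructing two statements and filtering by type; both
the class form and the string form are accepted:
>>> from indra.statements import Agent, Phosphorylation, Activation
>>> from indra.tools import assemble_corpus as ac
>>> stmts = [Phosphorylation(Agent('MAP2K1'), Agent('MAPK1')),
...          Activation(Agent('MAP2K1'), Agent('MAPK1'))]
>>> phos_stmts = ac.filter_by_type(stmts, Phosphorylation)
>>> act_stmts = ac.filter_by_type(stmts, 'Activation')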
"""
if isinstance(stmt_type, str):
stmt_type = get_statement_by_name(stmt_type)
logger.info('Filtering %d statements for type %s%s...' %
(len(stmts_in), 'not ' if invert else '',
stmt_type.__name__))
if not invert:
stmts_out = [st for st in stmts_in if isinstance(st, stmt_type)]
else:
stmts_out = [st for st in stmts_in if not isinstance(st, stmt_type)]
logger.info('%d statements after filter...' % len(stmts_out))
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
def _agent_is_grounded(agent, score_threshold):
grounded = True
db_names = list(set(agent.db_refs.keys()) - set(['TEXT', 'TEXT_NORM']))
# If there are no entries at all other than possibly TEXT
if not db_names:
grounded = False
# If there are entries but they point to None / empty values
if not any([agent.db_refs[db_name] for db_name in db_names]):
grounded = False
# If we are looking for scored groundings with a threshold
if score_threshold:
any_passing = False
for db_name in db_names:
val = agent.db_refs[db_name]
# If it's a list with some values, find the
# highest scoring match and compare to threshold
if isinstance(val, list) and val:
high_score = sorted(val, key=lambda x: x[1],
reverse=True)[0][1]
if high_score > score_threshold:
any_passing = True
break
if not any_passing:
grounded = False
return grounded
def _remove_bound_conditions(agent, keep_criterion):
"""Removes bound conditions of agent such that keep_criterion is False.
Parameters
----------
agent: Agent
The agent whose bound conditions we evaluate
keep_criterion: function
Evaluates keep_criterion(a) for each Agent a in a bound condition;
if it evaluates to False, a is removed from the agent's bound_conditions.
"""
new_bc = []
for ind in range(len(agent.bound_conditions)):
if keep_criterion(agent.bound_conditions[ind].agent):
new_bc.append(agent.bound_conditions[ind])
agent.bound_conditions = new_bc
def _any_bound_condition_fails_criterion(agent, criterion):
"""Returns True if any bound condition fails to meet the specified
criterion.
Parameters
----------
agent: Agent
The agent whose bound conditions we evaluate
criterion: function
Evaluates criterion(a) for each a in a bound condition and returns True
if any agents fail to meet the criterion.
Returns
-------
any_fails: bool
True if and only if at least one Agent in a bound condition fails to
meet the specified criterion.
"""
bc_agents = [bc.agent for bc in agent.bound_conditions]
for b in bc_agents:
if not criterion(b):
return True
return False
@register_pipeline
def filter_grounded_only(stmts_in, score_threshold=None, remove_bound=False,
**kwargs):
"""Filter to statements that have grounded agents.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to filter.
score_threshold : Optional[float]
If scored groundings are available in a list and the highest score
is below this threshold, the Statement is filtered out.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
remove_bound: Optional[bool]
If true, removes ungrounded bound conditions from a statement.
If false (default), filters out statements with ungrounded bound
conditions.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered statements.
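Examples
--------
A minimal sketch with illustrative db_refs; the second agent is only
grounded to TEXT, so the statement is filtered out:
>>> from indra.statements import Agent, Phosphorylation
>>> from indra.tools import assemble_corpus as ac
>>> stmts = [Phosphorylation(Agent('MAP2K1', db_refs={'HGNC': '6840'}),
...                          Agent('XYZ', db_refs={'TEXT': 'XYZ'}))]
>>> stmts_out = ac.filter_grounded_only(stmts)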
"""
logger.info('Filtering %d statements for grounded agents...' %
len(stmts_in))
stmts_out = []
for st in stmts_in:
grounded = True
for agent in st.agent_list():
if agent is not None:
criterion = lambda x: _agent_is_grounded(x, score_threshold)
if not criterion(agent):
grounded = False
break
if not isinstance(agent, Agent):
continue
if remove_bound:
_remove_bound_conditions(agent, criterion)
elif _any_bound_condition_fails_criterion(agent, criterion):
grounded = False
break
if grounded:
stmts_out.append(st)
logger.info('%d statements after filter...' % len(stmts_out))
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
def _agent_is_gene(agent, specific_only):
"""Returns whether an agent is for a gene.
Parameters
----------
agent: Agent
The agent to evaluate
specific_only : Optional[bool]
If True, only elementary genes/proteins evaluate as genes and families
will be filtered out. If False, families are also included.
Returns
-------
is_gene: bool
Whether the agent is a gene
"""
if not specific_only:
if not(agent.db_refs.get('HGNC') or \
agent.db_refs.get('UP') or \
agent.db_refs.get('FPLX')):
return False
else:
if not(agent.db_refs.get('HGNC') or \
agent.db_refs.get('UP')):
return False
return True
@register_pipeline
def filter_genes_only(stmts_in, specific_only=False, remove_bound=False,
**kwargs):
"""Filter to statements containing genes only.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to filter.
specific_only : Optional[bool]
If True, only elementary genes/proteins will be kept and families
will be filtered out. If False, families are also included in the
output. Default: False
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
remove_bound: Optional[bool]
If true, removes bound conditions that are not genes.
If false (default), filters out statements with non-gene bound
conditions.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered statements.
"""
logger.info('Filtering %d statements for ones containing genes only...' %
len(stmts_in))
stmts_out = []
for st in stmts_in:
genes_only = True
for agent in st.agent_list():
if agent is not None:
criterion = lambda a: _agent_is_gene(a, specific_only)
if not criterion(agent):
genes_only = False
break
if remove_bound:
_remove_bound_conditions(agent, criterion)
else:
if _any_bound_condition_fails_criterion(agent, criterion):
genes_only = False
break
if genes_only:
stmts_out.append(st)
logger.info('%d statements after filter...' % len(stmts_out))
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def filter_belief(stmts_in, belief_cutoff, **kwargs):
"""Filter to statements with belief above a given cutoff.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to filter.
belief_cutoff : float
Only statements with belief above the belief_cutoff will be returned.
Here 0 < belief_cutoff < 1.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered statements.
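Examples
--------
A minimal sketch, assuming ``stmts`` have been preassembled so that
belief scores are set:
>>> from indra.tools import assemble_corpus as ac
>>> stmts_out = ac.filter_belief(stmts, 0.8)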
"""
dump_pkl = kwargs.get('save')
logger.info('Filtering %d statements to above %f belief' %
(len(stmts_in), belief_cutoff))
# The first round of filtering is in the top-level list
stmts_out = []
# Now we eliminate supports/supported-by
for stmt in stmts_in:
if stmt.belief < belief_cutoff:
continue
stmts_out.append(stmt)
supp_by = []
supp = []
for st in stmt.supports:
if st.belief >= belief_cutoff:
supp.append(st)
for st in stmt.supported_by:
if st.belief >= belief_cutoff:
supp_by.append(st)
stmt.supports = supp
stmt.supported_by = supp_by
logger.info('%d statements after filter...' % len(stmts_out))
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def filter_gene_list(stmts_in, gene_list, policy, allow_families=False,
remove_bound=False, invert=False, **kwargs):
"""Return statements that contain genes given in a list.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to filter.
gene_list : list[str]
A list of gene symbols to filter for.
policy : str
The policy to apply when filtering for the list of genes. "one": keep
statements that contain at least one of the list of genes and
possibly others not in the list; "all": keep statements that only
contain genes given in the list.
allow_families : Optional[bool]
Will include statements involving FamPlex families containing one
of the genes in the gene list. Default: False
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
remove_bound: Optional[bool]
If true, removes bound conditions that are not genes in the list.
If false (default), looks at agents in the bound conditions in addition
to those participating in the statement directly when applying the
specified policy.
invert : Optional[bool]
If True, the statements that do not match according to the policy
are returned. Default: False
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered statements.
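Examples
--------
A minimal sketch, assuming ``stmts`` is a list of INDRA Statements; the
gene symbols are illustrative:
>>> from indra.tools import assemble_corpus as ac
>>> one_stmts = ac.filter_gene_list(stmts, ['MAP2K1', 'MAPK1'], 'one')
>>> all_stmts = ac.filter_gene_list(stmts, ['MAP2K1', 'MAPK1'], 'all',
...                                 allow_families=True)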
"""
if policy not in ('one', 'all'):
logger.error('Policy %s is invalid, not applying filter.' % policy)
else:
genes_str = ', '.join(gene_list)
inv_str = 'not ' if invert else ''
logger.info(('Filtering %d statements for ones %scontaining "%s" of: '
'%s...') % (len(stmts_in), inv_str, policy, genes_str))
# If we're allowing families, make a list of all FamPlex IDs that
# contain members of the gene list, and add them to the filter list
filter_list = copy(gene_list)
if allow_families:
for hgnc_name in gene_list:
hgnc_id = hgnc_client.get_hgnc_id(hgnc_name)
if not hgnc_id:
logger.warning('Could not get HGNC ID for %s.' % hgnc_name)
continue
parents = bio_ontology.get_parents('HGNC', hgnc_id)
filter_list += [db_id for db_ns, db_id in parents
if db_ns == 'FPLX']
stmts_out = []
if remove_bound:
# If requested, remove agents whose names are not in the list from
# all bound conditions
if not invert:
keep_criterion = lambda a: a.name in filter_list
else:
keep_criterion = lambda a: a.name not in filter_list
for st in stmts_in:
for agent in st.agent_list():
_remove_bound_conditions(agent, keep_criterion)
if policy == 'one':
for st in stmts_in:
found_gene = False
if not remove_bound:
agent_list = st.agent_list_with_bound_condition_agents()
else:
agent_list = st.agent_list()
for agent in agent_list:
if agent is not None:
if agent.name in filter_list:
found_gene = True
break
if (found_gene and not invert) or (not found_gene and invert):
stmts_out.append(st)
elif policy == 'all':
for st in stmts_in:
found_genes = True
if not remove_bound:
agent_list = st.agent_list_with_bound_condition_agents()
else:
agent_list = st.agent_list()
for agent in agent_list:
if agent is not None:
if agent.name not in filter_list:
found_genes = False
break
if (found_genes and not invert) or (not found_genes and invert):
stmts_out.append(st)
else:
stmts_out = stmts_in
logger.info('%d statements after filter...' % len(stmts_out))
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def filter_concept_names(stmts_in, name_list, policy, invert=False, **kwargs):
"""Return Statements that refer to concepts/agents given as a list of names.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of Statements to filter.
name_list : list[str]
A list of concept/agent names to filter for.
policy : str
The policy to apply when filtering for the list of names. "one": keep
Statements that contain at least one of the list of names and
possibly others not in the list; "all": keep Statements that only
contain names given in the list.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
invert : Optional[bool]
If True, the Statements that do not match according to the policy
are returned. Default: False
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered Statements.
"""
if policy not in ('one', 'all'):
logger.error('Policy %s is invalid, not applying filter.' % policy)
else:
name_str = ', '.join(name_list)
inv_str = 'not ' if invert else ''
logger.info(('Filtering %d statements for ones %scontaining "%s" of: '
'%s...') % (len(stmts_in), inv_str, policy, name_str))
stmts_out = []
if policy == 'one':
for st in stmts_in:
found = False
agent_list = st.agent_list()
for agent in agent_list:
if agent is not None:
if agent.name in name_list:
found = True
break
if (found and not invert) or (not found and invert):
stmts_out.append(st)
elif policy == 'all':
for st in stmts_in:
found = True
agent_list = st.agent_list()
for agent in agent_list:
if agent is not None:
if agent.name not in name_list:
found = False
break
if (found and not invert) or (not found and invert):
stmts_out.append(st)
else:
stmts_out = stmts_in
logger.info('%d Statements after filter...' % len(stmts_out))
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def filter_by_db_refs(stmts_in, namespace, values, policy, invert=False,
match_suffix=False, **kwargs):
"""Filter to Statements whose agents are grounded to a matching entry.
Statements are filtered so that the db_refs entry (of the given namespace)
of their Agent/Concept arguments take a value in the given list of values.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of Statements to filter.
namespace : str
The namespace in db_refs to which the filter should apply.
values : list[str]
A list of values in the given namespace to which the filter should
apply.
policy : str
The policy to apply when filtering for the db_refs. "one": keep
Statements that contain at least one of the list of db_refs and
possibly others not in the list; "all": keep Statements that only
contain db_refs given in the list.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
invert : Optional[bool]
If True, the Statements that do not match according to the policy
are returned. Default: False
match_suffix : Optional[bool]
If True, the suffix of the db_refs entry is matched against the list
of entries.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered Statements.
"""
if policy not in ('one', 'all'):
logger.error('Policy %s is invalid, not applying filter.' % policy)
return
else:
name_str = ', '.join(values)
rev_mod = 'not ' if invert else ''
logger.info(('Filtering %d statements for those with %s agents %s'
'grounded to: %s in the %s namespace...') %
(len(stmts_in), policy, rev_mod, name_str, namespace))
def meets_criterion(agent):
if namespace not in agent.db_refs:
return False
entry = agent.db_refs[namespace]
if isinstance(entry, list):
entry = entry[0][0]
ret = False
# Match suffix or entire entry
if match_suffix:
if any([entry.endswith(e) for e in values]):
ret = True
else:
if entry in values:
ret = True
# Invert if needed
if invert:
return not ret
else:
return ret
enough = all if policy == 'all' else any
stmts_out = [s for s in stmts_in
if enough([meets_criterion(ag) for ag in s.agent_list()
if ag is not None])]
logger.info('%d Statements after filter...' % len(stmts_out))
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def filter_human_only(stmts_in, remove_bound=False, **kwargs):
"""Filter out statements that are grounded, but not to a human gene.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to filter.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
remove_bound: Optional[bool]
If true, removes all bound conditions that are grounded but not to human
genes. If false (default), filters out statements with bound
conditions that are grounded to non-human genes.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered statements.
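Examples
--------
A minimal sketch, assuming ``stmts`` contains agents grounded to UniProt:
>>> from indra.tools import assemble_corpus as ac
>>> stmts_out = ac.filter_human_only(stmts)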
"""
from indra.databases import uniprot_client
dump_pkl = kwargs.get('save')
logger.info('Filtering %d statements for human genes only...' %
len(stmts_in))
stmts_out = []
def criterion(agent):
upid = agent.db_refs.get('UP')
if upid and not uniprot_client.is_human(upid):
return False
else:
return True
for st in stmts_in:
human_genes = True
for agent in st.agent_list():
if agent is not None:
if not criterion(agent):
human_genes = False
break
if remove_bound:
_remove_bound_conditions(agent, criterion)
elif _any_bound_condition_fails_criterion(agent, criterion):
human_genes = False
break
if human_genes:
stmts_out.append(st)
logger.info('%d statements after filter...' % len(stmts_out))
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def filter_direct(stmts_in, **kwargs):
"""Filter to statements that are direct interactions
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to filter.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered statements.
"""
def get_is_direct(stmt):
"""Returns true if there is evidence that the statement is a direct
interaction.
If any of the evidences associated with the statement
indicates a direct interaction then we assume the interaction
is direct. If there is no evidence for the interaction being indirect
then we default to direct.
"""
any_indirect = False
for ev in stmt.evidence:
if ev.epistemics.get('direct') is True:
return True
elif ev.epistemics.get('direct') is False:
# This guarantees that we have seen at least
# some evidence that the statement is indirect
any_indirect = True
if any_indirect:
return False
return True
logger.info('Filtering %d statements to direct ones...' % len(stmts_in))
stmts_out = []
for st in stmts_in:
if get_is_direct(st):
stmts_out.append(st)
logger.info('%d statements after filter...' % len(stmts_out))
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def filter_no_hypothesis(stmts_in, **kwargs):
"""Filter to statements that are not marked as hypothesis in epistemics.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to filter.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered statements.
"""
logger.info('Filtering %d statements to no hypothesis...' % len(stmts_in))
stmts_out = []
for st in stmts_in:
all_hypotheses = True
ev = None
for ev in st.evidence:
if not ev.epistemics.get('hypothesis', False):
all_hypotheses = False
break
if ev is None:
all_hypotheses = False
if not all_hypotheses:
stmts_out.append(st)
logger.info('%d statements after filter...' % len(stmts_out))
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def filter_no_negated(stmts_in, **kwargs):
"""Filter to statements that are not marked as negated in epistemics.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to filter.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered statements.
"""
logger.info('Filtering %d statements to not negated...' % len(stmts_in))
stmts_out = []
for st in stmts_in:
all_negated = True
ev = None
for ev in st.evidence:
if not ev.epistemics.get('negated', False):
all_negated = False
break
if ev is None:
all_negated = False
if not all_negated:
stmts_out.append(st)
logger.info('%d statements after filter...' % len(stmts_out))
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def filter_evidence_source(stmts_in, source_apis, policy='one', **kwargs):
"""Filter to statements that have evidence from a given set of sources.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to filter.
source_apis : list[str]
A list of sources to filter for. Examples: biopax, bel, reach
policy : Optional[str]
If 'one', a statement that has evidence from any of the sources is
kept. If 'all', only those statements are kept which have evidence
from all the input sources specified in source_apis.
If 'none', only those statements are kept that don't have evidence
from any of the sources specified in source_apis.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered statements.
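Examples
--------
A minimal sketch, assuming ``stmts`` carries evidence from multiple
readers and databases; the source names are illustrative:
>>> from indra.tools import assemble_corpus as ac
>>> reach_stmts = ac.filter_evidence_source(stmts, ['reach'], policy='one')
>>> no_db_stmts = ac.filter_evidence_source(stmts, ['biopax', 'bel'],
...                                         policy='none')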
"""
logger.info('Filtering %d statements to evidence source "%s" of: %s...' %
(len(stmts_in), policy, ', '.join(source_apis)))
stmts_out = []
for st in stmts_in:
sources = set([ev.source_api for ev in st.evidence])
if policy == 'one':
if sources.intersection(source_apis):
stmts_out.append(st)
if policy == 'all':
if sources.intersection(source_apis) == set(source_apis):
stmts_out.append(st)
if policy == 'none':
if not sources.intersection(source_apis):
stmts_out.append(st)
logger.info('%d statements after filter...' % len(stmts_out))
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def filter_retracted_sources(stmts_in, **kwargs):
"""Filter out evidence from retracted sources.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to filter.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered statements.
"""
stmts_out = []
for stmt in stmts_in:
ev_out = []
for ev in stmt.evidence:
pmid = ev.pmid or ev.text_refs.get('PMID')
if pmid and is_retracted(pmid):
continue
ev_out.append(ev)
if ev_out:
stmt.evidence = ev_out
stmts_out.append(stmt)
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def filter_top_level(stmts_in, **kwargs):
"""Filter to statements that are at the top-level of the hierarchy.
Here top-level statements correspond to the most specific ones.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to filter.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered statements.
"""
logger.info('Filtering %d statements for top-level...' % len(stmts_in))
stmts_out = [st for st in stmts_in if not st.supports]
logger.info('%d statements after filter...' % len(stmts_out))
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def filter_inconsequential_mods(stmts_in, whitelist=None, **kwargs):
"""Filter out Modifications that modify inconsequential sites
Inconsequential here means that the site is not mentioned / tested
in any other statement. In some cases specific sites should be
preserved, for instance, to be used as readouts in a model.
In this case, the given sites can be passed in a whitelist.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to filter.
whitelist : Optional[dict]
A whitelist containing agent modification sites whose
modifications should be preserved even if no other statement
refers to them. The whitelist parameter is a dictionary in which
the key is a gene name and the value is a list of tuples of
(modification_type, residue, position). Example:
whitelist = {'MAP2K1': [('phosphorylation', 'S', '222')]}
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered statements.
"""
if whitelist is None:
whitelist = {}
logger.info('Filtering %d statements to remove' % len(stmts_in) +
' inconsequential modifications...')
states_used = whitelist
for stmt in stmts_in:
for agent in stmt.agent_list():
if agent is not None:
if agent.mods:
for mc in agent.mods:
mod = (mc.mod_type, mc.residue, mc.position)
try:
states_used[agent.name].append(mod)
except KeyError:
states_used[agent.name] = [mod]
for k, v in states_used.items():
states_used[k] = list(set(v))
stmts_out = []
for stmt in stmts_in:
skip = False
if isinstance(stmt, Modification):
mod_type = modclass_to_modtype[stmt.__class__]
if isinstance(stmt, RemoveModification):
mod_type = modtype_to_inverse[mod_type]
mod = (mod_type, stmt.residue, stmt.position)
used = states_used.get(stmt.sub.name, [])
if mod not in used:
skip = True
if not skip:
stmts_out.append(stmt)
logger.info('%d statements after filter...' % len(stmts_out))
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def filter_inconsequential_acts(stmts_in, whitelist=None, **kwargs):
"""Filter out Activations that modify inconsequential activities
Inconsequential here means that the activity type is not mentioned / tested
in any other statement. In some cases specific activity types should be
preserved, for instance, to be used as readouts in a model.
In this case, the given activities can be passed in a whitelist.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to filter.
whitelist : Optional[dict]
A whitelist containing agent activity types which should be preserved
even if no other statement refers to them.
The whitelist parameter is a dictionary in which
the key is a gene name and the value is a list of activity types.
Example: whitelist = {'MAP2K1': ['kinase']}
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered statements.
"""
if whitelist is None:
whitelist = {}
logger.info('Filtering %d statements to remove' % len(stmts_in) +
' inconsequential activations...')
states_used = whitelist
for stmt in stmts_in:
for agent in stmt.agent_list():
if agent is not None:
if agent.activity:
act = agent.activity.activity_type
try:
states_used[agent.name].append(act)
except KeyError:
states_used[agent.name] = [act]
for k, v in states_used.items():
states_used[k] = list(set(v))
stmts_out = []
for stmt in stmts_in:
skip = False
if isinstance(stmt, RegulateActivity):
used = states_used.get(stmt.obj.name, [])
if stmt.obj_activity not in used:
skip = True
if not skip:
stmts_out.append(stmt)
logger.info('%d statements after filter...' % len(stmts_out))
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
def get_unreachable_mods(stmts_in):
mods_set = {}
for stmt in stmts_in:
if isinstance(stmt, Modification):
mod_type = modclass_to_modtype[stmt.__class__]
if isinstance(stmt, RemoveModification):
mod_type = modtype_to_inverse[mod_type]
mod = (mod_type, stmt.residue, stmt.position)
if stmt.sub.name not in mods_set:
mods_set[stmt.sub.name] = set([mod])
else:
mods_set[stmt.sub.name].add(mod)
unreachable_mods = {}
for stmt in stmts_in:
for agent in stmt.agent_list():
if agent is None or not agent.mods:
continue
for mc in agent.mods:
mod = (mc.mod_type, mc.residue, mc.position)
if mod not in mods_set.get(agent.name, []):
msg = '%s not reachable for %s' % (mod, agent.name)
logger.warning(msg)
if agent.name not in unreachable_mods:
unreachable_mods[agent.name] = set([mod])
else:
unreachable_mods[agent.name].add(mod)
return unreachable_mods
@register_pipeline
def filter_mutation_status(stmts_in, mutations, deletions, **kwargs):
"""Filter statements based on existing mutations/deletions
This filter helps to contextualize a set of statements to a given
cell type. Given a list of deleted genes, it removes statements that refer
to these genes. It also takes a list of mutations and removes statements
that refer to mutations not relevant for the given context.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to filter.
mutations : dict
A dictionary whose keys are gene names, and the values are lists of
tuples of the form (residue_from, position, residue_to).
Example: mutations = {'BRAF': [('V', '600', 'E')]}
deletions : list
A list of gene names that are deleted.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered statements.
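Examples
--------
A minimal sketch contextualizing ``stmts`` to a cell line carrying BRAF
V600E with PTEN deleted (the mutation and deletion values are
illustrative):
>>> from indra.tools import assemble_corpus as ac
>>> mutations = {'BRAF': [('V', '600', 'E')]}
>>> deletions = ['PTEN']
>>> stmts_out = ac.filter_mutation_status(stmts, mutations, deletions)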
"""
remove_bound = kwargs.get('remove_bound', False)
def criterion(agent):
if agent is not None and agent.name in deletions:
return False
if agent is not None and agent.mutations:
muts = mutations.get(agent.name, [])
for mut in agent.mutations:
mut_tup = (mut.residue_from, mut.position, mut.residue_to)
if mut_tup not in muts:
return False
return True
logger.info('Filtering %d statements for mutation status...' %
len(stmts_in))
stmts_out = []
for stmt in stmts_in:
skip = False
for agent in stmt.agent_list():
if not criterion(agent):
skip = True
break
if remove_bound:
_remove_bound_conditions(agent, criterion)
elif _any_bound_condition_fails_criterion(agent, criterion):
skip = True
break
if not skip:
stmts_out.append(stmt)
logger.info('%d statements after filter...' % len(stmts_out))
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def filter_enzyme_kinase(stmts_in, **kwargs):
"""Filter Phosphorylations to ones where the enzyme is a known kinase.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to filter.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered statements.
"""
logger.info('Filtering %d statements to remove ' % len(stmts_in) +
'phosphorylation by non-kinases...')
path = os.path.dirname(os.path.abspath(__file__))
kinase_table = read_unicode_csv(path + '/../resources/kinases.tsv',
delimiter='\t')
gene_names = [lin[1] for lin in list(kinase_table)[1:]]
stmts_out = []
for st in stmts_in:
if isinstance(st, Phosphorylation):
if st.enz is not None:
if st.enz.name in gene_names:
stmts_out.append(st)
else:
stmts_out.append(st)
logger.info('%d statements after filter...' % len(stmts_out))
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def filter_mod_nokinase(stmts_in, **kwargs):
"""Filter non-phospho Modifications to ones with a non-kinase enzyme.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to filter.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered statements.
"""
logger.info('Filtering %d statements to remove ' % len(stmts_in) +
'non-phospho modifications by kinases...')
path = os.path.dirname(os.path.abspath(__file__))
kinase_table = read_unicode_csv(path + '/../resources/kinases.tsv',
delimiter='\t')
gene_names = [lin[1] for lin in list(kinase_table)[1:]]
stmts_out = []
for st in stmts_in:
if isinstance(st, Modification) and not \
isinstance(st, Phosphorylation):
if st.enz is not None:
if st.enz.name not in gene_names:
stmts_out.append(st)
else:
stmts_out.append(st)
logger.info('%d statements after filter...' % len(stmts_out))
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def filter_transcription_factor(stmts_in, **kwargs):
"""Filter out RegulateAmounts where subject is not a transcription factor.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to filter.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered statements.
"""
logger.info('Filtering %d statements to remove ' % len(stmts_in) +
'amount regulations by non-transcription-factors...')
path = os.path.dirname(os.path.abspath(__file__))
tf_table = \
read_unicode_csv(path + '/../resources/transcription_factors.csv')
gene_names = [lin[1] for lin in list(tf_table)[1:]]
stmts_out = []
for st in stmts_in:
if isinstance(st, RegulateAmount):
if st.subj is not None:
if st.subj.name in gene_names:
stmts_out.append(st)
else:
stmts_out.append(st)
logger.info('%d statements after filter...' % len(stmts_out))
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def filter_uuid_list(stmts_in, uuids, invert=True, **kwargs):
"""Filter to Statements corresponding to given UUIDs
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to filter.
uuids : list[str]
A list of UUIDs to filter for.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
invert : Optional[bool]
Invert the filter to remove the Statements corresponding to the given
UUIDs. Default: True
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered statements.
"""
logger.info('Filtering %d statements for %d UUID%s...' %
(len(stmts_in), len(uuids), 's' if len(uuids) > 1 else ''))
stmts_out = []
for st in stmts_in:
if not invert:
if st.uuid in uuids:
stmts_out.append(st)
else:
if st.uuid not in uuids:
stmts_out.append(st)
logger.info('%d statements after filter...' % len(stmts_out))
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def filter_by_curation(stmts_in, curations, incorrect_policy='any',
correct_tags=None, update_belief=True):
"""Filter out statements and update beliefs based on curations.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to filter.
curations : list[dict]
A list of curations for evidences. Curation object should have
(at least) the following attributes:
pa_hash (preassembled statement hash), source_hash (evidence hash) and
tag (e.g. 'correct', 'wrong_relation', etc.)
incorrect_policy : str
A policy for filtering out statements given incorrect curations. The
'any' policy filters out a statement if at least one of its evidences
is curated as incorrect and no evidences are curated as correct, while
the 'all' policy only filters out a statement if all of its evidences
are curated as incorrect.
correct_tags : list[str] or None
A list of tags to be considered correct. If no tags are provided,
only the 'correct' tag is considered correct.
update_belief : Optional[bool]
If True, set the belief score to 1 for statements curated as correct.
Default: True
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered statements.
"""
if correct_tags is None:
correct_tags = ['correct']
# Here correct is a set of hashes of statements that were curated as
# correct (it's not taken into account whether they also have incorrect
# curations). Incorrect is a set of hashes of statements that only have
# incorrect curations (for all or some of the evidences). These sets do
# not intersect.
correct = {c['pa_hash'] for c in curations if c['tag'] in correct_tags}
incorrect = {c['pa_hash'] for c in curations if c['pa_hash'] not in correct}
# Store evidence level curations for overall correct statements
correct_stmt_evid = {}
for c in curations:
pa_hash = c['pa_hash']
if pa_hash in correct:
if pa_hash not in correct_stmt_evid:
correct_stmt_evid[pa_hash] = defaultdict(set)
if c['tag'] in correct_tags:
correct_stmt_evid[pa_hash]['correct'].add(c['source_hash'])
else:
correct_stmt_evid[pa_hash]['incorrect'].add(c['source_hash'])
stmts_out = []
logger.info('Filtering %d statements with %s incorrect curations...' %
(len(stmts_in), incorrect_policy))
def _is_incorrect(stmt_hash, evid_hash):
# Evidence is incorrect if it was only curated as incorrect
if evid_hash in correct_stmt_evid[stmt_hash]['incorrect'] and \
evid_hash not in correct_stmt_evid[stmt_hash]['correct']:
return True
return False
def process_and_append(stmt, stmt_list):
# Filter out incorrect evidences for correct statements
stmt_hash = stmt.get_hash() # Already refreshed when this is called
if stmt_hash in correct_stmt_evid:
evidence = []
for evid in stmt.evidence:
if _is_incorrect(stmt_hash, evid.get_source_hash()):
continue
else:
evidence.append(evid)
# If no evidence left, skip the statement
if len(evidence) == 0:
return
stmt.evidence = evidence
# Set belief to one for statements with correct curations
if update_belief and stmt_hash in correct:
stmt.belief = 1
stmt_list.append(stmt)
if incorrect_policy == 'any':
# Filter statements that have SOME incorrect and NO correct curations
# (i.e. their hashes are in incorrect set)
for stmt in stmts_in:
stmt_hash = stmt.get_hash(refresh=True)
if stmt_hash not in incorrect:
process_and_append(stmt, stmts_out)
elif incorrect_policy == 'all':
# Filter out statements in which ALL evidences are curated
# as incorrect.
# First, map curated statements to curated evidences.
incorrect_stmt_evid = defaultdict(set)
for c in curations:
if c['pa_hash'] in incorrect:
incorrect_stmt_evid[c['pa_hash']].add(c['source_hash'])
for stmt in stmts_in:
# Compare set of evidence hashes of given statements to set of
# hashes of curated evidences.
stmt_hash = stmt.get_hash(refresh=True)
if stmt_hash in incorrect_stmt_evid and (
{ev.get_source_hash() for ev in stmt.evidence} <=
incorrect_stmt_evid[stmt_hash]):
continue
else:
process_and_append(stmt, stmts_out)
logger.info('%d statements after filter...' % len(stmts_out))
return stmts_out
@register_pipeline
def expand_families(stmts_in, **kwargs):
"""Expand FamPlex Agents to individual genes.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to expand.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of expanded statements.
"""
from indra.tools.expand_families import Expander
logger.info('Expanding families on %d statements...' % len(stmts_in))
expander = Expander(bio_ontology)
stmts_out = expander.expand_families(stmts_in)
logger.info('%d statements after expanding families...' % len(stmts_out))
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def reduce_activities(stmts_in, **kwargs):
"""Reduce the activity types in a list of statements
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements to reduce activity types in.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of reduced activity statements.
"""
logger.info('Reducing activities on %d statements...' % len(stmts_in))
stmts_out = [deepcopy(st) for st in stmts_in]
ml = MechLinker(stmts_out)
ml.gather_explicit_activities()
ml.reduce_activities()
stmts_out = ml.statements
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def strip_agent_context(stmts_in, **kwargs):
"""Strip any context on agents within each statement.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements whose agent context should be stripped.
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of stripped statements.
"""
logger.info('Stripping agent context on %d statements...' % len(stmts_in))
stmts_out = []
for st in stmts_in:
new_st = deepcopy(st)
for agent in new_st.agent_list():
if agent is None:
continue
agent.mods = []
agent.mutations = []
agent.activity = None
agent.location = None
agent.bound_conditions = []
stmts_out.append(new_st)
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
@register_pipeline
def standardize_names_groundings(stmts):
"""Standardize the names of Concepts with respect to an ontology.
NOTE: this function is currently optimized for Influence Statements
obtained from Eidos, Hume, Sofia and CWMS. It will possibly yield
unexpected results for biology-specific Statements.
Parameters
----------
stmts : list[indra.statements.Statement]
A list of statements whose Concept names should be standardized.
"""
print('Standardize names to groundings')
for stmt in stmts:
for concept in stmt.agent_list():
db_ns, db_id = concept.get_grounding()
if db_id is not None:
if isinstance(db_id, list):
db_id = db_id[0][0].split('/')[-1]
else:
db_id = db_id.split('/')[-1]
db_id = db_id.replace('|', ' ')
db_id = db_id.replace('_', ' ')
db_id = db_id.replace('ONT::', '')
db_id = db_id.capitalize()
concept.name = db_id
return stmts
def dump_stmt_strings(stmts, fname):
"""Save printed statements in a file.
Parameters
----------
stmts : list[indra.statements.Statement]
A list of statements to save in a text file.
fname : str
The name of a text file to save the printed statements into.
"""
with open(fname, 'wb') as fh:
for st in stmts:
fh.write(('%s\n' % st).encode('utf-8'))
@register_pipeline
def rename_db_ref(stmts_in, ns_from, ns_to, **kwargs):
"""Rename an entry in the db_refs of each Agent.
This is particularly useful when old Statements in pickle files
need to be updated after a namespace has been renamed, for example
'BE' to 'FPLX'.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements whose Agents' db_refs need to be changed
ns_from : str
The namespace identifier to replace
ns_to : str
The namespace identifier to replace to
save : Optional[str]
The name of a pickle file to save the results (stmts_out) into.
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of Statements with Agents' db_refs changed.
"""
logger.info('Remapping "%s" to "%s" in db_refs on %d statements...' %
(ns_from, ns_to, len(stmts_in)))
stmts_out = [deepcopy(st) for st in stmts_in]
for stmt in stmts_out:
for agent in stmt.agent_list():
if agent is not None and ns_from in agent.db_refs:
agent.db_refs[ns_to] = agent.db_refs.pop(ns_from)
dump_pkl = kwargs.get('save')
if dump_pkl:
dump_statements(stmts_out, dump_pkl)
return stmts_out
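# Illustrative usage sketch (not part of the module), matching the
# 'BE' -> 'FPLX' migration mentioned in the docstring:
#
#     updated = rename_db_ref(stmts, 'BE', 'FPLX')
#     # an Agent with db_refs={'BE': 'MEK'} now carries db_refs={'FPLX': 'MEK'}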
def align_statements(stmts1, stmts2, keyfun=None):
"""Return alignment of two lists of statements by key.
Parameters
----------
stmts1 : list[indra.statements.Statement]
A list of INDRA Statements to align
stmts2 : list[indra.statements.Statement]
A list of INDRA Statements to align
keyfun : Optional[function]
A function that takes a Statement as an argument
and returns a key to align by. If not given,
the default key function is a tuple of the names
of the Agents in the Statement.
Returns
-------
matches : list(tuple)
A list of tuples where each tuple has two elements,
the first corresponding to an element of the stmts1
list and the second corresponding to an element
of the stmts2 list. If a given element is not matched,
its corresponding pair in the tuple is None.
"""
def name_keyfun(stmt):
return tuple(a.name if a is not None else None for
a in stmt.agent_list())
if not keyfun:
keyfun = name_keyfun
matches = []
keys1 = [keyfun(s) for s in stmts1]
keys2 = [keyfun(s) for s in stmts2]
for stmt, key in zip(stmts1, keys1):
try:
match_idx = keys2.index(key)
match_stmt = stmts2[match_idx]
matches.append((stmt, match_stmt))
except ValueError:
matches.append((stmt, None))
for stmt, key in zip(stmts2, keys2):
try:
match_idx = keys1.index(key)
except ValueError:
matches.append((None, stmt))
return matches
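# Illustrative usage sketch (not part of the module): alignment with the
# default name-based key function; variable names are made up.
#
#     s1 = [Phosphorylation(Agent('MAP2K1'), Agent('MAPK1'))]
#     s2 = [Phosphorylation(Agent('MAP2K1'), Agent('MAPK1'), 'T', '185'),
#           Inhibition(Agent('X'), Agent('Y'))]
#     align_statements(s1, s2)
#     # -> [(s1[0], s2[0]), (None, s2[1])]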
@register_pipeline
def filter_complexes_by_size(stmts_in, members_allowed=5):
"""Filter out Complexes if the number of members exceeds specified allowed
number.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of statements from which large Complexes need to be filtered out
members_allowed : Optional[int]
Allowed number of members to include. Default: 5
Returns
-------
stmts_out : list[indra.statements.Statement]
A list of filtered Statements.
"""
stmts_out = []
logger.info('Filtering out Complexes with more than %d members from %d '
'statements...' % (members_allowed, len(stmts_in)))
for stmt in stmts_in:
if isinstance(stmt, Complex) and len(stmt.members) > members_allowed:
continue
else:
stmts_out.append(stmt)
logger.info('%d statements after filter...' % len(stmts_out))
return stmts_out
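# Illustrative usage sketch (not part of the module):
#
#     filtered = filter_complexes_by_size(stmts, members_allowed=3)
#     # Complex statements with four or more members are dropped; all other
#     # statement types pass through unchanged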
def _load_db_refs_map():
filepath = get_resource_path('db_refs_map.csv')
rows = read_unicode_csv(filepath)
db_refs_map = {}
for (db_ns, old_id, new_id) in rows:
db_refs_map[(db_ns, old_id)] = new_id
return db_refs_map
@register_pipeline
def map_db_refs(stmts_in, db_refs_map=None):
"""Update entries in db_refs to those provided in db_refs_map.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of INDRA Statements to update db_refs in.
db_refs_map : Optional[dict]
A dictionary where each key is a tuple (db_ns, db_id) representing old
db_refs pair that has to be updated and each value is a new db_id to
replace the old value with. If not provided, the default db_refs_map
will be loaded.
"""
if not db_refs_map:
db_refs_map = _load_db_refs_map()
stmts_out = []
def update_agent_db_refs(ag_db_refs, db_refs_map):
for (db_ns, old_db_id), new_id in db_refs_map.items():
if ag_db_refs.get(db_ns) == old_db_id:
ag_db_refs[db_ns] = new_id
return ag_db_refs
for stmt in stmts_in:
new_stmt = deepcopy(stmt)
for ag in new_stmt.agent_list():
if ag is not None:
ag.db_refs = update_agent_db_refs(ag.db_refs, db_refs_map)
stmts_out.append(new_stmt)
return stmts_out
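# Illustrative usage sketch (not part of the module): the CHEBI identifiers
# below are made up and only show the shape of a custom db_refs_map.
#
#     custom_map = {('CHEBI', 'CHEBI:1234'): 'CHEBI:5678'}
#     updated = map_db_refs(stmts, db_refs_map=custom_map)
#     # any Agent grounded as {'CHEBI': 'CHEBI:1234'} is updated to 'CHEBI:5678'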
@register_pipeline
def strip_supports(stmts):
"""Remove supports and supported by from statements."""
logger.info('Removing supports and supported by from statements')
for stmt in stmts:
stmt.supports = []
stmt.supported_by = []
return stmts
@register_pipeline
def run_mechlinker(
stmts_in, reduce_activities=False, reduce_modifications=False,
replace_activations=False, require_active_forms=False, implicit=False):
"""Instantiate MechLinker and run its methods in defined order.
Parameters
----------
stmts_in : list[indra.statements.Statement]
A list of INDRA Statements to run mechanism linking on.
reduce_activities : Optional[bool]
If True, agent activities are reduced to their most specific,
unambiguous form. Default: False
reduce_modifications : Optional[bool]
If True, agent modifications are reduced to their most specific,
unambiguous form. Default: False
replace_activations : Optional[bool]
If True, when there is a compatible pair of Modification(X, Y) and
ActiveForm(Y) statements, any Activation(X, Y) statements
are filtered out. Default: False
require_active_forms : Optional[bool]
If True, agents in active positions are rewritten to be in their
active forms. Default: False
implicit : Optional[bool]
If True, active forms of an agent are inferred from multiple statement
types implicitly, otherwise only explicit ActiveForm statements
are taken into account. Default: False
Returns
-------
list[indra.statements.Statement]
A list of INDRA Statements that have gone through mechanism linking.
"""
ml = MechLinker(stmts_in)
if reduce_activities:
if implicit:
ml.gather_implicit_activities()
else:
ml.gather_explicit_activities()
ml.reduce_activities()
if reduce_modifications:
ml.gather_modifications()
ml.reduce_modifications()
if replace_activations:
if implicit:
ml.gather_implicit_activities()
else:
ml.gather_explicit_activities()
ml.replace_activations()
if require_active_forms:
if implicit:
ml.gather_implicit_activities()
else:
ml.gather_explicit_activities()
ml.require_active_forms()
return ml.statements
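# Illustrative usage sketch (not part of the module): one typical combination
# of flags; which steps are appropriate depends on the corpus being assembled.
#
#     linked = run_mechlinker(stmts, reduce_activities=True,
#                             require_active_forms=True, implicit=True)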
@register_pipeline
def filter_inconsequential(
stmts, mods=True, mod_whitelist=None, acts=True, act_whitelist=None):
"""Keep filtering inconsequential modifications and activities until there
is nothing else to filter.
Parameters
----------
stmts : list[indra.statements.Statement]
A list of INDRA Statements to filter.
mods : Optional[bool]
If True, inconsequential modifications are filtered out.
Default: True
mod_whitelist : Optional[dict]
A whitelist containing agent modification sites whose
modifications should be preserved even if no other statement
refers to them. The whitelist parameter is a dictionary in which
the key is a gene name and the value is a list of tuples of
(modification_type, residue, position). Example:
whitelist = {'MAP2K1': [('phosphorylation', 'S', '222')]}
acts : Optional[bool]
If True, inconsequential activations are filtered out.
Default: True
act_whitelist : Optional[dict]
A whitelist containing agent activity types which should be preserved
even if no other statement refers to them.
The whitelist parameter is a dictionary in which
the key is a gene name and the value is a list of activity types.
Example: whitelist = {'MAP2K1': ['kinase']}
Returns
-------
list[indra.statements.Statement]
The filtered list of statements.
"""
num_stmts = len(stmts)
while True:
if mods:
stmts = filter_inconsequential_mods(stmts, mod_whitelist)
if acts:
stmts = filter_inconsequential_acts(stmts, act_whitelist)
if num_stmts == len(stmts):
break
num_stmts = len(stmts)
return stmts
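# Illustrative usage sketch (not part of the module), using the whitelist
# format shown in the docstring:
#
#     keep_mods = {'MAP2K1': [('phosphorylation', 'S', '222')]}
#     filtered = filter_inconsequential(stmts, mod_whitelist=keep_mods)
#     # modifications and activities that no other statement depends on are
#     # removed, except the whitelisted MAP2K1 S222 phosphorylation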
@register_pipeline
def fix_invalidities(stmts: List[Statement],
in_place: bool = False,
print_report_before: bool = False,
print_report_after: bool = False,
prior_hash_annots: bool = False) -> List[Statement]:
"""Fix invalidities in a list of statements.
Parameters
----------
stmts : list[indra.statements.Statement]
A list of statements to fix invalidities in
in_place : Optional[bool]
If True, the statement objects are changed in place if an invalidity
is fixed. Otherwise, a deepcopy is done before running fixes.
print_report_before : Optional[bool]
If True, run and print a validation report on the statements before
applying fixes. Default: False
print_report_after : Optional[bool]
If True, run and print a validation report on the statements after
applying fixes, to check whether any issues remain that weren't handled
by the fixing module. Default: False
prior_hash_annots : Optional[bool]
If True, an annotation is added to each evidence of a statement
with the hash of the statement prior to any fixes being applied.
This is useful if this function is applied as a post-processing
step on assembled statements and it is necessary to refer back
to the original hash of statements before an invalidity fix
here potentially changes it. Default: False
Returns
-------
statements : list[indra.statements.Statement]
The list of statements with validation issues fixed and some
invalid statements filtered out.
"""
logger.info('Fixing invalidities in %d statements' % len(stmts))
if print_report_before:
logger.info('Any invalidities detected before fixing are printed below')
print_validation_report(stmts)
if not in_place:
logger.info('Making deepcopy of statements')
stmts = deepcopy(stmts)
# If desired, we add prior hash annotations to each evidence
if prior_hash_annots:
for stmt in stmts:
for ev in stmt.evidence:
ev.annotations['prior_hash'] = stmt.get_hash()
# And now apply the fixing function
stmts_out = indra.tools.fix_invalidities.fix_invalidities(stmts)
if print_report_after:
logger.info('Any remaining detected invalidities are printed below')
print_validation_report(stmts_out)
logger.info('%d statements after validity fixing' % len(stmts_out))
return stmts_out
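# Illustrative usage sketch (not part of the module):
#
#     fixed = fix_invalidities(stmts, print_report_before=True,
#                              prior_hash_annots=True)
#     # each Evidence now has an annotations['prior_hash'] entry holding the
#     # statement's hash from before any fixes were applied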