__all__ = ['WormBaseProcessor']
import re
import tqdm
import logging
from indra.statements import *
from indra.ontology.standardize import standardize_name_db_refs
logger = logging.getLogger(__name__)
# (stray "[docs]" Sphinx source-link marker removed)
class WormBaseProcessor(object):
    """Extracts INDRA statements from WormBase interaction data.

    Miscellaneous info for WormBase interaction data (genetic and molecular):

    Unique source databases:
        ['wormbase' 'biogrid' 'MINT' 'IntAct' 'UniProt' 'DIP']
    Unique agent ID types:
        ['wormbase' 'entrez gene/locuslink' 'uniprotkb' 'intact']
    Unique interaction ID types:
        ['wormbase' 'biogrid' 'intact' 'mint' 'imex' 'dip' 'wwpdb' 'emdb']

    Parameters
    ----------
    data : iterable
        Raw data from WormBase to be processed. Each element is handed to
        `process_row`, which reads its columns as attributes and calls
        `_asdict()` on it (i.e. namedtuple-like rows).
    mappings_df : pandas.DataFrame
        WormBase-to-Entrez mappings. The columns 'dbXrefs', 'GeneID' and
        'Symbol' are read. The frame is copied, so the caller's object is
        left untouched.

    Attributes
    ----------
    statements : list[indra.statements.Statement]
        Extracted INDRA statements.
    """

    def __init__(self, data, mappings_df):
        self.statements = []
        self.rows = data
        # Work on a copy: the columns below are rewritten in place, and
        # running _id_conversion a second time on an already converted
        # frame would turn every 'dbXrefs' entry into {} and drop all
        # WormBase IDs.
        self.mappings_df = mappings_df.copy()
        # Transform 'dbXrefs' column into {database: [identifiers]} dicts
        self.mappings_df['dbXrefs'] = \
            self.mappings_df['dbXrefs'].apply(self._id_conversion)
        # New column 'wormbase_id' holds each gene's WormBase identifier
        self.mappings_df['wormbase_id'] = \
            self.mappings_df['dbXrefs'].apply(self._wormbase_id_from_xrefs)
        # Convert mappings to dictionaries for quick lookups
        by_gene_id = self.mappings_df.set_index('GeneID')
        self.wb_to_entrez_dict = \
            self.mappings_df.set_index('wormbase_id')['GeneID'].to_dict()
        self.entrez_to_wb_dict = by_gene_id['wormbase_id'].to_dict()
        self.entrez_to_symbol_dict = by_gene_id['Symbol'].to_dict()
        # Despite its name this is keyed by 'GeneID'; each value is the
        # full mappings row as a dict.
        self.symbol_to_annotation_dict = by_gene_id.to_dict(orient='index')
        # Process the rows into Statements. A failing row is logged and
        # skipped so that one bad record does not abort the whole run.
        for idx, wb_row in enumerate(tqdm.tqdm(
                self.rows, desc='Processing WormBase rows')):
            try:
                self.process_row(wb_row)
            except Exception as e:
                logger.error(f"Error occurred at row {idx}: {e}")

    @staticmethod
    def _wormbase_id_from_xrefs(xrefs):
        """Return the first 'WormBase' (or else 'WB') ID in a dbXrefs dict.

        Returns None if `xrefs` is not a dict or has neither key.
        """
        if not isinstance(xrefs, dict):
            return None
        for key in ('WormBase', 'WB'):
            if key in xrefs:
                return xrefs[key][0]
        return None

    @staticmethod
    def _first(info, key):
        """Return the first value stored under `key` in `info`, or None."""
        values = info.get(key)
        return values[0] if values else None

    @staticmethod
    def _pick_preferred_name(info, keys):
        """Choose a name from `info`, a dict mapping keys to lists of names.

        Names under `keys` are scanned in the given order. The first
        all-lowercase name wins; failing that, the first other name found
        under `keys`; failing that, the first value of the first entry in
        `info`.
        """
        lowercase_names = []
        other_names = []
        for key in keys:
            for candidate in info.get(key) or []:
                if candidate.islower():
                    lowercase_names.append(candidate)
                else:
                    other_names.append(candidate)
        if lowercase_names:
            return lowercase_names[0]
        if other_names:
            return other_names[0]
        return next(iter(info.values()), [None])[0]

    def get_agent_name(self, aliases, alt_ids):
        """Return a display name for one interactor, or None.

        Parameters
        ----------
        aliases : str
            Raw alias column value; anything that is not a str is ignored.
        alt_ids : str
            Raw alternate-ID column value; anything that is not a str is
            ignored.

        Returns
        -------
        name : str or None
            A name taken from the aliases if any could be parsed,
            otherwise from the alternate IDs. None (with a warning) if
            both are empty.
        """
        alias_info = \
            self._alias_conversion(aliases) if isinstance(aliases, str) else {}
        alt_ids_info = \
            self._id_conversion(alt_ids) if isinstance(alt_ids, str) else {}
        # Aliases take precedence, with 'public_name' preferred
        if alias_info:
            return self._pick_preferred_name(
                alias_info,
                ('public_name', 'gene name', 'display_short',
                 'gene name synonym'))
        # Otherwise look in the alternate IDs, with
        # 'entrez gene/locuslink' preferred
        if alt_ids_info:
            return self._pick_preferred_name(
                alt_ids_info,
                ('entrez gene/locuslink', 'uniprot/swiss-prot', 'biogrid'))
        logger.warning(
            f"Agent alias and alternate ID dicts for "
            f"interactor are empty: {aliases}, {alt_ids}")
        return None

    def get_agent_ids(self, ids, alt_ids):
        """Extract database identifiers for one interactor.

        Parameters
        ----------
        ids : str
            Raw ID column value for the interactor.
        alt_ids : str
            Raw alternate-ID column value for the interactor.

        Returns
        -------
        tuple
            (wormbase_id, entrez_id, up_id, intact_id); each element is
            None when it could not be determined. All four are None if
            `ids` yields no identifiers at all.
        """
        wormbase_id = None
        entrez_id = None
        up_id = None
        intact_id = None
        db_id_info = self._id_conversion(ids)
        alt_db_id_info = self._id_conversion(alt_ids)
        if not db_id_info:
            logger.warning(f"No db_refs found for interactor: "
                           f"{ids}, {alt_ids}")
            return wormbase_id, entrez_id, up_id, intact_id

        if db_id_info.get('wormbase'):
            wormbase_id = db_id_info['wormbase'][0]
        else:
            # Some WB ids are stored as an alternate id under
            # 'ensemblgenomes'. The check must look inside each identifier:
            # testing 'WBGene' against the list itself only matches an
            # element that is exactly 'WBGene'.
            wormbase_id = next(
                (alt_id for alt_id in
                 alt_db_id_info.get('ensemblgenomes') or []
                 if 'WBGene' in alt_id),
                None)

        # NOTE(review): IDs parsed from a row are strings, so the two
        # mapping lookups below only hit if the 'GeneID' column holds
        # strings as well -- confirm against how the mappings are loaded.
        if db_id_info.get('entrez gene/locuslink'):
            entrez_id = db_id_info['entrez gene/locuslink'][0]
        elif wormbase_id:
            # No Entrez ID in the row: map it from the WormBase ID
            entrez_id = self.wb_to_entrez_dict.get(wormbase_id) or None
        if not wormbase_id and entrez_id:
            # No WormBase ID in the row: map it from the Entrez ID
            wormbase_id = self.entrez_to_wb_dict.get(entrez_id) or None

        up_id = self._first(db_id_info, 'uniprotkb')
        intact_id = self._first(db_id_info, 'intact')
        return wormbase_id, entrez_id, up_id, intact_id

    def override_agent_name(self, name, entrez_id):
        """Replace `name` with the mapped Entrez symbol when they differ.

        The name is returned unchanged if `entrez_id` is empty, if no
        symbol is mapped for it, or if `name` itself is empty.
        """
        if not entrez_id:
            return name
        entrez_name = self.entrez_to_symbol_dict.get(entrez_id) or None
        if entrez_name and name and name != entrez_name:
            logger.debug(f"Replacing name for interactor with Entrez "
                         f"symbol: {name} --> {entrez_name}")
            return entrez_name
        return name

    def get_agent_role_info(self, interactor_types, interactor_bio_types,
                            interactor_exp_types):
        """Return the first 'psi-mi' type and roles of one interactor.

        Parameters
        ----------
        interactor_types : str
            Raw interactor type column value.
        interactor_bio_types : str
            Raw biological role column value.
        interactor_exp_types : str
            Raw experimental role column value.

        Returns
        -------
        tuple
            (interactor_type, biological_role, experimental_role); each
            is None when no 'psi-mi' entry could be parsed.
        """
        interactor_type = self._first(
            self._type_role_conversion(interactor_types), 'psi-mi')
        biological_role = self._first(
            self._type_role_conversion(interactor_bio_types), 'psi-mi')
        experimental_role = self._first(
            self._type_role_conversion(interactor_exp_types), 'psi-mi')
        return interactor_type, biological_role, experimental_role

    def process_row(self, wb_row):
        """Turn one interaction row into at most one INDRA Statement.

        The Statement, if any, is appended to `self.statements`. Nothing
        is appended when an agent cannot be built, when no interaction
        type is found, or when neither the roles of interactor A nor the
        interaction type map onto a supported Statement.

        Parameters
        ----------
        wb_row : namedtuple-like
            One row of interaction data. The fields read here are
            aliases_/alt_ids_/ids_interactor_a and _b,
            publication_identifiers, interaction_identifiers,
            interaction_types, types_interactor_a,
            biological_roles_interactor_a and
            experimental_roles_interactor_a; `_asdict()` is also called.
        """
        name_agent_a = self.get_agent_name(wb_row.aliases_interactor_a,
                                           wb_row.alt_ids_interactor_a)
        name_agent_b = self.get_agent_name(wb_row.aliases_interactor_b,
                                           wb_row.alt_ids_interactor_b)
        wormbase_id_agent_a, entrez_id_agent_a, up_id_agent_a, \
            intact_id_agent_a = self.get_agent_ids(
                wb_row.ids_interactor_a, wb_row.alt_ids_interactor_a)
        wormbase_id_agent_b, entrez_id_agent_b, up_id_agent_b, \
            intact_id_agent_b = self.get_agent_ids(
                wb_row.ids_interactor_b, wb_row.alt_ids_interactor_b)

        # If agent name doesn't match the corresponding name in the
        # wormbase-to-entrez ID mapping file, replace it with the name in
        # that file.
        name_agent_a = self.override_agent_name(name_agent_a,
                                                entrez_id_agent_a)
        name_agent_b = self.override_agent_name(name_agent_b,
                                                entrez_id_agent_b)

        # Ground agents. The result must stay None on failure: coercing
        # it to {} would defeat the skip below and put {} into Statements.
        agent_a = self._make_agent(name_agent_a, wormbase_id_agent_a,
                                   entrez_id_agent_a, up_id_agent_a,
                                   intact_id_agent_a)
        agent_b = self._make_agent(name_agent_b, wormbase_id_agent_b,
                                   entrez_id_agent_b, up_id_agent_b,
                                   intact_id_agent_b)
        # Skip rows where an agent could not be created
        if agent_a is None or agent_b is None:
            return

        # Get evidence
        pmid = None
        doi = None
        pub_id_info = self._id_conversion(wb_row.publication_identifiers)
        if not pub_id_info:
            logger.warning(f"No publication info found: {wb_row}")
        else:
            pmid = self._first(pub_id_info, 'pubmed')
            doi = self._first(pub_id_info, 'doi')
            # TODO: mint and imex IDs are also available under the
            # 'mint' and 'imex' keys
        text_refs = {}
        if pmid:
            text_refs['PMID'] = pmid
        if doi:
            text_refs['DOI'] = doi

        # Prefer wormbase to get source ID if possible, otherwise choose
        # the alphabetically first alternative; no identifiers at all
        # gives a source ID of None instead of an IndexError.
        int_id_info = self._id_conversion(wb_row.interaction_identifiers)
        if 'wormbase' in int_id_info:
            source = 'wormbase'
        elif int_id_info:
            source = sorted(int_id_info)[0]
        else:
            source = None
        source_id = f'{source}:{int_id_info[source][0]}' if source else None

        # Incorporate info from the wormbase-to-entrez ID mapping file
        # into Evidence as annotations
        full_annotations = {
            'interaction_info': wb_row._asdict(),
            'entrez_info_agent_a': {},
            'entrez_info_agent_b': {},
        }
        if entrez_id_agent_a:
            full_annotations['entrez_info_agent_a'] = \
                self.symbol_to_annotation_dict.get(entrez_id_agent_a) or {}
        if entrez_id_agent_b:
            full_annotations['entrez_info_agent_b'] = \
                self.symbol_to_annotation_dict.get(entrez_id_agent_b) or {}
        ev = Evidence(source_api='wormbase',
                      source_id=source_id,
                      pmid=pmid,
                      text_refs=text_refs,
                      annotations=full_annotations)

        # Determine the interaction type. Without one no Statement can be
        # chosen, so stop here rather than hit an unbound variable later.
        int_type_info = self._type_role_conversion(wb_row.interaction_types)
        if not int_type_info:
            logger.warning(f"No interaction type found: {wb_row}")
            return
        if int_type_info.get('psi-mi'):
            interaction_type = int_type_info['psi-mi'][0]
        else:
            interaction_type = next(iter(int_type_info.values()))[0]

        # Only necessary to get biological role and experimental role
        # for one agent
        # TODO: Decide how/whether to use agent type (protein, gene, DNA,
        # or RNA) to determine role.
        _, agent_a_bio_role, agent_a_exp_role = \
            self.get_agent_role_info(wb_row.types_interactor_a,
                                     wb_row.biological_roles_interactor_a,
                                     wb_row.experimental_roles_interactor_a)
        subj = None
        obj = None
        is_two_hybrid = False
        if agent_a_bio_role in ['enzyme', 'inhibitor'] or \
                agent_a_exp_role in ['suppressor gene', 'enhancer gene',
                                     'epistatic gene']:
            subj = agent_a
            obj = agent_b
        elif agent_a_bio_role in ['enzyme target'] or \
                agent_a_exp_role in ['suppressed gene', 'enhanced gene',
                                     'hypostatic gene']:
            subj = agent_b
            obj = agent_a
        elif agent_a_exp_role in ['bait', 'prey']:
            is_two_hybrid = True
        else:
            # Only continue to statement creation if subject and object
            # are specified or the interaction is found through a
            # two-hybrid screen.
            return

        # TODO: Decide how/whether to use remaining interaction types
        if is_two_hybrid:
            # Special case where agents do not have a subject-object
            # relationship. Tested first because subj and obj are None
            # here, so a directional Statement would have no agents.
            s = Complex([agent_a, agent_b], evidence=ev)
        # Omit types 'mutual genetic enhancement' and 'mutual genetic
        # enhancement (sensu unexpected)' for now and only use the
        # 'genetic enhancement' type.
        elif 'genetic enhancement' in interaction_type and \
                'mutual' not in interaction_type:
            s = IncreaseAmount(subj, obj, evidence=ev)
        elif any(x in interaction_type for x in
                 ['suppression', 'epistasis (sensu Bateson)']):
            s = DecreaseAmount(subj, obj, evidence=ev)
        elif 'phosphorylation reaction' in interaction_type:
            s = Phosphorylation(subj, obj, evidence=ev)
        # 'demethylation reaction' contains 'methylation reaction', so it
        # has to be tested first.
        elif 'demethylation reaction' in interaction_type:
            s = Demethylation(subj, obj, evidence=ev)
        elif 'methylation reaction' in interaction_type:
            s = Methylation(subj, obj, evidence=ev)
        else:
            return
        self.statements.append(s)

    @staticmethod
    def _make_agent(symbol, wormbase_id, entrez_id, up_id, intact_id):
        """Make an Agent object, appropriately grounded.

        Parameters
        ----------
        symbol : str
            A plain text symbol, or None if not listed.
        wormbase_id : str
            WormBase identifier
        entrez_id : str
            Entrez identifier
        up_id : str
            UniProt identifier
        intact_id : str
            IntAct identifier (currently not added to db_refs)

        Returns
        -------
        agent : indra.statements.Agent or None
            A grounded agent object, or None if neither `symbol` nor the
            standardization of the db_refs provides a name.
        """
        db_refs = {}
        name = symbol
        if wormbase_id:
            db_refs['WB'] = wormbase_id
        if entrez_id:
            db_refs['EGID'] = entrez_id
        if up_id:
            if '-' in up_id:
                # ID with a '-' suffix: keep the full ID as 'UPISO' and
                # the part before the dash as 'UP'
                db_refs['UP'] = up_id.split('-')[0]
                db_refs['UPISO'] = up_id
            else:
                db_refs['UP'] = up_id
        # TODO: intact_id is not used yet (would go under 'INTACT')
        standard_name, db_refs = standardize_name_db_refs(db_refs)
        if standard_name:
            name = standard_name
        # At the time of writing this, the name was never None but
        # just in case
        if name is None:
            return None
        return Agent(name, db_refs=db_refs)

    @staticmethod
    def _alias_conversion(raw_value: str):
        """Return dictionary with keys corresponding to name types and values
        to agent names by decomposing the string value in one of 'Alias(es)
        interactor A' or 'Alias(es) interactor B'.

        Example string value:
        'wormbase:dpy-21(public_name)|wormbase:Y59A8B.1(sequence_name)'

        Parameters
        ----------
        raw_value : str
            The raw value in 'Alias(es) interactor A' or
            'Alias(es) interactor B' for a particular row.

        Returns
        -------
        name_info : dict
            Dictionary with name types as keys and lists of agent names as
            values (for C. elegans interaction data, the primary name and
            the one used corresponds with the key 'public_name'). Empty if
            `raw_value` is empty or not a string.
        """
        if not raw_value or not isinstance(raw_value, str):
            return {}
        # Remove the strings "public name" and all double quotes (only a few
        # special cases in the data have this)
        cleaned_value = \
            raw_value.replace('"public name: ', '').replace('"', '')
        name_info = {}
        # 'Alias(es) interactor _' can contain multiple aliases
        # separated by "|".
        for sub in cleaned_value.split('|'):
            if ':' not in sub or '(' not in sub:
                continue
            # The name type is the text inside the first parentheses
            match = re.search(r'\(([^)]+)\)', sub)
            if not match:
                continue
            # The name sits between the first ':' and the first '('
            val = sub.split(':')[1].split('(')[0]
            name_info.setdefault(match.group(1), []).append(val)
        return name_info

    @staticmethod
    def _id_conversion(raw_value: str):
        """Decompose the string value in columns 'ID(s) interactor A',
        'ID(s) interactor B', 'Alt. ID(s) interactor A',
        'Alt. ID(s) interactor B', 'Publication ID(s)', or
        'Interaction identifier(s)' and return dictionary with keys
        corresponding to database/source names and values to identifiers.

        Example string values: 'wormbase:WBGene00006352',
        'entrez gene/locuslink:178272', 'pubmed:36969515',
        'wormbase:WBInteraction000000001'.

        Parameters
        ----------
        raw_value : str
            The raw value in whichever ID column is being converted.

        Returns
        -------
        source_id_info : dict
            Dictionary with database/source names as keys and lists of
            identifiers as values. Unique keys for 'ID(s) interactor _' in
            C. elegans interaction data are 'wormbase' and
            'entrez gene/locuslink'. Unique keys for 'Publication ID(s)'
            in C. elegans interaction data are 'pubmed'. Empty if
            `raw_value` is empty or not a string.
        """
        if not raw_value or not isinstance(raw_value, str):
            return {}
        id_info = {}
        for sub in raw_value.split('|'):
            parts = sub.split(':')
            if len(parts) < 2:
                continue
            # Only the last two ':'-separated fields are used, so
            # 'a:b:c' is stored as {'b': ['c']}.
            id_info.setdefault(parts[-2], []).append(parts[-1])
        return id_info

    @staticmethod
    def _type_role_conversion(raw_value: str):
        """Decompose string value for columns 'Interaction type(s)',
        'Interactor type(s) A/B', 'Biological role(s) interactor A/B',
        or 'Experimental role(s) interactor A/B' and return dictionary with
        keys corresponding to the 'psi-mi' tag and values to
        types or roles, which reside within parentheses of the string.

        Example string values: 'psi-mi:"MI:0326"(protein)',
        'psi-mi:"MI:2402"(genetic interaction)', 'psi-mi:"MI:0586"(inhibitor)',
        'psi-mi:"MI:0582"(suppressed gene)'.

        Parameters
        ----------
        raw_value : str
            The raw value in whichever column is being converted.

        Returns
        -------
        type_info : dict
            Dictionary with 'psi-mi' as keys and lists of types or roles
            as values. Empty if `raw_value` is empty or not a string.
        """
        if not raw_value or not isinstance(raw_value, str):
            return {}
        type_info = {}
        for sub in raw_value.split('|'):
            if ':' not in sub:
                continue
            # Extract text inside outermost parentheses. Entries without
            # a '(' followed by a ')' are skipped.
            match = re.search(r'\((.*)\)', sub)
            if match is None:
                continue
            key = sub.split(':')[0]
            type_info.setdefault(key, []).append(match.group(1))
        return type_info