from __future__ import absolute_import, print_function, unicode_literals
from builtins import dict, str
import os
import csv
import sys
import pickle
from copy import deepcopy
from indra.databases import uniprot_client, hgnc_client
from itertools import groupby, chain
from collections import Counter
import logging
from indra.util import read_unicode_csv, write_unicode_csv
logger = logging.getLogger('grounding_mapper')


class GroundingMapper(object):
    def __init__(self, gm):
        self.gm = gm

    def map_agents(self, stmts, do_rename=True):
        # The mapped statements will be collected in a new list
        mapped_stmts = []
        num_skipped = 0
        # Iterate over the statements
        for stmt in stmts:
            # Make a copy of the statement
            mapped_stmt = deepcopy(stmt)
            # Iterate over the agents
            skip_stmt = False
            for agent in mapped_stmt.agent_list():
                if agent is None or agent.db_refs.get('TEXT') is None:
                    continue
                agent_text = agent.db_refs.get('TEXT')
                # Look this string up in the grounding map; if it is not
                # in the map, leave the agent alone and continue
                try:
                    map_db_refs = self.gm[agent_text]
                except KeyError:
                    continue
                # If it's in the map but maps to None, then filter out
                # this statement by skipping it
                if map_db_refs is None:
                    # Increase the counter if this statement has not
                    # already been skipped via another agent
                    if not skip_stmt:
                        num_skipped += 1
                    logger.debug("Skipping %s" % agent_text)
                    skip_stmt = True
                # If it maps to a value that's not None, update the
                # agent's db_refs field with the mapped grounding
                else:
                    gene_name = None
                    # Work on a copy so that the shared grounding map
                    # entry is not mutated
                    map_db_refs = deepcopy(map_db_refs)
                    up_id = map_db_refs.get('UP')
                    hgnc_sym = map_db_refs.get('HGNC')
                    if up_id and not hgnc_sym:
                        gene_name = uniprot_client.get_gene_name(
                                                up_id, web_fallback=False)
                        if gene_name:
                            hgnc_id = hgnc_client.get_hgnc_id(gene_name)
                            if hgnc_id:
                                map_db_refs['HGNC'] = hgnc_id
                    elif hgnc_sym and not up_id:
                        # Override the HGNC symbol entry from the grounding
                        # map with an HGNC ID
                        hgnc_id = hgnc_client.get_hgnc_id(hgnc_sym)
                        if hgnc_id:
                            map_db_refs['HGNC'] = hgnc_id
                            # Now get the Uniprot ID for the gene
                            up_id = hgnc_client.get_uniprot_id(hgnc_id)
                            if up_id:
                                map_db_refs['UP'] = up_id
                        # If there's no HGNC ID for this symbol, raise an
                        # Exception
                        else:
                            raise ValueError('No HGNC ID corresponding to '
                                             'gene symbol %s in grounding '
                                             'map.' % hgnc_sym)
                    # If we have both, check the gene symbol against the
                    # mapping from Uniprot
                    elif up_id and hgnc_sym:
                        # Get the gene name from Uniprot
                        gene_name = uniprot_client.get_gene_name(up_id)
                        if not gene_name:
                            raise ValueError('No gene name found for '
                                             'Uniprot ID %s (expected %s)' %
                                             (up_id, hgnc_sym))
                        # We got a gene name; compare it to the HGNC symbol
                        if gene_name != hgnc_sym:
                            raise ValueError('Gene name %s for Uniprot ID '
                                             '%s does not match HGNC symbol '
                                             '%s given in grounding map.' %
                                             (gene_name, up_id, hgnc_sym))
                        hgnc_id = hgnc_client.get_hgnc_id(hgnc_sym)
                        if not hgnc_id:
                            raise ValueError('No HGNC ID corresponding to '
                                             'gene symbol %s in grounding '
                                             'map.' % hgnc_sym)
                        # Replace the symbol with the HGNC ID, consistent
                        # with the branches above
                        map_db_refs['HGNC'] = hgnc_id
                    # Assign the db_refs from the grounding map to the
                    # agent
                    agent.db_refs = map_db_refs
                    # Are we renaming right now?
                    if do_rename:
                        # If there's a Bioentities ID, prefer that for
                        # the name
                        if agent.db_refs.get('BE'):
                            agent.name = agent.db_refs.get('BE')
                        # Otherwise use the HGNC symbol or the gene name
                        # retrieved above
                        elif hgnc_sym is not None:
                            agent.name = hgnc_sym
                        elif gene_name is not None:
                            agent.name = gene_name
            # Check if we should skip the statement
            if not skip_stmt:
                mapped_stmts.append(mapped_stmt)
        logger.info('%s statements filtered out' % num_skipped)
        return mapped_stmts

    def rename_agents(self, stmts):
        # Make a copy of the statements
        mapped_stmts = deepcopy(stmts)
        # Iterate over the statements
        for stmt in mapped_stmts:
            # Iterate over the agents
            for agent in stmt.agent_list():
                if agent is None:
                    continue
                # If there's a Bioentities ID, prefer that for the name
                if agent.db_refs.get('BE'):
                    agent.name = agent.db_refs.get('BE')
                # Otherwise, take the gene name from Uniprot
                elif agent.db_refs.get('UP'):
                    # Try for the gene name; if this fails, we continue
                    # with no change
                    gene_name = uniprot_client.get_gene_name(
                                                    agent.db_refs.get('UP'),
                                                    web_fallback=False)
                    if gene_name:
                        agent.name = gene_name
                        hgnc_id = hgnc_client.get_hgnc_id(gene_name)
                        if hgnc_id:
                            agent.db_refs['HGNC'] = hgnc_id
                # If neither is available, the agent keeps its old name;
                # a possible extension would be to fall back to the text
                # string:
                #elif agent.db_refs.get('TEXT'):
                #    agent.name = agent.db_refs.get('TEXT')
        return mapped_stmts
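

# A minimal usage sketch (the statement list `stmts` and the map path are
# hypothetical; any iterable of INDRA Statements works):
#
#     gm = load_grounding_map('grounding_map.csv')
#     mapper = GroundingMapper(gm)
#     mapped_stmts = mapper.map_agents(stmts)
#     renamed_stmts = mapper.rename_agents(mapped_stmts)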


# TODO: handle the cases when there is more than one entry for the same
# key (e.g., ROS, ER)
def load_grounding_map(grounding_map_path, ignore_path=None):
    g_map = {}
    map_rows = read_unicode_csv(grounding_map_path, delimiter=',',
                                quotechar='"',
                                quoting=csv.QUOTE_MINIMAL,
                                lineterminator='\r\n')
    if ignore_path and os.path.exists(ignore_path):
        ignore_rows = read_unicode_csv(ignore_path, delimiter=',',
                                       quotechar='"',
                                       quoting=csv.QUOTE_MINIMAL,
                                       lineterminator='\r\n')
    else:
        ignore_rows = []
    csv_rows = chain(map_rows, ignore_rows)
    for row in csv_rows:
        key = row[0]
        db_refs = {'TEXT': key}
        keys = [entry for entry in row[1::2] if entry != '']
        values = [entry for entry in row[2::2] if entry != '']
        if len(keys) != len(values):
            logger.error('Mismatched keys and values in row %s' % str(row))
            continue
        db_refs.update(dict(zip(keys, values)))
        # A row with only a TEXT entry (e.g., one from the ignore file)
        # maps to None, which marks statements with this agent text for
        # filtering
        if len(db_refs) > 1:
            g_map[key] = db_refs
        else:
            g_map[key] = None
    return g_map
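

# A minimal sketch of the expected CSV layout (hypothetical rows, not
# taken from the real bioentities files): each row is an agent text
# followed by alternating database name / ID pairs, and a bare text row
# marks an entry to ignore:
#
#     MEK,BE,MEK
#     p53,HGNC,11998,UP,P04637
#     ROS,
#
# which load_grounding_map would turn into:
#
#     {'MEK': {'TEXT': 'MEK', 'BE': 'MEK'},
#      'p53': {'TEXT': 'p53', 'HGNC': '11998', 'UP': 'P04637'},
#      'ROS': None}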


# Some useful functions for analyzing the grounding of sets of statements
# Put together all agent texts along with their grounding
def all_agents(stmts):
    agents = []
    for stmt in stmts:
        for agent in stmt.agent_list():
            # Agents don't always have a TEXT db_refs entry (for instance,
            # in the case of Statements from databases) so we check for
            # this.
            if agent is not None and agent.db_refs.get('TEXT') is not None:
                agents.append(agent)
    return agents


def agent_texts(agents):
    return [ag.db_refs.get('TEXT') for ag in agents]


def get_sentences_for_agent(text, stmts, max_sentences=None):
    sentences = []
    for stmt in stmts:
        for agent in stmt.agent_list():
            if agent is not None and agent.db_refs.get('TEXT') == text:
                sentences.append((stmt.evidence[0].pmid,
                                  stmt.evidence[0].text))
                if max_sentences is not None and \
                        len(sentences) >= max_sentences:
                    return sentences
    return sentences


def agent_texts_with_grounding(stmts):
    allag = all_agents(stmts)
    # Convert PFAM-DEF lists into tuples so that they are hashable and can
    # be tabulated with a Counter
    for ag in allag:
        pfam_def = ag.db_refs.get('PFAM-DEF')
        if pfam_def is not None:
            ag.db_refs['PFAM-DEF'] = tuple(pfam_def)
    refs = [tuple(ag.db_refs.items()) for ag in allag]
    refs_counter = Counter(refs)
    refs_counter_dict = [(dict(entry[0]), entry[1])
                         for entry in refs_counter.items()]
    # First, sort by text so that we can do a groupby
    refs_counter_dict.sort(key=lambda x: x[0].get('TEXT'))
    # Then group by text
    grouped_by_text = []
    for k, g in groupby(refs_counter_dict, key=lambda x: x[0].get('TEXT')):
        # Total occurrences of this agent text
        total = 0
        entry = [k]
        db_ref_list = []
        for db_refs, count in g:
            # If TEXT is the only key, there is no grounding
            if list(db_refs.keys()) == ['TEXT']:
                db_ref_list.append((None, None, count))
            # Add any other db_refs (but not TEXT itself)
            for db, db_id in db_refs.items():
                if db == 'TEXT':
                    continue
                db_ref_list.append((db, db_id, count))
            total += count
        # Sort the db_ref_list by the number of occurrences of each
        # grounding
        entry.append(tuple(sorted(db_ref_list, key=lambda x: x[2],
                                  reverse=True)))
        # Now add the total frequency to the entry
        entry.append(total)
        # And add the entry to the overall list
        grouped_by_text.append(tuple(entry))
    # Sort the list by the total number of occurrences of each unique
    # agent text
    grouped_by_text.sort(key=lambda x: x[2], reverse=True)
    return grouped_by_text
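

# Each resulting entry has the form (agent_text, ((db, db_id, count), ...),
# total); for example (hypothetical counts):
#
#     ('ERK', ((None, None, 5), ('BE', 'ERK', 2)), 7)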


# List of all ungrounded entities by number of mentions
def ungrounded_texts(stmts):
    # Note: on Python 3, dict.keys() returns a view, so it has to be
    # converted to a list before comparing, and Counter.items() has to
    # be sorted via sorted() rather than list.sort()
    ungrounded = [ag.db_refs['TEXT']
                  for s in stmts
                  for ag in s.agent_list()
                  if ag is not None and
                  list(ag.db_refs.keys()) == ['TEXT']]
    ungroundc = Counter(ungrounded)
    ungroundc = sorted(ungroundc.items(), key=lambda x: x[1], reverse=True)
    return ungroundc
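

# A sketch of the expected output (hypothetical data): if the string 'XYZ'
# appears twice with only a TEXT entry, ungrounded_texts would return
#
#     [('XYZ', 2)]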


def get_agents_with_name(name, stmts):
    return [ag for stmt in stmts for ag in stmt.agent_list()
            if ag is not None and ag.name == name]


def save_base_map(filename, grouped_by_text):
    rows = []
    for group in grouped_by_text:
        text_string = group[0]
        for db, db_id, count in group[1]:
            if db == 'UP':
                name = uniprot_client.get_mnemonic(db_id)
            else:
                name = ''
            row = [text_string, db, db_id, count, name]
            rows.append(row)
    write_unicode_csv(filename, rows, delimiter=',', quotechar='"',
                      quoting=csv.QUOTE_MINIMAL, lineterminator='\r\n')


def protein_map_from_twg(twg):
    """Build a map of entity texts to validated protein grounding.

    Looks at the grounding of the entity texts extracted from the
    statements and finds proteins where there is grounding to a human
    protein that maps to an HGNC name that is an exact match to the
    entity text. Returns a dict that can be used to update/expand the
    grounding map.
    """
    protein_map = {}
    unmatched = 0
    matched = 0
    logger.info('Building grounding map for human proteins')
    for agent_text, grounding_list, total_count in twg:
        # If 'UP' (Uniprot) is not one of the grounding entries for this
        # text, we skip it.
        if 'UP' not in [entry[0] for entry in grounding_list]:
            continue
        # Otherwise, collect all the Uniprot IDs for this protein.
        uniprot_ids = [entry[1] for entry in grounding_list
                       if entry[0] == 'UP']
        # For each Uniprot ID, look up the species
        for uniprot_id in uniprot_ids:
            # If it's not a human protein, skip it
            mnemonic = uniprot_client.get_mnemonic(uniprot_id)
            if mnemonic is None or not mnemonic.endswith('_HUMAN'):
                continue
            # Otherwise, look up the gene name in HGNC and match it
            # against the agent text
            gene_name = uniprot_client.get_gene_name(uniprot_id)
            if gene_name is None:
                unmatched += 1
                continue
            if agent_text.upper() == gene_name.upper():
                matched += 1
                protein_map[agent_text] = {'TEXT': agent_text,
                                           'UP': uniprot_id}
            else:
                unmatched += 1
    logger.info('Exact matches for %d proteins' % matched)
    logger.info('No match (or no gene name) for %d proteins' % unmatched)
    return protein_map
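

# For instance (a hypothetical entry), if the agent text 'BRAF' is
# grounded to Uniprot P15056 (mnemonic BRAF_HUMAN, gene name BRAF), the
# map would gain:
#
#     {'BRAF': {'TEXT': 'BRAF', 'UP': 'P15056'}}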


def save_sentences(twg, stmts, filename, agent_limit=300):
    sentences = []
    unmapped_texts = [t[0] for t in twg]
    counter = 0
    logger.info('Getting sentences for top %d unmapped agent texts.' %
                agent_limit)
    for text in unmapped_texts:
        agent_sentences = get_sentences_for_agent(text, stmts)
        sentences += map(lambda tup: (text,) + tup, agent_sentences)
        counter += 1
        if counter >= agent_limit:
            break
    # Write the sentences to a CSV file
    write_unicode_csv(filename, sentences, delimiter=',', quotechar='"',
                      quoting=csv.QUOTE_MINIMAL, lineterminator='\r\n')


# Load the default grounding map from the bioentities repository, which is
# expected to sit two directories up from this file
default_grounding_map_path = os.path.join(os.path.dirname(__file__),
                                          '../../bioentities/'
                                          'grounding_map.csv')
default_ignore_path = os.path.join(os.path.dirname(__file__),
                                   '../../bioentities/ignore.csv')
default_grounding_map = load_grounding_map(default_grounding_map_path,
                                           default_ignore_path)
gm = default_grounding_map


if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: %s stmt_file" % sys.argv[0])
        sys.exit()
    statement_file = sys.argv[1]
    logger.info("Opening statement file %s" % statement_file)
    with open(statement_file, 'rb') as f:
        st = pickle.load(f)
    stmts = []
    for stmt_list in st.values():
        stmts += stmt_list
    twg = agent_texts_with_grounding(stmts)
    save_base_map('%s_twg.csv' % statement_file, twg)
    # Keep only the entries that are NOT already in the grounding map
    filtered_twg = [entry for entry in twg
                    if entry[0] not in default_grounding_map]
    # For proteins that aren't explicitly grounded in the grounding map,
    # check for trivial corrections by building the protein map
    prot_map = protein_map_from_twg(twg)
    filtered_twg = [entry for entry in filtered_twg
                    if entry[0] not in prot_map]
    save_base_map('%s_unmapped_twg.csv' % statement_file, filtered_twg)
    # For each unmapped string, get sentences and write them to a file
    save_sentences(filtered_twg, stmts,
                   '%s_unmapped_sentences.csv' % statement_file)