import logging
import tqdm
from collections import Counter
from indra.statements.validate import assert_valid_statements
from indra.databases import hgnc_client, uniprot_client
from indra.statements import Agent, Phosphorylation, Autophosphorylation, \
Evidence, BioContext, RefContext, get_valid_residue, \
InvalidResidueError, MutCondition
logger = logging.getLogger(__name__)
[docs]class RlimspProcessor(object):
"""Convert RLIMS-P JSON into INDRA Statements."""
def __init__(self, rlimsp_json, doc_id_type=None):
self._json = rlimsp_json
self.statements = []
self.doc_id_type = doc_id_type
self.processed_texts = []
return
[docs]class RlimspParagraph(object):
"""An object that represents a single RLIMS-P Paragraph."""
def __init__(self, p_info, doc_id_type):
self._text = p_info['text']
self._sentences = []
self._sentence_starts = []
for s in p_info['sentence']:
start = s['charStart']
stop = s['charEnd']
self._sentences.append(self._text[start:stop])
self._sentence_starts.append(start)
if 'pmid' in p_info and 'pmcid' in p_info:
self._text_refs = {n.upper(): p_info[n] for n in ['pmid', 'pmcid']
if p_info[n]}
elif doc_id_type:
self._text_refs = {doc_id_type.upper(): p_info['docId']}
else:
logger.info('Could not establish text refs for paragraph.')
self._text_refs = {}
self._relations = p_info['relation']
self._entity_dict = p_info['entity']
return
def _get_agent(self, entity_id):
"""Convert the entity dictionary into an INDRA Agent."""
if entity_id is None:
return None
entity_info = self._entity_dict.get(entity_id)
if entity_info is None:
logger.warning("Entity key did not resolve to entity.")
return None
return get_agent_from_entity_info(entity_info)
def _get_site(self, site_id):
def get_aa_code(residue_str):
try:
res = get_valid_residue(residue_str)
return res
except InvalidResidueError as e:
logger.info('%s' % e)
return None
if site_id is None:
return None, None, None
site_info = self._entity_dict[site_id]
site_text = site_info['attribute'][0]['value']
site_parts = site_text.split('-')
position = None
if len(site_parts) == 2:
residue_str = site_parts[0]
residue = get_aa_code(residue_str)
position = site_parts[1]
else:
residue = get_aa_code(site_text)
coords = (site_info['charStart'], site_info['charEnd'])
return residue, position, coords
def _get_evidence(self, trigger_info, args, agent_coords, site_coords):
"""Get the evidence using the info in the trigger entity."""
# Get the sentence index from the trigger word.
s_idx_set = {self._entity_dict[eid]['sentenceIndex']
for eid in args.values()
if 'sentenceIndex' in self._entity_dict[eid]}
if s_idx_set:
i_min = min(s_idx_set)
i_max = max(s_idx_set)
text = '. '.join(self._sentences[i_min:(i_max+1)]) + '.'
s_start = self._sentence_starts[i_min]
annotations = {
'agents': {'coords': [_fix_coords(coords, s_start)
for coords in agent_coords]},
'trigger': {'coords': _fix_coords([trigger_info['charStart'],
trigger_info['charEnd']],
s_start),
'text': trigger_info['entityText']}
}
else:
logger.info('Unable to get sentence index')
annotations = {}
text = None
if site_coords:
annotations['site'] = {'coords': _fix_coords(site_coords, s_start)}
return Evidence(text_refs=self._text_refs.copy(), text=text,
source_api='rlimsp', pmid=self._text_refs.get('PMID'),
annotations=annotations)
def get_statements(self):
stmts = []
for rel_key, rel_info in self._relations.items():
# Turn the arguments into a dict.
args = {e['role']: e['entity_duid'] for e in rel_info['argument']}
entity_args = args.copy()
# Remove some special cases.
trigger_id = entity_args.pop('TRIGGER')
trigger_info = self._entity_dict[trigger_id]
site_id = entity_args.pop('SITE', None)
# Get the entity ids.
entities = {role: self._get_agent(eid)
for role, eid in entity_args.items()}
rel_type = rel_info['relationType']
if rel_type == 'PHOSPHORYLATION':
# Get the agents.
enz, enz_coords = entities.get('KINASE', (None, None))
sub, sub_coords = entities.get('SUBSTRATE', (None, None))
if sub is None:
continue
trigger_text = trigger_info.get('entityText')
if enz is not None and enz.name == sub.name and \
'auto' in trigger_text:
is_autophos = True
else:
is_autophos = False
# Get the site
residue, position, site_coords = self._get_site(site_id)
# Get the evidence
ev = self._get_evidence(trigger_info, args,
[enz_coords, sub_coords],
site_coords)
# Turn taxonomy into context, sub TAX takes precedence
tax = None
if enz and 'TAX' in enz.db_refs:
tax = enz.db_refs.pop('TAX')
if sub and 'TAX' in sub.db_refs:
tax = sub.db_refs.pop('TAX')
if tax is not None:
context = \
BioContext(species=RefContext(tax,
{'TAXONOMY': tax}))
ev.context = context
if is_autophos:
stmt = Autophosphorylation(sub, residue=residue,
position=position,
evidence=[ev])
else:
stmt = Phosphorylation(enz, sub, residue=residue,
position=position,
evidence=[ev])
stmts.append(stmt)
else:
logger.warning("Unhandled statement type: %s" % rel_type)
return stmts
[docs]def get_agent_from_entity_info(entity_info):
"""Return an INDRA Agent by processing an entity_info dict."""
# This will be the default name. If we get a gene name, it will
# override this rawtext name.
raw_text = entity_info['entityText']
name = raw_text
# Get the db refs.
refs = {'TEXT': raw_text}
entries = entity_info['entityId']
if entries is None or entries == {'$undefined': True}:
entries = []
ref_counts = Counter([entry['source'] for entry in entries])
for source, count in ref_counts.items():
if source in ('Entrez', 'UniProt') and count > 1:
logger.info('%s has %d entries for %s, skipping'
% (raw_text, count, source))
return None, None
muts = []
for id_dict in entries:
if id_dict['source'] == 'Entrez':
refs['EGID'] = id_dict['idString']
hgnc_id = hgnc_client.get_hgnc_from_entrez(id_dict['idString'])
if hgnc_id is not None:
# Check against what we may have already inferred from
# UniProt. If it disagrees with this, let it be. Inference
# from Entrez isn't as reliable.
if 'HGNC' in refs.keys():
if refs['HGNC'] != hgnc_id:
msg = ('HGNC:%s previously set does not'
' match HGNC:%s from EGID:%s') % \
(refs['HGNC'], hgnc_id, refs['EGID'])
logger.info(msg)
else:
refs['HGNC'] = hgnc_id
elif id_dict['source'] == 'UniProt':
refs['UP'] = id_dict['idString']
hgnc_id = uniprot_client.get_hgnc_id(id_dict['idString'])
if hgnc_id:
# Check to see if we have a conflict with an HGNC id
# found from the Entrez id. If so, overwrite with this
# one, in which we have greater faith.
if 'HGNC' in refs.keys() and refs['HGNC'] != hgnc_id:
msg = ('Inferred HGNC:%s from UP:%s does not'
' match HGNC:%s from EGID:%s') % \
(refs['HGNC'], refs['UP'], hgnc_id,
refs['EGID'])
logger.info(msg)
refs['HGNC'] = hgnc_id
name = hgnc_client.get_hgnc_name(hgnc_id)
else:
gene_name = uniprot_client.get_gene_name(id_dict['idString'])
if gene_name is not None:
name = gene_name
elif id_dict['source'] in ('Tax', 'NCBI'):
# Note that TAX is non-standard but it's popped out later in the
# extraction process
refs['TAX'] = id_dict['idString']
elif id_dict['source'] == 'CHEBI':
refs['CHEBI'] = 'CHEBI:%s' % id_dict['idString']
# These we take as is
elif id_dict['source'] in ('MESH', 'OMIM'):
if ';' in id_dict['idString']:
refs[id_dict['source']] = id_dict['idString'].split(';')[0]
else:
refs[id_dict['source']] = id_dict['idString']
# CTD is sometimes used for MESH chemical IDs but can also be just '-'
elif id_dict['source'] == 'CTD':
if id_dict['idString'] != '-':
refs['MESH'] = id_dict['idString']
# Handle mutations
elif id_dict['source'] == 'Unk' and \
id_dict['entityType'] == 'ProteinMutation':
# {'idString': 'p|SUB|Y|268|A', 'source': 'Unk',
# 'tool': 'PubTator', 'entityType': 'ProteinMutation'}
# Mpk1(Y268A)'
if id_dict['idString'].startswith('p|SUB|'):
try:
# Handle special cases like p|SUB|A|30|P;RS#:104893878
parts = id_dict['idString'].split(';')[0].split('|')
residue_from, pos, residue_to = parts[2:5]
mut = MutCondition(pos, residue_from, residue_to)
muts.append(mut)
except Exception as e:
logger.info('Could not process mutation %s' %
id_dict['idString'])
else:
logger.info('Unhandled mutation: %s' % id_dict['idString'])
else:
logger.warning("Unhandled id type: {source}={idString}"
.format(**id_dict))
raw_coords = (entity_info['charStart'], entity_info['charEnd'])
return Agent(name, db_refs=refs, mutations=muts), raw_coords
def _fix_coords(coords, offset):
"""Adjust the entity coordinates to the beginning of the sentence."""
if coords is None:
return None
return tuple([n - offset for n in coords])