Source code for indra.sources.hume.processor

import os
import rdflib
import logging
import objectpath
from datetime import datetime, timedelta

from indra.statements import Concept, Event, Influence, TimeContext, \
    RefContext, WorldContext, Evidence, QualitativeDelta, MovementContext, \
    Migration, QuantitativeState

logger = logging.getLogger(__name__)

# List out relation types and their default (implied) polarities.
polarities = {'causation': 1, 'precondition': 1, 'catalyst': 1,
              'mitigation': -1, 'prevention': -1,
              'temporallyPrecedes': None}

[docs]class HumeJsonLdProcessor(object): """This processor extracts INDRA Statements from Hume JSON-LD output. Parameters ---------- json_dict : dict A JSON dictionary containing the Hume extractions in JSON-LD format. Attributes ---------- tree : objectpath.Tree The objectpath Tree object representing the extractions. statements : list[indra.statements.Statement] A list of INDRA Statements that were extracted by the processor. """ def __init__(self, json_dict): self.tree = objectpath.Tree(json_dict) self.statements = [] self.document_dict = {} self.concept_dict = {} self.relation_dict = {} self.eid_stmt_dict = {} self._get_documents() self.relation_subj_obj_ids = [] def extract_relations(self): relations = self._find_relations() for relation_type, relation in relations: # Extract concepts and contexts. subj = self._get_event_and_context(relation, arg_type='source') obj = self._get_event_and_context(relation, arg_type='destination') if not subj.concept or not obj.concept: continue # Apply the naive polarity from the type of statement. For the # purpose of the multiplication here, if is # None to begin with, we assume it is positive obj_pol = obj_pol = obj_pol if obj_pol is not None else 1 rel_pol = polarities[relation_type] = rel_pol * obj_pol if rel_pol else None evidence = self._get_evidence(relation, get_states(relation)) st = Influence(subj, obj, evidence=evidence) self.eid_stmt_dict[relation['@id']] = st self.statements.append(st) def extract_events(self): events = self._find_events() for event in events: evidence = self._get_evidence(event, get_states(event)) stmt = self._get_event_and_context(event, eid=event['@id'], evidence=evidence) self.eid_stmt_dict[event['@id']] = stmt self.statements.append(stmt) def _find_events(self): """Find standalone events and return them in a list.""" # First populate self.concept_dict and self.relations_subj_obj_ids if not self.relation_dict or not self.concept_dict or \ not self.relation_subj_obj_ids: self._find_relations() # Check if events are part of relations events = [] for e in self.concept_dict.values(): label_set = set(e.get('labels', [])) if 'Event' in label_set: if e['@id'] not in self.relation_subj_obj_ids: events.append(e) if not events:'No standalone events found.') else:'%d standalone events found.' % len(events)) return events def _find_relations(self): """Find all relevant relation elements and return them in a list.""" # Get all extractions extractions = \ list(self.tree.execute("$.extractions[(@.@type is 'Extraction')]")) # Get relations from extractions relations = [] for e in extractions: label_set = set(e.get('labels', [])) # If this is a DirectedRelation if 'DirectedRelation' in label_set: self.relation_dict[e['@id']] = e subtype = e.get('subtype') if any(t in subtype for t in polarities.keys()): relations.append((subtype, e)) # Save IDs of relation's subject and object if e['arguments']: for a in e['arguments']: if a['type'] == 'source' or \ a['type'] == 'destination': self.relation_subj_obj_ids.append( a['value']['@id']) # If this is an Event or an Entity if {'Event', 'Entity'} & label_set: self.concept_dict[e['@id']] = e if not relations and not self.relation_dict:"No relations found.") else:'%d relations of types %s found' % (len(relations), ', '.join(polarities.keys())))'%d relations in dict.' % len(self.relation_dict))'%d concepts found.' % len(self.concept_dict)) return relations def _get_documents(self): """Populate sentences attribute with a dict keyed by document id.""" documents = self.tree.execute("$.documents") for doc in documents: sentences = {s['@id']: s['text'] for s in doc.get('sentences', [])} self.document_dict[doc['@id']] = {'sentences': sentences, 'location': doc['location']} def _make_world_context(self, entity): """Get place and time info from the json for this entity.""" loc_context = None time_context = None # Look for time and place contexts. for argument in entity["arguments"]: if argument["type"] in {"has_location", "has_origin_location", "has_destination_location", "has_intermediate_location"}: entity_id = argument["value"]["@id"] loc_entity = self.concept_dict[entity_id] loc_context = _resolve_geo(loc_entity) if argument["type"] in {"has_time", "has_start_time", "has_end_time"}: entity_id = argument["value"]["@id"] temporal_entity = self.concept_dict[entity_id] time_context = _resolve_time(temporal_entity) # Put context together context = None if loc_context or time_context: context = WorldContext(time=time_context, geo_location=loc_context) return context def _make_movement_context(self, entity): movement_locations = list() time_context = None # Use None for quantitative_state if no information found, default # value will be assigned when creating a Statement quantitative_state = None for argument in entity['arguments']: entity_id = argument["value"]["@id"] hume_entity = self.concept_dict[entity_id] if argument['type'] in {"has_actor", "has_affected_actor", "has_active_actor"}: for count in hume_entity.get('counts', list()): quantitative_state = QuantitativeState( entity="person", value=count['value'], unit=count['unit'], modifier=count['modifier']) if argument['type'] == "has_origin_location": movement_locations.append( {'location': _resolve_geo(hume_entity), 'role': 'origin'}) if argument['type'] == 'has_destination_location': movement_locations.append( {'location': _resolve_geo(hume_entity), 'role': 'destination'}) if argument['type'] in {"has_time", "has_start_time", "has_end_time"}: time_context = _resolve_time(hume_entity) return MovementContext(locations=movement_locations, time=time_context), quantitative_state def _make_concept(self, entity): """Return Concept from a Hume entity.""" # Use the canonical name as the name of the Concept by default name = self._sanitize(entity['canonicalName']) # But if there is a trigger head text, we prefer that since # it almost always results in a cleaner name # This is removed for now since the head word seems to be too # minimal for some concepts, e.g. it gives us only "security" # for "food security". """ trigger = entity.get('trigger') if trigger is not None: head_text = trigger.get('head text') if head_text is not None: name = head_text """ # Save raw text and Hume scored groundings as db_refs db_refs = _get_grounding(entity) concept = Concept(name, db_refs=db_refs) metadata = {arg['type']: arg['value']['@id'] for arg in entity['arguments']} return concept, metadata def _get_event_and_context(self, event, eid=None, arg_type=None, evidence=None): """Return an INDRA Event based on an event entry.""" if not eid: eid = _choose_id(event, arg_type) ev = self.concept_dict[eid] concept, metadata = self._make_concept(ev) is_migration_event = False hume_grounding = {x[0] for x in concept.db_refs['WM']} for grounding_en in hume_grounding: if "wm/concept/causal_factor/social_and_political/migration" in \ grounding_en: is_migration_event = True if is_migration_event: movement_context, quantitative_state = ( self._make_movement_context(ev)) event_obj = Migration(concept, delta=quantitative_state, context=movement_context, evidence=evidence) else: ev_delta = QualitativeDelta( polarity=get_polarity(ev), adjectives=None) context = self._make_world_context(ev) event_obj = Event(concept, delta=ev_delta, context=context, evidence=evidence) return event_obj def _get_evidence(self, event, adjectives): """Return the Evidence object for the INDRA Statement.""" provenance = event.get('provenance') # First try looking up the full sentence through provenance doc_id = provenance[0]['document']['@id'] sent_id = provenance[0]['sentence'] text = self.document_dict[doc_id]['sentences'][sent_id] text = self._sanitize(text) bounds = [provenance[0]['documentCharPositions'][k] for k in ['start', 'end']] annotations = { 'found_by': event.get('rule'), 'provenance': provenance, 'event_type': os.path.basename(event.get('type')), 'adjectives': adjectives, 'bounds': bounds } location = self.document_dict[doc_id]['location'] ev = Evidence(source_api='hume', text=text, annotations=annotations, pmid=location) return [ev] @staticmethod def _sanitize(text): """Return sanitized Hume text field for human readability.""" # TODO: any cleanup needed here? if text is None: return None text = text.replace('\n', ' ') return text
def _choose_id(event, arg_type): args = event.get('arguments', {}) obj_tag = [arg for arg in args if arg['type'] == arg_type] if obj_tag: obj_id = obj_tag[0]['value']['@id'] else: obj_id = None return obj_id def get_states(event): ret_list = [] if 'states' in event: for state_property in event['states']: if state_property['type'] != 'polarity': ret_list.append(state_property['text']) return ret_list def _get_grounding(entity): """Return Hume grounding.""" db_refs = {'TEXT': entity['text']} groundings = entity.get('grounding') if not groundings: return db_refs # Get rid of leading slash groundings = [(x['ontologyConcept'][1:], x['value']) for x in groundings] grounding_entries = sorted(list(set(groundings)), key=lambda x: (x[1], x[0].count('/'), x[0]), reverse=True) # We could get an empty list here in which case we don't add the # grounding if grounding_entries: db_refs['WM'] = grounding_entries return db_refs def get_polarity(event): pol_map = {'Positive': 1, 'Negative': -1} if 'states' in event: for state_property in event['states']: if state_property['type'] == 'polarity': return pol_map[state_property['text']] return None def _get_ontology_entries(): path_here = os.path.dirname(os.path.abspath(__file__)) onto_file = os.path.join(path_here, 'hume_ontology.rdf') G = rdflib.Graph() G.parse(onto_file, format='nt') entries = [e.toPython().split('#')[1] for e in G.all_nodes()] return entries def _resolve_geo(hume_loc_entity): place = hume_loc_entity.get('canonicalName', hume_loc_entity.get('text')) geo_id = hume_loc_entity.get('geoname_id', None) if geo_id is not None: return RefContext(name=place, db_refs={"GEOID": geo_id}) else: return RefContext(place) def _resolve_time(hume_temporal_entity): text = hume_temporal_entity['mentions'][0]['text'] if len(hume_temporal_entity.get("timeInterval", [])) < 1: return TimeContext(text=text) time = hume_temporal_entity["timeInterval"][0] start = datetime.strptime(time['start'], '%Y-%m-%dT%H:%M') end = datetime.strptime(time['end'], '%Y-%m-%dT%H:%M') end = end + timedelta(minutes=1) duration = int((end - start).total_seconds()) return TimeContext(text=text, start=start, end=end, duration=duration)