"""
Retrieving the results of large queries from the INDRA Database REST API
generally involves multiple individual calls. The Processor classes
defined here manage the retrieval process for results of two types, Statements
and Statement hashes. Instances of these Processors are returned by the query
functions in :py:mod:`indra.sources.indra_db_rest.api`.
"""
import logging
from copy import deepcopy
from threading import Thread
from datetime import datetime
from requests import Timeout
from indra.statements import stmts_from_json
from indra.util.statement_presentation import get_available_source_counts, \
get_available_ev_counts
from .query import Query
from .util import RecordableLogger
from .util import logger as util_logger
from .exceptions import IndraDBRestResponseError
# Module logger for retrieval progress, wait timing, and timeout messages.
logger = logging.getLogger('indra_db_rest.query_processor')
# Per-request progress logger. `_run` quiets it when a request is left to the
# background thread; the suppressed output can then be printed with
# `IndraDBQueryProcessor.print_quiet_logs()`.
request_logger = RecordableLogger('indra_db_rest.request_logs')
class IndraDBQueryProcessor:
    """The parent of all db query processors.

    Parameters
    ----------
    query : :py:class:`Query`
        The query to be evaluated in return for statements.
    limit : int or None
        Select the maximum number of statements to return. When set less than
        500 the effect is much the same as setting persist to false, and will
        guarantee a faster response. Default is None.
    sort_by : str or None
        Options are currently 'ev_count' or 'belief'. Results will return in
        order of the given parameter. If None, results will be returned in an
        arbitrary order. Default is 'ev_count'.
    persist : bool
        Default is True. When False, if a query comes back limited (not all
        results returned), just give up and pass along what was returned.
        Otherwise, make further queries to get the rest of the data (which may
        take some time).
    timeout : positive int or None
        If an int, return after `timeout` seconds, even if query is not done.
        Default is None.
    strict_stop : bool
        If True, the query will only be given timeout to complete before being
        abandoned entirely. Otherwise the timeout will simply wait for the
        thread to join for `timeout` seconds before returning, allowing other
        work to continue while the query runs in the background. Ignored when
        `timeout` is None. The default is False. NOTE: in practice, due to
        overhead, the precision of the timeout is only around +/-0.1 seconds.
    tries : int > 0
        Set the number of times to try the query. The database often caches
        results, so if a query times out the first time, trying again after a
        timeout will often succeed fast enough to avoid a timeout. This can
        also help gracefully handle an unreliable connection, if you're
        willing to wait. Default is 3.
    api_key : str or None
        Override or use in place of the API key given in the INDRA config file.
    """
    result_type = NotImplemented

    def __init__(self, query: 'Query', limit=None, sort_by='ev_count',
                 timeout=None, strict_stop=False, persist=True, tries=3,
                 api_key=None):
        self.query = query
        self.limit = limit
        self.sort_by = sort_by
        self.tries = tries
        self.__strict_stop = strict_stop
        self.__timeout = timeout
        self.__timed_out = False
        self.__offset = 0
        self.__quota = limit
        self.__api_key = api_key
        self.__canceled = False
        self.__start_time = None
        self.__th = None
        self.requests_completed = 0
        self._evidence_counts = {}
        self._belief_scores = {}
        self._source_counts = {}

        # Subclasses call `_set_special_params` *before* delegating to this
        # constructor, so only install an empty default when none was set;
        # without it `_run_query` would fail with an AttributeError.
        try:
            self.__special_params
        except AttributeError:
            self.__special_params = {}

        # A limit of 0 means no request is made (and no thread is started).
        if limit != 0:
            self._run(persist=persist)

    # Metadata Retrieval methods.

    def get_ev_counts(self):
        """Get a dictionary of evidence counts."""
        return self._evidence_counts.copy()

    def get_belief_scores(self):
        """Get a dictionary of belief scores."""
        return self._belief_scores.copy()

    def get_source_counts(self):
        """Get the source counts as a dict per statement hash."""
        return deepcopy(self._source_counts)

    # Process control methods

    def cancel(self):
        """Cancel the job, stopping the thread running in the background.

        The worker checks this flag between requests, so a request that is
        already in flight is allowed to finish.
        """
        self.__canceled = True

    def is_working(self):
        """Check if the thread is running."""
        if not self.__th:
            return False
        return self.__th.is_alive()

    def timed_out(self):
        """Check if the processor timed out."""
        return self.__timed_out

    def wait_until_done(self, timeout=None):
        """Wait for the background load to complete.

        Parameters
        ----------
        timeout : float or None
            Maximum number of seconds to wait; None waits indefinitely.

        Returns
        -------
        bool
            True if the background thread has finished, False if it is still
            running when the wait ends.

        Raises
        ------
        IndraDBRestResponseError
            If no query thread was ever started (e.g. ``limit=0``).
        """
        if not self.__th:
            raise IndraDBRestResponseError("There is no thread waiting to "
                                           "complete.")
        start = datetime.now()
        self.__th.join(timeout)
        waited = (datetime.now() - start).total_seconds()
        if self.__th.is_alive():
            logger.warning("Timed out after %0.3f seconds waiting for "
                           "statement load to complete.", waited)
            return False
        logger.info("Waited %0.3f seconds for statements to finish "
                    "loading.", waited)
        return True

    @staticmethod
    def print_quiet_logs():
        """Print the logs that were suppressed during the query."""
        print(request_logger.get_quiet_logs())

    # Helper methods

    @staticmethod
    def _ordinal(n):
        """Return `n` as an English ordinal string, e.g. 1 -> '1st'."""
        if 10 <= n % 100 <= 20:
            suffix = 'th'
        else:
            suffix = {1: 'st', 2: 'nd', 3: 'rd'}.get(n % 10, 'th')
        return f"{n}{suffix}"

    def _get_next_offset(self):
        """Get the offset of the next web request that will be made."""
        return self.__offset

    def _get_next_limit(self):
        """Get the limit of the next web request that will be made."""
        return self.__quota

    def _mark_start(self):
        """Record the wall-clock time the query run began."""
        self.__start_time = datetime.now()

    def _time_since_start(self):
        """Seconds elapsed since `_mark_start` was called."""
        return (datetime.now() - self.__start_time).total_seconds()

    def _strict_stop_enabled(self):
        """Return True if a hard overall deadline applies to this query."""
        return self.__strict_stop and self.__timeout is not None

    def _strict_time_is_up(self):
        """Return True once the strict overall deadline has been reached."""
        if self.__start_time is None or not self._strict_stop_enabled():
            return False
        # `>=` agrees with `_run_query`, which gives up when the remaining
        # time is <= 0.
        return self._time_since_start() >= self.__timeout

    def _done(self):
        """Return True when no further page should be requested."""
        # The server may return more than asked for, so treat an overshoot
        # (negative quota) the same as an exactly-spent quota.
        quota_spent = self.__quota is not None and self.__quota <= 0
        return (self.__canceled
                or self.__offset is None
                or (self.__offset > 0 and quota_spent)
                or self._strict_time_is_up())

    def _set_special_params(self, **params):
        """Set extra keyword arguments passed to every `Query.get` call."""
        self.__special_params = params

    def _run_query(self):
        """Request one page of results and fold it into this processor.

        Returns early without making a request if a strict deadline has
        already passed. A requests `Timeout` is swallowed only when it was
        caused by the strict deadline; otherwise it is re-raised.
        """
        # If we are in strict stop mode, we want to be sure we give up after
        # the given overall timeout, so we need to account for time spent on
        # other queries.
        if self._strict_stop_enabled():
            query_timeout = self.__timeout - self._time_since_start()
            if query_timeout <= 0:
                return
        else:
            query_timeout = None

        nth = self._ordinal(self.requests_completed + 1)
        request_logger.info(f"Running {nth} request for {self.result_type}")
        request_logger.info(f" LIMIT: {self.__quota}")
        request_logger.info(f" OFFSET: {self.__offset}")
        if query_timeout:
            request_logger.info(f" TIMEOUT: {query_timeout}")

        # Run the query.
        try:
            result = self.query.get(self.result_type, offset=self.__offset,
                                    limit=self.__quota, sort_by=self.sort_by,
                                    timeout=query_timeout, n_tries=self.tries,
                                    api_key=self.__api_key,
                                    **self.__special_params)
        except Timeout:
            self.__timed_out = True
            # Only swallow the timeout if it came from our own strict
            # deadline; anything else is a genuine failure.
            if not self._strict_time_is_up():
                raise
            logger.info(f"Query timed out after {self._time_since_start()} "
                        f"seconds, {self.requests_completed} requests, and "
                        f"after retrieving {len(self._evidence_counts)} "
                        f"results, with {self.__quota} remaining.")
            return

        # Update results
        self._evidence_counts.update(result.evidence_counts)
        self._belief_scores.update(result.belief_scores)
        self._handle_new_result(result, self._source_counts)

        # Update the quota
        if self.__quota is not None:
            self.__quota -= len(result.results)

        # Increment the page
        self.__offset = result.next_offset

        # Increment the number of queries run.
        self.requests_completed += 1

    def _run_queries(self, persist):
        """Use paging to get all statements requested.

        This is the background thread's target. The loggers quieted during
        the run are always restored on exit, including when `persist` is
        False or a request raises.
        """
        try:
            self._mark_start()
            self._run_query()

            # With persist, keep paging until results are exhausted, the
            # quota is met, the job is canceled, or a strict deadline passes.
            if persist:
                while not self._done():
                    self._run_query()

            # Create the actual statements.
            self._compile_results()
        finally:
            request_logger.unquiet()
            util_logger.unquiet()

    def _run(self, persist=True):
        """Start the query thread and wait for it as configured.

        With no timeout, block until the thread finishes. Otherwise wait at
        most `timeout` seconds (not at all for 0) and, if the thread is still
        running, quiet the request logger and leave it in the background.
        """
        # Quiet the lowest level logger.
        util_logger.quiet()
        self.__timed_out = False

        # Only get the query english if we aren't on a time constraint.
        if self.__timeout is None:
            query_english = self.query.get_query_english()
            logger.info(f"Retrieving {self.result_type} that {query_english}.")
        else:
            logger.info(f"Retrieving {self.result_type} for {self.query}.")

        self.__th = Thread(target=self._run_queries, args=[persist])
        self.__th.start()

        if self.__timeout is None:
            logger.debug("Waiting for thread to complete...")
            self.__th.join()
            return

        if self.__timeout:  # is not 0
            logger.debug(f"Waiting at most {self.__timeout} seconds for "
                         f"thread to complete...")
            self.__th.join(self.__timeout)

        # Check the thread itself rather than `_done()`: a non-persistent
        # query (or a crashed thread) can be finished while `_done()` is
        # still False, and quieting then would leave the logger quiet forever.
        # NOTE(review): the worker may still finish (and unquiet) between this
        # check and `quiet()`; closing that window would need a lock.
        if self.__th.is_alive():
            request_logger.quiet()
            logger.info("Leaving request to background thread. Logs "
                        "may be viewed using the `print_quiet_logs()` "
                        "method.")

    # Child defined methods

    def _compile_results(self):
        """Turn the accumulated raw results into final results."""
        raise NotImplementedError()

    def _handle_new_result(self, result, source_counts):
        """Fold one page of results into the processor's state."""
        raise NotImplementedError()
class DBQueryStatementProcessor(IndraDBQueryProcessor):
    """A Processor to get Statements from the server.

    For information on thread control and other methods, see the docs for
    :py:class:`IndraDBQueryProcessor`.

    Parameters
    ----------
    query : :py:class:`Query`
        The query to be evaluated in return for statements.
    limit : int or None
        Select the maximum number of statements to return. When set less than
        500 the effect is much the same as setting persist to false, and will
        guarantee a faster response. Default is None.
    ev_limit : int or None
        Limit the amount of evidence returned per Statement. Default is 10.
    filter_ev : bool
        Indicate whether evidence should have the same filters applied as
        the statements themselves, where appropriate (e.g. in the case of a
        filter by paper). Default is True.
    sort_by : str or None
        Options are currently 'ev_count' or 'belief'. Results will return in
        order of the given parameter. If None, results will be returned in an
        arbitrary order. Default is 'ev_count'.
    persist : bool
        Default is True. When False, if a query comes back limited (not all
        results returned), just give up and pass along what was returned.
        Otherwise, make further queries to get the rest of the data (which may
        take some time).
    timeout : positive int or None
        If an int, return after `timeout` seconds, even if query is not done.
        Default is None.
    strict_stop : bool
        If True, the query will only be given timeout to complete before being
        abandoned entirely. Otherwise the timeout will simply wait for the
        thread to join for `timeout` seconds before returning, allowing other
        work to continue while the query runs in the background. The default is
        False.
    use_obtained_counts : Optional[bool]
        If True, evidence counts and source counts are reported based
        on the actual evidences returned for each statement in this query
        (as opposed to all existing evidences, even if not all were returned).
        Default: False
    tries : int > 0
        Set the number of times to try the query. The database often caches
        results, so if a query times out the first time, trying again after a
        timeout will often succeed fast enough to avoid a timeout. This can
        also help gracefully handle an unreliable connection, if you're
        willing to wait. Default is 3.
    api_key : str or None
        Override or use in place of the API key given in the INDRA config file.
    """
    result_type = 'statements'

    def __init__(self, query: Query, limit=None, sort_by='ev_count',
                 ev_limit=10, filter_ev=True, timeout=None, strict_stop=False,
                 persist=True, use_obtained_counts=False, tries=3,
                 api_key=None):
        self.statements = []
        self.statements_sample = None
        self.__statement_jsons = {}
        self.__started = False
        self.use_obtained_counts = use_obtained_counts
        # Everything above must exist before the parent constructor runs,
        # because it may start (and finish) the query immediately.
        self._set_special_params(ev_limit=ev_limit, filter_ev=filter_ev)
        super().__init__(query, limit=limit, sort_by=sort_by, timeout=timeout,
                         strict_stop=strict_stop, persist=persist, tries=tries,
                         api_key=api_key)

    # Metadata Retrieval methods.

    def get_ev_count_by_hash(self, stmt_hash):
        """Get the total evidence count for a statement hash (0 if unknown)."""
        return self._evidence_counts.get(stmt_hash, 0)

    def get_ev_count(self, stmt):
        """Get the total evidence count for a statement."""
        return self.get_ev_count_by_hash(stmt.get_hash(shallow=True))

    def get_belief_score_by_hash(self, stmt_hash):
        """Get the belief score for a statement hash (0 if unknown)."""
        return self._belief_scores.get(stmt_hash, 0)

    def get_belief_score_by_stmt(self, stmt):
        """Get the belief score for a statement."""
        return self.get_belief_score_by_hash(stmt.get_hash(shallow=True))

    def get_hash_statements_dict(self):
        """Return a dict of freshly built Statements keyed by hashes."""
        return {stmt_hash: stmts_from_json([stmt_json])[0]
                for stmt_hash, stmt_json in self.__statement_jsons.items()}

    def get_source_count_by_hash(self, stmt_hash):
        """Get the source counts for a given statement hash ({} if unknown)."""
        return self._source_counts.get(stmt_hash, {})

    def get_source_count(self, stmt):
        """Get the source counts for a given statement."""
        return self.get_source_count_by_hash(stmt.get_hash(shallow=True))

    # Result merging methods

    def merge_results(self, other_processor):
        """Merge the results of this processor with those of another.

        Parameters
        ----------
        other_processor : DBQueryStatementProcessor
            A processor of the same class whose counts, belief scores, and
            statement JSONs are folded into this one. Its statement JSONs are
            copied, so this method does not modify them.

        Raises
        ------
        ValueError
            If `other_processor` is not an instance of this processor's class.
        """
        if not isinstance(other_processor, self.__class__):
            raise ValueError(f"Can only extend with another "
                             f"{self.__class__.__name__} instance.")

        # Where there is overlap, there _should_ be agreement.
        self._evidence_counts.update(other_processor._evidence_counts)
        self._source_counts.update(other_processor._source_counts)
        self._belief_scores.update(other_processor._belief_scores)

        # Copy the other processor's JSONs so that appending evidence here can
        # never mutate its data (and vice versa on later merges).
        self._merge_statement_jsons(other_processor.__statement_jsons,
                                    copy=True)

        # Recompile the statements
        self._compile_results()

    # Helper methods

    def _merge_statement_jsons(self, statement_jsons, copy=False):
        """Fold statement JSONs keyed by hash into the collected JSONs.

        Unseen hashes are adopted (deep-copied first when `copy` is True);
        for hashes already present, the new evidence is appended to the
        existing entry.
        """
        for stmt_hash, stmt_json in statement_jsons.items():
            if copy:
                stmt_json = deepcopy(stmt_json)
            if stmt_hash not in self.__statement_jsons:
                # This should be most of them.
                self.__statement_jsons[stmt_hash] = stmt_json
            else:
                # This should only happen rarely.
                self.__statement_jsons[stmt_hash]['evidence'].extend(
                    stmt_json['evidence'])

    def _handle_new_result(self, result, source_counts):
        """Merge these statement jsons with new jsons."""
        # Merge counts.
        source_counts.update(result.source_counts)

        # Merge JSONs
        self._merge_statement_jsons(result.results)

        # The first page received doubles as a quick-look sample.
        if not self.__started:
            self.statements_sample = stmts_from_json(result.results.values())
            self.__started = True

    def _compile_results(self):
        """Generate statements from the jsons.

        When `use_obtained_counts` is set, the evidence and source counts
        are replaced by counts derived from the statements actually built.
        """
        self.statements = stmts_from_json(self.__statement_jsons.values())
        if self.use_obtained_counts:
            # Assign the single-underscore attributes read by the getters; a
            # double-underscore name would be mangled into a private
            # attribute that nothing reads.
            self._source_counts = get_available_source_counts(self.statements)
            self._evidence_counts = get_available_ev_counts(self.statements)
class DBQueryHashProcessor(IndraDBQueryProcessor):
    """A processor to get hashes from the server.

    Parameters
    ----------
    query : :py:class:`Query`
        The query to be evaluated in return for statement hashes.
    limit : int or None
        Select the maximum number of statements to return. When set less than
        500 the effect is much the same as setting persist to false, and will
        guarantee a faster response. Default is None.
    sort_by : str or None
        Options are currently 'ev_count' or 'belief'. Results will return in
        order of the given parameter. If None, results will be returned in an
        arbitrary order. Default is 'ev_count'.
    persist : bool
        Default is True. When False, if a query comes back limited (not all
        results returned), just give up and pass along what was returned.
        Otherwise, make further queries to get the rest of the data (which may
        take some time).
    timeout : positive int or None
        If an int, return after `timeout` seconds, even if query is not done.
        Default is None.
    strict_stop : bool
        If True, the query will only be given timeout to complete before being
        abandoned entirely. Otherwise the timeout will simply wait for the
        thread to join for `timeout` seconds before returning. Default is
        False.
    tries : int > 0
        Set the number of times to try the query. The database often caches
        results, so if a query times out the first time, trying again after a
        timeout will often succeed fast enough to avoid a timeout. This can
        also help gracefully handle an unreliable connection, if you're
        willing to wait. Default is 3.
    api_key : str or None
        Override or use in place of the API key given in the INDRA config file.
    """
    result_type = 'hashes'

    def __init__(self, *args, **kwargs):
        self.hashes = []
        # The parent's `_run_query` passes the special params to every
        # request. Hash queries have none, but the attribute must exist
        # before the parent constructor starts the query.
        self._set_special_params()
        super().__init__(*args, **kwargs)

    def _handle_new_result(self, result, source_counts):
        """Record the source counts and hashes from one page of results."""
        source_counts.update(result.source_counts)
        self.hashes.extend(result.results)

    def _compile_results(self):
        """Nothing to compile: hashes are collected as pages arrive."""
        pass