"""Provides an API used to run and get Statements from the Sparser
reading system.
"""
from __future__ import absolute_import, print_function, unicode_literals
from builtins import dict, str
from indra import get_config
# Public API of this module: the names exported by ``from ... import *``.
# Helpers prefixed with an underscore are intentionally left out.
__all__ = ['process_text', 'process_nxml_str', 'process_nxml_file',
           'process_sparser_output', 'process_json_dict', 'process_xml',
           'run_sparser', 'get_version', 'make_nxml_from_text']
import os
import json
import logging
import subprocess as sp
import xml.etree.ElementTree as ET
import multiprocessing as mp
from indra.util import UnicodeXMLTreeBuilder as UTB
from .processor import SparserJSONProcessor
from .xml_processor import SparserXMLProcessor
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Name of the configuration setting that holds the Sparser location; it is
# also used in the error message logged when the location is missing.
sparser_path_var = 'SPARSERPATH'
# Value of that setting as returned by indra's get_config, read once at
# import time. The functions below treat it as a directory containing
# 'save-semantics.sh' and 'version.txt'.
# NOTE(review): get_config presumably returns None when the setting is
# absent -- confirm against indra.get_config.
sparser_path = get_config(sparser_path_var)
def process_text(text, output_fmt='json', outbuf=None, cleanup=True, key='',
                 **kwargs):
    """Return processor with Statements extracted by reading text with Sparser.

    The raw text is first wrapped in a minimal NXML document and then handed
    to :func:`process_nxml_str` together with all remaining arguments.

    Parameters
    ----------
    text : str
        The text to be processed
    output_fmt: Optional[str]
        The output format to obtain from Sparser, with the two options being
        'json' and 'xml'. Default: 'json'
    outbuf : Optional[file]
        A file like object that the Sparser output is written to.
    cleanup : Optional[bool]
        If True, the temporary file created, which is used as an input
        file for Sparser, as well as the output file created by Sparser
        are removed. Default: True
    key : Optional[str]
        A key which is embedded into the name of the temporary file
        passed to Sparser for reading. Default is empty string.
    **kwargs
        Additional keyword arguments, forwarded unchanged to
        :func:`process_nxml_str`.

    Returns
    -------
    SparserXMLProcessor or SparserJSONProcessor depending on what output
    format was chosen.
    """
    return process_nxml_str(make_nxml_from_text(text), output_fmt, outbuf,
                            cleanup, key, **kwargs)
def process_nxml_str(nxml_str, output_fmt='json', outbuf=None, cleanup=True,
                     key='', **kwargs):
    """Return processor with Statements extracted by reading an NXML string.

    The string is written (UTF-8 encoded) to a temporary file in the current
    working directory, which is then read via :func:`process_nxml_file`.

    Parameters
    ----------
    nxml_str : str
        The string value of the NXML-formatted paper to be read.
    output_fmt: Optional[str]
        The output format to obtain from Sparser, with the two options being
        'json' and 'xml'. Default: 'json'
    outbuf : Optional[file]
        A file like object that the Sparser output is written to.
    cleanup : Optional[bool]
        If True, the temporary file created in this function,
        which is used as an input file for Sparser, as well as the
        output file created by Sparser are removed. Default: True
    key : Optional[str]
        A key which is embedded into the name of the temporary file
        passed to Sparser for reading. Default is empty string.
    **kwargs
        Additional keyword arguments, forwarded unchanged to
        :func:`process_nxml_file`.

    Returns
    -------
    SparserXMLProcessor or SparserJSONProcessor depending on what output
    format was chosen.
    """
    # The name starts with 'PMC' because run_sparser documents that the
    # Sparser executable expects it; the PID keeps concurrent worker
    # processes from clobbering each other's input file.
    tmp_fname = 'PMC%s_%d.nxml' % (key, mp.current_process().pid)
    try:
        # Writing happens inside the try block so that a failure while
        # encoding or writing does not leave a stray temporary file behind.
        with open(tmp_fname, 'wb') as fh:
            fh.write(nxml_str.encode('utf-8'))
        # Named 'processor' rather than 'sp' to avoid shadowing the
        # module-level 'subprocess as sp' alias.
        processor = process_nxml_file(tmp_fname, output_fmt, outbuf, cleanup,
                                      **kwargs)
    finally:
        if cleanup and os.path.exists(tmp_fname):
            os.remove(tmp_fname)
    return processor
def process_nxml_file(fname, output_fmt='json', outbuf=None, cleanup=True,
                      **kwargs):
    """Return processor with Statements extracted by reading an NXML file.

    Runs Sparser on the file and processes the output it produces. Any
    exception raised along the way is logged and swallowed, in which case
    None is returned.

    Parameters
    ----------
    fname : str
        The path to the NXML file to be read.
    output_fmt: Optional[str]
        The output format to obtain from Sparser, with the two options being
        'json' and 'xml'. Default: 'json'
    outbuf : Optional[file]
        A file like object that the Sparser output is written to.
    cleanup : Optional[bool]
        If True, the output file created by Sparser is removed.
        Default: True
    **kwargs
        Additional keyword arguments, forwarded unchanged to
        :func:`run_sparser`.

    Returns
    -------
    SparserXMLProcessor or SparserJSONProcessor or None
        The processor matching the chosen output format, or None if reading
        or processing failed.
    """
    processor = None
    result_path = None
    try:
        result_path = run_sparser(fname, output_fmt, outbuf, **kwargs)
        processor = process_sparser_output(result_path, output_fmt)
    except Exception as err:
        # Best effort by design: report the failure and fall through so the
        # caller receives None instead of an exception.
        logger.error("Sparser failed to run on %s." % fname)
        logger.exception(err)
    finally:
        # Remove Sparser's output file only if one was reported and the
        # caller asked for cleanup.
        if cleanup and result_path is not None \
                and os.path.exists(result_path):
            os.remove(result_path)
    return processor
def process_sparser_output(output_fname, output_fmt='json'):
    """Return a processor with Statements extracted from Sparser XML or JSON

    Parameters
    ----------
    output_fname : str
        The path to the Sparser output file to be processed. The file can
        either be JSON or XML output from Sparser, with the output_fmt
        parameter defining what format is assumed to be processed.
    output_fmt : Optional[str]
        The format of the Sparser output to be processed, can either be
        'json' or 'xml'. Default: 'json'

    Returns
    -------
    SparserXMLProcessor or SparserJSONProcessor or None
        The processor matching the chosen output format. None is returned
        (after logging an error) if the format is not recognized.
    """
    # Reject unknown formats before touching the file.
    if output_fmt not in ('json', 'xml'):
        logger.error("Unrecognized output format '%s'." % output_fmt)
        return None

    with open(output_fname, 'rt') as handle:
        if output_fmt == 'xml':
            # XML is handed over as a raw string and parsed downstream.
            return process_xml(handle.read())
        # Otherwise the content is JSON, decoded here before processing.
        return process_json_dict(json.load(handle))
def process_json_dict(json_dict):
    """Return processor with Statements extracted from a Sparser JSON.

    Parameters
    ----------
    json_dict : dict
        The JSON object obtained by reading content with Sparser, using the
        'json' output mode.

    Returns
    -------
    processor : SparserJSONProcessor
        A SparserJSONProcessor which has extracted Statements as its
        statements attribute.
    """
    processor = SparserJSONProcessor(json_dict)
    # Run the extraction step on the processor before handing it back.
    processor.get_statements()
    return processor
def process_xml(xml_str):
    """Return processor with Statements extracted from a Sparser XML.

    Parameters
    ----------
    xml_str : str
        The XML string obtained by reading content with Sparser, using the
        'xml' output mode.

    Returns
    -------
    SparserXMLProcessor or None
        A SparserXMLProcessor which has extracted Statements as its
        statements attribute, or None if the string could not be parsed
        as XML (the parse error is logged).
    """
    try:
        root = ET.XML(xml_str, parser=UTB())
    except ET.ParseError as parse_err:
        logger.error('Could not parse XML string')
        logger.error(parse_err)
        return None
    return _process_elementtree(root)
def run_sparser(fname, output_fmt, outbuf=None, timeout=600):
    """Return the path to reading output after running Sparser reading.

    Parameters
    ----------
    fname : str
        The path to an input file to be processed. Due to the Sparser
        executable's assumptions, the file name needs to start with PMC
        and should be an NXML formatted file.
    output_fmt : str
        The format in which Sparser should produce its output, can either be
        'json' or 'xml'.
    outbuf : Optional[file]
        A file like object that the Sparser output is written to.
    timeout : int
        The number of seconds to wait until giving up on this one reading. The
        default is 600 seconds (i.e. 10 minutes). Sparser is a fast reader and
        the typical time to read a single full text is a matter of seconds.

    Returns
    -------
    output_path : str or None
        The path to the output file created by Sparser. None is returned
        (after logging an error) if the Sparser path is not configured or
        does not exist, or if the output format is not recognized.

    Raises
    ------
    Exception
        If the Sparser script or the input file does not exist.
    subprocess.TimeoutExpired
        If Sparser does not finish within `timeout` seconds.
    subprocess.CalledProcessError
        If Sparser exits with a non-zero return code.
    AssertionError
        If Sparser finished but the expected output file is missing.
    """
    if not sparser_path or not os.path.exists(sparser_path):
        logger.error('Sparser executable not set in %s' % sparser_path_var)
        return None
    if output_fmt == 'xml':
        format_flag = '-x'
        suffix = '.xml'
    elif output_fmt == 'json':
        format_flag = '-j'
        suffix = '.json'
    else:
        logger.error('Unknown output format: %s' % output_fmt)
        return None

    sparser_exec_path = os.path.join(sparser_path, 'save-semantics.sh')
    # Derive the output name from the file name only. Splitting the whole
    # path at the first dot would truncate paths such as './PMC1.nxml' or
    # directories whose names contain a dot.
    dir_name, base_name = os.path.split(fname)
    output_path = os.path.join(
        dir_name, base_name.split('.')[0] + '-semantics' + suffix)

    for fpath in [sparser_exec_path, fname]:
        if not os.path.exists(fpath):
            raise Exception("'%s' is not a valid path." % fpath)

    cmd_list = [sparser_exec_path, format_flag, fname]

    # This is mostly a copy of the code found in subprocess.run, with the
    # key change that proc.kill is replaced with a pkill call. This allows
    # the process to be killed even if it has children. Solution developed
    # from:
    # https://stackoverflow.com/questions/36952245/subprocess-timeout-failure
    #
    # Only stdout is piped, so the stderr value returned by communicate()
    # is always None.
    with sp.Popen(cmd_list, stdout=sp.PIPE) as proc:
        try:
            stdout, stderr = proc.communicate(timeout=timeout)
        except sp.TimeoutExpired:
            # Yes, this is about as bad as it looks. But it is the only way to
            # be sure the script actually dies.
            # sp.call (not check_call) is used on purpose: pkill exits
            # non-zero when nothing matched, and that must not replace the
            # TimeoutExpired raised below.
            sp.call(['pkill', '-f', 'r3.core.*%s' % fname])
            stdout, stderr = proc.communicate()
            raise sp.TimeoutExpired(proc.args, timeout, output=stdout,
                                    stderr=stderr)
        except BaseException:
            # See comment on above instance. Here a failing pkill must not
            # mask the exception being re-raised (e.g. KeyboardInterrupt).
            sp.call(['pkill', '-f', fname])
            proc.wait()
            raise
        retcode = proc.poll()
        if retcode:
            raise sp.CalledProcessError(retcode, proc.args, output=stdout,
                                        stderr=stderr)

    if outbuf is not None:
        outbuf.write(stdout)
        outbuf.flush()

    # Raised explicitly instead of via an assert statement so the check is
    # not stripped when Python runs with -O; the exception type is unchanged.
    if not os.path.exists(output_path):
        raise AssertionError(
            'No output file "%s" created by sparser.' % output_path)
    return output_path
def get_version():
    """Return the version of the Sparser executable on the path.

    The version is read from the 'version.txt' file located in the
    configured Sparser directory.

    Returns
    -------
    version : str
        The version of Sparser that is found on the Sparser path.

    Raises
    ------
    AssertionError
        If the Sparser path is not defined.
    """
    # Raised explicitly instead of via an assert statement so the check is
    # not stripped when Python runs with -O; the exception type is unchanged.
    if sparser_path is None:
        raise AssertionError("Sparser path is not defined.")
    version_file = os.path.join(sparser_path, 'version.txt')
    with open(version_file, 'r') as f:
        return f.read().strip()
def make_nxml_from_text(text):
    """Return raw text wrapped in NXML structure.

    The text is XML-escaped and placed in a single paragraph of a single
    section inside a minimal article skeleton.

    Parameters
    ----------
    text : str
        The raw text content to be wrapped in an NXML structure.

    Returns
    -------
    nxml_str : str
        The NXML string wrapping the raw text input.
    """
    pieces = (
        '<?xml version="1.0" encoding="UTF-8" ?>',
        '<OAI-PMH><article><body><sec id="s1"><p>',
        _escape_xml(text),
        '</p></sec></body></article></OAI-PMH>',
    )
    return ''.join(pieces)
def _process_elementtree(tree):
    """Return a SparserXMLProcessor built from an already parsed XML tree.

    Both extraction steps (modifications, then activations) are run on the
    processor before it is returned.
    """
    processor = SparserXMLProcessor(tree)
    processor.get_modifications()
    processor.get_activations()
    return processor
def _escape_xml(text):
    """Return text with the five XML special characters escaped.

    Parameters
    ----------
    text : str
        The raw text to escape.

    Returns
    -------
    str
        The text with ``&``, ``"``, ``'``, ``<`` and ``>`` replaced by the
        corresponding predefined XML entities.
    """
    # The ampersand must be handled first: every replacement introduces new
    # ampersands, and escaping them afterwards would double-escape (e.g. a
    # quote would end up as '&amp;quot;').
    replacements = (
        ('&', '&amp;'),
        ('"', '&quot;'),
        ('\'', '&apos;'),
        ('<', '&lt;'),
        ('>', '&gt;'),
    )
    for char, entity in replacements:
        text = text.replace(char, entity)
    return text