import logging
from collections import OrderedDict as _o
from datetime import timedelta
__all__ = ['Delta', 'QualitativeDelta', 'QuantitativeState']
logger = logging.getLogger(__name__)
class Delta(object):
"""The parent class of all delta types."""
@classmethod
def from_json(cls, json_dict):
delta_type = json_dict.get('type')
if delta_type == 'qualitative':
return QualitativeDelta.from_json(json_dict)
elif delta_type == 'quantitative':
return QuantitativeState.from_json(json_dict)
else:
raise ValueError('Unknown delta type %s' % delta_type)
def refinement_of(self, other):
if type(self) != type(other):
return False
if self.polarity and not other.polarity:
return True
if self.polarity == other.polarity:
return True
def set_polarity(self, pol):
self.polarity = pol
[docs]class QualitativeDelta(Delta):
"""Qualitative delta defining an Event.
Parameters
----------
polarity : 1, -1 or None
Polarity of an Event.
adjectives : list[str]
Adjectives describing an Event.
"""
def __init__(self, polarity=None, adjectives=None):
self.polarity = polarity
self.adjectives = adjectives if adjectives else []
def add_adjectives(self, adjectives):
for adj in adjectives:
self.adjectives.append(adj)
def is_opposite(self, other):
return ((self.polarity == 1 and other.polarity == -1) or
(self.polarity == -1 and other.polarity == 1))
def equals(self, other):
return (self.polarity == other.polarity and
set(self.adjectives) == set(other.adjectives))
def to_json(self):
json_dict = _o({'type': 'qualitative', 'polarity': self.polarity})
if self.adjectives:
json_dict['adjectives'] = self.adjectives
return json_dict
@classmethod
def from_json(cls, json_dict):
polarity = json_dict.get('polarity')
adjectives = json_dict.get('adjectives')
delta = QualitativeDelta(polarity=polarity, adjectives=adjectives)
return delta
def __str__(self):
return "%s(polarity=%s, adjectives=%s)" % (type(self).__name__,
str(self.polarity),
self.adjectives)
def __repr__(self):
return self.__str__()
[docs]class QuantitativeState(Delta):
"""An object representing numerical value of something.
Parameters
----------
entity : str
An entity to capture the quantity of.
value : float or int
Quantity of a unit (or range?)
unit : str
Measurement unit of value (e.g. absolute, daily, percentage, etc.)
modifier : str
Modifier to value (e.g. more than, at least, approximately, etc.)
text : str
Natural language text describing quantitative state.
polarity : 1, -1 or None
Polarity of an Event.
"""
def __init__(self, entity=None, value=None, unit=None, modifier=None,
text=None, polarity=None):
self.entity = entity
self.value = value if value else None
self.unit = unit
self.modifier = modifier
self.text = text
self.polarity = polarity
def equals(self, other):
return (self.entity == other.entity and self.value == other.value and
self.unit == other.unit and self.modifier == other.modifier and
self.text == other.text and self.polarity == other.polarity)
def refinement_of(self, other):
ref = super().refinement_of(other)
if not ref:
return False
if self.entity != other.entity:
return False
if not other.value:
return True
elif not self.value:
return False
if self.value == other.value and self.unit == other.unit:
return True
else:
return False
def to_json(self):
json_dict = {'type': 'quantitative',
'entity': self.entity if self.entity else None,
'value': self.value if self.value else None,
'unit': self.unit if self.unit else None,
'modifier': self.modifier if self.modifier else None,
'text': self.text if self.text else None,
'polarity': self.polarity if self.polarity else None}
return json_dict
@classmethod
def from_json(cls, json_dict):
entity = json_dict.get('entity')
value = json_dict.get('value')
unit = json_dict.get('unit')
modifier = json_dict.get('modifier')
text = json_dict.get('text')
polarity = json_dict.get('polarity')
return cls(entity=entity, value=value, unit=unit, modifier=modifier,
text=text, polarity=polarity)
def __str__(self):
return ("QuantitativeState(entity=%s, value=%s, unit=%s, modifier=%s,"
" text=%s, polarity=%s)" % (
self.entity, str(self.value), self.unit,
self.modifier, self.text, str(self.polarity)))
def __repr__(self):
return self.__str__()
# Arithmetic operations
def __add__(self, other):
# Return value and unit
# Only proceed if entities are the same
if not self.entity == other.entity:
raise ValueError("Entities have to be the same for addition")
# If original units are the same, return the result in same unit
if self.unit == other.unit:
return ((self.value + other.value), self.unit)
# Otherwise, return result per second
values = self._standardize_units(other, target_unit='second')
total_per_second = values[0] + values[1]
# result is per second
return (total_per_second, 'second')
def __sub__(self, other):
# Return value and unit
# Only proceed if entities are the same
if not self.entity == other.entity:
raise ValueError("Entities have to be the same for subtraction")
# If original units are the same, return the result in same unit
if self.unit == other.unit:
return ((self.value - other.value), self.unit)
# Otherwise, return result per second
values = self._standardize_units(other, target_unit='second')
diff_per_second = values[0] - values[1]
# result is per second
return (diff_per_second, 'second')
def __gt__(self, other):
# Only proceed if entities are the same
if not self.entity == other.entity:
raise ValueError("Entities have to be the same for comparison")
# Compare values right away if units are the same
if self.unit == other.unit:
return self.value > other.value
# If units are different, convert to seconds first
values = self._standardize_units(other, target_unit='second')
return (values[0] > values[1])
def __ge__(self, other):
# Only proceed if entities are the same
if not self.entity == other.entity:
raise ValueError("Entities have to be the same for comparison")
# Compare values right away if units are the same
if self.unit == other.unit:
return self.value >= other.value
# If units are different, convert to seconds first
values = self._standardize_units(other, target_unit='second')
return (values[0] >= values[1])
def __lt__(self, other):
# Only proceed if entities are the same
if not self.entity == other.entity:
raise ValueError("Entities have to be the same for comparison")
# Compare values right away if units are the same
if self.unit == other.unit:
return self.value < other.value
# If units are different, convert to seconds first
values = self._standardize_units(other, target_unit='second')
return (values[0] < values[1])
def __le__(self, other):
# Only proceed if entities are the same
if not self.entity == other.entity:
raise ValueError("Entities have to be the same for comparison")
# Compare values right away if units are the same
if self.unit == other.unit:
return self.value <= other.value
# If units are different, convert to seconds first
values = self._standardize_units(other, target_unit='second')
return (values[0] <= values[1])
def __eq__(self, other):
# Only proceed if entities are the same
if not self.entity == other.entity:
raise ValueError("Entities have to be the same for comparison")
# Compare values right away if units are the same
if self.unit == other.unit:
return self.value == other.value
# If units are different, convert to seconds first
values = self._standardize_units(other, target_unit='second')
return (values[0] == values[1])
def __ne__(self, other):
# Only proceed if entities are the same
if not self.entity == other.entity:
raise ValueError("Entities have to be the same for comparison")
# Compare values right away if units are the same
if self.unit == other.unit:
return self.value != other.value
# If units are different, convert to seconds first
values = self._standardize_units(other, target_unit='second')
return (values[0] != values[1])
# Unit conversions
[docs] @staticmethod
def convert_unit(source_unit, target_unit, source_value,
source_period=None, target_period=None):
"""Convert value per unit from source to target unit. If a unit is
absolute, total timedelta period has to be provided. If a unit is a
month or a year, it is recommended to pass timedelta period object
directly, if not provided, the approximation will be used.
"""
if (target_unit == 'absolute' and not target_period) or \
(source_unit == 'absolute' and not source_period):
raise ValueError('Absolute period needs to be provided.')
return
if not source_period:
source_period = QuantitativeState._get_period_from_unit(
source_unit)
if not target_period:
target_period = QuantitativeState._get_period_from_unit(
target_unit)
if source_unit == 'second':
return QuantitativeState.from_seconds(source_value, target_period)
if target_unit == 'second':
return QuantitativeState.value_per_second(
source_value, source_period)
return round(source_value * (target_period / source_period))
[docs] @staticmethod
def value_per_second(value, period):
"""Get value per second given total value per period and a timedelta
period object."""
seconds = period.total_seconds()
per_second = value / seconds
return per_second
[docs] @staticmethod
def from_seconds(value_per_second, period):
"""Get total value per given period given timedelta period object and
value per second."""
seconds = period.total_seconds()
total_value = value_per_second * seconds
return total_value
def _standardize_units(self, other, target_unit='second',
target_period=None):
values = []
for state in [self, other]:
if state.unit != target_unit:
new_value = self.convert_unit(source_unit=state.unit,
target_unit=target_unit,
source_value=state.value,
target_period=target_period)
values.append(new_value)
return values
@staticmethod
def _get_period_from_unit(unit):
if unit == 'day':
return timedelta(days=1)
if unit == 'week':
return timedelta(weeks=1)
if unit == 'hour':
return timedelta(hours=1)
if unit == 'minute':
return timedelta(minutes=1)
if unit == 'second':
return timedelta(seconds=1)
if unit == 'month':
logger.warning('Using approximate value 30 days for a month.')
return timedelta(days=30)
if unit == 'year':
logger.warning('Using approximate value 365 days for a year.')
return timedelta(days=365)
if unit == 'absolute':
logger.warning('Cannot derive absolute period, '
'it needs to be provided.')
return None