from .types import DatasourceReturnValue
from typing import Optional
[docs]
def lambdify(expression: str, globals_dict: dict | None = None):
"""
Turn a string into a lambda function with the given globals.
By default it enables a set of numpy functions in the expression. Further functions can be added by providing
it in ``globals_dict``.
"""
import numpy as np
globals_dict = {} if globals_dict is None else globals_dict
np_functions = [
"sin",
"cos",
"tan",
"arcsin",
"arccos",
"arctan",
"hypot",
"arctan2",
"sinh",
"cosh",
"tanh",
"arcsinh",
"arccosh",
"exp",
"exp2",
"log",
"log10",
"log2",
"log1p",
"logaddexp",
"logaddexp2",
"i0",
"sinc",
"power",
"sqrt",
]
glob = {key: value for key, value in np.__dict__.items() if key in np_functions}
glob.update(globals_dict)
try:
return eval("lambda x:" + expression, glob)
except Exception:
return None
[docs]
class BoundaryChecker:
"""
Check whether the observed value is in it's boundaries.
Therefore provide an integer feedback with rising importance where boundary = 0 says that everything is fine.
1, 2, 3 correspond to higher warning / error or fault situations.
"""
def __init__(self, boundaries: list):
self.boundaries = boundaries
[docs]
def _check_bounds(self, value):
try:
for i, (lower, upper) in enumerate(self.boundaries):
if value >= lower and value <= upper:
return i
except Exception:
pass
return len(self.boundaries)
[docs]
def __call__(self, return_value: DatasourceReturnValue):
# Do the actual boundary check here...
if len(self.boundaries) > 0:
return_value.inbound = self._check_bounds(return_value.value)
return return_value
[docs]
class Converter:
def __init__(self, conversion_code: str, unit: Optional[str] = None):
self._conversion_code = conversion_code
self._convert_lambda = lambdify(conversion_code)
self._unit = unit
[docs]
def __call__(self, return_value: DatasourceReturnValue):
return_value.unit = self._unit if self._unit is not None else return_value.raw_unit
if self._convert_lambda is not None:
# Do the actual conversion here
return_value.value = self._convert_lambda(return_value.raw_value)
return_value["calibrated"] = True
return return_value
[docs]
class Observable:
def __init__(self, id, definition: dict):
self._id = id
self.definition = definition
self._boundary_checker = BoundaryChecker(definition.get("boundaries", []))
self._converter = Converter(definition.get("conversion", None), definition.get("unit", None))
@property
def id(self):
return self._id
[docs]
def process(self, return_value: DatasourceReturnValue):
return self._boundary_checker(self._converter(return_value))
[docs]
class ObservableProcessor:
"""
An ObservableProcessor takes a configuration dict for multiple observables when it is instantiated.
An object of this class can be called with a :class:DataSourceReturnSet as an argument.
"""
def __init__(self, obs_def: dict):
self.observables = [Observable(id, definition) for id, definition in obs_def.items()]
self.observables_ids = [obs.id for obs in self.observables]
[docs]
def __call__(self, datasource_return_set):
for return_value in datasource_return_set:
if return_value.id not in self.observables_ids:
continue
obs = self.observables[self.observables_ids.index(return_value.id)]
obs.process(return_value)
return datasource_return_set