import functools
import weakref
from collections.abc import Callable
from .helper import log
from .inspect import is_hero_method, HERO_EVENT_ATTRIBUTE
import zenoh
[docs]
def remote_hero_method_to_str(func: Callable) -> str:
return f"{func.__self__._name}.{func.__name__}"
[docs]
class Callback:
"""
Represent a callback function.
"""
def __init__(self, func: Callable, origin: str = None):
self.func = func
self.origin = origin
if is_hero_method(func):
self.name = remote_hero_method_to_str(func)
self.is_remote_hero = True
else:
self.name = repr(func)
self.is_remote_hero = False
[docs]
def __eq__(self, other) -> bool:
"""
Check for equality with other `Callback`.
Args:
other: other callback instance.
Returns:
bool: equality result.
"""
if self.is_remote_hero == other.is_remote_hero:
if self.is_remote_hero:
# both True
# check if names are equal
return self.name == other.name
# both False
# check if callback functions are equal
return self.func == other.func
# one is remote and the other one is not
return False
[docs]
def __call__(self, *args, **kwargs):
"""
Call the callback function.
"""
return self.func(*args, **kwargs)
[docs]
def __hash__(self):
"""
Generate a hash value for this callback using the name in case of remote hero methods
or the callable itself for builtins or local callables.
Returns:
int: calculated hash.
"""
if self.is_remote_hero:
return hash(self.name)
else:
return hash(self.func)
[docs]
def to_dict(self) -> dict:
"""
Generate a dictionary representation of this callback.
Returns:
dict: dictionary with keys: name, origin, is_remote_hero and func
"""
return {"name": self.name, "origin": self.origin, "is_remote_hero": self.is_remote_hero, "func": self.func}
[docs]
class CallbackStorage:
"""
Store all callbacks.
"""
def __init__(self):
self._callbacks = {}
[docs]
def __iter__(self):
"""
Generate an iteration for this iterable.
"""
return iter(self._callbacks.values())
[docs]
def __contains__(self, func: Callable) -> bool:
"""
Implements the `in` operation for this class.
"""
callback = Callback(func)
return callback.name in self._callbacks.keys()
[docs]
def append(self, func: Callable, origin: str = None) -> str:
"""
Append a given callable to the storage.
Args:
func: `callable` to append.
origin: `str` (default: `None`) indicating the origin of the callback.
Returns:
str: name of the callback.
"""
callback = Callback(func, origin)
self._callbacks[callback.name] = callback
return callback.name
[docs]
def remove(self, func: callable) -> bool:
"""
Remove a callable from storage.
Args:
func: `callable` to remove.
Returns:
bool: truth value indicating if the callable was a callback.
"""
if func in self:
callback = Callback(func)
del self._callbacks[callback.name]
return True
return False
[docs]
def is_callback(self, func: Callable) -> bool:
"""
Check if given `callable` is a callback.
Args:
func: `callable` to check.
Returns:
bool: `callable` is a callback
"""
return func in self
[docs]
def get_callbacks(self) -> list:
"""
Get a list of all callbacks dictionaries.
Returns:
list: dictionary representation of all callbacks
"""
return [cb.to_dict() for name, cb in self._callbacks.items()]
[docs]
class EventHandler:
"""
Base class for event handlers.
"""
[docs]
def connect(self, callback: Callable):
"""Connect a callback function."""
raise NotImplementedError
[docs]
def disconnect(self, callback: Callable):
"""Disconnect a callback function."""
raise NotImplementedError
[docs]
def is_callback(self, func: Callable) -> bool:
"""Check if `func` is a callback"""
raise NotImplementedError
[docs]
def get_callbacks(self):
"""Return all callbacks"""
raise NotImplementedError
[docs]
class LocalEventHandler(EventHandler):
"""
Handles event connections for a specific instance.
"""
def __init__(self, instance, func, reliability: zenoh.Reliability, congestion_control: zenoh.CongestionControl):
self.instance = weakref.ref(instance)
self.func = func
# store callbacks for this instance
self.callbacks = CallbackStorage()
# mark as an hero event
setattr(self, HERO_EVENT_ATTRIBUTE, True)
# preserve signature and metadata of `func`
functools.update_wrapper(self, func)
endpoint = f"{self.instance()._endpoint_base_path}/{self.func.__name__}"
self.publisher = self.instance()._session.declare_publisher(
endpoint, reliability=reliability, congestion_control=congestion_control
)
[docs]
def __call__(self, *args, **kwargs):
"""
Call the original function and trigger callbacks.
"""
# call the method
if self.instance() is not None:
result = self.func(self.instance(), *args, **kwargs)
# publish result (triggers RemoteEvent callbacks)
self.publisher.put(self.instance()._serialize(result))
# execute callbacks
for callback in self.callbacks:
callback(result)
return result
else:
log.error("Lost weak reference to instance for event.")
return None
[docs]
def connect(self, callback: Callable, origin: str | None = None) -> str:
"""
Connect a callback function to be triggered when the method is called.
Args:
callback: `callable` to connect.
origin: (optional) `str` indicting origin of this callback.
Returns:
str: name of the callback.
"""
callback_name = self.callbacks.append(callback, origin)
log.debug(
f"{self.__class__} connecting LocalEvent callback {self.__name__} -> {callback_name} (origin: {origin})"
)
return callback_name
[docs]
def disconnect(self, callback: Callable) -> bool:
"""
Disconnect a callback function.
Args:
callback: `callable` to disconnect.
Returns:
bool: truth value indicating if the callable was a callback.
"""
log.debug(f"{self.__class__} disconnecting LocalEvent callback {self.__name__} -> {callback}")
return self.callbacks.remove(callback)
[docs]
def is_callback(self, func: Callable) -> bool:
"""
Check if given callable is already a registered callback.
Args:
callback: `callable` to check.
Returns:
bool: truth value indicating if the callable is a callback.
"""
if is_hero_method(func):
return remote_hero_method_to_str(func) in self.remote_hero_callbacks.keys()
else:
return func in self.callbacks
[docs]
def get_callbacks(self) -> list:
"""
Return a list of registered callback functions.
Returns:
list: dictionary representation of all callbacks
"""
return [{**cb, "context": "LocalHERO"} for cb in self.callbacks.get_callbacks()]
[docs]
class RemoteEventHandler(EventHandler):
"""
Handles remote events for a specific instance.
"""
def __init__(self, instance, func: Callable | None = None):
self.instance = instance
# store callbacks for this instance
self.callbacks = CallbackStorage()
# mark as an hero event
setattr(self, HERO_EVENT_ATTRIBUTE, True)
[docs]
def __call__(self, payload):
"""Call the original function and trigger callbacks."""
for callback in self.callbacks:
callback(payload)
[docs]
def connect(self, callback: Callable) -> str:
"""
Connect a callback function to be triggered when the method is called.
Args:
callback: `callable` to connect.
origin: (optional) `str` indicting origin of this callback.
Returns:
str: name of the callback.
"""
if is_hero_method(callback):
callback_name = self.instance._connect_local_hero_callback(
event=self, remote_hero_method=callback, origin=self.instance._name
)
log.debug(f"{self.__class__} connecting LocalEvent callback {self.__name__} -> {callback}")
elif callback not in self.callbacks:
callback_name = self.callbacks.append(callback)
log.debug(f"{self.__class__} connecting RemoteEvent callback {self.__name__} -> {callback}")
return callback_name
[docs]
def disconnect(self, callback: Callable) -> None:
"""
Disconnect a callback function.
Args:
callback: `callable` to disconnect.
Returns:
bool: truth value indicating if the callable was a callback.
"""
if is_hero_method(callback):
log.debug(f"{self.__class__} disconnecting LocalEvent callback {self.__name__} -> {callback}")
return self.instance._disconnect_local_hero_callback(event=self, remote_hero_method=callback)
if callback in self.callbacks:
log.debug(f"{self.__class__} disconnecting RemoteEvent callback {self.__name__} -> {callback}")
return self.callbacks.remove(callback)
[docs]
def is_callback(self, func: Callable) -> bool:
"""
Check if given callable is already a registered callback.
Args:
callback: `callable` to check.
Returns:
bool: truth value indicating if the callable is a callback.
"""
return func in self.callbacks
[docs]
def get_callbacks(self) -> list:
"""
Return a list of registered callback functions.
Returns:
list: dictionary representation of all callbacks
"""
remote_event_cbs = [{**cb, "context": "RemoteHERO"} for cb in self.callbacks.get_callbacks()]
local_event_cbs = [{**cb, "context": "LocalHERO"} for cb in self.instance._get_local_hero_callbacks(event=self)]
return [*remote_event_cbs, *local_event_cbs]
[docs]
class EventDescriptor:
"""
A descriptor to handle instance-specific event connections.
"""
def __init__(self, func: Callable | None = None, handler_config: dict | None = None):
# store the original function
self.func = func
self.handler_config = handler_config or {}
# store callbacks for each instance
self._instances = weakref.WeakKeyDictionary()
[docs]
@staticmethod
def _get_event_handler_cls():
raise NotImplementedError
[docs]
def __get__(self, instance, owner):
"""
Ensure the method and event-handling functions are bound to the instance.
Args:
self: the EventDescriptor instance
instance: the owning `LocalHERO`/`RemoteHERO`.
owner:
"""
if instance is None:
# return descriptor itself if accessed via the class
return self
# create an event handler for this instance if not already created
if instance not in self._instances:
self._instances[instance] = self._get_event_handler_cls()(instance, self.func, **self.handler_config)
# return the instance-bound event handler
return self._instances[instance]
[docs]
class LocalEventDescriptor(EventDescriptor):
"""Descriptor of `@event` decorated methods of a `LocalHERO`."""
[docs]
@staticmethod
def _get_event_handler_cls():
return LocalEventHandler
[docs]
class RemoteEventDescriptor(EventDescriptor):
"""
Descriptor of remote representations of events in a `RemoteHERO`.
"""
[docs]
@staticmethod
def _get_event_handler_cls():
return RemoteEventHandler
[docs]
def reliable(func: Callable | LocalEventDescriptor):
"""Decorator to make an event reliable.
This decorator can be used in addition to the @event decorator to make an event use the zenoh
`RELIABLE reliability <https://zenoh-python.readthedocs.io/en/latest/api_reference.html#zenoh.Reliability>`_ and
`BLOCK congestion <https://zenoh-python.readthedocs.io/en/1.8.0/api_reference.html#zenoh.CongestionControl>`_.
This guarantees that the message is received at all subscribers. Using the default can lead to lost
messages under high load or changing network topologies. Use this if you observe missing data from e.g. cameras.
.. note::
Can be used on arbitrary events with the
:external:ref:`BOSS decorator injector mechanism <json_extra_decorators>`. The following example makes the
emitted images from a camera reliable.
.. code-block:: json
{
"_id": "my_camera",
"classname": "herosdevices.hardware.ids.PeakCompatibleCamera",
"arguments": {
"cam_id": "1409f4e7c3b1"
}
"extra_decorators": [
["acquisition_data", "heros.event.reliable"]
]
}
"""
if isinstance(func, LocalEventDescriptor):
func.handler_config["reliability"] = zenoh.Reliability.RELIABLE
func.handler_config["congestion_control"] = zenoh.CongestionControl.BLOCK
else:
func._event_reliable = zenoh.Reliability.RELIABLE # ty: ignore[invalid-assignment]
func._event_congestion_control = zenoh.CongestionControl.BLOCK # ty: ignore[invalid-assignment]
return func
[docs]
def event(func: Callable):
"""
Decorator for events.
Note:
Only use on methods bound to objects.
"""
reliability = getattr(func, "_event_reliable", zenoh.Reliability.BEST_EFFORT)
congestion_control = getattr(func, "_event_congestion_control", zenoh.CongestionControl.BLOCK)
return LocalEventDescriptor(
func, handler_config={"reliability": reliability, "congestion_control": congestion_control}
)