Source code for heros.zenoh
import zenoh
import json
import atexit
from .helper import log
zenoh_default_config = {}
[docs]
class ZenohSessionManager:
def __init__(self, config_dict: dict | None = None):
config_dict = {} if config_dict is None else config_dict
self._config_dict = {}
self._config_dict.update(zenoh_default_config)
self._config_dict.update(config_dict)
self._session = None
self._referrers = []
[docs]
def request_session(self, obj: object) -> zenoh.Session:
"""
Request the global zenoh session.
Args:
obj: The object that requests the session
"""
if self._session is None:
config = zenoh.Config()
for key, value in self._config_dict.items():
config.insert_json5(key, json.dumps(value))
self._session = zenoh.open(config)
atexit.register(self._session.close)
if obj not in self._referrers:
self._referrers.append(obj)
return self._session
[docs]
def release_session(self, obj: object) -> None:
"""
Release from the global zenoh session.
Args:
obj: The object that wants to release from the global zenoh session
"""
if obj in self._referrers:
del self._referrers[self._referrers.index(obj)]
else:
return
if len(self._referrers) <= 0 and self._session is not None:
try:
self._session.close()
except zenoh.ZError:
msg = "Timeout occurred when closing Zenoh session. This can lead to stale peers and indicates connection issues."
log.exception(msg)
self._session = None
[docs]
def update_config(self, config_dict: dict) -> None:
self._config_dict.update(config_dict)
[docs]
def force_close(self) -> None:
if self._session is not None:
self._session.close()
self._referrers = []
zenoh.try_init_log_from_env()
session_manager = ZenohSessionManager()