heros¶
Submodules¶
Classes¶
HEROPeer | A HEROPeer provides the minimal interface to establish the HERO communication on top of the zenoh backend.
RemoteHERO | Creates a local stub object from a remote HERO such that it seems like the remote object is a local object.
LocalHERO | Base class for objects exposed through HEROS.
EventObserver | A class that can observe and handle the data emitted by one or more HEROs from a defined event name.
HEROObserver | A HEROObserver keeps track of the HEROs in a given realm by monitoring its zenoh liveliness tokens.
LocalDatasourceHERO | A datasource is a HERO that can provide information on a standardized interface.
PolledLocalDatasourceHERO | This local HERO periodically triggers the event "observable_data" to poll and publish data.
DatasourceObserver | A class that can observe and handle the data emitted by one or more datasource HEROs.
DatasourceReturnValue | A structure to store data returned from a single entity in a datasource.
DatasourceReturnSet | Collection of multiple DatasourceReturnValue.
Functions¶
event | Decorator for events.
Package Contents¶
- class heros.HEROPeer(realm: str = 'heros', session_manager=None)¶
A HEROPeer provides the minimal interface to establish the HERO communication on top of the zenoh backend. To this end, it provides methods to send cbor-serialized messages via the zenoh network. It establishes the @object namespace and communicates in a defined realm. Methods to discover objects in the realm and to retrieve their object information are provided.
- Parameters:
realm – Name of the realm that this HEROPeer belongs to. (default: heros)
session – optional zenoh session to use. If none is provided, a new zenoh session will be started
- _ep_discover = '_discover'¶
- _ep_capabilities = '_capabilities'¶
- _ep_health = '_health'¶
- _ns_objects = '@object'¶
- _default_encoding¶
- _realm = 'heros'¶
- _session_manager¶
- _session = None¶
- _subscriptions = []¶
- _queryables = []¶
- _query_selector(*args, **kwargs) list¶
Send a query to an endpoint and deserialize the results. This is a low-level function.
- Parameters:
selector – The zenoh selector.
target – zenoh target for the query
timeout – timeout for the zenoh get command
- Returns:
list of deserialized results
- Return type:
list
- _subscribe_selector(selector: str, callback: collections.abc.Callable, *args, **kwargs)¶
Subscribe to a zenoh selector and attach a callback. The callback receives the deserialized payload of the messages published.
- Parameters:
selector – zenoh selector for the subscription. See the zenoh documentation for valid descriptors.
callback – method to be called for messages that match the selector. The method needs to accept one argument which is the deserialized payload of the message.
- _declare_queryable(selector: str, callback: collections.abc.Callable)¶
- _get_object_info(object_name: str, timeout: float = 2.0) dict¶
Retrieve the object information for a HERO in the current realm and with the given name.
- Parameters:
object_name – name of the HERO to get the object info for. This name is inserted into a zenoh key expression and can thus contain the corresponding wildcards.
timeout – timeout for the discover operation in seconds (default: 2)
- Returns:
dict of the form {name: {remote_object_descriptor}}
- Return type:
dict
- _discover(timeout: float = 2.0) dict¶
Send query to discovery endpoint of all HEROs in the current realm. All alive objects will respond and send their remote object descriptor.
- Parameters:
timeout – timeout for the discover operation in seconds (default: 2)
- Returns:
dict of the form {name: {remote_object_descriptor}}
- Return type:
dict
- _serialize(obj)¶
Serialize the given object using the serializer used for this HEROPeer. Currently only CBOR is supported.
- Parameters:
obj – The object to serialize. Currently only built-in types and numpy arrays are supported.
- _deserialize(bytes: bytearray)¶
Deserialize the given byte string using the deserializer used for this HEROPeer. Currently only CBOR is supported.
- Parameters:
bytes – bytearray to deserialize.
- _destroy_hero()¶
- __enter__()¶
- __exit__(exc_type, exc_val, exc_tb)¶
- __del__()¶
- class heros.RemoteHERO(name: str, realm: str = 'heros', *args, **kwargs)¶
Bases:
HERO
Creates a local stub object from a remote HERO such that it seems like the remote object is a local object. The remote HERO is identified by its name and has to be available at the given realm.
Attribute and method capabilities of the remote object are directly mapped to attributes and methods of the stub object, respectively. The signature of the methods is adapted accordingly. The remote attributes do not exist locally but are directly changed and read on the remote end. Event capabilities of the remote object are mapped to RemoteEvent objects that are members of this class. By connecting one or more callbacks to this event, the RemoteHERO can react to events triggered at the remote site.
To be able to attach attributes to this class, every instance of a RemoteHERO is created from a dynamically generated child class of RemoteHERO with the name RemoteHERO_<realm>_<HERO name>.
Note
To discover which objects are available in a certain realm, see HEROObserver.
- Parameters:
name – name/identifier of the remote object
realm – realm (think namespace) at which the object is registered. default is “heros”
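The per-instance child class described above can be illustrated with plain Python. This is a minimal sketch, not the heros implementation: `StubBase`, `make_stub`, `attach_remote_attribute` and the `remote_state` dict (which plays the role of the remote end) are hypothetical names introduced only for this example.

```python
class StubBase:
    """Hypothetical stand-in for RemoteHERO; not the heros implementation."""

    def __init__(self, name: str, realm: str = "heros"):
        self._name = name
        self._realm = realm


def make_stub(name: str, realm: str = "heros") -> StubBase:
    # Build a dedicated child class per object so that attributes attached
    # to the class of one stub do not leak into other stubs.
    cls = type(f"StubBase_{realm}_{name}", (StubBase,), {})
    return cls(name, realm)


def attach_remote_attribute(stub: StubBase, attr: str, store: dict) -> None:
    # Properties must live on the class, so they are set on the generated
    # child class. `store` plays the role of the remote end in this sketch.
    def getter(self):
        return store[attr]

    def setter(self, value):
        store[attr] = value

    setattr(type(stub), attr, property(getter, setter))


remote_state = {"temperature": 21.5}
a = make_stub("sensor_a")
b = make_stub("sensor_b")
attach_remote_attribute(a, "temperature", remote_state)
a.temperature = 22.0  # written through to the "remote" store, not stored locally
```

Because the property is attached to the generated class of `a` only, `b` does not gain a `temperature` attribute, which is the reason a shared class would not work for this pattern.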
- _hero_tags¶
- _hero_implements¶
- _remote_capabilities¶
- _liveliness_subscription¶
- _liveliness_changed(sample)¶
- _get_capabilities()¶
Obtain capabilities from remote object.
- Returns:
List of capabilities of the remote device
- Return type:
list[Capability]
- _setattr_remote_capabilities()¶
Attach functions to the instance that reflect the name and signature of the capabilities of the remote object.
- __eq__(other)¶
- _destroy_hero()¶
- __hash__()¶
- class heros.LocalHERO(name: str, *args, realm: str = 'heros', implements: list[str] | None = None, tags: list[str] | None = None, **kwargs)¶
Bases:
HERO
Base class for objects exposed through HEROS. Any object that should be able to be accessed remotely must be based off this class.
- Parameters:
name – name/identifier under which the object is available. Make sure this name is unique in the realm.
realm – realm the HERO should exist in. default is “heros”
implements – list of interfaces that are implemented by the hero
tags – list of tags to identify and classify the hero
- liveliness_token¶
- _capabilities()¶
Analyze ourself (i.e. the current object) and automatically generate the capabilities of the HERO from this.
For every method that doesn’t start with _ a method capability is announced. Every defined class attribute becomes an attribute capability. Every method that is defined in the class with the @event decorator becomes an event.
While scanning for the capabilities, this method directly creates the necessary callbacks and defines the zenoh queryables for the capabilities.
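The scanning rules above can be sketched with the standard `inspect` module. This is an illustration under stated assumptions, not the code used by heros: the `event` marker decorator, the `Demo` class and `scan_capabilities` are hypothetical, and the real method additionally declares zenoh queryables, which is omitted here.

```python
import inspect


def event(func):
    # Hypothetical marker decorator for this sketch; heros.event does more.
    func._is_event = True
    return func


class Demo:
    voltage = 1.5  # class attribute -> attribute capability

    def set_output(self, on: bool) -> None:  # public method -> method capability
        pass

    def _internal(self):  # leading underscore -> not announced
        pass

    @event
    def value_changed(self, value):  # decorated method -> event capability
        return value


def scan_capabilities(obj) -> dict:
    caps = {"methods": [], "attributes": [], "events": []}
    # Inspect the class so that functions are seen undecorated by binding.
    for name, member in inspect.getmembers(type(obj)):
        if name.startswith("_"):
            continue
        if getattr(member, "_is_event", False):
            caps["events"].append(name)
        elif inspect.isfunction(member):
            caps["methods"].append(name)
        else:
            caps["attributes"].append(name)
    return caps


caps = scan_capabilities(Demo())
```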
- _destroy_hero()¶
- _connect_local_hero_callback(event: collections.abc.Callable, remote_hero_method: collections.abc.Callable, origin: str = None) str¶
Connect a method of RemoteHERO as a callback to an event of the LocalHERO. This leads to a new, direct P2P connection between the RemoteHERO and the LocalHERO to call the method.
- Parameters:
event – the event callable, i.e. a method that is decorated with @event in the LocalHERO.
remote_hero_method – callable to connect as a callback.
origin – optional str indicating the semantic origin of the connection.
- Returns:
name of the callback.
- Return type:
str
- _disconnect_local_hero_callback(event: collections.abc.Callable, remote_hero_method: collections.abc.Callable) bool¶
Disconnect a method of RemoteHERO from an event of the LocalHERO.
- Parameters:
event – the event callable, i.e. a method that is decorated with @event in the LocalHERO.
remote_hero_method – callable to connect as a callback.
- Returns:
truth value if the remote method was indeed a callback.
- Return type:
bool
- _get_local_hero_callbacks(event: collections.abc.Callable) list¶
Get a list of dictionary representations of the callbacks of an event of the LocalHERO.
- Parameters:
event – the event callable, i.e. a method that is decorated with @event in the LocalHERO.
- Returns:
dictionary representations of the callbacks.
- Return type:
list
- class heros.EventObserver(object_selector: str, event_name: str, *args, **kwargs)¶
Bases:
HEROPeer
A class that can observe and handle the data emitted by one or more HEROs from a defined event name. In particular, this class provides an efficient way to listen to the data emitted by multiple HEROs in the realm. By not instantiating the HEROs themselves but just subscribing to the topics for the event, this reduces the pressure on the backing zenoh network. If, however, only the data of a few HEROs should be observed, it might make more sense to just instantiate the corresponding RemoteHEROs and connect a callback to their events.
- Parameters:
object_selector – Selector to specify which objects to observe. This becomes part of a zenoh selector and thus can be anything that makes sense in the selector. Use * to observe all HEROs in the realm.
event_name – Name of the event to observe.
- _object_selector¶
- _event_name¶
- _event_callbacks¶
- _subscription¶
- _handle_event(key_expr: str, data)¶
- register_callback(func: collections.abc.Callable) str | bool¶
Register a callback that should be called on events.
- Parameters:
func – Function to call.
- Returns:
The uuid of the callback or False if the callback was already present.
- remove_callback(func: collections.abc.Callable) bool¶
Remove a callback.
- Parameters:
func – Function to remove.
- Returns:
True if the callback could be removed, False otherwise.
- remove_callback_uid(uid: str) bool¶
Remove a callback by its uid.
- Parameters:
uid – Uid of the callback.
- Returns:
True
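The callback bookkeeping documented above (register returns a uuid or False, removal by function or by uid) can be sketched as follows. `CallbackRegistry` and `dispatch` are hypothetical names for this example; in particular, which arguments heros passes to a callback is an assumption here (only the data is passed).

```python
from __future__ import annotations

import uuid
from collections.abc import Callable


class CallbackRegistry:
    """Hypothetical stand-in for the callback bookkeeping of an observer."""

    def __init__(self):
        self._callbacks: dict[str, Callable] = {}

    def register_callback(self, func: Callable) -> str | bool:
        # Refuse duplicates, mirroring the documented False return value.
        if func in self._callbacks.values():
            return False
        uid = str(uuid.uuid4())
        self._callbacks[uid] = func
        return uid

    def remove_callback(self, func: Callable) -> bool:
        for uid, cb in list(self._callbacks.items()):
            if cb == func:
                del self._callbacks[uid]
                return True
        return False

    def remove_callback_uid(self, uid: str) -> bool:
        return self._callbacks.pop(uid, None) is not None

    def dispatch(self, data) -> None:
        # Assumption of this sketch: callbacks receive only the event data.
        for cb in list(self._callbacks.values()):
            cb(data)


received = []


def on_event(data):
    received.append(data)


registry = CallbackRegistry()
uid = registry.register_callback(on_event)
duplicate = registry.register_callback(on_event)  # already present
registry.dispatch({"value": 1})
removed = registry.remove_callback_uid(uid)
registry.dispatch({"value": 2})  # no callbacks left, nothing is recorded
```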
- class heros.HEROObserver(*args, **kwargs)¶
Bases:
HEROPeer
A HEROObserver keeps track of the HEROs in a given realm by monitoring its zenoh liveliness tokens. The member attribute known_objects always holds a list of all HEROs known to the observer.
- Parameters:
realm – Name of the realm that this HEROPeer belongs to. (default: heros)
session – optional zenoh session to use. If none is provided, a new zenoh session will be started
- known_objects¶
- _object_added_callbacks = []¶
- _object_removed_callbacks = []¶
- _handle_status_change(sample)¶
Handle the status change of liveliness tokens.
- register_object_added_callback(func: collections.abc.Callable) None¶
Register a callback that should be called when a new HERO joins the realm.
- Parameters:
func – function to call when a new HERO joins the realm
- register_object_removed_callback(func: collections.abc.Callable) None¶
Register a callback that should be called when a HERO leaves the realm.
- Parameters:
func – function to call when a HERO leaves the realm
- get_object(object_name: str) RemoteHERO¶
Get the RemoteHERO object for the HERO with the given name.
- Parameters:
object_name – name of the HERO
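The tracking behaviour of an observer (a list of known objects plus added/removed callbacks) can be sketched without zenoh. `RealmTracker` and its `handle_status_change(name, alive)` signature are hypothetical; the real class receives zenoh liveliness samples, whose decoding is not shown, and the arguments passed to the callbacks are an assumption.

```python
from collections.abc import Callable


class RealmTracker:
    """Hypothetical stand-in for the bookkeeping of a realm observer."""

    def __init__(self):
        self.known_objects: list[str] = []
        self._added: list[Callable] = []
        self._removed: list[Callable] = []

    def register_object_added_callback(self, func: Callable) -> None:
        self._added.append(func)

    def register_object_removed_callback(self, func: Callable) -> None:
        self._removed.append(func)

    def handle_status_change(self, name: str, alive: bool) -> None:
        # `alive` mirrors a liveliness token appearing (True) or vanishing (False).
        if alive and name not in self.known_objects:
            self.known_objects.append(name)
            for cb in self._added:
                cb(name)
        elif not alive and name in self.known_objects:
            self.known_objects.remove(name)
            for cb in self._removed:
                cb(name)


log = []
tracker = RealmTracker()
tracker.register_object_added_callback(lambda n: log.append(("added", n)))
tracker.register_object_removed_callback(lambda n: log.append(("removed", n)))
tracker.handle_status_change("sensor_a", alive=True)
tracker.handle_status_change("sensor_b", alive=True)
tracker.handle_status_change("sensor_a", alive=False)
```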
- class heros.LocalDatasourceHERO(*args, observables: dict | None = None, **kwargs)¶
Bases:
heros.LocalHERO
A datasource is a HERO that can provide information on a standardized interface. This interface is the event observable_data. Instances in the zenoh network interested in the data provided by data sources can simply subscribe to the key expression @objects/realm/*/observable_data or use the DatasourceObserver.
To make your class a LocalDatasourceHERO make it inherit this class. This class is meant for datasources that create asynchronous events on their own. When processing such an event call observable_data to publish the data from this datasource.
- Parameters:
name – name/identifier under which the object is available. Make sure this name is unique in the realm.
realm – realm the HERO should exist in. default is “heros”
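To illustrate how a key expression with a `*` wildcard selects the observable_data topic of every datasource in a realm, here is a deliberately simplified matcher. It only models `*` (exactly one chunk) and `**` (any number of chunks) and ignores the rest of zenoh's key-expression rules; `key_matches` is a hypothetical helper, not part of heros or zenoh, and the realm name `heros` is used as an example.

```python
def key_matches(pattern: str, key: str) -> bool:
    """Simplified chunk-wise matching of a key against a key expression."""

    def match(p: list, k: list) -> bool:
        if not p:
            return not k
        if p[0] == "**":
            # "**" consumes zero chunks, or one chunk and tries again.
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        # "*" matches any single chunk; otherwise chunks must be equal.
        return bool(k) and p[0] in ("*", k[0]) and match(p[1:], k[1:])

    return match(pattern.split("/"), key.split("/"))


pattern = "@objects/heros/*/observable_data"
hit = key_matches(pattern, "@objects/heros/sensor_a/observable_data")
wrong_event = key_matches(pattern, "@objects/heros/sensor_a/other_event")
too_deep = key_matches(pattern, "@objects/heros/a/b/observable_data")
```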
- observable_processor¶
- _process_data(data)¶
- observable_data(data)¶
- class heros.PolledLocalDatasourceHERO(*args, loop, interval: float = 5, **kwargs)¶
Bases:
LocalDatasourceHERO
This local HERO periodically triggers the event “observable_data” to poll and publish data. This class is meant for datasources that do not generate events on their own and thus should be polled on a periodic basis.
To make your class a PolledLocalDatasourceHERO make it inherit this class and implement the method _observable_data. The method will get called periodically and the return value will be published as an event.
Note
The periodic calling is realized via asyncio and will thus only work if the asyncio mainloop is started.
- Parameters:
name – name/identifier under which the object is available. Make sure this name is unique in the realm.
realm – realm the HERO should exist in. default is “heros”
interval – time interval in seconds between consecutive calls of observable_data event
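The polling pattern described above (an asyncio task that periodically calls `_observable_data` and publishes the return value) can be sketched with the standard library alone. `PolledSource`, its `published` list and the `run`/`stop` methods are hypothetical; how heros schedules its task on the given loop is not shown in this section.

```python
import asyncio


class PolledSource:
    """Hypothetical stand-in; only mirrors the polling pattern."""

    def __init__(self, interval: float = 5):
        self.interval = interval
        self.published = []
        self._stop = asyncio.Event()

    def _observable_data(self):
        # A subclass would read its device here.
        return 42.0

    def observable_data(self):
        # Poll once and "publish" the result (here: append to a list).
        self.published.append(self._observable_data())

    async def run(self):
        while not self._stop.is_set():
            self.observable_data()
            try:
                # Sleep for one interval, but wake up early when stopped.
                await asyncio.wait_for(self._stop.wait(), timeout=self.interval)
            except asyncio.TimeoutError:
                pass

    def stop(self):
        self._stop.set()


async def main():
    source = PolledSource(interval=0.01)
    task = asyncio.create_task(source.run())
    await asyncio.sleep(0.2)
    source.stop()
    await task
    return source.published


published = asyncio.run(main())
```

As the note above says, nothing is polled unless an event loop is actually running; in this sketch `asyncio.run` provides it.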
- datasource_interval = 5¶
- _loop¶
- _stop_loop¶
- _loop_task¶
- async _trigger_datasource()¶
- _destroy_hero()¶
- observable_data()¶
- _observable_data()¶
- class heros.DatasourceObserver(object_selector: str = '*', *args, **kwargs)¶
Bases:
heros.EventObserver
A class that can observe and handle the data emitted by one or more datasource HEROs. In particular, this class provides an efficient way to listen to the data emitted by all datasource HEROs in the realm. By not instantiating the HEROs themselves but just subscribing to the topics for the datasource, this reduces the pressure on the backing zenoh network. If, however, only the data of a few HEROs should be observed, it might make more sense to just instantiate the corresponding RemoteHEROs and connect a callback to their observable_data signal.
- Parameters:
object_selector – selector to specify which objects to observe. This becomes part of a zenoh selector and thus can be anything that makes sense in the selector. Defaults to * to observe all HEROs in the realm.
- _handle_event(key_expr: str, data)¶
- register_observable_data_callback(func: callable)¶
Register a callback that should be called on observable_data. This method passes the function to EventObserver.register_callback
- Parameters:
func – function to call on observable_data.
- class heros.DatasourceReturnValue(id: str = None, time: float = None, value: float = None, unit: str = None, raw_value: float = None, raw_unit: str = None, inbound: int = -1, calibrated: bool = False, **kwargs)¶
Bases:
dict
A structure to store data returned from a single entity in a datasource. A datasource can return multiple entities at once. In this case many DatasourceReturnValues are stored in a DatasourceReturnSet.
Default return values from datasource. They can be converted using a calibration.
- Parameters:
raw_value – (float)
raw_unit – (str[10])
time – (int) creation time of the raw value.
- property id¶
- property raw_value¶
- property raw_unit¶
- property value¶
(float) value in specified units.
- property unit¶
SI Unit of the current tuple.
- property time¶
- property inbound¶
Boundary level (int) -1=unbound, 0=ok, 1=warn,error, fault
- __str__()¶
Return str(self).
- __repr__()¶
Return repr(self).
- class heros.DatasourceReturnSet¶
Bases:
tuple
Collection of multiple DatasourceReturnValue.
- static from_data(data)¶
We try to build a DatasourceReturnSet by guessing the data format from the following options:
[FLOAT, FLOAT, ..] -> A list of raw_values
[(FLOAT, STR), (FLOAT, STR), ..] -> a list of (raw_value, raw_unit) tuples
{STR: FLOAT, STR: FLOAT, ..} -> a dict with id: raw_value
{STR: (FLOAT, STR), STR: (FLOAT, STR), ..} -> a dict with id: (raw_value, raw_unit)
FLOAT -> raw_value
(FLOAT, STR) -> (raw_value, raw_unit)
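The format guessing listed above can be sketched as a small normalizer. This is not the heros implementation: `normalize` is a hypothetical function that returns plain dicts instead of DatasourceReturnValue objects, and the order in which ambiguous inputs are resolved is an assumption of this sketch.

```python
from __future__ import annotations

from numbers import Real


def _is_pair(item) -> bool:
    # (raw_value, raw_unit): a number followed by a unit string.
    return (
        isinstance(item, tuple)
        and len(item) == 2
        and isinstance(item[0], Real)
        and isinstance(item[1], str)
    )


def _entry(item, ident=None) -> dict:
    if isinstance(item, Real):
        return {"id": ident, "raw_value": float(item), "raw_unit": None}
    if _is_pair(item):
        return {"id": ident, "raw_value": float(item[0]), "raw_unit": item[1]}
    raise ValueError(f"unsupported item: {item!r}")


def normalize(data) -> list[dict]:
    """Guess the layout of `data` and return one dict per entity."""
    if isinstance(data, dict):
        # {id: raw_value} or {id: (raw_value, raw_unit)}
        return [_entry(value, key) for key, value in data.items()]
    if isinstance(data, Real) or _is_pair(data):
        # a single raw_value or a single (raw_value, raw_unit)
        return [_entry(data)]
    if isinstance(data, (list, tuple)):
        # a sequence of raw_values or of (raw_value, raw_unit) tuples
        return [_entry(item) for item in data]
    raise ValueError(f"unsupported data: {data!r}")


single = normalize(1.5)
pair = normalize((1.5, "V"))
values = normalize([1.0, 2.0])
by_id = normalize({"ch1": (1.0, "V"), "ch2": 2.0})
```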
- heros.event(func: callable)¶
Decorator for events.
Note
Only use on methods bound to objects.