heros

Submodules

Classes

HEROPeer

A HEROPeer provides the minimal interface to establish the HERO communication on top of the zenoh backend.

RemoteHERO

Creates a local stub object from a remote HERO such that it seems like the remote object is a local object.

LocalHERO

Base class for objects exposed through HEROS.

EventObserver

A class that can observe and handle the data emitted by one or more HEROs from a defined event name.

HEROObserver

A HEROObserver keeps track of the HEROs in a given realm by monitoring its zenoh liveliness tokens.

LocalDatasourceHERO

A datasource is a HERO that can provide information on a standardized interface.

PolledLocalDatasourceHERO

This local HERO periodically triggers the event "observable_data" to poll and publish data.

DatasourceObserver

A class that can observe and handle the data emitted by one or more datasource HEROs.

DatasourceReturnValue

A structure to store data returned from a single entity in a datasource.

DatasourceReturnSet

Collection of multiple DatasourceReturnValue.

Functions

event(func)

Decorator for events.

Package Contents

class heros.HEROPeer(realm: str = 'heros', session_manager=None)

A HEROPeer provides the minimal interface to establish the HERO communication on top of the zenoh backend. To this end, it provides methods to send cbor-serialized messages via the zenoh network. It establishes the @object namespace and communicates in a defined realm. Methods to discover objects in the realm and to retrieve their object information are provided.

Parameters:
  • realm – Name of the realm that this HEROPeer belongs to. (default: heros)

  • session – optional zenoh session to use. If none is provided, a new zenoh session will be started

_ep_discover = '_discover'
_ep_capabilities = '_capabilities'
_ep_health = '_health'
_ns_objects = '@object'
_default_encoding
_realm = 'heros'
_session_manager
_session = None
_subscriptions = []
_queryables = []
_query_selector(*args, **kwargs) list

Send a query to an endpoint and deserialize the results. This is a low-level function.

Parameters:
  • selector – The zenoh selector.

  • target – zenoh target for the query

  • timeout – timeout for the zenoh get command

Returns:

list of deserialized results

Return type:

list

_subscribe_selector(selector: str, callback: collections.abc.Callable, *args, **kwargs)

Subscribe to a zenoh selector and attach a callback. The callback receives the deserialized payload of the published messages.

Parameters:
  • selector – zenoh selector for the subscription. See the zenoh documentation for valid descriptors.

  • callback – method to be called for messages that match the selector. The method needs to accept one argument which is the deserialized payload of the message.

_declare_queryable(selector: str, callback: collections.abc.Callable)
_get_object_info(object_name: str, timeout: float = 2.0) dict

Retrieve the object information for a HERO in the current realm and with the given name.

Parameters:
  • object_name – name of the HERO to get the object info for. This name is inserted into a zenoh key expression and can thus contain the corresponding wildcards.

  • timeout – timeout for the discover operation in seconds (default: 2)

Returns:

dict of the form {name: {remote_object_descriptor}}

Return type:

dict

_discover(timeout: float = 2.0) dict

Send query to discovery endpoint of all HEROs in the current realm. All alive objects will respond and send their remote object descriptor.

Parameters:

timeout – timeout for the discover operation in seconds (default: 2)

Returns:

dict of the form {name: {remote_object_descriptor}}

Return type:

dict

_serialize(obj)

Serialize the given object using the serializer used for this HEROPeer. Currently only CBOR is supported.

Parameters:

obj – The object to serialize. Currently only built-in types and numpy arrays are supported.

_deserialize(bytes: bytearray)

Deserialize the given byte string using the deserializer used for this HEROPeer. Currently only CBOR is supported.

Parameters:

bytes – bytearray to deserialize.

_destroy_hero()
__enter__()
__exit__(exc_type, exc_val, exc_tb)
__del__()
class heros.RemoteHERO(name: str, realm: str = 'heros', *args, **kwargs)

Bases: HERO

Creates a local stub object from a remote HERO such that it seems like the remote object is a local object. The remote HERO is identified by its name and has to be available at the given realm.

Attribute and method capabilities of the remote object are directly mapped to attributes and methods of the stub object, respectively. The signature of the methods is adapted accordingly. The remote attributes do not exist locally but are directly changed and read on the remote end. Event capabilities of the remote object are mapped to RemoteEvent objects that are members of this class. By connecting one or more callbacks to this event, the RemoteHERO can react to events triggered at the remote site.

To be able to attach attributes to this class, every instance of a RemoteHERO is created from a dynamically generated child class of RemoteHERO with the name RemoteHERO_<realm>_<HERO name>.
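The per-instance child class described above can be pictured with the following minimal sketch. It does not use the heros package: StubBase is a hypothetical stand-in for RemoteHERO, and the position property is invented purely for illustration. The only point is to show why a dedicated class per remote object lets attributes be attached without affecting other stubs.

```python
class StubBase:
    """Hypothetical stand-in for RemoteHERO; not the heros implementation."""

    def __new__(cls, name, realm="heros"):
        # Build a dedicated child class per (realm, name) so that attributes
        # attached to that class do not leak into stubs of other objects.
        child = type(f"{cls.__name__}_{realm}_{name}", (cls,), {})
        return super().__new__(child)

    def __init__(self, name, realm="heros"):
        self.name = name
        self.realm = realm


a = StubBase("motor", realm="lab")
b = StubBase("laser", realm="lab")

# Properties live on the class, so attaching one to the generated child class
# of `a` leaves `b` untouched. The constant 42 stands in for a remote read.
type(a).position = property(lambda self: 42)
```

Because properties must be defined on a class rather than on an instance, generating one class per stub is a natural way to give each remote object its own set of attributes.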

Note

To discover which objects are available in a certain realm, see HEROObserver.

Parameters:
  • name – name/identifier of the remote object

  • realm – realm (think namespace) at which the object is registered. default is “heros”

_hero_tags
_hero_implements
_remote_capabilities
_liveliness_subscription
_liveliness_changed(sample)
_get_capabilities()

Obtain capabilities from remote object.

Returns:

List of capabilities of the remote device

Return type:

list[Capability]

_setattr_remote_capabilities()

Attach functions to the instance that reflect the name and signature of the capabilities of the remote object.

__eq__(other)
_destroy_hero()
__hash__()
class heros.LocalHERO(name: str, *args, realm: str = 'heros', implements: list[str] | None = None, tags: list[str] | None = None, **kwargs)

Bases: HERO

Base class for objects exposed through HEROS. Any object that should be able to be accessed remotely must be based off this class.

Parameters:
  • name – name/identifier under which the object is available. Make sure this name is unique in the realm.

  • realm – realm the HERO should exist in. default is “heros”

  • implements – list of interfaces that are implemented by the hero

  • tags – list of tags to identify and classify the hero

liveliness_token
_capabilities()

Analyze ourself (i.e. the current object) and automatically generate the capabilities of the HERO from this.

For every method that doesn’t start with _ a method capability is announced. Every defined class attribute becomes an attribute capability. Every method that is defined in the class with the @event decorator becomes an event.

While scanning for the capabilities, this method directly creates the necessary callbacks and defines the zenoh queryables for the capabilities.
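The classification rules above can be illustrated with a small, self-contained sketch. It is not the heros implementation: the event decorator here is a hypothetical marker that only sets a flag, scan_capabilities is an invented helper, and skipping underscore-prefixed class attributes as well as ignoring inherited members are simplifying assumptions. No zenoh queryables are declared.

```python
import inspect


def event(func):
    """Hypothetical marker decorator; the real heros.event does more than this."""
    func._is_event = True
    return func


def scan_capabilities(obj):
    """Sort the public members of obj's class into methods, attributes and events."""
    caps = {"methods": [], "attributes": [], "events": []}
    for name, member in vars(type(obj)).items():
        if name.startswith("_"):
            continue  # assumption: private names are never announced
        if inspect.isfunction(member):
            kind = "events" if getattr(member, "_is_event", False) else "methods"
            caps[kind].append(name)
        else:
            caps["attributes"].append(name)
    return caps


class Thermometer:
    unit = "K"

    def read(self):
        return 293.15

    @event
    def overheated(self, temperature):
        return temperature

    def _internal(self):
        pass


caps = scan_capabilities(Thermometer())
```

In this sketch, read ends up as a method, unit as an attribute, and overheated as an event, while _internal is skipped.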

_destroy_hero()
_connect_local_hero_callback(event: collections.abc.Callable, remote_hero_method: collections.abc.Callable, origin: str = None) str

Connect a method of RemoteHERO as a callback to an event of the LocalHERO. This leads to a new, direct P2P connection between the RemoteHERO and the LocalHERO to call the method.

Parameters:
  • event – the event callable, i.e. a method that is decorated with @event in the LocalHERO.

  • remote_hero_method – callable to connect as a callback.

  • origin – optional str indicating the semantic origin of the connection.

Returns:

name of the callback.

Return type:

str

_disconnect_local_hero_callback(event: collections.abc.Callable, remote_hero_method: collections.abc.Callable) bool

Disconnect a method of RemoteHERO from an event of the LocalHERO.

Parameters:
  • event – the event callable, i.e. a method that is decorated with @event in the LocalHERO.

  • remote_hero_method – callable to disconnect.

Returns:

truth value if the remote method was indeed a callback.

Return type:

bool

_get_local_hero_callbacks(event: collections.abc.Callable) list

Get a list of dictionary representations of the callbacks of an event of the LocalHERO.

Parameters:

event – the event callable, i.e. a method that is decorated with @event in the LocalHERO.

Returns:

dictionary representations of the callbacks.

Return type:

list

class heros.EventObserver(object_selector: str, event_name: str, *args, **kwargs)

Bases: HEROPeer

A class that can observe and handle the data emitted by one or more HEROs from a defined event name. In particular, this class provides an efficient way to listen to the data emitted by multiple HEROs in the realm. By not instantiating the HEROs themselves but just subscribing to the topics for the event, this reduces the pressure on the backing zenoh network. If, however, only the data of a few HEROs should be observed, it might make more sense to just instantiate the corresponding RemoteHEROs and connect a callback to their events.

Parameters:

  • object_selector – Selector to specify which objects to observe. This becomes part of a zenoh selector and thus can be anything that makes sense in the selector. Use * to observe all HEROs in the realm.

  • event_name – Name of the event to observe.

_object_selector
_event_name
_event_callbacks
_subscription
_handle_event(key_expr: str, data)
register_callback(func: collections.abc.Callable) str | bool

Register a callback that should be called on events.

Parameters:

func – Function to call.

Returns:

The uuid of the callback or False if the callback was already present.

remove_callback(func: collections.abc.Callable) bool

Remove a callback.

Parameters:

func – Function to remove.

Returns:

True if the callback could be removed, False otherwise.

remove_callback_uid(uid: str) bool

Remove a callback by its uid.

Parameters:

uid – Uid of the callback.

Returns:

True
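The return values documented for register_callback, remove_callback and remove_callback_uid suggest bookkeeping along the following lines. This is a sketch under assumptions, not the heros code: CallbackRegistry and dispatch are hypothetical names, storing callbacks in a dict keyed by a uuid4 string is a guess, and returning False from remove_callback_uid for an unknown uid is an assumption as well.

```python
import uuid


class CallbackRegistry:
    """Hypothetical stand-in for the callback bookkeeping of an observer."""

    def __init__(self):
        self._callbacks = {}  # maps uid (str) to the registered callable

    def register_callback(self, func):
        # A callable that is already registered is not added a second time.
        if func in self._callbacks.values():
            return False
        uid = str(uuid.uuid4())
        self._callbacks[uid] = func
        return uid

    def remove_callback(self, func):
        for uid, registered in self._callbacks.items():
            if registered == func:
                del self._callbacks[uid]
                return True  # return immediately, the dict is not iterated further
        return False

    def remove_callback_uid(self, uid):
        # Assumption: report whether the uid was known.
        return self._callbacks.pop(uid, None) is not None

    def dispatch(self, data):
        # Hypothetical helper: hand the event payload to every callback.
        for func in list(self._callbacks.values()):
            func(data)


received = []
registry = CallbackRegistry()
first = registry.register_callback(received.append)
second = registry.register_callback(received.append)
registry.dispatch({"value": 1.0})
removed = registry.remove_callback(received.append)
removed_again = registry.remove_callback(received.append)
```

Returning the uid on registration lets a caller remove a callback later even when it no longer holds a reference to the function itself, for example with lambdas.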

class heros.HEROObserver(*args, **kwargs)

Bases: HEROPeer

A HEROObserver keeps track of the HEROs in a given realm by monitoring its zenoh liveliness tokens. The member attribute known_objects always holds a list of all HEROs known to the observer.

Parameters:
  • realm – Name of the realm that this HEROPeer belongs to. (default: heros)

  • session – optional zenoh session to use. If none is provided, a new zenoh session will be started

known_objects
_object_added_callbacks = []
_object_removed_callbacks = []
_handle_status_change(sample)

Handle the status change of liveliness tokens.

register_object_added_callback(func: collections.abc.Callable) None

Register a callback that should be called when a new HERO joins the realm.

Parameters:

func – function to call when a new HERO joins the realm

register_object_removed_callback(func: collections.abc.Callable) None

Register a callback that should be called when a HERO leaves the realm.

Parameters:

func – function to call when a HERO leaves the realm

get_object(object_name: str) RemoteHERO

Get the RemoteHERO object for the HERO with the given name.

Parameters:

object_name – name of the HERO
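How an observer might keep known_objects current from liveliness changes can be sketched without a zenoh session. Everything here is illustrative: RealmTracker and SampleKind are hypothetical stand-ins, the mapping of a token appearing to PUT and disappearing to DELETE follows the general zenoh liveliness model, and passing the object name to the callbacks is an assumption about the callback signature.

```python
from enum import Enum


class SampleKind(Enum):
    """Hypothetical stand-in for the kind carried by a liveliness sample."""
    PUT = "put"        # a liveliness token appeared
    DELETE = "delete"  # a liveliness token disappeared


class RealmTracker:
    """Hypothetical stand-in that mimics the observer's bookkeeping."""

    def __init__(self):
        self.known_objects = []
        self._object_added_callbacks = []
        self._object_removed_callbacks = []

    def register_object_added_callback(self, func):
        self._object_added_callbacks.append(func)

    def register_object_removed_callback(self, func):
        self._object_removed_callbacks.append(func)

    def handle_status_change(self, kind, name):
        if kind is SampleKind.PUT and name not in self.known_objects:
            self.known_objects.append(name)
            for func in self._object_added_callbacks:
                func(name)
        elif kind is SampleKind.DELETE and name in self.known_objects:
            self.known_objects.remove(name)
            for func in self._object_removed_callbacks:
                func(name)


joined, left = [], []
tracker = RealmTracker()
tracker.register_object_added_callback(joined.append)
tracker.register_object_removed_callback(left.append)
tracker.handle_status_change(SampleKind.PUT, "motor")
tracker.handle_status_change(SampleKind.PUT, "laser")
tracker.handle_status_change(SampleKind.DELETE, "motor")
```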

class heros.LocalDatasourceHERO(*args, observables: dict | None = None, **kwargs)

Bases: heros.LocalHERO

A datasource is a HERO that can provide information on a standardized interface. This interface is the event observable_data. Instances in the zenoh network interested in the data provided by data sources can simply subscribe to the key expression @objects/realm/*/observable_data or use the DatasourceObserver.

To make your class a LocalDatasourceHERO, make it inherit from this class. This class is meant for datasources that create asynchronous events on their own. When processing such an event, call observable_data to publish the data from this datasource.

Parameters:
  • name – name/identifier under which the object is available. Make sure this name is unique in the realm.

  • realm – realm the HERO should exist in. default is “heros”

observable_processor
_process_data(data)
observable_data(data)
class heros.PolledLocalDatasourceHERO(*args, loop, interval: float = 5, **kwargs)

Bases: LocalDatasourceHERO

This local HERO periodically triggers the event “observable_data” to poll and publish data. This class is meant for datasources that do not generate events on their own and thus should be polled on a periodic basis.

To make your class a PolledLocalDatasourceHERO, make it inherit from this class and implement the method _observable_data. The method will get called periodically and the return value will be published as an event.

Note

The periodic calling is realized via asyncio and will thus only work if the asyncio mainloop is started.

Parameters:
  • name – name/identifier under which the object is available. Make sure this name is unique in the realm.

  • realm – realm the HERO should exist in. default is “heros”

  • interval – time interval in seconds between consecutive calls of the observable_data event
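The polling pattern described above, where a subclass implements _observable_data and an asyncio task calls it at a fixed interval, can be sketched as follows. PolledSource is a hypothetical stand-in that only appends to a list instead of publishing an event, and the cycles argument exists solely so that the example terminates.

```python
import asyncio


class PolledSource:
    """Hypothetical stand-in for a polled datasource; it publishes nothing."""

    def __init__(self, interval=5.0):
        self.interval = interval
        self.published = []

    def _observable_data(self):
        # A subclass would read its hardware or service here.
        return 42.0

    def observable_data(self):
        data = self._observable_data()
        self.published.append(data)  # stands in for emitting the event
        return data

    async def _trigger(self, cycles):
        # Call observable_data once per interval for a fixed number of cycles.
        for _ in range(cycles):
            self.observable_data()
            await asyncio.sleep(self.interval)


source = PolledSource(interval=0.01)
asyncio.run(source._trigger(cycles=3))
```

Without a running event loop the coroutine is never scheduled, which is why the note above stresses that the asyncio main loop must be started.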

datasource_interval = 5
_loop
_stop_loop
_loop_task
async _trigger_datasource()
_destroy_hero()
observable_data()
_observable_data()
class heros.DatasourceObserver(object_selector: str = '*', *args, **kwargs)

Bases: heros.EventObserver

A class that can observe and handle the data emitted by one or more datasource HEROs. In particular, this class provides an efficient way to listen to the data emitted by all datasource HEROs in the realm. By not instantiating the HEROs themselves but just subscribing to the topics for the datasource, this reduces the pressure on the backing zenoh network. If, however, only the data of a few HEROs should be observed, it might make more sense to just instantiate the corresponding RemoteHEROs and connect a callback to their observable_data signal.

Parameters:
  • object_selector – selector to specify which objects to observe. This becomes part of a zenoh selector and thus can be anything that makes sense in the selector. Defaults to * to observe all HEROs in the realm.

_handle_event(key_expr: str, data)
register_observable_data_callback(func: callable)

Register a callback that should be called on observable_data. This method passes the function to EventObserver.register_callback

Parameters:

func – function to call on observable_data.

class heros.DatasourceReturnValue(id: str = None, time: float = None, value: float = None, unit: str = None, raw_value: float = None, raw_unit: str = None, inbound: int = -1, calibrated: bool = False, **kwargs)

Bases: dict

A structure to store data returned from a single entity in a datasource. A datasource can return multiple entities at once. In this case many DatasourceReturnValues are stored in a DatasourceReturnSet.

Default return values from datasource. They can be converted using a calibration.

Parameters:
  • raw_value – (float)

  • raw_unit – (str[10])

  • time – (int) creation time of the raw value.
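A dict subclass that exposes its keys as read-only properties, which is the shape suggested by the signature and property list of this class, might look like the sketch below. ReturnValue is a hypothetical stand-in, only a few of the properties are shown, and nothing about calibration is modelled.

```python
class ReturnValue(dict):
    """Hypothetical stand-in for DatasourceReturnValue."""

    def __init__(self, id=None, time=None, value=None, unit=None,
                 raw_value=None, raw_unit=None, inbound=-1,
                 calibrated=False, **kwargs):
        # Store every field as a plain dict entry.
        super().__init__(id=id, time=time, value=value, unit=unit,
                         raw_value=raw_value, raw_unit=raw_unit,
                         inbound=inbound, calibrated=calibrated, **kwargs)

    @property
    def id(self):
        return self["id"]

    @property
    def raw_value(self):
        return self["raw_value"]

    @property
    def raw_unit(self):
        return self["raw_unit"]

    @property
    def inbound(self):
        return self["inbound"]


reading = ReturnValue(id="t1", raw_value=293.15, raw_unit="K")
```

Keeping the data in a plain dict means the object stays a built-in mapping, while the properties give attribute-style access to the documented fields.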

property id
property raw_value
property raw_unit
property value

(float) value in specified units.

property unit

SI Unit of the current tuple.

property time
property inbound

Boundary level (int) -1=unbound, 0=ok, 1=warn,error, fault

__str__()

Return str(self).

__repr__()

Return repr(self).

class heros.DatasourceReturnSet

Bases: tuple

Collection of multiple DatasourceReturnValue.

static from_data(data)
We try to build a DatasourceReturnSet by guessing the data format from the following options:
  • [FLOAT, FLOAT, ..] -> A list of raw_values

  • [(FLOAT, STR), (FLOAT, STR), ..] -> a list of (raw_value, raw_unit) tuples

  • {STR: FLOAT, STR: FLOAT, ..} -> a dict with id: raw_value

  • {STR: (FLOAT, STR), STR: (FLOAT, STR), ..} -> a dict with id: (raw_value, raw_unit)

  • FLOAT -> raw_value

  • (FLOAT, STR) -> (raw_value, raw_unit)
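The six input shapes listed above can be told apart with a few isinstance checks. The sketch below is one possible way to do this and is not the heros implementation: normalize is a hypothetical helper, it returns plain (id, raw_value, raw_unit) tuples instead of DatasourceReturnValue objects, and None marks a field that the input did not provide.

```python
def normalize(data):
    """Return a list of (id, raw_value, raw_unit) tuples for the listed formats."""

    def entry(ident, item):
        # An item is either a bare number or a (raw_value, raw_unit) pair.
        if isinstance(item, (tuple, list)):
            value, unit = item
            return (ident, float(value), unit)
        return (ident, float(item), None)

    if isinstance(data, dict):
        # {id: raw_value} or {id: (raw_value, raw_unit)}
        return [entry(key, item) for key, item in data.items()]
    if isinstance(data, tuple) and len(data) == 2 and isinstance(data[1], str):
        # A single (raw_value, raw_unit) pair
        return [entry(None, data)]
    if isinstance(data, (list, tuple)):
        # A sequence of raw values or of (raw_value, raw_unit) pairs
        return [entry(None, item) for item in data]
    # A single raw value
    return [entry(None, data)]


examples = {
    "values": normalize([1.0, 2.0]),
    "pairs": normalize([(1.0, "V"), (2.0, "A")]),
    "by_id": normalize({"u": 1.0, "i": 2.0}),
    "pairs_by_id": normalize({"u": (1.0, "V")}),
    "single": normalize(3.5),
    "single_pair": normalize((3.5, "V")),
}
```

The check for a single (raw_value, raw_unit) tuple has to come before the generic sequence branch, otherwise the unit string would be treated as a second raw value.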

heros.event(func: callable)

Decorator for events.

Note

Only use on methods bound to objects.