heros
=====

.. py:module:: heros


Submodules
----------

.. toctree::
   :maxdepth: 1

   /autoapi/heros/capabilities/index
   /autoapi/heros/datasource/index
   /autoapi/heros/event/index
   /autoapi/heros/helper/index
   /autoapi/heros/heros/index
   /autoapi/heros/inspect/index
   /autoapi/heros/serdes/index
   /autoapi/heros/zenoh/index


Attributes
----------

.. autoapisummary::

   heros.local_only
   heros.force_remote
   heros.default_session_manager
   heros.log


Classes
-------

.. autoapisummary::

   heros.Capability
   heros.MethodCapability
   heros.AttributeCapability
   heros.EventCapability
   heros.RemoteEventDescriptor
   heros.HEROPeer
   heros.HERO
   heros.RemoteHERO
   heros.LocalHERO
   heros.HEROObserver
   heros.LocalDatasourceHERO
   heros.PolledLocalDatasourceHERO
   heros.DatasourceObserver


Functions
---------

.. autoapisummary::

   heros.is_hero_event
   heros.is_local_only
   heros.is_force_remote
   heros.is_hero_method
   heros.mark_hero_method
   heros.object_name_from_keyexpr
   heros.event


Package Contents
----------------

.. py:class:: Capability

   .. py:attribute:: name
      :type: str

   .. py:attribute:: flavor
      :type: ClassVar[str]
      :value: 'undefined'

   .. py:method:: to_dict()

   .. py:method:: from_dict(d: dict)
      :staticmethod:


.. py:class:: MethodCapability

   Bases: :py:obj:`Capability`

   .. py:attribute:: flavor
      :type: ClassVar[str]
      :value: 'method'

   .. py:attribute:: parameters
      :type: list[Parameter]
      :value: []

   .. py:attribute:: return_type
      :type: str
      :value: 'None'

   .. py:method:: from_method(m: Callable, name: str | None = None) -> MethodCapability
      :staticmethod:

   .. py:method:: to_signature() -> inspect.Signature

   .. py:method:: to_dict() -> dict

   .. py:method:: from_dict(d: dict) -> MethodCapability
      :staticmethod:

      Generate a method capability object from a defining dictionary.

      :param d: definition of the capability according to the standard

   .. py:method:: call_dict(*args, **kwargs) -> dict

      Return a dict that assigns the given arguments to the parameters of this capability.
      Positional and keyword arguments are both handled correctly.

      :param \*args: positional arguments
      :param \*\*kwargs: keyword arguments

      :returns: dict with parameter assignments
      :rtype: dict


.. py:class:: AttributeCapability

   Bases: :py:obj:`Capability`

   An attribute capability describes a single variable of the remote object. It is exposed under the name of the capability.

   :param name: name of the capability
   :param type: data type, e.g. "str", "int", "float", "list", ...
   :param access: Read and/or write access. "r" for read, "w" for write, and "rw" for both

   .. py:attribute:: flavor
      :type: ClassVar[str]
      :value: 'attribute'

   .. py:attribute:: type
      :type: str

   .. py:attribute:: access
      :type: str
      :value: 'rw'

   .. py:method:: to_dict() -> dict

   .. py:method:: from_dict(d: dict) -> AttributeCapability
      :staticmethod:


.. py:class:: EventCapability

   Bases: :py:obj:`Capability`

   An event capability describes the ability of a remote object to notify upon a certain event.

   .. py:attribute:: flavor
      :type: ClassVar[str]
      :value: 'event'

   .. py:method:: from_dict(d: dict) -> EventCapability
      :staticmethod:


.. py:class:: RemoteEventDescriptor(func: callable = None)

   Bases: :py:obj:`EventDescriptor`

   Descriptor of remote representations of events in a `RemoteHERO`.

   .. py:method:: _get_event_handler_cls()
      :staticmethod:


.. py:function:: is_hero_event(func: callable) -> bool

   Check if a callable is an event.

   :param func: callable to check

   :returns: The value of the marker. `False` if the marker is not present.


.. py:function:: is_local_only(func: callable) -> bool

   Check if a callable is marked as local only.

   :param func: callable to check

   :returns: The value of the marker. `False` if the marker is not present.


.. py:function:: is_force_remote(func: callable) -> bool

   Check if a callable is marked as force remote.

   :param func: callable to check

   :returns: The value of the marker. `False` if the marker is not present.


.. py:function:: is_hero_method(func: callable) -> bool

   Check if a callable is a method of a (remote) hero.

   :param func: callable to check

   :returns: The value of the marker. `False` if the marker is not present.


.. py:data:: local_only

.. py:data:: force_remote

.. py:function:: mark_hero_method(func: callable) -> callable

   Mark a callable as a method of a (remote) hero.

   :param func: callable to mark

   :returns: The marked callable.


.. py:data:: default_session_manager

.. py:function:: object_name_from_keyexpr(key_expr, ns_objects, realm, endpoint='.*')

.. py:data:: log

.. py:class:: HEROPeer(realm: str = 'heros', session_manager=None)

   A HEROPeer provides the minimal interface to establish the HERO communication on top of the zenoh backend.
   To this end, it provides methods to send CBOR-serialized messages via the zenoh network. It establishes the `@object`
   namespace and communicates in a defined realm. Methods to discover objects in the realm and to retrieve
   their object information are provided.

   :param realm: Name of the realm that this HEROPeer belongs to. (default: heros)
   :param session: optional zenoh session to use. If none is provided, a new zenoh session will be started

   .. py:attribute:: _ep_discover
      :value: '_discover'

   .. py:attribute:: _ep_capabilities
      :value: '_capabilities'

   .. py:attribute:: _ep_health
      :value: '_health'

   .. py:attribute:: _ns_objects
      :value: '@object'

   .. py:attribute:: _default_encoding

   .. py:attribute:: _realm
      :value: 'heros'

   .. py:attribute:: _session_manager

   .. py:attribute:: _session
      :value: None

   .. py:attribute:: _subscriptions
      :value: []

   .. py:attribute:: _queryables
      :value: []

   .. py:method:: _query_selector(*args, **kwargs) -> list

      Send a query to an endpoint and deserialize the results. This is a low-level function.

      :param selector: The zenoh selector.
      :param target: zenoh target for the query
      :param timeout: timeout for the zenoh get command

      :returns: list of deserialized results
      :rtype: list

   .. py:method:: _subscribe_selector(selector: str, callback: callable, *args, **kwargs)

      Subscribe to a zenoh selector and attach a callback.
      The callback receives the deserialized payload of the published messages.

      :param selector: zenoh selector for the subscription. See the zenoh documentation for valid descriptors.
      :param callback: method to be called for messages that match the selector. The method needs to accept one
                       argument which is the deserialized payload of the message.

   .. py:method:: _declare_queryable(selector: str, callback: callable)

   .. py:method:: _get_object_info(object_name: str, timeout: float = 10.0) -> dict

      Retrieve the object information for a HERO in the current realm and with the given name.

      :param object_name: name of the HERO to get the object info for. This name is inserted into a zenoh key
                          expression and can thus contain the corresponding wildcards.
      :param timeout: timeout for the discover operation in seconds (default: 10)

      :returns: dict of the form ``{name: {remote_object_descriptor}}``
      :rtype: dict

   .. py:method:: _discover(timeout: float = 10.0) -> dict

      Send a query to the discovery endpoint of all HEROs in the current realm. All alive objects will respond and send
      their remote object descriptor.

      :param timeout: timeout for the discover operation in seconds (default: 10)

      :returns: dict of the form ``{name: {remote_object_descriptor}}``
      :rtype: dict

   .. py:method:: _serialize(obj)

      Serialize the given object using the serializer used for this HEROPeer.
      Currently only CBOR is supported.

      :param obj: The object to serialize. Currently only built-in types and numpy arrays are supported.

   .. py:method:: _deserialize(bytes: bytearray)

      Deserialize the given byte string using the deserializer used for this HEROPeer.
      Currently only CBOR is supported.

      :param bytes: bytearray to deserialize.

   .. py:method:: _destroy_hero()

   .. py:method:: __enter__()

   .. py:method:: __exit__(exc_type, exc_val, exc_tb)

   .. py:method:: __del__()


.. py:class:: HERO(name: str, realm: str = 'heros', session_manager=None)

   Bases: :py:obj:`HEROPeer`

   A HEROPeer provides the minimal interface to establish the HERO communication on top of the zenoh backend.
   To this end, it provides methods to send CBOR-serialized messages via the zenoh network. It establishes the `@object`
   namespace and communicates in a defined realm. Methods to discover objects in the realm and to retrieve
   their object information are provided.

   :param realm: Name of the realm that this HEROPeer belongs to. (default: heros)
   :param session: optional zenoh session to use. If none is provided, a new zenoh session will be started

   .. py:attribute:: _name

   .. py:attribute:: _endpoint_base_path
      :value: '@object/heros/Uninferable'

   .. py:attribute:: _endpoints

   .. py:method:: _query_endpoint(endpoint: str, *args, **kwargs) -> list

      Send a query to an endpoint. This is a wrapper for `_query_selector` that ensures the query is addressed to an
      endpoint of the remote object.

      :param endpoint: endpoint within this HERO. If the given endpoint does not start with the endpoint base path,
                       the endpoint base path is prepended to generate a zenoh selector.

   .. py:method:: _subscribe_endpoint(endpoint: str, callback: callable, *args, **kwargs)

      Subscribe to an endpoint of this HERO. This is a wrapper for `_subscribe_selector` that ensures the subscription
      is addressed to an endpoint of the remote object.

      :param endpoint: endpoint within this HERO. If the given endpoint does not start with the endpoint base path,
                       the endpoint base path is prepended to generate a zenoh selector.
      :param callback: method to be called for messages that match the selector. The method needs to accept one
                       argument which is the deserialized payload of the message.


.. py:class:: RemoteHERO(name: str, realm: str = 'heros', *args, **kwargs)

   Bases: :py:obj:`HERO`

   Creates a local stub object from a remote HERO such that it seems like the remote object is a local object.
   The remote HERO is identified by its name and has to be available at the given realm.

   Attribute and method capabilities of the remote object are directly mapped to attributes and methods of the
   stub object, respectively. The signature of the methods is adapted accordingly. The remote attributes do not
   exist locally but are directly changed and read on the remote end.

   Event capabilities of the remote object are mapped to `RemoteEvent` objects that are members of this class.
   By connecting one or more callbacks to this event, the RemoteHERO can react to events triggered at the remote site.

   To be able to attach attributes to this class, every instance of a `RemoteHERO` is created from a dynamically
   generated child class of `RemoteHERO` with the name `RemoteHERO__`.

   .. note::
      To discover which objects are available in a certain realm, see :class:`HEROObserver`.

   :param name: name/identifier of the remote object
   :param realm: realm (think namespace) at which the object is registered. default is "heros"

   .. py:attribute:: _hero_tags

   .. py:attribute:: _hero_implements

   .. py:attribute:: _remote_capabilities

   .. py:attribute:: _liveliness_subscription

   .. py:method:: _liveliness_changed(sample)

   .. py:method:: _get_capabilities()

      Obtain capabilities from remote object.

      :returns: List of capabilities of the remote device
      :rtype: list[Capability]

   .. py:method:: _setattr_remote_capabilities()

      Attach functions to the instance that reflect the name and signature of the capabilities of the remote object.

   .. py:method:: __eq__(other)

   .. py:method:: _destroy_hero()

   .. py:method:: __hash__()


.. py:class:: LocalHERO(name: str, *args, realm: str = 'heros', implements: list[str] | None = None, tags: list[str] | None = None, **kwargs)

   Bases: :py:obj:`HERO`

   Base class for objects exposed through HEROS.
   Any object that should be accessible remotely must be derived from this class.

   :param name: name/identifier under which the object is available. Make sure this name is unique in the realm.
   :param realm: realm the HERO should exist in. default is "heros"
   :param implements: list of interfaces that are implemented by the hero
   :param tags: list of tags to identify and classify the hero

   .. py:attribute:: liveliness_token

   .. py:method:: _capabilities()

      Analyze the current object and automatically generate the capabilities of the HERO from it.
      For every method that doesn't start with ``_``, a method capability is announced. Every defined class attribute
      becomes an attribute capability. Every method that is defined in the class with the `@event` decorator becomes
      an event.

      While scanning for the capabilities, this method directly creates the necessary callbacks and defines the
      zenoh queryables for the capabilities.

   .. py:method:: _destroy_hero()

   .. py:method:: _connect_local_hero_callback(event: callable, remote_hero_method: callable, origin: str = None) -> str

      Connect a method of `RemoteHERO` as a callback to an event of the `LocalHERO`.
      This leads to a new, direct P2P connection between the `RemoteHERO` and the `LocalHERO` to call the method.

      :param event: the event `callable`, i.e. a method that is decorated with `@event` in the `LocalHERO`.
      :param remote_hero_method: `callable` to connect as a callback.
      :param origin: optional `str` indicating the semantic origin of the connection.

      :returns: name of the callback.
      :rtype: str

   .. py:method:: _disconnect_local_hero_callback(event: callable, remote_hero_method: callable) -> bool

      Disconnect a method of `RemoteHERO` from an event of the `LocalHERO`.

      :param event: the event `callable`, i.e. a method that is decorated with `@event` in the `LocalHERO`.
      :param remote_hero_method: `callable` to disconnect from the event.

      :returns: whether the remote method was indeed a callback.
      :rtype: bool

   .. py:method:: _get_local_hero_callbacks(event: callable) -> list

      Get a list of dictionary representations of the callbacks of an event of the `LocalHERO`.

      :param event: the event `callable`, i.e.
         a method that is decorated with `@event` in the `LocalHERO`.

      :returns: dictionary representations of the callbacks.
      :rtype: list


.. py:class:: HEROObserver(*args, **kwargs)

   Bases: :py:obj:`HEROPeer`

   A HEROObserver keeps track of the HEROs in a given realm by monitoring their zenoh liveliness tokens.
   The member attribute ``known_objects`` always holds a list of all HEROs known to the observer.

   :param realm: Name of the realm that this HEROPeer belongs to. (default: heros)
   :param session: optional zenoh session to use. If none is provided, a new zenoh session will be started

   .. py:attribute:: known_objects

   .. py:attribute:: _object_added_callbacks
      :value: []

   .. py:attribute:: _object_removed_callbacks
      :value: []

   .. py:method:: _handle_status_change(sample)

      Handle the status change of liveliness tokens.

   .. py:method:: register_object_added_callback(func: callable) -> None

      Register a callback that should be called when a new HERO joins the realm.

      :param func: function to call when a new HERO joins the realm

   .. py:method:: register_object_removed_callback(func: callable) -> None

      Register a callback that should be called when a HERO leaves the realm.

      :param func: function to call when a HERO leaves the realm

   .. py:method:: get_object(object_name: str) -> RemoteHERO

      Get the RemoteHERO object for the HERO with the given name.

      :param object_name: name of the HERO


.. py:class:: LocalDatasourceHERO(*args, observables: dict | None = None, **kwargs)

   Bases: :py:obj:`heros.LocalHERO`

   A datasource is a HERO that provides information through a standardized interface.
   This interface is the event `new_data`. Instances in the zenoh network that are interested in the data provided
   by datasources can simply subscribe to the key expression `@object/<realm>/*/new_data` or use
   the :class:`DatasourceObserver`.

   To make your class a LocalDatasourceHERO, make it inherit from this class.
   This class is meant for datasources that create asynchronous events on their own.
   When processing such an event, call `new_data` to publish the data from this datasource.

   :param name: name/identifier under which the object is available. Make sure this name is unique in the realm.
   :param realm: realm the HERO should exist in. default is "heros"

   .. py:attribute:: observable_processor

   .. py:method:: _process_data(data)

   .. py:method:: new_data(data)


.. py:class:: PolledLocalDatasourceHERO(*args, loop, interval: float = 5, **kwargs)

   Bases: :py:obj:`LocalDatasourceHERO`

   This local HERO periodically triggers the event "new_data" to poll and publish data.
   This class is meant for datasources that do not generate events on their own and thus should be polled
   periodically.

   To make your class a PolledLocalDatasourceHERO, make it inherit from this class and implement the method `_new_data`.
   The method is called periodically and its return value is published as an event.

   .. note::
      The periodic calling is realized via asyncio and will thus only work if the asyncio mainloop is started.

   :param name: name/identifier under which the object is available. Make sure this name is unique in the realm.
   :param realm: realm the HERO should exist in. default is "heros"
   :param interval: time interval in seconds between consecutive triggers of the `new_data` event

   .. py:attribute:: datasource_interval
      :value: 5

   .. py:attribute:: _loop

   .. py:method:: _trigger_datasource()
      :async:

   .. py:method:: new_data()

   .. py:method:: _new_data()
      :abstractmethod:


.. py:class:: DatasourceObserver(object_selector: str = '*', *args, **kwargs)

   Bases: :py:obj:`heros.HEROPeer`

   A class that can observe and handle the data emitted by one or more datasource HEROs.
   In particular, this class provides an efficient way to listen to the data emitted by all datasource HEROs in
   the realm. By not instantiating the HEROs themselves but just subscribing to the topics for the datasource, this
   reduces the pressure on the backing zenoh network.
   If, however, only the data of a few HEROs should be observed, it might make
   more sense to just instantiate the corresponding RemoteHEROs and connect a callback to their `new_data` event.

   :param object_selector: selector to specify which objects to observe. This becomes part of a zenoh selector and
                           thus can be anything that makes sense in the selector. Defaults to ``*`` to observe all
                           HEROs in the realm.

   .. py:attribute:: _object_selector
      :value: '*'

   .. py:attribute:: _new_data_callbacks
      :value: []

   .. py:attribute:: _subscription

   .. py:method:: _handle_new_data(key_expr: str, data)

   .. py:method:: register_new_data_callback(func: callable)

      Register a callback that should be called when an observed datasource HERO emits new data.

      :param func: function to call when new data arrives


.. py:function:: event(func: callable)

   Decorator for events.

   .. note::
      Only use on methods bound to objects.
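
To illustrate the argument mapping described for ``MethodCapability.call_dict``, here is a minimal sketch that uses only the ``inspect`` module of the standard library. It is not the implementation used by heros: ``call_dict_sketch`` and ``set_voltage`` are hypothetical names introduced for this example, and filling in default values for omitted parameters is an assumption.

```python
import inspect


def call_dict_sketch(signature: inspect.Signature, *args, **kwargs) -> dict:
    """Map positional and keyword arguments onto the parameters of a signature."""
    # bind() raises TypeError for missing, duplicate, or unexpected arguments.
    bound = signature.bind(*args, **kwargs)
    # Assumption: parameters that were not passed are filled with their defaults.
    bound.apply_defaults()
    return dict(bound.arguments)


# Hypothetical method standing in for a capability of a remote object.
def set_voltage(channel: int, value: float, unit: str = "V") -> None:
    pass


signature = inspect.signature(set_voltage)
assignments = call_dict_sketch(signature, 1, value=2.5)
print(assignments)  # {'channel': 1, 'value': 2.5, 'unit': 'V'}
```

Because every argument ends up keyed by its parameter name, a mapping of this kind can be serialized and sent without caring whether the caller used positional or keyword arguments.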