from dataclasses import dataclass, field
import inspect
from heros import __proto_version__
from heros.helper import log
from .inspect import is_hero_event
import traceback
import typing
from typing import Callable, ClassVar, Literal
from types import UnionType
if typing.TYPE_CHECKING:
from heros.heros import LocalHERO
from zenoh import Query
[docs]
def type_to_str(annotation: type | str) -> str:
"""
Transforms annotation given as `types` to strings.
Args:
annotation: The typing annotation.
Returns:
Annotation as string.
"""
if annotation is not inspect.Parameter.empty:
if type(annotation) is str:
return annotation
elif isinstance(annotation, UnionType):
return repr(annotation)
else:
return annotation.__name__
else:
return "undefined"
[docs]
def build_capability(
hero_obj: "LocalHERO", member_name: str, annotation: None | str = None
) -> "Capability | Literal[False]":
"""Build a capability object from a given member name and optional annotation to infer type.
If no annotation is given the signature is inferred from the member itself by calling `getattr`. For callables (e.g.
methods) this needs to be done, as the full signature is not inferable from the type annotation.
Args:
hero_obj: host HERO object
member_name: name of the attribute/method to build as capability
annotation: optional annotation to infer signature.
Returns:
:py:class:`Capability` object for the given member, `False` if no signature can be inferred from the given
input.
"""
if annotation is None:
is_callable = callable(getattr(hero_obj, member_name))
elif typing.get_origin(annotation) is Callable:
is_callable = True
else:
is_callable = False
if is_callable:
# Callable typing does not contain information to build a signature from them, so we need to get the full attr.
if is_hero_event(getattr(hero_obj, member_name)):
log.debug(f"found event with name {member_name}!")
return EventCapability(name=member_name)
else:
try:
return MethodCapability.from_method(name=member_name, m=getattr(hero_obj, member_name))
except ValueError:
# this occurs if the callable cannot be inspected, we thus skip it
log.warn(f"Skipping {getattr(hero_obj, member_name)} since the signature cannot be inferred!")
return False
log.debug(f"register method queryable for {hero_obj._endpoint_base_path}/{member_name}")
else:
if annotation is None:
annotation = type(getattr(hero_obj, member_name))
return AttributeCapability(name=member_name, type=type_to_str(annotation))
[docs]
@dataclass
class Parameter:
name: str
type: str
default: str
kind: inspect._ParameterKind
[docs]
@staticmethod
def from_signature_parameter(p: inspect.Parameter):
param = Parameter(
name=p.name,
type=type_to_str(p.annotation),
default=p.default if p.default is not inspect.Parameter.empty else "undefined",
kind=p.kind,
)
return param
[docs]
def has_default(self):
return self.default != "undefined"
[docs]
def to_dict(self):
return {"name": self.name, "type": self.type, "default": self.default, "kind": self.kind}
[docs]
@staticmethod
def from_dict(d: dict, proto_version: float = __proto_version__):
if "name" not in d:
raise AttributeError("required field 'name' not in dict")
param = Parameter(
name=d["name"],
type=d["type"] if "type" in d else "undefined",
default=d["default"] if "default" in d else "undefined",
kind=d["kind"]
if proto_version > 0.1
else (inspect.Parameter.KEYWORD_ONLY if "default" in d else inspect.Parameter.VAR_POSITIONAL),
)
return param
[docs]
@dataclass
class Capability:
name: str
flavor: ClassVar[str] = "undefined"
[docs]
def to_dict(self) -> dict:
return {"name": self.name, "flavor": self.flavor}
[docs]
@staticmethod
def from_dict(d: dict, proto_version: float = __proto_version__):
if "name" not in d:
raise AttributeError("required field 'name' not in dict")
if "flavor" not in d:
raise AttributeError("required field 'flavor' not in dict")
if d["flavor"] == "attribute":
return AttributeCapability.from_dict(d, proto_version)
elif d["flavor"] == "method":
return MethodCapability.from_dict(d, proto_version)
elif d["flavor"] == "event":
return EventCapability.from_dict(d, proto_version)
else:
return None
[docs]
def get_call_wrapper(self, hero_obj: "LocalHERO") -> Callable[["Query"], None] | Literal[False]:
"""Construct a callback wrapper function for the Zenoh endpoint.
Returns:
A wrapper function which takes exactly one input, the :py:class:`zenoh.Query` object or `False` if
the capability does not need a wrapper function.
"""
return False
[docs]
@dataclass
class AttributeCapability(Capability):
"""
An attribute capability describes a single variable of the remote object.
It is exposed under the name of the capability.
Args:
name: name of the capability
type: data type. E.g. "str", "int", "float", "list", ...
access: Read and/or write access. "r" for read, "w" for write, and "rw" for both
"""
flavor: ClassVar[str] = "attribute"
type: str
access: str = "rw"
[docs]
def to_dict(self) -> dict:
d = Capability.to_dict(self)
d.update({"type": self.type, "access": self.access})
return d
[docs]
@staticmethod
def from_dict(d: dict, proto_version: float = __proto_version__) -> "AttributeCapability":
if "name" not in d:
raise AttributeError("required field 'type' not in dict")
return AttributeCapability(name=d["name"], type=d["type"], access=d["access"])
[docs]
def get_call_wrapper(self, hero_obj: "LocalHERO") -> Callable[["Query"], None]:
def wrapper(query: "Query") -> None:
if query.payload:
setattr(hero_obj, self.name, hero_obj._deserialize(query.payload.to_bytes()))
log.debug(f"I should update {self.name}")
else:
log.debug(f"I should return value of {self.name}")
# send back the result
try:
payload = hero_obj._serialize(getattr(hero_obj, self.name))
except AttributeError:
# attribute is only typed but not initialised.
log.warning("Call to non-initialised attribute %s", self.name)
payload = hero_obj._serialize(None)
query.reply(
f"{hero_obj._endpoint_base_path}/{self.name}",
payload,
encoding=hero_obj._default_encoding,
)
return wrapper
[docs]
@dataclass
class EventCapability(Capability):
"""
An event capability describes the ability of a remote object to notify upon a certain event.
"""
flavor: ClassVar[str] = "event"
[docs]
@staticmethod
def from_dict(d: dict, proto_version: float = __proto_version__) -> "EventCapability":
return EventCapability(name=d["name"])
[docs]
@dataclass
class MethodCapability(Capability):
flavor: ClassVar[str] = "method"
parameters: list[Parameter] = field(default_factory=list)
return_type: str = "None"
[docs]
@staticmethod
def from_method(m: Callable, name: str | None = None) -> "MethodCapability":
if name is None:
name = m.__name__
sig = inspect.signature(m)
cap = MethodCapability(name=name)
cap.parameters = [Parameter.from_signature_parameter(sig.parameters[pname]) for pname in sig.parameters]
if sig.return_annotation not in (inspect.Signature.empty, None):
cap.return_type = type_to_str(sig.return_annotation)
return cap
[docs]
def to_signature(self) -> inspect.Signature:
parameters = [
inspect.Parameter(
p.name,
kind=p.kind,
default=p.default if p.has_default() else inspect.Parameter.empty,
annotation=p.type if p.type != "undefined" else inspect.Parameter.empty,
)
for p in self.parameters
]
return inspect.Signature(parameters=parameters, return_annotation=self.return_type)
[docs]
def to_dict(self) -> dict:
d = Capability.to_dict(self)
d.update({"parameters": [p.to_dict() for p in self.parameters], "return_type": self.return_type})
return d
[docs]
def get_call_wrapper(self, hero_obj: "LocalHERO") -> Callable[["Query"], None]:
def wrapper(query: "Query") -> None:
params = hero_obj._deserialize(query.payload.to_bytes())
log.debug(f"I should call {self.name} with parameters {params}")
try:
# the actual method call
if isinstance(params, dict):
return_value = getattr(hero_obj, self.name)(**params)
log.warning(
f"HERO {hero_obj._name} received a payload using protocol version 0.1 which is deprecated and will be removed in a future version"
)
else:
return_value = getattr(hero_obj, self.name)(*params[0], **params[1])
# send back the result
query.reply(
f"{hero_obj._endpoint_base_path}/{self.name}",
hero_obj._serialize(return_value),
encoding=hero_obj._default_encoding,
)
except Exception as e:
query.reply_err(
hero_obj._serialize(str(e) + "\n\n" + "".join(traceback.format_tb(e.__traceback__))),
encoding=hero_obj._default_encoding,
)
return wrapper
[docs]
@staticmethod
def from_dict(d: dict, proto_version: float = __proto_version__) -> "MethodCapability":
"""
Generate a method capabilities object from a defining dictionary.
Args: definition of the capability according to the standard
"""
if "parameters" not in d:
raise AttributeError("required field 'parameters' not in dict")
cap = MethodCapability(name=d["name"])
cap.parameters = [Parameter.from_dict(par, proto_version) for par in d["parameters"]]
if "return_type" in d:
cap.return_type = d["return_type"]
return cap
[docs]
def call_dict(self, *args, **kwargs) -> dict:
"""
This returns a dict that assigns the given parameter to the parameters of
ourself. It takes care that positional and keyword arguments are handled correctly
Note:
This function is deprecated and will be removed together with the transport protocol version 0.1
Args:
*args: positional arguments
**kwargs: keyword arguments
Returns:
dict: dict with parameter assignments
"""
# TODO: type checking?
# positional arguments
d = {self.parameters[i].name: arg for i, arg in enumerate(args)}
# keyword arguments
parameter_names = [p.name for p in self.parameters]
d.update({arg_name: value for arg_name, value in kwargs.items() if arg_name in parameter_names})
return d