Compare commits

...

2 Commits

Author SHA1 Message Date
CarolinePascal b6c3bf3764 feat(kiwi): excluding LeKiwi from depth support (for now) 2026-06-24 21:29:31 +02:00
CarolinePascal 28d8fe8858 feat(depth): add depth support for follower robots 2026-06-24 21:29:02 +02:00
9 changed files with 139 additions and 46 deletions
+19 -7
View File
@@ -66,9 +66,14 @@ class HopeJrArm(Robot):
@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
features: dict[str, tuple] = {}
for cam in self.cameras:
cfg = self.config.cameras[cam]
if getattr(cfg, "use_rgb", True):
features[cam] = (cfg.height, cfg.width, 3)
if getattr(cfg, "use_depth", False):
features[f"{cam}_depth"] = (cfg.height, cfg.width, 1)
return features
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
@@ -139,10 +144,17 @@ class HopeJrArm(Robot):
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.read_latest()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
if getattr(cam, "use_rgb", True):
start = time.perf_counter()
obs_dict[cam_key] = cam.read_latest()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
if getattr(cam, "use_depth", False):
start = time.perf_counter()
obs_dict[f"{cam_key}_depth"] = cam.read_latest_depth()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key} depth: {dt_ms:.1f}ms")
return obs_dict
+19 -7
View File
@@ -102,9 +102,14 @@ class HopeJrHand(Robot):
@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
features: dict[str, tuple] = {}
for cam in self.cameras:
cfg = self.config.cameras[cam]
if getattr(cfg, "use_rgb", True):
features[cam] = (cfg.height, cfg.width, 3)
if getattr(cfg, "use_depth", False):
features[f"{cam}_depth"] = (cfg.height, cfg.width, 1)
return features
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
@@ -170,10 +175,17 @@ class HopeJrHand(Robot):
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.read_latest()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
if getattr(cam, "use_rgb", True):
start = time.perf_counter()
obs_dict[cam_key] = cam.read_latest()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
if getattr(cam, "use_depth", False):
start = time.perf_counter()
obs_dict[f"{cam_key}_depth"] = cam.read_latest_depth()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key} depth: {dt_ms:.1f}ms")
return obs_dict
@@ -68,9 +68,14 @@ class KochFollower(Robot):
@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
features: dict[str, tuple] = {}
for cam in self.cameras:
cfg = self.config.cameras[cam]
if getattr(cfg, "use_rgb", True):
features[cam] = (cfg.height, cfg.width, 3)
if getattr(cfg, "use_depth", False):
features[f"{cam}_depth"] = (cfg.height, cfg.width, 1)
return features
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
@@ -192,10 +197,17 @@ class KochFollower(Robot):
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.read_latest()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
if getattr(cam, "use_rgb", True):
start = time.perf_counter()
obs_dict[cam_key] = cam.read_latest()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
if getattr(cam, "use_depth", False):
start = time.perf_counter()
obs_dict[f"{cam_key}_depth"] = cam.read_latest_depth()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key} depth: {dt_ms:.1f}ms")
return obs_dict
+6
View File
@@ -72,6 +72,12 @@ class LeKiwi(Robot):
)
self.arm_motors = [motor for motor in self.bus.motors if motor.startswith("arm")]
self.base_motors = [motor for motor in self.bus.motors if motor.startswith("base")]
depth_cameras = [name for name, cfg in config.cameras.items() if getattr(cfg, "use_depth", False)]
if depth_cameras:
raise NotImplementedError(
f"Depth cameras are not supported on LeKiwi (got depth-enabled cameras: {depth_cameras}). "
"The host/client transport only carries color frames."
)
self.cameras = make_cameras_from_configs(config.cameras)
@property
@@ -44,6 +44,13 @@ class LeKiwiClient(Robot):
self.id = config.id
self.robot_type = config.type
depth_cameras = [name for name, cfg in config.cameras.items() if getattr(cfg, "use_depth", False)]
if depth_cameras:
raise NotImplementedError(
f"Depth cameras are not supported on LeKiwi (got depth-enabled cameras: {depth_cameras}). "
"The host/client transport only carries color frames."
)
self.remote_ip = config.remote_ip
self.port_zmq_cmd = config.port_zmq_cmd
self.port_zmq_observations = config.port_zmq_observations
@@ -68,9 +68,14 @@ class OmxFollower(Robot):
@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
features: dict[str, tuple] = {}
for cam in self.cameras:
cfg = self.config.cameras[cam]
if getattr(cfg, "use_rgb", True):
features[cam] = (cfg.height, cfg.width, 3)
if getattr(cfg, "use_depth", False):
features[f"{cam}_depth"] = (cfg.height, cfg.width, 1)
return features
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
@@ -175,10 +180,17 @@ class OmxFollower(Robot):
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.read_latest()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
if getattr(cam, "use_rgb", True):
start = time.perf_counter()
obs_dict[cam_key] = cam.read_latest()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
if getattr(cam, "use_depth", False):
start = time.perf_counter()
obs_dict[f"{cam_key}_depth"] = cam.read_latest_depth()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key} depth: {dt_ms:.1f}ms")
return obs_dict
@@ -101,9 +101,14 @@ class OpenArmFollower(Robot):
@property
def _cameras_ft(self) -> dict[str, tuple]:
"""Camera features for observation space."""
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
features: dict[str, tuple] = {}
for cam in self.cameras:
cfg = self.config.cameras[cam]
if getattr(cfg, "use_rgb", True):
features[cam] = (cfg.height, cfg.width, 3)
if getattr(cfg, "use_depth", False):
features[f"{cam}_depth"] = (cfg.height, cfg.width, 1)
return features
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
@@ -242,10 +247,17 @@ class OpenArmFollower(Robot):
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.read_latest()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
if getattr(cam, "use_rgb", True):
start = time.perf_counter()
obs_dict[cam_key] = cam.read_latest()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
if getattr(cam, "use_depth", False):
start = time.perf_counter()
obs_dict[f"{cam_key}_depth"] = cam.read_latest_depth()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key} depth: {dt_ms:.1f}ms")
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} get_observation took: {dt_ms:.1f}ms")
@@ -80,9 +80,14 @@ class RebotB601Follower(Robot):
@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
features: dict[str, tuple] = {}
for cam in self.cameras:
cfg = self.config.cameras[cam]
if getattr(cfg, "use_rgb", True):
features[cam] = (cfg.height, cfg.width, 3)
if getattr(cfg, "use_depth", False):
features[f"{cam}_depth"] = (cfg.height, cfg.width, 1)
return features
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
@@ -213,10 +218,17 @@ class RebotB601Follower(Robot):
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.read_latest()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
if getattr(cam, "use_rgb", True):
start = time.perf_counter()
obs_dict[cam_key] = cam.read_latest()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
if getattr(cam, "use_depth", False):
start = time.perf_counter()
obs_dict[f"{cam_key}_depth"] = cam.read_latest_depth()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key} depth: {dt_ms:.1f}ms")
return obs_dict
+12 -4
View File
@@ -222,9 +222,14 @@ class UnitreeG1(Robot):
@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
features: dict[str, tuple] = {}
for cam in self.cameras:
cfg = self.config.cameras[cam]
if getattr(cfg, "use_rgb", True):
features[cam] = (cfg.height, cfg.width, 3)
if getattr(cfg, "use_depth", False):
features[f"{cam}_depth"] = (cfg.height, cfg.width, 1)
return features
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
@@ -458,7 +463,10 @@ class UnitreeG1(Robot):
# Cameras - read images from ZMQ cameras
for cam_name, cam in self._cameras.items():
obs[cam_name] = cam.read_latest()
if getattr(cam, "use_rgb", True):
obs[cam_name] = cam.read_latest()
if getattr(cam, "use_depth", False):
obs[f"{cam_name}_depth"] = cam.read_latest_depth()
return obs