mirror of
https://github.com/huggingface/lerobot.git
synced 2026-06-24 19:57:27 +00:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b6c3bf3764 | |||
| 28d8fe8858 |
@@ -66,9 +66,14 @@ class HopeJrArm(Robot):
|
||||
|
||||
@property
|
||||
def _cameras_ft(self) -> dict[str, tuple]:
|
||||
return {
|
||||
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
|
||||
}
|
||||
features: dict[str, tuple] = {}
|
||||
for cam in self.cameras:
|
||||
cfg = self.config.cameras[cam]
|
||||
if getattr(cfg, "use_rgb", True):
|
||||
features[cam] = (cfg.height, cfg.width, 3)
|
||||
if getattr(cfg, "use_depth", False):
|
||||
features[f"{cam}_depth"] = (cfg.height, cfg.width, 1)
|
||||
return features
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
@@ -139,10 +144,17 @@ class HopeJrArm(Robot):
|
||||
|
||||
# Capture images from cameras
|
||||
for cam_key, cam in self.cameras.items():
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.read_latest()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
if getattr(cam, "use_rgb", True):
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.read_latest()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
|
||||
if getattr(cam, "use_depth", False):
|
||||
start = time.perf_counter()
|
||||
obs_dict[f"{cam_key}_depth"] = cam.read_latest_depth()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key} depth: {dt_ms:.1f}ms")
|
||||
|
||||
return obs_dict
|
||||
|
||||
|
||||
@@ -102,9 +102,14 @@ class HopeJrHand(Robot):
|
||||
|
||||
@property
|
||||
def _cameras_ft(self) -> dict[str, tuple]:
|
||||
return {
|
||||
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
|
||||
}
|
||||
features: dict[str, tuple] = {}
|
||||
for cam in self.cameras:
|
||||
cfg = self.config.cameras[cam]
|
||||
if getattr(cfg, "use_rgb", True):
|
||||
features[cam] = (cfg.height, cfg.width, 3)
|
||||
if getattr(cfg, "use_depth", False):
|
||||
features[f"{cam}_depth"] = (cfg.height, cfg.width, 1)
|
||||
return features
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
@@ -170,10 +175,17 @@ class HopeJrHand(Robot):
|
||||
|
||||
# Capture images from cameras
|
||||
for cam_key, cam in self.cameras.items():
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.read_latest()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
if getattr(cam, "use_rgb", True):
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.read_latest()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
|
||||
if getattr(cam, "use_depth", False):
|
||||
start = time.perf_counter()
|
||||
obs_dict[f"{cam_key}_depth"] = cam.read_latest_depth()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key} depth: {dt_ms:.1f}ms")
|
||||
|
||||
return obs_dict
|
||||
|
||||
|
||||
@@ -68,9 +68,14 @@ class KochFollower(Robot):
|
||||
|
||||
@property
|
||||
def _cameras_ft(self) -> dict[str, tuple]:
|
||||
return {
|
||||
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
|
||||
}
|
||||
features: dict[str, tuple] = {}
|
||||
for cam in self.cameras:
|
||||
cfg = self.config.cameras[cam]
|
||||
if getattr(cfg, "use_rgb", True):
|
||||
features[cam] = (cfg.height, cfg.width, 3)
|
||||
if getattr(cfg, "use_depth", False):
|
||||
features[f"{cam}_depth"] = (cfg.height, cfg.width, 1)
|
||||
return features
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
@@ -192,10 +197,17 @@ class KochFollower(Robot):
|
||||
|
||||
# Capture images from cameras
|
||||
for cam_key, cam in self.cameras.items():
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.read_latest()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
if getattr(cam, "use_rgb", True):
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.read_latest()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
|
||||
if getattr(cam, "use_depth", False):
|
||||
start = time.perf_counter()
|
||||
obs_dict[f"{cam_key}_depth"] = cam.read_latest_depth()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key} depth: {dt_ms:.1f}ms")
|
||||
|
||||
return obs_dict
|
||||
|
||||
|
||||
@@ -72,6 +72,12 @@ class LeKiwi(Robot):
|
||||
)
|
||||
self.arm_motors = [motor for motor in self.bus.motors if motor.startswith("arm")]
|
||||
self.base_motors = [motor for motor in self.bus.motors if motor.startswith("base")]
|
||||
depth_cameras = [name for name, cfg in config.cameras.items() if getattr(cfg, "use_depth", False)]
|
||||
if depth_cameras:
|
||||
raise NotImplementedError(
|
||||
f"Depth cameras are not supported on LeKiwi (got depth-enabled cameras: {depth_cameras}). "
|
||||
"The host/client transport only carries color frames."
|
||||
)
|
||||
self.cameras = make_cameras_from_configs(config.cameras)
|
||||
|
||||
@property
|
||||
|
||||
@@ -44,6 +44,13 @@ class LeKiwiClient(Robot):
|
||||
self.id = config.id
|
||||
self.robot_type = config.type
|
||||
|
||||
depth_cameras = [name for name, cfg in config.cameras.items() if getattr(cfg, "use_depth", False)]
|
||||
if depth_cameras:
|
||||
raise NotImplementedError(
|
||||
f"Depth cameras are not supported on LeKiwi (got depth-enabled cameras: {depth_cameras}). "
|
||||
"The host/client transport only carries color frames."
|
||||
)
|
||||
|
||||
self.remote_ip = config.remote_ip
|
||||
self.port_zmq_cmd = config.port_zmq_cmd
|
||||
self.port_zmq_observations = config.port_zmq_observations
|
||||
|
||||
@@ -68,9 +68,14 @@ class OmxFollower(Robot):
|
||||
|
||||
@property
|
||||
def _cameras_ft(self) -> dict[str, tuple]:
|
||||
return {
|
||||
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
|
||||
}
|
||||
features: dict[str, tuple] = {}
|
||||
for cam in self.cameras:
|
||||
cfg = self.config.cameras[cam]
|
||||
if getattr(cfg, "use_rgb", True):
|
||||
features[cam] = (cfg.height, cfg.width, 3)
|
||||
if getattr(cfg, "use_depth", False):
|
||||
features[f"{cam}_depth"] = (cfg.height, cfg.width, 1)
|
||||
return features
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
@@ -175,10 +180,17 @@ class OmxFollower(Robot):
|
||||
|
||||
# Capture images from cameras
|
||||
for cam_key, cam in self.cameras.items():
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.read_latest()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
if getattr(cam, "use_rgb", True):
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.read_latest()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
|
||||
if getattr(cam, "use_depth", False):
|
||||
start = time.perf_counter()
|
||||
obs_dict[f"{cam_key}_depth"] = cam.read_latest_depth()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key} depth: {dt_ms:.1f}ms")
|
||||
|
||||
return obs_dict
|
||||
|
||||
|
||||
@@ -101,9 +101,14 @@ class OpenArmFollower(Robot):
|
||||
@property
|
||||
def _cameras_ft(self) -> dict[str, tuple]:
|
||||
"""Camera features for observation space."""
|
||||
return {
|
||||
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
|
||||
}
|
||||
features: dict[str, tuple] = {}
|
||||
for cam in self.cameras:
|
||||
cfg = self.config.cameras[cam]
|
||||
if getattr(cfg, "use_rgb", True):
|
||||
features[cam] = (cfg.height, cfg.width, 3)
|
||||
if getattr(cfg, "use_depth", False):
|
||||
features[f"{cam}_depth"] = (cfg.height, cfg.width, 1)
|
||||
return features
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
@@ -242,10 +247,17 @@ class OpenArmFollower(Robot):
|
||||
|
||||
# Capture images from cameras
|
||||
for cam_key, cam in self.cameras.items():
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.read_latest()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
if getattr(cam, "use_rgb", True):
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.read_latest()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
|
||||
if getattr(cam, "use_depth", False):
|
||||
start = time.perf_counter()
|
||||
obs_dict[f"{cam_key}_depth"] = cam.read_latest_depth()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key} depth: {dt_ms:.1f}ms")
|
||||
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} get_observation took: {dt_ms:.1f}ms")
|
||||
|
||||
@@ -80,9 +80,14 @@ class RebotB601Follower(Robot):
|
||||
|
||||
@property
|
||||
def _cameras_ft(self) -> dict[str, tuple]:
|
||||
return {
|
||||
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
|
||||
}
|
||||
features: dict[str, tuple] = {}
|
||||
for cam in self.cameras:
|
||||
cfg = self.config.cameras[cam]
|
||||
if getattr(cfg, "use_rgb", True):
|
||||
features[cam] = (cfg.height, cfg.width, 3)
|
||||
if getattr(cfg, "use_depth", False):
|
||||
features[f"{cam}_depth"] = (cfg.height, cfg.width, 1)
|
||||
return features
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
@@ -213,10 +218,17 @@ class RebotB601Follower(Robot):
|
||||
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
|
||||
|
||||
for cam_key, cam in self.cameras.items():
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.read_latest()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
if getattr(cam, "use_rgb", True):
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.read_latest()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
|
||||
if getattr(cam, "use_depth", False):
|
||||
start = time.perf_counter()
|
||||
obs_dict[f"{cam_key}_depth"] = cam.read_latest_depth()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key} depth: {dt_ms:.1f}ms")
|
||||
|
||||
return obs_dict
|
||||
|
||||
|
||||
@@ -222,9 +222,14 @@ class UnitreeG1(Robot):
|
||||
|
||||
@property
|
||||
def _cameras_ft(self) -> dict[str, tuple]:
|
||||
return {
|
||||
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
|
||||
}
|
||||
features: dict[str, tuple] = {}
|
||||
for cam in self.cameras:
|
||||
cfg = self.config.cameras[cam]
|
||||
if getattr(cfg, "use_rgb", True):
|
||||
features[cam] = (cfg.height, cfg.width, 3)
|
||||
if getattr(cfg, "use_depth", False):
|
||||
features[f"{cam}_depth"] = (cfg.height, cfg.width, 1)
|
||||
return features
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
@@ -458,7 +463,10 @@ class UnitreeG1(Robot):
|
||||
|
||||
# Cameras - read images from ZMQ cameras
|
||||
for cam_name, cam in self._cameras.items():
|
||||
obs[cam_name] = cam.read_latest()
|
||||
if getattr(cam, "use_rgb", True):
|
||||
obs[cam_name] = cam.read_latest()
|
||||
if getattr(cam, "use_depth", False):
|
||||
obs[f"{cam_name}_depth"] = cam.read_latest_depth()
|
||||
|
||||
return obs
|
||||
|
||||
|
||||
Reference in New Issue
Block a user