fix(robotwin): read nested get_obs() output and use aloha-agilex camera names

RoboTwin's base_task.get_obs() returns a nested dict:

    {"observation": {cam: {"rgb": ..., "intrinsic_matrix": ...}},
     "joint_action": {"left_arm": ..., "left_gripper": ...,
                      "right_arm": ..., "right_gripper": ...,
                      "vector": np.ndarray},
     "endpose": {...}}

Our _get_obs read raw["{cam}_rgb"] / raw["{cam}"] and raw["joint_action"] as
if the output were flat, so np.asarray(raw["joint_action"], dtype=np.float64)
tripped on a dict and raised

    TypeError: float() argument must be a string or a real number, not 'dict'
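
The same error reproduces in isolation, since np.asarray cannot coerce a dict
to float (minimal repro, not the actual call site):

    import numpy as np

    np.asarray({"left_arm": [0.0] * 6, "left_gripper": 0.045}, dtype=np.float64)
    # TypeError: float() argument must be a string or a real number, not 'dict'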

Fix (see the sketch after this list):
- Pull images from raw["observation"][cam]["rgb"]
- Pull joint state from raw["joint_action"]["vector"] (the flat array)
- Update the default camera tuple to (head_camera, left_camera, right_camera)
  to match RoboTwin's actual wrist-camera names (envs/camera/camera.py:135-151)
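
Before/after sketch of the two lookups (raw is the get_obs() return value,
cam one of the camera names above):

    # before: flat keys that don't exist in RoboTwin's output
    img = raw[f"{cam}_rgb"]
    qpos = np.asarray(raw["joint_action"], dtype=np.float64)   # dict -> TypeError

    # after: read the nested structure
    img = raw["observation"][cam]["rgb"]
    qpos = np.asarray(raw["joint_action"]["vector"], dtype=np.float64)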

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Commit:  f40b30202b (parent b06f134fe4)
Author:  Pepijn
Date:    2026-04-14 18:25:08 +02:00
Changes: +19 -23

@@ -32,9 +32,8 @@ from lerobot.types import RobotObservation
 # up keys in get_obs() output (e.g. "head_camera" → "head_camera_rgb").
 ROBOTWIN_CAMERA_NAMES: tuple[str, ...] = (
     "head_camera",
-    "front_camera",
-    "left_wrist",
-    "right_wrist",
+    "left_camera",
+    "right_camera",
 )
 ACTION_DIM = 14  # 7 DOF × 2 arms
@@ -278,38 +277,35 @@ class RoboTwinEnv(gym.Env):
         assert self._env is not None, "_get_obs called before _ensure_env()"
         raw = self._env.get_obs()
+        # RoboTwin's get_obs returns a nested dict:
+        #   {"observation": {cam_name: {"rgb": img, "intrinsic_matrix": ...}, ...},
+        #    "joint_action": {"left_arm": [...], "left_gripper": ..., "vector": np.ndarray},
+        #    "endpose": {...}}
+        cameras_raw = raw.get("observation", {}) if isinstance(raw, dict) else {}
         images: dict[str, np.ndarray] = {}
         for cam in self.camera_names:
-            # RoboTwin 2.0 camera keys follow the "<name>_rgb" convention.
-            # Fall back to the bare name if the suffixed key is absent.
-            key = f"{cam}_rgb" if f"{cam}_rgb" in raw else cam
-            if key in raw:
-                img = np.asarray(raw[key], dtype=np.uint8)
-                # Ensure HWC and exactly 3 channels
+            cam_data = cameras_raw.get(cam)
+            img = cam_data.get("rgb") if isinstance(cam_data, dict) else None
+            if img is not None:
+                img = np.asarray(img, dtype=np.uint8)
                 if img.ndim == 2:
                     img = np.stack([img, img, img], axis=-1)
                 elif img.shape[-1] != 3:
                     img = img[..., :3]
                 if img.shape[0] != self.observation_height or img.shape[1] != self.observation_width:
                     # Resize is best done outside hot-loop; return as-is and let
                     # the processor handle it if needed.
                     pass
                 images[cam] = img
             else:
                 # Camera not exposed by this task — return a black frame so the
                 # observation shape stays consistent across all tasks.
                 images[cam] = np.zeros((self.observation_height, self.observation_width, 3), dtype=np.uint8)
-        # Joint state: try common key names used by RoboTwin base_task.
-        joint_state: np.ndarray | None = None
-        for key in ("joint_action", "qpos", "robot_qpos", "joint_state", "obs"):
-            if key in raw:
-                arr = np.asarray(raw[key], dtype=np.float64).flatten()
-                if arr.size >= ACTION_DIM:
-                    joint_state = arr[:ACTION_DIM]
-                    break
-        if joint_state is None:
-            joint_state = np.zeros(ACTION_DIM, dtype=np.float64)
+        # Joint state: RoboTwin exposes it as raw["joint_action"]["vector"]
+        # (flat array of left_arm+left_gripper+right_arm+right_gripper).
+        joint_state = np.zeros(ACTION_DIM, dtype=np.float64)
+        ja = raw.get("joint_action") if isinstance(raw, dict) else None
+        if isinstance(ja, dict) and "vector" in ja:
+            arr = np.asarray(ja["vector"], dtype=np.float64).flatten()
+            if arr.size >= ACTION_DIM:
+                joint_state = arr[:ACTION_DIM]
         return {"pixels": images, "agent_pos": joint_state}