diff --git a/src/lerobot/envs/robotwin.py b/src/lerobot/envs/robotwin.py index d3b301d69..48a74b2fe 100644 --- a/src/lerobot/envs/robotwin.py +++ b/src/lerobot/envs/robotwin.py @@ -40,6 +40,9 @@ ACTION_DIM = 14 # 7 DOF × 2 arms ACTION_LOW = -1.0 ACTION_HIGH = 1.0 DEFAULT_EPISODE_LENGTH = 300 +# D435 dims from task_config/_camera_config.yml (what demo_clean.yml selects). +DEFAULT_CAMERA_H = 240 +DEFAULT_CAMERA_W = 320 # Complete task list from RoboTwin 2.0 (60 tasks, as listed on the leaderboard). ROBOTWIN_TASKS: tuple[str, ...] = ( @@ -240,12 +243,11 @@ class RoboTwinEnv(gym.Env): self.episode_index = episode_index self._reset_stride = n_envs self.camera_names = list(camera_names) - # Match the declared observation_space to the camera resolution in - # RoboTwin's YAML — otherwise gym.vector's concatenate pre-allocates - # a buffer of the wrong shape and np.stack raises ValueError. - setup_kwargs = _load_robotwin_setup_kwargs(task_name) - self.observation_height = observation_height or setup_kwargs["head_camera_h"] - self.observation_width = observation_width or setup_kwargs["head_camera_w"] + # Default to D435 dims (the camera type baked into task_config/demo_clean.yml). + # The YAML-driven lookup is deferred to reset() so construction doesn't + # import RoboTwin's `envs` module — fast-tests run without RoboTwin installed. + self.observation_height = observation_height or DEFAULT_CAMERA_H + self.observation_width = observation_width or DEFAULT_CAMERA_W self.episode_length = episode_length self._max_episode_steps = episode_length # lerobot_eval.rollout reads this self.render_mode = render_mode diff --git a/tests/envs/test_robotwin.py b/tests/envs/test_robotwin.py index 24b6bb22d..d07244e74 100644 --- a/tests/envs/test_robotwin.py +++ b/tests/envs/test_robotwin.py @@ -15,12 +15,14 @@ # limitations under the License. """Unit tests for the RoboTwin 2.0 Gymnasium wrapper. -These tests mock out the SAPIEN-based RoboTwin runtime so they run without -the full RoboTwin installation (SAPIEN, CuRobo, mplib, etc.). +These tests mock out the SAPIEN-based RoboTwin runtime (task modules + +YAML config loader) so they run without the full RoboTwin installation +(SAPIEN, CuRobo, mplib, asset downloads, etc.). """ from __future__ import annotations +from contextlib import contextmanager from unittest.mock import MagicMock, patch import gymnasium as gym @@ -42,13 +44,21 @@ from lerobot.envs.robotwin import ( def _make_mock_task_env( - height: int = 480, - width: int = 640, + height: int = 240, + width: int = 320, cameras: tuple[str, ...] = ROBOTWIN_CAMERA_NAMES, ) -> MagicMock: - """Return a mock that mimics the RoboTwin task class API.""" - obs_dict = {f"{cam}_rgb": np.zeros((height, width, 3), dtype=np.uint8) for cam in cameras} - obs_dict["joint_action"] = np.zeros(ACTION_DIM, dtype=np.float64) + """Return a mock that mimics the RoboTwin task class API. + + RoboTwin's real get_obs returns + {"observation": {cam: {"rgb": img}}, "joint_action": {"vector": np.ndarray}, ...} + so the mock follows the same nested shape. + """ + obs_dict = { + "observation": {cam: {"rgb": np.zeros((height, width, 3), dtype=np.uint8)} for cam in cameras}, + "joint_action": {"vector": np.zeros(ACTION_DIM, dtype=np.float64)}, + "endpose": {}, + } mock = MagicMock() mock.get_obs.return_value = obs_dict @@ -60,10 +70,28 @@ def _make_mock_task_env( return mock -def _patch_load(mock_task_instance: MagicMock): - """Context manager that patches _load_robotwin_task to return a mock class.""" +@contextmanager +def _patch_runtime(mock_task_instance: MagicMock): + """Patch both the task-class loader and the YAML config loader so the + env can construct + reset without a real RoboTwin install.""" task_cls = MagicMock(return_value=mock_task_instance) - return patch("lerobot.envs.robotwin._load_robotwin_task", return_value=task_cls) + fake_setup = { + "head_camera_h": 240, + "head_camera_w": 320, + "left_embodiment_config": {}, + "right_embodiment_config": {}, + "left_robot_file": "", + "right_robot_file": "", + "dual_arm_embodied": True, + "render_freq": 0, + "task_name": "beat_block_hammer", + "task_config": "demo_clean", + } + with ( + patch("lerobot.envs.robotwin._load_robotwin_task", return_value=task_cls), + patch("lerobot.envs.robotwin._load_robotwin_setup_kwargs", return_value=fake_setup), + ): + yield # --------------------------------------------------------------------------- @@ -79,12 +107,12 @@ class TestRoboTwinEnv: task_name="beat_block_hammer", observation_height=h, observation_width=w, - camera_names=["head_camera", "front_camera"], + camera_names=["head_camera", "left_camera"], ) pixels_space = env.observation_space["pixels"] assert pixels_space["head_camera"].shape == (h, w, 3) - assert pixels_space["front_camera"].shape == (h, w, 3) - assert "left_wrist" not in pixels_space + assert pixels_space["left_camera"].shape == (h, w, 3) + assert "right_camera" not in pixels_space def test_action_space(self): env = RoboTwinEnv(task_name="beat_block_hammer") @@ -94,7 +122,7 @@ class TestRoboTwinEnv: def test_reset_returns_correct_obs_keys(self): mock_task = _make_mock_task_env() env = RoboTwinEnv(task_name="beat_block_hammer") - with _patch_load(mock_task): + with _patch_runtime(mock_task): obs, info = env.reset() assert "pixels" in obs @@ -107,15 +135,20 @@ class TestRoboTwinEnv: def test_reset_calls_setup_demo(self): mock_task = _make_mock_task_env() env = RoboTwinEnv(task_name="beat_block_hammer") - with _patch_load(mock_task): + with _patch_runtime(mock_task): env.reset(seed=42) - mock_task.setup_demo.assert_called_once_with(seed=42, is_test=True) + # setup_demo receives the full YAML-derived kwargs plus seed + is_test; + # we only assert the caller-provided bits. + assert mock_task.setup_demo.call_count == 1 + call_kwargs = mock_task.setup_demo.call_args.kwargs + assert call_kwargs["seed"] == 42 + assert call_kwargs["is_test"] is True def test_step_returns_correct_types(self): mock_task = _make_mock_task_env() env = RoboTwinEnv(task_name="beat_block_hammer") action = np.zeros(ACTION_DIM, dtype=np.float32) - with _patch_load(mock_task): + with _patch_runtime(mock_task): env.reset() obs, reward, terminated, truncated, info = env.step(action) @@ -129,17 +162,17 @@ class TestRoboTwinEnv: mock_task = _make_mock_task_env() env = RoboTwinEnv(task_name="beat_block_hammer") bad_action = np.zeros(7, dtype=np.float32) # wrong dim - with _patch_load(mock_task): + with _patch_runtime(mock_task): env.reset() with pytest.raises(ValueError, match="Expected 1-D action"): env.step(bad_action) def test_success_terminates_episode(self): mock_task = _make_mock_task_env() - mock_task.check_success.return_value = True # always succeeds + mock_task.check_success.return_value = True env = RoboTwinEnv(task_name="beat_block_hammer") action = np.zeros(ACTION_DIM, dtype=np.float32) - with _patch_load(mock_task): + with _patch_runtime(mock_task): env.reset() _, _, terminated, _, info = env.step(action) assert terminated is True @@ -149,7 +182,7 @@ class TestRoboTwinEnv: mock_task = _make_mock_task_env() env = RoboTwinEnv(task_name="beat_block_hammer", episode_length=2) action = np.zeros(ACTION_DIM, dtype=np.float32) - with _patch_load(mock_task): + with _patch_runtime(mock_task): env.reset() env.step(action) # step 1 _, _, _, truncated, _ = env.step(action) # step 2 → truncated @@ -158,25 +191,25 @@ class TestRoboTwinEnv: def test_close_calls_close_env(self): mock_task = _make_mock_task_env() env = RoboTwinEnv(task_name="beat_block_hammer") - with _patch_load(mock_task): + with _patch_runtime(mock_task): env.reset() env.close() - mock_task.close_env.assert_called_once_with(clearance=False) + mock_task.close_env.assert_called_once() def test_black_frame_for_missing_camera(self): """If a camera key is absent from get_obs(), a black frame is returned.""" - mock_task = _make_mock_task_env(cameras=("head_camera",)) # only head_camera + # Mock exposes only head_camera; we ask for both head_camera + left_camera. + mock_task = _make_mock_task_env(height=10, width=10, cameras=("head_camera",)) env = RoboTwinEnv( task_name="beat_block_hammer", - camera_names=["head_camera", "front_camera"], + camera_names=["head_camera", "left_camera"], observation_height=10, observation_width=10, ) - with _patch_load(mock_task): + with _patch_runtime(mock_task): obs, _ = env.reset() - # front_camera is missing from mock → should be a black 10×10×3 array - assert obs["pixels"]["front_camera"].shape == (10, 10, 3) - assert obs["pixels"]["front_camera"].sum() == 0 + assert obs["pixels"]["left_camera"].shape == (10, 10, 3) + assert obs["pixels"]["left_camera"].sum() == 0 def test_task_and_task_description_attributes(self): env = RoboTwinEnv(task_name="beat_block_hammer") @@ -196,12 +229,11 @@ class TestRoboTwinEnv: class TestCreateRoboTwinEnvs: def test_returns_correct_structure(self): mock_task = _make_mock_task_env() - env_cls = gym.vector.SyncVectorEnv - with _patch_load(mock_task): + with _patch_runtime(mock_task): envs = create_robotwin_envs( task="beat_block_hammer", n_envs=1, - env_cls=env_cls, + env_cls=gym.vector.SyncVectorEnv, ) assert "beat_block_hammer" in envs assert 0 in envs["beat_block_hammer"] @@ -209,12 +241,11 @@ class TestCreateRoboTwinEnvs: def test_multi_task(self): mock_task = _make_mock_task_env() - env_cls = gym.vector.SyncVectorEnv - with _patch_load(mock_task): + with _patch_runtime(mock_task): envs = create_robotwin_envs( task="beat_block_hammer,click_bell", n_envs=1, - env_cls=env_cls, + env_cls=gym.vector.SyncVectorEnv, ) assert set(envs.keys()) == {"beat_block_hammer", "click_bell"} @@ -265,12 +296,12 @@ class TestRoboTwinProcessorStep: step = RoboTwinProcessorStep() obs = { f"{OBS_IMAGES}.head_camera": torch.zeros(1, 3, 4, 4), - f"{OBS_IMAGES}.front_camera": torch.zeros(1, 3, 4, 4), + f"{OBS_IMAGES}.left_camera": torch.zeros(1, 3, 4, 4), OBS_STATE: torch.zeros(1, 14), } result = step.observation(obs) assert f"{OBS_IMAGES}.head_camera" in result - assert f"{OBS_IMAGES}.front_camera" in result + assert f"{OBS_IMAGES}.left_camera" in result assert result[OBS_STATE].dtype == torch.float32 def test_state_cast_to_float32(self):