mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-18 02:00:03 +00:00
1157fb11e6
- envs(robotwin): default `observation_height/width` in `create_robotwin_envs` to `DEFAULT_CAMERA_H/W` (240/320) so they match the D435 dims baked into `task_config/demo_clean.yml`. - envs(robotwin): resolve `task_config/demo_clean.yml` via `CONFIGS_PATH` instead of a cwd-relative path; works regardless of where `lerobot-eval` is invoked. - envs(robotwin): replace `print()` calls in `create_robotwin_envs` with `logger.info(...)` (module-level `logger = logging.getLogger`). - envs(robotwin): use `_LazyAsyncVectorEnv` for the async path so async workers start lazily (matches LIBERO / RoboCasa / VLABench). - envs(robotwin): cast `agent_pos` space + joint-state output to float32 end-to-end (was mixed float64/float32). - envs(configs): use the existing `_make_vec_env_cls(use_async, n_envs)` helper in `RoboTwinEnvConfig.create_envs`; drop the `get_env_processors` override so RoboTwin uses the identity processor inherited from `EnvConfig`. - processor: delete `RoboTwinProcessorStep` — the float32 cast now happens in the wrapper itself, so the processor is redundant. - tests: drop the `TestRoboTwinProcessorStep` suite; update the mock obs fixture to use float32 `joint_action.vector`. - ci: hoist `ROBOTWIN_POLICY` and `ROBOTWIN_TASKS` to job-level env vars so the task list and policy aren't duplicated across eval / extract / parse steps. - docker: pin RoboTwin + CuRobo upstream clones to commit SHAs (`RoboTwin@0aeea2d6`, `curobo@ca941586`) for reproducibility. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
283 lines
10 KiB
Python
283 lines
10 KiB
Python
#!/usr/bin/env python
|
||
|
||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||
#
|
||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||
# you may not use this file except in compliance with the License.
|
||
# You may obtain a copy of the License at
|
||
#
|
||
# http://www.apache.org/licenses/LICENSE-2.0
|
||
#
|
||
# Unless required by applicable law or agreed to in writing, software
|
||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
# See the License for the specific language governing permissions and
|
||
# limitations under the License.
|
||
"""Unit tests for the RoboTwin 2.0 Gymnasium wrapper.
|
||
|
||
These tests mock out the SAPIEN-based RoboTwin runtime (task modules +
|
||
YAML config loader) so they run without the full RoboTwin installation
|
||
(SAPIEN, CuRobo, mplib, asset downloads, etc.).
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from contextlib import contextmanager
|
||
from unittest.mock import MagicMock, patch
|
||
|
||
import gymnasium as gym
|
||
import numpy as np
|
||
import pytest
|
||
|
||
from lerobot.envs.robotwin import (
|
||
ACTION_DIM,
|
||
ROBOTWIN_CAMERA_NAMES,
|
||
ROBOTWIN_TASKS,
|
||
RoboTwinEnv,
|
||
create_robotwin_envs,
|
||
)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Fixtures / helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _make_mock_task_env(
|
||
height: int = 240,
|
||
width: int = 320,
|
||
cameras: tuple[str, ...] = ROBOTWIN_CAMERA_NAMES,
|
||
) -> MagicMock:
|
||
"""Return a mock that mimics the RoboTwin task class API.
|
||
|
||
RoboTwin's real get_obs returns
|
||
{"observation": {cam: {"rgb": img}}, "joint_action": {"vector": np.ndarray}, ...}
|
||
so the mock follows the same nested shape.
|
||
"""
|
||
obs_dict = {
|
||
"observation": {cam: {"rgb": np.zeros((height, width, 3), dtype=np.uint8)} for cam in cameras},
|
||
"joint_action": {"vector": np.zeros(ACTION_DIM, dtype=np.float32)},
|
||
"endpose": {},
|
||
}
|
||
|
||
mock = MagicMock()
|
||
mock.get_obs.return_value = obs_dict
|
||
mock.setup_demo.return_value = None
|
||
mock.take_action.return_value = None
|
||
mock.eval_success = False
|
||
mock.check_success.return_value = False
|
||
mock.close_env.return_value = None
|
||
return mock
|
||
|
||
|
||
@contextmanager
|
||
def _patch_runtime(mock_task_instance: MagicMock):
|
||
"""Patch both the task-class loader and the YAML config loader so the
|
||
env can construct + reset without a real RoboTwin install."""
|
||
task_cls = MagicMock(return_value=mock_task_instance)
|
||
fake_setup = {
|
||
"head_camera_h": 240,
|
||
"head_camera_w": 320,
|
||
"left_embodiment_config": {},
|
||
"right_embodiment_config": {},
|
||
"left_robot_file": "",
|
||
"right_robot_file": "",
|
||
"dual_arm_embodied": True,
|
||
"render_freq": 0,
|
||
"task_name": "beat_block_hammer",
|
||
"task_config": "demo_clean",
|
||
}
|
||
with (
|
||
patch("lerobot.envs.robotwin._load_robotwin_task", return_value=task_cls),
|
||
patch("lerobot.envs.robotwin._load_robotwin_setup_kwargs", return_value=fake_setup),
|
||
):
|
||
yield
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# RoboTwinEnv unit tests
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class TestRoboTwinEnv:
|
||
def test_observation_space_shape(self):
|
||
"""observation_space should have the configured h×w×3 for every camera."""
|
||
h, w = 240, 320
|
||
env = RoboTwinEnv(
|
||
task_name="beat_block_hammer",
|
||
observation_height=h,
|
||
observation_width=w,
|
||
camera_names=["head_camera", "left_camera"],
|
||
)
|
||
pixels_space = env.observation_space["pixels"]
|
||
assert pixels_space["head_camera"].shape == (h, w, 3)
|
||
assert pixels_space["left_camera"].shape == (h, w, 3)
|
||
assert "right_camera" not in pixels_space
|
||
|
||
def test_action_space(self):
|
||
env = RoboTwinEnv(task_name="beat_block_hammer")
|
||
assert env.action_space.shape == (ACTION_DIM,)
|
||
assert env.action_space.dtype == np.float32
|
||
|
||
def test_reset_returns_correct_obs_keys(self):
|
||
mock_task = _make_mock_task_env()
|
||
env = RoboTwinEnv(task_name="beat_block_hammer")
|
||
with _patch_runtime(mock_task):
|
||
obs, info = env.reset()
|
||
|
||
assert "pixels" in obs
|
||
for cam in ROBOTWIN_CAMERA_NAMES:
|
||
assert cam in obs["pixels"], f"Missing camera '{cam}' in obs"
|
||
assert "agent_pos" in obs
|
||
assert obs["agent_pos"].shape == (ACTION_DIM,)
|
||
assert info["is_success"] is False
|
||
|
||
def test_reset_calls_setup_demo(self):
|
||
mock_task = _make_mock_task_env()
|
||
env = RoboTwinEnv(task_name="beat_block_hammer")
|
||
with _patch_runtime(mock_task):
|
||
env.reset(seed=42)
|
||
# setup_demo receives the full YAML-derived kwargs plus seed + is_test;
|
||
# we only assert the caller-provided bits.
|
||
assert mock_task.setup_demo.call_count == 1
|
||
call_kwargs = mock_task.setup_demo.call_args.kwargs
|
||
assert call_kwargs["seed"] == 42
|
||
assert call_kwargs["is_test"] is True
|
||
|
||
def test_step_returns_correct_types(self):
|
||
mock_task = _make_mock_task_env()
|
||
env = RoboTwinEnv(task_name="beat_block_hammer")
|
||
action = np.zeros(ACTION_DIM, dtype=np.float32)
|
||
with _patch_runtime(mock_task):
|
||
env.reset()
|
||
obs, reward, terminated, truncated, info = env.step(action)
|
||
|
||
assert isinstance(obs, dict)
|
||
assert isinstance(reward, float)
|
||
assert isinstance(terminated, bool)
|
||
assert isinstance(truncated, bool)
|
||
assert isinstance(info, dict)
|
||
|
||
def test_step_wrong_action_shape_raises(self):
|
||
mock_task = _make_mock_task_env()
|
||
env = RoboTwinEnv(task_name="beat_block_hammer")
|
||
bad_action = np.zeros(7, dtype=np.float32) # wrong dim
|
||
with _patch_runtime(mock_task):
|
||
env.reset()
|
||
with pytest.raises(ValueError, match="Expected 1-D action"):
|
||
env.step(bad_action)
|
||
|
||
def test_success_terminates_episode(self):
|
||
mock_task = _make_mock_task_env()
|
||
mock_task.check_success.return_value = True
|
||
env = RoboTwinEnv(task_name="beat_block_hammer")
|
||
action = np.zeros(ACTION_DIM, dtype=np.float32)
|
||
with _patch_runtime(mock_task):
|
||
env.reset()
|
||
_, _, terminated, _, info = env.step(action)
|
||
assert terminated is True
|
||
assert info["is_success"] is True
|
||
|
||
def test_truncation_after_episode_length(self):
|
||
mock_task = _make_mock_task_env()
|
||
env = RoboTwinEnv(task_name="beat_block_hammer", episode_length=2)
|
||
action = np.zeros(ACTION_DIM, dtype=np.float32)
|
||
with _patch_runtime(mock_task):
|
||
env.reset()
|
||
env.step(action) # step 1
|
||
_, _, _, truncated, _ = env.step(action) # step 2 → truncated
|
||
assert truncated is True
|
||
|
||
def test_close_calls_close_env(self):
|
||
mock_task = _make_mock_task_env()
|
||
env = RoboTwinEnv(task_name="beat_block_hammer")
|
||
with _patch_runtime(mock_task):
|
||
env.reset()
|
||
env.close()
|
||
mock_task.close_env.assert_called_once()
|
||
|
||
def test_black_frame_for_missing_camera(self):
|
||
"""If a camera key is absent from get_obs(), a black frame is returned."""
|
||
# Mock exposes only head_camera; we ask for both head_camera + left_camera.
|
||
mock_task = _make_mock_task_env(height=10, width=10, cameras=("head_camera",))
|
||
env = RoboTwinEnv(
|
||
task_name="beat_block_hammer",
|
||
camera_names=["head_camera", "left_camera"],
|
||
observation_height=10,
|
||
observation_width=10,
|
||
)
|
||
with _patch_runtime(mock_task):
|
||
obs, _ = env.reset()
|
||
assert obs["pixels"]["left_camera"].shape == (10, 10, 3)
|
||
assert obs["pixels"]["left_camera"].sum() == 0
|
||
|
||
def test_task_and_task_description_attributes(self):
|
||
env = RoboTwinEnv(task_name="beat_block_hammer")
|
||
assert env.task == "beat_block_hammer"
|
||
assert isinstance(env.task_description, str)
|
||
|
||
def test_deferred_init_env_is_none_before_reset(self):
|
||
env = RoboTwinEnv(task_name="beat_block_hammer")
|
||
assert env._env is None # noqa: SLF001 (testing internal state)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# create_robotwin_envs tests
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class TestCreateRoboTwinEnvs:
|
||
def test_returns_correct_structure(self):
|
||
mock_task = _make_mock_task_env()
|
||
with _patch_runtime(mock_task):
|
||
envs = create_robotwin_envs(
|
||
task="beat_block_hammer",
|
||
n_envs=1,
|
||
env_cls=gym.vector.SyncVectorEnv,
|
||
)
|
||
assert "beat_block_hammer" in envs
|
||
assert 0 in envs["beat_block_hammer"]
|
||
assert isinstance(envs["beat_block_hammer"][0], gym.vector.SyncVectorEnv)
|
||
|
||
def test_multi_task(self):
|
||
mock_task = _make_mock_task_env()
|
||
with _patch_runtime(mock_task):
|
||
envs = create_robotwin_envs(
|
||
task="beat_block_hammer,click_bell",
|
||
n_envs=1,
|
||
env_cls=gym.vector.SyncVectorEnv,
|
||
)
|
||
assert set(envs.keys()) == {"beat_block_hammer", "click_bell"}
|
||
|
||
def test_unknown_task_raises(self):
|
||
with pytest.raises(ValueError, match="Unknown RoboTwin tasks"):
|
||
create_robotwin_envs(
|
||
task="not_a_real_task",
|
||
n_envs=1,
|
||
env_cls=gym.vector.SyncVectorEnv,
|
||
)
|
||
|
||
def test_invalid_n_envs_raises(self):
|
||
with pytest.raises(ValueError, match="n_envs must be a positive int"):
|
||
create_robotwin_envs(
|
||
task="beat_block_hammer",
|
||
n_envs=0,
|
||
env_cls=gym.vector.SyncVectorEnv,
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# ROBOTWIN_TASKS list
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def test_task_list_not_empty():
|
||
assert len(ROBOTWIN_TASKS) >= 50
|
||
|
||
|
||
def test_all_tasks_are_strings():
|
||
assert all(isinstance(t, str) and t for t in ROBOTWIN_TASKS)
|
||
|
||
|
||
def test_no_duplicate_tasks():
|
||
assert len(ROBOTWIN_TASKS) == len(set(ROBOTWIN_TASKS))
|