Files
lerobot/tests/envs/test_robotwin.py
T
Pepijn ad9662b4a8 fix(robotwin): defer YAML lookup and realign tests with current API
__init__ was eagerly calling _load_robotwin_setup_kwargs just to read
head_camera_h/w from the YAML. That import (`from envs import CONFIGS_PATH`)
required a real RoboTwin install, so constructing the env — and thus every
test in tests/envs/test_robotwin.py — blew up with ModuleNotFoundError
on fast-tests where RoboTwin isn't installed.

Replace the eager lookup with DEFAULT_CAMERA_H/W constants (240×320, the
D435 dims baked into task_config/demo_clean.yml). reset() still resolves
the full setup_kwargs lazily — that's fine because reset() is only
called inside the benchmark Docker image where RoboTwin is present.

Also resync the test file with the current env API:
  - mock get_obs() as the real nested {"observation": {cam: {"rgb": …}},
    "joint_action": {"vector": …}} shape
  - patch both _load_robotwin_task and _load_robotwin_setup_kwargs
    (_patch_load → _patch_runtime)
  - drop `front_camera` / `left_wrist` from assertions — aloha-agilex
    exposes head_camera + left_camera + right_camera, not those
  - black-frame test now uses left_camera as the missing camera
  - setup_demo call check loosened to the caller-provided seed/is_test
    bits (full kwargs include the YAML-derived blob)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 11:14:02 +02:00

315 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit tests for the RoboTwin 2.0 Gymnasium wrapper.
These tests mock out the SAPIEN-based RoboTwin runtime (task modules +
YAML config loader) so they run without the full RoboTwin installation
(SAPIEN, CuRobo, mplib, asset downloads, etc.).
"""
from __future__ import annotations
from contextlib import contextmanager
from unittest.mock import MagicMock, patch
import gymnasium as gym
import numpy as np
import pytest
import torch
from lerobot.envs.robotwin import (
ACTION_DIM,
ROBOTWIN_CAMERA_NAMES,
ROBOTWIN_TASKS,
RoboTwinEnv,
create_robotwin_envs,
)
# ---------------------------------------------------------------------------
# Fixtures / helpers
# ---------------------------------------------------------------------------
def _make_mock_task_env(
    height: int = 240,
    width: int = 320,
    cameras: tuple[str, ...] = ROBOTWIN_CAMERA_NAMES,
) -> MagicMock:
    """Build a ``MagicMock`` that stands in for a RoboTwin task instance.

    The mocked ``get_obs`` mirrors the nested layout of the real RoboTwin call:
        {"observation": {cam: {"rgb": img}}, "joint_action": {"vector": np.ndarray}, ...}

    Args:
        height: Row count of every all-zero RGB frame.
        width: Column count of every all-zero RGB frame.
        cameras: Camera names to expose under ``"observation"``.

    Returns:
        A mock whose ``get_obs`` returns the canned observation, whose
        ``setup_demo`` / ``take_action`` / ``close_env`` return ``None``, and
        which reports no success (``check_success()`` and ``eval_success``
        are both ``False``).
    """
    frame_shape = (height, width, 3)
    camera_frames = {}
    for cam_name in cameras:
        # One independent uint8 zero image per camera.
        camera_frames[cam_name] = {"rgb": np.zeros(frame_shape, dtype=np.uint8)}

    task_env = MagicMock()
    task_env.get_obs.return_value = {
        "observation": camera_frames,
        "joint_action": {"vector": np.zeros(ACTION_DIM, dtype=np.float64)},
        "endpose": {},
    }
    for method_name in ("setup_demo", "take_action", "close_env"):
        getattr(task_env, method_name).return_value = None
    task_env.check_success.return_value = False
    task_env.eval_success = False
    return task_env
@contextmanager
def _patch_runtime(mock_task_instance: MagicMock):
    """Swap out the two RoboTwin loaders in ``lerobot.envs.robotwin``.

    While the context is active:
      * ``_load_robotwin_task`` returns a mock class whose instantiation
        yields ``mock_task_instance``;
      * ``_load_robotwin_setup_kwargs`` returns a small canned setup dict.

    This lets the env construct + reset without a real RoboTwin install.
    """
    canned_setup = dict(
        head_camera_h=240,
        head_camera_w=320,
        left_embodiment_config={},
        right_embodiment_config={},
        left_robot_file="",
        right_robot_file="",
        dual_arm_embodied=True,
        render_freq=0,
        task_name="beat_block_hammer",
        task_config="demo_clean",
    )
    # Calling the "class" hands back the caller-supplied task mock.
    task_factory = MagicMock(return_value=mock_task_instance)

    task_loader_patch = patch(
        "lerobot.envs.robotwin._load_robotwin_task",
        return_value=task_factory,
    )
    setup_loader_patch = patch(
        "lerobot.envs.robotwin._load_robotwin_setup_kwargs",
        return_value=canned_setup,
    )
    with task_loader_patch, setup_loader_patch:
        yield
# ---------------------------------------------------------------------------
# RoboTwinEnv unit tests
# ---------------------------------------------------------------------------
class TestRoboTwinEnv:
    """Behavioural checks for ``RoboTwinEnv`` with the RoboTwin runtime mocked."""

    def test_observation_space_shape(self):
        """observation_space should have the configured h×w×3 for every camera."""
        height, width = 240, 320
        env = RoboTwinEnv(
            task_name="beat_block_hammer",
            observation_height=height,
            observation_width=width,
            camera_names=["head_camera", "left_camera"],
        )

        pixels = env.observation_space["pixels"]
        expected_shape = (height, width, 3)
        for requested_cam in ("head_camera", "left_camera"):
            assert pixels[requested_cam].shape == expected_shape
        # A camera that was not requested must not show up in the space.
        assert "right_camera" not in pixels

    def test_action_space(self):
        """The action space is a float32 vector of length ACTION_DIM."""
        space = RoboTwinEnv(task_name="beat_block_hammer").action_space
        assert space.shape == (ACTION_DIM,)
        assert space.dtype == np.float32

    def test_reset_returns_correct_obs_keys(self):
        """reset() yields pixels for every default camera plus agent_pos."""
        env = RoboTwinEnv(task_name="beat_block_hammer")
        task_mock = _make_mock_task_env()
        with _patch_runtime(task_mock):
            obs, info = env.reset()

        assert "pixels" in obs
        for cam in ROBOTWIN_CAMERA_NAMES:
            assert cam in obs["pixels"], f"Missing camera '{cam}' in obs"
        assert "agent_pos" in obs
        assert obs["agent_pos"].shape == (ACTION_DIM,)
        assert info["is_success"] is False

    def test_reset_calls_setup_demo(self):
        """reset(seed=...) forwards the seed and is_test=True to setup_demo once."""
        env = RoboTwinEnv(task_name="beat_block_hammer")
        task_mock = _make_mock_task_env()
        with _patch_runtime(task_mock):
            env.reset(seed=42)

        # The call also carries the YAML-derived kwargs; only the
        # caller-provided seed / is_test values are checked here.
        setup_demo = task_mock.setup_demo
        assert setup_demo.call_count == 1
        passed_kwargs = setup_demo.call_args.kwargs
        assert passed_kwargs["seed"] == 42
        assert passed_kwargs["is_test"] is True

    def test_step_returns_correct_types(self):
        """step() returns the (obs, reward, terminated, truncated, info) 5-tuple."""
        env = RoboTwinEnv(task_name="beat_block_hammer")
        task_mock = _make_mock_task_env()
        zero_action = np.zeros(ACTION_DIM, dtype=np.float32)
        with _patch_runtime(task_mock):
            env.reset()
            obs, reward, terminated, truncated, info = env.step(zero_action)

        typed_fields = (
            (obs, dict),
            (reward, float),
            (terminated, bool),
            (truncated, bool),
            (info, dict),
        )
        for value, expected_type in typed_fields:
            assert isinstance(value, expected_type)

    def test_step_wrong_action_shape_raises(self):
        """An action of the wrong length is rejected with ValueError."""
        env = RoboTwinEnv(task_name="beat_block_hammer")
        task_mock = _make_mock_task_env()
        wrong_dim_action = np.zeros(7, dtype=np.float32)  # wrong dim
        with _patch_runtime(task_mock):
            env.reset()
            with pytest.raises(ValueError, match="Expected 1-D action"):
                env.step(wrong_dim_action)

    def test_success_terminates_episode(self):
        """When the task reports success, step() terminates and flags is_success."""
        task_mock = _make_mock_task_env()
        task_mock.check_success.return_value = True
        env = RoboTwinEnv(task_name="beat_block_hammer")
        zero_action = np.zeros(ACTION_DIM, dtype=np.float32)
        with _patch_runtime(task_mock):
            env.reset()
            _, _, terminated, _, info = env.step(zero_action)

        assert terminated is True
        assert info["is_success"] is True

    def test_truncation_after_episode_length(self):
        """With episode_length=2, the second step reports truncation."""
        env = RoboTwinEnv(task_name="beat_block_hammer", episode_length=2)
        task_mock = _make_mock_task_env()
        zero_action = np.zeros(ACTION_DIM, dtype=np.float32)
        with _patch_runtime(task_mock):
            env.reset()
            env.step(zero_action)  # step 1
            _, _, _, truncated, _ = env.step(zero_action)  # step 2 → truncated

        assert truncated is True

    def test_close_calls_close_env(self):
        """close() delegates to the task's close_env exactly once."""
        env = RoboTwinEnv(task_name="beat_block_hammer")
        task_mock = _make_mock_task_env()
        with _patch_runtime(task_mock):
            env.reset()
            env.close()

        task_mock.close_env.assert_called_once()

    def test_black_frame_for_missing_camera(self):
        """If a camera key is absent from get_obs(), a black frame is returned."""
        side = 10
        # The mock only provides head_camera, while the env is asked for
        # head_camera and left_camera.
        task_mock = _make_mock_task_env(height=side, width=side, cameras=("head_camera",))
        env = RoboTwinEnv(
            task_name="beat_block_hammer",
            camera_names=["head_camera", "left_camera"],
            observation_height=side,
            observation_width=side,
        )
        with _patch_runtime(task_mock):
            obs, _ = env.reset()

        missing_frame = obs["pixels"]["left_camera"]
        assert missing_frame.shape == (10, 10, 3)
        assert missing_frame.sum() == 0

    def test_task_and_task_description_attributes(self):
        """The env exposes the task name and a string task description."""
        env = RoboTwinEnv(task_name="beat_block_hammer")
        assert env.task == "beat_block_hammer"
        assert isinstance(env.task_description, str)

    def test_deferred_init_env_is_none_before_reset(self):
        """No underlying task env exists until reset() has been called."""
        env = RoboTwinEnv(task_name="beat_block_hammer")
        assert env._env is None  # noqa: SLF001 (testing internal state)
# ---------------------------------------------------------------------------
# create_robotwin_envs tests
# ---------------------------------------------------------------------------
class TestCreateRoboTwinEnvs:
    """Checks for the ``create_robotwin_envs`` factory."""

    def test_returns_correct_structure(self):
        """A single task yields ``{task: {0: SyncVectorEnv}}``."""
        task_mock = _make_mock_task_env()
        with _patch_runtime(task_mock):
            envs = create_robotwin_envs(
                task="beat_block_hammer",
                n_envs=1,
                env_cls=gym.vector.SyncVectorEnv,
            )

        assert "beat_block_hammer" in envs
        per_task = envs["beat_block_hammer"]
        assert 0 in per_task
        assert isinstance(per_task[0], gym.vector.SyncVectorEnv)

    def test_multi_task(self):
        """A comma-separated task string produces one entry per task."""
        task_mock = _make_mock_task_env()
        with _patch_runtime(task_mock):
            envs = create_robotwin_envs(
                task="beat_block_hammer,click_bell",
                n_envs=1,
                env_cls=gym.vector.SyncVectorEnv,
            )

        expected_tasks = {"beat_block_hammer", "click_bell"}
        assert set(envs) == expected_tasks

    def test_unknown_task_raises(self):
        """A task name that is not a known RoboTwin task is rejected."""
        bad_request = dict(
            task="not_a_real_task",
            n_envs=1,
            env_cls=gym.vector.SyncVectorEnv,
        )
        with pytest.raises(ValueError, match="Unknown RoboTwin tasks"):
            create_robotwin_envs(**bad_request)

    def test_invalid_n_envs_raises(self):
        """``n_envs=0`` is rejected."""
        bad_request = dict(
            task="beat_block_hammer",
            n_envs=0,
            env_cls=gym.vector.SyncVectorEnv,
        )
        with pytest.raises(ValueError, match="n_envs must be a positive int"):
            create_robotwin_envs(**bad_request)
# ---------------------------------------------------------------------------
# ROBOTWIN_TASKS list
# ---------------------------------------------------------------------------
def test_task_list_not_empty():
    """ROBOTWIN_TASKS must list at least 60 tasks."""
    task_count = len(ROBOTWIN_TASKS)
    assert task_count >= 60
def test_all_tasks_are_strings():
    """Every entry of ROBOTWIN_TASKS is a non-empty ``str``."""
    for task_name in ROBOTWIN_TASKS:
        assert isinstance(task_name, str) and task_name
def test_no_duplicate_tasks():
    """ROBOTWIN_TASKS contains no repeated task name."""
    unique_tasks = set(ROBOTWIN_TASKS)
    assert len(unique_tasks) == len(ROBOTWIN_TASKS)
# ---------------------------------------------------------------------------
# RoboTwinProcessorStep
# ---------------------------------------------------------------------------
class TestRoboTwinProcessorStep:
    """Checks for ``RoboTwinProcessorStep.observation``."""

    def test_passes_through_images_and_state(self):
        """Image keys survive processing and the state comes out as float32."""
        from lerobot.processor.env_processor import RoboTwinProcessorStep
        from lerobot.utils.constants import OBS_IMAGES, OBS_STATE

        head_key = f"{OBS_IMAGES}.head_camera"
        left_key = f"{OBS_IMAGES}.left_camera"
        batch = {
            head_key: torch.zeros(1, 3, 4, 4),
            left_key: torch.zeros(1, 3, 4, 4),
            OBS_STATE: torch.zeros(1, 14),
        }

        processed = RoboTwinProcessorStep().observation(batch)

        assert head_key in processed
        assert left_key in processed
        assert processed[OBS_STATE].dtype == torch.float32

    def test_state_cast_to_float32(self):
        """A float64 state tensor is returned as float32."""
        from lerobot.processor.env_processor import RoboTwinProcessorStep
        from lerobot.utils.constants import OBS_STATE

        float64_state = torch.zeros(1, 14, dtype=torch.float64)

        processed = RoboTwinProcessorStep().observation({OBS_STATE: float64_state})

        assert processed[OBS_STATE].dtype == torch.float32