mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-21 11:39:50 +00:00
Merge remote-tracking branch 'origin/main' into user/rcadene/2025_04_11_dataset_v3
This commit is contained in:
@@ -1,144 +0,0 @@
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Tests for physical robots and their mocked versions.
|
||||
If the physical robots are not connected to the computer, or not working,
|
||||
the test will be skipped.
|
||||
|
||||
Example of running a specific test:
|
||||
```bash
|
||||
pytest -sx tests/test_robots.py::test_robot
|
||||
```
|
||||
|
||||
Example of running test on real robots connected to the computer:
|
||||
```bash
|
||||
pytest -sx 'tests/test_robots.py::test_robot[koch-False]'
|
||||
pytest -sx 'tests/test_robots.py::test_robot[koch_bimanual-False]'
|
||||
pytest -sx 'tests/test_robots.py::test_robot[aloha-False]'
|
||||
```
|
||||
|
||||
Example of running test on a mocked version of robots:
|
||||
```bash
|
||||
pytest -sx 'tests/test_robots.py::test_robot[koch-True]'
|
||||
pytest -sx 'tests/test_robots.py::test_robot[koch_bimanual-True]'
|
||||
pytest -sx 'tests/test_robots.py::test_robot[aloha-True]'
|
||||
```
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import torch
|
||||
|
||||
from lerobot.common.robot_devices.robots.utils import make_robot
|
||||
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
|
||||
from tests.utils import TEST_ROBOT_TYPES, mock_calibration_dir, require_robot
|
||||
|
||||
|
||||
@pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES)
|
||||
@require_robot
|
||||
def test_robot(tmp_path, request, robot_type, mock):
|
||||
# TODO(rcadene): measure fps in nightly?
|
||||
# TODO(rcadene): test logs
|
||||
# TODO(rcadene): add compatibility with other robots
|
||||
robot_kwargs = {"robot_type": robot_type, "mock": mock}
|
||||
|
||||
if robot_type == "aloha" and mock:
|
||||
# To simplify unit test, we do not rerun manual calibration for Aloha mock=True.
|
||||
# Instead, we use the files from '.cache/calibration/aloha_default'
|
||||
pass
|
||||
else:
|
||||
if mock:
|
||||
request.getfixturevalue("patch_builtins_input")
|
||||
|
||||
# Create an empty calibration directory to trigger manual calibration
|
||||
calibration_dir = tmp_path / robot_type
|
||||
mock_calibration_dir(calibration_dir)
|
||||
robot_kwargs["calibration_dir"] = calibration_dir
|
||||
|
||||
# Test using robot before connecting raises an error
|
||||
robot = make_robot(**robot_kwargs)
|
||||
with pytest.raises(RobotDeviceNotConnectedError):
|
||||
robot.teleop_step()
|
||||
with pytest.raises(RobotDeviceNotConnectedError):
|
||||
robot.teleop_step(record_data=True)
|
||||
with pytest.raises(RobotDeviceNotConnectedError):
|
||||
robot.capture_observation()
|
||||
with pytest.raises(RobotDeviceNotConnectedError):
|
||||
robot.send_action(None)
|
||||
with pytest.raises(RobotDeviceNotConnectedError):
|
||||
robot.disconnect()
|
||||
|
||||
# Test deleting the object without connecting first
|
||||
del robot
|
||||
|
||||
# Test connecting (triggers manual calibration)
|
||||
robot = make_robot(**robot_kwargs)
|
||||
robot.connect()
|
||||
assert robot.is_connected
|
||||
|
||||
# Test connecting twice raises an error
|
||||
with pytest.raises(RobotDeviceAlreadyConnectedError):
|
||||
robot.connect()
|
||||
|
||||
# TODO(rcadene, aliberts): Test disconnecting with `__del__` instead of `disconnect`
|
||||
# del robot
|
||||
robot.disconnect()
|
||||
|
||||
# Test teleop can run
|
||||
robot = make_robot(**robot_kwargs)
|
||||
robot.connect()
|
||||
robot.teleop_step()
|
||||
|
||||
# Test data recorded during teleop are well formatted
|
||||
observation, action = robot.teleop_step(record_data=True)
|
||||
# State
|
||||
assert "observation.state" in observation
|
||||
assert isinstance(observation["observation.state"], torch.Tensor)
|
||||
assert observation["observation.state"].ndim == 1
|
||||
dim_state = sum(len(robot.follower_arms[name].motors) for name in robot.follower_arms)
|
||||
assert observation["observation.state"].shape[0] == dim_state
|
||||
# Cameras
|
||||
for name in robot.cameras:
|
||||
assert f"observation.images.{name}" in observation
|
||||
assert isinstance(observation[f"observation.images.{name}"], torch.Tensor)
|
||||
assert observation[f"observation.images.{name}"].ndim == 3
|
||||
# Action
|
||||
assert "action" in action
|
||||
assert isinstance(action["action"], torch.Tensor)
|
||||
assert action["action"].ndim == 1
|
||||
dim_action = sum(len(robot.follower_arms[name].motors) for name in robot.follower_arms)
|
||||
assert action["action"].shape[0] == dim_action
|
||||
# TODO(rcadene): test if observation and action data are returned as expected
|
||||
|
||||
# Test capture_observation can run and observation returned are the same (since the arm didnt move)
|
||||
captured_observation = robot.capture_observation()
|
||||
assert set(captured_observation.keys()) == set(observation.keys())
|
||||
for name in captured_observation:
|
||||
if "image" in name:
|
||||
# TODO(rcadene): skipping image for now as it's challenging to assess equality between two consecutive frames
|
||||
continue
|
||||
torch.testing.assert_close(captured_observation[name], observation[name], rtol=1e-4, atol=1)
|
||||
assert captured_observation[name].shape == observation[name].shape
|
||||
|
||||
# Test send_action can run
|
||||
robot.send_action(action["action"])
|
||||
|
||||
# Test disconnecting
|
||||
robot.disconnect()
|
||||
assert not robot.is_connected
|
||||
for name in robot.follower_arms:
|
||||
assert not robot.follower_arms[name].is_connected
|
||||
for name in robot.leader_arms:
|
||||
assert not robot.leader_arms[name].is_connected
|
||||
for name in robot.cameras:
|
||||
assert not robot.cameras[name].is_connected
|
||||
@@ -0,0 +1,95 @@
|
||||
from contextlib import contextmanager
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from lerobot.common.robots.so100_follower import (
|
||||
SO100Follower,
|
||||
SO100FollowerConfig,
|
||||
)
|
||||
|
||||
|
||||
def _make_bus_mock() -> MagicMock:
|
||||
"""Return a bus mock with just the attributes used by the robot."""
|
||||
bus = MagicMock(name="FeetechBusMock")
|
||||
bus.is_connected = False
|
||||
|
||||
def _connect():
|
||||
bus.is_connected = True
|
||||
|
||||
def _disconnect(_disable=True):
|
||||
bus.is_connected = False
|
||||
|
||||
bus.connect.side_effect = _connect
|
||||
bus.disconnect.side_effect = _disconnect
|
||||
|
||||
@contextmanager
|
||||
def _dummy_cm():
|
||||
yield
|
||||
|
||||
bus.torque_disabled.side_effect = _dummy_cm
|
||||
|
||||
return bus
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def follower():
|
||||
bus_mock = _make_bus_mock()
|
||||
|
||||
def _bus_side_effect(*_args, **kwargs):
|
||||
bus_mock.motors = kwargs["motors"]
|
||||
motors_order: list[str] = list(bus_mock.motors)
|
||||
|
||||
bus_mock.sync_read.return_value = {motor: idx for idx, motor in enumerate(motors_order, 1)}
|
||||
bus_mock.sync_write.return_value = None
|
||||
bus_mock.write.return_value = None
|
||||
bus_mock.disable_torque.return_value = None
|
||||
bus_mock.enable_torque.return_value = None
|
||||
bus_mock.is_calibrated = True
|
||||
return bus_mock
|
||||
|
||||
with (
|
||||
patch(
|
||||
"lerobot.common.robots.so100_follower.so100_follower.FeetechMotorsBus",
|
||||
side_effect=_bus_side_effect,
|
||||
),
|
||||
patch.object(SO100Follower, "configure", lambda self: None),
|
||||
):
|
||||
cfg = SO100FollowerConfig(port="/dev/null")
|
||||
robot = SO100Follower(cfg)
|
||||
yield robot
|
||||
if robot.is_connected:
|
||||
robot.disconnect()
|
||||
|
||||
|
||||
def test_connect_disconnect(follower):
|
||||
assert not follower.is_connected
|
||||
|
||||
follower.connect()
|
||||
assert follower.is_connected
|
||||
|
||||
follower.disconnect()
|
||||
assert not follower.is_connected
|
||||
|
||||
|
||||
def test_get_observation(follower):
|
||||
follower.connect()
|
||||
obs = follower.get_observation()
|
||||
|
||||
expected_keys = {f"{m}.pos" for m in follower.bus.motors}
|
||||
assert set(obs.keys()) == expected_keys
|
||||
|
||||
for idx, motor in enumerate(follower.bus.motors, 1):
|
||||
assert obs[f"{motor}.pos"] == idx
|
||||
|
||||
|
||||
def test_send_action(follower):
|
||||
follower.connect()
|
||||
|
||||
action = {f"{m}.pos": i * 10 for i, m in enumerate(follower.bus.motors, 1)}
|
||||
returned = follower.send_action(action)
|
||||
|
||||
assert returned == action
|
||||
|
||||
goal_pos = {m: (i + 1) * 10 for i, m in enumerate(follower.bus.motors)}
|
||||
follower.bus.sync_write.assert_called_once_with("Goal_Position", goal_pos)
|
||||
Reference in New Issue
Block a user