Files
lerobot/tests/test_control_robot.py
Steven Palma ca87ccd941 feat(rollout): decouple policy deployment from data recording with new lerobot-rollout CLI (#3413)
* feat(scripts): lerobot-rollout

* fix(rollout) require dataset in dagger + use duration too

* fix(docs): dagger num_episodes

* test(rollout): fix expectations

* fix(rollout): features check

* fix(rollout): device and task propagation + feature pos + warn fps + move rename_map config

* docs(rollout): edit rename_map instructions

* chore(rollout): multiple minor improvements

* chore(rollout): address coments + minor improvements

* fix(rollout): enable default

* fix(tests): default value RTCConfig

* fix(rollout): robot_observation_processor and notify_observation at policy frequency instead of interpolator rate

Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>

* fix(rollout): prevent relativeactions with sync inference engine

Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>

* fix(rollout): rtc reanchor to non normalized state

Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>

* fix(rollout): fixing the episode length to use hwc (#3469)

also reducing default length to 5 minutes

* feat(rollout): go back to initial position is now a config

* fix(rollout): properly propagating video_files_size_in_mb to lerobot_dataset (#3470)

* chore(rollout): note about dagger correction stage

* chore(docs): update comments and docstring

* fix(test): move rtc relative out of rollout module

* fix(rollout): address the review comments

---------

Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>
Co-authored-by: Maxime Ellerbach <maxime.ellerbach@huggingface.co>
2026-04-28 00:57:35 +02:00

130 lines
4.2 KiB
Python

#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest.mock import patch
import pytest
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
pytest.importorskip("deepdiff", reason="deepdiff is required (install lerobot[hardware])")
from lerobot.configs.dataset import DatasetRecordConfig
from lerobot.scripts.lerobot_calibrate import CalibrateConfig, calibrate
from lerobot.scripts.lerobot_record import RecordConfig, record
from lerobot.scripts.lerobot_replay import DatasetReplayConfig, ReplayConfig, replay
from lerobot.scripts.lerobot_teleoperate import TeleoperateConfig, teleoperate
from tests.fixtures.constants import DUMMY_REPO_ID
from tests.mocks.mock_robot import MockRobotConfig
from tests.mocks.mock_teleop import MockTeleopConfig
def test_calibrate():
robot_cfg = MockRobotConfig()
cfg = CalibrateConfig(robot=robot_cfg)
calibrate(cfg)
def test_teleoperate():
robot_cfg = MockRobotConfig()
teleop_cfg = MockTeleopConfig()
cfg = TeleoperateConfig(
robot=robot_cfg,
teleop=teleop_cfg,
teleop_time_s=0.1,
)
teleoperate(cfg)
def test_record_and_resume(tmp_path):
robot_cfg = MockRobotConfig()
teleop_cfg = MockTeleopConfig()
dataset_cfg = DatasetRecordConfig(
repo_id=DUMMY_REPO_ID,
single_task="Dummy task",
root=tmp_path / "record",
num_episodes=1,
episode_time_s=0.1,
reset_time_s=0,
push_to_hub=False,
)
cfg = RecordConfig(
robot=robot_cfg,
dataset=dataset_cfg,
teleop=teleop_cfg,
play_sounds=False,
)
dataset = record(cfg)
assert dataset.fps == 30
assert dataset.meta.total_episodes == dataset.num_episodes == 1
assert dataset.meta.total_frames == dataset.num_frames == 3
assert dataset.meta.total_tasks == 1
cfg.resume = True
# Mock the revision to prevent Hub calls during resume
with (
patch("lerobot.datasets.dataset_metadata.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.dataset_metadata.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(tmp_path / "record")
dataset = record(cfg)
assert dataset.meta.total_episodes == dataset.num_episodes == 2
assert dataset.meta.total_frames == dataset.num_frames == 6
assert dataset.meta.total_tasks == 1
def test_record_and_replay(tmp_path):
robot_cfg = MockRobotConfig()
teleop_cfg = MockTeleopConfig()
record_dataset_cfg = DatasetRecordConfig(
repo_id=DUMMY_REPO_ID,
single_task="Dummy task",
root=tmp_path / "record_and_replay",
num_episodes=1,
episode_time_s=0.1,
push_to_hub=False,
)
record_cfg = RecordConfig(
robot=robot_cfg,
dataset=record_dataset_cfg,
teleop=teleop_cfg,
play_sounds=False,
)
replay_dataset_cfg = DatasetReplayConfig(
repo_id=DUMMY_REPO_ID,
episode=0,
root=tmp_path / "record_and_replay",
)
replay_cfg = ReplayConfig(
robot=robot_cfg,
dataset=replay_dataset_cfg,
play_sounds=False,
)
record(record_cfg)
# Mock the revision to prevent Hub calls during replay
with (
patch("lerobot.datasets.dataset_metadata.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.dataset_metadata.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(tmp_path / "record_and_replay")
replay(replay_cfg)