mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-14 16:19:45 +00:00
818892a38b
* feat: HIL data collection, RTC interpolator, and action queue improvements - Add Human-in-the-Loop (HIL) data collection examples (sync + RTC) - Add HIL data collection documentation - Add ActionInterpolator for smoother policy control at higher rates - Integrate interpolator into lerobot-record and eval_with_real_robot - Add action queue clear() and get_processed_left_over() methods - Add rtc/__init__.py for cleaner imports * docs: expand Related Work section with paper summaries * fix: only record dataset frames at original fps, not at interpolated rate The interpolator speeds up robot control (e.g. 2x) but dataset frames should still be recorded at the original fps. Interpolated-only iterations now only send actions to the robot without writing to the dataset. * refactor: merge HIL sync and RTC scripts into single file with --rtc.enabled toggle Combines hil_data_collection.py and hil_data_collection_rtc.py into one script. RTC is toggled via --rtc.enabled=true (defaults to off for sync inference). Deletes the separate hil_data_collection_rtc.py and updates docs to reflect the single-script usage. * test: add ActionInterpolator test suite (29 tests) Covers constructor validation, passthrough (multiplier=1), 2x and 3x interpolation with exact value checks, reset/episode boundaries, control interval calculation, multi-dim actions, and simulated control loop integration. * test: add ActionQueue + ActionInterpolator integration tests Verifies the interpolator doesn't interfere with RTC's leftover chunk tracking: queue consumption rate matches base fps regardless of multiplier, get_left_over/get_processed_left_over only change on queue.get(), merge preserves smooth interpolation across chunks, and interpolator reset is independent of queue state. * feat: register SO follower/leader configs in HIL script Adds SOFollowerRobotConfig and SOLeaderTeleopConfig imports so SO100/SO101 robots can be used via --robot.type=so_follower and --teleop.type=so_leader. Updates docs accordingly. Made-with: Cursor * docs: remove em dashes from HIL documentation Made-with: Cursor * refactor: rename examples/rac to examples/hil Updates directory name and all references in docs and script docstrings. Made-with: Cursor * fix: encorperate pr feedback comments * refactor(tests): enhance ActionInterpolator test structure and add detailed docstrings * feedback pr and test fix * fix(test): pass correct real_delay in interpolator delay test The test was passing real_delay=0 and relying on _check_delays to silently override it with the index-based diff. Now passes real_delay=3 to match the 3 actions consumed during the simulated inference period. * fix pr feedback * ordering * update hil script * fix * default name * fix(bi_openarm): use kw_only=True to fix dataclass field ordering BiOpenArmFollowerConfig overrides `id` with a default, making it positional in the child — non-default `left_arm_config` then follows a default field, which Python dataclasses forbid. Adding kw_only=True (matching the parent RobotConfig) removes positional constraints. Made-with: Cursor * style: format long line in hil_data_collection.py Made-with: Cursor * pr feedback --------- Co-authored-by: Khalil Meftah <khalil.meftah@huggingface.co>
229 lines
8.2 KiB
Python
229 lines
8.2 KiB
Python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Shared utilities for Human-in-the-Loop data collection scripts."""
|
|
|
|
import logging
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
|
|
from lerobot.processor import (
|
|
IdentityProcessorStep,
|
|
RobotAction,
|
|
RobotObservation,
|
|
RobotProcessorPipeline,
|
|
)
|
|
from lerobot.processor.converters import (
|
|
observation_to_transition,
|
|
robot_action_observation_to_transition,
|
|
transition_to_observation,
|
|
transition_to_robot_action,
|
|
)
|
|
from lerobot.robots import Robot
|
|
from lerobot.teleoperators import Teleoperator
|
|
from lerobot.utils.control_utils import is_headless
|
|
from lerobot.utils.robot_utils import precise_sleep
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class HILDatasetConfig:
|
|
repo_id: str
|
|
single_task: str
|
|
root: str | Path | None = None
|
|
fps: int = 30
|
|
episode_time_s: float = 120
|
|
num_episodes: int = 50
|
|
video: bool = True
|
|
push_to_hub: bool = True
|
|
private: bool = False
|
|
tags: list[str] | None = None
|
|
num_image_writer_processes: int = 0
|
|
num_image_writer_threads_per_camera: int = 4
|
|
video_encoding_batch_size: int = 1
|
|
vcodec: str = "auto"
|
|
streaming_encoding: bool = True
|
|
encoder_queue_maxsize: int = 30
|
|
encoder_threads: int | None = None
|
|
rename_map: dict[str, str] = field(default_factory=dict)
|
|
|
|
|
|
def teleop_has_motor_control(teleop: Teleoperator) -> bool:
|
|
"""Check if teleoperator has motor control capabilities."""
|
|
return all(hasattr(teleop, attr) for attr in ("enable_torque", "disable_torque", "write_goal_positions"))
|
|
|
|
|
|
def teleop_disable_torque(teleop: Teleoperator) -> None:
|
|
"""Disable teleop torque if supported."""
|
|
if hasattr(teleop, "disable_torque"):
|
|
teleop.disable_torque()
|
|
|
|
|
|
def teleop_enable_torque(teleop: Teleoperator) -> None:
|
|
"""Enable teleop torque if supported."""
|
|
if hasattr(teleop, "enable_torque"):
|
|
teleop.enable_torque()
|
|
|
|
|
|
def teleop_smooth_move_to(teleop: Teleoperator, target_pos: dict, duration_s: float = 2.0, fps: int = 50):
|
|
"""Smoothly move teleop to target position if motor control is available."""
|
|
if not teleop_has_motor_control(teleop):
|
|
logger.warning("Teleop does not support motor control - cannot mirror robot position")
|
|
return
|
|
|
|
teleop_enable_torque(teleop)
|
|
current = teleop.get_action()
|
|
steps = max(int(duration_s * fps), 1)
|
|
|
|
for step in range(steps + 1):
|
|
t = step / steps
|
|
interp = {}
|
|
for k in current:
|
|
if k in target_pos:
|
|
interp[k] = current[k] * (1 - t) + target_pos[k] * t
|
|
else:
|
|
interp[k] = current[k]
|
|
teleop.write_goal_positions(interp)
|
|
time.sleep(1 / fps)
|
|
|
|
|
|
def init_keyboard_listener():
|
|
"""Initialize keyboard listener with HIL controls."""
|
|
events = {
|
|
"exit_early": False,
|
|
"rerecord_episode": False,
|
|
"stop_recording": False,
|
|
"policy_paused": False,
|
|
"correction_active": False,
|
|
"resume_policy": False,
|
|
"in_reset": False,
|
|
"start_next_episode": False,
|
|
}
|
|
|
|
if is_headless():
|
|
logger.warning("Headless environment - keyboard controls unavailable")
|
|
return None, events
|
|
|
|
from pynput import keyboard
|
|
|
|
def on_press(key):
|
|
try:
|
|
if events["in_reset"]:
|
|
if key in [keyboard.Key.space, keyboard.Key.right]:
|
|
logger.info("[HIL] Starting next episode...")
|
|
events["start_next_episode"] = True
|
|
elif hasattr(key, "char") and key.char == "c":
|
|
events["start_next_episode"] = True
|
|
elif key == keyboard.Key.esc:
|
|
logger.info("[HIL] ESC - Stop recording, pushing to hub...")
|
|
events["stop_recording"] = True
|
|
events["start_next_episode"] = True
|
|
else:
|
|
if key == keyboard.Key.space:
|
|
if not events["policy_paused"] and not events["correction_active"]:
|
|
logger.info("[HIL] PAUSED - Press 'c' to take control or 'p' to resume policy")
|
|
events["policy_paused"] = True
|
|
elif hasattr(key, "char") and key.char == "c":
|
|
if events["policy_paused"] and not events["correction_active"]:
|
|
logger.info("[HIL] Taking control...")
|
|
events["start_next_episode"] = True
|
|
elif hasattr(key, "char") and key.char == "p":
|
|
if events["policy_paused"] or events["correction_active"]:
|
|
logger.info("[HIL] Resuming policy...")
|
|
events["resume_policy"] = True
|
|
elif key == keyboard.Key.right:
|
|
logger.info("[HIL] End episode")
|
|
events["exit_early"] = True
|
|
elif key == keyboard.Key.left:
|
|
logger.info("[HIL] Re-record episode")
|
|
events["rerecord_episode"] = True
|
|
events["exit_early"] = True
|
|
elif key == keyboard.Key.esc:
|
|
logger.info("[HIL] ESC - Stop recording...")
|
|
events["stop_recording"] = True
|
|
events["exit_early"] = True
|
|
except Exception as e:
|
|
logger.info(f"Key error: {e}")
|
|
|
|
listener = keyboard.Listener(on_press=on_press)
|
|
listener.start()
|
|
return listener, events
|
|
|
|
|
|
def make_identity_processors():
|
|
"""Create identity processors for recording."""
|
|
teleop_proc = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
|
|
steps=[IdentityProcessorStep()],
|
|
to_transition=robot_action_observation_to_transition,
|
|
to_output=transition_to_robot_action,
|
|
)
|
|
obs_proc = RobotProcessorPipeline[RobotObservation, RobotObservation](
|
|
steps=[IdentityProcessorStep()],
|
|
to_transition=observation_to_transition,
|
|
to_output=transition_to_observation,
|
|
)
|
|
return teleop_proc, obs_proc
|
|
|
|
|
|
def reset_loop(robot: Robot, teleop: Teleoperator, events: dict, fps: int):
|
|
"""Reset period where human repositions environment."""
|
|
logger.info("[HIL] RESET")
|
|
|
|
events["in_reset"] = True
|
|
events["start_next_episode"] = False
|
|
|
|
obs = robot.get_observation()
|
|
robot_pos = {k: v for k, v in obs.items() if k.endswith(".pos") and k in robot.observation_features}
|
|
teleop_smooth_move_to(teleop, robot_pos, duration_s=2.0, fps=50)
|
|
|
|
logger.info("Press any key to enable teleoperation")
|
|
while not events["start_next_episode"] and not events["stop_recording"]:
|
|
precise_sleep(0.05)
|
|
|
|
if events["stop_recording"]:
|
|
return
|
|
|
|
events["start_next_episode"] = False
|
|
teleop_disable_torque(teleop)
|
|
logger.info("Teleop enabled - press any key to start episode")
|
|
|
|
while not events["start_next_episode"] and not events["stop_recording"]:
|
|
loop_start = time.perf_counter()
|
|
action = teleop.get_action()
|
|
robot.send_action(action)
|
|
precise_sleep(1 / fps - (time.perf_counter() - loop_start))
|
|
|
|
events["in_reset"] = False
|
|
events["start_next_episode"] = False
|
|
events["exit_early"] = False
|
|
events["policy_paused"] = False
|
|
events["correction_active"] = False
|
|
events["resume_policy"] = False
|
|
|
|
|
|
def print_controls(rtc: bool = False):
|
|
"""Print control instructions."""
|
|
mode = "Human-in-the-Loop Data Collection" + (" (RTC)" if rtc else "")
|
|
logger.info(
|
|
"%s\n Controls:\n"
|
|
" SPACE - Pause policy\n"
|
|
" c - Take control\n"
|
|
" p - Resume policy after pause/correction\n"
|
|
" → - End episode\n"
|
|
" ESC - Stop and push to hub",
|
|
mode,
|
|
)
|