diff --git a/docs/source/il_robots.mdx b/docs/source/il_robots.mdx index dc2e02737..53ae5af82 100644 --- a/docs/source/il_robots.mdx +++ b/docs/source/il_robots.mdx @@ -647,5 +647,6 @@ The `--strategy.type` flag selects the execution mode: - `sentry`: Continuous recording with auto-upload (useful for large-scale evaluation) - `highlight`: Ring buffer recording with keystroke save (useful for capturing interesting events) - `dagger`: Human-in-the-loop data collection (see [HIL Data Collection](./hil_data_collection)) +- `episodic`: Episode-oriented policy recording with reset phases between episodes All strategies support `--inference.type=rtc` for smooth execution with slow VLA models (Pi0, Pi0.5, SmolVLA). diff --git a/docs/source/inference.mdx b/docs/source/inference.mdx index b2874d823..78d9faa30 100644 --- a/docs/source/inference.mdx +++ b/docs/source/inference.mdx @@ -157,6 +157,44 @@ Foot pedal input is also supported via `--strategy.input_device=pedal`. Configur | `--strategy.input_device` | Input device: `keyboard` or `pedal` (default: keyboard) | | `--teleop.type` | **Required.** Teleoperator type | +### Episodic (`--strategy.type=episodic`) + +Episode-oriented recording that mirrors the behavior of `lerobot-record`. The policy drives the robot for each episode; an optional teleoperator can drive the robot during the reset phase between episodes. + +```bash +lerobot-rollout \ + --strategy.type=episodic \ + --policy.path=${HF_USER}/my_policy \ + --robot.type=so100_follower \ + --robot.port=/dev/ttyACM0 \ + --teleop.type=so100_leader \ + --teleop.port=/dev/ttyACM1 \ + --dataset.repo_id=${HF_USER}/my_eval_data \ + --dataset.num_episodes=20 \ + --dataset.episode_time_s=30 \ + --dataset.reset_time_s=10 \ + --dataset.single_task="Pick up the red cube" +``` + +Teleop is optional — if omitted the robot holds its position during the reset phase. + +**Keyboard controls:** + +| Key | Action | +| ----------- | -------------------------------- | +| `→` (right) | End the current episode early | +| `←` (left) | Discard episode and re-record it | +| `ESC` | Stop the recording session | + +| Flag | Description | +| ----------------------------------------------- | -------------------------------------------------------------------------- | +| `--dataset.num_episodes` | Number of episodes to record | +| `--dataset.episode_time_s` | Duration of each recording episode in seconds | +| `--dataset.reset_time_s` | Duration of the reset phase between episodes in seconds | +| `--teleop.type` | Optional. Teleoperator to drive the robot during resets | +| `--strategy.reset_to_initial_position` | Whether to reset the robot to its initial position between episodes | +| `--strategy.smooth_leader_to_follower_handover` | Whether to turn on or off the leader -> follower smooth handover behavior. | + --- ## Inference Backends diff --git a/src/lerobot/common/control_utils.py b/src/lerobot/common/control_utils.py index efbcc082f..ddaf77d26 100644 --- a/src/lerobot/common/control_utils.py +++ b/src/lerobot/common/control_utils.py @@ -18,6 +18,7 @@ from __future__ import annotations # Utilities ######################################################################################## import logging +import time import traceback from contextlib import nullcontext from copy import copy @@ -243,3 +244,72 @@ def sanity_check_dataset_robot_compatibility( raise ValueError( "Dataset metadata compatibility check failed with mismatches:\n" + "\n".join(mismatches) ) + + +######################################################################################## +# Teleoperator smooth handover helpers +# NOTE(Maxime): These functions use minimal type hints to maintain compatibility with utils +# being a root module. +######################################################################################## + + +def teleop_supports_feedback(teleop) -> bool: + """Return True when the teleop can receive position feedback (is actuated). + + Actuated teleops (e.g. SO-101, OpenArmMini) have non-empty ``feedback_features`` + and expose ``enable_torque`` / ``disable_torque`` motor-control methods. + + TODO(Maxime): See if it is possible to unify this interface across teleops instead of duck-typing. + """ + return ( + bool(teleop.feedback_features) + and hasattr(teleop, "disable_torque") + and hasattr(teleop, "enable_torque") + ) + + +def teleop_smooth_move_to(teleop, target_pos: dict, duration_s: float = 2.0, fps: int = 30) -> None: + """Smoothly move an actuated teleop to ``target_pos`` via linear interpolation. + + Requires the teleoperator to support feedback (i.e. have non-empty + ``feedback_features`` and implement ``disable_torque`` / ``enable_torque``). + + ``target_pos`` is expected to be in the teleop's action/feedback key space. + For homogeneous setups (e.g. SO-101 leader + SO-101 follower) this matches + the robot action key space directly. + + TODO(Maxime): This blocks up to ``duration_s`` seconds; during this time the + follower robot does not receive new actions, which could be an issue on LeKiwi. + """ + teleop.enable_torque() + current = teleop.get_action() + steps = max(int(duration_s * fps), 1) + + for step in range(steps + 1): + t = step / steps + interp = { + k: current[k] * (1 - t) + target_pos[k] * t if k in target_pos else current[k] for k in current + } + teleop.send_feedback(interp) + time.sleep(1 / fps) + + +def follower_smooth_move_to( + robot, current: dict, target: dict, duration_s: float = 1.0, fps: int = 30 +) -> None: + """Smoothly move the follower robot from ``current`` to ``target`` action. + + Used when the teleop is non-actuated: instead of driving the leader arm to + the follower, the follower is brought to the teleop's current pose so the + robot meets the operator's hand rather than jumping to it on the first frame. + + Both ``current`` and ``target`` must be in the robot action key space + (i.e. the output of ``robot_action_processor``). + """ + steps = max(int(duration_s * fps), 1) + + for step in range(steps + 1): + t = step / steps + interp = {k: current[k] * (1 - t) + target[k] * t if k in target else current[k] for k in current} + robot.send_action(interp) + time.sleep(1 / fps) diff --git a/src/lerobot/rollout/__init__.py b/src/lerobot/rollout/__init__.py index a4de8ee6c..9808e3618 100644 --- a/src/lerobot/rollout/__init__.py +++ b/src/lerobot/rollout/__init__.py @@ -23,6 +23,7 @@ from .configs import ( DAggerKeyboardConfig, DAggerPedalConfig, DAggerStrategyConfig, + EpisodicStrategyConfig, HighlightStrategyConfig, RolloutConfig, RolloutStrategyConfig, @@ -49,6 +50,7 @@ from .inference import ( from .strategies import ( BaseStrategy, DAggerStrategy, + EpisodicStrategy, HighlightStrategy, RolloutStrategy, SentryStrategy, @@ -66,6 +68,8 @@ __all__ = [ "HardwareContext", "HighlightStrategy", "HighlightStrategyConfig", + "EpisodicStrategy", + "EpisodicStrategyConfig", "InferenceEngine", "InferenceEngineConfig", "PolicyContext", diff --git a/src/lerobot/rollout/configs.py b/src/lerobot/rollout/configs.py index 9d019c887..60c47cfba 100644 --- a/src/lerobot/rollout/configs.py +++ b/src/lerobot/rollout/configs.py @@ -121,6 +121,35 @@ class DAggerPedalConfig: upload: str = "KEY_C" +@RolloutStrategyConfig.register_subclass("episodic") +@dataclass +class EpisodicStrategyConfig(RolloutStrategyConfig): + """Episode-oriented recording that mirrors the behavior of ``lerobot-record``. + + Records ``dataset.num_episodes`` episodes of maximum ``dataset.episode_time_s`` each. + After each episode, runs ``dataset.reset_time_s`` seconds of reset time. + + Keyboard controls: + Right arrow — end current episode or reset phase early + Left arrow — discard current episode and re-record + Escape — stop recording session + + In between episodes: + - if there is no teleop leader, the robot is held at its initial joint positions captured at startup. + - else, the robot is moved smoothly to the position of the teleop leader. + """ + + # This only applies if there are no teleop leaders specified. + # When True (default), moves the robot back to the joint positions captured at startup. + # Otherwise, leave the robot in its current position. + reset_to_initial_position: bool = True + + # Whether to turn on or off the leader -> follower smooth handover behavior. + # When False, fallback to follower -> leader handover. + # Note that leader -> follower handover is only supported when the leader has `send_feedback` capability. + smooth_leader_to_follower_handover: bool = True + + @RolloutStrategyConfig.register_subclass("dagger") @dataclass class DAggerStrategyConfig(RolloutStrategyConfig): @@ -229,7 +258,13 @@ class RolloutConfig: # TODO(Steven): DAgger shouldn't require a dataset (user may want to just rollout+intervene without recording), but for now we require it to simplify the implementation. needs_dataset = isinstance( - self.strategy, (SentryStrategyConfig, HighlightStrategyConfig, DAggerStrategyConfig) + self.strategy, + ( + SentryStrategyConfig, + HighlightStrategyConfig, + DAggerStrategyConfig, + EpisodicStrategyConfig, + ), ) if needs_dataset and (self.dataset is None or not self.dataset.repo_id): raise ValueError(f"{self.strategy.type} strategy requires --dataset.repo_id to be set") diff --git a/src/lerobot/rollout/strategies/__init__.py b/src/lerobot/rollout/strategies/__init__.py index 554327073..2501064bd 100644 --- a/src/lerobot/rollout/strategies/__init__.py +++ b/src/lerobot/rollout/strategies/__init__.py @@ -17,6 +17,7 @@ from .base import BaseStrategy from .core import RolloutStrategy, estimate_max_episode_seconds, safe_push_to_hub, send_next_action from .dagger import DAggerEvents, DAggerPhase, DAggerStrategy +from .episodic import EpisodicStrategy from .factory import create_strategy from .highlight import HighlightStrategy from .sentry import SentryStrategy @@ -27,6 +28,7 @@ __all__ = [ "DAggerPhase", "DAggerStrategy", "HighlightStrategy", + "EpisodicStrategy", "RolloutStrategy", "SentryStrategy", "create_strategy", diff --git a/src/lerobot/rollout/strategies/dagger.py b/src/lerobot/rollout/strategies/dagger.py index 1bd6d9bb0..8791a5502 100644 --- a/src/lerobot/rollout/strategies/dagger.py +++ b/src/lerobot/rollout/strategies/dagger.py @@ -56,10 +56,14 @@ from typing import Any import numpy as np -from lerobot.common.control_utils import is_headless +from lerobot.common.control_utils import ( + follower_smooth_move_to, + is_headless, + teleop_smooth_move_to, + teleop_supports_feedback, +) from lerobot.datasets import VideoEncodingManager from lerobot.datasets.utils import DEFAULT_VIDEO_FILE_SIZE_IN_MB -from lerobot.teleoperators import Teleoperator from lerobot.utils.constants import ACTION, OBS_STR from lerobot.utils.feature_utils import build_dataset_frame from lerobot.utils.import_utils import _pynput_available @@ -69,7 +73,6 @@ from lerobot.utils.utils import log_say from ..configs import DAggerKeyboardConfig, DAggerPedalConfig, DAggerStrategyConfig from ..context import RolloutContext -from ..robot_wrapper import ThreadSafeRobot from .core import RolloutStrategy, estimate_max_episode_seconds, safe_push_to_hub, send_next_action PYNPUT_AVAILABLE = _pynput_available @@ -171,64 +174,6 @@ class DAggerEvents: self.upload_requested.clear() -# --------------------------------------------------------------------------- -# Teleoperator helpers -# --------------------------------------------------------------------------- - - -def _teleop_supports_feedback(teleop: Teleoperator) -> bool: - """Return True when the teleop can receive position feedback (is actuated). - TODO(Maxime): See if it is possible to unify this interface across teleops instead of duck-typing. - """ - return ( - bool(teleop.feedback_features) - and hasattr(teleop, "disable_torque") - and hasattr(teleop, "enable_torque") - ) - - -def _teleop_smooth_move_to( - teleop: Teleoperator, target_pos: dict, duration_s: float = 2.0, fps: int = 30 -) -> None: - """Smoothly move an actuated teleop to ``target_pos`` via linear interpolation. - - Requires the teleoperator to support feedback - (i.e. have non-empty ``feedback_features`` and implement ``disable_torque`` / ``enable_torque``). - - TODO(Maxime): This blocks up to ``duration_s`` seconds, during this time - the follower robot doesn't receive new actions, this could be an issue on LeKiwi. - """ - teleop.enable_torque() - current = teleop.get_action() - steps = max(int(duration_s * fps), 1) - - for step in range(steps + 1): - t = step / steps - interp = { - k: current[k] * (1 - t) + target_pos[k] * t if k in target_pos else current[k] for k in current - } - teleop.send_feedback(interp) - time.sleep(1 / fps) - - -def _follower_smooth_move_to( - robot: ThreadSafeRobot, current: dict, target: dict, duration_s: float = 1.0, fps: int = 30 -) -> None: - """Smoothly move the follower robot from ``current`` to ``target`` action. - - Used when the teleop is non-actuated: instead of driving the leader arm - to the follower, we bring the follower to the teleop's current pose. - Both ``current`` and ``target`` must be in robot-action key space. - """ - steps = max(int(duration_s * fps), 1) - - for step in range(steps + 1): - t = step / steps - interp = {k: current[k] * (1 - t) + target[k] * t if k in target else current[k] for k in current} - robot.send_action(interp) - time.sleep(1 / fps) - - # --------------------------------------------------------------------------- # Input device handlers # --------------------------------------------------------------------------- @@ -756,31 +701,31 @@ class DAggerStrategy(RolloutStrategy): logger.info("Pausing engine - robot holds position") engine.pause() - if _teleop_supports_feedback(teleop) and prev_action is not None: + if teleop_supports_feedback(teleop) and prev_action is not None: # TODO(Maxime): prev_action is in robot action key space (output of robot_action_processor). # send_feedback expects teleop feedback key space. For homogeneous setups (e.g. SO-101 # leader + SO-101 follower) the keys are identical so this works. If the processor pipeline # does non-trivial key renaming (e.g. a rename_map on action keys), the interpolation in - # _teleop_smooth_move_to silently no-ops and the arm doesn't move. + # teleop_smooth_move_to silently no-ops and the arm doesn't move. logger.info("Smooth handover: moving leader arm to follower position") - _teleop_smooth_move_to(teleop, prev_action) + teleop_smooth_move_to(teleop, prev_action) elif old_phase == DAggerPhase.PAUSED and new_phase == DAggerPhase.CORRECTING: logger.info("Entering correction mode - human teleop control") - if not _teleop_supports_feedback(teleop) and prev_action is not None: + if not teleop_supports_feedback(teleop) and prev_action is not None: logger.info("Smooth handover: sliding follower to teleop position") obs = robot.get_observation() teleop_action = teleop.get_action() processed = ctx.processors.teleop_action_processor((teleop_action, obs)) target = ctx.processors.robot_action_processor((processed, obs)) - _follower_smooth_move_to(robot, prev_action, target) + follower_smooth_move_to(robot, prev_action, target) # unlock the teleop for human control - if _teleop_supports_feedback(teleop): + if teleop_supports_feedback(teleop): teleop.disable_torque() elif old_phase == DAggerPhase.CORRECTING and new_phase == DAggerPhase.PAUSED: - if _teleop_supports_feedback(teleop): + if teleop_supports_feedback(teleop): teleop.enable_torque() elif new_phase == DAggerPhase.AUTONOMOUS: @@ -790,7 +735,7 @@ class DAggerStrategy(RolloutStrategy): engine.resume() # release teleop before resuming the policy - if _teleop_supports_feedback(teleop): + if teleop_supports_feedback(teleop): teleop.disable_torque() # ------------------------------------------------------------------ diff --git a/src/lerobot/rollout/strategies/episodic.py b/src/lerobot/rollout/strategies/episodic.py new file mode 100644 index 000000000..e925fb2ea --- /dev/null +++ b/src/lerobot/rollout/strategies/episodic.py @@ -0,0 +1,335 @@ +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Episodic rollout strategy: mirrors the behavior of ``lerobot-record``. + +- Policy drives the robot during each recording episode. +- An optional teleoperator can drive the robot during reset phases so the + operator can bring the environment back to its starting configuration. + If no teleop is connected the robot stays in its current position. +- Keyboard controls: + + Right arrow — end the current episode or reset phase early + Left arrow — discard the current episode and re-record it + Escape — stop the recording session + +Dataset naming follows the rollout convention: repo names must start with ``rollout_``. +""" + +from __future__ import annotations + +import contextlib +import logging +import time + +from lerobot.common.control_utils import ( + follower_smooth_move_to, + init_keyboard_listener, + is_headless, + teleop_smooth_move_to, + teleop_supports_feedback, +) +from lerobot.datasets import VideoEncodingManager +from lerobot.utils.constants import ACTION, OBS_STR +from lerobot.utils.feature_utils import build_dataset_frame +from lerobot.utils.robot_utils import precise_sleep +from lerobot.utils.utils import log_say +from lerobot.utils.visualization_utils import log_rerun_data + +from ..configs import EpisodicStrategyConfig +from ..context import RolloutContext +from .core import RolloutStrategy, safe_push_to_hub, send_next_action + +logger = logging.getLogger(__name__) + + +class EpisodicStrategy(RolloutStrategy): + """Policy-driven multi-episode recording, mirrors the behavior of ``lerobot-record``. + + Each recording episode runs the policy for maximum ``dataset.episode_time_s`` + seconds, recording every frame. A reset phase of ``dataset.reset_time_s`` + follows every episode (except the last) so the operator can manually + reset the environment. During the reset phase, an optional teleoperator + drives the robot; if none is present the robot returns to its initial joint positions captured at startup. + + The policy state (hidden state, RTC queue, interpolator) is reset at + the start of each recording episode. + + Keyboard events: + right arrow → end current episode or reset phase early + left arrow → discard & re-record current episode + ESC → stop the session + """ + + config: EpisodicStrategyConfig + + def __init__(self, config: EpisodicStrategyConfig) -> None: + super().__init__(config) + self._listener = None + self._events: dict | None = None + + def setup(self, ctx: RolloutContext) -> None: + """Start the inference engine and attach the keyboard listener.""" + self._init_engine(ctx) + self._listener, self._events = init_keyboard_listener() + logger.info("Episodic strategy ready") + + def run(self, ctx: RolloutContext) -> None: + """Main multi-episode recording loop.""" + cfg = ctx.runtime.cfg + dataset_cfg = cfg.dataset + robot = ctx.hardware.robot_wrapper + teleop = ctx.hardware.teleop + dataset = ctx.data.dataset + events = self._events + features = ctx.data.dataset_features + + fps = cfg.fps + episode_time_s = dataset_cfg.episode_time_s + reset_time_s = dataset_cfg.reset_time_s + num_episodes = dataset_cfg.num_episodes + single_task = dataset_cfg.single_task or cfg.task + play_sounds = cfg.play_sounds + + display_compressed = ( + True + if (cfg.display_data and cfg.display_ip is not None and cfg.display_port is not None) + else cfg.display_compressed_images + ) + + with VideoEncodingManager(dataset): + try: + recorded_episodes = 0 + while recorded_episodes < num_episodes and not events["stop_recording"]: + if ctx.runtime.shutdown_event.is_set(): + break + + # Reset policy state at episode start (discard leftover hidden state / queue) + self._engine.reset() + self._interpolator.reset() + self._engine.resume() + + log_say(f"Recording episode {dataset.num_episodes}", play_sounds) + self._policy_loop( + ctx=ctx, + robot=robot, + events=events, + features=features, + fps=fps, + control_time_s=episode_time_s, + dataset=dataset, + single_task=single_task, + ) + + # Reset phase, skip after the last episode (but run when re-recording) + if not events["stop_recording"] and ( + recorded_episodes < num_episodes - 1 or events["rerecord_episode"] + ): + log_say("Reset the environment", play_sounds) + + if teleop: + # Smooth handover so the transition to teleop control is jerk-free. + # For actuated teleops: drive the leader arm to the follower's current + # position so the operator takes over without fighting the arm. + # For non-actuated teleops: slide the follower to the teleop's current + # pose instead, since the leader cannot be driven. + obs = robot.get_observation() + current_pos = {k: v for k, v in obs.items() if k.endswith(".pos")} + if ( + teleop_supports_feedback(teleop) + and self.config.smooth_leader_to_follower_handover + ): + logger.info("Smooth handover: moving leader arm to follower position") + teleop_smooth_move_to(teleop, current_pos, duration_s=2) + teleop.disable_torque() + else: + logger.info("Smooth handover: sliding follower to teleop position") + teleop_action = teleop.get_action() + processed = ctx.processors.teleop_action_processor((teleop_action, obs)) + target = ctx.processors.robot_action_processor((processed, obs)) + follower_smooth_move_to(robot, current_pos, target, duration_s=1) + + elif self.config.reset_to_initial_position: + # No teleop: return the robot to its startup position. + self._return_to_initial_position(hw=ctx.hardware, duration_s=1) + + self._reset_loop( + ctx=ctx, + robot=robot, + teleop=teleop, + events=events, + fps=fps, + control_time_s=reset_time_s, + display_data=cfg.display_data, + display_compressed=display_compressed, + ) + + if events["rerecord_episode"]: + log_say("Re-record episode", play_sounds) + events["rerecord_episode"] = False + events["exit_early"] = False + dataset.clear_episode_buffer() + + # returns to its initial joint positions captured at startup + if not teleop and self.config.reset_to_initial_position: + self._return_to_initial_position(hw=ctx.hardware, duration_s=1) + + continue + + dataset.save_episode() + recorded_episodes += 1 + finally: + # Save any frames buffered in the current episode so an unexpected + # exception or KeyboardInterrupt does not silently drop recorded data. + # suppress: save_episode raises if the buffer is empty (nothing to lose). + logger.info("Episodic control loop ended — saving any in-progress episode") + with contextlib.suppress(Exception): + dataset.save_episode() + + def _policy_loop( + self, + ctx: RolloutContext, + robot, + events: dict, + features: dict, + fps: float, + control_time_s: float, + dataset, + single_task: str, + ) -> None: + """Policy-driven recording loop for a single episode.""" + interpolator = self._interpolator + control_interval = interpolator.get_control_interval(fps) + + timestamp = 0.0 + start_t = time.perf_counter() + + while timestamp < control_time_s: + loop_start = time.perf_counter() + + if events["exit_early"]: + events["exit_early"] = False + break + + if ctx.runtime.shutdown_event.is_set(): + break + + obs = robot.get_observation() + obs_processed = self._process_observation_and_notify(ctx.processors, obs) + + if self._handle_warmup(ctx.runtime.cfg.use_torch_compile, loop_start, control_interval): + continue + + action_dict = send_next_action(obs_processed, obs, ctx, interpolator) + + if action_dict is not None: + obs_frame = build_dataset_frame(features, obs_processed, prefix=OBS_STR) + action_frame = build_dataset_frame(features, action_dict, prefix=ACTION) + dataset.add_frame({**obs_frame, **action_frame, "task": single_task}) + self._log_telemetry(obs_processed, action_dict, ctx.runtime) + + dt = time.perf_counter() - loop_start + sleep_t = control_interval - dt + if sleep_t < 0: + logger.warning( + f"Record loop is running slower ({1 / dt:.1f} Hz) than the target FPS ({fps} Hz). " + "Dataset frames might be dropped and robot control might be unstable. " + "Common causes are: 1) Camera FPS not keeping up 2) Policy inference taking too long " + "3) CPU starvation" + ) + precise_sleep(max(sleep_t, 0.0)) + timestamp = time.perf_counter() - start_t + + def _reset_loop( + self, + ctx: RolloutContext, + robot, + teleop, + events: dict, + fps: float, + control_time_s: float, + display_data: bool, + display_compressed: bool, + ) -> None: + """Reset-phase loop: teleop drives the robot if available, no recording.""" + processors = ctx.processors + control_interval = 1.0 / fps + + timestamp = 0.0 + start_t = time.perf_counter() + + while timestamp < control_time_s: + loop_start = time.perf_counter() + + if events["exit_early"]: + events["exit_early"] = False + break + + if ctx.runtime.shutdown_event.is_set(): + break + + obs = robot.get_observation() + + if teleop is not None: + act = teleop.get_action() + act_teleop = processors.teleop_action_processor((act, obs)) + robot_action = processors.robot_action_processor((act_teleop, obs)) + robot.send_action(robot_action) + + if display_data: + obs_processed = processors.robot_observation_processor(obs) + log_rerun_data( + observation=obs_processed, + action=act_teleop, + compress_images=display_compressed, + ) + + dt = time.perf_counter() - loop_start + sleep_t = control_interval - dt + precise_sleep(max(sleep_t, 0.0)) + timestamp = time.perf_counter() - start_t + + def teardown(self, ctx: RolloutContext) -> None: + """Finalise dataset, stop listener, push to hub, and disconnect hardware.""" + cfg = ctx.runtime.cfg + play_sounds = cfg.play_sounds + + log_say("Stop recording", play_sounds, blocking=True) + + if not is_headless() and self._listener is not None: + self._listener.stop() + + if ctx.data.dataset is not None: + logger.info("Finalizing dataset...") + ctx.data.dataset.finalize() + + if ( + cfg.dataset is not None + and cfg.dataset.push_to_hub + and ctx.data.dataset is not None + and safe_push_to_hub( + ctx.data.dataset, + tags=cfg.dataset.tags, + private=cfg.dataset.private, + ) + ): + logger.info("Dataset uploaded to hub") + log_say("Dataset uploaded to hub", play_sounds) + + self._teardown_hardware( + ctx.hardware, + return_to_initial_position=cfg.return_to_initial_position, + ) + log_say("Exiting", play_sounds) + logger.info("Episodic strategy teardown complete") diff --git a/src/lerobot/rollout/strategies/factory.py b/src/lerobot/rollout/strategies/factory.py index 8a9727769..3e1ce3214 100644 --- a/src/lerobot/rollout/strategies/factory.py +++ b/src/lerobot/rollout/strategies/factory.py @@ -21,6 +21,7 @@ from typing import TYPE_CHECKING from .base import BaseStrategy from .core import RolloutStrategy from .dagger import DAggerStrategy +from .episodic import EpisodicStrategy from .highlight import HighlightStrategy from .sentry import SentryStrategy @@ -42,4 +43,8 @@ def create_strategy(config: RolloutStrategyConfig) -> RolloutStrategy: return HighlightStrategy(config) if config.type == "dagger": return DAggerStrategy(config) - raise ValueError(f"Unknown strategy type '{config.type}'. Available: base, sentry, highlight, dagger") + if config.type == "episodic": + return EpisodicStrategy(config) + raise ValueError( + f"Unknown strategy type '{config.type}'. Available: base, sentry, highlight, dagger, episodic" + ) diff --git a/src/lerobot/scripts/lerobot_rollout.py b/src/lerobot/scripts/lerobot_rollout.py index 3378b6de4..82a858b9a 100644 --- a/src/lerobot/scripts/lerobot_rollout.py +++ b/src/lerobot/scripts/lerobot_rollout.py @@ -25,6 +25,7 @@ Strategies --strategy.type=sentry Continuous recording with auto-upload --strategy.type=highlight Ring buffer + keystroke save --strategy.type=dagger Human-in-the-loop (DAgger / RaC) + --strategy.type=episodic Episode-oriented recording with reset phases Inference backends ------------------ @@ -111,6 +112,18 @@ Usage examples --display_data=true \\ --use_torch_compile=true + # Episodic mode — episode-oriented recording with reset phases + lerobot-rollout \\ + --strategy.type=episodic \\ + --policy.path=user/my_policy \\ + --robot.type=so100_follower \\ + --robot.port=/dev/ttyACM0 \\ + --teleop.type=so100_leader \\ + --teleop.port=/dev/ttyACM1 \\ + --dataset.repo_id=user/rollout_episodic_data \\ + --dataset.num_episodes=20 \\ + --dataset.single_task="Grab the cube" + # Resume a previous sentry recording session lerobot-rollout \\ --strategy.type=sentry \\ diff --git a/tests/test_rollout.py b/tests/test_rollout.py index 5a1ec4703..85a29ff4c 100644 --- a/tests/test_rollout.py +++ b/tests/test_rollout.py @@ -59,6 +59,7 @@ def test_strategy_config_types(): from lerobot.rollout import ( BaseStrategyConfig, DAggerStrategyConfig, + EpisodicStrategyConfig, HighlightStrategyConfig, SentryStrategyConfig, ) @@ -67,6 +68,7 @@ def test_strategy_config_types(): assert SentryStrategyConfig().type == "sentry" assert HighlightStrategyConfig().type == "highlight" assert DAggerStrategyConfig().type == "dagger" + assert EpisodicStrategyConfig().type == "episodic" def test_dagger_config_invalid_input_device(): @@ -203,6 +205,8 @@ def test_create_strategy_dispatches(): BaseStrategyConfig, DAggerStrategy, DAggerStrategyConfig, + EpisodicStrategy, + EpisodicStrategyConfig, SentryStrategy, SentryStrategyConfig, create_strategy, @@ -211,6 +215,7 @@ def test_create_strategy_dispatches(): assert isinstance(create_strategy(BaseStrategyConfig()), BaseStrategy) assert isinstance(create_strategy(SentryStrategyConfig()), SentryStrategy) assert isinstance(create_strategy(DAggerStrategyConfig()), DAggerStrategy) + assert isinstance(create_strategy(EpisodicStrategyConfig()), EpisodicStrategy) def test_create_strategy_unknown_raises():