From 8847e75c558149116a7e12fb995ee244a5acf9fa Mon Sep 17 00:00:00 2001 From: Eugene Mironov Date: Sun, 16 Nov 2025 19:04:24 +0700 Subject: [PATCH] Extract simulator logic from eval_with real robot and add proper headers to files --- examples/rtc/eval_dataset.py | 14 ++ examples/rtc/eval_with_real_robot.py | 233 ++++----------------------- 2 files changed, 49 insertions(+), 198 deletions(-) diff --git a/examples/rtc/eval_dataset.py b/examples/rtc/eval_dataset.py index 92eb3fa93..af37636f9 100644 --- a/examples/rtc/eval_dataset.py +++ b/examples/rtc/eval_dataset.py @@ -1,5 +1,19 @@ #!/usr/bin/env python +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + """ Evaluate Real-Time Chunking (RTC) performance on dataset samples. diff --git a/examples/rtc/eval_with_real_robot.py b/examples/rtc/eval_with_real_robot.py index 0fcc7f25e..6f051485a 100644 --- a/examples/rtc/eval_with_real_robot.py +++ b/examples/rtc/eval_with_real_robot.py @@ -1,14 +1,30 @@ #!/usr/bin/env python +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + """ -Demo script showing how to use Real-Time Chunking (RTC) with action chunking policies. +Demo script showing how to use Real-Time Chunking (RTC) with action chunking policies on real robots. This script demonstrates: -1. Creating a robot/environment and policy (SmolVLA, Pi0, etc.) with RTC -2. Consuming actions from the policy while the robot/environment executes +1. Creating a robot and policy (SmolVLA, Pi0, etc.) with RTC +2. Consuming actions from the policy while the robot executes 3. Periodically requesting new action chunks in the background using threads 4. Managing action buffers and timing for real-time operation +For simulation environments, see eval_with_simulation.py + Usage: # Run RTC with Real robot with RTC uv run examples/rtc/eval_with_real_robot.py \ @@ -57,7 +73,6 @@ import traceback from dataclasses import dataclass, field from threading import Event, Lock, Thread -import numpy as np import torch from torch import Tensor @@ -67,8 +82,6 @@ from lerobot.configs import parser from lerobot.configs.policies import PreTrainedConfig from lerobot.configs.types import RTCAttentionSchedule from lerobot.datasets.utils import build_dataset_frame, hw_to_dataset_features -from lerobot.envs.configs import EnvConfig # noqa: F401 -from lerobot.envs.factory import make_env from lerobot.policies.factory import get_policy_class, make_pre_post_processors from lerobot.policies.rtc.action_queue import ActionQueue from lerobot.policies.rtc.configuration_rtc import RTCConfig @@ -116,148 +129,16 @@ class RobotWrapper: return self.robot.action_features -class EnvWrapper: - """Wrapper for gym environments to provide same interface as RobotWrapper.""" - - def __init__(self, env, env_cfg: EnvConfig): - self.env = env - self.env_cfg = env_cfg - self.lock = Lock() - self._last_obs = None - self._episode_count = 0 - self._step_count = 0 - - # Initialize environment - obs, _ = self.env.reset() - self._last_obs = ( - obs[0] - if isinstance(obs, tuple) - or (hasattr(obs, "__getitem__") and len(obs) > 0 and not isinstance(obs, dict)) - else obs - ) - - # Cache feature names - self._observation_features = None - self._action_features = None - - def get_observation(self) -> dict[str, np.ndarray]: - """Get current observation from environment. - - Returns observations in the same format as robot.get_observation(): - a dict mapping feature names to numpy arrays. - """ - with self.lock: - if self._last_obs is None: - # Reset environment on first observation - obs, _ = self.env.reset() - self._last_obs = ( - obs[0] - if isinstance(obs, tuple) - or (hasattr(obs, "__getitem__") and len(obs) > 0 and not isinstance(obs, dict)) - else obs - ) - - # VectorEnv returns observations as numpy arrays in a batch - # Extract first element if it's a vectorized observation - obs = self._last_obs - if isinstance(obs, dict): - # Handle dict observations (extract first element from batch if needed) - result = {} - for key, value in obs.items(): - if isinstance(value, np.ndarray) and len(value.shape) > 0 and value.shape[0] == 1: - # Remove batch dimension for single env - result[key] = value[0] - else: - result[key] = value - return result - else: - # Handle array observations - shouldn't happen with our configs but handle it - return {"observation": obs[0] if len(obs.shape) > 1 else obs} - - def send_action(self, action: dict): - """Execute action in environment and update observation.""" - with self.lock: - # Convert action dict to array based on action_features - action_list = [] - for feature_name in self.action_features(): - if feature_name in action: - action_list.append(action[feature_name]) - - action_array = np.array(action_list) - - # VectorEnv expects actions with batch dimension - action_batch = action_array.reshape(1, -1) - - # Step environment - obs, _reward, terminated, truncated, _info = self.env.step(action_batch) - - # Extract from batch - self._last_obs = ( - obs[0] - if isinstance(obs, tuple) - or (hasattr(obs, "__getitem__") and len(obs) > 0 and not isinstance(obs, dict)) - else obs - ) - self._step_count += 1 - - # Check if episode is done (handle vectorized env format) - is_done = terminated[0] if isinstance(terminated, (np.ndarray, list)) else terminated - is_truncated = truncated[0] if isinstance(truncated, (np.ndarray, list)) else truncated - - # Reset if episode is done - if is_done or is_truncated: - logger.info(f"Episode {self._episode_count} finished after {self._step_count} steps") - obs, _ = self.env.reset() - self._last_obs = ( - obs[0] - if isinstance(obs, tuple) - or (hasattr(obs, "__getitem__") and len(obs) > 0 and not isinstance(obs, dict)) - else obs - ) - self._episode_count += 1 - self._step_count = 0 - - def observation_features(self) -> list[str]: - """Get observation feature names from environment config.""" - if self._observation_features is not None: - return self._observation_features - - with self.lock: - features = [] - for feature_name in self.env_cfg.features: - if feature_name != "action": - # Use the mapped name from features_map - mapped_name = self.env_cfg.features_map.get(feature_name, feature_name) - features.append(mapped_name) - - self._observation_features = features - return features - - def action_features(self) -> list[str]: - """Get action feature names from environment config.""" - if self._action_features is not None: - return self._action_features - - with self.lock: - # Return action dimension names - action_dim = self.env_cfg.features["action"].shape[0] - self._action_features = [f"action_{i}" for i in range(action_dim)] - return self._action_features - - @dataclass class RTCDemoConfig(HubMixin): - """Configuration for RTC demo with action chunking policies.""" + """Configuration for RTC demo with action chunking policies and real robots.""" # Policy configuration policy: PreTrainedConfig | None = None - # Robot configuration (mutually exclusive with env) + # Robot configuration robot: RobotConfig | None = None - # Environment configuration (mutually exclusive with robot) - env: EnvConfig | None = None - # RTC configuration rtc: RTCConfig = field( default_factory=lambda: RTCConfig( @@ -315,11 +196,9 @@ class RTCDemoConfig(HubMixin): else: raise ValueError("Policy path is required") - # Validate that either robot or env is provided, but not both - if self.robot is None and self.env is None: - raise ValueError("Either robot or env configuration must be provided") - if self.robot is not None and self.env is not None: - raise ValueError("Cannot specify both robot and env configuration. Choose one.") + # Validate that robot configuration is provided + if self.robot is None: + raise ValueError("Robot configuration must be provided") @classmethod def __get_path_fields__(cls) -> list[str]: @@ -565,7 +444,6 @@ def demo_cli(cfg: RTCDemoConfig): policy = None robot = None - vec_env = None get_actions_thread = None actor_thread = None @@ -595,46 +473,11 @@ def demo_cli(cfg: RTCDemoConfig): if cfg.use_torch_compile: policy = _apply_torch_compile(policy, cfg) - # Create robot or environment - if cfg.robot is not None: - logger.info(f"Initializing robot: {cfg.robot.type}") - robot = make_robot_from_config(cfg.robot) - robot.connect() - agent_wrapper = RobotWrapper(robot) - else: - logger.info(f"Initializing environment: {cfg.env.type}") - # Create environment using make_env - env_dict = make_env(cfg.env, n_envs=1, use_async_envs=False) - - # Validate environment structure: should have exactly one suite - if len(env_dict) != 1: - raise ValueError( - f"Expected exactly one environment suite, but got {len(env_dict)}. " - f"Suites: {list(env_dict.keys())}" - ) - - # Extract the actual env from the dict structure {suite: {task_id: vec_env}} - suite_name = list(env_dict.keys())[0] - task_dict = env_dict[suite_name] - - # Validate task structure: should have exactly one task - if len(task_dict) != 1: - raise ValueError( - f"Expected exactly one task in suite '{suite_name}', but got {len(task_dict)}. " - f"Tasks: {list(task_dict.keys())}" - ) - - vec_env = task_dict[0] - logger.info(f"Created environment: suite='{suite_name}', task_id=0, num_envs={vec_env.num_envs}") - - # Validate that we have exactly 1 parallel environment - if vec_env.num_envs != 1: - raise ValueError( - f"Expected exactly 1 parallel environment, but got {vec_env.num_envs}. " - f"The EnvWrapper is designed for single environment instances." - ) - - agent_wrapper = EnvWrapper(vec_env, cfg.env) + # Create robot + logger.info(f"Initializing robot: {cfg.robot.type}") + robot = make_robot_from_config(cfg.robot) + robot.connect() + robot_wrapper = RobotWrapper(robot) # Create robot observation processor robot_observation_processor = make_default_robot_observation_processor() @@ -646,7 +489,7 @@ def demo_cli(cfg: RTCDemoConfig): # Start chunk requester thread get_actions_thread = Thread( target=get_actions, - args=(policy, agent_wrapper, robot_observation_processor, action_queue, shutdown_event, cfg), + args=(policy, robot_wrapper, robot_observation_processor, action_queue, shutdown_event, cfg), daemon=True, name="GetActions", ) @@ -656,7 +499,7 @@ def demo_cli(cfg: RTCDemoConfig): # Start action executor thread actor_thread = Thread( target=actor_control, - args=(agent_wrapper, robot_action_processor, action_queue, shutdown_event, cfg), + args=(robot_wrapper, robot_action_processor, action_queue, shutdown_event, cfg), daemon=True, name="Actor", ) @@ -693,16 +536,10 @@ def demo_cli(cfg: RTCDemoConfig): logger.info("Waiting for action executor thread to finish...") actor_thread.join() - # Cleanup robot or environment - if cfg.robot is not None: - if robot: - robot.disconnect() - logger.info("Robot disconnected") - else: - # Close environment - if vec_env: - vec_env.close() - logger.info("Environment closed") + # Cleanup robot + if robot: + robot.disconnect() + logger.info("Robot disconnected") logger.info("Cleanup completed")