Extract simulator logic from eval_with real robot and add proper headers to files

This commit is contained in:
Eugene Mironov
2025-11-16 19:04:24 +07:00
parent 8eb28fd653
commit 045f9c02f7
2 changed files with 49 additions and 198 deletions
+14
View File
@@ -1,5 +1,19 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Evaluate Real-Time Chunking (RTC) performance on dataset samples.
+35 -198
View File
@@ -1,14 +1,30 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Demo script showing how to use Real-Time Chunking (RTC) with action chunking policies.
Demo script showing how to use Real-Time Chunking (RTC) with action chunking policies on real robots.
This script demonstrates:
1. Creating a robot/environment and policy (SmolVLA, Pi0, etc.) with RTC
2. Consuming actions from the policy while the robot/environment executes
1. Creating a robot and policy (SmolVLA, Pi0, etc.) with RTC
2. Consuming actions from the policy while the robot executes
3. Periodically requesting new action chunks in the background using threads
4. Managing action buffers and timing for real-time operation
For simulation environments, see eval_with_simulation.py
Usage:
# Run RTC with Real robot with RTC
uv run examples/rtc/eval_with_real_robot.py \
@@ -57,7 +73,6 @@ import traceback
from dataclasses import dataclass, field
from threading import Event, Lock, Thread
import numpy as np
import torch
from torch import Tensor
@@ -67,8 +82,6 @@ from lerobot.configs import parser
from lerobot.configs.policies import PreTrainedConfig
from lerobot.configs.types import RTCAttentionSchedule
from lerobot.datasets.utils import build_dataset_frame, hw_to_dataset_features
from lerobot.envs.configs import EnvConfig # noqa: F401
from lerobot.envs.factory import make_env
from lerobot.policies.factory import get_policy_class, make_pre_post_processors
from lerobot.policies.rtc.action_queue import ActionQueue
from lerobot.policies.rtc.configuration_rtc import RTCConfig
@@ -116,148 +129,16 @@ class RobotWrapper:
return self.robot.action_features
class EnvWrapper:
"""Wrapper for gym environments to provide same interface as RobotWrapper."""
def __init__(self, env, env_cfg: EnvConfig):
self.env = env
self.env_cfg = env_cfg
self.lock = Lock()
self._last_obs = None
self._episode_count = 0
self._step_count = 0
# Initialize environment
obs, _ = self.env.reset()
self._last_obs = (
obs[0]
if isinstance(obs, tuple)
or (hasattr(obs, "__getitem__") and len(obs) > 0 and not isinstance(obs, dict))
else obs
)
# Cache feature names
self._observation_features = None
self._action_features = None
def get_observation(self) -> dict[str, np.ndarray]:
"""Get current observation from environment.
Returns observations in the same format as robot.get_observation():
a dict mapping feature names to numpy arrays.
"""
with self.lock:
if self._last_obs is None:
# Reset environment on first observation
obs, _ = self.env.reset()
self._last_obs = (
obs[0]
if isinstance(obs, tuple)
or (hasattr(obs, "__getitem__") and len(obs) > 0 and not isinstance(obs, dict))
else obs
)
# VectorEnv returns observations as numpy arrays in a batch
# Extract first element if it's a vectorized observation
obs = self._last_obs
if isinstance(obs, dict):
# Handle dict observations (extract first element from batch if needed)
result = {}
for key, value in obs.items():
if isinstance(value, np.ndarray) and len(value.shape) > 0 and value.shape[0] == 1:
# Remove batch dimension for single env
result[key] = value[0]
else:
result[key] = value
return result
else:
# Handle array observations - shouldn't happen with our configs but handle it
return {"observation": obs[0] if len(obs.shape) > 1 else obs}
def send_action(self, action: dict):
"""Execute action in environment and update observation."""
with self.lock:
# Convert action dict to array based on action_features
action_list = []
for feature_name in self.action_features():
if feature_name in action:
action_list.append(action[feature_name])
action_array = np.array(action_list)
# VectorEnv expects actions with batch dimension
action_batch = action_array.reshape(1, -1)
# Step environment
obs, _reward, terminated, truncated, _info = self.env.step(action_batch)
# Extract from batch
self._last_obs = (
obs[0]
if isinstance(obs, tuple)
or (hasattr(obs, "__getitem__") and len(obs) > 0 and not isinstance(obs, dict))
else obs
)
self._step_count += 1
# Check if episode is done (handle vectorized env format)
is_done = terminated[0] if isinstance(terminated, (np.ndarray, list)) else terminated
is_truncated = truncated[0] if isinstance(truncated, (np.ndarray, list)) else truncated
# Reset if episode is done
if is_done or is_truncated:
logger.info(f"Episode {self._episode_count} finished after {self._step_count} steps")
obs, _ = self.env.reset()
self._last_obs = (
obs[0]
if isinstance(obs, tuple)
or (hasattr(obs, "__getitem__") and len(obs) > 0 and not isinstance(obs, dict))
else obs
)
self._episode_count += 1
self._step_count = 0
def observation_features(self) -> list[str]:
"""Get observation feature names from environment config."""
if self._observation_features is not None:
return self._observation_features
with self.lock:
features = []
for feature_name in self.env_cfg.features:
if feature_name != "action":
# Use the mapped name from features_map
mapped_name = self.env_cfg.features_map.get(feature_name, feature_name)
features.append(mapped_name)
self._observation_features = features
return features
def action_features(self) -> list[str]:
"""Get action feature names from environment config."""
if self._action_features is not None:
return self._action_features
with self.lock:
# Return action dimension names
action_dim = self.env_cfg.features["action"].shape[0]
self._action_features = [f"action_{i}" for i in range(action_dim)]
return self._action_features
@dataclass
class RTCDemoConfig(HubMixin):
"""Configuration for RTC demo with action chunking policies."""
"""Configuration for RTC demo with action chunking policies and real robots."""
# Policy configuration
policy: PreTrainedConfig | None = None
# Robot configuration (mutually exclusive with env)
# Robot configuration
robot: RobotConfig | None = None
# Environment configuration (mutually exclusive with robot)
env: EnvConfig | None = None
# RTC configuration
rtc: RTCConfig = field(
default_factory=lambda: RTCConfig(
@@ -315,11 +196,9 @@ class RTCDemoConfig(HubMixin):
else:
raise ValueError("Policy path is required")
# Validate that either robot or env is provided, but not both
if self.robot is None and self.env is None:
raise ValueError("Either robot or env configuration must be provided")
if self.robot is not None and self.env is not None:
raise ValueError("Cannot specify both robot and env configuration. Choose one.")
# Validate that robot configuration is provided
if self.robot is None:
raise ValueError("Robot configuration must be provided")
@classmethod
def __get_path_fields__(cls) -> list[str]:
@@ -565,7 +444,6 @@ def demo_cli(cfg: RTCDemoConfig):
policy = None
robot = None
vec_env = None
get_actions_thread = None
actor_thread = None
@@ -595,46 +473,11 @@ def demo_cli(cfg: RTCDemoConfig):
if cfg.use_torch_compile:
policy = _apply_torch_compile(policy, cfg)
# Create robot or environment
if cfg.robot is not None:
logger.info(f"Initializing robot: {cfg.robot.type}")
robot = make_robot_from_config(cfg.robot)
robot.connect()
agent_wrapper = RobotWrapper(robot)
else:
logger.info(f"Initializing environment: {cfg.env.type}")
# Create environment using make_env
env_dict = make_env(cfg.env, n_envs=1, use_async_envs=False)
# Validate environment structure: should have exactly one suite
if len(env_dict) != 1:
raise ValueError(
f"Expected exactly one environment suite, but got {len(env_dict)}. "
f"Suites: {list(env_dict.keys())}"
)
# Extract the actual env from the dict structure {suite: {task_id: vec_env}}
suite_name = list(env_dict.keys())[0]
task_dict = env_dict[suite_name]
# Validate task structure: should have exactly one task
if len(task_dict) != 1:
raise ValueError(
f"Expected exactly one task in suite '{suite_name}', but got {len(task_dict)}. "
f"Tasks: {list(task_dict.keys())}"
)
vec_env = task_dict[0]
logger.info(f"Created environment: suite='{suite_name}', task_id=0, num_envs={vec_env.num_envs}")
# Validate that we have exactly 1 parallel environment
if vec_env.num_envs != 1:
raise ValueError(
f"Expected exactly 1 parallel environment, but got {vec_env.num_envs}. "
f"The EnvWrapper is designed for single environment instances."
)
agent_wrapper = EnvWrapper(vec_env, cfg.env)
# Create robot
logger.info(f"Initializing robot: {cfg.robot.type}")
robot = make_robot_from_config(cfg.robot)
robot.connect()
robot_wrapper = RobotWrapper(robot)
# Create robot observation processor
robot_observation_processor = make_default_robot_observation_processor()
@@ -646,7 +489,7 @@ def demo_cli(cfg: RTCDemoConfig):
# Start chunk requester thread
get_actions_thread = Thread(
target=get_actions,
args=(policy, agent_wrapper, robot_observation_processor, action_queue, shutdown_event, cfg),
args=(policy, robot_wrapper, robot_observation_processor, action_queue, shutdown_event, cfg),
daemon=True,
name="GetActions",
)
@@ -656,7 +499,7 @@ def demo_cli(cfg: RTCDemoConfig):
# Start action executor thread
actor_thread = Thread(
target=actor_control,
args=(agent_wrapper, robot_action_processor, action_queue, shutdown_event, cfg),
args=(robot_wrapper, robot_action_processor, action_queue, shutdown_event, cfg),
daemon=True,
name="Actor",
)
@@ -693,16 +536,10 @@ def demo_cli(cfg: RTCDemoConfig):
logger.info("Waiting for action executor thread to finish...")
actor_thread.join()
# Cleanup robot or environment
if cfg.robot is not None:
if robot:
robot.disconnect()
logger.info("Robot disconnected")
else:
# Close environment
if vec_env:
vec_env.close()
logger.info("Environment closed")
# Cleanup robot
if robot:
robot.disconnect()
logger.info("Robot disconnected")
logger.info("Cleanup completed")