Compare commits

...

10 Commits

Author SHA1 Message Date
Michel Aractingi b2d7eecdb4 format docs 2025-08-05 10:36:22 +02:00
Michel Aractingi 0710f3a0f1 Remove dataset and mode from HilSerlEnvConfig to a GymManipulatorConfig to reduce verbose of configs during training 2025-08-05 10:35:54 +02:00
Michel Aractingi 9effc5214f Refactored hilserl config 2025-08-05 01:24:46 +02:00
Michel Aractingi b292dbbc55 change folder structure to reduce the size of gym_manip 2025-08-04 16:41:42 +02:00
Michel Aractingi f49280e89b RL works at this commit - fixed actor.py and bugs in gym_manipulator 2025-08-03 23:21:13 +02:00
pre-commit-ci[bot] ff38a51df9 [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2025-08-02 17:07:19 +00:00
Michel Aractingi cfa672129e Refactored actor.py to use the pipeline 2025-08-02 19:06:56 +02:00
Michel Aractingi e6e1edfd74 Added the replay functionality with the pipeline 2025-08-02 17:57:27 +02:00
Michel Aractingi 384101731e Added the capability to record a dataset 2025-08-02 17:14:14 +02:00
Michel Aractingi 1fdbecad3c Migrate gym_manipulator to use the pipeline
Added get_teleop_events function to capture relevant events from teleop devices unrelated to actions
2025-08-01 20:20:13 +02:00
10 changed files with 1284 additions and 2119 deletions
+125 -55
View File
@@ -56,27 +56,41 @@ pip install -e ".[hilserl]"
### Understanding Configuration
The training process begins with proper configuration for the HILSerl environment. The configuration class of interest is `HILSerlRobotEnvConfig` in `lerobot/envs/configs.py`. Which is defined as:
The training process begins with proper configuration for the HILSerl environment. The main configuration class is `GymManipulatorConfig` in `lerobot/scripts/rl/gym_manipulator.py`, which contains nested `HILSerlRobotEnvConfig` and `DatasetConfig`. The configuration is organized into focused, nested sub-configs:
<!-- prettier-ignore-start -->
```python
class GymManipulatorConfig:
env: HILSerlRobotEnvConfig # Environment configuration (nested)
dataset: DatasetConfig # Dataset recording/replay configuration (nested)
mode: str | None = None # "record", "replay", or None (for training)
class HILSerlRobotEnvConfig(EnvConfig):
robot: RobotConfig | None = None # Main robot agent (defined in `lerobot/robots`)
teleop: TeleoperatorConfig | None = None # Teleoperator agent, e.g., gamepad or leader arm, (defined in `lerobot/teleoperators`)
wrapper: EnvTransformConfig | None = None # Environment wrapper settings; check `lerobot/scripts/server/gym_manipulator.py`
fps: int = 10 # Control frequency
teleop: TeleoperatorConfig | None = None # Teleoperator agent, e.g., gamepad or leader arm
processor: HILSerlProcessorConfig # Processing pipeline configuration (nested)
name: str = "real_robot" # Environment name
mode: str = None # "record", "replay", or None (for training)
repo_id: str | None = None # LeRobot dataset repository ID
dataset_root: str | None = None # Local dataset root (optional)
task: str = "" # Task identifier
num_episodes: int = 10 # Number of episodes for recording
episode: int = 0 # episode index for replay
device: str = "cuda" # Compute device
push_to_hub: bool = True # Whether to push the recorded datasets to Hub
pretrained_policy_name_or_path: str | None = None # For policy loading
reward_classifier_pretrained_path: str | None = None # For reward model
number_of_steps_after_success: int = 0 # For reward classifier, collect more positive examples after a success to train a classifier
fps: int = 30 # Control frequency
# Nested processor configuration
class HILSerlProcessorConfig:
control_mode: str = "gamepad" # Control mode
observation: ObservationConfig # Observation processing settings
image_preprocessing: ImagePreprocessingConfig # Image crop/resize settings
gripper: GripperConfig # Gripper control and penalty settings
reset: ResetConfig # Environment reset and timing settings
inverse_kinematics: InverseKinematicsConfig # IK processing settings
reward_classifier: RewardClassifierConfig # Reward classifier settings
# Dataset configuration
class DatasetConfig:
repo_id: str # LeRobot dataset repository ID
dataset_root: str | None = None # Local dataset root (optional)
task: str # Task identifier
num_episodes: int # Number of episodes for recording
episode: int # Episode index for replay
push_to_hub: bool # Whether to push datasets to Hub
```
<!-- prettier-ignore-end -->
@@ -130,22 +144,31 @@ With the bounds defined, you can safely collect demonstrations for training. Tra
Create a configuration file for recording demonstrations (or edit an existing one like [env_config_so100.json](https://huggingface.co/datasets/aractingi/lerobot-example-config-files/blob/main/env_config_so100.json)):
1. Set `mode` to `"record"`
2. Specify a unique `repo_id` for your dataset (e.g., "username/task_name")
3. Set `num_episodes` to the number of demonstrations you want to collect
4. Set `crop_params_dict` to `null` initially (we'll determine crops later)
5. Configure `robot`, `cameras`, and other hardware settings
1. Set `mode` to `"record"` at the root level
2. Specify a unique `repo_id` for your dataset in the `dataset` section (e.g., "username/task_name")
3. Set `num_episodes` in the `dataset` section to the number of demonstrations you want to collect
4. Set `env.processor.image_preprocessing.crop_params_dict` to `{}` initially (we'll determine crops later)
5. Configure `env.robot`, `env.teleop`, and other hardware settings in the `env` section
Example configuration section:
```json
"mode": "record",
"repo_id": "username/pick_lift_cube",
"dataset_root": null,
"task": "pick_and_lift",
"num_episodes": 15,
"episode": 0,
"push_to_hub": true
{
"env": {
"type": "gym_manipulator",
"fps": 10
// ... robot, teleop, processor configs ...
},
"dataset": {
"repo_id": "username/pick_lift_cube",
"dataset_root": null,
"task": "pick_and_lift",
"num_episodes": 15,
"episode": 0,
"push_to_hub": true
},
"mode": "record"
}
```
### Using a Teleoperation Device
@@ -191,10 +214,17 @@ The gamepad provides a very convenient way to control the robot and the episode
To setup the gamepad, you need to set the `control_mode` to `"gamepad"` and define the `teleop` section in the configuration file.
```json
{
"env": {
"teleop": {
"type": "gamepad",
"use_gripper": true
"type": "gamepad",
"use_gripper": true
},
"processor": {
"control_mode": "gamepad"
}
}
}
```
<p align="center">
@@ -216,11 +246,18 @@ The SO101 leader arm has reduced gears that allows it to move and track the foll
To setup the SO101 leader, you need to set the `control_mode` to `"leader"` and define the `teleop` section in the configuration file.
```json
{
"env": {
"teleop": {
"type": "so101_leader",
"port": "/dev/tty.usbmodem585A0077921", # check your port number
"use_degrees": true
"type": "so101_leader",
"port": "/dev/tty.usbmodem585A0077921",
"use_degrees": true
},
"processor": {
"control_mode": "leader"
}
}
}
```
In order to annotate the success/failure of the episode, **you will need** to use a keyboard to press `s` for success, `esc` for failure.
@@ -251,7 +288,7 @@ python -m lerobot.scripts.rl.gym_manipulator --config_path src/lerobot/configs/e
During recording:
1. The robot will reset to the initial position defined in the configuration file `fixed_reset_joint_positions`
1. The robot will reset to the initial position defined in the configuration file `env.processor.reset.fixed_reset_joint_positions`
2. Complete the task successfully
3. The episode ends with a reward of 1 when you press the "success" button
4. If the time limit is reached, or the fail button is pressed, the episode ends with a reward of 0
@@ -310,11 +347,19 @@ observation.images.front: [180, 250, 120, 150]
Add these crop parameters to your training configuration:
```json
"crop_params_dict": {
"observation.images.side": [180, 207, 180, 200],
"observation.images.front": [180, 250, 120, 150]
},
"resize_size": [128, 128]
{
"env": {
"processor": {
"image_preprocessing": {
"crop_params_dict": {
"observation.images.side": [180, 207, 180, 200],
"observation.images.front": [180, 250, 120, 150]
},
"resize_size": [128, 128]
}
}
}
}
```
**Recommended image resolution**
@@ -343,26 +388,35 @@ python -m lerobot.scripts.rl.gym_manipulator --config_path src/lerobot/configs/r
**Key Parameters for Data Collection**
- **mode**: set it to `"record"` to collect a dataset
- **repo_id**: `"hf_username/dataset_name"`, name of the dataset and repo on the hub
- **num_episodes**: Number of episodes to record
- **number_of_steps_after_success**: Number of additional frames to record after a success (reward=1) is detected
- **fps**: Number of frames per second to record
- **push_to_hub**: Whether to push the dataset to the hub
- **mode**: set it to `"record"` to collect a dataset (at root level)
- **dataset.repo_id**: `"hf_username/dataset_name"`, name of the dataset and repo on the hub
- **dataset.num_episodes**: Number of episodes to record
- **env.processor.reset.number_of_steps_after_success**: Number of additional frames to record after a success (reward=1) is detected
- **env.fps**: Number of frames per second to record
- **dataset.push_to_hub**: Whether to push the dataset to the hub
The `number_of_steps_after_success` parameter is crucial as it allows you to collect more positive examples. When a success is detected, the system will continue recording for the specified number of steps while maintaining the reward=1 label. Otherwise, there won't be enough states in the dataset labeled to 1 to train a good classifier.
The `env.processor.reset.number_of_steps_after_success` parameter is crucial as it allows you to collect more positive examples. When a success is detected, the system will continue recording for the specified number of steps while maintaining the reward=1 label. Otherwise, there won't be enough states in the dataset labeled to 1 to train a good classifier.
Example configuration section for data collection:
```json
{
"mode": "record",
"repo_id": "hf_username/dataset_name",
"dataset_root": "data/your_dataset",
"num_episodes": 20,
"push_to_hub": true,
"fps": 10,
"number_of_steps_after_success": 15
"env": {
"type": "gym_manipulator",
"fps": 10,
"processor": {
"reset": {
"number_of_steps_after_success": 15
}
}
},
"dataset": {
"repo_id": "hf_username/dataset_name",
"dataset_root": "data/your_dataset",
"num_episodes": 20,
"push_to_hub": true
},
"mode": "record"
}
```
@@ -421,9 +475,17 @@ To use your trained reward classifier, configure the `HILSerlRobotEnvConfig` to
<!-- prettier-ignore-start -->
```python
env_config = HILSerlRobotEnvConfig(
reward_classifier_pretrained_path="path_to_your_pretrained_trained_model",
# Other environment parameters
config = GymManipulatorConfig(
env=HILSerlRobotEnvConfig(
processor=HILSerlProcessorConfig(
reward_classifier=RewardClassifierConfig(
pretrained_path="path_to_your_pretrained_trained_model"
)
),
# Other environment parameters
),
dataset=DatasetConfig(...),
mode=None # For training
)
```
<!-- prettier-ignore-end -->
@@ -432,7 +494,15 @@ or set the argument in the json config file.
```json
{
"reward_classifier_pretrained_path": "path_to_your_pretrained_model"
"env": {
"processor": {
"reward_classifier": {
"pretrained_path": "path_to_your_pretrained_model",
"success_threshold": 0.7,
"success_reward": 1.0
}
}
}
}
```
+58 -85
View File
@@ -161,35 +161,74 @@ class XarmEnv(EnvConfig):
@dataclass
class VideoRecordConfig:
"""Configuration for video recording in ManiSkill environments."""
enabled: bool = False
record_dir: str = "videos"
trajectory_name: str = "trajectory"
class ImagePreprocessingConfig:
crop_params_dict: dict[str, tuple[int, int, int, int]] | None = None
resize_size: tuple[int, int] | None = None
@dataclass
class EnvTransformConfig:
"""Configuration for environment wrappers."""
class RewardClassifierConfig:
"""Configuration for reward classification."""
pretrained_path: str | None = None
success_threshold: float = 0.5
success_reward: float = 1.0
@dataclass
class InverseKinematicsConfig:
"""Configuration for inverse kinematics processing."""
urdf_path: str | None = None
target_frame_name: str | None = None
end_effector_bounds: dict[str, list[float]] | None = None
end_effector_step_sizes: dict[str, float] | None = None
max_gripper_pos: float | None = None
@dataclass
class ObservationConfig:
"""Configuration for observation processing."""
# ee_action_space_params: EEActionSpaceConfig = field(default_factory=EEActionSpaceConfig)
control_mode: str = "gamepad"
display_cameras: bool = False
add_joint_velocity_to_observation: bool = False
add_current_to_observation: bool = False
add_ee_pose_to_observation: bool = False
crop_params_dict: dict[str, tuple[int, int, int, int]] | None = None
resize_size: tuple[int, int] | None = None
control_time_s: float = 20.0
fixed_reset_joint_positions: Any | None = None
reset_time_s: float = 5.0
display_cameras: bool = False
@dataclass
class GripperConfig:
"""Configuration for gripper control and penalties."""
use_gripper: bool = True
gripper_quantization_threshold: float | None = 0.8
gripper_penalty: float = 0.0
gripper_penalty_in_reward: bool = False
@dataclass
class ResetConfig:
"""Configuration for environment reset behavior."""
fixed_reset_joint_positions: Any | None = None
reset_time_s: float = 5.0
control_time_s: float = 20.0
terminate_on_success: bool = True
number_of_steps_after_success: int = 0
@dataclass
class HILSerlProcessorConfig:
"""Configuration for environment processing pipeline."""
control_mode: str = "gamepad"
observation: ObservationConfig = field(default_factory=ObservationConfig)
image_preprocessing: ImagePreprocessingConfig = field(default_factory=ImagePreprocessingConfig)
gripper: GripperConfig = field(default_factory=GripperConfig)
reset: ResetConfig = field(default_factory=ResetConfig)
inverse_kinematics: InverseKinematicsConfig = field(default_factory=InverseKinematicsConfig)
reward_classifier: RewardClassifierConfig = field(default_factory=RewardClassifierConfig)
@EnvConfig.register_subclass(name="gym_manipulator")
@dataclass
class HILSerlRobotEnvConfig(EnvConfig):
@@ -197,77 +236,11 @@ class HILSerlRobotEnvConfig(EnvConfig):
robot: RobotConfig | None = None
teleop: TeleoperatorConfig | None = None
wrapper: EnvTransformConfig | None = None
fps: int = 10
processor: HILSerlProcessorConfig = field(default_factory=HILSerlProcessorConfig)
name: str = "real_robot"
mode: str | None = None # Either "record", "replay", None
repo_id: str | None = None
dataset_root: str | None = None
task: str | None = ""
num_episodes: int = 10 # only for record mode
episode: int = 0
device: str = "cuda"
push_to_hub: bool = True
pretrained_policy_name_or_path: str | None = None
reward_classifier_pretrained_path: str | None = None
# For the reward classifier, to record more positive examples after a success
number_of_steps_after_success: int = 0
@property
def gym_kwargs(self) -> dict:
return {}
@EnvConfig.register_subclass("hil")
@dataclass
class HILEnvConfig(EnvConfig):
"""Configuration for the HIL environment."""
name: str = "PandaPickCube"
task: str | None = "PandaPickCubeKeyboard-v0"
use_viewer: bool = True
gripper_penalty: float = 0.0
use_gamepad: bool = True
state_dim: int = 18
action_dim: int = 4
fps: int = 100
episode_length: int = 100
video_record: VideoRecordConfig = field(default_factory=VideoRecordConfig)
features: dict[str, PolicyFeature] = field(
default_factory=lambda: {
"action": PolicyFeature(type=FeatureType.ACTION, shape=(4,)),
"observation.image": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 128, 128)),
"observation.state": PolicyFeature(type=FeatureType.STATE, shape=(18,)),
}
)
features_map: dict[str, str] = field(
default_factory=lambda: {
"action": ACTION,
"observation.image": OBS_IMAGE,
"observation.state": OBS_STATE,
}
)
################# args from hilserlrobotenv
reward_classifier_pretrained_path: str | None = None
robot_config: RobotConfig | None = None
teleop_config: TeleoperatorConfig | None = None
wrapper: EnvTransformConfig | None = None
mode: str | None = None # Either "record", "replay", None
repo_id: str | None = None
dataset_root: str | None = None
num_episodes: int = 10 # only for record mode
episode: int = 0
device: str = "cuda"
push_to_hub: bool = True
pretrained_policy_name_or_path: str | None = None
# For the reward classifier, to record more positive examples after a success
number_of_steps_after_success: int = 0
############################
@property
def gym_kwargs(self) -> dict:
return {
"use_viewer": self.use_viewer,
"use_gamepad": self.use_gamepad,
"gripper_penalty": self.gripper_penalty,
}
+18
View File
@@ -15,6 +15,12 @@
# limitations under the License.
from .device_processor import DeviceProcessor
from .hil_processor import (
GripperPenaltyProcessor,
ImageCropResizeProcessor,
InterventionActionProcessor,
TimeLimitProcessor,
)
from .normalize_processor import NormalizerProcessor, UnnormalizerProcessor
from .observation_processor import (
ImageProcessor,
@@ -36,15 +42,26 @@ from .pipeline import (
TruncatedProcessor,
)
from .rename_processor import RenameProcessor
from .robot_processor import (
InverseKinematicsProcessor,
JointVelocityProcessor,
MotorCurrentProcessor,
)
__all__ = [
"ActionProcessor",
"DeviceProcessor",
"DoneProcessor",
"EnvTransition",
"GripperPenaltyProcessor",
"IdentityProcessor",
"ImageCropResizeProcessor",
"ImageProcessor",
"InfoProcessor",
"InterventionActionProcessor",
"InverseKinematicsProcessor",
"JointVelocityProcessor",
"MotorCurrentProcessor",
"NormalizerProcessor",
"UnnormalizerProcessor",
"ObservationProcessor",
@@ -54,6 +71,7 @@ __all__ = [
"RewardProcessor",
"RobotProcessor",
"StateProcessor",
"TimeLimitProcessor",
"TransitionKey",
"TruncatedProcessor",
"VanillaObservationProcessor",
+331
View File
@@ -0,0 +1,331 @@
import time
from dataclasses import dataclass
from typing import Any
import torch
import torchvision.transforms.functional as F # noqa: N812
from lerobot.configs.types import PolicyFeature
from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionKey
@dataclass
@ProcessorStepRegistry.register("image_crop_resize_processor")
class ImageCropResizeProcessor:
"""Crop and resize image observations."""
crop_params_dict: dict[str, tuple[int, int, int, int]]
resize_size: tuple[int, int] = (128, 128)
def __call__(self, transition: EnvTransition) -> EnvTransition:
observation = transition.get(TransitionKey.OBSERVATION)
if observation is None:
return transition
if self.resize_size is None and not self.crop_params_dict:
return transition
new_observation = dict(observation)
# Process all image keys in the observation
for key in observation:
if "image" not in key:
continue
image = observation[key]
device = image.device
if device.type == "mps":
image = image.cpu()
# Crop if crop params are provided for this key
if key in self.crop_params_dict:
crop_params = self.crop_params_dict[key]
image = F.crop(image, *crop_params)
# Always resize
image = F.resize(image, self.resize_size)
image = image.clamp(0.0, 1.0)
new_observation[key] = image.to(device)
new_transition = transition.copy()
new_transition[TransitionKey.OBSERVATION] = new_observation
return new_transition
def get_config(self) -> dict[str, Any]:
return {
"crop_params_dict": self.crop_params_dict,
"resize_size": self.resize_size,
}
def state_dict(self) -> dict[str, torch.Tensor]:
return {}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
pass
def reset(self) -> None:
pass
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
return features
@dataclass
@ProcessorStepRegistry.register("time_limit_processor")
class TimeLimitProcessor:
"""Track episode time and enforce time limits."""
max_episode_steps: int
current_step: int = 0
def __call__(self, transition: EnvTransition) -> EnvTransition:
truncated = transition.get(TransitionKey.TRUNCATED)
if truncated is None:
return transition
self.current_step += 1
if self.current_step >= self.max_episode_steps:
truncated = True
new_transition = transition.copy()
new_transition[TransitionKey.TRUNCATED] = truncated
return new_transition
def get_config(self) -> dict[str, Any]:
return {
"max_episode_steps": self.max_episode_steps,
}
def state_dict(self) -> dict[str, torch.Tensor]:
return {}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
pass
def reset(self) -> None:
self.current_step = 0
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
return features
@dataclass
@ProcessorStepRegistry.register("gripper_penalty_processor")
class GripperPenaltyProcessor:
penalty: float = -0.01
max_gripper_pos: float = 30.0
def __call__(self, transition: EnvTransition) -> EnvTransition:
"""Calculate gripper penalty and add to complementary data."""
action = transition.get(TransitionKey.ACTION)
complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA)
if complementary_data is None or action is None:
return transition
current_gripper_pos = complementary_data.get("raw_joint_positions", None)[-1]
if current_gripper_pos is None:
return transition
gripper_action = action[-1].item()
gripper_action_normalized = gripper_action / self.max_gripper_pos
# Normalize gripper state and action
gripper_state_normalized = current_gripper_pos / self.max_gripper_pos
gripper_action_normalized = gripper_action - 1.0
# Calculate penalty boolean as in original
gripper_penalty_bool = (gripper_state_normalized < 0.5 and gripper_action_normalized > 0.5) or (
gripper_state_normalized > 0.75 and gripper_action_normalized < 0.5
)
gripper_penalty = self.penalty * int(gripper_penalty_bool)
# Add penalty information to complementary data
complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA, {})
# Create new complementary data with penalty info
new_complementary_data = dict(complementary_data)
new_complementary_data["discrete_penalty"] = gripper_penalty
# Create new transition with updated complementary data
new_transition = transition.copy()
new_transition[TransitionKey.COMPLEMENTARY_DATA] = new_complementary_data
return new_transition
def get_config(self) -> dict[str, Any]:
return {
"penalty": self.penalty,
"max_gripper_pos": self.max_gripper_pos,
}
def state_dict(self) -> dict[str, torch.Tensor]:
return {}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
pass
def reset(self) -> None:
"""Reset the processor state."""
self.last_gripper_state = None
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
return features
@dataclass
@ProcessorStepRegistry.register("intervention_action_processor")
class InterventionActionProcessor:
"""Handle action intervention based on signals in the transition.
This processor checks for intervention signals in the transition's complementary data
and overrides agent actions when intervention is active.
"""
use_gripper: bool = False
def __call__(self, transition: EnvTransition) -> EnvTransition:
action = transition.get(TransitionKey.ACTION)
if action is None:
return transition
# Get intervention signals from complementary data
complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA, {})
teleop_action = complementary_data.get("teleop_action", {})
is_intervention = complementary_data.get("is_intervention", False)
terminate_episode = complementary_data.get("terminate_episode", False)
success = complementary_data.get("success", False)
rerecord_episode = complementary_data.get("rerecord_episode", False)
new_transition = transition.copy()
# Override action if intervention is active
if is_intervention and teleop_action:
# Convert teleop_action dict to tensor format
action_list = [
teleop_action.get("delta_x", 0.0),
teleop_action.get("delta_y", 0.0),
teleop_action.get("delta_z", 0.0),
]
if self.use_gripper:
action_list.append(teleop_action.get("gripper", 1.0))
teleop_action_tensor = torch.tensor(action_list, dtype=action.dtype, device=action.device)
new_transition[TransitionKey.ACTION] = teleop_action_tensor
# Handle episode termination
new_transition[TransitionKey.DONE] = bool(terminate_episode)
new_transition[TransitionKey.REWARD] = float(success)
# Update info with intervention metadata
info = new_transition.get(TransitionKey.INFO, {})
info["is_intervention"] = is_intervention
info["rerecord_episode"] = rerecord_episode
info["next.success"] = success if terminate_episode else info.get("next.success", False)
new_transition[TransitionKey.INFO] = info
new_transition[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"] = new_transition[
TransitionKey.ACTION
]
return new_transition
def get_config(self) -> dict[str, Any]:
return {
"use_gripper": self.use_gripper,
}
def state_dict(self) -> dict[str, torch.Tensor]:
return {}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
pass
def reset(self) -> None:
pass
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
return features
@dataclass
@ProcessorStepRegistry.register("reward_classifier_processor")
class RewardClassifierProcessor:
"""Apply reward classification to image observations.
This processor runs a trained reward classifier on image observations
to predict rewards and success states, potentially terminating episodes
when success is achieved.
"""
pretrained_path: str = None
device: str = "cpu"
success_threshold: float = 0.5
success_reward: float = 1.0
terminate_on_success: bool = True
reward_classifier: Any = None
def __post_init__(self):
"""Initialize the reward classifier after dataclass initialization."""
if self.pretrained_path is not None:
from lerobot.policies.sac.reward_model.modeling_classifier import Classifier
self.reward_classifier = Classifier.from_pretrained(self.pretrained_path)
self.reward_classifier.to(self.device)
self.reward_classifier.eval()
def __call__(self, transition: EnvTransition) -> EnvTransition:
observation = transition.get(TransitionKey.OBSERVATION)
if observation is None or self.reward_classifier is None:
return transition
# Extract images from observation
images = {key: value for key, value in observation.items() if "image" in key}
if not images:
return transition
# Run reward classifier
start_time = time.perf_counter()
with torch.inference_mode():
success = self.reward_classifier.predict_reward(images, threshold=self.success_threshold)
classifier_frequency = 1 / (time.perf_counter() - start_time)
# Calculate reward and termination
reward = transition.get(TransitionKey.REWARD, 0.0)
terminated = transition.get(TransitionKey.DONE, False)
if success == 1.0:
reward = self.success_reward
if self.terminate_on_success:
terminated = True
# Update transition
new_transition = transition.copy()
new_transition[TransitionKey.REWARD] = reward
new_transition[TransitionKey.DONE] = terminated
# Update info with classifier frequency
info = new_transition.get(TransitionKey.INFO, {})
info["reward_classifier_frequency"] = classifier_frequency
new_transition[TransitionKey.INFO] = info
return new_transition
def get_config(self) -> dict[str, Any]:
return {
"device": self.device,
"success_threshold": self.success_threshold,
"success_reward": self.success_reward,
"terminate_on_success": self.terminate_on_success,
}
def state_dict(self) -> dict[str, torch.Tensor]:
return {}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
pass
def reset(self) -> None:
pass
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
return features
+245
View File
@@ -0,0 +1,245 @@
from dataclasses import dataclass, field
from typing import Any
import gymnasium as gym
import numpy as np
import torch
from lerobot.configs.types import PolicyFeature
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionKey
@dataclass
@ProcessorStepRegistry.register("joint_velocity_processor")
class JointVelocityProcessor:
"""Add joint velocity information to observations.
Computes joint velocities by tracking changes in joint positions over time.
"""
joint_velocity_limits: float = 100.0
dt: float = 1.0 / 10
last_joint_positions: torch.Tensor | None = None
def __call__(self, transition: EnvTransition) -> EnvTransition:
observation = transition.get(TransitionKey.OBSERVATION)
if observation is None:
return transition
# Get current joint positions (assuming they're in observation.state)
current_positions = observation.get("observation.state")
if current_positions is None:
return transition
# Initialize last joint positions if not already set
if self.last_joint_positions is None:
self.last_joint_positions = current_positions.clone()
# Compute velocities
joint_velocities = (current_positions - self.last_joint_positions) / self.dt
self.last_joint_positions = current_positions.clone()
# Extend observation with velocities
extended_state = torch.cat([current_positions, joint_velocities], dim=-1)
# Create new observation dict
new_observation = dict(observation)
new_observation["observation.state"] = extended_state
# Return new transition
new_transition = transition.copy()
new_transition[TransitionKey.OBSERVATION] = new_observation
return new_transition
def get_config(self) -> dict[str, Any]:
return {
"joint_velocity_limits": self.joint_velocity_limits,
"dt": self.dt,
}
def state_dict(self) -> dict[str, torch.Tensor]:
return {}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
pass
def reset(self) -> None:
self.last_joint_positions = None
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
return features
@dataclass
@ProcessorStepRegistry.register("current_processor")
class MotorCurrentProcessor:
"""Add motor current information to observations."""
env: gym.Env = None
def __call__(self, transition: EnvTransition) -> EnvTransition:
observation = transition.get(TransitionKey.OBSERVATION)
if observation is None:
return transition
# Get current values from complementary_data (where robot state would be stored)
present_current_dict = self.env.unwrapped.robot.bus.sync_read("Present_Current")
motor_currents = torch.tensor(
[present_current_dict[name] for name in self.env.unwrapped.robot.bus.motors],
dtype=torch.float32,
).unsqueeze(0)
current_state = observation.get("observation.state")
if current_state is None:
return transition
extended_state = torch.cat([current_state, motor_currents], dim=-1)
# Create new observation dict
new_observation = dict(observation)
new_observation["observation.state"] = extended_state
# Return new transition
new_transition = transition.copy()
new_transition[TransitionKey.OBSERVATION] = new_observation
return new_transition
def get_config(self) -> dict[str, Any]:
return {}
def state_dict(self) -> dict[str, torch.Tensor]:
return {}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
pass
def reset(self) -> None:
pass
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
return features
@dataclass
@ProcessorStepRegistry.register("inverse_kinematics_processor")
class InverseKinematicsProcessor:
"""Convert end-effector space actions to joint space using inverse kinematics.
This processor transforms delta commands in end-effector space (delta_x, delta_y, delta_z)
to joint space commands using forward and inverse kinematics. It maintains the current
end-effector pose and joint positions to compute the transformations.
"""
urdf_path: str
target_frame_name: str = "gripper_link"
end_effector_step_sizes: dict[str, float] = field(default_factory=lambda: {"x": 1.0, "y": 1.0, "z": 1.0})
end_effector_bounds: dict[str, list[float]] | None = None
max_gripper_pos: float = 30.0
# State tracking
current_ee_pos: np.ndarray | None = field(default=None, init=False, repr=False)
current_joint_pos: np.ndarray | None = field(default=None, init=False, repr=False)
kinematics: RobotKinematics | None = field(default=None, init=False, repr=False)
def __post_init__(self):
"""Initialize the kinematics module after dataclass initialization."""
if self.urdf_path:
self.kinematics = RobotKinematics(
urdf_path=self.urdf_path,
target_frame_name=self.target_frame_name,
)
def __call__(self, transition: EnvTransition) -> EnvTransition:
action = transition.get(TransitionKey.ACTION)
if action is None:
return transition
action_np = action.detach().cpu().numpy().squeeze()
complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA, {})
raw_joint_positions = complementary_data.get("raw_joint_positions")
current_gripper_pos = raw_joint_positions[-1]
if self.current_joint_pos is None:
self.current_joint_pos = raw_joint_positions
# Initialize end-effector position if not available
if self.current_joint_pos is None:
return transition # Cannot proceed without joint positions
# Calculate current end-effector position using forward kinematics
if self.current_ee_pos is None:
self.current_ee_pos = self.kinematics.forward_kinematics(self.current_joint_pos)
# Scale deltas by step sizes
delta_ee = np.array(
[
action_np[0] * self.end_effector_step_sizes["x"],
action_np[1] * self.end_effector_step_sizes["y"],
action_np[2] * self.end_effector_step_sizes["z"],
],
dtype=np.float32,
)
# Set desired end-effector position by adding delta
desired_ee_pos = np.eye(4)
desired_ee_pos[:3, :3] = self.current_ee_pos[:3, :3] # Keep orientation
# Add delta to position and clip to bounds
desired_ee_pos[:3, 3] = self.current_ee_pos[:3, 3] + delta_ee
if self.end_effector_bounds is not None:
desired_ee_pos[:3, 3] = np.clip(
desired_ee_pos[:3, 3],
self.end_effector_bounds["min"],
self.end_effector_bounds["max"],
)
# Compute inverse kinematics to get joint positions
target_joint_values = self.kinematics.inverse_kinematics(self.current_joint_pos, desired_ee_pos)
# Update current state
self.current_ee_pos = desired_ee_pos.copy()
self.current_joint_pos = target_joint_values.copy()
# Create new action with joint space commands
gripper_action = current_gripper_pos
if len(action_np) > 3:
# Handle gripper command separately
gripper_command = action_np[3]
# Process gripper command (convert from [0,2] to delta) and discretize
gripper_delta = np.round(gripper_command - 1.0).astype(int) * self.max_gripper_pos
gripper_action = np.clip(current_gripper_pos + gripper_delta, 0, self.max_gripper_pos)
# Combine joint positions and gripper
target_joint_values[-1] = gripper_action
converted_action = torch.from_numpy(target_joint_values).to(action.device).to(action.dtype)
new_transition = transition.copy()
new_transition[TransitionKey.ACTION] = converted_action
return new_transition
def get_config(self) -> dict[str, Any]:
return {
"urdf_path": self.urdf_path,
"target_frame_name": self.target_frame_name,
"end_effector_step_sizes": self.end_effector_step_sizes,
"end_effector_bounds": self.end_effector_bounds,
"max_gripper_pos": self.max_gripper_pos,
}
def state_dict(self) -> dict[str, torch.Tensor]:
return {}
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
pass
def reset(self) -> None:
"""Reset the processor state."""
self.current_ee_pos = None
self.current_joint_pos = None
def feature_contract(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
return features
+61 -26
View File
@@ -62,8 +62,14 @@ from lerobot.configs import parser
from lerobot.configs.train import TrainRLServerPipelineConfig
from lerobot.policies.factory import make_policy
from lerobot.policies.sac.modeling_sac import SACPolicy
from lerobot.processor.pipeline import TransitionKey
from lerobot.robots import so100_follower # noqa: F401
from lerobot.scripts.rl.gym_manipulator import make_robot_env
from lerobot.scripts.rl.gym_manipulator import (
create_transition,
make_processors,
make_robot_env,
step_env_and_process_transition,
)
from lerobot.teleoperators import gamepad, so101_leader # noqa: F401
from lerobot.transport import services_pb2, services_pb2_grpc
from lerobot.transport.utils import (
@@ -236,7 +242,8 @@ def act_with_policy(
logging.info("make_env online")
online_env = make_robot_env(cfg=cfg.env)
online_env, teleop_device = make_robot_env(cfg=cfg.env)
env_processor, action_processor = make_processors(online_env, cfg.env)
set_seed(cfg.seed)
device = get_safe_torch_device(cfg.policy.device, log=True)
@@ -257,6 +264,13 @@ def act_with_policy(
assert isinstance(policy, nn.Module)
obs, info = online_env.reset()
complementary_data = {"raw_joint_positions": info.pop("raw_joint_positions")}
env_processor.reset()
action_processor.reset()
# Process initial observation
transition = create_transition(observation=obs, info=info, complementary_data=complementary_data)
transition = env_processor(transition)
# NOTE: For the moment we will solely handle the case of a single environment
sum_reward_episode = 0
@@ -274,45 +288,57 @@ def act_with_policy(
logging.info("[ACTOR] Shutting down act_with_policy")
return
if interaction_step >= cfg.policy.online_step_before_learning:
# Time policy inference and check if it meets FPS requirement
with policy_timer:
action = policy.select_action(batch=obs)
policy_fps = policy_timer.fps_last
observation = transition[TransitionKey.OBSERVATION]
log_policy_frequency_issue(policy_fps=policy_fps, cfg=cfg, interaction_step=interaction_step)
# Time policy inference and check if it meets FPS requirement
with policy_timer:
# Extract observation from transition for policy
action = policy.select_action(batch=observation)
policy_fps = policy_timer.fps_last
else:
action = online_env.action_space.sample()
log_policy_frequency_issue(policy_fps=policy_fps, cfg=cfg, interaction_step=interaction_step)
next_obs, reward, done, truncated, info = online_env.step(action)
# Use the new step function
new_transition, terminate_episode = step_env_and_process_transition(
env=online_env,
transition=transition,
action=action,
teleop_device=teleop_device,
env_processor=env_processor,
action_processor=action_processor,
)
# Extract values from processed transition
next_observation = new_transition[TransitionKey.OBSERVATION]
executed_action = new_transition[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"]
reward = new_transition[TransitionKey.REWARD]
done = new_transition.get(TransitionKey.DONE, False)
truncated = new_transition.get(TransitionKey.TRUNCATED, False)
sum_reward_episode += float(reward)
# Increment total steps counter for intervention rate
episode_total_steps += 1
# NOTE: We override the action if the intervention is True, because the action applied is the intervention action
if "is_intervention" in info and info["is_intervention"]:
# NOTE: The action space for demonstration before hand is with the full action space
# but sometimes for example we want to deactivate the gripper
action = info["action_intervention"]
# Check for intervention from transition info
intervention_info = new_transition[TransitionKey.INFO]
if intervention_info.get("is_intervention", False):
episode_intervention = True
# Increment intervention steps counter
episode_intervention_steps += 1
# Create transition for learner (convert to old format)
list_transition_to_send_to_learner.append(
Transition(
state=obs,
action=action,
state=observation,
action=executed_action,
reward=reward,
next_state=next_obs,
next_state=next_observation,
done=done,
truncated=truncated, # TODO: (azouitine) Handle truncation properly
complementary_info=info,
truncated=truncated,
complementary_info={}, # new_transition[TransitionKey.COMPLEMENTARY_DATA],
)
)
# assign obs to the next obs and continue the rollout
obs = next_obs
# Update transition for next iteration
transition = new_transition
if done or truncated:
logging.info(f"[ACTOR] Global step {interaction_step}: Episode reward: {sum_reward_episode}")
@@ -347,12 +373,21 @@ def act_with_policy(
)
)
# Reset intervention counters
# Reset intervention counters and environment
sum_reward_episode = 0.0
episode_intervention = False
episode_intervention_steps = 0
episode_total_steps = 0
# Reset environment and processors
obs, info = online_env.reset()
complementary_data = {"raw_joint_positions": info.pop("raw_joint_positions")}
env_processor.reset()
action_processor.reset()
# Process initial observation
transition = create_transition(observation=obs, info=info, complementary_data=complementary_data)
transition = env_processor(transition)
if cfg.env.fps is not None:
dt_time = time.perf_counter() - start_time
File diff suppressed because it is too large Load Diff
@@ -107,6 +107,45 @@ class GamepadTeleop(Teleoperator):
return action_dict
def get_teleop_events(self) -> dict[str, Any]:
"""
Get extra control events from the gamepad such as intervention status,
episode termination, success indicators, etc.
Returns:
Dictionary containing:
- is_intervention: bool - Whether human is currently intervening
- terminate_episode: bool - Whether to terminate the current episode
- success: bool - Whether the episode was successful
- rerecord_episode: bool - Whether to rerecord the episode
"""
if self.gamepad is None:
return {
"is_intervention": False,
"terminate_episode": False,
"success": False,
"rerecord_episode": False,
}
# Update gamepad state to get fresh inputs
self.gamepad.update()
# Check if intervention is active
is_intervention = self.gamepad.should_intervene()
# Get episode end status
episode_end_status = self.gamepad.get_episode_end_status()
terminate_episode = episode_end_status is not None
success = episode_end_status == "success"
rerecord_episode = episode_end_status == "rerecord_episode"
return {
"is_intervention": is_intervention,
"terminate_episode": terminate_episode,
"success": success,
"rerecord_episode": rerecord_episode,
}
def disconnect(self) -> None:
"""Disconnect from the gamepad."""
if self.gamepad is not None:
@@ -235,3 +235,67 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop):
action_dict["gripper"] = gripper_action
return action_dict
def get_teleop_events(self) -> dict[str, Any]:
"""
Get extra control events from the keyboard such as intervention status,
episode termination, success indicators, etc.
Keyboard mappings:
- Any movement keys pressed = intervention active
- 's' key = success (terminate episode successfully)
- 'r' key = rerecord episode (terminate and rerecord)
- 'q' key = quit episode (terminate without success)
Returns:
Dictionary containing:
- is_intervention: bool - Whether human is currently intervening
- terminate_episode: bool - Whether to terminate the current episode
- success: bool - Whether the episode was successful
- rerecord_episode: bool - Whether to rerecord the episode
"""
if not self.is_connected:
return {
"is_intervention": False,
"terminate_episode": False,
"success": False,
"rerecord_episode": False,
}
# Check if any movement keys are currently pressed (indicates intervention)
movement_keys = [
keyboard.Key.up,
keyboard.Key.down,
keyboard.Key.left,
keyboard.Key.right,
keyboard.Key.shift,
keyboard.Key.shift_r,
keyboard.Key.ctrl_r,
keyboard.Key.ctrl_l,
]
is_intervention = any(self.current_pressed.get(key, False) for key in movement_keys)
# Check for episode control commands from misc_keys_queue
terminate_episode = False
success = False
rerecord_episode = False
# Process any pending misc keys
while not self.misc_keys_queue.empty():
key = self.misc_keys_queue.get_nowait()
if key == "s":
terminate_episode = True
success = True
elif key == "r":
terminate_episode = True
rerecord_episode = True
elif key == "q":
terminate_episode = True
success = False
return {
"is_intervention": is_intervention,
"terminate_episode": terminate_episode,
"success": success,
"rerecord_episode": rerecord_episode,
}
+12
View File
@@ -160,6 +160,18 @@ class Teleoperator(abc.ABC):
"""
pass
@abc.abstractmethod
def get_teleop_events(self) -> dict[str, Any]:
"""
Get extra control events from the teleoperator such as intervention status,
episode termination, success indicators, etc.
Check the implementation of the gamepad for an example.
Returns:
dict[str, Any]: A dictionary containing control events with keys and values that are specific to the setup.
"""
pass
@abc.abstractmethod
def send_feedback(self, feedback: dict[str, Any]) -> None:
"""