Compare commits

...

51 Commits

Author SHA1 Message Date
Khalil Meftah 5c444302c1 feat(so_follower): synchronize goal position with present position to prevent positional error during torque re-enablement 2026-04-28 18:40:48 +02:00
Khalil Meftah c868f874f1 feat(teleop): enhance leader-follower behavior and torque management in SO101 teleoperation 2026-04-28 17:46:06 +02:00
Khalil Meftah e228f0880f feat(teleop): add SO100/SO101 leader-follower teleoperation example
fix: update import for SO101Leader in so101_leader_follower.py
chore: include SO101LeaderFollower in exports
2026-04-28 17:28:15 +02:00
Khalil Meftah fe2c32d9e7 add so leader arm 2026-04-28 16:53:36 +02:00
Khalil Meftah 6ed80f5a59 Merge remote-tracking branch 'origin/main' into user/khalil-meftah/2026-02-16-rl-stack-refactor
# Conflicts:
#	src/lerobot/policies/__init__.py
#	src/lerobot/rl/actor.py
2026-04-28 12:04:13 +02:00
Khalil Meftah ef6b3b5b0f refactor: simplify docstrings for clarity and conciseness across multiple files 2026-04-28 11:11:02 +02:00
Khalil Meftah e298474bf3 fix(tests): gate RL tests on the datasets extra 2026-04-27 16:53:34 +02:00
Khalil Meftah 577f14337a refactor(tests): remove grpc import checks from test files for cleaner code 2026-04-27 16:20:13 +02:00
Khalil Meftah 47be90f040 refactor(rl): make RLAlgorithmConfig an abstract base class for better extensibility 2026-04-27 15:59:59 +02:00
Khalil Meftah 47dd65347e refactor(rl): add type property to RLAlgorithmConfig for better clarity 2026-04-27 15:57:24 +02:00
Khalil Meftah fd5a788120 refactor(rl): add make_algorithm_config function for RLAlgorithmConfig instantiation 2026-04-27 15:55:16 +02:00
Khalil Meftah 9ce9e01469 refactor(rl): make algorithm a nested config so all SAC hyperparameters are JSON-addressable 2026-04-27 13:39:03 +02:00
Khalil Meftah 21c16a27f0 Revert "perf(observation_processor): add CUDA support for image processing"
This reverts commit 38b88c414c.
2026-04-27 11:52:19 +02:00
Khalil Meftah b3164543f4 fix(rl): enhance intervention handling in actor and learner
(cherry picked from commit ef8bfffbd7)
2026-04-27 11:35:21 +02:00
Khalil Meftah f3993cbbb1 fix(rl): improve action processing for discrete and continuous actions
(cherry picked from commit f887ab3f6a)
2026-04-27 11:35:20 +02:00
Khalil Meftah c278cfa026 fix(rl): postprocess action in actor
(cherry picked from commit c2556439e5)
2026-04-27 11:35:20 +02:00
Khalil Meftah 77d18659b1 fix(rl): mirror gym_manipulator in actor
(cherry picked from commit d2a046dfc5)
2026-04-27 11:35:19 +02:00
Khalil Meftah 6347edefb1 fix(rl): merge environment and action-processor info in transition processing
(cherry picked from commit 30e1886b64)
2026-04-27 11:35:18 +02:00
Khalil Meftah eda47eca18 fix(rl): update neutral gripper action
(cherry picked from commit 9c9064e5be)
2026-04-27 11:35:18 +02:00
Khalil Meftah a64e6f5070 fix(rl): clarify discrete gripper action mapping in GripperVelocityToJoint for SO100
(cherry picked from commit 494f469a2b)
2026-04-27 11:35:17 +02:00
Khalil Meftah 3def86c2c3 fix(rl): add time limit processor to environment pipeline
(cherry picked from commit cd105f65cb)
2026-04-27 11:35:17 +02:00
Khalil Meftah 356a64d8c4 fix(rl): correctly wire HIL-SERL gripper penalty through processor pipeline
(cherry picked from commit 9c2af818ff)
2026-04-27 11:35:16 +02:00
Khalil Meftah 38b88c414c perf(observation_processor): add CUDA support for image processing 2026-04-24 13:36:26 +02:00
Khalil Meftah 1ed32210c7 refactor(rl/sac): consolidate hyperparameter ownership and clean up discrete critic 2026-04-24 13:18:33 +02:00
Khalil Meftah 06255996ea refactor(policies): rename policies/sac → policies/gaussian_actor 2026-04-23 19:13:18 +02:00
Khalil Meftah 8065bf15c7 fix test for flat dict structure 2026-04-21 12:06:25 +02:00
Khalil Meftah 8191d2d87f remove unused type alias 2026-04-21 11:56:27 +02:00
Khalil Meftah 6b93f31238 fix docstring 2026-04-21 11:55:17 +02:00
Khalil Meftah a4c0c9e358 update losses names in tests 2026-04-21 11:53:32 +02:00
Khalil Meftah a84b0e8132 refactor(sac): decouple algorithm hyperparameters from policy config 2026-04-18 16:40:56 +02:00
Khalil Meftah 2487a6ee6d perf(rl): use async iterators in OnlineOfflineMixer.get_iterator 2026-04-18 16:02:28 +02:00
Khalil Meftah 72fb0faf62 refactor(sac): simplify optimizer return structure 2026-04-18 15:45:22 +02:00
Khalil Meftah 2c97cb23c8 refactor(rl): update shutdown_event type hints from 'any' to 'Any' for consistency and clarity 2026-04-18 15:39:32 +02:00
Khalil Meftah 87d4c9879c fix(sac): clarify torch.compile status 2026-04-18 15:19:35 +02:00
Khalil Meftah e4c1a8472d fix(config): update vision encoder model name to lerobot/resnet10 2026-04-18 15:15:59 +02:00
Khalil Meftah d7e25c8326 refactor(rl): expose public API in rl/__init__ and use relative imports in sub-packages 2026-04-16 15:46:34 +02:00
Khalil Meftah a5ad273b62 fix(tests): skip tests that require grpc if not available 2026-04-15 16:30:20 +02:00
Khalil Meftah 23bece96a4 fix(tests): ensure tensor stats comparison accounts for reshaping in normalization tests 2026-04-15 16:12:08 +02:00
Khalil Meftah 7a1c9e74c3 fix: skip tests that require grpc if not available 2026-04-15 15:18:04 +02:00
Khalil Meftah c88cf979f1 fix: use string key for IS_INTERVENTION in complementary_info to avoid torch.load serialization error 2026-04-15 11:49:38 +02:00
Khalil Meftah 79a9ebdaa6 fix: add try/finally to control_loop to ensure image writer cleanup on exit 2026-04-14 17:54:35 +02:00
Khalil Meftah da6e36fd03 Merge remote-tracking branch 'origin/main' into user/khalil-meftah/2026-02-16-rl-stack-refactor 2026-04-14 17:14:56 +02:00
Khalil Meftah 64dc08cb7b fix: include IS_INTERVENTION in complementary_info sent to learner for offline replay buffer 2026-04-14 16:35:08 +02:00
Khalil Meftah e6d282108d Fix: add kwargs in reward classifier __init__() 2026-04-14 11:13:43 +02:00
Khalil Meftah a8838c081b perf: remove redundant CPU→GPU→CPU transition move in learner 2026-04-13 19:06:28 +02:00
Khalil Meftah ee0814ef60 refactor: update SACAlgorithm to pass action_dim to _init_critics and fix encoder reference 2026-04-13 18:31:17 +02:00
Khalil Meftah 7b0bdf2a98 fix: add thread synchronization to ReplayBuffer to prevent race condition between add() and sample() 2026-04-13 18:27:24 +02:00
Khalil Meftah 9422dc98c2 fix: remove leftover normalization calls from reward classifier predict_reward
Fixes #2355
2026-04-13 13:30:50 +02:00
Khalil Meftah 11a0b0174f fix(teleop): keyboard EE teleop not registering special keys and losing intervention state
Fixes #2345

Co-authored-by: jpizarrom <jpizarrom@gmail.com>
2026-04-13 12:31:00 +02:00
Khalil Meftah 036b310a97 chore: clarify torch.compile disabled note in SACAlgorithm 2026-04-13 11:49:27 +02:00
Khalil Meftah e022207c75 refactor: RL stack refactoring — RLAlgorithm, RLTrainer, DataMixer, and SAC restructuring 2026-04-13 11:39:48 +02:00
62 changed files with 4344 additions and 1693 deletions
+3 -3
View File
@@ -820,10 +820,10 @@ The LeRobot system uses a distributed actor-learner architecture for training. T
Create a training configuration file (example available [here](https://huggingface.co/datasets/lerobot/config_examples/resolve/main/rl/train_config.json)). The training config is based on the main `TrainRLServerPipelineConfig` class in `lerobot/configs/train.py`.
1. Configure the policy settings (`type="sac"`, `device`, etc.)
1. Configure the policy settings (`type="gaussian_actor"`, `device`, etc.)
2. Set `dataset` to your cropped dataset
3. Configure environment settings with crop parameters
4. Check the other parameters related to SAC in [configuration_sac.py](https://github.com/huggingface/lerobot/blob/main/src/lerobot/policies/sac/configuration_sac.py#L79).
4. Check the other parameters related to the Gaussian Actor in [configuration_gaussian_actor.py](https://github.com/huggingface/lerobot/blob/main/src/lerobot/policies/gaussian_actor/configuration_gaussian_actor.py#L79).
5. Verify that the `policy` config is correct with the right `input_features` and `output_features` for your task.
**Starting the Learner**
@@ -926,7 +926,7 @@ The ideal behaviour is that your intervention rate should drop gradually during
Some configuration values have a disproportionate impact on training stability and speed:
- **`temperature_init`** (`policy.temperature_init`) initial entropy temperature in SAC. Higher values encourage more exploration; lower values make the policy more deterministic early on. A good starting point is `1e-2`. We observed that setting it too high can make human interventions ineffective and slow down learning.
- **`temperature_init`** (`algorithm.temperature_init`) initial entropy temperature in SAC. Higher values encourage more exploration; lower values make the policy more deterministic early on. A good starting point is `1e-2`. We observed that setting it too high can make human interventions ineffective and slow down learning.
- **`policy_parameters_push_frequency`** (`policy.actor_learner_config.policy_parameters_push_frequency`) interval in _seconds_ between two weight pushes from the learner to the actor. The default is `4 s`. Decrease to **1-2 s** to provide fresher weights (at the cost of more network traffic); increase only if your connection is slow, as this will reduce sample efficiency.
- **`storage_device`** (`policy.storage_device`) device on which the learner keeps the policy parameters. If you have spare GPU memory, set this to `"cuda"` (instead of the default `"cpu"`). Keeping the weights on-GPU removes CPU→GPU transfer overhead and can significantly increase the number of learner updates per second.
+175
View File
@@ -0,0 +1,175 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Simple SO100/SO101 leader-follower teleoperation with spacebar intervention toggle.
Modes:
- Default (not intervening): follower holds its current position.
The leader arm has torque ENABLED and mirrors the follower so there is no
large position jump when intervention starts.
- Intervention (SPACE pressed): leader torque DISABLED, human moves the leader
freely, and the follower mirrors the leader joint-by-joint.
Usage:
uv run python examples/so100_teleop/teleop.py
Controls:
SPACE — toggle intervention on/off
Ctrl+C — exit
"""
import logging
import os
import sys
import time
from threading import Event, Thread
from lerobot.robots.so_follower import SO101Follower, SO101FollowerConfig
from lerobot.teleoperators.so_leader import SO101Leader
from lerobot.teleoperators.so_leader.config_so_leader import SOLeaderTeleopConfig
from lerobot.utils.robot_utils import precise_sleep
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ── pynput keyboard listener ─────────────────────────────────────────────────
PYNPUT_AVAILABLE = True
try:
if "DISPLAY" not in os.environ and "linux" in sys.platform:
raise ImportError("No DISPLAY set, pynput skipped.")
from pynput import keyboard as pynput_keyboard
except Exception:
pynput_keyboard = None
PYNPUT_AVAILABLE = False
# ── Configure ports ──────────────────────────────────────────────────────────
FOLLOWER_PORT = "/dev/ttyUSB0" # ← change to your follower port
LEADER_PORT = "/dev/ttyUSB1" # ← change to your leader port
FPS = 30
def hold_position(robot) -> dict:
"""Read current joint positions and write them back as the goal.
This prevents the motors from snapping to a stale Goal_Position register
value (which can happen when torque is re-enabled after calibration).
Returns the current position dict for reuse.
"""
current = robot.bus.sync_read("Present_Position")
robot.bus.sync_write("Goal_Position", current)
return {f"{motor}.pos": val for motor, val in current.items()}
# ── Connect ───────────────────────────────────────────────────────────────────
follower_config = SO101FollowerConfig(
port=FOLLOWER_PORT,
id="follower_arm",
use_degrees=True,
)
leader_config = SOLeaderTeleopConfig(
port=LEADER_PORT,
id="leader_arm",
use_degrees=True,
)
follower = SO101Follower(follower_config)
leader = SO101Leader(leader_config)
follower.connect()
leader.connect()
# ── CRITICAL: hold both arms at their current position before doing anything ─
# configure() enables follower torque, and the Goal_Position register may contain
# a stale value from a previous session. Writing current→goal prevents sudden motion.
follower_current = hold_position(follower)
leader_current = hold_position(leader) # leader torque is still off here, but sets the register
# ── Intervention state + keyboard listener ───────────────────────────────────
is_intervening = False
stop_event = Event()
def _start_keyboard_listener():
if not PYNPUT_AVAILABLE:
logger.warning("pynput not available — spacebar toggle disabled.")
return None
def on_press(key):
global is_intervening
if key == pynput_keyboard.Key.space:
is_intervening = not is_intervening
state = "INTERVENTION (leader → follower)" if is_intervening else "IDLE (follower holds)"
print(f"\n[SPACE] {state}\n")
def listen():
with pynput_keyboard.Listener(on_press=on_press) as listener:
while not stop_event.is_set():
time.sleep(0.05)
listener.stop()
t = Thread(target=listen, daemon=True)
t.start()
return t
kbd_thread = _start_keyboard_listener()
# Enable leader torque AFTER writing its goal to current position, so it holds in place.
leader.bus.sync_write("Torque_Enable", 1)
leader_torque_on = True
print("\nTeleoperation ready.")
print(" SPACE → toggle intervention (leader controls follower)")
print(" Ctrl+C → exit\n")
try:
while True:
t0 = time.perf_counter()
if is_intervening:
# ── Intervention: leader torque OFF, follower mirrors leader ──────
if leader_torque_on:
leader.bus.sync_write("Torque_Enable", 0)
leader_torque_on = False
leader_action = leader.get_action() # reads present leader joints
follower.send_action(leader_action) # follower tracks leader
else:
# ── Idle: leader torque ON, leader mirrors follower, follower holds
if not leader_torque_on:
# Before re-enabling torque, set the leader's goal to its current
# position so it doesn't snap to the follower position suddenly.
hold_position(leader)
leader.bus.sync_write("Torque_Enable", 1)
leader_torque_on = True
follower_obs = follower.get_observation()
# Command leader to match follower (so next intervention has no jump)
goal_pos = {motor: follower_obs[f"{motor}.pos"] for motor in leader.bus.motors}
leader.bus.sync_write("Goal_Position", goal_pos)
# Follower holds — no send_action call
precise_sleep(max(1.0 / FPS - (time.perf_counter() - t0), 0.0))
except KeyboardInterrupt:
print("\nExiting...")
finally:
stop_event.set()
leader.bus.sync_write("Torque_Enable", 0)
follower.disconnect()
leader.disconnect()
@@ -0,0 +1,365 @@
# !/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from dataclasses import dataclass
import numpy as np
import torch
from lerobot.configs.types import PipelineFeatureType, PolicyFeature
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor import (
ProcessorStepRegistry,
RobotAction,
RobotActionProcessorStep,
RobotObservation,
RobotProcessorPipeline,
TransitionKey,
)
from lerobot.processor.converters import (
create_transition,
identity_transition,
)
from lerobot.robots.robot import Robot
from lerobot.robots.so100_follower.robot_kinematic_processor import (
EEBoundsAndSafety,
EEReferenceAndDelta,
GripperVelocityToJoint,
InverseKinematicsRLStep,
)
from lerobot.robots.so101_follower.config_so101_follower import SO101FollowerConfig
from lerobot.robots.so101_follower.so101_follower import SO101Follower
from lerobot.teleoperators.so101_leader.config_so101_leader import SO101LeaderConfig
from lerobot.teleoperators.so101_leader.so101_leader import SO101Leader
from lerobot.utils.robot_utils import precise_sleep
from lerobot.utils.rotation import Rotation
def reset_follower_position(robot_arm: Robot, target_position: np.ndarray) -> None:
"""Reset robot arm to target position using smooth trajectory."""
current_position_dict = robot_arm.bus.sync_read("Present_Position")
current_position = np.array(
[current_position_dict[name] for name in current_position_dict],
dtype=np.float32,
)
trajectory = torch.from_numpy(
np.linspace(current_position, target_position, 50)
) # NOTE: 30 is just an arbitrary number
for pose in trajectory:
action_dict = dict(zip(current_position_dict, pose, strict=False))
robot_arm.bus.sync_write("Goal_Position", action_dict)
precise_sleep(0.015)
@dataclass
class LogRobotAction(RobotActionProcessorStep):
def action(self, action: RobotAction) -> RobotAction:
print(f"Robot action: {action}")
return action
def transform_features(self, features):
# features[PipelineFeatureType.ACTION][ACTION] = PolicyFeature(
# type=FeatureType.ACTION, shape=(len(self.motor_names),)
# )
return features
@ProcessorStepRegistry.register("forward_kinematics_joints_to_ee_target_action")
@dataclass
class ForwardKinematicsJointsToEETargetAction(RobotActionProcessorStep):
"""
Computes the end-effector pose from joint positions using forward kinematics (FK).
This step is typically used to add the robot's Cartesian pose to the observation space,
which can be useful for visualization or as an input to a policy.
Attributes:
kinematics: The robot's kinematic model.
"""
kinematics: RobotKinematics
motor_names: list[str]
end_effector_step_sizes: dict
max_gripper_pos: float
use_ik_solution: bool = False
def action(self, action: RobotAction) -> RobotAction:
# return compute_forward_kinematics_joints_to_ee(action, self.kinematics, self.motor_names)
teleop_action = action
raw_joint_pos = self.transition.get(TransitionKey.OBSERVATION)
leader_pos = np.array([teleop_action[f"{motor}.pos"] for motor in self.motor_names])
leader_ee = self.kinematics.forward_kinematics(leader_pos)
if self.use_ik_solution and "IK_solution" in self.transition.get(TransitionKey.COMPLEMENTARY_DATA):
follower_pos = transition.get(TransitionKey.COMPLEMENTARY_DATA)["IK_solution"]
else:
follower_pos = np.array([raw_joint_pos[f"{motor}.pos"] for motor in self.motor_names])
follower_ee = self.kinematics.forward_kinematics(follower_pos)
follower_ee_pos = follower_ee[:3, 3]
follower_ee_rvec = Rotation.from_matrix(follower_ee[:3, :3]).as_rotvec()
# follower_gripper_pos = raw_joint_pos["gripper.pos"]
follower_gripper_pos = follower_pos[-1] # assuming gripper is the last motor
leader_ee_pos = leader_ee[:3, 3]
leader_ee_rvec = Rotation.from_matrix(leader_ee[:3, :3]).as_rotvec()
leader_gripper_pos = np.clip(
teleop_action["gripper.pos"], -self.max_gripper_pos, self.max_gripper_pos
)
print("f pos:", follower_ee_pos)
print("l pos:", leader_ee_pos)
print("f rvec:", follower_ee_rvec)
print("l rvec:", leader_ee_rvec)
# follower_ee_pos = follower_ee[:3, 3]
# follower_ee_rvec = Rotation.from_matrix(follower_ee[:3, :3]).as_rotvec()
delta_pos = leader_ee_pos - follower_ee_pos
# For rotation: compute relative rotation from follower to leader
# R_leader = R_follower * R_delta => R_delta = R_follower^T * R_leader
r_delta = follower_ee[:3, :3].T @ leader_ee[:3, :3]
delta_rvec = Rotation.from_matrix(r_delta).as_rotvec()
delta_gripper = leader_gripper_pos - follower_gripper_pos
desired = np.eye(4, dtype=float)
desired[:3, :3] = follower_ee[:3, :3] @ r_delta
desired[:3, 3] = follower_ee[:3, 3] + delta_pos
pos = desired[:3, 3]
tw = Rotation.from_matrix(desired[:3, :3]).as_rotvec()
assert np.allclose(pos, leader_ee_pos), "Position delta computation error"
assert np.allclose(tw, leader_ee_rvec), "Orientation delta computation error"
assert np.isclose(follower_gripper_pos + delta_gripper, leader_gripper_pos), (
"Gripper delta computation error"
)
# Normalize the action to the range [-1, 1]
delta_pos = delta_pos / np.array(
[
self.end_effector_step_sizes["x"],
self.end_effector_step_sizes["y"],
self.end_effector_step_sizes["z"],
]
)
delta_rvec = delta_rvec / np.array(
[
self.end_effector_step_sizes["wx"],
self.end_effector_step_sizes["wy"],
self.end_effector_step_sizes["wz"],
]
)
# Check if any of the normalized deltas exceed 1.0
max_normalized_pos = max(
abs(delta_pos[0]),
abs(delta_pos[1]),
abs(delta_pos[2]),
)
max_normalized_rot = max(
abs(delta_rvec[0]),
abs(delta_rvec[1]),
abs(delta_rvec[2]),
)
# Use the same scaling factor for both position and rotation
max_normalized = max(max_normalized_pos, max_normalized_rot)
if max_normalized > 1.0:
print(f"Warning: EE delta too large, scaling. Max normalized delta: {max_normalized_pos}")
print(f"Original delta_pos: {delta_pos}, delta_rvec: {delta_rvec}")
# Scale proportionally
delta_pos = delta_pos / max_normalized
delta_rvec = delta_rvec / max_normalized
new_action = {}
new_action["enabled"] = True
new_action["target_x"] = float(delta_pos[0])
new_action["target_y"] = float(delta_pos[1])
new_action["target_z"] = float(delta_pos[2])
new_action["target_wx"] = float(delta_rvec[0])
new_action["target_wy"] = float(delta_rvec[1])
new_action["target_wz"] = float(delta_rvec[2])
new_action["gripper_vel"] = float(
np.clip(delta_gripper, -self.max_gripper_pos, self.max_gripper_pos) / self.max_gripper_pos
)
return new_action
def transform_features(
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
# TODO: implement feature transformation
return features
FPS = 20
# Initialize the robot and teleoperator config
follower_config = SO101FollowerConfig(port="/dev/usb_follower_arm_a", id="follower_arm_a", use_degrees=True)
leader_config = SO101LeaderConfig(port="/dev/usb_leader_arm_a", id="leader_arm_a", use_degrees=True)
# Initialize the robot and teleoperator
follower = SO101Follower(follower_config)
leader = SO101Leader(leader_config)
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
follower_kinematics_solver = RobotKinematics(
urdf_path="../SO-ARM100/Simulation/SO101/so101_new_calib.urdf",
target_frame_name="gripper_frame_link",
joint_names=list(follower.bus.motors.keys()),
)
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
leader_kinematics_solver = RobotKinematics(
urdf_path="../SO-ARM100/Simulation/SO101/so101_new_calib.urdf",
target_frame_name="gripper_frame_link",
joint_names=list(leader.bus.motors.keys()),
)
end_effector_step_sizes = {
"x": 0.004,
"y": 0.004,
"z": 0.004,
"wx": 5 * np.pi / 180,
"wy": 5 * np.pi / 180,
"wz": 5 * np.pi / 180,
}
# Build pipeline to convert teleop joints to EE action
leader_to_ee = RobotProcessorPipeline[RobotAction, RobotAction](
steps=[
LogRobotAction(),
ForwardKinematicsJointsToEETargetAction(
kinematics=leader_kinematics_solver,
motor_names=list(leader.bus.motors.keys()),
end_effector_step_sizes=end_effector_step_sizes,
max_gripper_pos=30.0,
use_ik_solution=True,
),
LogRobotAction(),
],
to_transition=identity_transition,
to_output=identity_transition,
)
# build pipeline to convert EE action to robot joints
ee_to_follower_joints = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
[
LogRobotAction(),
EEReferenceAndDelta(
kinematics=follower_kinematics_solver,
# end_effector_step_sizes={"x": 0.006, "y": 0.01, "z": 0.005},
end_effector_step_sizes=end_effector_step_sizes,
motor_names=list(follower.bus.motors.keys()),
use_latched_reference=False,
use_ik_solution=True,
),
LogRobotAction(),
EEBoundsAndSafety(
end_effector_bounds={
"min": [-0.05, -0.55, -0.0075],
"max": [0.55, 0.55, 0.55],
},
# end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
max_ee_step_m=0.05,
),
LogRobotAction(),
GripperVelocityToJoint(
clip_max=30.0,
speed_factor=0.2,
discrete_gripper=False,
scale_velocity=True,
use_ik_solution=True,
),
LogRobotAction(),
InverseKinematicsRLStep(
kinematics=follower_kinematics_solver,
motor_names=list(follower.bus.motors.keys()),
initial_guess_current_joints=False,
),
LogRobotAction(),
],
to_transition=identity_transition,
to_output=identity_transition,
)
# Connect to the robot and teleoperator
follower.connect()
leader.connect()
reset_pose = [0.0, 10, 20, 60.00, 90.00, 10.00]
start_time = time.perf_counter()
reset_follower_position(follower, np.array(reset_pose))
reset_follower_position(leader, np.array(reset_pose))
precise_sleep(5.0 - (time.perf_counter() - start_time))
# time.sleep(10)
leader.bus.sync_write("Torque_Enable", 0)
# Init rerun viewer
# init_rerun(session_name="so100_so100_EE_teleop")
transition = None
print("Starting teleop loop...")
while True:
print("New loop iteration")
t0 = time.perf_counter()
# Get robot observation
robot_obs = follower.get_observation()
# Get teleop observation
leader_joints_obs = leader.get_action()
# teleop joints -> teleop EE action
if transition is None:
transition = create_transition(action=leader_joints_obs, observation=robot_obs)
else:
transition = create_transition(
action=leader_joints_obs,
observation=robot_obs,
complementary_data=transition.get(TransitionKey.COMPLEMENTARY_DATA),
)
transition = leader_to_ee(transition)
leader_ee_act = transition[TransitionKey.ACTION]
# teleop EE -> robot joints
transition = create_transition(
action=leader_ee_act,
observation=robot_obs,
complementary_data=transition.get(TransitionKey.COMPLEMENTARY_DATA),
)
transition = ee_to_follower_joints(transition)
follower_joints_act = transition[TransitionKey.ACTION]
# Send action to robot
_ = follower.send_action(follower_joints_act)
# Visualize
# log_rerun_data(observation=leader_ee_act, action=follower_joints_act)
precise_sleep(max(1.0 / FPS - (time.perf_counter() - t0), 0.0))
+25 -22
View File
@@ -4,13 +4,13 @@ from pathlib import Path
from queue import Empty, Full
import torch
import torch.optim as optim
from lerobot.datasets import LeRobotDataset
from lerobot.envs.configs import HILSerlProcessorConfig, HILSerlRobotEnvConfig
from lerobot.policies import SACConfig
from lerobot.policies.sac.modeling_sac import SACPolicy
from lerobot.policies.sac.reward_model.modeling_classifier import Classifier
from lerobot.policies import GaussianActorConfig
from lerobot.policies.gaussian_actor.modeling_gaussian_actor import GaussianActorPolicy
from lerobot.policies.gaussian_actor.reward_model.modeling_classifier import Classifier
from lerobot.rl.algorithms.sac import SACAlgorithm, SACAlgorithmConfig
from lerobot.rl.buffer import ReplayBuffer
from lerobot.rl.gym_manipulator import make_robot_env
from lerobot.robots.so_follower import SO100FollowerConfig
@@ -28,7 +28,7 @@ def run_learner(
transitions_queue: mp.Queue,
parameters_queue: mp.Queue,
shutdown_event: mp.Event,
policy_learner: SACPolicy,
policy_learner: GaussianActorPolicy,
online_buffer: ReplayBuffer,
offline_buffer: ReplayBuffer,
lr: float = 3e-4,
@@ -40,8 +40,9 @@ def run_learner(
policy_learner.train()
policy_learner.to(device)
# Create Adam optimizer from scratch - simple and clean
optimizer = optim.Adam(policy_learner.parameters(), lr=lr)
algo_config = SACAlgorithmConfig.from_policy_config(policy_learner.config)
algorithm = SACAlgorithm(policy=policy_learner, config=algo_config)
algorithm.make_optimizers_and_scheduler()
print(f"[LEARNER] Online buffer capacity: {online_buffer.capacity}")
print(f"[LEARNER] Offline buffer capacity: {offline_buffer.capacity}")
@@ -83,24 +84,26 @@ def run_learner(
else:
batch[key] = online_batch[key]
loss, _ = policy_learner.forward(batch)
def batch_iter(b=batch):
while True:
yield b
optimizer.zero_grad()
loss.backward()
optimizer.step()
stats = algorithm.update(batch_iter())
training_step += 1
if training_step % LOG_EVERY == 0:
log_dict = stats.to_log_dict()
print(
f"[LEARNER] Training step {training_step}, Loss: {loss.item():.4f}, "
f"[LEARNER] Training step {training_step}, "
f"critic_loss: {log_dict.get('critic', 'N/A'):.4f}, "
f"Buffers: Online={len(online_buffer)}, Offline={len(offline_buffer)}"
)
# Send updated parameters to actor every 10 training steps
if training_step % SEND_EVERY == 0:
try:
state_dict = {k: v.cpu() for k, v in policy_learner.state_dict().items()}
parameters_queue.put_nowait(state_dict)
weights = algorithm.get_weights()
parameters_queue.put_nowait(weights)
print("[LEARNER] Sent updated parameters to actor")
except Full:
# Missing write due to queue not being consumed (should happen rarely)
@@ -113,7 +116,7 @@ def run_actor(
transitions_queue: mp.Queue,
parameters_queue: mp.Queue,
shutdown_event: mp.Event,
policy_actor: SACPolicy,
policy_actor: GaussianActorPolicy,
reward_classifier: Classifier,
env_cfg: HILSerlRobotEnvConfig,
device: torch.device = "mps",
@@ -144,15 +147,15 @@ def run_actor(
while step < MAX_STEPS_PER_EPISODE and not shutdown_event.is_set():
try:
new_params = parameters_queue.get_nowait()
policy_actor.load_state_dict(new_params)
new_weights = parameters_queue.get_nowait()
policy_actor.load_state_dict(new_weights)
print("[ACTOR] Updated policy parameters from learner")
except Empty: # No new updated parameters available from learner, waiting
pass
# Get action from policy
# Get action from policy (returns full action: continuous + discrete)
policy_obs = make_policy_obs(obs, device=device)
action_tensor = policy_actor.select_action(policy_obs) # predicts a single action
action_tensor = policy_actor.select_action(policy_obs)
action = action_tensor.squeeze(0).cpu().numpy()
# Step environment
@@ -261,14 +264,14 @@ def main():
action_features = hw_to_dataset_features(env.robot.action_features, "action")
# Create SAC policy for action selection
policy_cfg = SACConfig(
policy_cfg = GaussianActorConfig(
device=device,
input_features=obs_features,
output_features=action_features,
)
policy_actor = SACPolicy(policy_cfg)
policy_learner = SACPolicy(policy_cfg)
policy_actor = GaussianActorPolicy(policy_cfg)
policy_learner = GaussianActorPolicy(policy_cfg)
demonstrations_repo_id = "lerobot/example_hil_serl_dataset"
offline_dataset = LeRobotDataset(repo_id=demonstrations_repo_id)
+1
View File
@@ -99,6 +99,7 @@ def save_checkpoint(
optimizer (Optimizer | None, optional): The optimizer to save the state from. Defaults to None.
scheduler (LRScheduler | None, optional): The scheduler to save the state from. Defaults to None.
preprocessor: The preprocessor/pipeline to save. Defaults to None.
postprocessor: The postprocessor/pipeline to save. Defaults to None.
"""
pretrained_dir = checkpoint_dir / PRETRAINED_MODEL_DIR
policy.save_pretrained(pretrained_dir)
-7
View File
@@ -209,10 +209,3 @@ class TrainPipelineConfig(HubMixin):
cli_args = kwargs.pop("cli_args", [])
with draccus.config_type("json"):
return draccus.parse(cls, config_file, args=cli_args)
@dataclass(kw_only=True)
class TrainRLServerPipelineConfig(TrainPipelineConfig):
# NOTE: In RL, we don't need an offline dataset
# TODO: Make `TrainPipelineConfig.dataset` optional
dataset: DatasetConfig | None = None # type: ignore[assignment] # because the parent class has made it's type non-optional
+1
View File
@@ -299,6 +299,7 @@ class HILSerlProcessorConfig:
inverse_kinematics: InverseKinematicsConfig | None = None
reward_classifier: RewardClassifierConfig | None = None
max_gripper_pos: float | None = 100.0
gripper_speed_factor: float | None = None
@EnvConfig.register_subclass(name="gym_manipulator")
+7 -5
View File
@@ -17,14 +17,16 @@ from lerobot.utils.action_interpolator import ActionInterpolator as ActionInterp
from .act.configuration_act import ACTConfig as ACTConfig
from .diffusion.configuration_diffusion import DiffusionConfig as DiffusionConfig
from .factory import get_policy_class, make_policy, make_policy_config, make_pre_post_processors
from .gaussian_actor.configuration_gaussian_actor import GaussianActorConfig as GaussianActorConfig
from .gaussian_actor.reward_model.configuration_classifier import (
RewardClassifierConfig as RewardClassifierConfig,
)
from .groot.configuration_groot import GrootConfig as GrootConfig
from .multi_task_dit.configuration_multi_task_dit import MultiTaskDiTConfig as MultiTaskDiTConfig
from .pi0.configuration_pi0 import PI0Config as PI0Config
from .pi0_fast.configuration_pi0_fast import PI0FastConfig as PI0FastConfig
from .pi05.configuration_pi05 import PI05Config as PI05Config
from .pretrained import PreTrainedPolicy as PreTrainedPolicy
from .sac.configuration_sac import SACConfig as SACConfig
from .sac.reward_model.configuration_classifier import RewardClassifierConfig as RewardClassifierConfig
from .sarm.configuration_sarm import SARMConfig as SARMConfig
from .smolvla.configuration_smolvla import SmolVLAConfig as SmolVLAConfig
from .tdmpc.configuration_tdmpc import TDMPCConfig as TDMPCConfig
@@ -33,21 +35,21 @@ from .vqbet.configuration_vqbet import VQBeTConfig as VQBeTConfig
from .wall_x.configuration_wall_x import WallXConfig as WallXConfig
from .xvla.configuration_xvla import XVLAConfig as XVLAConfig
# NOTE: Policy modeling classes (e.g., SACPolicy) are intentionally NOT re-exported here.
# NOTE: Policy modeling classes (e.g., GaussianActorPolicy) are intentionally NOT re-exported here.
# They have heavy optional dependencies and are loaded lazily via get_policy_class().
# Import directly: ``from lerobot.policies.sac.modeling_sac import SACPolicy``
# Import directly: ``from lerobot.policies.gaussian_actor.modeling_gaussian_actor import GaussianActorPolicy``
__all__ = [
# Configuration classes
"ACTConfig",
"DiffusionConfig",
"GaussianActorConfig",
"GrootConfig",
"MultiTaskDiTConfig",
"PI0Config",
"PI0FastConfig",
"PI05Config",
"RewardClassifierConfig",
"SACConfig",
"SARMConfig",
"SmolVLAConfig",
"TDMPCConfig",
+14 -14
View File
@@ -46,13 +46,13 @@ from lerobot.utils.feature_utils import dataset_to_policy_features
from .act.configuration_act import ACTConfig
from .diffusion.configuration_diffusion import DiffusionConfig
from .gaussian_actor.configuration_gaussian_actor import GaussianActorConfig
from .gaussian_actor.reward_model.configuration_classifier import RewardClassifierConfig
from .groot.configuration_groot import GrootConfig
from .multi_task_dit.configuration_multi_task_dit import MultiTaskDiTConfig
from .pi0.configuration_pi0 import PI0Config
from .pi05.configuration_pi05 import PI05Config
from .pretrained import PreTrainedPolicy
from .sac.configuration_sac import SACConfig
from .sac.reward_model.configuration_classifier import RewardClassifierConfig
from .sarm.configuration_sarm import SARMConfig
from .smolvla.configuration_smolvla import SmolVLAConfig
from .tdmpc.configuration_tdmpc import TDMPCConfig
@@ -89,7 +89,7 @@ def get_policy_class(name: str) -> type[PreTrainedPolicy]:
Args:
name: The name of the policy. Supported names are "tdmpc", "diffusion", "act",
"multi_task_dit", "vqbet", "pi0", "pi05", "sac", "reward_classifier", "smolvla", "wall_x".
"multi_task_dit", "vqbet", "pi0", "pi05", "gaussian_actor", "reward_classifier", "smolvla", "wall_x".
Returns:
The policy class corresponding to the given name.
@@ -128,12 +128,12 @@ def get_policy_class(name: str) -> type[PreTrainedPolicy]:
from .pi05.modeling_pi05 import PI05Policy
return PI05Policy
elif name == "sac":
from .sac.modeling_sac import SACPolicy
elif name == "gaussian_actor":
from .gaussian_actor.modeling_gaussian_actor import GaussianActorPolicy
return SACPolicy
return GaussianActorPolicy
elif name == "reward_classifier":
from .sac.reward_model.modeling_classifier import Classifier
from .gaussian_actor.reward_model.modeling_classifier import Classifier
return Classifier
elif name == "smolvla":
@@ -172,7 +172,7 @@ def make_policy_config(policy_type: str, **kwargs) -> PreTrainedConfig:
Args:
policy_type: The type of the policy. Supported types include "tdmpc",
"multi_task_dit", "diffusion", "act", "vqbet", "pi0", "pi05", "sac",
"multi_task_dit", "diffusion", "act", "vqbet", "pi0", "pi05", "gaussian_actor",
"smolvla", "reward_classifier", "wall_x".
**kwargs: Keyword arguments to be passed to the configuration class constructor.
@@ -196,8 +196,8 @@ def make_policy_config(policy_type: str, **kwargs) -> PreTrainedConfig:
return PI0Config(**kwargs)
elif policy_type == "pi05":
return PI05Config(**kwargs)
elif policy_type == "sac":
return SACConfig(**kwargs)
elif policy_type == "gaussian_actor":
return GaussianActorConfig(**kwargs)
elif policy_type == "smolvla":
return SmolVLAConfig(**kwargs)
elif policy_type == "reward_classifier":
@@ -370,16 +370,16 @@ def make_pre_post_processors(
dataset_stats=kwargs.get("dataset_stats"),
)
elif isinstance(policy_cfg, SACConfig):
from .sac.processor_sac import make_sac_pre_post_processors
elif isinstance(policy_cfg, GaussianActorConfig):
from .gaussian_actor.processor_gaussian_actor import make_gaussian_actor_pre_post_processors
processors = make_sac_pre_post_processors(
processors = make_gaussian_actor_pre_post_processors(
config=policy_cfg,
dataset_stats=kwargs.get("dataset_stats"),
)
elif isinstance(policy_cfg, RewardClassifierConfig):
from .sac.reward_model.processor_classifier import make_classifier_processor
from .gaussian_actor.reward_model.processor_classifier import make_classifier_processor
processors = make_classifier_processor(
config=policy_cfg,
@@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .configuration_sac import SACConfig
from .modeling_sac import SACPolicy
from .processor_sac import make_sac_pre_post_processors
from .configuration_gaussian_actor import GaussianActorConfig
from .modeling_gaussian_actor import GaussianActorPolicy
from .processor_gaussian_actor import make_gaussian_actor_pre_post_processors
__all__ = ["SACConfig", "SACPolicy", "make_sac_pre_post_processors"]
__all__ = ["GaussianActorConfig", "GaussianActorPolicy", "make_gaussian_actor_pre_post_processors"]
@@ -75,18 +75,19 @@ class PolicyConfig:
init_final: float = 0.05
@PreTrainedConfig.register_subclass("sac")
@PreTrainedConfig.register_subclass("gaussian_actor")
@dataclass
class SACConfig(PreTrainedConfig):
"""Soft Actor-Critic (SAC) configuration.
class GaussianActorConfig(PreTrainedConfig):
"""Gaussian actor configuration.
SAC is an off-policy actor-critic deep RL algorithm based on the maximum entropy
reinforcement learning framework. It learns a policy and a Q-function simultaneously
using experience collected from the environment.
This configures the policy-side (actor + observation encoder) of a Gaussian
policy, as used by SAC and related maximum-entropy continuous-control algorithms.
By default the actor output is a tanh-squashed diagonal Gaussian
(``TanhMultivariateNormalDiag``); the tanh squashing can be disabled via
``policy_kwargs.use_tanh_squash``. The critics, temperature, and Bellman-update
logic live on the algorithm side (see ``lerobot.rl.algorithms.sac``).
This configuration class contains all the parameters needed to define a SAC agent,
including network architectures, optimization settings, and algorithm-specific
hyperparameters.
CLI: ``--policy.type=gaussian_actor``.
"""
# Mapping of feature types to normalization modes
@@ -122,7 +123,7 @@ class SACConfig(PreTrainedConfig):
device: str = "cpu"
# Device to store the model on
storage_device: str = "cpu"
# Name of the vision encoder model (Set to "helper2424/resnet10" for hil serl resnet10)
# Name of the vision encoder model (Set to "lerobot/resnet10" for hil serl resnet10)
vision_encoder_name: str | None = None
# Whether to freeze the vision encoder during training
freeze_vision_encoder: bool = True
@@ -135,78 +136,41 @@ class SACConfig(PreTrainedConfig):
# Dimension of the image embedding pooling
image_embedding_pooling_dim: int = 8
# Training parameter
# Number of steps for online training
online_steps: int = 1000000
# Capacity of the online replay buffer
online_buffer_capacity: int = 100000
# Capacity of the offline replay buffer
offline_buffer_capacity: int = 100000
# Whether to use asynchronous prefetching for the buffers
async_prefetch: bool = False
# Number of steps before learning starts
online_step_before_learning: int = 100
# Frequency of policy updates
policy_update_freq: int = 1
# SAC algorithm parameters
# Discount factor for the SAC algorithm
discount: float = 0.99
# Initial temperature value
temperature_init: float = 1.0
# Number of critics in the ensemble
num_critics: int = 2
# Number of subsampled critics for training
num_subsample_critics: int | None = None
# Learning rate for the critic network
critic_lr: float = 3e-4
# Learning rate for the actor network
actor_lr: float = 3e-4
# Learning rate for the temperature parameter
temperature_lr: float = 3e-4
# Weight for the critic target update
critic_target_update_weight: float = 0.005
# Update-to-data ratio for the UTD algorithm (If you want enable utd_ratio, you need to set it to >1)
utd_ratio: int = 1
# Encoder architecture
# Hidden dimension size for the state encoder
state_encoder_hidden_dim: int = 256
# Dimension of the latent space
latent_dim: int = 256
# Target entropy for the SAC algorithm
target_entropy: float | None = None
# Whether to use backup entropy for the SAC algorithm
use_backup_entropy: bool = True
# Gradient clipping norm for the SAC algorithm
grad_clip_norm: float = 40.0
# Network configuration
# Configuration for the critic network architecture
critic_network_kwargs: CriticNetworkConfig = field(default_factory=CriticNetworkConfig)
# Configuration for the actor network architecture
actor_network_kwargs: ActorNetworkConfig = field(default_factory=ActorNetworkConfig)
# Configuration for the policy parameters
policy_kwargs: PolicyConfig = field(default_factory=PolicyConfig)
# Configuration for the discrete critic network
discrete_critic_network_kwargs: CriticNetworkConfig = field(default_factory=CriticNetworkConfig)
# Configuration for actor-learner architecture
# Online training (TODO(Khalil): relocate to TrainRLServerPipelineConfig)
online_steps: int = 1000000
online_buffer_capacity: int = 100000
offline_buffer_capacity: int = 100000
async_prefetch: bool = False
online_step_before_learning: int = 100
# Actor-learner transport (TODO(Khalil): relocate to TrainRLServerPipelineConfig).
actor_learner_config: ActorLearnerConfig = field(default_factory=ActorLearnerConfig)
# Configuration for concurrency settings (you can use threads or processes for the actor and learner)
concurrency: ConcurrencyConfig = field(default_factory=ConcurrencyConfig)
# Optimizations
use_torch_compile: bool = True
# Network architecture
# Actor network
actor_network_kwargs: ActorNetworkConfig = field(default_factory=ActorNetworkConfig)
# Gaussian head parameters
policy_kwargs: PolicyConfig = field(default_factory=PolicyConfig)
# Discrete critic
discrete_critic_network_kwargs: CriticNetworkConfig = field(default_factory=CriticNetworkConfig)
def __post_init__(self):
super().__post_init__()
# Any validation specific to SAC configuration
def get_optimizer_preset(self) -> MultiAdamConfig:
return MultiAdamConfig(
weight_decay=0.0,
optimizer_groups={
"actor": {"lr": self.actor_lr},
"critic": {"lr": self.critic_lr},
"temperature": {"lr": self.temperature_lr},
"actor": {"lr": 3e-4},
"critic": {"lr": 3e-4},
"temperature": {"lr": 3e-4},
},
)
@@ -15,16 +15,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import math
from collections.abc import Callable
from dataclasses import asdict
from typing import Literal
from typing import Any
import einops
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F # noqa: N812
from torch import Tensor
from torch.distributions import MultivariateNormal, TanhTransform, Transform, TransformedDistribution
@@ -32,20 +28,20 @@ from lerobot.utils.constants import ACTION, OBS_ENV_STATE, OBS_STATE
from ..pretrained import PreTrainedPolicy
from ..utils import get_device_from_parameters
from .configuration_sac import SACConfig, is_image_feature
from .configuration_gaussian_actor import GaussianActorConfig, is_image_feature
DISCRETE_DIMENSION_INDEX = -1 # Gripper is always the last dimension
class SACPolicy(
class GaussianActorPolicy(
PreTrainedPolicy,
):
config_class = SACConfig
name = "sac"
config_class = GaussianActorConfig
name = "gaussian_actor"
def __init__(
self,
config: SACConfig | None = None,
config: GaussianActorConfig | None = None,
):
super().__init__(config)
config.validate_features()
@@ -54,9 +50,8 @@ class SACPolicy(
# Determine action dimension and initialize all components
continuous_action_dim = config.output_features[ACTION].shape[0]
self._init_encoders()
self._init_critics(continuous_action_dim)
self._init_actor(continuous_action_dim)
self._init_temperature()
self._init_discrete_critic()
def get_optim_params(self) -> dict:
optim_params = {
@@ -65,11 +60,7 @@ class SACPolicy(
for n, p in self.actor.named_parameters()
if not n.startswith("encoder") or not self.shared_encoder
],
"critic": self.critic_ensemble.parameters(),
"temperature": self.log_alpha,
}
if self.config.num_discrete_actions is not None:
optim_params["discrete_critic"] = self.discrete_critic.parameters()
return optim_params
def reset(self):
@@ -79,7 +70,9 @@ class SACPolicy(
@torch.no_grad()
def predict_action_chunk(self, batch: dict[str, Tensor]) -> Tensor:
"""Predict a chunk of actions given environment observations."""
raise NotImplementedError("SACPolicy does not support action chunking. It returns single actions!")
raise NotImplementedError(
"GaussianActorPolicy does not support action chunking. It returns single actions!"
)
@torch.no_grad()
def select_action(self, batch: dict[str, Tensor]) -> Tensor:
@@ -92,360 +85,55 @@ class SACPolicy(
actions, _, _ = self.actor(batch, observations_features)
if self.config.num_discrete_actions is not None:
discrete_action_value = self.discrete_critic(batch, observations_features)
discrete_action = torch.argmax(discrete_action_value, dim=-1, keepdim=True)
if self.discrete_critic is not None:
discrete_action_value = self.discrete_critic(batch, observations_features)
discrete_action = torch.argmax(discrete_action_value, dim=-1, keepdim=True)
else:
discrete_action = torch.ones(
(*actions.shape[:-1], 1), device=actions.device, dtype=actions.dtype
)
actions = torch.cat([actions, discrete_action], dim=-1)
return actions
def critic_forward(
self,
observations: dict[str, Tensor],
actions: Tensor,
use_target: bool = False,
observation_features: Tensor | None = None,
) -> Tensor:
"""Forward pass through a critic network ensemble
def forward(self, batch: dict[str, Tensor | dict[str, Tensor]]) -> dict[str, Tensor]:
"""Actor forward pass: sample actions and return log-probabilities.
Args:
observations: Dictionary of observations
actions: Action tensor
use_target: If True, use target critics, otherwise use ensemble critics
batch: A flat observation dict, or a training dict containing
``"state"`` (observations) and optionally ``"observation_feature"``
(pre-computed encoder features).
Returns:
Tensor of Q-values from all critics
Dict with ``"action"``, ``"log_prob"``, and ``"action_mean"`` tensors.
"""
observations = batch.get("state", batch)
observation_features = batch.get("observation_feature") if isinstance(batch, dict) else None
actions, log_probs, means = self.actor(observations, observation_features)
return {"action": actions, "log_prob": log_probs, "action_mean": means}
critics = self.critic_target if use_target else self.critic_ensemble
q_values = critics(observations, actions, observation_features)
return q_values
def load_actor_weights(self, state_dicts: dict[str, Any], device: str | torch.device = "cpu") -> None:
from lerobot.utils.transition import move_state_dict_to_device
def discrete_critic_forward(
self, observations, use_target=False, observation_features=None
) -> torch.Tensor:
"""Forward pass through a discrete critic network
actor_state_dict = move_state_dict_to_device(state_dicts["policy"], device=device)
self.actor.load_state_dict(actor_state_dict)
Args:
observations: Dictionary of observations
use_target: If True, use target critics, otherwise use ensemble critics
observation_features: Optional pre-computed observation features to avoid recomputing encoder output
Returns:
Tensor of Q-values from the discrete critic network
"""
discrete_critic = self.discrete_critic_target if use_target else self.discrete_critic
q_values = discrete_critic(observations, observation_features)
return q_values
def forward(
self,
batch: dict[str, Tensor | dict[str, Tensor]],
model: Literal["actor", "critic", "temperature", "discrete_critic"] = "critic",
) -> dict[str, Tensor]:
"""Compute the loss for the given model
Args:
batch: Dictionary containing:
- action: Action tensor
- reward: Reward tensor
- state: Observations tensor dict
- next_state: Next observations tensor dict
- done: Done mask tensor
- observation_feature: Optional pre-computed observation features
- next_observation_feature: Optional pre-computed next observation features
model: Which model to compute the loss for ("actor", "critic", "discrete_critic", or "temperature")
Returns:
The computed loss tensor
"""
# Extract common components from batch
actions: Tensor = batch[ACTION]
observations: dict[str, Tensor] = batch["state"]
observation_features: Tensor = batch.get("observation_feature")
if model == "critic":
# Extract critic-specific components
rewards: Tensor = batch["reward"]
next_observations: dict[str, Tensor] = batch["next_state"]
done: Tensor = batch["done"]
next_observation_features: Tensor = batch.get("next_observation_feature")
loss_critic = self.compute_loss_critic(
observations=observations,
actions=actions,
rewards=rewards,
next_observations=next_observations,
done=done,
observation_features=observation_features,
next_observation_features=next_observation_features,
if "discrete_critic" in state_dicts and self.discrete_critic is not None:
discrete_critic_state_dict = move_state_dict_to_device(
state_dicts["discrete_critic"], device=device
)
return {"loss_critic": loss_critic}
if model == "discrete_critic" and self.config.num_discrete_actions is not None:
# Extract critic-specific components
rewards: Tensor = batch["reward"]
next_observations: dict[str, Tensor] = batch["next_state"]
done: Tensor = batch["done"]
next_observation_features: Tensor = batch.get("next_observation_feature")
complementary_info = batch.get("complementary_info")
loss_discrete_critic = self.compute_loss_discrete_critic(
observations=observations,
actions=actions,
rewards=rewards,
next_observations=next_observations,
done=done,
observation_features=observation_features,
next_observation_features=next_observation_features,
complementary_info=complementary_info,
)
return {"loss_discrete_critic": loss_discrete_critic}
if model == "actor":
return {
"loss_actor": self.compute_loss_actor(
observations=observations,
observation_features=observation_features,
)
}
if model == "temperature":
return {
"loss_temperature": self.compute_loss_temperature(
observations=observations,
observation_features=observation_features,
)
}
raise ValueError(f"Unknown model type: {model}")
def update_target_networks(self):
"""Update target networks with exponential moving average"""
for target_param, param in zip(
self.critic_target.parameters(),
self.critic_ensemble.parameters(),
strict=True,
):
target_param.data.copy_(
param.data * self.config.critic_target_update_weight
+ target_param.data * (1.0 - self.config.critic_target_update_weight)
)
if self.config.num_discrete_actions is not None:
for target_param, param in zip(
self.discrete_critic_target.parameters(),
self.discrete_critic.parameters(),
strict=True,
):
target_param.data.copy_(
param.data * self.config.critic_target_update_weight
+ target_param.data * (1.0 - self.config.critic_target_update_weight)
)
@property
def temperature(self) -> float:
"""Return the current temperature value, always in sync with log_alpha."""
return self.log_alpha.exp().item()
def compute_loss_critic(
self,
observations,
actions,
rewards,
next_observations,
done,
observation_features: Tensor | None = None,
next_observation_features: Tensor | None = None,
) -> Tensor:
with torch.no_grad():
next_action_preds, next_log_probs, _ = self.actor(next_observations, next_observation_features)
# 2- compute q targets
q_targets = self.critic_forward(
observations=next_observations,
actions=next_action_preds,
use_target=True,
observation_features=next_observation_features,
)
# subsample critics to prevent overfitting if use high UTD (update to date)
# TODO: Get indices before forward pass to avoid unnecessary computation
if self.config.num_subsample_critics is not None:
indices = torch.randperm(self.config.num_critics)
indices = indices[: self.config.num_subsample_critics]
q_targets = q_targets[indices]
# critics subsample size
min_q, _ = q_targets.min(dim=0) # Get values from min operation
if self.config.use_backup_entropy:
min_q = min_q - (self.temperature * next_log_probs)
td_target = rewards + (1 - done) * self.config.discount * min_q
# 3- compute predicted qs
if self.config.num_discrete_actions is not None:
# NOTE: We only want to keep the continuous action part
# In the buffer we have the full action space (continuous + discrete)
# We need to split them before concatenating them in the critic forward
actions: Tensor = actions[:, :DISCRETE_DIMENSION_INDEX]
q_preds = self.critic_forward(
observations=observations,
actions=actions,
use_target=False,
observation_features=observation_features,
)
# 4- Calculate loss
# Compute state-action value loss (TD loss) for all of the Q functions in the ensemble.
td_target_duplicate = einops.repeat(td_target, "b -> e b", e=q_preds.shape[0])
# You compute the mean loss of the batch for each critic and then to compute the final loss you sum them up
critics_loss = (
F.mse_loss(
input=q_preds,
target=td_target_duplicate,
reduction="none",
).mean(dim=1)
).sum()
return critics_loss
def compute_loss_discrete_critic(
self,
observations,
actions,
rewards,
next_observations,
done,
observation_features=None,
next_observation_features=None,
complementary_info=None,
):
# NOTE: We only want to keep the discrete action part
# In the buffer we have the full action space (continuous + discrete)
# We need to split them before concatenating them in the critic forward
actions_discrete: Tensor = actions[:, DISCRETE_DIMENSION_INDEX:].clone()
actions_discrete = torch.round(actions_discrete)
actions_discrete = actions_discrete.long()
discrete_penalties: Tensor | None = None
if complementary_info is not None:
discrete_penalties: Tensor | None = complementary_info.get("discrete_penalty")
with torch.no_grad():
# For DQN, select actions using online network, evaluate with target network
next_discrete_qs = self.discrete_critic_forward(
next_observations, use_target=False, observation_features=next_observation_features
)
best_next_discrete_action = torch.argmax(next_discrete_qs, dim=-1, keepdim=True)
# Get target Q-values from target network
target_next_discrete_qs = self.discrete_critic_forward(
observations=next_observations,
use_target=True,
observation_features=next_observation_features,
)
# Use gather to select Q-values for best actions
target_next_discrete_q = torch.gather(
target_next_discrete_qs, dim=1, index=best_next_discrete_action
).squeeze(-1)
# Compute target Q-value with Bellman equation
rewards_discrete = rewards
if discrete_penalties is not None:
rewards_discrete = rewards + discrete_penalties
target_discrete_q = rewards_discrete + (1 - done) * self.config.discount * target_next_discrete_q
# Get predicted Q-values for current observations
predicted_discrete_qs = self.discrete_critic_forward(
observations=observations, use_target=False, observation_features=observation_features
)
# Use gather to select Q-values for taken actions
predicted_discrete_q = torch.gather(predicted_discrete_qs, dim=1, index=actions_discrete).squeeze(-1)
# Compute MSE loss between predicted and target Q-values
discrete_critic_loss = F.mse_loss(input=predicted_discrete_q, target=target_discrete_q)
return discrete_critic_loss
def compute_loss_temperature(self, observations, observation_features: Tensor | None = None) -> Tensor:
"""Compute the temperature loss"""
# calculate temperature loss
with torch.no_grad():
_, log_probs, _ = self.actor(observations, observation_features)
temperature_loss = (-self.log_alpha.exp() * (log_probs + self.target_entropy)).mean()
return temperature_loss
def compute_loss_actor(
self,
observations,
observation_features: Tensor | None = None,
) -> Tensor:
actions_pi, log_probs, _ = self.actor(observations, observation_features)
q_preds = self.critic_forward(
observations=observations,
actions=actions_pi,
use_target=False,
observation_features=observation_features,
)
min_q_preds = q_preds.min(dim=0)[0]
actor_loss = ((self.temperature * log_probs) - min_q_preds).mean()
return actor_loss
self.discrete_critic.load_state_dict(discrete_critic_state_dict)
def _init_encoders(self):
"""Initialize shared or separate encoders for actor and critic."""
self.shared_encoder = self.config.shared_encoder
self.encoder_critic = SACObservationEncoder(self.config)
self.encoder_critic = GaussianActorObservationEncoder(self.config)
self.encoder_actor = (
self.encoder_critic if self.shared_encoder else SACObservationEncoder(self.config)
self.encoder_critic if self.shared_encoder else GaussianActorObservationEncoder(self.config)
)
def _init_critics(self, continuous_action_dim):
"""Build critic ensemble, targets, and optional discrete critic."""
heads = [
CriticHead(
input_dim=self.encoder_critic.output_dim + continuous_action_dim,
**asdict(self.config.critic_network_kwargs),
)
for _ in range(self.config.num_critics)
]
self.critic_ensemble = CriticEnsemble(encoder=self.encoder_critic, ensemble=heads)
target_heads = [
CriticHead(
input_dim=self.encoder_critic.output_dim + continuous_action_dim,
**asdict(self.config.critic_network_kwargs),
)
for _ in range(self.config.num_critics)
]
self.critic_target = CriticEnsemble(encoder=self.encoder_critic, ensemble=target_heads)
self.critic_target.load_state_dict(self.critic_ensemble.state_dict())
if self.config.use_torch_compile:
self.critic_ensemble = torch.compile(self.critic_ensemble)
self.critic_target = torch.compile(self.critic_target)
if self.config.num_discrete_actions is not None:
self._init_discrete_critics()
def _init_discrete_critics(self):
"""Build discrete discrete critic ensemble and target networks."""
self.discrete_critic = DiscreteCritic(
encoder=self.encoder_critic,
input_dim=self.encoder_critic.output_dim,
output_dim=self.config.num_discrete_actions,
**asdict(self.config.discrete_critic_network_kwargs),
)
self.discrete_critic_target = DiscreteCritic(
encoder=self.encoder_critic,
input_dim=self.encoder_critic.output_dim,
output_dim=self.config.num_discrete_actions,
**asdict(self.config.discrete_critic_network_kwargs),
)
# TODO: (maractingi, azouitine) Compile the discrete critic
self.discrete_critic_target.load_state_dict(self.discrete_critic.state_dict())
def _init_actor(self, continuous_action_dim):
"""Initialize policy actor network and default target entropy."""
"""Initialize policy actor network."""
# NOTE: The actor select only the continuous action part
self.actor = Policy(
encoder=self.encoder_actor,
@@ -455,21 +143,25 @@ class SACPolicy(
**asdict(self.config.policy_kwargs),
)
self.target_entropy = self.config.target_entropy
if self.target_entropy is None:
dim = continuous_action_dim + (1 if self.config.num_discrete_actions is not None else 0)
self.target_entropy = -np.prod(dim) / 2
def _init_discrete_critic(self) -> None:
"""Initialize discrete critic network."""
if self.config.num_discrete_actions is None:
self.discrete_critic = None
return
def _init_temperature(self) -> None:
"""Set up temperature parameter (log_alpha)."""
temp_init = self.config.temperature_init
self.log_alpha = nn.Parameter(torch.tensor([math.log(temp_init)]))
# TODO(Khalil): Compile the discrete critic
self.discrete_critic = DiscreteCritic(
encoder=self.encoder_critic,
input_dim=self.encoder_critic.output_dim,
output_dim=self.config.num_discrete_actions,
**asdict(self.config.discrete_critic_network_kwargs),
)
class SACObservationEncoder(nn.Module):
class GaussianActorObservationEncoder(nn.Module):
"""Encode image and/or state vector observations."""
def __init__(self, config: SACConfig) -> None:
def __init__(self, config: GaussianActorConfig) -> None:
super().__init__()
self.config = config
self._init_image_layers()
@@ -677,84 +369,6 @@ class MLP(nn.Module):
return self.net(x)
class CriticHead(nn.Module):
def __init__(
self,
input_dim: int,
hidden_dims: list[int],
activations: Callable[[torch.Tensor], torch.Tensor] | str = nn.SiLU(),
activate_final: bool = False,
dropout_rate: float | None = None,
init_final: float | None = None,
final_activation: Callable[[torch.Tensor], torch.Tensor] | str | None = None,
):
super().__init__()
self.net = MLP(
input_dim=input_dim,
hidden_dims=hidden_dims,
activations=activations,
activate_final=activate_final,
dropout_rate=dropout_rate,
final_activation=final_activation,
)
self.output_layer = nn.Linear(in_features=hidden_dims[-1], out_features=1)
if init_final is not None:
nn.init.uniform_(self.output_layer.weight, -init_final, init_final)
nn.init.uniform_(self.output_layer.bias, -init_final, init_final)
else:
orthogonal_init()(self.output_layer.weight)
def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.output_layer(self.net(x))
class CriticEnsemble(nn.Module):
"""
CriticEnsemble wraps multiple CriticHead modules into an ensemble.
Args:
encoder (SACObservationEncoder): encoder for observations.
ensemble (List[CriticHead]): list of critic heads.
init_final (float | None): optional initializer scale for final layers.
Forward returns a tensor of shape (num_critics, batch_size) containing Q-values.
"""
def __init__(
self,
encoder: SACObservationEncoder,
ensemble: list[CriticHead],
init_final: float | None = None,
):
super().__init__()
self.encoder = encoder
self.init_final = init_final
self.critics = nn.ModuleList(ensemble)
def forward(
self,
observations: dict[str, torch.Tensor],
actions: torch.Tensor,
observation_features: torch.Tensor | None = None,
) -> torch.Tensor:
device = get_device_from_parameters(self)
# Move each tensor in observations to device
observations = {k: v.to(device) for k, v in observations.items()}
obs_enc = self.encoder(observations, cache=observation_features)
inputs = torch.cat([obs_enc, actions], dim=-1)
# Loop through critics and collect outputs
q_values = []
for critic in self.critics:
q_values.append(critic(inputs))
# Stack outputs to match expected shape [num_critics, batch_size]
q_values = torch.stack([q.squeeze(-1) for q in q_values], dim=0)
return q_values
class DiscreteCritic(nn.Module):
def __init__(
self,
@@ -800,7 +414,7 @@ class DiscreteCritic(nn.Module):
class Policy(nn.Module):
def __init__(
self,
encoder: SACObservationEncoder,
encoder: GaussianActorObservationEncoder,
network: nn.Module,
action_dim: int,
std_min: float = -5,
@@ -811,7 +425,7 @@ class Policy(nn.Module):
encoder_is_shared: bool = False,
):
super().__init__()
self.encoder: SACObservationEncoder = encoder
self.encoder: GaussianActorObservationEncoder = encoder
self.network = network
self.action_dim = action_dim
self.std_min = std_min
@@ -885,7 +499,7 @@ class Policy(nn.Module):
class DefaultImageEncoder(nn.Module):
def __init__(self, config: SACConfig):
def __init__(self, config: GaussianActorConfig):
super().__init__()
image_key = next(key for key in config.input_features if is_image_feature(key))
self.image_enc_layers = nn.Sequential(
@@ -931,12 +545,12 @@ def freeze_image_encoder(image_encoder: nn.Module):
class PretrainedImageEncoder(nn.Module):
def __init__(self, config: SACConfig):
def __init__(self, config: GaussianActorConfig):
super().__init__()
self.image_enc_layers, self.image_enc_out_shape = self._load_pretrained_vision_encoder(config)
def _load_pretrained_vision_encoder(self, config: SACConfig):
def _load_pretrained_vision_encoder(self, config: GaussianActorConfig):
"""Set up CNN encoder"""
from transformers import AutoModel
@@ -32,18 +32,18 @@ from lerobot.processor import (
)
from lerobot.utils.constants import POLICY_POSTPROCESSOR_DEFAULT_NAME, POLICY_PREPROCESSOR_DEFAULT_NAME
from .configuration_sac import SACConfig
from .configuration_gaussian_actor import GaussianActorConfig
def make_sac_pre_post_processors(
config: SACConfig,
def make_gaussian_actor_pre_post_processors(
config: GaussianActorConfig,
dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
) -> tuple[
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
PolicyProcessorPipeline[PolicyAction, PolicyAction],
]:
"""
Constructs pre-processor and post-processor pipelines for the SAC policy.
Constructs pre-processor and post-processor pipelines for the Gaussian actor policy.
The pre-processing pipeline prepares input data for the model by:
1. Renaming features to match pretrained configurations.
@@ -56,7 +56,7 @@ def make_sac_pre_post_processors(
2. Unnormalizing the output features to their original scale.
Args:
config: The configuration object for the SAC policy.
config: The configuration object for the tanh-Gaussian policy.
dataset_stats: A dictionary of statistics for normalization.
Returns:
@@ -31,7 +31,7 @@ class RewardClassifierConfig(PreTrainedConfig):
latent_dim: int = 256
image_embedding_pooling_dim: int = 8
dropout_rate: float = 0.1
model_name: str = "helper2424/resnet10" # TODO: This needs to be updated. The model on the Hub doesn't call self.post_init() in its __init__, which is required by transformers v5 to set all_tied_weights_keys. The from_pretrained call fails when it tries to access this attribute during _finalize_model_loading.
model_name: str = "lerobot/resnet10"
device: str = "cpu"
model_type: str = "cnn" # "transformer" or "cnn"
num_cameras: int = 2
@@ -108,6 +108,7 @@ class Classifier(PreTrainedPolicy):
def __init__(
self,
config: RewardClassifierConfig,
**kwargs,
):
from transformers import AutoModel
@@ -269,10 +270,6 @@ class Classifier(PreTrainedPolicy):
def predict_reward(self, batch, threshold=0.5):
"""Eval method. Returns predicted reward with the decision threshold as argument."""
# Check for both OBS_IMAGE and OBS_IMAGES prefixes
batch = self.normalize_inputs(batch)
batch = self.normalize_targets(batch)
# Extract images from batch dict
images = [batch[key] for key in self.config.input_features if key.startswith(OBS_IMAGE)]
+2
View File
@@ -61,6 +61,7 @@ from .hil_processor import (
RewardClassifierProcessorStep,
TimeLimitProcessorStep,
)
from .leader_follower_processor import LeaderFollowerProcessor
from .newline_task_processor import NewLineTaskProcessorStep
from .normalize_processor import NormalizerProcessorStep, UnnormalizerProcessorStep, hotswap_stats
from .observation_processor import VanillaObservationProcessorStep
@@ -122,6 +123,7 @@ __all__ = [
"ImageCropResizeProcessorStep",
"InfoProcessorStep",
"InterventionActionProcessorStep",
"LeaderFollowerProcessor",
"make_default_processors",
"make_default_teleop_action_processor",
"make_default_robot_action_processor",
@@ -38,6 +38,7 @@ class MapTensorToDeltaActionDictStep(ActionProcessorStep):
"""
use_gripper: bool = True
use_rotation: bool = False
def action(self, action: PolicyAction) -> RobotAction:
if not isinstance(action, PolicyAction):
@@ -52,7 +53,13 @@ class MapTensorToDeltaActionDictStep(ActionProcessorStep):
"delta_y": action[1].item(),
"delta_z": action[2].item(),
}
if self.use_gripper:
if self.use_rotation:
delta_action["delta_wx"] = action[3].item()
delta_action["delta_wy"] = action[4].item()
delta_action["delta_wz"] = action[5].item()
if self.use_gripper:
delta_action["gripper"] = action[6].item()
elif self.use_gripper:
delta_action["gripper"] = action[3].item()
return delta_action
@@ -64,6 +71,12 @@ class MapTensorToDeltaActionDictStep(ActionProcessorStep):
type=FeatureType.ACTION, shape=(1,)
)
if self.use_rotation:
for axis in ["wx", "wy", "wz"]:
features[PipelineFeatureType.ACTION][f"delta_{axis}"] = PolicyFeature(
type=FeatureType.ACTION, shape=(1,)
)
if self.use_gripper:
features[PipelineFeatureType.ACTION]["gripper"] = PolicyFeature(
type=FeatureType.ACTION, shape=(1,)
@@ -90,6 +103,8 @@ class MapDeltaActionToRobotActionStep(RobotActionProcessorStep):
# Scale factors for delta movements
position_scale: float = 1.0
noise_threshold: float = 1e-3 # 1 mm threshold to filter out noise
use_rotation: bool = False
rotation_scale: float = 1.0
def action(self, action: RobotAction) -> RobotAction:
# NOTE (maractingi): Action can be a dict from the teleop_devices or a tensor from the policy
@@ -97,23 +112,34 @@ class MapDeltaActionToRobotActionStep(RobotActionProcessorStep):
delta_x = action.pop("delta_x")
delta_y = action.pop("delta_y")
delta_z = action.pop("delta_z")
if self.use_rotation:
delta_wx = action.pop("delta_wx")
delta_wy = action.pop("delta_wy")
delta_wz = action.pop("delta_wz")
else:
delta_wx = 0.0
delta_wy = 0.0
delta_wz = 0.0
gripper = action.pop("gripper")
# Determine if the teleoperator is actively providing input
# Consider enabled if any significant movement delta is detected
position_magnitude = (delta_x**2 + delta_y**2 + delta_z**2) ** 0.5 # Use Euclidean norm for position
enabled = position_magnitude > self.noise_threshold # Small threshold to avoid noise
rotation_magnitude = (
delta_wx**2 + delta_wy**2 + delta_wz**2
) ** 0.5 # TODO use proper magnitud for rotation
enabled = (
position_magnitude > self.noise_threshold or rotation_magnitude > self.noise_threshold
) # Small threshold to avoid noise
# Scale the deltas appropriately
scaled_delta_x = delta_x * self.position_scale
scaled_delta_y = delta_y * self.position_scale
scaled_delta_z = delta_z * self.position_scale
# For gamepad/keyboard, we don't have rotation input, so set to 0
# These could be extended in the future for more sophisticated teleoperators
target_wx = 0.0
target_wy = 0.0
target_wz = 0.0
target_wx = delta_wx * self.rotation_scale
target_wy = delta_wy * self.rotation_scale
target_wz = delta_wz * self.rotation_scale
# Update action with robot target format
action = {
@@ -132,9 +158,15 @@ class MapDeltaActionToRobotActionStep(RobotActionProcessorStep):
def transform_features(
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
for axis in ["x", "y", "z", "gripper"]:
for axis in ["x", "y", "z"]:
features[PipelineFeatureType.ACTION].pop(f"delta_{axis}", None)
if self.use_rotation:
for axis in ["wx", "wy", "wz"]:
features[PipelineFeatureType.ACTION].pop(f"delta_{axis}", None)
features[PipelineFeatureType.ACTION].pop("delta_gripper", None)
for feat in ["enabled", "target_x", "target_y", "target_z", "target_wx", "target_wy", "target_wz"]:
features[PipelineFeatureType.ACTION][f"{feat}"] = PolicyFeature(
type=FeatureType.ACTION, shape=(1,)
+35 -9
View File
@@ -321,6 +321,7 @@ class GymHILAdapterProcessorStep(ProcessorStep):
This step normalizes the `transition` object by:
1. Copying `teleop_action` from `info` to `complementary_data`.
2. Copying `is_intervention` from `info` (using the string key) to `info` (using the enum key).
3. Copying `discrete_penalty` from `info` to `complementary_data`.
"""
def __call__(self, transition: EnvTransition) -> EnvTransition:
@@ -330,6 +331,9 @@ class GymHILAdapterProcessorStep(ProcessorStep):
if TELEOP_ACTION_KEY in info:
complementary_data[TELEOP_ACTION_KEY] = info[TELEOP_ACTION_KEY]
if DISCRETE_PENALTY_KEY in info:
complementary_data[DISCRETE_PENALTY_KEY] = info[DISCRETE_PENALTY_KEY]
if "is_intervention" in info:
info[TeleopEvents.IS_INTERVENTION] = info["is_intervention"]
@@ -348,18 +352,24 @@ class GymHILAdapterProcessorStep(ProcessorStep):
@ProcessorStepRegistry.register("gripper_penalty_processor")
class GripperPenaltyProcessorStep(ProcessorStep):
"""
Applies a penalty for inefficient gripper usage.
Applies a small per-transition cost on the discrete gripper action.
This step penalizes actions that attempt to close an already closed gripper or
open an already open one, based on position thresholds.
Fires only when the commanded action would actually transition the gripper
from one extreme to the other (close-while-open or open-while-closed).
This discourages gripper oscillation while leaving "stay" and saturating-further
commands unpenalized.
Attributes:
penalty: The negative reward value to apply.
max_gripper_pos: The maximum position value for the gripper, used for normalization.
open_threshold: Normalized state below which the gripper is considered "open".
closed_threshold: Normalized state above which the gripper is considered "closed".
"""
penalty: float = -0.01
penalty: float = -0.02
max_gripper_pos: float = 30.0
open_threshold: float = 0.1
closed_threshold: float = 0.9
def __call__(self, transition: EnvTransition) -> EnvTransition:
"""
@@ -391,9 +401,13 @@ class GripperPenaltyProcessorStep(ProcessorStep):
gripper_state_normalized = current_gripper_pos / self.max_gripper_pos
# Calculate penalty boolean as in original
gripper_penalty_bool = (gripper_state_normalized < 0.5 and gripper_action_normalized > 0.5) or (
gripper_state_normalized > 0.75 and gripper_action_normalized < 0.5
)
# - currently open AND target is closed -> close transition
# - currently closed AND target is open -> open transition
is_open = gripper_state_normalized < self.open_threshold
is_closed = gripper_state_normalized > self.closed_threshold
cmd_close = gripper_action_normalized > self.closed_threshold
cmd_open = gripper_action_normalized < self.open_threshold
gripper_penalty_bool = (is_open and cmd_close) or (is_closed and cmd_open)
gripper_penalty = self.penalty * int(gripper_penalty_bool)
@@ -409,11 +423,14 @@ class GripperPenaltyProcessorStep(ProcessorStep):
Returns the configuration of the step for serialization.
Returns:
A dictionary containing the penalty value and max gripper position.
A dictionary containing the penalty value, max gripper position,
and the open/closed thresholds.
"""
return {
"penalty": self.penalty,
"max_gripper_pos": self.max_gripper_pos,
"open_threshold": self.open_threshold,
"closed_threshold": self.closed_threshold,
}
def reset(self) -> None:
@@ -444,6 +461,7 @@ class InterventionActionProcessorStep(ProcessorStep):
use_gripper: bool = False
terminate_on_success: bool = True
use_rotation: bool = False
def __call__(self, transition: EnvTransition) -> EnvTransition:
"""
@@ -480,6 +498,14 @@ class InterventionActionProcessorStep(ProcessorStep):
teleop_action.get("delta_y", 0.0),
teleop_action.get("delta_z", 0.0),
]
if self.use_rotation:
action_list.extend(
[
teleop_action.get("delta_wx", 0.0),
teleop_action.get("delta_wy", 0.0),
teleop_action.get("delta_wz", 0.0),
]
)
if self.use_gripper:
action_list.append(teleop_action.get(GRIPPER_KEY, 1.0))
elif isinstance(teleop_action, np.ndarray):
@@ -557,7 +583,7 @@ class RewardClassifierProcessorStep(ProcessorStep):
def __post_init__(self):
"""Initializes the reward classifier model after the dataclass is created."""
if self.pretrained_path is not None:
from lerobot.policies.sac.reward_model.modeling_classifier import Classifier
from lerobot.policies.gaussian_actor.reward_model.modeling_classifier import Classifier
self.reward_classifier = Classifier.from_pretrained(self.pretrained_path)
self.reward_classifier.to(self.device)
@@ -0,0 +1,243 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
import numpy as np
import torch
from lerobot.configs.types import PipelineFeatureType, PolicyFeature
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionKey
from lerobot.robots import Robot
from lerobot.teleoperators import Teleoperator
from lerobot.teleoperators.utils import TeleopEvents
from lerobot.utils.rotation import Rotation
from .pipeline import ProcessorStep
@ProcessorStepRegistry.register("leader_follower_processor")
@dataclass
class LeaderFollowerProcessor(ProcessorStep):
"""
Processor for leader-follower teleoperation mode.
This processor:
1. Sends follower positions to leader arm when not intervening
2. Computes EE delta actions from leader when intervening
3. Handles teleop events from the leader device
"""
leader_device: Teleoperator
motor_names: list[str]
robot: Robot
kinematics: RobotKinematics
end_effector_step_sizes: np.ndarray | None = None
use_gripper: bool = True
# prev_leader_gripper: float | None = None
max_gripper_pos: float = 100.0
use_ik_solution: bool = False
def __call__(self, transition: EnvTransition) -> EnvTransition:
"""Process transition with leader-follower logic."""
# Get current follower position from complementary data
# raw_joint_pos = transition.get(TransitionKey.COMPLEMENTARY_DATA, {}).get("raw_joint_positions")
raw_joint_pos = transition.get(TransitionKey.OBSERVATION)
if raw_joint_pos is not None:
# Send follower position to leader (for follow mode)
# follower_action = {
# f"{motor}.pos": float(raw_joint_pos[motor])
# for motor in self.motor_names
# }
self.leader_device.send_action(raw_joint_pos)
# Only compute EE action if intervention is active
# (AddTeleopEventsAsInfo already added IS_INTERVENTION to info)
info = transition.get(TransitionKey.INFO, {})
if info.get(TeleopEvents.IS_INTERVENTION, False):
# Get leader joint positions from teleop_action
# (AddTeleopActionAsComplimentaryData already got the action)
complementary = transition.get(TransitionKey.COMPLEMENTARY_DATA, {})
teleop_action = complementary.get("teleop_action", {})
if isinstance(teleop_action, dict) and raw_joint_pos is not None:
leader_pos = np.array([teleop_action[f"{motor}.pos"] for motor in self.motor_names])
leader_ee = self.kinematics.forward_kinematics(leader_pos)
if self.use_ik_solution and "IK_solution" in transition.get(TransitionKey.COMPLEMENTARY_DATA):
follower_pos = transition.get(TransitionKey.COMPLEMENTARY_DATA)["IK_solution"]
else:
follower_pos = np.array([raw_joint_pos[f"{motor}.pos"] for motor in self.motor_names])
follower_ee = self.kinematics.forward_kinematics(follower_pos)
# follower_gripper_pos = raw_joint_pos["gripper.pos"]
follower_gripper_pos = follower_pos[-1] # assuming gripper is the last motor
leader_ee_pos = leader_ee[:3, 3]
leader_ee_rvec = Rotation.from_matrix(leader_ee[:3, :3]).as_rotvec()
leader_gripper_pos = np.clip(
teleop_action["gripper.pos"], -self.max_gripper_pos, self.max_gripper_pos
)
follower_ee_pos = follower_ee[:3, 3]
# follower_ee_rvec = Rotation.from_matrix(follower_ee[:3, :3]).as_rotvec()
delta_pos = leader_ee_pos - follower_ee_pos
# For rotation: compute relative rotation from follower to leader
# R_leader = R_follower * R_delta => R_delta = R_follower^T * R_leader
r_delta = follower_ee[:3, :3].T @ leader_ee[:3, :3]
delta_rvec = Rotation.from_matrix(r_delta).as_rotvec()
delta_gripper = leader_gripper_pos - follower_gripper_pos
desired = np.eye(4, dtype=float)
desired[:3, :3] = follower_ee[:3, :3] @ r_delta
desired[:3, 3] = follower_ee[:3, 3] + delta_pos
pos = desired[:3, 3]
tw = Rotation.from_matrix(desired[:3, :3]).as_rotvec()
assert np.allclose(pos, leader_ee_pos), "Position delta computation error"
assert np.allclose(tw, leader_ee_rvec), "Orientation delta computation error"
assert np.isclose(follower_gripper_pos + delta_gripper, leader_gripper_pos), (
"Gripper delta computation error"
)
# Normalize the action to the range [-1, 1]
delta_pos = delta_pos / np.array(
[
self.end_effector_step_sizes["x"],
self.end_effector_step_sizes["y"],
self.end_effector_step_sizes["z"],
]
)
delta_rvec = delta_rvec / np.array(
[
self.end_effector_step_sizes["wx"],
self.end_effector_step_sizes["wy"],
self.end_effector_step_sizes["wz"],
]
)
max_normalized_pos = max(
abs(delta_pos[0]),
abs(delta_pos[1]),
abs(delta_pos[2]),
)
normalized_rot = max(abs(delta_rvec[0]), abs(delta_rvec[1]), abs(delta_rvec[2]))
max_normalized = max(max_normalized_pos, normalized_rot)
if max_normalized > 1.0:
# Scale proportionally
delta_pos = delta_pos / max_normalized
delta_rvec = delta_rvec / max_normalized
intervention_action = np.array(
[
delta_pos[0],
delta_pos[1],
delta_pos[2],
delta_rvec[0],
delta_rvec[1],
delta_rvec[2],
np.clip(delta_gripper, -self.max_gripper_pos, self.max_gripper_pos)
/ self.max_gripper_pos,
],
dtype=float,
)
# # Extract leader positions from teleop action dict
# # leader_pos = np.array([teleop_action.get(f"{motor}.pos", 0) for motor in self.motor_names])
# # follower_pos = np.array([raw_joint_pos[f"{motor}.pos"] for motor in self.motor_names])
# teleop_action = self.leader_device.bus.sync_read("Present_Position")
# raw_joint_pos = self.robot.bus.sync_read("Present_Position")
# leader_pos = np.array([teleop_action.get(f"{motor}", 0) for motor in self.motor_names])
# follower_pos = np.array([raw_joint_pos[f"{motor}"] for motor in self.motor_names])
# # Compute EE positions
# leader_ee_fi = self.kinematics.forward_kinematics(leader_pos)
# leader_ee_pos = leader_ee_fi[:3, 3]
# # leader_ee_rot = Rotation.from_matrix(leader_ee_fi[:3, :3]).as_rotvec()
# leader_ee = np.concat([leader_ee_pos, [0,0,0]])
# if "IK_solution" in transition.get(TransitionKey.COMPLEMENTARY_DATA):
# follower_ee = transition.get(TransitionKey.COMPLEMENTARY_DATA)["IK_solution"]
# else:
# follower_pos = np.array([raw_joint_pos[f"{motor}.pos"] for motor in self.motor_names])
# follower_ee_fi = self.kinematics.forward_kinematics(follower_pos)
# follower_ee_pos = follower_ee_fi[:3, 3]
# # follower_ee_rot = Rotation.from_matrix(follower_ee_fi[:3, :3]).as_rotvec()
# follower_ee = np.concat([follower_ee_pos, [0,0,0]])
# # Compute normalized EE delta
# if self.end_effector_step_sizes is not None:
# ee_delta = np.clip(
# leader_ee - follower_ee,
# -self.end_effector_step_sizes,
# self.end_effector_step_sizes
# )
# ee_delta_normalized = ee_delta / self.end_effector_step_sizes
# else:
# ee_delta_normalized = leader_ee - follower_ee
# # Handle gripper
# if self.use_gripper and len(leader_pos) > 3:
# if self.prev_leader_gripper is None:
# self.prev_leader_gripper = np.clip(
# leader_pos[-1], 0, self.max_gripper_pos
# )
# leader_gripper = leader_pos[-1]
# gripper_delta = leader_gripper - self.prev_leader_gripper
# normalized_delta = gripper_delta / self.max_gripper_pos
# # Quantize gripper action
# if normalized_delta >= 0.3:
# gripper_action = 2
# elif normalized_delta <= -0.1:
# gripper_action = 0
# else:
# gripper_action = 1
# self.prev_leader_gripper = leader_gripper
# # Create intervention action
# intervention_action = np.append(ee_delta_normalized, gripper_action)
# else:
# intervention_action = ee_delta_normalized
# # Override teleop_action with computed EE action
complementary["teleop_action"] = torch.from_numpy(intervention_action).float()
transition[TransitionKey.COMPLEMENTARY_DATA] = complementary # type: ignore[misc]
return transition
def reset(self) -> None:
"""Reset leader-follower state."""
# self.prev_leader_gripper = None
if hasattr(self.leader_device, "reset"):
self.leader_device.reset()
def transform_features(
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
return features
@@ -134,6 +134,15 @@ class _NormalizationMixin:
if self.dtype is None:
self.dtype = torch.float32
self._tensor_stats = to_tensor(self.stats, device=self.device, dtype=self.dtype)
self._reshape_visual_stats()
def _reshape_visual_stats(self) -> None:
"""Reshape visual stats from ``[C]`` to ``[C, 1, 1]`` for image broadcasting."""
for key, feature in self.features.items():
if feature.type == FeatureType.VISUAL and key in self._tensor_stats:
for stat_name, stat_tensor in self._tensor_stats[key].items():
if isinstance(stat_tensor, Tensor) and stat_tensor.ndim == 1:
self._tensor_stats[key][stat_name] = stat_tensor.reshape(-1, 1, 1)
def to(
self, device: torch.device | str | None = None, dtype: torch.dtype | None = None
@@ -152,6 +161,7 @@ class _NormalizationMixin:
if dtype is not None:
self.dtype = dtype
self._tensor_stats = to_tensor(self.stats, device=self.device, dtype=self.dtype)
self._reshape_visual_stats()
return self
def state_dict(self) -> dict[str, Tensor]:
@@ -201,6 +211,7 @@ class _NormalizationMixin:
# Don't load from state_dict, keep the explicitly provided stats
# But ensure _tensor_stats is properly initialized
self._tensor_stats = to_tensor(self.stats, device=self.device, dtype=self.dtype) # type: ignore[assignment]
self._reshape_visual_stats()
return
# Normal behavior: load stats from state_dict
@@ -211,6 +222,7 @@ class _NormalizationMixin:
self._tensor_stats.setdefault(key, {})[stat_name] = tensor.to(
dtype=torch.float32, device=self.device
)
self._reshape_visual_stats()
# Reconstruct the original stats dict from tensor stats for compatibility with to() method
# and other functions that rely on self.stats
+26 -16
View File
@@ -12,23 +12,33 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Reinforcement learning modules.
"""Reinforcement learning modules.
Requires: ``pip install 'lerobot[hilserl]'``
Available modules (import directly)::
from lerobot.rl.actor import ...
from lerobot.rl.learner import ...
from lerobot.rl.learner_service import ...
from lerobot.rl.buffer import ...
from lerobot.rl.eval_policy import ...
from lerobot.rl.gym_manipulator import ...
Distributed actor / learner entry points (``actor``, ``learner``,
``learner_service``) require ``pip install 'lerobot[hilserl]'``. Algorithms,
buffer, data sources and trainer are gRPC-free and usable standalone.
"""
from lerobot.utils.import_utils import require_package
from .algorithms.base import RLAlgorithm as RLAlgorithm
from .algorithms.configs import RLAlgorithmConfig as RLAlgorithmConfig, TrainingStats as TrainingStats
from .algorithms.factory import (
make_algorithm as make_algorithm,
make_algorithm_config as make_algorithm_config,
)
from .algorithms.sac.configuration_sac import SACAlgorithmConfig as SACAlgorithmConfig
from .buffer import ReplayBuffer as ReplayBuffer
from .data_sources import DataMixer as DataMixer, OnlineOfflineMixer as OnlineOfflineMixer
from .trainer import RLTrainer as RLTrainer
require_package("grpcio", extra="hilserl", import_name="grpc")
__all__: list[str] = []
__all__ = [
"RLAlgorithm",
"RLAlgorithmConfig",
"TrainingStats",
"make_algorithm",
"make_algorithm_config",
"SACAlgorithmConfig",
"RLTrainer",
"ReplayBuffer",
"DataMixer",
"OnlineOfflineMixer",
]
+44 -49
View File
@@ -51,17 +51,19 @@ import os
import time
from functools import lru_cache
from queue import Empty
from typing import Any
import grpc
import torch
from torch import nn
from torch.multiprocessing import Event, Queue
from torch.multiprocessing import Queue
from lerobot.cameras import opencv # noqa: F401
from lerobot.configs import parser
from lerobot.configs.train import TrainRLServerPipelineConfig
from lerobot.policies import make_policy
from lerobot.policies.sac.modeling_sac import SACPolicy
from lerobot.policies import PreTrainedPolicy, make_policy, make_pre_post_processors
from lerobot.processor import TransitionKey
from lerobot.rl.queue import get_last_item_from_queue
from lerobot.rl.train_rl import TrainRLServerPipelineConfig
from lerobot.robots import so_follower # noqa: F401
from lerobot.teleoperators import gamepad, so_leader # noqa: F401
from lerobot.teleoperators.utils import TeleopEvents
@@ -74,14 +76,12 @@ from lerobot.transport.utils import (
send_bytes_in_chunks,
transitions_to_bytes,
)
from lerobot.types import TransitionKey
from lerobot.utils.device_utils import get_safe_torch_device
from lerobot.utils.process import ProcessSignalHandler
from lerobot.utils.random_utils import set_seed
from lerobot.utils.robot_utils import precise_sleep
from lerobot.utils.transition import (
Transition,
move_state_dict_to_device,
move_transition_to_device,
)
from lerobot.utils.utils import (
@@ -90,12 +90,11 @@ from lerobot.utils.utils import (
)
from .gym_manipulator import (
create_transition,
make_processors,
make_robot_env,
reset_and_build_transition,
step_env_and_process_transition,
)
from .queue import get_last_item_from_queue
# Main entry point
@@ -212,7 +211,7 @@ def actor_cli(cfg: TrainRLServerPipelineConfig):
def act_with_policy(
cfg: TrainRLServerPipelineConfig,
shutdown_event: any, # Event,
shutdown_event: Any, # Event
parameters_queue: Queue,
transitions_queue: Queue,
interactions_queue: Queue,
@@ -252,22 +251,21 @@ def act_with_policy(
logging.info("make_policy")
### Instantiate the policy in both the actor and learner processes
### To avoid sending a SACPolicy object through the port, we create a policy instance
### To avoid sending a policy object through the port, we create a policy instance
### on both sides, the learner sends the updated parameters every n steps to update the actor's parameters
policy: SACPolicy = make_policy(
policy = make_policy(
cfg=cfg.policy,
env_cfg=cfg.env,
)
policy = policy.eval()
policy = policy.to(device).eval()
assert isinstance(policy, nn.Module)
obs, info = online_env.reset()
env_processor.reset()
action_processor.reset()
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=cfg.policy,
dataset_stats=cfg.policy.dataset_stats,
)
# Process initial observation
transition = create_transition(observation=obs, info=info)
transition = env_processor(transition)
transition = reset_and_build_transition(online_env, env_processor, action_processor)
# NOTE: For the moment we will solely handle the case of a single environment
sum_reward_episode = 0
@@ -291,8 +289,17 @@ def act_with_policy(
# Time policy inference and check if it meets FPS requirement
with policy_timer:
# Extract observation from transition for policy
action = policy.select_action(batch=observation)
normalized_observation = preprocessor.process_observation(observation)
action = policy.select_action(batch=normalized_observation)
# Unnormalize only the continuous part.
if cfg.policy.num_discrete_actions is not None:
continuous_action = postprocessor.process_action(action[..., :-1])
discrete_action = action[..., -1:].to(
device=continuous_action.device, dtype=continuous_action.dtype
)
action = torch.cat([continuous_action, discrete_action], dim=-1)
else:
action = postprocessor.process_action(action)
policy_fps = policy_timer.fps_last
log_policy_frequency_issue(policy_fps=policy_fps, cfg=cfg, interaction_step=interaction_step)
@@ -326,7 +333,8 @@ def act_with_policy(
# Check for intervention from transition info
intervention_info = new_transition[TransitionKey.INFO]
if intervention_info.get(TeleopEvents.IS_INTERVENTION, False):
is_intervention = bool(intervention_info.get(TeleopEvents.IS_INTERVENTION, False))
if is_intervention:
episode_intervention = True
episode_intervention_steps += 1
@@ -334,6 +342,7 @@ def act_with_policy(
"discrete_penalty": torch.tensor(
[new_transition[TransitionKey.COMPLEMENTARY_DATA].get("discrete_penalty", 0.0)]
),
TeleopEvents.IS_INTERVENTION.value: is_intervention,
}
# Create transition for learner (convert to old format)
list_transition_to_send_to_learner.append(
@@ -390,14 +399,7 @@ def act_with_policy(
episode_intervention_steps = 0
episode_total_steps = 0
# Reset environment and processors
obs, info = online_env.reset()
env_processor.reset()
action_processor.reset()
# Process initial observation
transition = create_transition(observation=obs, info=info)
transition = env_processor(transition)
transition = reset_and_build_transition(online_env, env_processor, action_processor)
if cfg.env.fps is not None:
dt_time = time.perf_counter() - start_time
@@ -409,7 +411,7 @@ def act_with_policy(
def establish_learner_connection(
stub: services_pb2_grpc.LearnerServiceStub,
shutdown_event: Event, # type: ignore
shutdown_event: Any, # Event
attempts: int = 30,
):
"""Establish a connection with the learner.
@@ -461,7 +463,7 @@ def learner_service_client(
def receive_policy(
cfg: TrainRLServerPipelineConfig,
parameters_queue: Queue,
shutdown_event: Event, # type: ignore
shutdown_event: Any, # Event
learner_client: services_pb2_grpc.LearnerServiceStub | None = None,
grpc_channel: grpc.Channel | None = None,
):
@@ -513,7 +515,7 @@ def receive_policy(
def send_transitions(
cfg: TrainRLServerPipelineConfig,
transitions_queue: Queue,
shutdown_event: any, # Event,
shutdown_event: Any, # Event
learner_client: services_pb2_grpc.LearnerServiceStub | None = None,
grpc_channel: grpc.Channel | None = None,
) -> services_pb2.Empty:
@@ -563,7 +565,7 @@ def send_transitions(
def send_interactions(
cfg: TrainRLServerPipelineConfig,
interactions_queue: Queue,
shutdown_event: Event, # type: ignore
shutdown_event: Any, # Event
learner_client: services_pb2_grpc.LearnerServiceStub | None = None,
grpc_channel: grpc.Channel | None = None,
) -> services_pb2.Empty:
@@ -613,7 +615,11 @@ def send_interactions(
logging.info("[ACTOR] Interactions process stopped")
def transitions_stream(shutdown_event: Event, transitions_queue: Queue, timeout: float) -> services_pb2.Empty: # type: ignore
def transitions_stream(
shutdown_event: Any, # Event
transitions_queue: Queue,
timeout: float,
) -> services_pb2.Empty:
while not shutdown_event.is_set():
try:
message = transitions_queue.get(block=True, timeout=timeout)
@@ -629,9 +635,9 @@ def transitions_stream(shutdown_event: Event, transitions_queue: Queue, timeout:
def interactions_stream(
shutdown_event: Event,
shutdown_event: Any, # Event
interactions_queue: Queue,
timeout: float, # type: ignore
timeout: float,
) -> services_pb2.Empty:
while not shutdown_event.is_set():
try:
@@ -652,7 +658,7 @@ def interactions_stream(
# Policy functions
def update_policy_parameters(policy: SACPolicy, parameters_queue: Queue, device):
def update_policy_parameters(policy: PreTrainedPolicy, parameters_queue: Queue, device):
bytes_state_dict = get_last_item_from_queue(parameters_queue, block=False)
if bytes_state_dict is not None:
logging.info("[ACTOR] Load new parameters from Learner.")
@@ -667,18 +673,7 @@ def update_policy_parameters(policy: SACPolicy, parameters_queue: Queue, device)
# - Send critic's encoder state when shared_encoder=True
# - Skip encoder params entirely when freeze_vision_encoder=True
# - Ensure discrete_critic gets correct encoder state (currently uses encoder_critic)
# Load actor state dict
actor_state_dict = move_state_dict_to_device(state_dicts["policy"], device=device)
policy.actor.load_state_dict(actor_state_dict)
# Load discrete critic if present
if hasattr(policy, "discrete_critic") and "discrete_critic" in state_dicts:
discrete_critic_state_dict = move_state_dict_to_device(
state_dicts["discrete_critic"], device=device
)
policy.discrete_critic.load_state_dict(discrete_critic_state_dict)
logging.info("[ACTOR] Loaded discrete critic parameters from Learner.")
policy.load_actor_weights(state_dicts, device=device)
# Utilities functions
+20
View File
@@ -0,0 +1,20 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .sac import SACAlgorithm as SACAlgorithm, SACAlgorithmConfig as SACAlgorithmConfig
__all__ = [
"SACAlgorithm",
"SACAlgorithmConfig",
]
+106
View File
@@ -0,0 +1,106 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import abc
from collections.abc import Iterator
from typing import TYPE_CHECKING, Any
import torch
from torch.optim import Optimizer
from lerobot.rl.algorithms.configs import RLAlgorithmConfig, TrainingStats
if TYPE_CHECKING:
from lerobot.rl.data_sources.data_mixer import DataMixer
BatchType = dict[str, Any]
class RLAlgorithm(abc.ABC):
"""Base for all RL algorithms."""
config_class: type[RLAlgorithmConfig] | None = None
name: str | None = None
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
if not getattr(cls, "config_class", None):
raise TypeError(f"Class {cls.__name__} must define 'config_class'")
if not getattr(cls, "name", None):
raise TypeError(f"Class {cls.__name__} must define 'name'")
@abc.abstractmethod
def update(self, batch_iterator: Iterator[BatchType]) -> TrainingStats:
"""One complete training step.
The algorithm calls ``next(batch_iterator)`` as many times as it
needs (e.g. ``utd_ratio`` times for SAC) to obtain fresh batches.
The iterator is owned by the trainer; the algorithm just consumes
from it.
"""
...
def configure_data_iterator(
self,
data_mixer: DataMixer,
batch_size: int,
*,
async_prefetch: bool = True,
queue_size: int = 2,
) -> Iterator[BatchType]:
"""Create the data iterator this algorithm needs.
The default implementation uses the standard ``data_mixer.get_iterator()``.
Algorithms that need specialised sampling should override this method.
"""
return data_mixer.get_iterator(
batch_size=batch_size,
async_prefetch=async_prefetch,
queue_size=queue_size,
)
def make_optimizers_and_scheduler(self) -> dict[str, Optimizer]:
"""Create, store, and return the optimizers needed for training.
Called on the **learner** side after construction. Subclasses must
override this with algorithm-specific optimizer setup.
"""
return {}
def get_optimizers(self) -> dict[str, Optimizer]:
"""Return optimizers for checkpointing / external scheduling."""
return {}
@property
def optimization_step(self) -> int:
"""Current learner optimization step.
Part of the stable contract for checkpoint/resume. Algorithms can
either use this default storage or override for custom behavior.
"""
return getattr(self, "_optimization_step", 0)
@optimization_step.setter
def optimization_step(self, value: int) -> None:
self._optimization_step = int(value)
def get_weights(self) -> dict[str, Any]:
"""Policy state-dict to push to actors."""
return {}
@abc.abstractmethod
def load_weights(self, weights: dict[str, Any], device: str | torch.device = "cpu") -> None:
"""Load policy state-dict received from the learner."""
+76
View File
@@ -0,0 +1,76 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import abc
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
import draccus
import torch
if TYPE_CHECKING:
from lerobot.rl.algorithms.base import RLAlgorithm
@dataclass
class TrainingStats:
"""Returned by ``algorithm.update()`` for logging and checkpointing."""
losses: dict[str, float] = field(default_factory=dict)
grad_norms: dict[str, float] = field(default_factory=dict)
extra: dict[str, float] = field(default_factory=dict)
def to_log_dict(self) -> dict[str, float]:
"""Flatten all stats into a single dict for logging."""
d: dict[str, float] = {}
for name, val in self.losses.items():
d[name] = val
for name, val in self.grad_norms.items():
d[f"{name}_grad_norm"] = val
for name, val in self.extra.items():
d[name] = val
return d
@dataclass
class RLAlgorithmConfig(draccus.ChoiceRegistry, abc.ABC):
"""Registry for algorithm configs."""
@property
def type(self) -> str:
"""Registered name of this algorithm config (e.g. ``"sac"``)."""
choice_name = self.get_choice_name(self.__class__)
if not isinstance(choice_name, str):
raise TypeError(f"Expected string from get_choice_name, got {type(choice_name)}")
return choice_name
@abc.abstractmethod
def build_algorithm(self, policy: torch.nn.Module) -> RLAlgorithm:
"""Construct the :class:`RLAlgorithm` for this config.
Must be overridden by every registered config subclass.
"""
raise NotImplementedError(f"{type(self).__name__} must implement build_algorithm()")
@classmethod
@abc.abstractmethod
def from_policy_config(cls, policy_cfg: Any) -> RLAlgorithmConfig:
"""Build an algorithm config from a policy config.
Must be overridden by every registered config subclass.
"""
raise NotImplementedError(f"{cls.__name__} must implement from_policy_config()")
+47
View File
@@ -0,0 +1,47 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import torch
from lerobot.rl.algorithms.base import RLAlgorithm
from lerobot.rl.algorithms.configs import RLAlgorithmConfig
def make_algorithm_config(algorithm_type: str, **kwargs) -> RLAlgorithmConfig:
"""Instantiate an `RLAlgorithmConfig` from its registered type name.
Args:
algorithm_type: Registry key of the algorithm (e.g. ``"sac"``).
**kwargs: Keyword arguments forwarded to the config class constructor.
Returns:
An instance of the matching ``RLAlgorithmConfig`` subclass.
Raises:
ValueError: If ``algorithm_type`` is not registered.
"""
try:
cls = RLAlgorithmConfig.get_choice_class(algorithm_type)
except KeyError as err:
raise ValueError(
f"Algorithm type '{algorithm_type}' is not registered. "
f"Available: {list(RLAlgorithmConfig.get_known_choices().keys())}"
) from err
return cls(**kwargs)
def make_algorithm(cfg: RLAlgorithmConfig, policy: torch.nn.Module) -> RLAlgorithm:
return cfg.build_algorithm(policy)
+18
View File
@@ -0,0 +1,18 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from lerobot.rl.algorithms.sac.configuration_sac import SACAlgorithmConfig
from lerobot.rl.algorithms.sac.sac_algorithm import SACAlgorithm
__all__ = ["SACAlgorithm", "SACAlgorithmConfig"]
@@ -0,0 +1,90 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
import torch
from lerobot.policies.gaussian_actor.configuration_gaussian_actor import (
CriticNetworkConfig,
GaussianActorConfig,
)
from lerobot.rl.algorithms.configs import RLAlgorithmConfig
if TYPE_CHECKING:
from lerobot.rl.algorithms.sac.sac_algorithm import SACAlgorithm
@RLAlgorithmConfig.register_subclass("sac")
@dataclass
class SACAlgorithmConfig(RLAlgorithmConfig):
"""SAC algorithm hyperparameters."""
# Optimizer learning rates
actor_lr: float = 3e-4
critic_lr: float = 3e-4
temperature_lr: float = 3e-4
# Bellman update
discount: float = 0.99
use_backup_entropy: bool = True
critic_target_update_weight: float = 0.005
# Critic ensemble
num_critics: int = 2
num_subsample_critics: int | None = None
critic_network_kwargs: CriticNetworkConfig = field(default_factory=CriticNetworkConfig)
discrete_critic_network_kwargs: CriticNetworkConfig = field(default_factory=CriticNetworkConfig)
# Temperature / entropy
temperature_init: float = 1.0
# Target entropy for automatic temperature tuning. If ``None``, defaults to
# ``-|A|/2`` where ``|A|`` is the total action dimension (continuous + 1 if
# there is a discrete action head).
target_entropy: float | None = None
# Update loop
utd_ratio: int = 1
policy_update_freq: int = 1
grad_clip_norm: float = 40.0
# Optimizations
# torch.compile is currently disabled by default
use_torch_compile: bool = False
# Policy config
policy_config: GaussianActorConfig | None = None
@classmethod
def from_policy_config(cls, policy_cfg: GaussianActorConfig) -> SACAlgorithmConfig:
"""Build an algorithm config with default hyperparameters for a given policy."""
return cls(
policy_config=policy_cfg,
discrete_critic_network_kwargs=policy_cfg.discrete_critic_network_kwargs,
)
def build_algorithm(self, policy: torch.nn.Module) -> SACAlgorithm:
if self.policy_config is None:
raise ValueError(
"SACAlgorithmConfig.policy_config is None. "
"It must be populated (typically by TrainRLServerPipelineConfig.validate) "
"before calling build_algorithm()."
)
from lerobot.rl.algorithms.sac.sac_algorithm import SACAlgorithm
return SACAlgorithm(policy=policy, config=self)
@@ -0,0 +1,595 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import math
from collections.abc import Callable, Iterator
from dataclasses import asdict
from typing import Any
import einops
import torch
import torch.nn as nn
import torch.nn.functional as F # noqa: N812
from torch import Tensor
from torch.optim import Optimizer
from lerobot.policies.gaussian_actor.modeling_gaussian_actor import (
DISCRETE_DIMENSION_INDEX,
MLP,
DiscreteCritic,
GaussianActorObservationEncoder,
GaussianActorPolicy,
orthogonal_init,
)
from lerobot.policies.utils import get_device_from_parameters
from lerobot.rl.algorithms.base import BatchType, RLAlgorithm
from lerobot.rl.algorithms.configs import TrainingStats
from lerobot.rl.algorithms.sac.configuration_sac import SACAlgorithmConfig
from lerobot.utils.constants import ACTION
from lerobot.utils.transition import move_state_dict_to_device
class SACAlgorithm(RLAlgorithm):
"""Soft Actor-Critic. Owns critics, targets, temperature, and loss computation."""
config_class = SACAlgorithmConfig
name = "sac"
def __init__(
self,
policy: GaussianActorPolicy,
config: SACAlgorithmConfig,
):
self.config = config
self.policy_config = config.policy_config
self.policy = policy
self.optimizers: dict[str, Optimizer] = {}
self._optimization_step: int = 0
action_dim = self.policy.config.output_features[ACTION].shape[0]
self._init_critics(action_dim)
self._init_temperature(action_dim)
self._device = torch.device(self.policy.config.device)
self._move_to_device()
def _init_critics(self, action_dim) -> None:
"""Build critic ensemble, targets."""
encoder = self.policy.encoder_critic
heads = [
CriticHead(
input_dim=encoder.output_dim + action_dim,
**asdict(self.config.critic_network_kwargs),
)
for _ in range(self.config.num_critics)
]
self.critic_ensemble = CriticEnsemble(encoder=encoder, ensemble=heads)
target_heads = [
CriticHead(
input_dim=encoder.output_dim + action_dim,
**asdict(self.config.critic_network_kwargs),
)
for _ in range(self.config.num_critics)
]
self.critic_target = CriticEnsemble(encoder=encoder, ensemble=target_heads)
self.critic_target.load_state_dict(self.critic_ensemble.state_dict())
# TODO(Khalil): Investigate and fix torch.compile
# NOTE: torch.compile is disabled, policy does not converge when enabled.
if self.config.use_torch_compile:
self.critic_ensemble = torch.compile(self.critic_ensemble)
self.critic_target = torch.compile(self.critic_target)
self.discrete_critic_target = None
if self.policy_config.num_discrete_actions is not None:
self.discrete_critic_target = self._init_discrete_critic_target(encoder)
def _init_discrete_critic_target(self, encoder: GaussianActorObservationEncoder) -> DiscreteCritic:
"""Build target discrete critic (main network is owned by the policy)."""
discrete_critic_target = DiscreteCritic(
encoder=encoder,
input_dim=encoder.output_dim,
output_dim=self.policy_config.num_discrete_actions,
**asdict(self.config.discrete_critic_network_kwargs),
)
# TODO(Khalil): Compile the discrete critic
discrete_critic_target.load_state_dict(self.policy.discrete_critic.state_dict())
return discrete_critic_target
def _init_temperature(self, continuous_action_dim: int) -> None:
"""Set up temperature parameter (log_alpha) and target entropy."""
temp_init = self.config.temperature_init
self.log_alpha = nn.Parameter(torch.tensor([math.log(temp_init)]))
self.target_entropy = self.config.target_entropy
if self.target_entropy is None:
total_action_dim = continuous_action_dim + (
1 if self.policy_config.num_discrete_actions is not None else 0
)
self.target_entropy = -total_action_dim / 2
def _move_to_device(self) -> None:
self.policy.to(self._device)
self.critic_ensemble.to(self._device)
self.critic_target.to(self._device)
self.log_alpha = nn.Parameter(self.log_alpha.data.to(self._device))
if self.discrete_critic_target is not None:
self.discrete_critic_target.to(self._device)
@property
def temperature(self) -> float:
"""Return the current temperature value, always in sync with log_alpha."""
return self.log_alpha.exp().item()
def _critic_forward(
self,
observations: dict[str, Tensor],
actions: Tensor,
use_target: bool = False,
observation_features: Tensor | None = None,
) -> Tensor:
"""Forward pass through a critic network ensemble
Args:
observations: Dictionary of observations
actions: Action tensor
use_target: If True, use target critics, otherwise use ensemble critics
Returns:
Tensor of Q-values from all critics
"""
critics = self.critic_target if use_target else self.critic_ensemble
q_values = critics(observations, actions, observation_features)
return q_values
def _discrete_critic_forward(
self, observations, use_target=False, observation_features=None
) -> torch.Tensor:
"""Forward pass through a discrete critic network
Args:
observations: Dictionary of observations
use_target: If True, use target critics, otherwise use ensemble critics
observation_features: Optional pre-computed observation features to avoid recomputing encoder output
Returns:
Tensor of Q-values from the discrete critic network
"""
discrete_critic = self.discrete_critic_target if use_target else self.policy.discrete_critic
q_values = discrete_critic(observations, observation_features)
return q_values
def update(self, batch_iterator: Iterator[BatchType]) -> TrainingStats:
clip = self.config.grad_clip_norm
for _ in range(self.config.utd_ratio - 1):
batch = next(batch_iterator)
fb = self._prepare_forward_batch(batch, include_complementary_info=True)
loss_critic = self._compute_loss_critic(fb)
self.optimizers["critic"].zero_grad()
loss_critic.backward()
torch.nn.utils.clip_grad_norm_(self.critic_ensemble.parameters(), max_norm=clip)
self.optimizers["critic"].step()
if self.policy_config.num_discrete_actions is not None:
loss_dc = self._compute_loss_discrete_critic(fb)
self.optimizers["discrete_critic"].zero_grad()
loss_dc.backward()
torch.nn.utils.clip_grad_norm_(self.policy.discrete_critic.parameters(), max_norm=clip)
self.optimizers["discrete_critic"].step()
self._update_target_networks()
batch = next(batch_iterator)
fb = self._prepare_forward_batch(batch, include_complementary_info=False)
loss_critic = self._compute_loss_critic(fb)
self.optimizers["critic"].zero_grad()
loss_critic.backward()
critic_grad = torch.nn.utils.clip_grad_norm_(self.critic_ensemble.parameters(), max_norm=clip).item()
self.optimizers["critic"].step()
stats = TrainingStats(
losses={"loss_critic": loss_critic.item()},
grad_norms={"critic": critic_grad},
)
if self.policy_config.num_discrete_actions is not None:
loss_dc = self._compute_loss_discrete_critic(fb)
self.optimizers["discrete_critic"].zero_grad()
loss_dc.backward()
dc_grad = torch.nn.utils.clip_grad_norm_(
self.policy.discrete_critic.parameters(), max_norm=clip
).item()
self.optimizers["discrete_critic"].step()
stats.losses["loss_discrete_critic"] = loss_dc.item()
stats.grad_norms["discrete_critic"] = dc_grad
if self._optimization_step % self.config.policy_update_freq == 0:
for _ in range(self.config.policy_update_freq):
loss_actor = self._compute_loss_actor(fb)
self.optimizers["actor"].zero_grad()
loss_actor.backward()
actor_grad = torch.nn.utils.clip_grad_norm_(
self.policy.actor.parameters(), max_norm=clip
).item()
self.optimizers["actor"].step()
loss_temp = self._compute_loss_temperature(fb)
self.optimizers["temperature"].zero_grad()
loss_temp.backward()
temp_grad = torch.nn.utils.clip_grad_norm_([self.log_alpha], max_norm=clip).item()
self.optimizers["temperature"].step()
stats.losses["loss_actor"] = loss_actor.item()
stats.losses["loss_temperature"] = loss_temp.item()
stats.grad_norms["actor"] = actor_grad
stats.grad_norms["temperature"] = temp_grad
stats.extra["temperature"] = self.temperature
self._update_target_networks()
self._optimization_step += 1
return stats
def _compute_loss_critic(self, batch: dict[str, Any]) -> Tensor:
observations = batch["state"]
actions = batch[ACTION]
rewards = batch["reward"]
next_observations = batch["next_state"]
done = batch["done"]
observation_features = batch.get("observation_feature")
next_observation_features = batch.get("next_observation_feature")
with torch.no_grad():
next_action_preds, next_log_probs, _ = self.policy.actor(
next_observations, next_observation_features
)
# 2- compute q targets
q_targets = self._critic_forward(
observations=next_observations,
actions=next_action_preds,
use_target=True,
observation_features=next_observation_features,
)
# subsample critics to prevent overfitting if use high UTD (update to date)
# TODO: Get indices before forward pass to avoid unnecessary computation
if self.config.num_subsample_critics is not None:
indices = torch.randperm(self.config.num_critics)
indices = indices[: self.config.num_subsample_critics]
q_targets = q_targets[indices]
# critics subsample size
min_q, _ = q_targets.min(dim=0) # Get values from min operation
if self.config.use_backup_entropy:
min_q = min_q - (self.temperature * next_log_probs)
td_target = rewards + (1 - done) * self.config.discount * min_q
# 3- compute predicted qs
if self.policy_config.num_discrete_actions is not None:
# NOTE: We only want to keep the continuous action part
# In the buffer we have the full action space (continuous + discrete)
# We need to split them before concatenating them in the critic forward
actions: Tensor = actions[:, :DISCRETE_DIMENSION_INDEX]
q_preds = self._critic_forward(
observations=observations,
actions=actions,
use_target=False,
observation_features=observation_features,
)
# 4- Calculate loss
# Compute state-action value loss (TD loss) for all of the Q functions in the ensemble.
td_target_duplicate = einops.repeat(td_target, "b -> e b", e=q_preds.shape[0])
# You compute the mean loss of the batch for each critic and then to compute the final loss you sum them up
critics_loss = (
F.mse_loss(
input=q_preds,
target=td_target_duplicate,
reduction="none",
).mean(dim=1)
).sum()
return critics_loss
def _compute_loss_discrete_critic(self, batch: dict[str, Any]) -> Tensor:
observations = batch["state"]
actions = batch[ACTION]
rewards = batch["reward"]
next_observations = batch["next_state"]
done = batch["done"]
observation_features = batch.get("observation_feature")
next_observation_features = batch.get("next_observation_feature")
complementary_info = batch.get("complementary_info")
# NOTE: We only want to keep the discrete action part
# In the buffer we have the full action space (continuous + discrete)
# We need to split them before concatenating them in the critic forward
actions_discrete: Tensor = actions[:, DISCRETE_DIMENSION_INDEX:].clone()
actions_discrete = torch.round(actions_discrete)
actions_discrete = actions_discrete.long()
discrete_penalties: Tensor | None = None
if complementary_info is not None:
discrete_penalties = complementary_info.get("discrete_penalty")
with torch.no_grad():
# For DQN, select actions using online network, evaluate with target network
next_discrete_qs = self._discrete_critic_forward(
next_observations, use_target=False, observation_features=next_observation_features
)
best_next_discrete_action = torch.argmax(next_discrete_qs, dim=-1, keepdim=True)
# Get target Q-values from target network
target_next_discrete_qs = self._discrete_critic_forward(
observations=next_observations,
use_target=True,
observation_features=next_observation_features,
)
# Use gather to select Q-values for best actions
target_next_discrete_q = torch.gather(
target_next_discrete_qs, dim=1, index=best_next_discrete_action
).squeeze(-1)
# Compute target Q-value with Bellman equation
rewards_discrete = rewards
if discrete_penalties is not None:
rewards_discrete = rewards + discrete_penalties
target_discrete_q = rewards_discrete + (1 - done) * self.config.discount * target_next_discrete_q
# Get predicted Q-values for current observations
predicted_discrete_qs = self._discrete_critic_forward(
observations=observations, use_target=False, observation_features=observation_features
)
# Use gather to select Q-values for taken actions
predicted_discrete_q = torch.gather(predicted_discrete_qs, dim=1, index=actions_discrete).squeeze(-1)
# Compute MSE loss between predicted and target Q-values
discrete_critic_loss = F.mse_loss(input=predicted_discrete_q, target=target_discrete_q)
return discrete_critic_loss
def _compute_loss_actor(self, batch: dict[str, Any]) -> Tensor:
observations = batch["state"]
observation_features = batch.get("observation_feature")
actions_pi, log_probs, _ = self.policy.actor(observations, observation_features)
q_preds = self._critic_forward(
observations=observations,
actions=actions_pi,
use_target=False,
observation_features=observation_features,
)
min_q_preds = q_preds.min(dim=0)[0]
actor_loss = ((self.temperature * log_probs) - min_q_preds).mean()
return actor_loss
def _compute_loss_temperature(self, batch: dict[str, Any]) -> Tensor:
"""Compute the temperature loss"""
observations = batch["state"]
observation_features = batch.get("observation_feature")
# calculate temperature loss
with torch.no_grad():
_, log_probs, _ = self.policy.actor(observations, observation_features)
temperature_loss = (-self.log_alpha.exp() * (log_probs + self.target_entropy)).mean()
return temperature_loss
def _update_target_networks(self) -> None:
"""Update target networks with exponential moving average"""
for target_p, p in zip(
self.critic_target.parameters(), self.critic_ensemble.parameters(), strict=True
):
target_p.data.copy_(
p.data * self.config.critic_target_update_weight
+ target_p.data * (1.0 - self.config.critic_target_update_weight)
)
if self.policy_config.num_discrete_actions is not None:
for target_p, p in zip(
self.discrete_critic_target.parameters(),
self.policy.discrete_critic.parameters(),
strict=True,
):
target_p.data.copy_(
p.data * self.config.critic_target_update_weight
+ target_p.data * (1.0 - self.config.critic_target_update_weight)
)
def _prepare_forward_batch(
self, batch: BatchType, *, include_complementary_info: bool = True
) -> dict[str, Any]:
observations = batch["state"]
next_observations = batch["next_state"]
observation_features, next_observation_features = self.get_observation_features(
observations, next_observations
)
forward_batch: dict[str, Any] = {
ACTION: batch[ACTION],
"reward": batch["reward"],
"state": observations,
"next_state": next_observations,
"done": batch["done"],
"observation_feature": observation_features,
"next_observation_feature": next_observation_features,
}
if include_complementary_info and "complementary_info" in batch:
forward_batch["complementary_info"] = batch["complementary_info"]
return forward_batch
def make_optimizers_and_scheduler(self) -> dict[str, Optimizer]:
"""
Creates and returns optimizers for the actor, critic, and temperature components of a reinforcement learning policy.
This function sets up Adam optimizers for:
- The **actor network**, ensuring that only relevant parameters are optimized.
- The **critic ensemble**, which evaluates the value function.
- The **temperature parameter**, which controls the entropy in soft actor-critic (SAC)-like methods.
It also initializes a learning rate scheduler, though currently, it is set to `None`.
NOTE:
- If the encoder is shared, its parameters are excluded from the actor's optimization process.
- The policy's log temperature (`log_alpha`) is wrapped in a list to ensure proper optimization as a standalone tensor.
Args:
cfg: Configuration object containing hyperparameters.
policy (nn.Module): The policy model containing the actor, critic, and temperature components.
Returns:
A dictionary mapping component names ("actor", "critic", "temperature")
to their respective Adam optimizers.
"""
actor_params = self.policy.get_optim_params()["actor"]
self.optimizers = {
"actor": torch.optim.Adam(actor_params, lr=self.config.actor_lr),
"critic": torch.optim.Adam(self.critic_ensemble.parameters(), lr=self.config.critic_lr),
"temperature": torch.optim.Adam([self.log_alpha], lr=self.config.temperature_lr),
}
if self.policy_config.num_discrete_actions is not None:
self.optimizers["discrete_critic"] = torch.optim.Adam(
self.policy.discrete_critic.parameters(), lr=self.config.critic_lr
)
return self.optimizers
def get_optimizers(self) -> dict[str, Optimizer]:
return self.optimizers
def get_weights(self) -> dict[str, Any]:
"""Send actor + discrete-critic state dicts."""
state_dicts: dict[str, Any] = {
"policy": move_state_dict_to_device(self.policy.actor.state_dict(), device="cpu"),
}
if self.policy_config.num_discrete_actions is not None:
state_dicts["discrete_critic"] = move_state_dict_to_device(
self.policy.discrete_critic.state_dict(), device="cpu"
)
return state_dicts
def load_weights(self, weights: dict[str, Any], device: str | torch.device = "cpu") -> None:
"""Load actor + discrete-critic weights into the policy."""
self.policy.load_actor_weights(weights, device=device)
def get_observation_features(
self, observations: Tensor, next_observations: Tensor
) -> tuple[Tensor | None, Tensor | None]:
"""
Get observation features from the policy encoder. It act as cache for the observation features.
when the encoder is frozen, the observation features are not updated.
We can save compute by caching the observation features.
Args:
policy: The policy model
observations: The current observations
next_observations: The next observations
Returns:
tuple: observation_features, next_observation_features
"""
if self.policy.config.vision_encoder_name is None or not self.policy.config.freeze_vision_encoder:
return None, None
with torch.no_grad():
observation_features = self.policy.actor.encoder.get_cached_image_features(observations)
next_observation_features = self.policy.actor.encoder.get_cached_image_features(next_observations)
return observation_features, next_observation_features
class CriticHead(nn.Module):
def __init__(
self,
input_dim: int,
hidden_dims: list[int],
activations: Callable[[torch.Tensor], torch.Tensor] | str = nn.SiLU(),
activate_final: bool = False,
dropout_rate: float | None = None,
init_final: float | None = None,
final_activation: Callable[[torch.Tensor], torch.Tensor] | str | None = None,
):
super().__init__()
self.net = MLP(
input_dim=input_dim,
hidden_dims=hidden_dims,
activations=activations,
activate_final=activate_final,
dropout_rate=dropout_rate,
final_activation=final_activation,
)
self.output_layer = nn.Linear(in_features=hidden_dims[-1], out_features=1)
if init_final is not None:
nn.init.uniform_(self.output_layer.weight, -init_final, init_final)
nn.init.uniform_(self.output_layer.bias, -init_final, init_final)
else:
orthogonal_init()(self.output_layer.weight)
def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.output_layer(self.net(x))
class CriticEnsemble(nn.Module):
"""
CriticEnsemble wraps multiple CriticHead modules into an ensemble.
Args:
encoder (GaussianActorObservationEncoder): encoder for observations.
ensemble (List[CriticHead]): list of critic heads.
init_final (float | None): optional initializer scale for final layers.
Forward returns a tensor of shape (num_critics, batch_size) containing Q-values.
"""
def __init__(
self,
encoder: GaussianActorObservationEncoder,
ensemble: list[CriticHead],
init_final: float | None = None,
):
super().__init__()
self.encoder = encoder
self.init_final = init_final
self.critics = nn.ModuleList(ensemble)
def forward(
self,
observations: dict[str, torch.Tensor],
actions: torch.Tensor,
observation_features: torch.Tensor | None = None,
) -> torch.Tensor:
device = get_device_from_parameters(self)
# Move each tensor in observations to device
observations = {k: v.to(device) for k, v in observations.items()}
obs_enc = self.encoder(observations, cache=observation_features)
inputs = torch.cat([obs_enc, actions], dim=-1)
# Loop through critics and collect outputs
q_values = []
for critic in self.critics:
q_values.append(critic(inputs))
# Stack outputs to match expected shape [num_critics, batch_size]
q_values = torch.stack([q.squeeze(-1) for q in q_values], dim=0)
return q_values
+3 -3
View File
@@ -97,8 +97,8 @@ class ReplayBuffer:
Args:
capacity (int): Maximum number of transitions to store in the buffer.
device (str): The device where the tensors will be moved when sampling ("cuda:0" or "cpu").
state_keys (List[str]): The list of keys that appear in `state` and `next_state`.
image_augmentation_function (Optional[Callable]): A function that takes a batch of images
state_keys (list[str]): The list of keys that appear in `state` and `next_state`.
image_augmentation_function (Callable | None): A function that takes a batch of images
and returns a batch of augmented images. If None, a default augmentation function is used.
use_drq (bool): Whether to use the default DRQ image augmentation style, when sampling in the buffer.
storage_device: The device (e.g. "cpu" or "cuda:0") where the data will be stored.
@@ -634,7 +634,7 @@ class ReplayBuffer:
If None, you must handle or define default keys.
Returns:
transitions (List[Transition]):
transitions (list[Transition]):
A list of Transition dictionaries with the same length as `dataset`.
"""
if state_keys is None:
+2 -2
View File
@@ -176,11 +176,11 @@ def convert_lerobot_dataset_to_cropped_lerobot_dataset(
Args:
original_dataset (LeRobotDataset): The source dataset.
crop_params_dict (Dict[str, Tuple[int, int, int, int]]):
crop_params_dict (dict[str, Tuple[int, int, int, int]]):
A dictionary mapping observation keys to crop parameters (top, left, height, width).
new_repo_id (str): Repository id for the new dataset.
new_dataset_root (str): The root directory where the new dataset will be written.
resize_size (Tuple[int, int], optional): The target size (height, width) after cropping.
resize_size (tuple[int, int], optional): The target size (height, width) after cropping.
Defaults to (128, 128).
Returns:
+17
View File
@@ -0,0 +1,17 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .data_mixer import BatchType, DataMixer, OnlineOfflineMixer
__all__ = ["BatchType", "DataMixer", "OnlineOfflineMixer"]
+96
View File
@@ -0,0 +1,96 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import abc
from lerobot.rl.algorithms.base import BatchType
from lerobot.rl.buffer import ReplayBuffer, concatenate_batch_transitions
class DataMixer(abc.ABC):
"""Abstract interface for all data mixing strategies."""
@abc.abstractmethod
def sample(self, batch_size: int) -> BatchType:
"""Draw one batch of ``batch_size`` transitions."""
...
def get_iterator(
self,
batch_size: int,
async_prefetch: bool = True,
queue_size: int = 2,
):
"""Infinite iterator that yields batches."""
while True:
yield self.sample(batch_size)
class OnlineOfflineMixer(DataMixer):
"""Mixes transitions from an online and an offline replay buffer."""
def __init__(
self,
online_buffer: ReplayBuffer,
offline_buffer: ReplayBuffer | None = None,
online_ratio: float = 1.0,
):
if not 0.0 <= online_ratio <= 1.0:
raise ValueError(f"online_ratio must be in [0, 1], got {online_ratio}")
self.online_buffer = online_buffer
self.offline_buffer = offline_buffer
self.online_ratio = online_ratio
def sample(self, batch_size: int) -> BatchType:
if self.offline_buffer is None:
return self.online_buffer.sample(batch_size)
n_online = max(1, int(batch_size * self.online_ratio))
n_offline = batch_size - n_online
online_batch = self.online_buffer.sample(n_online)
offline_batch = self.offline_buffer.sample(n_offline)
return concatenate_batch_transitions(online_batch, offline_batch)
def get_iterator(
self,
batch_size: int,
async_prefetch: bool = True,
queue_size: int = 2,
):
"""Yield batches by composing buffer async iterators."""
n_online = max(1, int(batch_size * self.online_ratio))
online_iter = self.online_buffer.get_iterator(
batch_size=n_online,
async_prefetch=async_prefetch,
queue_size=queue_size,
)
if self.offline_buffer is None:
yield from online_iter
return
n_offline = batch_size - n_online
offline_iter = self.offline_buffer.get_iterator(
batch_size=n_offline,
async_prefetch=async_prefetch,
queue_size=queue_size,
)
while True:
yield concatenate_batch_transitions(next(online_iter), next(offline_iter))
+1 -1
View File
@@ -17,9 +17,9 @@ import logging
from lerobot.cameras import opencv # noqa: F401
from lerobot.configs import parser
from lerobot.configs.train import TrainRLServerPipelineConfig
from lerobot.datasets import LeRobotDataset
from lerobot.policies import make_policy
from lerobot.rl.train_rl import TrainRLServerPipelineConfig
from lerobot.robots import ( # noqa: F401
RobotConfig,
make_robot_from_config,
+111 -79
View File
@@ -383,10 +383,21 @@ def make_processors(
GymHILAdapterProcessorStep(),
Numpy2TorchActionProcessorStep(),
VanillaObservationProcessorStep(),
AddBatchDimensionProcessorStep(),
DeviceProcessorStep(device=device),
]
# Add time limit processor if reset config exists
if cfg.processor.reset is not None:
env_pipeline_steps.append(
TimeLimitProcessorStep(max_episode_steps=int(cfg.processor.reset.control_time_s * cfg.fps))
)
env_pipeline_steps.extend(
[
AddBatchDimensionProcessorStep(),
DeviceProcessorStep(device=device),
]
)
return DataProcessorPipeline(
steps=env_pipeline_steps, to_transition=identity_transition, to_output=identity_transition
), DataProcessorPipeline(
@@ -551,8 +562,19 @@ def step_env_and_process_transition(
terminated = terminated or processed_action_transition[TransitionKey.DONE]
truncated = truncated or processed_action_transition[TransitionKey.TRUNCATED]
complementary_data = processed_action_transition[TransitionKey.COMPLEMENTARY_DATA].copy()
if hasattr(env, "get_raw_joint_positions"):
raw_joint_positions = env.get_raw_joint_positions()
if raw_joint_positions is not None:
complementary_data["raw_joint_positions"] = raw_joint_positions
# Merge env and action-processor info: env wins for str keys, action-processor
# wins for `TeleopEvents` enum keys
action_info = processed_action_transition[TransitionKey.INFO]
new_info = info.copy()
new_info.update(processed_action_transition[TransitionKey.INFO])
for key, value in action_info.items():
if isinstance(key, TeleopEvents):
new_info[key] = value
new_transition = create_transition(
observation=obs,
@@ -568,6 +590,24 @@ def step_env_and_process_transition(
return new_transition
def reset_and_build_transition(
env: gym.Env,
env_processor: DataProcessorPipeline[EnvTransition, EnvTransition],
action_processor: DataProcessorPipeline[EnvTransition, EnvTransition],
) -> EnvTransition:
"""Reset env + processors and return the first env-processed transition."""
obs, info = env.reset()
env_processor.reset()
action_processor.reset()
complementary_data: dict[str, Any] = {}
if hasattr(env, "get_raw_joint_positions"):
raw_joint_positions = env.get_raw_joint_positions()
if raw_joint_positions is not None:
complementary_data["raw_joint_positions"] = raw_joint_positions
transition = create_transition(observation=obs, info=info, complementary_data=complementary_data)
return env_processor(data=transition)
def control_loop(
env: gym.Env,
env_processor: DataProcessorPipeline[EnvTransition, EnvTransition],
@@ -593,17 +633,7 @@ def control_loop(
print("- When not intervening, robot will stay still")
print("- Press Ctrl+C to exit")
# Reset environment and processors
obs, info = env.reset()
complementary_data = (
{"raw_joint_positions": info.pop("raw_joint_positions")} if "raw_joint_positions" in info else {}
)
env_processor.reset()
action_processor.reset()
# Process initial observation
transition = create_transition(observation=obs, info=info, complementary_data=complementary_data)
transition = env_processor(data=transition)
transition = reset_and_build_transition(env, env_processor, action_processor)
# Determine if gripper is used
use_gripper = cfg.env.processor.gripper.use_gripper if cfg.env.processor.gripper is not None else True
@@ -659,79 +689,81 @@ def control_loop(
episode_step = 0
episode_start_time = time.perf_counter()
while episode_idx < cfg.dataset.num_episodes_to_record:
step_start_time = time.perf_counter()
try:
while episode_idx < cfg.dataset.num_episodes_to_record:
step_start_time = time.perf_counter()
# Create a neutral action (no movement)
neutral_action = torch.tensor([0.0, 0.0, 0.0], dtype=torch.float32)
if use_gripper:
neutral_action = torch.cat([neutral_action, torch.tensor([0.0])]) # Gripper stay
# Use the new step function
transition = step_env_and_process_transition(
env=env,
transition=transition,
action=neutral_action,
env_processor=env_processor,
action_processor=action_processor,
)
terminated = transition.get(TransitionKey.DONE, False)
truncated = transition.get(TransitionKey.TRUNCATED, False)
if cfg.mode == "record":
observations = {
k: v.squeeze(0).cpu()
for k, v in transition[TransitionKey.OBSERVATION].items()
if isinstance(v, torch.Tensor)
}
# Use teleop_action if available, otherwise use the action from the transition
action_to_record = transition[TransitionKey.COMPLEMENTARY_DATA].get(
"teleop_action", transition[TransitionKey.ACTION]
)
frame = {
**observations,
ACTION: action_to_record.cpu(),
REWARD: np.array([transition[TransitionKey.REWARD]], dtype=np.float32),
DONE: np.array([terminated or truncated], dtype=bool),
}
# Create a neutral action (no movement)
neutral_action = torch.tensor([0.0, 0.0, 0.0], dtype=torch.float32)
if use_gripper:
discrete_penalty = transition[TransitionKey.COMPLEMENTARY_DATA].get("discrete_penalty", 0.0)
frame["complementary_info.discrete_penalty"] = np.array([discrete_penalty], dtype=np.float32)
neutral_action = torch.cat([neutral_action, torch.tensor([1.0])]) # Gripper stay
if dataset is not None:
frame["task"] = cfg.dataset.task
dataset.add_frame(frame)
episode_step += 1
# Handle episode termination
if terminated or truncated:
episode_time = time.perf_counter() - episode_start_time
logging.info(
f"Episode ended after {episode_step} steps in {episode_time:.1f}s with reward {transition[TransitionKey.REWARD]}"
transition = step_env_and_process_transition(
env=env,
transition=transition,
action=neutral_action,
env_processor=env_processor,
action_processor=action_processor,
)
episode_step = 0
episode_idx += 1
terminated = transition.get(TransitionKey.DONE, False)
truncated = transition.get(TransitionKey.TRUNCATED, False)
if dataset is not None:
if transition[TransitionKey.INFO].get(TeleopEvents.RERECORD_EPISODE, False):
logging.info(f"Re-recording episode {episode_idx}")
dataset.clear_episode_buffer()
episode_idx -= 1
else:
logging.info(f"Saving episode {episode_idx}")
dataset.save_episode()
if cfg.mode == "record":
observations = {
k: v.squeeze(0).cpu()
for k, v in transition[TransitionKey.OBSERVATION].items()
if isinstance(v, torch.Tensor)
}
action_to_record = transition[TransitionKey.COMPLEMENTARY_DATA].get(
"teleop_action", transition[TransitionKey.ACTION]
)
frame = {
**observations,
ACTION: action_to_record.cpu(),
REWARD: np.array([transition[TransitionKey.REWARD]], dtype=np.float32),
DONE: np.array([terminated or truncated], dtype=bool),
}
if use_gripper:
discrete_penalty = transition[TransitionKey.COMPLEMENTARY_DATA].get(
"discrete_penalty", 0.0
)
frame["complementary_info.discrete_penalty"] = np.array(
[discrete_penalty], dtype=np.float32
)
# Reset for new episode
obs, info = env.reset()
env_processor.reset()
action_processor.reset()
if dataset is not None:
frame["task"] = cfg.dataset.task
dataset.add_frame(frame)
transition = create_transition(observation=obs, info=info)
transition = env_processor(transition)
episode_step += 1
# Maintain fps timing
precise_sleep(max(dt - (time.perf_counter() - step_start_time), 0.0))
# Handle episode termination
if terminated or truncated:
episode_time = time.perf_counter() - episode_start_time
logging.info(
f"Episode ended after {episode_step} steps in {episode_time:.1f}s with reward {transition[TransitionKey.REWARD]}"
)
episode_step = 0
episode_idx += 1
if dataset is not None:
if transition[TransitionKey.INFO].get(TeleopEvents.RERECORD_EPISODE, False):
logging.info(f"Re-recording episode {episode_idx}")
dataset.clear_episode_buffer()
episode_idx -= 1
else:
logging.info(f"Saving episode {episode_idx}")
dataset.save_episode()
# Reset for new episode
transition = reset_and_build_transition(env, env_processor, action_processor)
# Maintain fps timing
precise_sleep(max(dt - (time.perf_counter() - step_start_time), 0.0))
finally:
if dataset is not None and dataset.writer is not None and dataset.writer.image_writer is not None:
logging.info("Waiting for image writer to finish...")
dataset.writer.image_writer.stop()
if dataset is not None and cfg.dataset.push_to_hub:
logging.info("Finalizing dataset before pushing to hub")
+59 -289
View File
@@ -51,6 +51,7 @@ import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from pprint import pformat
from typing import Any
import grpc
import torch
@@ -68,10 +69,14 @@ from lerobot.common.train_utils import (
)
from lerobot.common.wandb_utils import WandBLogger
from lerobot.configs import parser
from lerobot.configs.train import TrainRLServerPipelineConfig
from lerobot.datasets import LeRobotDataset, make_dataset
from lerobot.policies import make_policy
from lerobot.policies.sac.modeling_sac import SACPolicy
from lerobot.policies import make_policy, make_pre_post_processors
from lerobot.rl.algorithms.base import RLAlgorithm
from lerobot.rl.algorithms.factory import make_algorithm
from lerobot.rl.buffer import ReplayBuffer
from lerobot.rl.data_sources import OnlineOfflineMixer
from lerobot.rl.train_rl import TrainRLServerPipelineConfig
from lerobot.rl.trainer import RLTrainer
from lerobot.robots import so_follower # noqa: F401
from lerobot.teleoperators import gamepad, so_leader # noqa: F401
from lerobot.teleoperators.utils import TeleopEvents
@@ -92,13 +97,11 @@ from lerobot.utils.constants import (
from lerobot.utils.device_utils import get_safe_torch_device
from lerobot.utils.process import ProcessSignalHandler
from lerobot.utils.random_utils import set_seed
from lerobot.utils.transition import move_state_dict_to_device, move_transition_to_device
from lerobot.utils.utils import (
format_big_number,
init_logging,
)
from .buffer import ReplayBuffer, concatenate_batch_transitions
from .learner_service import MAX_WORKERS, SHUTDOWN_TIMEOUT, LearnerService
@@ -179,7 +182,7 @@ def train(cfg: TrainRLServerPipelineConfig, job_name: str | None = None):
def start_learner_threads(
cfg: TrainRLServerPipelineConfig,
wandb_logger: WandBLogger | None,
shutdown_event: any, # Event,
shutdown_event: Any, # Event
) -> None:
"""
Start the learner threads for training.
@@ -253,7 +256,7 @@ def start_learner_threads(
def add_actor_information_and_train(
cfg: TrainRLServerPipelineConfig,
wandb_logger: WandBLogger | None,
shutdown_event: any, # Event,
shutdown_event: Any, # Event
transition_queue: Queue,
interaction_message_queue: Queue,
parameters_queue: Queue,
@@ -266,8 +269,8 @@ def add_actor_information_and_train(
- Transfers transitions from the actor to the replay buffer.
- Logs received interaction messages.
- Ensures training begins only when the replay buffer has a sufficient number of transitions.
- Samples batches from the replay buffer and performs multiple critic updates.
- Periodically updates the actor, critic, and temperature optimizers.
- Delegates training updates to an ``RLAlgorithm``.
- Periodically pushes updated weights to actors.
- Logs training statistics, including loss values and optimization frequency.
NOTE: This function doesn't have a single responsibility, it should be split into multiple functions
@@ -286,17 +289,13 @@ def add_actor_information_and_train(
# of 7%
device = get_safe_torch_device(try_device=cfg.policy.device, log=True)
storage_device = get_safe_torch_device(try_device=cfg.policy.storage_device)
clip_grad_norm_value = cfg.policy.grad_clip_norm
online_step_before_learning = cfg.policy.online_step_before_learning
utd_ratio = cfg.policy.utd_ratio
fps = cfg.env.fps
log_freq = cfg.log_freq
save_freq = cfg.save_freq
policy_update_freq = cfg.policy.policy_update_freq
policy_parameters_push_frequency = cfg.policy.actor_learner_config.policy_parameters_push_frequency
saving_checkpoint = cfg.save_checkpoint
online_steps = cfg.policy.online_steps
async_prefetch = cfg.policy.async_prefetch
# Initialize logging for multiprocessing
if not use_threads(cfg):
@@ -308,7 +307,7 @@ def add_actor_information_and_train(
logging.info("Initializing policy")
policy: SACPolicy = make_policy(
policy = make_policy(
cfg=cfg.policy,
env_cfg=cfg.env,
)
@@ -317,15 +316,17 @@ def add_actor_information_and_train(
policy.train()
push_actor_policy_to_queue(parameters_queue=parameters_queue, policy=policy)
algorithm = make_algorithm(cfg=cfg.algorithm, policy=policy)
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=cfg.policy,
dataset_stats=cfg.policy.dataset_stats,
)
# Push initial policy weights to actors
push_actor_policy_to_queue(parameters_queue=parameters_queue, algorithm=algorithm)
last_time_policy_pushed = time.time()
optimizers, lr_scheduler = make_optimizers_and_scheduler(cfg=cfg, policy=policy)
# If we are resuming, we need to load the training state
resume_optimization_step, resume_interaction_step = load_training_state(cfg=cfg, optimizers=optimizers)
log_training_info(cfg=cfg, policy=policy)
replay_buffer = initialize_replay_buffer(cfg, device, storage_device)
@@ -338,21 +339,35 @@ def add_actor_information_and_train(
device=device,
storage_device=storage_device,
)
batch_size: int = batch_size // 2 # We will sample from both replay buffer
# DataMixer: online-only or online/offline 50-50 mix
data_mixer = OnlineOfflineMixer(
online_buffer=replay_buffer,
offline_buffer=offline_replay_buffer,
online_ratio=cfg.online_ratio,
)
# RLTrainer owns the iterator, preprocessor, and creates optimizers.
trainer = RLTrainer(
algorithm=algorithm,
data_mixer=data_mixer,
batch_size=batch_size,
preprocessor=preprocessor,
)
# If we are resuming, we need to load the training state
optimizers = algorithm.get_optimizers()
resume_optimization_step, resume_interaction_step = load_training_state(cfg=cfg, optimizers=optimizers)
logging.info("Starting learner thread")
interaction_message = None
optimization_step = resume_optimization_step if resume_optimization_step is not None else 0
algorithm.optimization_step = optimization_step
interaction_step_shift = resume_interaction_step if resume_interaction_step is not None else 0
dataset_repo_id = None
if cfg.dataset is not None:
dataset_repo_id = cfg.dataset.repo_id
# Initialize iterators
online_iterator = None
offline_iterator = None
# NOTE: THIS IS THE MAIN LOOP OF THE LEARNER
while True:
# Exit the training loop if shutdown is requested
@@ -365,7 +380,6 @@ def add_actor_information_and_train(
transition_queue=transition_queue,
replay_buffer=replay_buffer,
offline_replay_buffer=offline_replay_buffer,
device=device,
dataset_repo_id=dataset_repo_id,
shutdown_event=shutdown_event,
)
@@ -382,180 +396,20 @@ def add_actor_information_and_train(
if len(replay_buffer) < online_step_before_learning:
continue
if online_iterator is None:
online_iterator = replay_buffer.get_iterator(
batch_size=batch_size, async_prefetch=async_prefetch, queue_size=2
)
if offline_replay_buffer is not None and offline_iterator is None:
offline_iterator = offline_replay_buffer.get_iterator(
batch_size=batch_size, async_prefetch=async_prefetch, queue_size=2
)
time_for_one_optimization_step = time.time()
for _ in range(utd_ratio - 1):
# Sample from the iterators
batch = next(online_iterator)
if dataset_repo_id is not None:
batch_offline = next(offline_iterator)
batch = concatenate_batch_transitions(
left_batch_transitions=batch, right_batch_transition=batch_offline
)
actions = batch[ACTION]
rewards = batch["reward"]
observations = batch["state"]
next_observations = batch["next_state"]
done = batch["done"]
check_nan_in_transition(observations=observations, actions=actions, next_state=next_observations)
observation_features, next_observation_features = get_observation_features(
policy=policy, observations=observations, next_observations=next_observations
)
# Create a batch dictionary with all required elements for the forward method
forward_batch = {
ACTION: actions,
"reward": rewards,
"state": observations,
"next_state": next_observations,
"done": done,
"observation_feature": observation_features,
"next_observation_feature": next_observation_features,
"complementary_info": batch["complementary_info"],
}
# Use the forward method for critic loss
critic_output = policy.forward(forward_batch, model="critic")
# Main critic optimization
loss_critic = critic_output["loss_critic"]
optimizers["critic"].zero_grad()
loss_critic.backward()
critic_grad_norm = torch.nn.utils.clip_grad_norm_(
parameters=policy.critic_ensemble.parameters(), max_norm=clip_grad_norm_value
)
optimizers["critic"].step()
# Discrete critic optimization (if available)
if policy.config.num_discrete_actions is not None:
discrete_critic_output = policy.forward(forward_batch, model="discrete_critic")
loss_discrete_critic = discrete_critic_output["loss_discrete_critic"]
optimizers["discrete_critic"].zero_grad()
loss_discrete_critic.backward()
discrete_critic_grad_norm = torch.nn.utils.clip_grad_norm_(
parameters=policy.discrete_critic.parameters(), max_norm=clip_grad_norm_value
)
optimizers["discrete_critic"].step()
# Update target networks (main and discrete)
policy.update_target_networks()
# Sample for the last update in the UTD ratio
batch = next(online_iterator)
if dataset_repo_id is not None:
batch_offline = next(offline_iterator)
batch = concatenate_batch_transitions(
left_batch_transitions=batch, right_batch_transition=batch_offline
)
actions = batch[ACTION]
rewards = batch["reward"]
observations = batch["state"]
next_observations = batch["next_state"]
done = batch["done"]
check_nan_in_transition(observations=observations, actions=actions, next_state=next_observations)
observation_features, next_observation_features = get_observation_features(
policy=policy, observations=observations, next_observations=next_observations
)
# Create a batch dictionary with all required elements for the forward method
forward_batch = {
ACTION: actions,
"reward": rewards,
"state": observations,
"next_state": next_observations,
"done": done,
"observation_feature": observation_features,
"next_observation_feature": next_observation_features,
}
critic_output = policy.forward(forward_batch, model="critic")
loss_critic = critic_output["loss_critic"]
optimizers["critic"].zero_grad()
loss_critic.backward()
critic_grad_norm = torch.nn.utils.clip_grad_norm_(
parameters=policy.critic_ensemble.parameters(), max_norm=clip_grad_norm_value
).item()
optimizers["critic"].step()
# Initialize training info dictionary
training_infos = {
"loss_critic": loss_critic.item(),
"critic_grad_norm": critic_grad_norm,
}
# Discrete critic optimization (if available)
if policy.config.num_discrete_actions is not None:
discrete_critic_output = policy.forward(forward_batch, model="discrete_critic")
loss_discrete_critic = discrete_critic_output["loss_discrete_critic"]
optimizers["discrete_critic"].zero_grad()
loss_discrete_critic.backward()
discrete_critic_grad_norm = torch.nn.utils.clip_grad_norm_(
parameters=policy.discrete_critic.parameters(), max_norm=clip_grad_norm_value
).item()
optimizers["discrete_critic"].step()
# Add discrete critic info to training info
training_infos["loss_discrete_critic"] = loss_discrete_critic.item()
training_infos["discrete_critic_grad_norm"] = discrete_critic_grad_norm
# Actor and temperature optimization (at specified frequency)
if optimization_step % policy_update_freq == 0:
for _ in range(policy_update_freq):
# Actor optimization
actor_output = policy.forward(forward_batch, model="actor")
loss_actor = actor_output["loss_actor"]
optimizers["actor"].zero_grad()
loss_actor.backward()
actor_grad_norm = torch.nn.utils.clip_grad_norm_(
parameters=policy.actor.parameters(), max_norm=clip_grad_norm_value
).item()
optimizers["actor"].step()
# Add actor info to training info
training_infos["loss_actor"] = loss_actor.item()
training_infos["actor_grad_norm"] = actor_grad_norm
# Temperature optimization
temperature_output = policy.forward(forward_batch, model="temperature")
loss_temperature = temperature_output["loss_temperature"]
optimizers["temperature"].zero_grad()
loss_temperature.backward()
temp_grad_norm = torch.nn.utils.clip_grad_norm_(
parameters=[policy.log_alpha], max_norm=clip_grad_norm_value
).item()
optimizers["temperature"].step()
# Add temperature info to training info
training_infos["loss_temperature"] = loss_temperature.item()
training_infos["temperature_grad_norm"] = temp_grad_norm
training_infos["temperature"] = policy.temperature
# One training step (trainer owns data_mixer iterator; algorithm owns UTD loop)
stats = trainer.training_step()
# Push policy to actors if needed
if time.time() - last_time_policy_pushed > policy_parameters_push_frequency:
push_actor_policy_to_queue(parameters_queue=parameters_queue, policy=policy)
push_actor_policy_to_queue(parameters_queue=parameters_queue, algorithm=algorithm)
last_time_policy_pushed = time.time()
# Update target networks (main and discrete)
policy.update_target_networks()
training_infos = stats.to_log_dict()
# Log training metrics at specified intervals
optimization_step = algorithm.optimization_step
if optimization_step % log_freq == 0:
training_infos["replay_buffer_size"] = len(replay_buffer)
if offline_replay_buffer is not None:
@@ -583,7 +437,6 @@ def add_actor_information_and_train(
custom_step_key="Optimization step",
)
optimization_step += 1
if optimization_step % log_freq == 0:
logging.info(f"[LEARNER] Number of optimization step: {optimization_step}")
@@ -600,6 +453,8 @@ def add_actor_information_and_train(
offline_replay_buffer=offline_replay_buffer,
dataset_repo_id=dataset_repo_id,
fps=fps,
preprocessor=preprocessor,
postprocessor=postprocessor,
)
@@ -607,7 +462,7 @@ def start_learner(
parameters_queue: Queue,
transition_queue: Queue,
interaction_message_queue: Queue,
shutdown_event: any, # Event,
shutdown_event: Any, # Event
cfg: TrainRLServerPipelineConfig,
):
"""
@@ -684,6 +539,8 @@ def save_training_checkpoint(
offline_replay_buffer: ReplayBuffer | None = None,
dataset_repo_id: str | None = None,
fps: int = 30,
preprocessor=None,
postprocessor=None,
) -> None:
"""
Save training checkpoint and associated data.
@@ -707,6 +564,8 @@ def save_training_checkpoint(
offline_replay_buffer: Optional offline replay buffer to save
dataset_repo_id: Repository ID for dataset
fps: Frames per second for dataset
preprocessor: Optional preprocessor pipeline to save
postprocessor: Optional postprocessor pipeline to save
"""
logging.info(f"Checkpoint policy after step {optimization_step}")
_num_digits = max(6, len(str(online_steps)))
@@ -723,6 +582,8 @@ def save_training_checkpoint(
policy=policy,
optimizer=optimizers,
scheduler=None,
preprocessor=preprocessor,
postprocessor=postprocessor,
)
# Save interaction step manually
@@ -760,58 +621,6 @@ def save_training_checkpoint(
logging.info("Resume training")
def make_optimizers_and_scheduler(cfg: TrainRLServerPipelineConfig, policy: nn.Module):
"""
Creates and returns optimizers for the actor, critic, and temperature components of a reinforcement learning policy.
This function sets up Adam optimizers for:
- The **actor network**, ensuring that only relevant parameters are optimized.
- The **critic ensemble**, which evaluates the value function.
- The **temperature parameter**, which controls the entropy in soft actor-critic (SAC)-like methods.
It also initializes a learning rate scheduler, though currently, it is set to `None`.
NOTE:
- If the encoder is shared, its parameters are excluded from the actor's optimization process.
- The policy's log temperature (`log_alpha`) is wrapped in a list to ensure proper optimization as a standalone tensor.
Args:
cfg: Configuration object containing hyperparameters.
policy (nn.Module): The policy model containing the actor, critic, and temperature components.
Returns:
Tuple[Dict[str, torch.optim.Optimizer], Optional[torch.optim.lr_scheduler._LRScheduler]]:
A tuple containing:
- `optimizers`: A dictionary mapping component names ("actor", "critic", "temperature") to their respective Adam optimizers.
- `lr_scheduler`: Currently set to `None` but can be extended to support learning rate scheduling.
"""
optimizer_actor = torch.optim.Adam(
params=[
p
for n, p in policy.actor.named_parameters()
if not policy.config.shared_encoder or not n.startswith("encoder")
],
lr=cfg.policy.actor_lr,
)
optimizer_critic = torch.optim.Adam(params=policy.critic_ensemble.parameters(), lr=cfg.policy.critic_lr)
if cfg.policy.num_discrete_actions is not None:
optimizer_discrete_critic = torch.optim.Adam(
params=policy.discrete_critic.parameters(), lr=cfg.policy.critic_lr
)
optimizer_temperature = torch.optim.Adam(params=[policy.log_alpha], lr=cfg.policy.critic_lr)
lr_scheduler = None
optimizers = {
"actor": optimizer_actor,
"critic": optimizer_critic,
"temperature": optimizer_temperature,
}
if cfg.policy.num_discrete_actions is not None:
optimizers["discrete_critic"] = optimizer_discrete_critic
return optimizers, lr_scheduler
# Training setup functions
@@ -1016,33 +825,6 @@ def initialize_offline_replay_buffer(
# Utilities/Helpers functions
def get_observation_features(
policy: SACPolicy, observations: torch.Tensor, next_observations: torch.Tensor
) -> tuple[torch.Tensor | None, torch.Tensor | None]:
"""
Get observation features from the policy encoder. It act as cache for the observation features.
when the encoder is frozen, the observation features are not updated.
We can save compute by caching the observation features.
Args:
policy: The policy model
observations: The current observations
next_observations: The next observations
Returns:
tuple: observation_features, next_observation_features
"""
if policy.config.vision_encoder_name is None or not policy.config.freeze_vision_encoder:
return None, None
with torch.no_grad():
observation_features = policy.actor.encoder.get_cached_image_features(observations)
next_observation_features = policy.actor.encoder.get_cached_image_features(next_observations)
return observation_features, next_observation_features
def use_threads(cfg: TrainRLServerPipelineConfig) -> bool:
return cfg.policy.concurrency.learner == "threads"
@@ -1093,19 +875,11 @@ def check_nan_in_transition(
return nan_detected
def push_actor_policy_to_queue(parameters_queue: Queue, policy: nn.Module):
def push_actor_policy_to_queue(parameters_queue: Queue, algorithm: RLAlgorithm) -> None:
logging.debug("[LEARNER] Pushing actor policy to the queue")
# Create a dictionary to hold all the state dicts
state_dicts = {"policy": move_state_dict_to_device(policy.actor.state_dict(), device="cpu")}
# Add discrete critic if it exists
if hasattr(policy, "discrete_critic") and policy.discrete_critic is not None:
state_dicts["discrete_critic"] = move_state_dict_to_device(
policy.discrete_critic.state_dict(), device="cpu"
)
logging.debug("[LEARNER] Including discrete critic in state dict push")
state_dicts = algorithm.get_weights()
state_bytes = state_to_bytes(state_dicts)
parameters_queue.put(state_bytes)
@@ -1129,9 +903,8 @@ def process_transitions(
transition_queue: Queue,
replay_buffer: ReplayBuffer,
offline_replay_buffer: ReplayBuffer,
device: str,
dataset_repo_id: str | None,
shutdown_event: any,
shutdown_event: Any, # Event
):
"""Process all available transitions from the queue.
@@ -1139,7 +912,6 @@ def process_transitions(
transition_queue: Queue for receiving transitions from the actor
replay_buffer: Replay buffer to add transitions to
offline_replay_buffer: Offline replay buffer to add transitions to
device: Device to move transitions to
dataset_repo_id: Repository ID for dataset
shutdown_event: Event to signal shutdown
"""
@@ -1148,8 +920,6 @@ def process_transitions(
transition_list = bytes_to_transitions(buffer=transition_list)
for transition in transition_list:
transition = move_transition_to_device(transition=transition, device=device)
# Skip transitions with NaN values
if check_nan_in_transition(
observations=transition["state"],
@@ -1163,7 +933,7 @@ def process_transitions(
# Add to offline buffer if it's an intervention
if dataset_repo_id is not None and transition.get("complementary_info", {}).get(
TeleopEvents.IS_INTERVENTION
TeleopEvents.IS_INTERVENTION.value
):
offline_replay_buffer.add(**transition)
@@ -1172,7 +942,7 @@ def process_interaction_messages(
interaction_message_queue: Queue,
interaction_step_shift: int,
wandb_logger: WandBLogger | None,
shutdown_event: any,
shutdown_event: Any, # Event
) -> dict | None:
"""Process all available interaction messages from the queue.
+49
View File
@@ -0,0 +1,49 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Top-level pipeline config for distributed RL training (actor / learner)."""
from __future__ import annotations
from dataclasses import dataclass
from lerobot.configs.default import DatasetConfig
from lerobot.configs.train import TrainPipelineConfig
from lerobot.rl.algorithms.configs import RLAlgorithmConfig
from lerobot.rl.algorithms.factory import make_algorithm_config
from lerobot.rl.algorithms.sac import SACAlgorithmConfig # noqa: F401
@dataclass(kw_only=True)
class TrainRLServerPipelineConfig(TrainPipelineConfig):
# NOTE: In RL, we don't need an offline dataset
# TODO: Make `TrainPipelineConfig.dataset` optional
dataset: DatasetConfig | None = None # type: ignore[assignment] # because the parent class has made it's type non-optional
# Algorithm config.
algorithm: RLAlgorithmConfig | None = None
# Data mixer strategy name. Currently supports "online_offline".
mixer: str = "online_offline"
# Fraction sampled from online replay when using OnlineOfflineMixer.
online_ratio: float = 0.5
def validate(self) -> None:
super().validate()
if self.algorithm is None:
self.algorithm = make_algorithm_config("sac")
if getattr(self.algorithm, "policy_config", None) is None:
self.algorithm.policy_config = self.policy
+99
View File
@@ -0,0 +1,99 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from collections.abc import Iterator
from typing import Any
from lerobot.rl.algorithms.base import BatchType, RLAlgorithm
from lerobot.rl.algorithms.configs import TrainingStats
from lerobot.rl.data_sources.data_mixer import DataMixer
class RLTrainer:
"""Unified training step orchestrator.
Holds the algorithm, a DataMixer, and an optional preprocessor.
"""
def __init__(
self,
algorithm: RLAlgorithm,
data_mixer: DataMixer,
batch_size: int,
*,
preprocessor: Any | None = None,
):
self.algorithm = algorithm
self.data_mixer = data_mixer
self.batch_size = batch_size
self._preprocessor = preprocessor
self._iterator: Iterator[BatchType] | None = None
self.algorithm.make_optimizers_and_scheduler()
def _build_data_iterator(self) -> Iterator[BatchType]:
"""Create a fresh algorithm-configured iterator (optionally preprocessed)."""
raw = self.algorithm.configure_data_iterator(
data_mixer=self.data_mixer,
batch_size=self.batch_size,
)
if self._preprocessor is not None:
return _PreprocessedIterator(raw, self._preprocessor)
return raw
def reset_data_iterator(self) -> None:
"""Discard the current iterator so it will be rebuilt lazily next step."""
self._iterator = None
def set_data_mixer(self, data_mixer: DataMixer, *, reset: bool = True) -> None:
"""Swap the active data mixer, optionally resetting the iterator."""
self.data_mixer = data_mixer
if reset:
self.reset_data_iterator()
def training_step(self) -> TrainingStats:
"""Run one training step (algorithm-agnostic)."""
if self._iterator is None:
self._iterator = self._build_data_iterator()
return self.algorithm.update(self._iterator)
def preprocess_rl_batch(preprocessor: Any, batch: BatchType) -> BatchType:
"""Apply policy preprocessing to RL observations only."""
observations = batch["state"]
next_observations = batch["next_state"]
batch["state"] = preprocessor.process_observation(observations)
batch["next_state"] = preprocessor.process_observation(next_observations)
return batch
class _PreprocessedIterator:
"""Iterator wrapper that preprocesses each sampled RL batch."""
__slots__ = ("_raw", "_preprocessor")
def __init__(self, raw_iterator: Iterator[BatchType], preprocessor: Any) -> None:
self._raw = raw_iterator
self._preprocessor = preprocessor
def __iter__(self) -> _PreprocessedIterator:
return self
def __next__(self) -> BatchType:
batch = next(self._raw)
return preprocess_rl_batch(self._preprocessor, batch)
@@ -18,6 +18,7 @@ from dataclasses import dataclass, field
from typing import Any
import numpy as np
import torch
from lerobot.configs import FeatureType, PipelineFeatureType, PolicyFeature
from lerobot.model import RobotKinematics
@@ -31,6 +32,7 @@ from lerobot.processor import (
RobotObservation,
TransitionKey,
)
from lerobot.utils.constants import OBS_STATE
from lerobot.utils.rotation import Rotation
@@ -126,9 +128,18 @@ class EEReferenceAndDelta(RobotActionProcessorStep):
],
dtype=float,
)
r_abs = Rotation.from_rotvec([wx, wy, wz]).as_matrix()
delta_r = np.array(
[
wx * self.end_effector_step_sizes.get("wx", 1),
wy * self.end_effector_step_sizes.get("wy", 1),
wz * self.end_effector_step_sizes.get("wz", 1),
],
dtype=float,
)
r_mat = Rotation.from_rotvec(delta_r).as_matrix()
desired = np.eye(4, dtype=float)
desired[:3, :3] = ref[:3, :3] @ r_abs
desired[:3, :3] = ref[:3, :3] @ r_mat
desired[:3, 3] = ref[:3, 3] + delta_p
self._command_when_disabled = desired.copy()
@@ -353,13 +364,16 @@ class GripperVelocityToJoint(RobotActionProcessorStep):
speed_factor: A scaling factor to convert the normalized velocity command to a position change.
clip_min: The minimum allowed gripper joint position.
clip_max: The maximum allowed gripper joint position.
discrete_gripper: If True, treat the input action as discrete (0: open, 1: close, 2: stay).
discrete_gripper: If True, interpret the input as a discrete class index
{0 = close, 1 = stay, 2 = open}, matching `GamepadTeleop.GripperAction`.
"""
speed_factor: float = 20.0
clip_min: float = 0.0
clip_max: float = 100.0
discrete_gripper: bool = False
scale_velocity: bool = False
use_ik_solution: bool = False
def action(self, action: RobotAction) -> RobotAction:
observation = self.transition.get(TransitionKey.OBSERVATION).copy()
@@ -369,18 +383,21 @@ class GripperVelocityToJoint(RobotActionProcessorStep):
if observation is None:
raise ValueError("Joints observation is require for computing robot kinematics")
q_raw = np.array(
[float(v) for k, v in observation.items() if isinstance(k, str) and k.endswith(".pos")],
dtype=float,
)
if self.use_ik_solution and "IK_solution" in self.transition.get(TransitionKey.COMPLEMENTARY_DATA):
q_raw = self.transition.get(TransitionKey.COMPLEMENTARY_DATA)["IK_solution"]
else:
q_raw = np.array(
[float(v) for k, v in observation.items() if isinstance(k, str) and k.endswith(".pos")],
dtype=float,
)
if q_raw is None:
raise ValueError("Joints observation is require for computing robot kinematics")
if self.discrete_gripper:
# Discrete gripper actions are in [0, 1, 2]
# 0: open, 1: close, 2: stay
# We need to shift them to [-1, 0, 1] and then scale them to clip_max
gripper_vel = (gripper_vel - 1) * self.clip_max
if self.discrete_gripper or self.scale_velocity:
# Map discrete command {0=close, 1=stay, 2=open} -> signed velocity.
# Negation accounts for SO100 sign (joint position increases on close).
# 0 -> +clip_max (close), 1 -> 0 (stay), 2 -> -clip_max (open)
gripper_vel = -(gripper_vel - 1) * self.clip_max
# Compute desired gripper position
delta = gripper_vel * float(self.speed_factor)
@@ -578,6 +595,7 @@ class InverseKinematicsRLStep(ProcessorStep):
# Compute inverse kinematics
q_target = self.kinematics.inverse_kinematics(self.q_curr, t_des)
q_target[-1] = gripper_pos # Set gripper position
self.q_curr = q_target
# TODO: This is sentitive to order of motor_names = q_target mapping
@@ -609,3 +627,50 @@ class InverseKinematicsRLStep(ProcessorStep):
def reset(self):
"""Resets the initial guess for the IK solver."""
self.q_curr = None
@dataclass
@ProcessorStepRegistry.register("ee_observation")
class EEObservationStep(ObservationProcessorStep):
use_rotation: bool = False
def observation(self, observation: dict) -> dict:
ee_pose_list = [
observation["ee.x"],
observation["ee.y"],
observation["ee.z"],
]
if self.use_rotation:
ee_pose_list.extend(
[
observation["ee.wx"],
observation["ee.wy"],
observation["ee.wz"],
]
)
# gripper_pos = action.pop("ee.gripper_pos")
ee_pose = torch.tensor(ee_pose_list, dtype=torch.float32).unsqueeze(0)
current_state = observation.get(OBS_STATE)
if current_state is None:
return observation
extended_state = torch.cat([current_state, ee_pose], dim=-1)
# Create new observation dict
new_observation = dict(observation)
new_observation[OBS_STATE] = extended_state
return new_observation
def transform_features(
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
if OBS_STATE in features[PipelineFeatureType.OBSERVATION]:
original_feature = features[PipelineFeatureType.OBSERVATION][OBS_STATE]
new_shape = (original_feature.shape[0] + 3,) + original_feature.shape[1:]
features[PipelineFeatureType.OBSERVATION][OBS_STATE] = PolicyFeature(
type=original_feature.type, shape=new_shape
)
return features
@@ -168,6 +168,12 @@ class SOFollower(Robot):
self.bus.write("Protection_Current", motor, 250) # 50% of max current to avoid burnout
self.bus.write("Overload_Torque", motor, 25) # 25% torque when overloaded
# Set Goal_Position = Present_Position while torque is still disabled so
# that when torque is re-enabled at the end of this block the motors have
# zero positional error and do not snap to a stale register value.
present = self.bus.sync_read("Present_Position")
self.bus.sync_write("Goal_Position", present)
def setup_motors(self) -> None:
for motor in reversed(self.bus.motors):
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
@@ -104,11 +104,14 @@ class KeyboardTeleop(Teleoperator):
def _on_press(self, key):
if hasattr(key, "char"):
self.event_queue.put((key.char, True))
key = key.char
self.event_queue.put((key, True))
def _on_release(self, key):
if hasattr(key, "char"):
self.event_queue.put((key.char, False))
key = key.char
self.event_queue.put((key, False))
if key == keyboard.Key.esc:
logging.info("ESC pressed, disconnecting.")
self.disconnect()
@@ -204,8 +207,6 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop):
# this is useful for retrieving other events like interventions for RL, episode success, etc.
self.misc_keys_queue.put(key)
self.current_pressed.clear()
action_dict = {
"delta_x": delta_x,
"delta_y": delta_y,
@@ -256,6 +257,8 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop):
]
is_intervention = any(self.current_pressed.get(key, False) for key in movement_keys)
self.current_pressed.clear()
# Check for episode control commands from misc_keys_queue
terminate_episode = False
success = False
@@ -20,6 +20,7 @@ from .config_so_leader import (
SOLeaderConfig,
SOLeaderTeleopConfig,
)
from .so101_leader_follower import SO101LeaderFollower
from .so_leader import SO100Leader, SO101Leader, SOLeader
__all__ = [
@@ -27,6 +28,7 @@ __all__ = [
"SO100LeaderConfig",
"SO101Leader",
"SO101LeaderConfig",
"SO101LeaderFollower",
"SOLeader",
"SOLeaderConfig",
"SOLeaderTeleopConfig",
@@ -29,6 +29,11 @@ class SOLeaderConfig:
# Whether to use degrees for angles
use_degrees: bool = True
# Enable leader-follower mode where leader can both lead and follow
leader_follower_mode: bool = False
use_gripper: bool = True
@TeleoperatorConfig.register_subclass("so101_leader")
@TeleoperatorConfig.register_subclass("so100_leader")
@@ -0,0 +1,261 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import sys
import time
from collections import deque
from threading import Event, Thread
import numpy as np
from lerobot.teleoperators.so_leader.so_leader import SOLeader as SO101Leader
from lerobot.teleoperators.utils import TeleopEvents
PYNPUT_AVAILABLE = True
try:
if ("DISPLAY" not in os.environ) and ("linux" in sys.platform):
logging.info("No DISPLAY set. Skipping pynput import.")
raise ImportError("pynput blocked intentionally due to no display.")
from pynput import keyboard
except ImportError:
keyboard = None
PYNPUT_AVAILABLE = False
except Exception as e:
keyboard = None
PYNPUT_AVAILABLE = False
logging.info(f"Could not import pynput: {e}")
logger = logging.getLogger(__name__)
class SO101LeaderFollower(SO101Leader):
"""
Extended SO101 Leader that can both lead (human control) and follow (mimic follower).
This class adds leader-follower functionality where:
- In follow mode: The leader arm mimics the follower's position (torque enabled)
- In lead mode: Human controls the leader (torque disabled) and provides actions
"""
def __init__(self, config):
super().__init__(config)
# Leader-follower state
self.is_intervening = False
# Initialize as False because configure() disables torque at connect time;
# send_action() will re-enable it on the first call when not intervening.
self.leader_torque_enabled = False
# Tracking error for automatic intervention detection
self.leader_tracking_error_queue = deque(maxlen=4)
# Keyboard event handling
self.keyboard_events = {
"intervention": False,
"success": False,
"failure": False,
"rerecord": False,
}
self.keyboard_thread = None
self.stop_event = Event()
# Store last follower position for action computation
self.last_follower_pos = None
@property
def action_features(self) -> dict:
if self.config.use_gripper:
return {
"dtype": "float32",
"shape": (7,),
"names": {
"delta_x": 0,
"delta_y": 1,
"delta_z": 2,
"delta_wx": 3,
"delta_wy": 4,
"delta_wz": 5,
"gripper": 6,
},
}
else:
return {
"dtype": "float32",
"shape": (6,),
"names": {
"delta_x": 0,
"delta_y": 1,
"delta_z": 2,
"delta_wx": 3,
"delta_wy": 4,
"delta_wz": 5,
},
}
def connect(self, calibrate: bool = True) -> None:
"""Connect and configure for leader-follower mode."""
super().connect(calibrate)
# Configure for leader-follower mode with lower gains
# Lower gains allow manual intervention without injury risk
# self.bus.sync_write("Torque_Enable", 1)
for motor in self.bus.motors:
self.bus.write("P_Coefficient", motor, 16)
self.bus.write("I_Coefficient", motor, 0)
self.bus.write("D_Coefficient", motor, 16)
# Start keyboard listener
self._start_keyboard_listener()
print("- Leader-Follower Mode:")
print(" - Press SPACE to toggle intervention (leader control)")
print(" - When not intervening, leader follows follower position")
print(" - When intervening, follower follows leader in end-effector space")
print(" - Press 's' to mark episode as success")
print(" - Press ESC to end episode as failure")
print(" - Press 'r' to re-record episode")
def _start_keyboard_listener(self):
"""Start keyboard listener thread for intervention control."""
def on_press(key):
try:
if key == keyboard.Key.space:
self.keyboard_events["intervention"] = not self.keyboard_events["intervention"]
self.is_intervening = self.keyboard_events["intervention"]
state = "INTERVENTION MODE" if self.is_intervening else "FOLLOWING MODE"
logger.info(f"Toggled to {state}")
elif key == keyboard.Key.esc:
self.keyboard_events["failure"] = True
elif hasattr(key, "char"):
if key.char == "s":
self.keyboard_events["success"] = True
elif key.char == "r":
self.keyboard_events["rerecord"] = True
except Exception as e:
logger.error(f"Error handling key press: {e}")
def listen():
with keyboard.Listener(on_press=on_press) as listener:
while not self.stop_event.is_set():
time.sleep(0.1)
listener.stop()
self.keyboard_thread = Thread(target=listen, daemon=True)
self.keyboard_thread.start()
def send_action(self, action: dict[str, float]) -> None:
"""
Send position commands to leader arm (follow mode).
Args:
action: Dictionary of motor positions to command
"""
# Store follower position for later use
self.last_follower_pos = np.array([action.get(f"{motor}.pos", 0) for motor in self.bus.motors])
if not self.is_intervening:
# Follow mode: enable torque and track follower
if not self.leader_torque_enabled:
self.bus.sync_write("Torque_Enable", 1)
self.leader_torque_enabled = True
# Send follower positions to leader
goal_pos = {motor: action[f"{motor}.pos"] for motor in self.bus.motors}
self.bus.sync_write("Goal_Position", goal_pos)
# Track error for automatic intervention detection
current_pos = self.bus.sync_read("Present_Position")
current_array = np.array([current_pos[motor] for motor in self.bus.motors])
error = np.linalg.norm(self.last_follower_pos[:-1] - current_array[:-1])
self.leader_tracking_error_queue.append(error)
def get_action(self) -> dict[str, float]:
"""
Get action from leader arm.
In follow mode: Returns neutral/current positions
In lead mode: Returns actual leader positions for follower to track
"""
start = time.perf_counter()
if self.is_intervening:
# Lead mode: disable torque if needed and return leader positions
if self.leader_torque_enabled:
self.bus.sync_write("Torque_Enable", 0)
self.leader_torque_enabled = False
# Get current leader position
action = self.bus.sync_read("Present_Position")
action = {f"{motor}.pos": val for motor, val in action.items()}
# Track error
if self.last_follower_pos is not None:
current_array = np.array([action[f"{motor}.pos"] for motor in self.bus.motors])
error = np.linalg.norm(self.last_follower_pos[:-1] - current_array[:-1])
self.leader_tracking_error_queue.append(error)
else:
# Follow mode: return current/neutral positions
action = self.bus.sync_read("Present_Position")
action = {f"{motor}.pos": val for motor, val in action.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return action
def get_teleop_events(self) -> dict[TeleopEvents, bool]:
"""Get current keyboard events."""
events = {}
# Map keyboard events to TeleopEvents
if self.keyboard_events["success"]:
events[TeleopEvents.SUCCESS] = True
self.keyboard_events["success"] = False
if self.keyboard_events["failure"]:
events[TeleopEvents.FAILURE] = True
events[TeleopEvents.TERMINATE_EPISODE] = True
self.keyboard_events["failure"] = False
if self.keyboard_events["rerecord"]:
events[TeleopEvents.RERECORD_EPISODE] = True
events[TeleopEvents.TERMINATE_EPISODE] = True
self.keyboard_events["rerecord"] = False
# Always report intervention state
events[TeleopEvents.IS_INTERVENTION] = self.is_intervening
return events
def disconnect(self) -> None:
"""Disconnect and cleanup."""
self.stop_event.set()
if self.keyboard_thread:
self.keyboard_thread.join(timeout=1.0)
super().disconnect()
def reset(self) -> None:
"""Reset leader-follower state."""
self.is_intervening = False
self.leader_torque_enabled = True
self.leader_tracking_error_queue.clear()
self.keyboard_events = {
"intervention": False,
"success": False,
"failure": False,
"rerecord": False,
}
+3 -2
View File
@@ -52,9 +52,10 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> "Teleoperator":
return SO100Leader(config)
elif config.type == "so101_leader":
from .so_leader import SO101Leader
from .so_leader import SO101LeaderFollower
return SO101Leader(config)
if getattr(config, "leader_follower_mode", False):
return SO101LeaderFollower(config)
elif config.type == "mock_teleop":
from tests.mocks.mock_teleop import MockTeleop
@@ -39,8 +39,8 @@ For more details, see the [Physical Intelligence π₀ blog post](https://www.ph
π₀.₅ represents a significant evolution from π₀, developed by Physical Intelligence to address a big challenge in robotics: open-world generalization. While robots can perform impressive tasks in controlled environments, π₀.₅ is designed to generalize to entirely new environments and situations that were never seen during training.
For more details, see the [Physical Intelligence π₀.₅ blog post](https://www.physicalintelligence.company/blog/pi05).
{% elif model_name == "sac" %}
[Soft Actor-Critic (SAC)](https://huggingface.co/papers/1801.01290) is an entropy-regularised actor-critic algorithm offering stable, sample-efficient learning in continuous-control environments.
{% elif model_name == "gaussian_actor" %}
This is a Gaussian Actor policy (Gaussian policy with a tanh squash) — the policy-side component used by [Soft Actor-Critic (SAC)](https://huggingface.co/papers/1801.01290) and related maximum-entropy continuous-control algorithms.
{% elif model_name == "reward_classifier" %}
A reward classifier is a lightweight neural network that scores observations or trajectories for task success, providing a learned reward signal or offline evaluation when explicit rewards are unavailable.
{% else %}
@@ -14,12 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
import torch
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from lerobot.policies.sac.reward_model.configuration_classifier import RewardClassifierConfig
from lerobot.policies.sac.reward_model.modeling_classifier import ClassifierOutput
from lerobot.policies.gaussian_actor.reward_model.configuration_classifier import RewardClassifierConfig
from lerobot.policies.gaussian_actor.reward_model.modeling_classifier import ClassifierOutput
from lerobot.utils.constants import OBS_IMAGE, REWARD
from tests.utils import skip_if_package_missing
@@ -38,11 +37,8 @@ def test_classifier_output():
@skip_if_package_missing("transformers")
@pytest.mark.skip(
reason="helper2424/resnet10 needs to be updated to work with the latest version of transformers"
)
def test_binary_classifier_with_default_params():
from lerobot.policies.sac.reward_model.modeling_classifier import Classifier
from lerobot.policies.gaussian_actor.reward_model.modeling_classifier import Classifier
config = RewardClassifierConfig()
config.input_features = {
@@ -82,11 +78,8 @@ def test_binary_classifier_with_default_params():
@skip_if_package_missing("transformers")
@pytest.mark.skip(
reason="helper2424/resnet10 needs to be updated to work with the latest version of transformers"
)
def test_multiclass_classifier():
from lerobot.policies.sac.reward_model.modeling_classifier import Classifier
from lerobot.policies.gaussian_actor.reward_model.modeling_classifier import Classifier
num_classes = 5
config = RewardClassifierConfig()
@@ -124,11 +117,8 @@ def test_multiclass_classifier():
@skip_if_package_missing("transformers")
@pytest.mark.skip(
reason="helper2424/resnet10 needs to be updated to work with the latest version of transformers"
)
def test_default_device():
from lerobot.policies.sac.reward_model.modeling_classifier import Classifier
from lerobot.policies.gaussian_actor.reward_model.modeling_classifier import Classifier
config = RewardClassifierConfig()
assert config.device == "cpu"
@@ -139,11 +129,8 @@ def test_default_device():
@skip_if_package_missing("transformers")
@pytest.mark.skip(
reason="helper2424/resnet10 needs to be updated to work with the latest version of transformers"
)
def test_explicit_device_setup():
from lerobot.policies.sac.reward_model.modeling_classifier import Classifier
from lerobot.policies.gaussian_actor.reward_model.modeling_classifier import Classifier
config = RewardClassifierConfig(device="cpu")
assert config.device == "cpu"
@@ -17,19 +17,19 @@
import pytest
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from lerobot.policies.sac.configuration_sac import (
from lerobot.policies.gaussian_actor.configuration_gaussian_actor import (
ActorLearnerConfig,
ActorNetworkConfig,
ConcurrencyConfig,
CriticNetworkConfig,
GaussianActorConfig,
PolicyConfig,
SACConfig,
)
from lerobot.utils.constants import ACTION, OBS_IMAGE, OBS_STATE
def test_sac_config_default_initialization():
config = SACConfig()
def test_gaussian_actor_config_default_initialization():
config = GaussianActorConfig()
assert config.normalization_mapping == {
"VISUAL": NormalizationMode.MEAN_STD,
@@ -55,9 +55,6 @@ def test_sac_config_default_initialization():
# Basic parameters
assert config.device == "cpu"
assert config.storage_device == "cpu"
assert config.discount == 0.99
assert config.temperature_init == 1.0
assert config.num_critics == 2
# Architecture specifics
assert config.vision_encoder_name is None
@@ -66,6 +63,8 @@ def test_sac_config_default_initialization():
assert config.shared_encoder is True
assert config.num_discrete_actions is None
assert config.image_embedding_pooling_dim == 8
assert config.state_encoder_hidden_dim == 256
assert config.latent_dim == 256
# Training parameters
assert config.online_steps == 1000000
@@ -73,20 +72,6 @@ def test_sac_config_default_initialization():
assert config.offline_buffer_capacity == 100000
assert config.async_prefetch is False
assert config.online_step_before_learning == 100
assert config.policy_update_freq == 1
# SAC algorithm parameters
assert config.num_subsample_critics is None
assert config.critic_lr == 3e-4
assert config.actor_lr == 3e-4
assert config.temperature_lr == 3e-4
assert config.critic_target_update_weight == 0.005
assert config.utd_ratio == 1
assert config.state_encoder_hidden_dim == 256
assert config.latent_dim == 256
assert config.target_entropy is None
assert config.use_backup_entropy is True
assert config.grad_clip_norm == 40.0
# Dataset stats defaults
expected_dataset_stats = {
@@ -105,11 +90,6 @@ def test_sac_config_default_initialization():
}
assert config.dataset_stats == expected_dataset_stats
# Critic network configuration
assert config.critic_network_kwargs.hidden_dims == [256, 256]
assert config.critic_network_kwargs.activate_final is True
assert config.critic_network_kwargs.final_activation is None
# Actor network configuration
assert config.actor_network_kwargs.hidden_dims == [256, 256]
assert config.actor_network_kwargs.activate_final is True
@@ -135,7 +115,6 @@ def test_sac_config_default_initialization():
assert config.concurrency.learner == "threads"
assert isinstance(config.actor_network_kwargs, ActorNetworkConfig)
assert isinstance(config.critic_network_kwargs, CriticNetworkConfig)
assert isinstance(config.policy_kwargs, PolicyConfig)
assert isinstance(config.actor_learner_config, ActorLearnerConfig)
assert isinstance(config.concurrency, ConcurrencyConfig)
@@ -175,22 +154,22 @@ def test_concurrency_config():
assert config.learner == "threads"
def test_sac_config_custom_initialization():
config = SACConfig(
def test_gaussian_actor_config_custom_initialization():
config = GaussianActorConfig(
device="cpu",
discount=0.95,
temperature_init=0.5,
num_critics=3,
latent_dim=128,
state_encoder_hidden_dim=128,
num_discrete_actions=3,
)
assert config.device == "cpu"
assert config.discount == 0.95
assert config.temperature_init == 0.5
assert config.num_critics == 3
assert config.latent_dim == 128
assert config.state_encoder_hidden_dim == 128
assert config.num_discrete_actions == 3
def test_validate_features():
config = SACConfig(
config = GaussianActorConfig(
input_features={OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(10,))},
output_features={ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(3,))},
)
@@ -198,7 +177,7 @@ def test_validate_features():
def test_validate_features_missing_observation():
config = SACConfig(
config = GaussianActorConfig(
input_features={"wrong_key": PolicyFeature(type=FeatureType.STATE, shape=(10,))},
output_features={ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(3,))},
)
@@ -209,7 +188,7 @@ def test_validate_features_missing_observation():
def test_validate_features_missing_action():
config = SACConfig(
config = GaussianActorConfig(
input_features={OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(10,))},
output_features={"wrong_key": PolicyFeature(type=FeatureType.ACTION, shape=(3,))},
)
@@ -0,0 +1,528 @@
# !/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
import torch # noqa: E402
from torch import Tensor, nn # noqa: E402
from lerobot.configs.types import FeatureType, PolicyFeature # noqa: E402
from lerobot.policies.gaussian_actor.configuration_gaussian_actor import GaussianActorConfig # noqa: E402
from lerobot.policies.gaussian_actor.modeling_gaussian_actor import MLP, GaussianActorPolicy # noqa: E402
from lerobot.rl.algorithms.sac import SACAlgorithm, SACAlgorithmConfig # noqa: E402
from lerobot.utils.constants import ACTION, OBS_IMAGE, OBS_STATE # noqa: E402
from lerobot.utils.random_utils import seeded_context, set_seed # noqa: E402
try:
import transformers # noqa: F401
TRANSFORMERS_AVAILABLE = True
except ImportError:
TRANSFORMERS_AVAILABLE = False
@pytest.fixture(autouse=True)
def set_random_seed():
seed = 42
set_seed(seed)
def test_mlp_with_default_args():
mlp = MLP(input_dim=10, hidden_dims=[256, 256])
x = torch.randn(10)
y = mlp(x)
assert y.shape == (256,)
def test_mlp_with_batch_dim():
mlp = MLP(input_dim=10, hidden_dims=[256, 256])
x = torch.randn(2, 10)
y = mlp(x)
assert y.shape == (2, 256)
def test_forward_with_empty_hidden_dims():
mlp = MLP(input_dim=10, hidden_dims=[])
x = torch.randn(1, 10)
assert mlp(x).shape == (1, 10)
def test_mlp_with_dropout():
mlp = MLP(input_dim=10, hidden_dims=[256, 256, 11], dropout_rate=0.1)
x = torch.randn(1, 10)
y = mlp(x)
assert y.shape == (1, 11)
drop_out_layers_count = sum(isinstance(layer, nn.Dropout) for layer in mlp.net)
assert drop_out_layers_count == 2
def test_mlp_with_custom_final_activation():
mlp = MLP(input_dim=10, hidden_dims=[256, 256], final_activation=torch.nn.Tanh())
x = torch.randn(1, 10)
y = mlp(x)
assert y.shape == (1, 256)
assert (y >= -1).all() and (y <= 1).all()
def test_gaussian_actor_policy_with_default_args():
with pytest.raises(ValueError, match="should be an instance of class `PreTrainedConfig`"):
GaussianActorPolicy()
def create_dummy_state(batch_size: int, state_dim: int = 10) -> Tensor:
return {
OBS_STATE: torch.randn(batch_size, state_dim),
}
def create_dummy_with_visual_input(batch_size: int, state_dim: int = 10) -> Tensor:
return {
OBS_IMAGE: torch.randn(batch_size, 3, 84, 84),
OBS_STATE: torch.randn(batch_size, state_dim),
}
def create_dummy_action(batch_size: int, action_dim: int = 10) -> Tensor:
return torch.randn(batch_size, action_dim)
def create_default_train_batch(
batch_size: int = 8, state_dim: int = 10, action_dim: int = 10
) -> dict[str, Tensor]:
return {
ACTION: create_dummy_action(batch_size, action_dim),
"reward": torch.randn(batch_size),
"state": create_dummy_state(batch_size, state_dim),
"next_state": create_dummy_state(batch_size, state_dim),
"done": torch.randn(batch_size),
}
def create_train_batch_with_visual_input(
batch_size: int = 8, state_dim: int = 10, action_dim: int = 10
) -> dict[str, Tensor]:
return {
ACTION: create_dummy_action(batch_size, action_dim),
"reward": torch.randn(batch_size),
"state": create_dummy_with_visual_input(batch_size, state_dim),
"next_state": create_dummy_with_visual_input(batch_size, state_dim),
"done": torch.randn(batch_size),
}
def create_observation_batch(batch_size: int = 8, state_dim: int = 10) -> dict[str, Tensor]:
return {
OBS_STATE: torch.randn(batch_size, state_dim),
}
def create_observation_batch_with_visual_input(batch_size: int = 8, state_dim: int = 10) -> dict[str, Tensor]:
return {
OBS_STATE: torch.randn(batch_size, state_dim),
OBS_IMAGE: torch.randn(batch_size, 3, 84, 84),
}
def create_default_config(
state_dim: int, continuous_action_dim: int, has_discrete_action: bool = False
) -> GaussianActorConfig:
action_dim = continuous_action_dim
if has_discrete_action:
action_dim += 1
config = GaussianActorConfig(
input_features={OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(state_dim,))},
output_features={ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(continuous_action_dim,))},
dataset_stats={
OBS_STATE: {
"min": [0.0] * state_dim,
"max": [1.0] * state_dim,
},
ACTION: {
"min": [0.0] * continuous_action_dim,
"max": [1.0] * continuous_action_dim,
},
},
)
config.validate_features()
return config
def create_config_with_visual_input(
state_dim: int, continuous_action_dim: int, has_discrete_action: bool = False
) -> GaussianActorConfig:
config = create_default_config(
state_dim=state_dim,
continuous_action_dim=continuous_action_dim,
has_discrete_action=has_discrete_action,
)
config.input_features[OBS_IMAGE] = PolicyFeature(type=FeatureType.VISUAL, shape=(3, 84, 84))
config.dataset_stats[OBS_IMAGE] = {
"mean": torch.randn(3, 1, 1),
"std": torch.randn(3, 1, 1),
}
config.state_encoder_hidden_dim = 32
config.latent_dim = 32
config.validate_features()
return config
def _make_algorithm(config: GaussianActorConfig) -> tuple[SACAlgorithm, GaussianActorPolicy]:
"""Helper to create policy + algorithm pair for tests that need critics."""
policy = GaussianActorPolicy(config=config)
policy.train()
algo_config = SACAlgorithmConfig.from_policy_config(config)
algorithm = SACAlgorithm(policy=policy, config=algo_config)
algorithm.make_optimizers_and_scheduler()
return algorithm, policy
@pytest.mark.parametrize("batch_size,state_dim,action_dim", [(2, 6, 6), (1, 10, 10)])
def test_gaussian_actor_policy_select_action(batch_size: int, state_dim: int, action_dim: int):
config = create_default_config(state_dim=state_dim, continuous_action_dim=action_dim)
policy = GaussianActorPolicy(config=config)
policy.eval()
with torch.no_grad():
observation_batch = create_observation_batch(batch_size=batch_size, state_dim=state_dim)
selected_action = policy.select_action(observation_batch)
# squeeze(0) removes batch dim when batch_size==1
assert selected_action.shape[-1] == action_dim
def test_gaussian_actor_policy_select_action_with_discrete():
"""select_action should return continuous + discrete actions."""
config = create_default_config(state_dim=10, continuous_action_dim=6)
config.num_discrete_actions = 3
policy = GaussianActorPolicy(config=config)
policy.eval()
with torch.no_grad():
observation_batch = create_observation_batch(batch_size=1, state_dim=10)
# Squeeze to unbatched (single observation)
observation_batch = {k: v.squeeze(0) for k, v in observation_batch.items()}
selected_action = policy.select_action(observation_batch)
assert selected_action.shape[-1] == 7 # 6 continuous + 1 discrete
@pytest.mark.parametrize("batch_size,state_dim,action_dim", [(2, 6, 6), (1, 10, 10)])
def test_gaussian_actor_policy_forward(batch_size: int, state_dim: int, action_dim: int):
config = create_default_config(state_dim=state_dim, continuous_action_dim=action_dim)
policy = GaussianActorPolicy(config=config)
policy.eval()
batch = create_default_train_batch(batch_size=batch_size, action_dim=action_dim, state_dim=state_dim)
with torch.no_grad():
output = policy.forward(batch)
assert "action" in output
assert "log_prob" in output
assert "action_mean" in output
assert output["action"].shape == (batch_size, action_dim)
@pytest.mark.parametrize("batch_size,state_dim,action_dim", [(2, 6, 6), (1, 10, 10)])
def test_gaussian_actor_training_through_sac(batch_size: int, state_dim: int, action_dim: int):
config = create_default_config(state_dim=state_dim, continuous_action_dim=action_dim)
algorithm, policy = _make_algorithm(config)
batch = create_default_train_batch(batch_size=batch_size, action_dim=action_dim, state_dim=state_dim)
forward_batch = algorithm._prepare_forward_batch(batch)
critic_loss = algorithm._compute_loss_critic(forward_batch)
assert critic_loss.item() is not None
assert critic_loss.shape == ()
algorithm.optimizers["critic"].zero_grad()
critic_loss.backward()
algorithm.optimizers["critic"].step()
actor_loss = algorithm._compute_loss_actor(forward_batch)
assert actor_loss.item() is not None
assert actor_loss.shape == ()
algorithm.optimizers["actor"].zero_grad()
actor_loss.backward()
algorithm.optimizers["actor"].step()
temp_loss = algorithm._compute_loss_temperature(forward_batch)
assert temp_loss.item() is not None
assert temp_loss.shape == ()
algorithm.optimizers["temperature"].zero_grad()
temp_loss.backward()
algorithm.optimizers["temperature"].step()
@pytest.mark.parametrize("batch_size,state_dim,action_dim", [(2, 6, 6), (1, 10, 10)])
def test_gaussian_actor_training_with_visual_input(batch_size: int, state_dim: int, action_dim: int):
config = create_config_with_visual_input(state_dim=state_dim, continuous_action_dim=action_dim)
algorithm, policy = _make_algorithm(config)
batch = create_train_batch_with_visual_input(
batch_size=batch_size, state_dim=state_dim, action_dim=action_dim
)
forward_batch = algorithm._prepare_forward_batch(batch)
critic_loss = algorithm._compute_loss_critic(forward_batch)
assert critic_loss.item() is not None
assert critic_loss.shape == ()
algorithm.optimizers["critic"].zero_grad()
critic_loss.backward()
algorithm.optimizers["critic"].step()
actor_loss = algorithm._compute_loss_actor(forward_batch)
assert actor_loss.item() is not None
assert actor_loss.shape == ()
algorithm.optimizers["actor"].zero_grad()
actor_loss.backward()
algorithm.optimizers["actor"].step()
policy.eval()
with torch.no_grad():
observation_batch = create_observation_batch_with_visual_input(
batch_size=batch_size, state_dim=state_dim
)
selected_action = policy.select_action(observation_batch)
assert selected_action.shape[-1] == action_dim
@pytest.mark.parametrize(
"batch_size,state_dim,action_dim,vision_encoder_name",
[(1, 6, 6, "lerobot/resnet10"), (1, 6, 6, "facebook/convnext-base-224")],
)
@pytest.mark.skipif(not TRANSFORMERS_AVAILABLE, reason="Transformers are not installed")
def test_gaussian_actor_policy_with_pretrained_encoder(
batch_size: int, state_dim: int, action_dim: int, vision_encoder_name: str
):
config = create_config_with_visual_input(state_dim=state_dim, continuous_action_dim=action_dim)
config.vision_encoder_name = vision_encoder_name
algorithm, policy = _make_algorithm(config)
batch = create_train_batch_with_visual_input(
batch_size=batch_size, state_dim=state_dim, action_dim=action_dim
)
forward_batch = algorithm._prepare_forward_batch(batch)
critic_loss = algorithm._compute_loss_critic(forward_batch)
assert critic_loss.item() is not None
assert critic_loss.shape == ()
algorithm.optimizers["critic"].zero_grad()
critic_loss.backward()
algorithm.optimizers["critic"].step()
actor_loss = algorithm._compute_loss_actor(forward_batch)
assert actor_loss.item() is not None
assert actor_loss.shape == ()
def test_gaussian_actor_training_with_shared_encoder():
batch_size = 2
action_dim = 10
state_dim = 10
config = create_config_with_visual_input(state_dim=state_dim, continuous_action_dim=action_dim)
config.shared_encoder = True
algorithm, policy = _make_algorithm(config)
batch = create_train_batch_with_visual_input(
batch_size=batch_size, state_dim=state_dim, action_dim=action_dim
)
forward_batch = algorithm._prepare_forward_batch(batch)
critic_loss = algorithm._compute_loss_critic(forward_batch)
assert critic_loss.shape == ()
algorithm.optimizers["critic"].zero_grad()
critic_loss.backward()
algorithm.optimizers["critic"].step()
actor_loss = algorithm._compute_loss_actor(forward_batch)
assert actor_loss.shape == ()
algorithm.optimizers["actor"].zero_grad()
actor_loss.backward()
algorithm.optimizers["actor"].step()
def test_gaussian_actor_training_with_discrete_critic():
batch_size = 2
continuous_action_dim = 9
full_action_dim = continuous_action_dim + 1
state_dim = 10
config = create_config_with_visual_input(
state_dim=state_dim, continuous_action_dim=continuous_action_dim, has_discrete_action=True
)
config.num_discrete_actions = 5
algorithm, policy = _make_algorithm(config)
batch = create_train_batch_with_visual_input(
batch_size=batch_size, state_dim=state_dim, action_dim=full_action_dim
)
forward_batch = algorithm._prepare_forward_batch(batch)
critic_loss = algorithm._compute_loss_critic(forward_batch)
assert critic_loss.shape == ()
algorithm.optimizers["critic"].zero_grad()
critic_loss.backward()
algorithm.optimizers["critic"].step()
discrete_critic_loss = algorithm._compute_loss_discrete_critic(forward_batch)
assert discrete_critic_loss.shape == ()
algorithm.optimizers["discrete_critic"].zero_grad()
discrete_critic_loss.backward()
algorithm.optimizers["discrete_critic"].step()
actor_loss = algorithm._compute_loss_actor(forward_batch)
assert actor_loss.shape == ()
algorithm.optimizers["actor"].zero_grad()
actor_loss.backward()
algorithm.optimizers["actor"].step()
policy.eval()
with torch.no_grad():
observation_batch = create_observation_batch_with_visual_input(
batch_size=batch_size, state_dim=state_dim
)
# Policy.select_action now handles both continuous + discrete
selected_action = policy.select_action({k: v.squeeze(0) for k, v in observation_batch.items()})
assert selected_action.shape[-1] == continuous_action_dim + 1
def test_sac_algorithm_target_entropy():
"""Target entropy is an SAC hyperparameter and lives on the algorithm."""
config = create_default_config(continuous_action_dim=10, state_dim=10)
algorithm, _ = _make_algorithm(config)
assert algorithm.target_entropy == -5.0
def test_sac_algorithm_target_entropy_with_discrete_action():
config = create_config_with_visual_input(state_dim=10, continuous_action_dim=6, has_discrete_action=True)
config.num_discrete_actions = 5
algorithm, _ = _make_algorithm(config)
assert algorithm.target_entropy == -3.5
def test_sac_algorithm_temperature():
import math
config = create_default_config(continuous_action_dim=10, state_dim=10)
algo_config = SACAlgorithmConfig.from_policy_config(config)
policy = GaussianActorPolicy(config=config)
algorithm = SACAlgorithm(policy=policy, config=algo_config)
assert algorithm.temperature == pytest.approx(1.0)
algorithm.log_alpha.data = torch.tensor([math.log(0.1)])
assert algorithm.temperature == pytest.approx(0.1)
def test_sac_algorithm_update_target_network():
config = create_default_config(state_dim=10, continuous_action_dim=6)
algo_config = SACAlgorithmConfig.from_policy_config(config)
algo_config.critic_target_update_weight = 1.0
policy = GaussianActorPolicy(config=config)
algorithm = SACAlgorithm(policy=policy, config=algo_config)
for p in algorithm.critic_ensemble.parameters():
p.data = torch.ones_like(p.data)
algorithm._update_target_networks()
for p in algorithm.critic_target.parameters():
assert torch.allclose(p.data, torch.ones_like(p.data))
@pytest.mark.parametrize("num_critics", [1, 3])
def test_sac_algorithm_with_critics_number_of_heads(num_critics: int):
batch_size = 2
action_dim = 10
state_dim = 10
config = create_config_with_visual_input(state_dim=state_dim, continuous_action_dim=action_dim)
policy = GaussianActorPolicy(config=config)
policy.train()
algo_config = SACAlgorithmConfig.from_policy_config(config)
algo_config.num_critics = num_critics
algorithm = SACAlgorithm(policy=policy, config=algo_config)
algorithm.make_optimizers_and_scheduler()
assert len(algorithm.critic_ensemble.critics) == num_critics
batch = create_train_batch_with_visual_input(
batch_size=batch_size, state_dim=state_dim, action_dim=action_dim
)
forward_batch = algorithm._prepare_forward_batch(batch)
critic_loss = algorithm._compute_loss_critic(forward_batch)
assert critic_loss.shape == ()
algorithm.optimizers["critic"].zero_grad()
critic_loss.backward()
algorithm.optimizers["critic"].step()
def test_gaussian_actor_policy_save_and_load(tmp_path):
"""Test that the policy can be saved and loaded from pretrained."""
root = tmp_path / "test_gaussian_actor_save_and_load"
state_dim = 10
action_dim = 10
batch_size = 2
config = create_default_config(state_dim=state_dim, continuous_action_dim=action_dim)
policy = GaussianActorPolicy(config=config)
policy.eval()
policy.save_pretrained(root)
loaded_policy = GaussianActorPolicy.from_pretrained(root, config=config)
loaded_policy.eval()
assert policy.state_dict().keys() == loaded_policy.state_dict().keys()
for k in policy.state_dict():
assert torch.allclose(policy.state_dict()[k], loaded_policy.state_dict()[k], atol=1e-6)
with torch.no_grad():
with seeded_context(12):
observation_batch = create_observation_batch(batch_size=batch_size, state_dim=state_dim)
actions = policy.select_action(observation_batch)
with seeded_context(12):
loaded_observation_batch = create_observation_batch(batch_size=batch_size, state_dim=state_dim)
loaded_actions = loaded_policy.select_action(loaded_observation_batch)
assert torch.allclose(actions, loaded_actions)
def test_gaussian_actor_policy_save_and_load_with_discrete_critic(tmp_path):
"""Discrete critic should be saved/loaded as part of the policy."""
root = tmp_path / "test_gaussian_actor_save_and_load_discrete"
state_dim = 10
action_dim = 6
config = create_default_config(state_dim=state_dim, continuous_action_dim=action_dim)
config.num_discrete_actions = 3
policy = GaussianActorPolicy(config=config)
policy.eval()
policy.save_pretrained(root)
loaded_policy = GaussianActorPolicy.from_pretrained(root, config=config)
loaded_policy.eval()
assert loaded_policy.discrete_critic is not None
dc_keys = [k for k in loaded_policy.state_dict() if k.startswith("discrete_critic.")]
assert len(dc_keys) > 0
for k in policy.state_dict():
assert torch.allclose(policy.state_dict()[k], loaded_policy.state_dict()[k], atol=1e-6)
-546
View File
@@ -1,546 +0,0 @@
# !/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import pytest
import torch
from torch import Tensor, nn
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.policies.sac.configuration_sac import SACConfig
from lerobot.policies.sac.modeling_sac import MLP, SACPolicy
from lerobot.utils.constants import ACTION, OBS_IMAGE, OBS_STATE
from lerobot.utils.random_utils import seeded_context, set_seed
try:
import transformers # noqa: F401
TRANSFORMERS_AVAILABLE = True
except ImportError:
TRANSFORMERS_AVAILABLE = False
@pytest.fixture(autouse=True)
def set_random_seed():
seed = 42
set_seed(seed)
def test_mlp_with_default_args():
mlp = MLP(input_dim=10, hidden_dims=[256, 256])
x = torch.randn(10)
y = mlp(x)
assert y.shape == (256,)
def test_mlp_with_batch_dim():
mlp = MLP(input_dim=10, hidden_dims=[256, 256])
x = torch.randn(2, 10)
y = mlp(x)
assert y.shape == (2, 256)
def test_forward_with_empty_hidden_dims():
mlp = MLP(input_dim=10, hidden_dims=[])
x = torch.randn(1, 10)
assert mlp(x).shape == (1, 10)
def test_mlp_with_dropout():
mlp = MLP(input_dim=10, hidden_dims=[256, 256, 11], dropout_rate=0.1)
x = torch.randn(1, 10)
y = mlp(x)
assert y.shape == (1, 11)
drop_out_layers_count = sum(isinstance(layer, nn.Dropout) for layer in mlp.net)
assert drop_out_layers_count == 2
def test_mlp_with_custom_final_activation():
mlp = MLP(input_dim=10, hidden_dims=[256, 256], final_activation=torch.nn.Tanh())
x = torch.randn(1, 10)
y = mlp(x)
assert y.shape == (1, 256)
assert (y >= -1).all() and (y <= 1).all()
def test_sac_policy_with_default_args():
with pytest.raises(ValueError, match="should be an instance of class `PreTrainedConfig`"):
SACPolicy()
def create_dummy_state(batch_size: int, state_dim: int = 10) -> Tensor:
return {
OBS_STATE: torch.randn(batch_size, state_dim),
}
def create_dummy_with_visual_input(batch_size: int, state_dim: int = 10) -> Tensor:
return {
OBS_IMAGE: torch.randn(batch_size, 3, 84, 84),
OBS_STATE: torch.randn(batch_size, state_dim),
}
def create_dummy_action(batch_size: int, action_dim: int = 10) -> Tensor:
return torch.randn(batch_size, action_dim)
def create_default_train_batch(
batch_size: int = 8, state_dim: int = 10, action_dim: int = 10
) -> dict[str, Tensor]:
return {
ACTION: create_dummy_action(batch_size, action_dim),
"reward": torch.randn(batch_size),
"state": create_dummy_state(batch_size, state_dim),
"next_state": create_dummy_state(batch_size, state_dim),
"done": torch.randn(batch_size),
}
def create_train_batch_with_visual_input(
batch_size: int = 8, state_dim: int = 10, action_dim: int = 10
) -> dict[str, Tensor]:
return {
ACTION: create_dummy_action(batch_size, action_dim),
"reward": torch.randn(batch_size),
"state": create_dummy_with_visual_input(batch_size, state_dim),
"next_state": create_dummy_with_visual_input(batch_size, state_dim),
"done": torch.randn(batch_size),
}
def create_observation_batch(batch_size: int = 8, state_dim: int = 10) -> dict[str, Tensor]:
return {
OBS_STATE: torch.randn(batch_size, state_dim),
}
def create_observation_batch_with_visual_input(batch_size: int = 8, state_dim: int = 10) -> dict[str, Tensor]:
return {
OBS_STATE: torch.randn(batch_size, state_dim),
OBS_IMAGE: torch.randn(batch_size, 3, 84, 84),
}
def make_optimizers(policy: SACPolicy, has_discrete_action: bool = False) -> dict[str, torch.optim.Optimizer]:
"""Create optimizers for the SAC policy."""
optimizer_actor = torch.optim.Adam(
# Handle the case of shared encoder where the encoder weights are not optimized with the actor gradient
params=[
p
for n, p in policy.actor.named_parameters()
if not policy.config.shared_encoder or not n.startswith("encoder")
],
lr=policy.config.actor_lr,
)
optimizer_critic = torch.optim.Adam(
params=policy.critic_ensemble.parameters(),
lr=policy.config.critic_lr,
)
optimizer_temperature = torch.optim.Adam(
params=[policy.log_alpha],
lr=policy.config.critic_lr,
)
optimizers = {
"actor": optimizer_actor,
"critic": optimizer_critic,
"temperature": optimizer_temperature,
}
if has_discrete_action:
optimizers["discrete_critic"] = torch.optim.Adam(
params=policy.discrete_critic.parameters(),
lr=policy.config.critic_lr,
)
return optimizers
def create_default_config(
state_dim: int, continuous_action_dim: int, has_discrete_action: bool = False
) -> SACConfig:
action_dim = continuous_action_dim
if has_discrete_action:
action_dim += 1
config = SACConfig(
input_features={OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(state_dim,))},
output_features={ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(continuous_action_dim,))},
dataset_stats={
OBS_STATE: {
"min": [0.0] * state_dim,
"max": [1.0] * state_dim,
},
ACTION: {
"min": [0.0] * continuous_action_dim,
"max": [1.0] * continuous_action_dim,
},
},
)
config.validate_features()
return config
def create_config_with_visual_input(
state_dim: int, continuous_action_dim: int, has_discrete_action: bool = False
) -> SACConfig:
config = create_default_config(
state_dim=state_dim,
continuous_action_dim=continuous_action_dim,
has_discrete_action=has_discrete_action,
)
config.input_features[OBS_IMAGE] = PolicyFeature(type=FeatureType.VISUAL, shape=(3, 84, 84))
config.dataset_stats[OBS_IMAGE] = {
"mean": torch.randn(3, 1, 1),
"std": torch.randn(3, 1, 1),
}
# Let make tests a little bit faster
config.state_encoder_hidden_dim = 32
config.latent_dim = 32
config.validate_features()
return config
@pytest.mark.parametrize("batch_size,state_dim,action_dim", [(2, 6, 6), (1, 10, 10)])
def test_sac_policy_with_default_config(batch_size: int, state_dim: int, action_dim: int):
batch = create_default_train_batch(batch_size=batch_size, action_dim=action_dim, state_dim=state_dim)
config = create_default_config(state_dim=state_dim, continuous_action_dim=action_dim)
policy = SACPolicy(config=config)
policy.train()
optimizers = make_optimizers(policy)
cirtic_loss = policy.forward(batch, model="critic")["loss_critic"]
assert cirtic_loss.item() is not None
assert cirtic_loss.shape == ()
cirtic_loss.backward()
optimizers["critic"].step()
actor_loss = policy.forward(batch, model="actor")["loss_actor"]
assert actor_loss.item() is not None
assert actor_loss.shape == ()
actor_loss.backward()
optimizers["actor"].step()
temperature_loss = policy.forward(batch, model="temperature")["loss_temperature"]
assert temperature_loss.item() is not None
assert temperature_loss.shape == ()
temperature_loss.backward()
optimizers["temperature"].step()
policy.eval()
with torch.no_grad():
observation_batch = create_observation_batch(batch_size=batch_size, state_dim=state_dim)
selected_action = policy.select_action(observation_batch)
assert selected_action.shape == (batch_size, action_dim)
@pytest.mark.parametrize("batch_size,state_dim,action_dim", [(2, 6, 6), (1, 10, 10)])
def test_sac_policy_with_visual_input(batch_size: int, state_dim: int, action_dim: int):
config = create_config_with_visual_input(state_dim=state_dim, continuous_action_dim=action_dim)
policy = SACPolicy(config=config)
batch = create_train_batch_with_visual_input(
batch_size=batch_size, state_dim=state_dim, action_dim=action_dim
)
policy.train()
optimizers = make_optimizers(policy)
cirtic_loss = policy.forward(batch, model="critic")["loss_critic"]
assert cirtic_loss.item() is not None
assert cirtic_loss.shape == ()
cirtic_loss.backward()
optimizers["critic"].step()
actor_loss = policy.forward(batch, model="actor")["loss_actor"]
assert actor_loss.item() is not None
assert actor_loss.shape == ()
actor_loss.backward()
optimizers["actor"].step()
temperature_loss = policy.forward(batch, model="temperature")["loss_temperature"]
assert temperature_loss.item() is not None
assert temperature_loss.shape == ()
temperature_loss.backward()
optimizers["temperature"].step()
policy.eval()
with torch.no_grad():
observation_batch = create_observation_batch_with_visual_input(
batch_size=batch_size, state_dim=state_dim
)
selected_action = policy.select_action(observation_batch)
assert selected_action.shape == (batch_size, action_dim)
# Let's check best candidates for pretrained encoders
@pytest.mark.parametrize(
"batch_size,state_dim,action_dim,vision_encoder_name",
[(1, 6, 6, "helper2424/resnet10"), (1, 6, 6, "facebook/convnext-base-224")],
)
@pytest.mark.skipif(not TRANSFORMERS_AVAILABLE, reason="Transformers are not installed")
@pytest.mark.skip(
reason="helper2424/resnet10 needs to be updated to work with the latest version of transformers"
)
def test_sac_policy_with_pretrained_encoder(
batch_size: int, state_dim: int, action_dim: int, vision_encoder_name: str
):
config = create_config_with_visual_input(state_dim=state_dim, continuous_action_dim=action_dim)
config.vision_encoder_name = vision_encoder_name
policy = SACPolicy(config=config)
policy.train()
batch = create_train_batch_with_visual_input(
batch_size=batch_size, state_dim=state_dim, action_dim=action_dim
)
optimizers = make_optimizers(policy)
cirtic_loss = policy.forward(batch, model="critic")["loss_critic"]
assert cirtic_loss.item() is not None
assert cirtic_loss.shape == ()
cirtic_loss.backward()
optimizers["critic"].step()
actor_loss = policy.forward(batch, model="actor")["loss_actor"]
assert actor_loss.item() is not None
assert actor_loss.shape == ()
def test_sac_policy_with_shared_encoder():
batch_size = 2
action_dim = 10
state_dim = 10
config = create_config_with_visual_input(state_dim=state_dim, continuous_action_dim=action_dim)
config.shared_encoder = True
policy = SACPolicy(config=config)
policy.train()
batch = create_train_batch_with_visual_input(
batch_size=batch_size, state_dim=state_dim, action_dim=action_dim
)
policy.train()
optimizers = make_optimizers(policy)
cirtic_loss = policy.forward(batch, model="critic")["loss_critic"]
assert cirtic_loss.item() is not None
assert cirtic_loss.shape == ()
cirtic_loss.backward()
optimizers["critic"].step()
actor_loss = policy.forward(batch, model="actor")["loss_actor"]
assert actor_loss.item() is not None
assert actor_loss.shape == ()
actor_loss.backward()
optimizers["actor"].step()
def test_sac_policy_with_discrete_critic():
batch_size = 2
continuous_action_dim = 9
full_action_dim = continuous_action_dim + 1 # the last action is discrete
state_dim = 10
config = create_config_with_visual_input(
state_dim=state_dim, continuous_action_dim=continuous_action_dim, has_discrete_action=True
)
num_discrete_actions = 5
config.num_discrete_actions = num_discrete_actions
policy = SACPolicy(config=config)
policy.train()
batch = create_train_batch_with_visual_input(
batch_size=batch_size, state_dim=state_dim, action_dim=full_action_dim
)
policy.train()
optimizers = make_optimizers(policy, has_discrete_action=True)
cirtic_loss = policy.forward(batch, model="critic")["loss_critic"]
assert cirtic_loss.item() is not None
assert cirtic_loss.shape == ()
cirtic_loss.backward()
optimizers["critic"].step()
discrete_critic_loss = policy.forward(batch, model="discrete_critic")["loss_discrete_critic"]
assert discrete_critic_loss.item() is not None
assert discrete_critic_loss.shape == ()
discrete_critic_loss.backward()
optimizers["discrete_critic"].step()
actor_loss = policy.forward(batch, model="actor")["loss_actor"]
assert actor_loss.item() is not None
assert actor_loss.shape == ()
actor_loss.backward()
optimizers["actor"].step()
policy.eval()
with torch.no_grad():
observation_batch = create_observation_batch_with_visual_input(
batch_size=batch_size, state_dim=state_dim
)
selected_action = policy.select_action(observation_batch)
assert selected_action.shape == (batch_size, full_action_dim)
discrete_actions = selected_action[:, -1].long()
discrete_action_values = set(discrete_actions.tolist())
assert all(action in range(num_discrete_actions) for action in discrete_action_values), (
f"Discrete action {discrete_action_values} is not in range({num_discrete_actions})"
)
def test_sac_policy_with_default_entropy():
config = create_default_config(continuous_action_dim=10, state_dim=10)
policy = SACPolicy(config=config)
assert policy.target_entropy == -5.0
def test_sac_policy_default_target_entropy_with_discrete_action():
config = create_config_with_visual_input(state_dim=10, continuous_action_dim=6, has_discrete_action=True)
policy = SACPolicy(config=config)
assert policy.target_entropy == -3.0
def test_sac_policy_with_predefined_entropy():
config = create_default_config(state_dim=10, continuous_action_dim=6)
config.target_entropy = -3.5
policy = SACPolicy(config=config)
assert policy.target_entropy == pytest.approx(-3.5)
def test_sac_policy_update_temperature():
"""Test that temperature property is always in sync with log_alpha."""
config = create_default_config(continuous_action_dim=10, state_dim=10)
policy = SACPolicy(config=config)
assert policy.temperature == pytest.approx(1.0)
policy.log_alpha.data = torch.tensor([math.log(0.1)])
# Temperature property automatically reflects log_alpha changes
assert policy.temperature == pytest.approx(0.1)
def test_sac_policy_update_target_network():
config = create_default_config(state_dim=10, continuous_action_dim=6)
config.critic_target_update_weight = 1.0
policy = SACPolicy(config=config)
policy.train()
for p in policy.critic_ensemble.parameters():
p.data = torch.ones_like(p.data)
policy.update_target_networks()
for p in policy.critic_target.parameters():
assert torch.allclose(p.data, torch.ones_like(p.data)), (
f"Target network {p.data} is not equal to {torch.ones_like(p.data)}"
)
@pytest.mark.parametrize("num_critics", [1, 3])
def test_sac_policy_with_critics_number_of_heads(num_critics: int):
batch_size = 2
action_dim = 10
state_dim = 10
config = create_config_with_visual_input(state_dim=state_dim, continuous_action_dim=action_dim)
config.num_critics = num_critics
policy = SACPolicy(config=config)
policy.train()
assert len(policy.critic_ensemble.critics) == num_critics
batch = create_train_batch_with_visual_input(
batch_size=batch_size, state_dim=state_dim, action_dim=action_dim
)
policy.train()
optimizers = make_optimizers(policy)
cirtic_loss = policy.forward(batch, model="critic")["loss_critic"]
assert cirtic_loss.item() is not None
assert cirtic_loss.shape == ()
cirtic_loss.backward()
optimizers["critic"].step()
def test_sac_policy_save_and_load(tmp_path):
root = tmp_path / "test_sac_save_and_load"
state_dim = 10
action_dim = 10
batch_size = 2
config = create_default_config(state_dim=state_dim, continuous_action_dim=action_dim)
policy = SACPolicy(config=config)
policy.eval()
policy.save_pretrained(root)
loaded_policy = SACPolicy.from_pretrained(root, config=config)
loaded_policy.eval()
batch = create_default_train_batch(batch_size=1, state_dim=10, action_dim=10)
with torch.no_grad():
with seeded_context(12):
# Collect policy values before saving
cirtic_loss = policy.forward(batch, model="critic")["loss_critic"]
actor_loss = policy.forward(batch, model="actor")["loss_actor"]
temperature_loss = policy.forward(batch, model="temperature")["loss_temperature"]
observation_batch = create_observation_batch(batch_size=batch_size, state_dim=state_dim)
actions = policy.select_action(observation_batch)
with seeded_context(12):
# Collect policy values after loading
loaded_cirtic_loss = loaded_policy.forward(batch, model="critic")["loss_critic"]
loaded_actor_loss = loaded_policy.forward(batch, model="actor")["loss_actor"]
loaded_temperature_loss = loaded_policy.forward(batch, model="temperature")["loss_temperature"]
loaded_observation_batch = create_observation_batch(batch_size=batch_size, state_dim=state_dim)
loaded_actions = loaded_policy.select_action(loaded_observation_batch)
assert policy.state_dict().keys() == loaded_policy.state_dict().keys()
for k in policy.state_dict():
assert torch.allclose(policy.state_dict()[k], loaded_policy.state_dict()[k], atol=1e-6)
# Compare values before and after saving and loading
# They should be the same
assert torch.allclose(cirtic_loss, loaded_cirtic_loss)
assert torch.allclose(actor_loss, loaded_actor_loss)
assert torch.allclose(temperature_loss, loaded_temperature_loss)
assert torch.allclose(actions, loaded_actions)
+2 -2
View File
@@ -21,8 +21,8 @@ import pytest
import torch
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from lerobot.policies.sac.reward_model.configuration_classifier import RewardClassifierConfig
from lerobot.policies.sac.reward_model.processor_classifier import make_classifier_processor
from lerobot.policies.gaussian_actor.reward_model.configuration_classifier import RewardClassifierConfig
from lerobot.policies.gaussian_actor.reward_model.processor_classifier import make_classifier_processor
from lerobot.processor import (
DataProcessorPipeline,
DeviceProcessorStep,
@@ -21,8 +21,8 @@ import pytest
import torch
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from lerobot.policies.sac.configuration_sac import SACConfig
from lerobot.policies.sac.processor_sac import make_sac_pre_post_processors
from lerobot.policies.gaussian_actor.configuration_gaussian_actor import GaussianActorConfig
from lerobot.policies.gaussian_actor.processor_gaussian_actor import make_gaussian_actor_pre_post_processors
from lerobot.processor import (
AddBatchDimensionProcessorStep,
DataProcessorPipeline,
@@ -38,7 +38,7 @@ from lerobot.utils.constants import ACTION, OBS_STATE
def create_default_config():
"""Create a default SAC configuration for testing."""
config = SACConfig()
config = GaussianActorConfig()
config.input_features = {
OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(10,)),
}
@@ -66,7 +66,7 @@ def test_make_sac_processor_basic():
config = create_default_config()
stats = create_default_stats()
preprocessor, postprocessor = make_sac_pre_post_processors(
preprocessor, postprocessor = make_gaussian_actor_pre_post_processors(
config,
stats,
)
@@ -88,12 +88,12 @@ def test_make_sac_processor_basic():
assert isinstance(postprocessor.steps[1], DeviceProcessorStep)
def test_sac_processor_normalization_modes():
def test_gaussian_actor_processor_normalization_modes():
"""Test that SAC processor correctly handles different normalization modes."""
config = create_default_config()
stats = create_default_stats()
preprocessor, postprocessor = make_sac_pre_post_processors(
preprocessor, postprocessor = make_gaussian_actor_pre_post_processors(
config,
stats,
)
@@ -121,13 +121,13 @@ def test_sac_processor_normalization_modes():
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_sac_processor_cuda():
def test_gaussian_actor_processor_cuda():
"""Test SAC processor with CUDA device."""
config = create_default_config()
config.device = "cuda"
stats = create_default_stats()
preprocessor, postprocessor = make_sac_pre_post_processors(
preprocessor, postprocessor = make_gaussian_actor_pre_post_processors(
config,
stats,
)
@@ -153,13 +153,13 @@ def test_sac_processor_cuda():
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_sac_processor_accelerate_scenario():
def test_gaussian_actor_processor_accelerate_scenario():
"""Test SAC processor in simulated Accelerate scenario."""
config = create_default_config()
config.device = "cuda:0"
stats = create_default_stats()
preprocessor, postprocessor = make_sac_pre_post_processors(
preprocessor, postprocessor = make_gaussian_actor_pre_post_processors(
config,
stats,
)
@@ -180,13 +180,13 @@ def test_sac_processor_accelerate_scenario():
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs")
def test_sac_processor_multi_gpu():
def test_gaussian_actor_processor_multi_gpu():
"""Test SAC processor with multi-GPU setup."""
config = create_default_config()
config.device = "cuda:0"
stats = create_default_stats()
preprocessor, postprocessor = make_sac_pre_post_processors(
preprocessor, postprocessor = make_gaussian_actor_pre_post_processors(
config,
stats,
)
@@ -206,11 +206,11 @@ def test_sac_processor_multi_gpu():
assert processed[TransitionKey.ACTION.value].device == device
def test_sac_processor_without_stats():
def test_gaussian_actor_processor_without_stats():
"""Test SAC processor creation without dataset statistics."""
config = create_default_config()
preprocessor, postprocessor = make_sac_pre_post_processors(config, dataset_stats=None)
preprocessor, postprocessor = make_gaussian_actor_pre_post_processors(config, dataset_stats=None)
# Should still create processors
assert preprocessor is not None
@@ -226,12 +226,12 @@ def test_sac_processor_without_stats():
assert processed is not None
def test_sac_processor_save_and_load():
def test_gaussian_actor_processor_save_and_load():
"""Test saving and loading SAC processor."""
config = create_default_config()
stats = create_default_stats()
preprocessor, postprocessor = make_sac_pre_post_processors(
preprocessor, postprocessor = make_gaussian_actor_pre_post_processors(
config,
stats,
)
@@ -257,14 +257,14 @@ def test_sac_processor_save_and_load():
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_sac_processor_mixed_precision():
def test_gaussian_actor_processor_mixed_precision():
"""Test SAC processor with mixed precision."""
config = create_default_config()
config.device = "cuda"
stats = create_default_stats()
# Create processor
preprocessor, postprocessor = make_sac_pre_post_processors(
preprocessor, postprocessor = make_gaussian_actor_pre_post_processors(
config,
stats,
)
@@ -304,12 +304,12 @@ def test_sac_processor_mixed_precision():
assert processed[TransitionKey.ACTION.value].dtype == torch.float16
def test_sac_processor_batch_data():
def test_gaussian_actor_processor_batch_data():
"""Test SAC processor with batched data."""
config = create_default_config()
stats = create_default_stats()
preprocessor, postprocessor = make_sac_pre_post_processors(
preprocessor, postprocessor = make_gaussian_actor_pre_post_processors(
config,
stats,
)
@@ -329,12 +329,12 @@ def test_sac_processor_batch_data():
assert processed[TransitionKey.ACTION.value].shape == (batch_size, 5)
def test_sac_processor_edge_cases():
def test_gaussian_actor_processor_edge_cases():
"""Test SAC processor with edge cases."""
config = create_default_config()
stats = create_default_stats()
preprocessor, postprocessor = make_sac_pre_post_processors(
preprocessor, postprocessor = make_gaussian_actor_pre_post_processors(
config,
stats,
)
@@ -358,13 +358,13 @@ def test_sac_processor_edge_cases():
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_sac_processor_bfloat16_device_float32_normalizer():
def test_gaussian_actor_processor_bfloat16_device_float32_normalizer():
"""Test: DeviceProcessor(bfloat16) + NormalizerProcessor(float32) → output bfloat16 via automatic adaptation"""
config = create_default_config()
config.device = "cuda"
stats = create_default_stats()
preprocessor, _ = make_sac_pre_post_processors(
preprocessor, _ = make_gaussian_actor_pre_post_processors(
config,
stats,
)
+14 -7
View File
@@ -1804,13 +1804,15 @@ def test_stats_override_preservation_in_load_state_dict():
override_normalizer.stats[key][stat_name], original_stats[key][stat_name]
), f"Stats for {key}.{stat_name} should not match original stats"
# Verify that _tensor_stats are also correctly set to match the override stats
# Verify that _tensor_stats values match the override stats
# Note: visual stats are reshaped from (C,) to (C,1,1) by _reshape_visual_stats
expected_tensor_stats = to_tensor(override_stats)
for key in expected_tensor_stats:
for stat_name in expected_tensor_stats[key]:
if isinstance(expected_tensor_stats[key][stat_name], torch.Tensor):
torch.testing.assert_close(
override_normalizer._tensor_stats[key][stat_name], expected_tensor_stats[key][stat_name]
override_normalizer._tensor_stats[key][stat_name].squeeze(),
expected_tensor_stats[key][stat_name].squeeze(),
)
@@ -1849,12 +1851,16 @@ def test_stats_without_override_loads_normally():
# Stats should now match the original stats (normal behavior)
# Check that all keys and values match
assert set(new_normalizer.stats.keys()) == set(original_stats.keys())
# Note: visual stats are reshaped from (C,) to (C,1,1) by _reshape_visual_stats,
# so we squeeze before comparing values.
for key in original_stats:
assert set(new_normalizer.stats[key].keys()) == set(original_stats[key].keys())
for stat_name in original_stats[key]:
np.testing.assert_allclose(
new_normalizer.stats[key][stat_name], original_stats[key][stat_name], rtol=1e-6, atol=1e-6
)
actual = new_normalizer.stats[key][stat_name]
expected = original_stats[key][stat_name]
if hasattr(actual, "squeeze"):
actual = actual.squeeze()
np.testing.assert_allclose(actual, expected, rtol=1e-6, atol=1e-6)
def test_stats_explicit_provided_flag_detection():
@@ -2075,8 +2081,9 @@ def test_stats_reconstruction_after_load_state_dict():
assert ACTION in new_normalizer.stats
# Check that values are correct (converted back from tensors)
np.testing.assert_allclose(new_normalizer.stats[OBS_IMAGE]["mean"], [0.5, 0.5, 0.5])
np.testing.assert_allclose(new_normalizer.stats[OBS_IMAGE]["std"], [0.2, 0.2, 0.2])
# Note: visual stats are reshaped to (C,1,1), so we squeeze before comparing
np.testing.assert_allclose(new_normalizer.stats[OBS_IMAGE]["mean"].squeeze(), [0.5, 0.5, 0.5])
np.testing.assert_allclose(new_normalizer.stats[OBS_IMAGE]["std"].squeeze(), [0.2, 0.2, 0.2])
np.testing.assert_allclose(new_normalizer.stats[OBS_STATE]["min"], [0.0, -1.0])
np.testing.assert_allclose(new_normalizer.stats[OBS_STATE]["max"], [1.0, 1.0])
np.testing.assert_allclose(new_normalizer.stats[ACTION]["mean"], [0.0, 0.0])
+167 -4
View File
@@ -22,12 +22,14 @@ import pytest
import torch
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
pytest.importorskip("grpc")
from torch.multiprocessing import Event, Queue
from lerobot.configs.train import TrainRLServerPipelineConfig
from lerobot.policies.sac.configuration_sac import SACConfig
from lerobot.utils.constants import OBS_STR
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.policies.gaussian_actor.configuration_gaussian_actor import GaussianActorConfig
from lerobot.rl.train_rl import TrainRLServerPipelineConfig
from lerobot.utils.constants import ACTION, OBS_STATE, OBS_STR
from lerobot.utils.transition import Transition
from tests.utils import skip_if_package_missing
@@ -79,7 +81,7 @@ def cfg():
port = find_free_port()
policy_cfg = SACConfig()
policy_cfg = GaussianActorConfig()
policy_cfg.actor_learner_config.learner_host = "127.0.0.1"
policy_cfg.actor_learner_config.learner_port = port
policy_cfg.concurrency.actor = "threads"
@@ -299,3 +301,164 @@ def test_end_to_end_parameters_flow(cfg, data_size):
assert received_params.keys() == input_params.keys()
for key in input_params:
assert torch.allclose(received_params[key], input_params[key])
def test_learner_algorithm_wiring():
"""Verify that make_algorithm constructs an SACAlgorithm from config,
make_optimizers_and_scheduler() creates the right optimizers, update() works, and
get_weights() output is serializable."""
from lerobot.policies.gaussian_actor.modeling_gaussian_actor import GaussianActorPolicy
from lerobot.rl.algorithms.factory import make_algorithm
from lerobot.rl.algorithms.sac import SACAlgorithm, SACAlgorithmConfig
from lerobot.transport.utils import state_to_bytes
state_dim = 10
action_dim = 6
sac_cfg = GaussianActorConfig(
input_features={OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(state_dim,))},
output_features={ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(action_dim,))},
dataset_stats={
OBS_STATE: {"min": [0.0] * state_dim, "max": [1.0] * state_dim},
ACTION: {"min": [0.0] * action_dim, "max": [1.0] * action_dim},
},
)
sac_cfg.validate_features()
policy = GaussianActorPolicy(config=sac_cfg)
policy.train()
algorithm = make_algorithm(cfg=SACAlgorithmConfig.from_policy_config(sac_cfg), policy=policy)
assert isinstance(algorithm, SACAlgorithm)
optimizers = algorithm.make_optimizers_and_scheduler()
assert "actor" in optimizers
assert "critic" in optimizers
assert "temperature" in optimizers
batch_size = 4
def batch_iterator():
while True:
yield {
ACTION: torch.randn(batch_size, action_dim),
"reward": torch.randn(batch_size),
"state": {OBS_STATE: torch.randn(batch_size, state_dim)},
"next_state": {OBS_STATE: torch.randn(batch_size, state_dim)},
"done": torch.zeros(batch_size),
"complementary_info": {},
}
stats = algorithm.update(batch_iterator())
assert "loss_critic" in stats.losses
# get_weights -> state_to_bytes round-trip
weights = algorithm.get_weights()
assert len(weights) > 0
serialized = state_to_bytes(weights)
assert isinstance(serialized, bytes)
assert len(serialized) > 0
# RLTrainer with DataMixer
from lerobot.rl.buffer import ReplayBuffer
from lerobot.rl.data_sources import OnlineOfflineMixer
from lerobot.rl.trainer import RLTrainer
replay_buffer = ReplayBuffer(
capacity=50,
device="cpu",
state_keys=[OBS_STATE],
storage_device="cpu",
use_drq=False,
)
for _ in range(50):
replay_buffer.add(
state={OBS_STATE: torch.randn(state_dim)},
action=torch.randn(action_dim),
reward=1.0,
next_state={OBS_STATE: torch.randn(state_dim)},
done=False,
truncated=False,
)
data_mixer = OnlineOfflineMixer(online_buffer=replay_buffer, offline_buffer=None)
trainer = RLTrainer(
algorithm=algorithm,
data_mixer=data_mixer,
batch_size=batch_size,
)
trainer_stats = trainer.training_step()
assert "loss_critic" in trainer_stats.losses
def test_initial_and_periodic_weight_push_consistency():
"""Both initial and periodic weight pushes should use algorithm.get_weights()
and produce identical structures."""
from lerobot.policies.gaussian_actor.modeling_gaussian_actor import GaussianActorPolicy
from lerobot.rl.algorithms.factory import make_algorithm
from lerobot.rl.algorithms.sac import SACAlgorithmConfig
from lerobot.transport.utils import bytes_to_state_dict, state_to_bytes
state_dim = 10
action_dim = 6
sac_cfg = GaussianActorConfig(
input_features={OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(state_dim,))},
output_features={ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(action_dim,))},
dataset_stats={
OBS_STATE: {"min": [0.0] * state_dim, "max": [1.0] * state_dim},
ACTION: {"min": [0.0] * action_dim, "max": [1.0] * action_dim},
},
)
sac_cfg.validate_features()
policy = GaussianActorPolicy(config=sac_cfg)
policy.train()
algorithm = make_algorithm(cfg=SACAlgorithmConfig.from_policy_config(sac_cfg), policy=policy)
algorithm.make_optimizers_and_scheduler()
# Simulate initial push (same code path the learner now uses)
initial_weights = algorithm.get_weights()
initial_bytes = state_to_bytes(initial_weights)
# Simulate periodic push
periodic_weights = algorithm.get_weights()
periodic_bytes = state_to_bytes(periodic_weights)
initial_decoded = bytes_to_state_dict(initial_bytes)
periodic_decoded = bytes_to_state_dict(periodic_bytes)
assert initial_decoded.keys() == periodic_decoded.keys()
def test_actor_side_algorithm_select_action_and_load_weights():
"""Simulate actor: create algorithm without optimizers, select_action, load_weights."""
from lerobot.policies.gaussian_actor.modeling_gaussian_actor import GaussianActorPolicy
from lerobot.rl.algorithms.factory import make_algorithm
from lerobot.rl.algorithms.sac import SACAlgorithm, SACAlgorithmConfig
state_dim = 10
action_dim = 6
sac_cfg = GaussianActorConfig(
input_features={OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(state_dim,))},
output_features={ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(action_dim,))},
dataset_stats={
OBS_STATE: {"min": [0.0] * state_dim, "max": [1.0] * state_dim},
ACTION: {"min": [0.0] * action_dim, "max": [1.0] * action_dim},
},
)
sac_cfg.validate_features()
# Actor side: no optimizers
policy = GaussianActorPolicy(config=sac_cfg)
policy.eval()
algorithm = make_algorithm(cfg=SACAlgorithmConfig.from_policy_config(sac_cfg), policy=policy)
assert isinstance(algorithm, SACAlgorithm)
assert algorithm.optimizers == {}
# select_action should work
obs = {OBS_STATE: torch.randn(state_dim)}
action = policy.select_action(obs)
assert action.shape == (action_dim,)
# Simulate receiving weights from learner
fake_weights = algorithm.get_weights()
algorithm.load_weights(fake_weights, device="cpu")
+89
View File
@@ -0,0 +1,89 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for RL data mixing (DataMixer, OnlineOfflineMixer)."""
import pytest
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
import torch # noqa: E402
from lerobot.rl.buffer import ReplayBuffer # noqa: E402
from lerobot.rl.data_sources import OnlineOfflineMixer # noqa: E402
from lerobot.utils.constants import OBS_STATE # noqa: E402
def _make_buffer(capacity: int = 100, state_dim: int = 4) -> ReplayBuffer:
buf = ReplayBuffer(
capacity=capacity,
device="cpu",
state_keys=[OBS_STATE],
storage_device="cpu",
use_drq=False,
)
for i in range(capacity):
buf.add(
state={OBS_STATE: torch.randn(state_dim)},
action=torch.randn(2),
reward=1.0,
next_state={OBS_STATE: torch.randn(state_dim)},
done=bool(i % 10 == 9),
truncated=False,
)
return buf
def test_online_only_mixer_sample():
"""OnlineOfflineMixer with no offline buffer returns online-only batches."""
buf = _make_buffer(capacity=50)
mixer = OnlineOfflineMixer(online_buffer=buf, offline_buffer=None, online_ratio=0.5)
batch = mixer.sample(batch_size=8)
assert batch["state"][OBS_STATE].shape[0] == 8
assert batch["action"].shape[0] == 8
assert batch["reward"].shape[0] == 8
def test_online_only_mixer_ratio_one():
"""OnlineOfflineMixer with online_ratio=1.0 and no offline is equivalent to online-only."""
buf = _make_buffer(capacity=50)
mixer = OnlineOfflineMixer(online_buffer=buf, offline_buffer=None, online_ratio=1.0)
batch = mixer.sample(batch_size=10)
assert batch["state"][OBS_STATE].shape[0] == 10
def test_online_offline_mixer_sample():
"""OnlineOfflineMixer with two buffers returns concatenated batches."""
online = _make_buffer(capacity=50)
offline = _make_buffer(capacity=50)
mixer = OnlineOfflineMixer(
online_buffer=online,
offline_buffer=offline,
online_ratio=0.5,
)
batch = mixer.sample(batch_size=10)
assert batch["state"][OBS_STATE].shape[0] == 10
assert batch["action"].shape[0] == 10
# 5 from online, 5 from offline (approx)
assert batch["reward"].shape[0] == 10
def test_online_offline_mixer_iterator():
"""get_iterator yields batches of the requested size."""
buf = _make_buffer(capacity=50)
mixer = OnlineOfflineMixer(online_buffer=buf, offline_buffer=None)
it = mixer.get_iterator(batch_size=4, async_prefetch=False)
batch1 = next(it)
batch2 = next(it)
assert batch1["state"][OBS_STATE].shape[0] == 4
assert batch2["state"][OBS_STATE].shape[0] == 4
+1 -1
View File
@@ -20,7 +20,7 @@ from queue import Queue
import pytest
pytest.importorskip("grpc")
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
from torch.multiprocessing import Queue as TorchMPQueue # noqa: E402
+525
View File
@@ -0,0 +1,525 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the RL algorithm abstraction and SACAlgorithm implementation."""
import pytest
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
import torch # noqa: E402
from lerobot.configs.types import FeatureType, PolicyFeature # noqa: E402
from lerobot.policies.gaussian_actor.configuration_gaussian_actor import GaussianActorConfig # noqa: E402
from lerobot.policies.gaussian_actor.modeling_gaussian_actor import GaussianActorPolicy # noqa: E402
from lerobot.rl.algorithms.configs import RLAlgorithmConfig, TrainingStats # noqa: E402
from lerobot.rl.algorithms.factory import make_algorithm # noqa: E402
from lerobot.rl.algorithms.sac import SACAlgorithm, SACAlgorithmConfig # noqa: E402
from lerobot.utils.constants import ACTION, OBS_IMAGE, OBS_STATE # noqa: E402
from lerobot.utils.random_utils import set_seed # noqa: E402
# ---------------------------------------------------------------------------
# Helpers (reuse patterns from tests/policies/test_gaussian_actor_policy.py)
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def set_random_seed():
set_seed(42)
def _make_sac_config(
state_dim: int = 10,
action_dim: int = 6,
num_discrete_actions: int | None = None,
with_images: bool = False,
) -> GaussianActorConfig:
config = GaussianActorConfig(
input_features={OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(state_dim,))},
output_features={ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(action_dim,))},
dataset_stats={
OBS_STATE: {"min": [0.0] * state_dim, "max": [1.0] * state_dim},
ACTION: {"min": [0.0] * action_dim, "max": [1.0] * action_dim},
},
num_discrete_actions=num_discrete_actions,
)
if with_images:
config.input_features[OBS_IMAGE] = PolicyFeature(type=FeatureType.VISUAL, shape=(3, 84, 84))
config.dataset_stats[OBS_IMAGE] = {
"mean": torch.randn(3, 1, 1).tolist(),
"std": torch.randn(3, 1, 1).abs().tolist(),
}
config.latent_dim = 32
config.state_encoder_hidden_dim = 32
config.validate_features()
return config
def _make_algorithm(
state_dim: int = 10,
action_dim: int = 6,
utd_ratio: int = 1,
policy_update_freq: int = 1,
num_discrete_actions: int | None = None,
with_images: bool = False,
) -> tuple[SACAlgorithm, GaussianActorPolicy]:
sac_cfg = _make_sac_config(
state_dim=state_dim,
action_dim=action_dim,
num_discrete_actions=num_discrete_actions,
with_images=with_images,
)
policy = GaussianActorPolicy(config=sac_cfg)
policy.train()
algo_config = SACAlgorithmConfig.from_policy_config(sac_cfg)
algo_config.utd_ratio = utd_ratio
algo_config.policy_update_freq = policy_update_freq
algorithm = SACAlgorithm(policy=policy, config=algo_config)
algorithm.make_optimizers_and_scheduler()
return algorithm, policy
def _make_batch(
batch_size: int = 4,
state_dim: int = 10,
action_dim: int = 6,
with_images: bool = False,
) -> dict:
obs = {OBS_STATE: torch.randn(batch_size, state_dim)}
next_obs = {OBS_STATE: torch.randn(batch_size, state_dim)}
if with_images:
obs[OBS_IMAGE] = torch.randn(batch_size, 3, 84, 84)
next_obs[OBS_IMAGE] = torch.randn(batch_size, 3, 84, 84)
return {
ACTION: torch.randn(batch_size, action_dim),
"reward": torch.randn(batch_size),
"state": obs,
"next_state": next_obs,
"done": torch.zeros(batch_size),
"complementary_info": {},
}
def _batch_iterator(**batch_kwargs):
"""Infinite iterator that yields fresh batches (mirrors a real DataMixer iterator)."""
while True:
yield _make_batch(**batch_kwargs)
# ===========================================================================
# Registry / config tests
# ===========================================================================
def test_sac_algorithm_config_registered():
"""SACAlgorithmConfig should be discoverable through the registry."""
assert "sac" in RLAlgorithmConfig.get_known_choices()
cls = RLAlgorithmConfig.get_choice_class("sac")
assert cls is SACAlgorithmConfig
def test_sac_algorithm_config_from_policy_config():
"""from_policy_config embeds the policy config and uses SAC defaults."""
sac_cfg = _make_sac_config()
algo_cfg = SACAlgorithmConfig.from_policy_config(sac_cfg)
assert algo_cfg.policy_config is sac_cfg
assert algo_cfg.discrete_critic_network_kwargs is sac_cfg.discrete_critic_network_kwargs
# Defaults come from SACAlgorithmConfig, not from the policy config.
assert algo_cfg.utd_ratio == 1
assert algo_cfg.policy_update_freq == 1
assert algo_cfg.grad_clip_norm == 40.0
assert algo_cfg.actor_lr == 3e-4
# ===========================================================================
# TrainingStats tests
# ===========================================================================
def test_training_stats_defaults():
stats = TrainingStats()
assert stats.losses == {}
assert stats.grad_norms == {}
assert stats.extra == {}
# ===========================================================================
# get_weights
# ===========================================================================
def test_get_weights_returns_policy_state_dict():
algorithm, policy = _make_algorithm()
weights = algorithm.get_weights()
assert "policy" in weights
actor_state_dict = policy.actor.state_dict()
for key in actor_state_dict:
assert key in weights["policy"]
assert torch.equal(weights["policy"][key].cpu(), actor_state_dict[key].cpu())
def test_get_weights_includes_discrete_critic_when_present():
algorithm, _ = _make_algorithm(num_discrete_actions=3, action_dim=6)
weights = algorithm.get_weights()
assert "discrete_critic" in weights
assert len(weights["discrete_critic"]) > 0
def test_get_weights_excludes_discrete_critic_when_absent():
algorithm, _ = _make_algorithm()
weights = algorithm.get_weights()
assert "discrete_critic" not in weights
def test_get_weights_are_on_cpu():
algorithm, _ = _make_algorithm(num_discrete_actions=3, action_dim=6)
weights = algorithm.get_weights()
for group_name, state_dict in weights.items():
for key, tensor in state_dict.items():
assert tensor.device == torch.device("cpu"), f"{group_name}/{key} is not on CPU"
# ===========================================================================
# select_action (lives on the policy, not the algorithm)
# ===========================================================================
def test_select_action_returns_correct_shape():
action_dim = 6
_, policy = _make_algorithm(state_dim=10, action_dim=action_dim)
policy.eval()
obs = {OBS_STATE: torch.randn(10)}
action = policy.select_action(obs)
assert action.shape == (action_dim,)
def test_select_action_with_discrete_critic():
continuous_dim = 5
_, policy = _make_algorithm(state_dim=10, action_dim=continuous_dim, num_discrete_actions=3)
policy.eval()
obs = {OBS_STATE: torch.randn(10)}
action = policy.select_action(obs)
assert action.shape == (continuous_dim + 1,)
# ===========================================================================
# update (single batch, utd_ratio=1)
# ===========================================================================
def test_update_returns_training_stats():
algorithm, _ = _make_algorithm()
stats = algorithm.update(_batch_iterator())
assert isinstance(stats, TrainingStats)
assert "loss_critic" in stats.losses
assert isinstance(stats.losses["loss_critic"], float)
def test_update_populates_actor_and_temperature_losses():
"""With policy_update_freq=1 and step 0, actor/temperature should be updated."""
algorithm, _ = _make_algorithm(policy_update_freq=1)
stats = algorithm.update(_batch_iterator())
assert "loss_actor" in stats.losses
assert "loss_temperature" in stats.losses
assert "temperature" in stats.extra
@pytest.mark.parametrize("policy_update_freq", [2, 3])
def test_update_skips_actor_at_non_update_steps(policy_update_freq):
"""Actor/temperature should only update when optimization_step % freq == 0."""
algorithm, _ = _make_algorithm(policy_update_freq=policy_update_freq)
it = _batch_iterator()
# Step 0: should update actor
stats_0 = algorithm.update(it)
assert "loss_actor" in stats_0.losses
# Step 1: should NOT update actor
stats_1 = algorithm.update(it)
assert "loss_actor" not in stats_1.losses
def test_update_increments_optimization_step():
algorithm, _ = _make_algorithm()
it = _batch_iterator()
assert algorithm.optimization_step == 0
algorithm.update(it)
assert algorithm.optimization_step == 1
algorithm.update(it)
assert algorithm.optimization_step == 2
def test_update_with_discrete_critic():
algorithm, _ = _make_algorithm(num_discrete_actions=3, action_dim=6)
stats = algorithm.update(_batch_iterator(action_dim=7)) # continuous + 1 discrete
assert "loss_discrete_critic" in stats.losses
assert "discrete_critic" in stats.grad_norms
# ===========================================================================
# update with UTD ratio > 1
# ===========================================================================
@pytest.mark.parametrize("utd_ratio", [2, 4])
def test_update_with_utd_ratio(utd_ratio):
algorithm, _ = _make_algorithm(utd_ratio=utd_ratio)
stats = algorithm.update(_batch_iterator())
assert isinstance(stats, TrainingStats)
assert "loss_critic" in stats.losses
assert algorithm.optimization_step == 1
def test_update_utd_ratio_pulls_utd_batches():
"""next(batch_iterator) should be called exactly utd_ratio times."""
utd_ratio = 3
algorithm, _ = _make_algorithm(utd_ratio=utd_ratio)
call_count = 0
def counting_iterator():
nonlocal call_count
while True:
call_count += 1
yield _make_batch()
algorithm.update(counting_iterator())
assert call_count == utd_ratio
def test_update_utd_ratio_3_critic_warmup_changes_weights():
"""With utd_ratio=3, critic weights should change after update (3 critic steps)."""
algorithm, policy = _make_algorithm(utd_ratio=3)
critic_params_before = {n: p.clone() for n, p in algorithm.critic_ensemble.named_parameters()}
algorithm.update(_batch_iterator())
changed = False
for n, p in algorithm.critic_ensemble.named_parameters():
if not torch.equal(p, critic_params_before[n]):
changed = True
break
assert changed, "Critic weights should have changed after UTD update"
# ===========================================================================
# get_observation_features
# ===========================================================================
def test_get_observation_features_returns_none_without_frozen_encoder():
algorithm, _ = _make_algorithm(with_images=False)
obs = {OBS_STATE: torch.randn(4, 10)}
next_obs = {OBS_STATE: torch.randn(4, 10)}
feat, next_feat = algorithm.get_observation_features(obs, next_obs)
assert feat is None
assert next_feat is None
# ===========================================================================
# optimization_step setter
# ===========================================================================
def test_optimization_step_can_be_set_for_resume():
algorithm, _ = _make_algorithm()
algorithm.optimization_step = 100
assert algorithm.optimization_step == 100
# ===========================================================================
# make_algorithm factory
# ===========================================================================
def test_make_algorithm_returns_sac_for_sac_policy():
sac_cfg = _make_sac_config()
policy = GaussianActorPolicy(config=sac_cfg)
algorithm = make_algorithm(cfg=SACAlgorithmConfig.from_policy_config(sac_cfg), policy=policy)
assert isinstance(algorithm, SACAlgorithm)
assert algorithm.optimizers == {}
def test_make_optimizers_creates_expected_keys():
"""make_optimizers_and_scheduler() should populate the algorithm with Adam optimizers."""
sac_cfg = _make_sac_config()
policy = GaussianActorPolicy(config=sac_cfg)
algorithm = make_algorithm(cfg=SACAlgorithmConfig.from_policy_config(sac_cfg), policy=policy)
optimizers = algorithm.make_optimizers_and_scheduler()
assert "actor" in optimizers
assert "critic" in optimizers
assert "temperature" in optimizers
assert all(isinstance(v, torch.optim.Adam) for v in optimizers.values())
assert algorithm.get_optimizers() is optimizers
def test_actor_side_no_optimizers():
"""Actor-side usage: no optimizers needed, make_optimizers_and_scheduler is not called."""
sac_cfg = _make_sac_config()
policy = GaussianActorPolicy(config=sac_cfg)
algorithm = make_algorithm(cfg=SACAlgorithmConfig.from_policy_config(sac_cfg), policy=policy)
assert isinstance(algorithm, SACAlgorithm)
assert algorithm.optimizers == {}
def test_make_algorithm_uses_sac_algorithm_defaults():
"""make_algorithm populates SACAlgorithmConfig with its own defaults."""
sac_cfg = _make_sac_config()
policy = GaussianActorPolicy(config=sac_cfg)
algorithm = make_algorithm(cfg=SACAlgorithmConfig.from_policy_config(sac_cfg), policy=policy)
assert algorithm.config.utd_ratio == 1
assert algorithm.config.policy_update_freq == 1
assert algorithm.config.grad_clip_norm == 40.0
def test_unknown_algorithm_name_raises_in_registry():
"""The ChoiceRegistry is the source of truth for unknown algorithm names."""
with pytest.raises(KeyError):
RLAlgorithmConfig.get_choice_class("unknown_algo")
# ===========================================================================
# load_weights (round-trip with get_weights)
# ===========================================================================
def test_load_weights_round_trip():
"""get_weights -> load_weights should restore identical parameters on a fresh policy."""
algo_src, _ = _make_algorithm(state_dim=10, action_dim=6)
algo_src.update(_batch_iterator())
sac_cfg = _make_sac_config(state_dim=10, action_dim=6)
policy_dst = GaussianActorPolicy(config=sac_cfg)
algo_dst = SACAlgorithm(policy=policy_dst, config=algo_src.config)
weights = algo_src.get_weights()
algo_dst.load_weights(weights, device="cpu")
dst_actor_state_dict = algo_dst.policy.actor.state_dict()
for key, tensor in weights["policy"].items():
assert torch.equal(
dst_actor_state_dict[key].cpu(),
tensor.cpu(),
), f"Policy param '{key}' mismatch after load_weights"
def test_load_weights_round_trip_with_discrete_critic():
algo_src, _ = _make_algorithm(num_discrete_actions=3, action_dim=6)
algo_src.update(_batch_iterator(action_dim=7))
sac_cfg = _make_sac_config(num_discrete_actions=3, action_dim=6)
policy_dst = GaussianActorPolicy(config=sac_cfg)
algo_dst = SACAlgorithm(policy=policy_dst, config=algo_src.config)
weights = algo_src.get_weights()
algo_dst.load_weights(weights, device="cpu")
assert "discrete_critic" in weights
assert len(weights["discrete_critic"]) > 0
dst_discrete_critic_state_dict = algo_dst.policy.discrete_critic.state_dict()
for key, tensor in weights["discrete_critic"].items():
assert torch.equal(
dst_discrete_critic_state_dict[key].cpu(),
tensor.cpu(),
), f"Discrete critic param '{key}' mismatch after load_weights"
def test_load_weights_ignores_missing_discrete_critic():
"""load_weights should not fail when weights lack discrete_critic on a non-discrete policy."""
algorithm, _ = _make_algorithm()
weights = algorithm.get_weights()
algorithm.load_weights(weights, device="cpu")
def test_actor_side_weight_sync_with_discrete_critic():
"""End-to-end: learner ``algorithm.get_weights()`` -> actor ``policy.load_actor_weights()``."""
# Learner side: train the algorithm so its weights diverge from init.
algo_src, _ = _make_algorithm(num_discrete_actions=3, action_dim=6)
algo_src.update(_batch_iterator(action_dim=7))
weights = algo_src.get_weights()
# Actor side: fresh policy, no algorithm/optimizer.
sac_cfg = _make_sac_config(num_discrete_actions=3, action_dim=6)
policy_actor = GaussianActorPolicy(config=sac_cfg)
# Snapshot initial actor state for the "did it change?" assertion below.
initial_discrete_critic_state_dict = {
k: v.clone() for k, v in policy_actor.discrete_critic.state_dict().items()
}
policy_actor.load_actor_weights(weights, device="cpu")
# Actor weights match the learner's exported actor state dict.
actor_state_dict = policy_actor.actor.state_dict()
for key, tensor in weights["policy"].items():
assert torch.equal(actor_state_dict[key].cpu(), tensor.cpu()), (
f"Actor param '{key}' not synced by load_actor_weights"
)
# Discrete critic weights match the learner's exported discrete critic.
discrete_critic_state_dict = policy_actor.discrete_critic.state_dict()
for key, tensor in weights["discrete_critic"].items():
assert torch.equal(discrete_critic_state_dict[key].cpu(), tensor.cpu()), (
f"Discrete critic param '{key}' not synced by load_actor_weights"
)
# Sanity: the discrete critic actually changed (otherwise the sync is trivial).
changed = any(
not torch.equal(initial_discrete_critic_state_dict[key], discrete_critic_state_dict[key])
for key in initial_discrete_critic_state_dict
if key in discrete_critic_state_dict
)
assert changed, "Discrete critic weights did not change between init and after sync"
# ===========================================================================
# TrainingStats generic losses dict
# ===========================================================================
def test_training_stats_generic_losses():
stats = TrainingStats(
losses={"loss_bc": 0.5, "loss_q": 1.2},
extra={"temperature": 0.1},
)
assert stats.losses["loss_bc"] == 0.5
assert stats.losses["loss_q"] == 1.2
assert stats.extra["temperature"] == 0.1
# ===========================================================================
# Registry-driven build_algorithm
# ===========================================================================
def test_build_algorithm_via_config():
"""SACAlgorithmConfig.build_algorithm should produce a working SACAlgorithm."""
sac_cfg = _make_sac_config()
algo_config = SACAlgorithmConfig.from_policy_config(sac_cfg)
algo_config.utd_ratio = 2
policy = GaussianActorPolicy(config=sac_cfg)
algorithm = algo_config.build_algorithm(policy)
assert isinstance(algorithm, SACAlgorithm)
assert algorithm.config.utd_ratio == 2
def test_make_algorithm_uses_build_algorithm():
"""make_algorithm should delegate to config.build_algorithm (no hardcoded if/else)."""
sac_cfg = _make_sac_config()
policy = GaussianActorPolicy(config=sac_cfg)
algorithm = make_algorithm(cfg=SACAlgorithmConfig.from_policy_config(sac_cfg), policy=policy)
assert isinstance(algorithm, SACAlgorithm)
+127
View File
@@ -0,0 +1,127 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
import torch # noqa: E402
from torch import Tensor # noqa: E402
from lerobot.rl.algorithms.base import RLAlgorithm # noqa: E402
from lerobot.rl.algorithms.configs import TrainingStats # noqa: E402
from lerobot.rl.trainer import RLTrainer # noqa: E402
from lerobot.utils.constants import ACTION, OBS_STATE # noqa: E402
class _DummyRLAlgorithmConfig:
"""Dummy config for testing."""
class _DummyRLAlgorithm(RLAlgorithm):
config_class = _DummyRLAlgorithmConfig
name = "dummy_rl_algorithm"
def __init__(self):
self.configure_calls = 0
self.update_calls = 0
def select_action(self, observation: dict[str, Tensor]) -> Tensor:
return torch.zeros(1)
def configure_data_iterator(
self,
data_mixer,
batch_size: int,
*,
async_prefetch: bool = True,
queue_size: int = 2,
):
self.configure_calls += 1
return data_mixer.get_iterator(
batch_size=batch_size,
async_prefetch=async_prefetch,
queue_size=queue_size,
)
def make_optimizers_and_scheduler(self):
return {}
def update(self, batch_iterator):
self.update_calls += 1
_ = next(batch_iterator)
return TrainingStats(losses={"dummy": 1.0})
def load_weights(self, weights, device="cpu") -> None:
_ = (weights, device)
class _SimpleMixer:
def get_iterator(self, batch_size: int, async_prefetch: bool = True, queue_size: int = 2):
_ = (async_prefetch, queue_size)
while True:
yield {
"state": {OBS_STATE: torch.randn(batch_size, 3)},
ACTION: torch.randn(batch_size, 2),
"reward": torch.randn(batch_size),
"next_state": {OBS_STATE: torch.randn(batch_size, 3)},
"done": torch.zeros(batch_size),
"truncated": torch.zeros(batch_size),
"complementary_info": None,
}
def test_trainer_lazy_iterator_lifecycle_and_reset():
algo = _DummyRLAlgorithm()
mixer = _SimpleMixer()
trainer = RLTrainer(algorithm=algo, data_mixer=mixer, batch_size=4)
# First call builds iterator once.
trainer.training_step()
assert algo.configure_calls == 1
assert algo.update_calls == 1
# Second call reuses existing iterator.
trainer.training_step()
assert algo.configure_calls == 1
assert algo.update_calls == 2
# Explicit reset forces lazy rebuild on next step.
trainer.reset_data_iterator()
trainer.training_step()
assert algo.configure_calls == 2
assert algo.update_calls == 3
def test_trainer_set_data_mixer_resets_by_default():
algo = _DummyRLAlgorithm()
mixer_a = _SimpleMixer()
mixer_b = _SimpleMixer()
trainer = RLTrainer(algorithm=algo, data_mixer=mixer_a, batch_size=2)
trainer.training_step()
assert algo.configure_calls == 1
trainer.set_data_mixer(mixer_b, reset=True)
trainer.training_step()
assert algo.configure_calls == 2
def test_algorithm_optimization_step_contract_defaults():
algo = _DummyRLAlgorithm()
assert algo.optimization_step == 0
algo.optimization_step = 11
assert algo.optimization_step == 11
+1 -1
View File
@@ -22,7 +22,7 @@ from unittest.mock import patch
import pytest
pytest.importorskip("grpc")
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
from lerobot.utils.process import ProcessSignalHandler # noqa: E402
-1
View File
@@ -19,7 +19,6 @@ from collections.abc import Callable
import pytest
pytest.importorskip("grpc")
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
import torch # noqa: E402