Compare commits

...

6 Commits

Author SHA1 Message Date
Maximellerbach 4c666599b4 cleaning up
Co-authored-by: Copilot <copilot@github.com>
2026-05-05 13:44:02 +02:00
Maximellerbach 8d9f137f70 small phase fix and documenting potential issues
Co-authored-by: Copilot <copilot@github.com>
2026-05-05 11:59:16 +02:00
Maximellerbach 375f23559e update docstring
Co-authored-by: Copilot <copilot@github.com>
2026-05-04 19:00:14 +02:00
Maximellerbach 7a62ea21d8 feat(dagger): adding smooth handover
Co-authored-by: Copilot <copilot@github.com>
2026-05-04 18:44:30 +02:00
Jash Shah fdbfc015a2 fix(peft): fix LoRA resume from Hub (PosixPath + double wrap) (#3485) 2026-05-04 10:52:37 +02:00
Haoming Song d656da8ccc fix(pi): keep training sampling outside compiled forwards (#3487)
Move PI0 and PI0.5 noise/time sampling into the policy wrappers so the compiled PyTorch cores receive them as tensor inputs.

This keeps Beta sampling out of torch.compile on MPS, avoiding aten::_sample_dirichlet compilation errors while preserving the CUDA training path.

Validation: .venv/bin/python -m pre_commit run --files src/lerobot/policies/pi0/modeling_pi0.py src/lerobot/policies/pi05/modeling_pi05.py; .venv/bin/python -m pytest -sv -rs tests/policies/pi0_pi05/test_pi0.py tests/policies/pi0_pi05/test_pi05.py tests/policies/pi0_pi05/test_pi0_rtc.py tests/policies/pi0_pi05/test_pi05_rtc.py

Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>
2026-04-30 13:21:17 +02:00
7 changed files with 159 additions and 86 deletions
+4 -2
View File
@@ -514,7 +514,7 @@ def make_policy(
logging.info("Loading policy's PEFT adapter.")
peft_pretrained_path = cfg.pretrained_path
peft_pretrained_path = str(cfg.pretrained_path)
peft_config = PeftConfig.from_pretrained(peft_pretrained_path)
kwargs["pretrained_name_or_path"] = peft_config.base_model_name_or_path
@@ -527,7 +527,9 @@ def make_policy(
)
policy = policy_cls.from_pretrained(**kwargs)
policy = PeftModel.from_pretrained(policy, peft_pretrained_path, config=peft_config)
policy = PeftModel.from_pretrained(
policy, peft_pretrained_path, config=peft_config, is_trainable=True
)
else:
# Make a fresh policy.
+5 -10
View File
@@ -748,16 +748,8 @@ class PI0Pytorch(nn.Module): # see openpi `PI0Pytorch`
return embs, pad_masks, att_masks, adarms_cond
def forward(
self, images, img_masks, lang_tokens, lang_masks, state, actions, noise=None, time=None
) -> Tensor:
def forward(self, images, img_masks, lang_tokens, lang_masks, state, actions, noise, time) -> Tensor:
"""Do a full training forward pass and compute the loss."""
if noise is None:
noise = self.sample_noise(actions.shape, actions.device)
if time is None:
time = self.sample_time(actions.shape[0], actions.device)
time_expanded = time[:, None, None]
x_t = time_expanded * noise + (1 - time_expanded) * actions
u_t = noise - actions
@@ -1292,8 +1284,11 @@ class PI0Policy(PreTrainedPolicy):
state = self.prepare_state(batch)
actions = self.prepare_action(batch)
noise = self.model.sample_noise(actions.shape, actions.device)
time = self.model.sample_time(actions.shape[0], actions.device)
# Compute loss
losses = self.model.forward(images, img_masks, lang_tokens, lang_masks, state, actions)
losses = self.model.forward(images, img_masks, lang_tokens, lang_masks, state, actions, noise, time)
# Truncate losses to actual action dimensions
original_action_dim = self.config.output_features[ACTION].shape[0]
+5 -8
View File
@@ -728,14 +728,8 @@ class PI05Pytorch(nn.Module): # see openpi `PI0Pytorch`
return embs, pad_masks, att_masks, adarms_cond
def forward(self, images, img_masks, tokens, masks, actions, noise=None, time=None) -> Tensor:
def forward(self, images, img_masks, tokens, masks, actions, noise, time) -> Tensor:
"""Do a full training forward pass and compute the loss."""
if noise is None:
noise = self.sample_noise(actions.shape, actions.device)
if time is None:
time = self.sample_time(actions.shape[0], actions.device)
time_expanded = time[:, None, None]
x_t = time_expanded * noise + (1 - time_expanded) * actions
u_t = noise - actions
@@ -1262,8 +1256,11 @@ class PI05Policy(PreTrainedPolicy):
actions = self.prepare_action(batch)
noise = self.model.sample_noise(actions.shape, actions.device)
time = self.model.sample_time(actions.shape[0], actions.device)
# Compute loss (no separate state needed for PI05)
losses = self.model.forward(images, img_masks, tokens, masks, actions)
losses = self.model.forward(images, img_masks, tokens, masks, actions, noise, time)
# Truncate losses to actual action dimensions
original_action_dim = self.config.output_features[ACTION].shape[0]
+123 -58
View File
@@ -33,12 +33,13 @@ Recording modes:
``record_autonomous=False``: Only correction windows are recorded.
Each correction (start to stop) becomes one episode.
Teleoperator expectations:
The user is responsible for keeping the leader arm aligned with the
follower arm at the moment a correction begins. Programmatic motor
handover (``enable_torque`` / ``disable_torque`` / ``write_goal_positions``)
is intentionally not invoked here see the TODO in
:func:`DAggerStrategy._apply_transition` for the open design decision.
Teleoperator handover:
On AUTONOMOUS PAUSED, actuated teleops (those with non-empty
``feedback_features``, e.g. SO-101, OpenArmMini) are smoothly driven to
the follower's last position via ``send_feedback`` so the operator takes
over without a jerk. Non-actuated teleops cannot be driven,
so on PAUSED CORRECTING the follower is instead slid to the teleop's
current pose before the correction begins.
"""
from __future__ import annotations
@@ -175,17 +176,27 @@ class DAggerEvents:
# ---------------------------------------------------------------------------
# TODO(Steven): re-enable programmatic teleop alignment once we decide whether
# to enforce motor-control methods on every Teleoperator. Until then the user
# is responsible for moving the leader arm to the follower's pose at the moment
# a correction begins.
def _teleop_smooth_move_to(
teleop: Teleoperator, target_pos: dict, duration_s: float = 2.0, fps: int = 50
) -> None:
"""Smoothly move teleop to target position via linear interpolation.
def _teleop_supports_feedback(teleop: Teleoperator) -> bool:
"""Return True when the teleop can receive position feedback (is actuated).
TODO(Maxime): See if it is possible to unify this interface across teleops instead of duck-typing.
"""
return (
bool(teleop.feedback_features)
and hasattr(teleop, "disable_torque")
and hasattr(teleop, "enable_torque")
)
Requires the teleoperator to support motor control methods
(``enable_torque``, ``write_goal_positions``, ``get_action``).
def _teleop_smooth_move_to(
teleop: Teleoperator, target_pos: dict, duration_s: float = 2.0, fps: int = 30
) -> None:
"""Smoothly move an actuated teleop to ``target_pos`` via linear interpolation.
Requires the teleoperator to support feedback
(i.e. have non-empty ``feedback_features`` and implement ``disable_torque`` / ``enable_torque``).
TODO(Maxime): This blocks up to ``duration_s`` seconds, during this time
the follower robot doesn't receive new actions, this could be an issue on LeKiwi.
"""
teleop.enable_torque()
current = teleop.get_action()
@@ -193,13 +204,28 @@ def _teleop_smooth_move_to(
for step in range(steps + 1):
t = step / steps
interp = {}
for k in current:
if k in target_pos:
interp[k] = current[k] * (1 - t) + target_pos[k] * t
else:
interp[k] = current[k]
teleop.write_goal_positions(interp)
interp = {
k: current[k] * (1 - t) + target_pos[k] * t if k in target_pos else current[k] for k in current
}
teleop.send_feedback(interp)
time.sleep(1 / fps)
def _follower_smooth_move_to(
robot: ThreadSafeRobot, current: dict, target: dict, duration_s: float = 1.0, fps: int = 30
) -> None:
"""Smoothly move the follower robot from ``current`` to ``target`` action.
Used when the teleop is non-actuated: instead of driving the leader arm
to the follower, we bring the follower to the teleop's current pose.
Both ``current`` and ``target`` must be in robot-action key space.
"""
steps = max(int(duration_s * fps), 1)
for step in range(steps + 1):
t = step / steps
interp = {k: current[k] * (1 - t) + target[k] * t if k in target else current[k] for k in current}
robot.send_action(interp)
time.sleep(1 / fps)
@@ -415,9 +441,6 @@ class DAggerStrategy(RolloutStrategy):
engine.reset()
interpolator.reset()
events.reset()
# TODO(Steven): re-enable once Teleoperator motor-control methods are
# standardised; until then the user pre-aligns the leader by hand.
# teleop.disable_torque()
engine.resume()
last_action: dict[str, Any] | None = None
@@ -441,8 +464,16 @@ class DAggerStrategy(RolloutStrategy):
transition = events.consume_transition()
if transition is not None:
old_phase, new_phase = transition
self._apply_transition(old_phase, new_phase, engine, interpolator, robot, teleop)
last_action = None
self._apply_transition(
old_phase,
new_phase,
engine,
interpolator,
ctx,
last_action,
)
if new_phase == DAggerPhase.AUTONOMOUS:
last_action = None
phase = events.phase
obs = robot.get_observation()
@@ -532,9 +563,6 @@ class DAggerStrategy(RolloutStrategy):
finally:
logger.info("DAgger continuous control loop ended — pausing engine")
engine.pause()
# TODO(Steven): re-enable once Teleoperator motor-control methods
# are standardised across all teleop implementations.
# teleop.disable_torque()
with contextlib.suppress(Exception):
with self._episode_lock:
dataset.save_episode()
@@ -570,9 +598,6 @@ class DAggerStrategy(RolloutStrategy):
engine.reset()
interpolator.reset()
events.reset()
# TODO(Steven): re-enable once Teleoperator motor-control methods are
# standardised; until then the user pre-aligns the leader by hand.
# teleop.disable_torque()
engine.resume()
last_action: dict[str, Any] | None = None
@@ -600,8 +625,16 @@ class DAggerStrategy(RolloutStrategy):
transition = events.consume_transition()
if transition is not None:
old_phase, new_phase = transition
self._apply_transition(old_phase, new_phase, engine, interpolator, robot, teleop)
last_action = None
self._apply_transition(
old_phase,
new_phase,
engine,
interpolator,
ctx,
last_action,
)
if new_phase == DAggerPhase.AUTONOMOUS:
last_action = None
# Correction ended -> save episode (blocking if not streaming)
if old_phase == DAggerPhase.CORRECTING and new_phase == DAggerPhase.PAUSED:
@@ -679,9 +712,6 @@ class DAggerStrategy(RolloutStrategy):
finally:
logger.info("DAgger corrections-only loop ended — pausing engine")
engine.pause()
# TODO(Steven): re-enable once Teleoperator motor-control methods
# are standardised across all teleop implementations.
# teleop.disable_torque()
with contextlib.suppress(Exception):
with self._episode_lock:
dataset.save_episode()
@@ -698,36 +728,71 @@ class DAggerStrategy(RolloutStrategy):
new_phase: DAggerPhase,
engine,
interpolator,
robot: ThreadSafeRobot,
teleop: Teleoperator,
ctx: RolloutContext,
prev_action: dict | None,
) -> None:
"""Execute side-effects for a validated phase transition."""
"""Execute side-effects for a validated phase transition, including smooth handovers.
AUTONOMOUS -> PAUSED (actuated teleop):
Pause the engine, then drive the leader arm to the follower's last
commanded position so the operator takes over without a jerk.
PAUSED -> CORRECTING (non-actuated teleop):
Slide the follower to the teleop's current pose so the robot meets
the operator's hand rather than jumping to it on the first frame.
CORRECTING -> PAUSED (actuated teleop):
Re-enable torque to hold position after correction.
This will be potentially useful if cancelling the correction recording
PAUSED -> AUTONOMOUS:
Reset and resume the inference engine.
"""
teleop = ctx.hardware.teleop
robot = ctx.hardware.robot_wrapper
logger.info("Phase transition: %s -> %s", old_phase.value, new_phase.value)
if old_phase == DAggerPhase.AUTONOMOUS and new_phase == DAggerPhase.PAUSED:
logger.info("Pausing engine robot holds position")
logger.info("Pausing engine - robot holds position")
engine.pause()
obs = robot.get_observation()
_robot_pos = {
k: v for k, v in obs.items() if k.endswith(".pos") and k in robot.observation_features
}
# TODO(Steven): once Teleoperator motor-control methods are
# standardised, drive the leader to the follower's pose here so the
# operator does not need to pre-align the arm by hand. Until then
# the user is responsible for the alignment.
# _teleop_smooth_move_to(teleop, _robot_pos, duration_s=2.0, fps=50)
elif new_phase == DAggerPhase.CORRECTING:
logger.info("Entering correction mode — human teleop control")
# TODO(Steven): re-enable once Teleoperator motor-control methods
# are standardised across all teleop implementations.
# teleop.disable_torque()
if _teleop_supports_feedback(teleop) and prev_action is not None:
# TODO(Maxime): prev_action is in robot action key space (output of robot_action_processor).
# send_feedback expects teleop feedback key space. For homogeneous setups (e.g. SO-101
# leader + SO-101 follower) the keys are identical so this works. If the processor pipeline
# does non-trivial key renaming (e.g. a rename_map on action keys), the interpolation in
# _teleop_smooth_move_to silently no-ops and the arm doesn't move.
logger.info("Smooth handover: moving leader arm to follower position")
_teleop_smooth_move_to(teleop, prev_action)
elif old_phase == DAggerPhase.PAUSED and new_phase == DAggerPhase.CORRECTING:
logger.info("Entering correction mode - human teleop control")
if not _teleop_supports_feedback(teleop) and prev_action is not None:
logger.info("Smooth handover: sliding follower to teleop position")
obs = robot.get_observation()
teleop_action = teleop.get_action()
processed = ctx.processors.teleop_action_processor((teleop_action, obs))
target = ctx.processors.robot_action_processor((processed, obs))
_follower_smooth_move_to(robot, prev_action, target)
# unlock the teleop for human control
if _teleop_supports_feedback(teleop):
teleop.disable_torque()
elif old_phase == DAggerPhase.CORRECTING and new_phase == DAggerPhase.PAUSED:
if _teleop_supports_feedback(teleop):
teleop.enable_torque()
elif new_phase == DAggerPhase.AUTONOMOUS:
logger.info("Resuming autonomous mode resetting engine and interpolator")
logger.info("Resuming autonomous mode - resetting engine and interpolator")
interpolator.reset()
engine.reset()
engine.resume()
# release teleop before resuming the policy
if _teleop_supports_feedback(teleop):
teleop.disable_torque()
# ------------------------------------------------------------------
# Background push (shared by both modes)
# ------------------------------------------------------------------
+8 -3
View File
@@ -277,9 +277,14 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
if cfg.peft is not None:
if cfg.is_reward_model_training:
raise ValueError("PEFT is only supported for policy training. ")
logging.info("Using PEFT! Wrapping model.")
peft_cli_overrides = dataclasses.asdict(cfg.peft)
policy = policy.wrap_with_peft(peft_cli_overrides=peft_cli_overrides)
from peft import PeftModel
if isinstance(policy, PeftModel):
logging.info("PEFT adapter already loaded from checkpoint, skipping wrap_with_peft.")
else:
logging.info("Using PEFT! Wrapping model.")
peft_cli_overrides = dataclasses.asdict(cfg.peft)
policy = policy.wrap_with_peft(peft_cli_overrides=peft_cli_overrides)
# Wait for all processes to finish model creation before continuing
accelerator.wait_for_everyone()
@@ -112,7 +112,7 @@ class OpenArmMini(Teleoperator):
@property
def feedback_features(self) -> dict[str, type]:
return {}
return self.action_features
@property
def is_connected(self) -> bool:
@@ -348,8 +348,9 @@ class OpenArmMini(Teleoperator):
if left_goals:
self.bus_left.sync_write("Goal_Position", left_goals)
@check_if_not_connected
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError("Feedback is not yet implemented for OpenArm Mini.")
self.write_goal_positions(feedback)
@check_if_not_connected
def disconnect(self) -> None:
@@ -59,7 +59,7 @@ class SOLeader(Teleoperator):
@property
def feedback_features(self) -> dict[str, type]:
return {}
return self.action_features
@property
def is_connected(self) -> bool:
@@ -130,6 +130,12 @@ class SOLeader(Teleoperator):
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
def enable_torque(self) -> None:
self.bus.enable_torque()
def disable_torque(self) -> None:
self.bus.disable_torque()
def setup_motors(self) -> None:
for motor in reversed(self.bus.motors):
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
@@ -145,9 +151,11 @@ class SOLeader(Teleoperator):
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return action
@check_if_not_connected
def send_feedback(self, feedback: dict[str, float]) -> None:
# TODO: Implement force feedback
raise NotImplementedError
goals = {k.removesuffix(".pos"): v for k, v in feedback.items() if k.endswith(".pos")}
if goals:
self.bus.sync_write("Goal_Position", goals)
@check_if_not_connected
def disconnect(self) -> None: