mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-25 05:29:55 +00:00
fix(smolvla2): unblock action dispatch when high-level LLM stalls loop
The runtime is single-threaded. `HighLevelSubtaskFwd` at HzTrigger(1.0) fires every loop iteration on MPS because each `select_message` call takes ~2 s, longer than its 1/hz period. The whole tick stretches to ~2.5 s, so `DispatchAction` (HzTrigger 30) only pops a single action per loop iteration — the queue drains at ~0.4 actions/sec instead of 30 actions/sec and the robot barely moves between chunk refreshes.

Two changes, both purely about scheduling — no threading:

* Gate `HighLevelSubtaskFwd` to fire only when the action queue is empty, matching `LowLevelForward`'s refresh condition. The slow LLM call now happens during the "think" phase between chunks, not on every dispatch tick. Restores a clean sense → think → act cycle.
* `DispatchAction` catches up via wall-clock: when the trigger fires after a stall, pop `round(elapsed * hz)` entries and send only the most recent. Open-loop chunks are timestamped at ctrl_hz; sending stale joint targets one-by-one would just lag the robot further behind. The dynamixel smooths to the latest goal anyway.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -167,23 +167,59 @@ class DispatchAction(InferenceStep):
|
|||||||
In dry-run mode (``robot_executor=None``) the step still pops the
|
In dry-run mode (``robot_executor=None``) the step still pops the
|
||||||
queue so it doesn't grow unbounded — the popped tensor is logged
|
queue so it doesn't grow unbounded — the popped tensor is logged
|
||||||
instead of executed.
|
instead of executed.
|
||||||
|
|
||||||
|
Wall-clock catch-up: the action queue represents an open-loop
|
||||||
|
trajectory at a fixed step rate (``trigger.hz`` ≈ ``ctrl_hz``).
|
||||||
|
When the main loop stalls — e.g. an LLM call for the high-level
|
||||||
|
subtask blocks for ~2 s on MPS — the dispatch trigger fires only
|
||||||
|
once over that whole interval. Naively popping a single entry per
|
||||||
|
fire makes the robot lag further and further behind the planned
|
||||||
|
timeline, and a 50-step chunk would take ~125 s to drain instead
|
||||||
|
of ~1.7 s. Track real elapsed time between dispatches and pop
|
||||||
|
``round(elapsed * hz)`` entries, sending the most recent one. The
|
||||||
|
skipped intermediate joint targets are stale anyway — the dynamixel
|
||||||
|
will smooth toward the latest goal position.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
robot_executor: Any = None
|
robot_executor: Any = None
|
||||||
trigger: Trigger = field(default_factory=lambda: HzTrigger(hz=50.0))
|
trigger: Trigger = field(default_factory=lambda: HzTrigger(hz=50.0))
|
||||||
|
_last_dispatch_t: float | None = field(default=None, init=False)
|
||||||
|
|
||||||
def run(self, state: dict[str, Any]) -> dict[str, Any] | None:
|
def run(self, state: dict[str, Any]) -> dict[str, Any] | None:
|
||||||
|
import time as _time # noqa: PLC0415
|
||||||
|
|
||||||
queue = state.get("action_queue")
|
queue = state.get("action_queue")
|
||||||
if not queue:
|
if not queue:
|
||||||
|
# Reset wall-clock anchor when the queue is empty so the
|
||||||
|
# next chunk doesn't see a huge fake "elapsed" window.
|
||||||
|
self._last_dispatch_t = None
|
||||||
return None
|
return None
|
||||||
action = queue.popleft() if hasattr(queue, "popleft") else queue.pop(0)
|
|
||||||
if self.robot_executor is not None:
|
now = _time.monotonic()
|
||||||
self.robot_executor(action)
|
hz = getattr(self.trigger, "hz", 30.0)
|
||||||
# Track lifetime dispatch count so the REPL panel can show
|
if self._last_dispatch_t is None or hz <= 0:
|
||||||
# whether the action loop is actually doing useful work, even
|
n_to_pop = 1
|
||||||
# while the text head produces gibberish (the typical real-
|
else:
|
||||||
# robot failure mode for a memorised model).
|
elapsed = now - self._last_dispatch_t
|
||||||
state["actions_dispatched"] = state.get("actions_dispatched", 0) + 1
|
# ``max(1, ...)`` so we always pop at least one when the
|
||||||
|
# trigger fires; ``min(len(queue), ...)`` so we don't run
|
||||||
|
# off the end of the chunk.
|
||||||
|
n_to_pop = max(1, min(len(queue), int(round(elapsed * hz))))
|
||||||
|
self._last_dispatch_t = now
|
||||||
|
|
||||||
|
# Drain ``n_to_pop`` stale entries, keep only the latest as the
|
||||||
|
# action actually sent. The intermediate joint targets would
|
||||||
|
# all be ~10–30 ms apart in chunk time — the robot can't track
|
||||||
|
# them individually anyway when the host loop is slow.
|
||||||
|
latest = None
|
||||||
|
for _ in range(n_to_pop):
|
||||||
|
if not queue:
|
||||||
|
break
|
||||||
|
latest = queue.popleft() if hasattr(queue, "popleft") else queue.pop(0)
|
||||||
|
state["actions_dispatched"] = state.get("actions_dispatched", 0) + 1
|
||||||
|
|
||||||
|
if latest is not None and self.robot_executor is not None:
|
||||||
|
self.robot_executor(latest)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
@@ -316,6 +352,17 @@ class HighLevelSubtaskFwd(InferenceStep):
|
|||||||
def run(self, state: dict[str, Any]) -> dict[str, Any] | None:
|
def run(self, state: dict[str, Any]) -> dict[str, Any] | None:
|
||||||
if self.policy is None or not state.get("task"):
|
if self.policy is None or not state.get("task"):
|
||||||
return None
|
return None
|
||||||
|
# Gate to chunk boundaries: only generate a fresh subtask when
|
||||||
|
# the action queue is empty (i.e. right before LowLevelForward
|
||||||
|
# refreshes the chunk). ``select_message`` takes ~2 s on MPS,
|
||||||
|
# and running it every loop iteration starves DispatchAction
|
||||||
|
# at ctrl_hz=30 — the queue drains at ~0.4 actions/sec instead
|
||||||
|
# of 30/sec and the robot barely moves. Tying it to the same
|
||||||
|
# "queue empty" condition as the chunk refresh produces a
|
||||||
|
# clean sense → think → act cycle.
|
||||||
|
queue = state.get("action_queue") or []
|
||||||
|
if len(queue) > 0:
|
||||||
|
return None
|
||||||
ctx = _msgs_for_subtask(state)
|
ctx = _msgs_for_subtask(state)
|
||||||
observation = _maybe_observation(self.observation_provider)
|
observation = _maybe_observation(self.observation_provider)
|
||||||
msg = _generate_with_policy(
|
msg = _generate_with_policy(
|
||||||
|
|||||||
Reference in New Issue
Block a user