From d0278ea0932f5b5ff2733324cdff6f63e29c37f3 Mon Sep 17 00:00:00 2001 From: Pepijn Date: Tue, 12 May 2026 15:16:28 +0200 Subject: [PATCH] feat(smolvla2): render state panel in autonomous mode too Dry-run REPL had a clean ANSI-clear-+-rich-panel layout via ``_redraw`` showing task / subtask / plan / memory / queued-actions / pending-tool-calls; autonomous mode just had bare ``> `` plus log lines scrolling past the user. Same data, two presentations. Extract ``_make_state_panel_renderer(runtime, mode_label=...)`` and use it from both ``_run_repl`` (called per user input) and ``_run_autonomous`` (called both on user input *and* on a 0.5s background timer so subtask / plan / memory refreshes from the runtime's own loop become visible without the user typing anything). Title bar shows ``dry-run`` vs ``autonomous`` so it's obvious which mode you're in. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../scripts/lerobot_smolvla2_runtime.py | 125 ++++++++++++------ 1 file changed, 83 insertions(+), 42 deletions(-) diff --git a/src/lerobot/scripts/lerobot_smolvla2_runtime.py b/src/lerobot/scripts/lerobot_smolvla2_runtime.py index 986b37194..e27657b8b 100644 --- a/src/lerobot/scripts/lerobot_smolvla2_runtime.py +++ b/src/lerobot/scripts/lerobot_smolvla2_runtime.py @@ -737,12 +737,34 @@ def _run_autonomous( daemon=True, ) thread.start() + + redraw = _make_state_panel_renderer(runtime, mode_label="autonomous") + redraw() print( - "[smolvla2] autonomous loop running. Type interjections / " - "questions on stdin (Ctrl+C to stop).", + " [autonomous] type interjections / '?' questions on stdin, " + "'stop' or Ctrl+C to quit", flush=True, ) + # Background panel-redraw thread so state changes from the runtime + # loop (subtask refresh, plan update, etc.) are visible without the + # user typing anything. 2 Hz is plenty — generation runs at most + # ~1 Hz on MPS. + _panel_stop = threading.Event() + + def _panel_loop() -> None: + while not _panel_stop.is_set(): + try: + redraw() + except Exception: # noqa: BLE001 + pass + _panel_stop.wait(0.5) + + panel_thread = threading.Thread( + target=_panel_loop, name="smolvla2-panel-redraw", daemon=True + ) + panel_thread.start() + try: while thread.is_alive(): try: @@ -766,6 +788,7 @@ def _run_autonomous( except KeyboardInterrupt: print("\n[smolvla2] interrupt — stopping", flush=True) finally: + _panel_stop.set() runtime.stop() # Give the loop a moment to drain. for _ in range(10): @@ -781,6 +804,61 @@ def _run_autonomous( return 0 +def _make_state_panel_renderer( + runtime: Any, + *, + mode_label: str, +) -> Callable[[list[str] | None], None]: + """Return a closure that prints the task/subtask/plan/memory panel. + + Used by both ``_run_repl`` (dry-run, called per user input) and + ``_run_autonomous`` (real robot, called on a 2 Hz timer + + whenever the user types). Centralises the visual format so the + two modes look identical. + """ + from rich.console import Console # noqa: PLC0415 + + console = Console(highlight=False) + + def _redraw(robot_lines: list[str] | None = None) -> None: + console.clear() + console.rule(f"[bold]SmolVLA2[/] · {mode_label}", style="cyan") + st = runtime.state + for key, label in ( + ("task", "task"), + ("current_subtask", "subtask"), + ("current_plan", "plan"), + ("current_memory", "memory"), + ): + value = st.get(key) + if value: + console.print(f" [bold cyan]{label:<8}[/] {value}") + else: + console.print(f" [dim]{label:<8} (not set)[/]") + queue_len = ( + len(st["action_queue"]) + if isinstance(st.get("action_queue"), (list, tuple)) + or hasattr(st.get("action_queue"), "__len__") + else 0 + ) + pending = len(st.get("tool_calls_pending") or []) + console.print( + f" [dim]queued actions: {queue_len} pending tool calls: {pending}[/]" + ) + console.rule(style="cyan") + if robot_lines: + for line in robot_lines: + console.print(f" [magenta]{line.strip()}[/]") + console.print() + if not st.get("task"): + console.print( + " [dim]Type the task to begin. Lines ending in '?' are VQA, " + "anything else is an interjection. Type 'stop' to exit.[/]" + ) + + return _redraw + + def _build_tools(no_tts: bool, tts_voice: str) -> dict[str, Any]: """Instantiate the tools declared on this dataset/policy.""" if no_tts: @@ -976,48 +1054,11 @@ def _run_repl(runtime: Any, *, initial_task: str | None, max_ticks: int | None) ) return 2 + _redraw = _make_state_panel_renderer(runtime, mode_label="dry-run") + # Keep a local ``console`` just for the styled input prompt; the + # state panel is owned by the shared renderer. console = Console(highlight=False) - def _redraw(robot_lines: list[str] | None = None) -> None: - # ANSI clear screen + home cursor. Falls back gracefully on - # dumb terminals — they just see scrolled output, which is - # fine. - console.clear() - console.rule("[bold]SmolVLA2[/] · dry-run", style="cyan") - st = runtime.state - for key, label in ( - ("task", "task"), - ("current_subtask", "subtask"), - ("current_plan", "plan"), - ("current_memory", "memory"), - ): - value = st.get(key) - if value: - console.print(f" [bold cyan]{label:<8}[/] {value}") - else: - console.print(f" [dim]{label:<8} (not set)[/]") - queue_len = ( - len(st["action_queue"]) - if isinstance(st.get("action_queue"), (list, tuple)) - or hasattr(st.get("action_queue"), "__len__") - else 0 - ) - pending = len(st.get("tool_calls_pending") or []) - console.print( - f" [dim]queued actions: {queue_len} pending tool calls: {pending}[/]" - ) - console.rule(style="cyan") - if robot_lines: - for line in robot_lines: - console.print(f" [magenta]{line.strip()}[/]") - console.print() - # Help line under the divider when nothing is set yet. - if not st.get("task"): - console.print( - " [dim]Type the task to begin. Lines ending in '?' are VQA, " - "anything else is an interjection. Type 'stop' to exit.[/]" - ) - last_logs: list[str] = [] _redraw() if initial_task is None: