refactor(smolvla2): command-driven runtime — no startup prompts

Replace the startup mode prompt + task picker with a single
command-driven prompt. The runtime now comes up immediately at the
command line in `paused` mode (robot idle) and the operator drives it:

  /action "task"     run the robot on a task (bare = resume, number = timed burst)
  /pause             stop the action loop — robot holds position
  /question "..."    pause and answer one VQA question (camera prompt + overlay)
  /help / stop

- Removed _select_mode_interactively / _select_task_interactively /
  _dataset_task_strings (the interactive pickers).
- mode value renamed "question" -> "paused"; --mode choices are now
  action|paused (default paused).
- /question takes the question inline and runs it via _handle_slash_command
  (pauses first, so the policy isn't used concurrently).
- The ENTER-to-start gate only fires when starting in action mode.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pepijn
2026-05-18 14:37:51 +02:00
parent 516ffc7687
commit e7c5613a39
4 changed files with 201 additions and 353 deletions
@@ -16,7 +16,7 @@
Reads non-blocking stdin lines, classifies each one heuristically:
"stop" / "quit" / "exit" → state["stop"] = True
"/action" / "/question" → set state["mode"]
"/action" / "/pause" → set state["mode"]
ends with "?" → user_vqa_query event
starts with "task:" or first line → set runtime task
anything else → user_interjection event
@@ -75,14 +75,14 @@ class StdinReader:
state["stop"] = True
return
# Slash commands flip the run mode. ``/question`` pauses the
# action loop (the action steps gate on ``state["mode"]``);
# ``/action`` resumes it. ``/vlm`` / ``/vqa`` are kept as aliases.
if lower in {"/action", "/act"}:
# Slash commands flip the run mode. ``/pause`` stops the action
# loop (the action steps gate on ``state["mode"]``); ``/action``
# resumes it.
if lower.split(" ", 1)[0] in {"/action", "/act", "/run"}:
state["mode"] = "action"
return
if lower in {"/question", "/q", "/vlm", "/vqa"}:
state["mode"] = "question"
if lower in {"/pause", "/p"}:
state["mode"] = "paused"
queue = state.get("action_queue")
if hasattr(queue, "clear"):
queue.clear()
@@ -33,8 +33,8 @@ Stable keys (read by multiple steps):
events_this_tick list[str] triggers consumed this tick
_tick Tick current tick (set by the loop)
mode str "action" (run the robot) | "question" (VQA
only, action loop paused)
mode str "action" (run the robot) | "paused"
(action loop stopped — robot holds)
log_lines list[str] human-readable status lines printed each tick
"""
@@ -93,7 +93,7 @@ def make_state_panel(state: dict[str, Any]) -> Any:
table.add_row("", footer)
run_mode = state.get("mode", "action")
mode_tag = (
"[green]action[/]" if run_mode == "action" else "[yellow]question (paused)[/]"
"[green]action[/]" if run_mode == "action" else "[yellow]paused[/]"
)
return Panel(
table,
+191 -343
View File
@@ -76,8 +76,7 @@ def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
type=str,
required=True,
help=(
"Local directory or Hugging Face Hub repo id pointing at a "
"trained SmolVLA2 ``pretrained_model``."
"Local directory or Hugging Face Hub repo id pointing at a trained SmolVLA2 ``pretrained_model``."
),
)
p.add_argument(
@@ -149,12 +148,12 @@ def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
"--mode",
dest="mode",
type=str,
choices=["action", "question"],
choices=["action", "paused"],
default=None,
help=(
"Start-up run mode. 'action' runs the robot; 'question' starts "
"paused for VQA. When given, the startup mode prompt is "
"skipped. Can still be flipped at runtime with /action /question."
"Start-up run mode. 'action' runs the robot immediately on "
"--task; 'paused' (the default) comes up at the command line "
"with the robot idle. Flip any time with /action and /pause."
),
)
p.add_argument(
@@ -205,7 +204,7 @@ def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
default=None,
help=(
"Optional JSON dict describing camera configs to attach to "
"the robot (e.g. ``'{\"top\": {\"type\": \"opencv\", \"index\": 0}}'``). "
'the robot (e.g. ``\'{"top": {"type": "opencv", "index": 0}}\'``). '
"Camera keys MUST match the ``observation.images.*`` features "
"the policy was trained on."
),
@@ -220,7 +219,7 @@ def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
"``RobotConfig.max_relative_target``. Accepts either a float "
"(applied to every motor — e.g. ``5.0`` degrees) or a JSON "
"object mapping motor names to caps "
"(e.g. ``'{\"shoulder_pan\": 5, \"gripper\": 30}'``). The "
'(e.g. ``\'{"shoulder_pan": 5, "gripper": 30}\'``). The '
"robot driver clips each commanded position relative to the "
"current measured position before sending — same kill-switch "
"``lerobot-record`` uses. Default ``None`` = no clipping."
@@ -260,9 +259,7 @@ def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
"~1/(forward-pass latency)."
),
)
p.add_argument(
"--ctrl_hz", type=float, default=50.0, help="Action dispatch rate."
)
p.add_argument("--ctrl_hz", type=float, default=50.0, help="Action dispatch rate.")
p.add_argument(
"--high_level_hz",
type=float,
@@ -424,9 +421,7 @@ def _build_observation_provider(
ds = LeRobotDataset(dataset_repo_id, episodes=[episode])
if len(ds) == 0:
raise ValueError(
f"Dataset {dataset_repo_id!r} episode {episode} is empty."
)
raise ValueError(f"Dataset {dataset_repo_id!r} episode {episode} is empty.")
# Optional: apply the same torchvision-v2 augmentation pipeline
# that training used, so dry-run sees frames from the augmented
@@ -474,11 +469,7 @@ def _build_observation_provider(
# Keep only observation keys; the runtime's text path will
# merge these with its own lang_tokens / lang_masks.
observation = {
k: v
for k, v in sample.items()
if isinstance(k, str) and k.startswith("observation.")
}
observation = {k: v for k, v in sample.items() if isinstance(k, str) and k.startswith("observation.")}
# Defensive: if something further upstream forgot the batch
# dim, add it now so downstream Tensor ops don't crash.
for k, v in list(observation.items()):
@@ -614,9 +605,7 @@ def _build_robot(
cls = RobotConfig.get_choice_class(robot_type)
except KeyError as exc:
available = sorted(RobotConfig._choice_registry.keys())
raise ValueError(
f"Unknown robot type {robot_type!r}. Available choices: {available}"
) from exc
raise ValueError(f"Unknown robot type {robot_type!r}. Available choices: {available}") from exc
kwargs: dict[str, Any] = {}
if robot_port:
kwargs["port"] = robot_port
@@ -650,23 +639,19 @@ def _build_robot(
cameras: dict[str, Any] = {}
for cam_name, cam_dict in cameras_raw.items():
if not isinstance(cam_dict, dict):
raise ValueError(
f"camera {cam_name!r} value must be a dict, got {cam_dict!r}"
)
raise ValueError(f"camera {cam_name!r} value must be a dict, got {cam_dict!r}")
cam_dict = dict(cam_dict) # don't mutate caller's parsed JSON
cam_type = cam_dict.pop("type", None)
if cam_type is None:
raise ValueError(
f"camera {cam_name!r} is missing a 'type' field "
f"(e.g. 'opencv', 'intelrealsense')"
f"camera {cam_name!r} is missing a 'type' field (e.g. 'opencv', 'intelrealsense')"
)
try:
cam_cls = CameraConfig.get_choice_class(cam_type)
except KeyError as exc:
available = sorted(CameraConfig._choice_registry.keys())
raise ValueError(
f"camera {cam_name!r}: unknown type {cam_type!r}. "
f"Available choices: {available}"
f"camera {cam_name!r}: unknown type {cam_type!r}. Available choices: {available}"
) from exc
cameras[cam_name] = cam_cls(**cam_dict)
kwargs["cameras"] = cameras
@@ -720,9 +705,7 @@ def _build_robot_observation_provider(
)
torch_device = torch.device(device) if isinstance(device, str) else device
robot_type = getattr(robot, "robot_type", None) or getattr(
getattr(robot, "config", None), "type", None
)
robot_type = getattr(robot, "robot_type", None) or getattr(getattr(robot, "config", None), "type", None)
# Pre-compute the camera-key → target (H, W) map from
# ``ds_features``. The training distribution sees frames at the
@@ -793,14 +776,16 @@ def _build_robot_observation_provider(
if not _resize_logged["done"]:
logger.warning(
"camera %s: live=%dx%d, training=%dx%d (resize=%s)",
cam_key, cur_h, cur_w, target_h, target_w,
cam_key,
cur_h,
cur_w,
target_h,
target_w,
"yes" if (cur_h, cur_w) != (target_h, target_w) else "no — already matched",
)
if (cur_h, cur_w) == (target_h, target_w):
continue
raw[cam_key] = _cv2.resize(
img, (target_w, target_h), interpolation=_cv2.INTER_AREA
)
raw[cam_key] = _cv2.resize(img, (target_w, target_h), interpolation=_cv2.INTER_AREA)
_resize_logged["done"] = True
# Also print the state vector once so the operator
# can eyeball it against the dataset's stats. State
@@ -809,14 +794,14 @@ def _build_robot_observation_provider(
# neutral home pose can easily sit a couple σ off
# the supervised support region.
if "observation.state" in (ds_features or {}):
state_names = (
ds_features["observation.state"].get("names") or []
)
state_names = ds_features["observation.state"].get("names") or []
state_vals = [raw.get(n) for n in state_names]
logger.warning(
"robot state at startup: %s",
{n: round(v, 2) if isinstance(v, float) else v
for n, v in zip(state_names, state_vals, strict=False)},
{
n: round(v, 2) if isinstance(v, float) else v
for n, v in zip(state_names, state_vals, strict=False)
},
)
except Exception as exc: # noqa: BLE001
logger.warning("camera resize to dataset shape failed: %s", exc)
@@ -828,15 +813,21 @@ def _build_robot_observation_provider(
# ``observation.state`` tensor. Then tensor-ise +
# device-place + add batch dim.
obs_tensors = build_inference_frame(
raw, torch_device, ds_features=ds_features,
task=task, robot_type=robot_type,
raw,
torch_device,
ds_features=ds_features,
task=task,
robot_type=robot_type,
)
else:
# No dataset features available — fall back to the
# generic numpy-only path; only works when the robot
# already returns dataset-shaped keys.
obs_tensors = prepare_observation_for_inference(
raw, torch_device, task=task, robot_type=robot_type,
raw,
torch_device,
task=task,
robot_type=robot_type,
)
except Exception as exc: # noqa: BLE001
logger.warning("observation prep failed: %s", exc)
@@ -863,9 +854,7 @@ def _build_robot_observation_provider(
_log_obs_tensors_once("robot", obs_tensors, _obs_logged)
observation = {
k: v
for k, v in obs_tensors.items()
if isinstance(k, str) and k.startswith("observation.")
k: v for k, v in obs_tensors.items() if isinstance(k, str) and k.startswith("observation.")
}
for k, v in list(observation.items()):
if isinstance(v, torch.Tensor):
@@ -914,155 +903,116 @@ def _build_robot_action_executor(
return _executor
def _dataset_task_strings(ds_meta: Any) -> list[str]:
    """Return the unique task strings held by a dataset-metadata object.

    ``ds_meta.tasks`` is described as a pandas DataFrame indexed by the
    task string; the entries of its ``.index`` are returned as plain
    strings. This is a best-effort lookup: any shape that does not offer
    an iterable ``.index`` yields an empty list instead of an error.

    Args:
        ds_meta: Dataset metadata object, or ``None`` when no dataset is
            available.

    Returns:
        ``str(...)`` of every entry in ``ds_meta.tasks.index`` in iteration
        order, or ``[]`` when ``ds_meta`` is ``None``, has no ``tasks``,
        or ``tasks`` has no iterable ``index``.
    """
    # ``getattr`` on ``None`` falls through to the default, so a missing
    # dataset and a missing ``tasks`` attribute share one guard.
    tasks = getattr(ds_meta, "tasks", None)
    if tasks is None:
        return []
    try:
        return [str(label) for label in tasks.index]
    except (AttributeError, TypeError):
        # No ``index`` attribute, or ``index`` is not iterable (for example
        # the bound ``list.index`` method when ``tasks`` is a plain list).
        return []
def _select_task_interactively(ds_meta: Any, current_task: str | None) -> str | None:
"""Prompt the operator to pick a task from the dataset or type one.
Called at startup. ``current_task`` is whatever was already resolved
(``--task`` or the dataset's canonical task); it becomes the default
that an empty ``Enter`` selects, and is marked ``(current)`` in the
menu. Non-TTY / scripted runs return ``current_task`` unchanged so
the existing "first stdin line becomes the task" behaviour is kept.
"""
if not sys.stdin.isatty():
return current_task
tasks = _dataset_task_strings(ds_meta)
if not tasks:
prompt = "[smolvla2] Enter the task"
if current_task:
prompt += f" [Enter = {current_task!r}]"
try:
typed = input(prompt + ": ").strip()
except (EOFError, KeyboardInterrupt):
return current_task
return typed or current_task
print("[smolvla2] Select a task:", flush=True)
for i, task in enumerate(tasks, 1):
marker = " (current)" if task == current_task else ""
print(f" [{i}] {task}{marker}", flush=True)
print(" [c] type a custom task", flush=True)
hint = " (Enter = current)" if current_task else ""
try:
raw = input(f"task>{hint} ").strip()
except (EOFError, KeyboardInterrupt):
return current_task
if not raw:
return current_task or tasks[0]
if raw.lower() in {"c", "custom"}:
try:
return input("[smolvla2] Enter the task: ").strip() or current_task
except (EOFError, KeyboardInterrupt):
return current_task
if raw.isdigit():
idx = int(raw) - 1
if 0 <= idx < len(tasks):
return tasks[idx]
print("[smolvla2] invalid choice — keeping the current task", flush=True)
return current_task or tasks[0]
# Treat anything else as a custom task string typed directly.
return raw
def _select_mode_interactively() -> str:
    """Ask the operator which run mode to start in.

    Returns:
        ``"question"`` when the operator answers with one of the question
        aliases; ``"action"`` for every other answer, for an interrupted
        prompt (EOF / Ctrl+C), and whenever stdin is not a TTY.
    """
    question_answers = {"2", "question", "q", "/question", "/q", "vlm", "vqa", "/vlm", "/vqa"}
    if sys.stdin.isatty():
        for menu_line in (
            "[smolvla2] Start in which mode?",
            " [1] action — run the robot autonomously (default)",
            " [2] question — ask the VLM questions (VQA); robot stays paused",
        ):
            print(menu_line, flush=True)
        try:
            answer = input("mode> (Enter = action) ").strip().lower()
        except (EOFError, KeyboardInterrupt):
            # An interrupted prompt counts as "no answer" and takes the default.
            answer = ""
        if answer in question_answers:
            return "question"
    return "action"
def _print_runtime_help() -> None:
"""Print the slash-command reference."""
print(
"[smolvla2] commands:\n"
" /action run the robot (default mode)\n"
" /action <seconds> run the robot for N seconds, then auto-pause to question\n"
" /question pause the action loop; typed lines become VQA questions\n"
' /action "task" run the robot; an argument switches to that task\n'
" /action resume the robot on the current task\n"
" /action <seconds> run the robot for N seconds, then auto-pause\n"
" /pause pause the action loop — robot holds position\n"
' /question "..." pause and answer one VQA question\n'
" /help show this help\n"
" task: <text> switch task (clears plan / memory / subtask)\n"
" rephrase: <text> reword the task in place\n"
" stop | quit | exit end the session",
flush=True,
)
def _handle_slash_command(runtime: Any, line: str) -> bool:
"""Handle ``/action`` / ``/question`` / ``/help``.
``/action`` accepts an optional duration — ``/action 10`` runs the
robot for 10 seconds, then the autonomous loop auto-reverts to
``question`` mode. ``/vlm`` and ``/vqa`` are aliases for
``/question``. Returns ``True`` when ``line`` was a recognised
command (and was consumed), ``False`` otherwise.
"""
parts = line.strip().split()
if not parts:
def _is_number(text: str) -> bool:
    """Return True if ``text`` is a finite number (a ``/action`` duration arg).

    ``float`` also parses ``"nan"``, ``"inf"`` and ``"infinity"``. Those are
    rejected: added to a monotonic clock reading they give a deadline that
    a ``now >= deadline`` check can never satisfy, so a timed burst would
    never end.

    Args:
        text: The argument text to classify.

    Returns:
        ``True`` when ``float(text)`` succeeds and the value is finite,
        ``False`` otherwise.
    """
    import math  # noqa: PLC0415

    try:
        return math.isfinite(float(text))
    except (TypeError, ValueError):
        return False
cmd = parts[0].lower()
if cmd in {"/action", "/act"}:
def _strip_quotes(text: str) -> str:
    """Remove one enclosing pair of matching quotes from a command argument.

    Surrounding whitespace is trimmed first. If what remains starts and
    ends with the same quote character (``"`` or ``'``), that outer pair is
    dropped and the inside is trimmed again; otherwise the trimmed text is
    returned as is.
    """
    trimmed = text.strip()
    if len(trimmed) < 2:
        return trimmed
    opener, closer = trimmed[0], trimmed[-1]
    if opener in ('"', "'") and opener == closer:
        return trimmed[1:-1].strip()
    return trimmed
def _clear_action_queue(runtime: Any) -> None:
    """Empty the queued action chunk so nothing fires while paused.

    Reads ``runtime.state["action_queue"]`` and calls its ``clear()`` when
    the stored object offers one; a missing queue or an object without
    ``clear`` is left untouched.
    """
    pending = runtime.state.get("action_queue")
    if not hasattr(pending, "clear"):
        return
    pending.clear()
def _handle_slash_command(runtime: Any, line: str) -> bool:
"""Dispatch the runtime slash commands.
``/action ["task"]`` run the robot; a quoted/bare argument sets a
new task, a bare number is a timed burst
(seconds), no argument resumes the current
task.
``/pause`` pause the action loop — the robot holds.
``/question "text"`` pause and answer one VQA question.
``/help`` print the command reference.
Returns ``True`` when ``line`` was a recognised command (consumed).
"""
stripped = line.strip()
if not stripped.startswith("/"):
return False
head, _, rest = stripped.partition(" ")
cmd = head.lower()
rest = _strip_quotes(rest)
if cmd in {"/action", "/act", "/run"}:
runtime.state["mode"] = "action"
seconds: float | None = None
if len(parts) > 1:
try:
seconds = float(parts[1])
except ValueError:
seconds = None
if seconds is not None and seconds > 0:
if rest and _is_number(rest):
import time as _time # noqa: PLC0415
runtime.state["action_deadline"] = _time.monotonic() + seconds
secs = float(rest)
runtime.state["action_deadline"] = _time.monotonic() + secs
print(
f"[smolvla2] mode: action — running for {seconds:g}s, "
"then back to question",
f"[smolvla2] action — running {secs:g}s, then auto-pause",
flush=True,
)
else:
runtime.state["action_deadline"] = None
print("[smolvla2] mode: action — robot running", flush=True)
if rest:
runtime.set_task(rest)
# New task → drop the stale subtask so the high-level
# loop regenerates one for the new goal.
runtime.state["current_subtask"] = None
print(f"[smolvla2] action — task: {rest!r}", flush=True)
elif runtime.state.get("task"):
print(
f"[smolvla2] action — resuming: {runtime.state['task']!r}",
flush=True,
)
else:
runtime.state["mode"] = "paused"
print(
'[smolvla2] no task set — use /action "your task"',
flush=True,
)
return True
if cmd in {"/question", "/q", "/vlm", "/vqa"}:
runtime.state["mode"] = "question"
if cmd in {"/pause", "/p"}:
runtime.state["mode"] = "paused"
runtime.state["action_deadline"] = None
# Drop any queued chunk so no stale action fires while paused.
queue = runtime.state.get("action_queue")
if hasattr(queue, "clear"):
queue.clear()
print(
"[smolvla2] mode: question — action loop paused; type VQA questions",
flush=True,
)
_clear_action_queue(runtime)
print("[smolvla2] paused — robot holding position", flush=True)
return True
if cmd in {"/question", "/q", "/ask", "/vqa", "/vlm"}:
# A question always pauses the action loop first so the policy
# is not used concurrently by the background runtime thread.
runtime.state["mode"] = "paused"
runtime.state["action_deadline"] = None
_clear_action_queue(runtime)
if not rest:
print('[smolvla2] usage: /question "your question"', flush=True)
return True
_run_vqa_query(runtime, rest)
return True
if cmd in {"/help", "/?"}:
_print_runtime_help()
return True
@@ -1072,8 +1022,8 @@ def _handle_slash_command(runtime: Any, line: str) -> bool:
def _run_vqa_query(runtime: Any, question: str) -> None:
"""Run one interactive VQA question against the runtime's policy.
Used by both loops when in ``/question`` mode — the action loop is
paused so the policy is free for a synchronous VQA call.
Invoked by ``/question`` — the action loop is paused first so the
policy is free for a synchronous VQA call.
"""
from lerobot.policies.smolvla2.inference.vqa import handle_vqa_query # noqa: PLC0415
@@ -1105,11 +1055,14 @@ def _run_autonomous(
import threading # noqa: PLC0415
import time # noqa: PLC0415
if not auto_start:
# Only gate on ENTER when the robot will actually move at startup
# (``--mode=action``). The default is paused — the command line
# comes up immediately and nothing moves until ``/action``.
if not auto_start and runtime.state.get("mode", "paused") == "action":
try:
input(
"[smolvla2] Robot connected. Press ENTER to start the autonomous "
"control loop, Ctrl+C to abort. "
"[smolvla2] Robot connected — starting in ACTION mode. "
"Press ENTER to begin, Ctrl+C to abort. "
)
except (EOFError, KeyboardInterrupt):
print("\n[smolvla2] aborted before start", flush=True)
@@ -1145,14 +1098,11 @@ def _run_autonomous(
runtime._flush_logs = _flush_into_scrollback # type: ignore[method-assign]
redraw = _make_state_panel_renderer(
runtime, mode_label="autonomous", scrollback=_scrollback
)
redraw = _make_state_panel_renderer(runtime, mode_label="autonomous", scrollback=_scrollback)
redraw()
print(
" [autonomous] type interjections / '?' questions on stdin; "
"/question for VQA mode, /action to resume, /help for commands, "
"'stop' or Ctrl+C to quit",
' [autonomous] /action "task" to run · /pause to stop · '
'/question "..." to ask · /help · stop',
flush=True,
)
@@ -1176,13 +1126,13 @@ def _run_autonomous(
# queue so the robot stops.
deadline = st.get("action_deadline")
if deadline is not None and time.monotonic() >= deadline:
st["mode"] = "question"
st["mode"] = "paused"
st["action_deadline"] = None
queue = st.get("action_queue")
if hasattr(queue, "clear"):
queue.clear()
print(
"\n[smolvla2] timed action elapsed — back to question mode",
"\n[smolvla2] timed action elapsed — paused",
flush=True,
)
else:
@@ -1195,9 +1145,7 @@ def _run_autonomous(
pass
_panel_stop.wait(0.7)
panel_thread = threading.Thread(
target=_panel_loop, name="smolvla2-panel-redraw", daemon=True
)
panel_thread = threading.Thread(target=_panel_loop, name="smolvla2-panel-redraw", daemon=True)
panel_thread.start()
try:
@@ -1211,72 +1159,27 @@ def _run_autonomous(
lower = line.lower()
if lower in {"stop", "quit", "exit"}:
break
# Slash commands (/action, /question, /help) flip the run mode.
# The runtime is command-driven: /action "task", /pause,
# /question "...", /help. ``_handle_slash_command`` runs the
# VQA query inline for /question (the action loop is paused
# first, so the policy isn't in concurrent use).
if _handle_slash_command(runtime, line):
# Redraw once so the panel reflects the new mode. In
# ``/question`` the timer redraw is now suspended, so
# this is the last clear — the VQA prompt stays stable.
try:
redraw()
except Exception: # noqa: BLE001
pass
if runtime.state.get("mode") == "question":
print(
" [question] type a VQA question and press Enter; "
"/action to resume the robot.",
flush=True,
)
continue
# ``task: <text>`` always overrides the active task — both
# at first set and to switch tasks mid-run. Without the
# prefix and with a task already set, an utterance becomes
# either a VQA query (ends in ``?``) or an interjection
# (the user_interjection_response recipe — generates a
# fresh plan + ``<say>`` paired with the new instruction).
# Typing a rephrasing of the current task as an
# interjection is the trained way to redirect without
# resetting the high-level plan from scratch.
# ``task: <text>`` — full task switch, clears plan/memory/subtask
# ``rephrase: <text>`` — swap the task string in place,
# keep plan/memory/subtask. Tests
# prompt robustness from the
# n_task_rephrasings training
# augmentation: the model should
# behave the same on equivalent
# phrasings of the same task.
# bare line ending in ``?`` — VQA
# bare line — interjection
if lower.startswith("task:"):
new_task = line[5:].strip()
if new_task:
runtime.set_task(new_task)
runtime.state["current_plan"] = None
runtime.state["current_memory"] = None
runtime.state["current_subtask"] = None
continue
if lower.startswith("rephrase:"):
rephrased = line[len("rephrase:"):].strip()
if rephrased:
runtime.state["task"] = rephrased
runtime.state.setdefault("log_lines", []).append(
f"Task rephrased: {rephrased} (plan/memory preserved)"
)
continue
if not runtime.state.get("task"):
runtime.set_task(line)
continue
# ``/question`` mode: the whole line is a VQA question,
# handled synchronously (the action loop is paused so the
# policy is not in concurrent use by the background thread).
if runtime.state.get("mode", "action") == "question":
_run_vqa_query(runtime, line)
continue
if lower.endswith("?"):
runtime.state["recent_vqa_query"] = line
runtime.state.setdefault("events_this_tick", []).append("user_vqa_query")
else:
# A bare (non-slash) line is treated as a user interjection
# — the trained ``user_interjection_response`` path. ``stop``
# already handled above; everything else routes here.
if runtime.state.get("task"):
runtime.state["recent_interjection"] = line
runtime.state.setdefault("events_this_tick", []).append("user_interjection")
else:
print(
'[smolvla2] no task yet — use /action "your task" to start',
flush=True,
)
except KeyboardInterrupt:
print("\n[smolvla2] interrupt — stopping", flush=True)
finally:
@@ -1317,26 +1220,21 @@ def _make_state_panel_renderer(
console.clear()
st = runtime.state
run_mode = st.get("mode", "action")
mode_tag = (
"[green]mode: action[/]"
if run_mode == "action"
else "[yellow]mode: question (action loop paused)[/]"
)
console.rule(
f"[bold]SmolVLA2[/] · {mode_label} · {mode_tag}", style="cyan"
)
mode_tag = "[green]mode: action[/]" if run_mode == "action" else "[yellow]mode: paused[/]"
console.rule(f"[bold]SmolVLA2[/] · {mode_label} · {mode_tag}", style="cyan")
# Always-visible command hint so the operator never has to
# remember the slash commands (the one-shot startup line scrolls
# away under the timer redraw).
# remember the slash commands.
if run_mode == "action":
console.print(
" [dim]commands:[/] [bold]/question[/] ask a VQA question · "
"[bold]/help[/] all commands · [bold]stop[/] quit"
" [dim]commands:[/] [bold]/pause[/] stop · "
'[bold]/question "..."[/bold] ask · [bold]/help[/] · '
"[bold]stop[/]"
)
else:
console.print(
" [yellow]VQA mode[/] — type a question + Enter; "
"[bold]/action[/] resumes the robot."
' [dim]commands:[/] [bold]/action "task"[/bold] run · '
'[bold]/question "..."[/bold] ask · [bold]/help[/] · '
"[bold]stop[/]"
)
for key, label in (
("task", "task"),
@@ -1351,8 +1249,7 @@ def _make_state_panel_renderer(
console.print(f" [dim]{label:<8} (not set)[/]")
queue_len = (
len(st["action_queue"])
if isinstance(st.get("action_queue"), (list, tuple))
or hasattr(st.get("action_queue"), "__len__")
if isinstance(st.get("action_queue"), (list, tuple)) or hasattr(st.get("action_queue"), "__len__")
else 0
)
pending = len(st.get("tool_calls_pending") or [])
@@ -1381,11 +1278,7 @@ def _make_state_panel_renderer(
sub_empty = int(st.get("subtask_empty_count") or 0)
if raw_subtask is not None or sub_rep or sub_gib or sub_empty:
raw_display = (raw_subtask or "(empty)")[:80]
color = (
"yellow"
if (sub_rep >= 3 or sub_gib >= 3 or sub_empty >= 3)
else "dim"
)
color = "yellow" if (sub_rep >= 3 or sub_gib >= 3 or sub_empty >= 3) else "dim"
console.print(
f" [{color}]subtask diag repeat:{sub_rep} "
f"gibberish:{sub_gib} empty:{sub_empty} "
@@ -1396,9 +1289,7 @@ def _make_state_panel_renderer(
mem_gib = int(st.get("memory_gibberish_count") or 0)
plan_gib = int(st.get("plan_gibberish_count") or 0)
if mem_gib or plan_gib:
console.print(
f" [dim]gen rejects memory:{mem_gib} plan:{plan_gib}[/]"
)
console.print(f" [dim]gen rejects memory:{mem_gib} plan:{plan_gib}[/]")
console.rule(style="cyan")
# Runtime scrollback — log lines pushed from generation steps
# (warnings, gibberish rejections, plan/say speech, vqa
@@ -1413,9 +1304,9 @@ def _make_state_panel_renderer(
console.print()
if not st.get("task"):
console.print(
" [dim]Type the task to begin. /question switches to VQA mode, "
"/action resumes the robot, /help lists commands. "
"Type 'stop' to exit.[/]"
' [dim]Type [bold]/action "your task"[/bold] to begin. '
'[bold]/question "..."[/bold] to ask, /help for commands, '
"stop to exit.[/]"
)
return _redraw
@@ -1497,16 +1388,10 @@ def main(argv: list[str] | None = None) -> int:
args.policy_path, args.dataset_repo_id
)
# Bootstrap canonical task / plan / memory / subtask from the
# dataset whenever one is provided — both REPL dry-run and
# autonomous robot mode benefit, since the model is memorised on
# the exact training prompts and matching wording is what gets
# recall to fire.
# Was a task given explicitly on the CLI? Captured before the
# dataset bootstrap fills ``args.task`` — an explicit ``--task``
# skips the startup task picker entirely.
cli_task_given = args.task is not None
# Bootstrap the canonical task from the dataset whenever one is
# provided, so ``/action`` (no argument) has a sensible task to
# resume. The model is memorised on the exact training wording, so
# matching it is what gets recall to fire.
bootstrap_state: dict[str, str] = {}
if args.dataset_repo_id is not None:
bootstrap_state = _bootstrap_state_from_dataset(
@@ -1517,22 +1402,15 @@ def main(argv: list[str] | None = None) -> int:
if bootstrap_state.get("task") and not args.task:
args.task = bootstrap_state["task"]
print(
f"[smolvla2] using canonical task from dataset: {args.task!r}",
f"[smolvla2] canonical task from dataset: {args.task!r}",
flush=True,
)
# Startup mode prompt — choose action (run the robot) vs question
# (VQA only) *before* the task picker. Skipped when ``--mode`` was
# passed on the CLI. Can still be flipped at runtime with
# /action /question.
startup_mode = args.mode or _select_mode_interactively()
# Startup task picker — list the dataset's tasks so the operator can
# pick one or type a custom task. Skipped when ``--task`` was passed
# explicitly on the CLI. Non-TTY runs keep the "first stdin line is
# the task" path.
if not cli_task_given:
args.task = _select_task_interactively(ds_meta, args.task)
# No startup prompts — the runtime is command-driven. It comes up at
# the command line in ``paused`` mode (robot idle) unless ``--mode``
# forces a mode. The operator drives it with /action, /pause and
# /question.
startup_mode = args.mode or "paused"
observation_provider: Callable[[], dict | None] | None = None
robot_executor: Callable[[Any], None] | None = None
@@ -1540,8 +1418,7 @@ def main(argv: list[str] | None = None) -> int:
if autonomous_mode:
print(
f"[smolvla2] connecting to robot.type={args.robot_type} "
f"port={args.robot_port}",
f"[smolvla2] connecting to robot.type={args.robot_type} port={args.robot_port}",
flush=True,
)
robot = _build_robot(
@@ -1662,8 +1539,7 @@ def _run_repl(runtime: Any, *, initial_task: str | None, max_ticks: int | None)
from rich.console import Console # noqa: PLC0415
except ImportError:
print(
"[smolvla2] rich is required for the interactive REPL. "
"`pip install rich` and re-run.",
"[smolvla2] rich is required for the interactive REPL. `pip install rich` and re-run.",
file=sys.stderr,
)
return 2
@@ -1692,21 +1568,10 @@ def _run_repl(runtime: Any, *, initial_task: str | None, max_ticks: int | None)
if lower in {"stop", "quit", "exit"}:
break
# Slash commands (/action, /question, /help) flip the run mode.
# Command-driven: /action "task", /pause, /question "...",
# /help. ``_handle_slash_command`` runs the VQA query inline
# for /question (single-threaded REPL — no concurrency).
if _handle_slash_command(runtime, line):
_redraw(last_logs)
continue
# ``/question`` mode: a typed line (that isn't a task
# command) is a VQA question — run it synchronously and skip
# the action pipeline tick entirely.
if (
runtime.state.get("task")
and runtime.state.get("mode", "action") == "question"
and not lower.startswith(("task:", "rephrase:"))
):
runtime.state["log_lines"] = []
_run_vqa_query(runtime, line)
last_logs = list(runtime.state.get("log_lines") or [])
_redraw(last_logs)
ticks_done += 1
@@ -1714,34 +1579,17 @@ def _run_repl(runtime: Any, *, initial_task: str | None, max_ticks: int | None)
break
continue
# Inject the user input as the right kind of event,
# then run a single pipeline tick to consume it.
if lower.startswith("task:"):
new_task = line[5:].strip()
if new_task:
runtime.set_task(new_task)
runtime.state["current_plan"] = None
runtime.state["current_memory"] = None
runtime.state["current_subtask"] = None
elif lower.startswith("rephrase:"):
rephrased = line[len("rephrase:"):].strip()
if rephrased:
runtime.state["task"] = rephrased
runtime.state.setdefault("log_lines", []).append(
f"Task rephrased: {rephrased} (plan/memory preserved)"
)
elif not runtime.state.get("task"):
runtime.set_task(line)
elif lower.endswith("?"):
runtime.state["recent_vqa_query"] = line
runtime.state.setdefault("events_this_tick", []).append(
"user_vqa_query"
)
else:
runtime.state["recent_interjection"] = line
runtime.state.setdefault("events_this_tick", []).append(
"user_interjection"
# A bare (non-slash) line is a user interjection — needs a
# task to be meaningful.
if not runtime.state.get("task"):
print(
'[smolvla2] no task yet — use /action "your task"',
flush=True,
)
_redraw(last_logs)
continue
runtime.state["recent_interjection"] = line
runtime.state.setdefault("events_this_tick", []).append("user_interjection")
last_logs = runtime.step_once() or []
_redraw(last_logs)