feat(smolvla2): --mode flag, skip task picker with --task, timed /action

Lets the operator skip the interactive startup entirely and go straight
to the command line:

- New --mode {action,question} arg; when given, the startup mode prompt
  is skipped.
- When --task is passed explicitly on the CLI, the startup task picker
  is skipped (the dataset-bootstrap task still shows the picker so you
  can override it).

Also adds a timed action burst: /action <seconds> runs the robot for N
seconds, then the autonomous loop auto-reverts to question mode and
clears the action queue. Plain /action stays unlimited.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pepijn
2026-05-18 14:26:12 +02:00
parent 7a68bf13d9
commit 516ffc7687
+85 -22
View File
@@ -139,7 +139,23 @@ def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
dest="task",
type=str,
default=None,
help="Initial task. If omitted, the first stdin line is treated as the task.",
help=(
"Initial task. When given, the startup task picker is skipped "
"and this task is used directly. If omitted, the picker is "
"shown (or the first stdin line is treated as the task)."
),
)
p.add_argument(
"--mode",
dest="mode",
type=str,
choices=["action", "question"],
default=None,
help=(
"Start-up run mode. 'action' runs the robot; 'question' starts "
"paused for VQA. When given, the startup mode prompt is "
"skipped. Can still be flipped at runtime with /action /question."
),
)
p.add_argument(
"--no_robot",
@@ -991,6 +1007,7 @@ def _print_runtime_help() -> None:
print(
"[smolvla2] commands:\n"
" /action run the robot (default mode)\n"
" /action <seconds> run the robot for N seconds, then auto-pause to question\n"
" /question pause the action loop; typed lines become VQA questions\n"
" /help show this help\n"
" task: <text> switch task (clears plan / memory / subtask)\n"
@@ -1003,17 +1020,40 @@ def _print_runtime_help() -> None:
def _handle_slash_command(runtime: Any, line: str) -> bool:
"""Handle ``/action`` / ``/question`` / ``/help``.
``/vlm`` and ``/vqa`` are kept as aliases for ``/question``. Returns
``True`` when ``line`` was a recognised command (and was consumed),
``False`` otherwise.
``/action`` accepts an optional duration — ``/action 10`` runs the
robot for 10 seconds, then the autonomous loop auto-reverts to
``question`` mode. ``/vlm`` and ``/vqa`` are aliases for
``/question``. Returns ``True`` when ``line`` was a recognised
command (and was consumed), ``False`` otherwise.
"""
cmd = line.strip().lower()
parts = line.strip().split()
if not parts:
return False
cmd = parts[0].lower()
if cmd in {"/action", "/act"}:
runtime.state["mode"] = "action"
print("[smolvla2] mode: action — robot running", flush=True)
seconds: float | None = None
if len(parts) > 1:
try:
seconds = float(parts[1])
except ValueError:
seconds = None
if seconds is not None and seconds > 0:
import time as _time # noqa: PLC0415
runtime.state["action_deadline"] = _time.monotonic() + seconds
print(
f"[smolvla2] mode: action — running for {seconds:g}s, "
"then back to question",
flush=True,
)
else:
runtime.state["action_deadline"] = None
print("[smolvla2] mode: action — robot running", flush=True)
return True
if cmd in {"/question", "/q", "/vlm", "/vqa"}:
runtime.state["mode"] = "question"
runtime.state["action_deadline"] = None
# Drop any queued chunk so no stale action fires while paused.
queue = runtime.state.get("action_queue")
if hasattr(queue, "clear"):
@@ -1129,14 +1169,30 @@ def _run_autonomous(
def _panel_loop() -> None:
while not _panel_stop.is_set():
if runtime.state.get("mode", "action") == "action":
try:
redraw()
# Re-print the prompt the redraw just cleared so the
# operator always has a visible ``> `` to type at.
print("> ", end="", flush=True)
except Exception: # noqa: BLE001
pass
st = runtime.state
if st.get("mode", "action") == "action":
# Timed burst (``/action <seconds>``): once the deadline
# passes, auto-revert to question mode and clear the
# queue so the robot stops.
deadline = st.get("action_deadline")
if deadline is not None and time.monotonic() >= deadline:
st["mode"] = "question"
st["action_deadline"] = None
queue = st.get("action_queue")
if hasattr(queue, "clear"):
queue.clear()
print(
"\n[smolvla2] timed action elapsed — back to question mode",
flush=True,
)
else:
try:
redraw()
# Re-print the prompt the redraw just cleared so
# the operator always has a visible ``> ``.
print("> ", end="", flush=True)
except Exception: # noqa: BLE001
pass
_panel_stop.wait(0.7)
panel_thread = threading.Thread(
@@ -1446,6 +1502,11 @@ def main(argv: list[str] | None = None) -> int:
# autonomous robot mode benefit, since the model is memorised on
# the exact training prompts and matching wording is what gets
# recall to fire.
# Was a task given explicitly on the CLI? Captured before the
# dataset bootstrap fills ``args.task`` — an explicit ``--task``
# skips the startup task picker entirely.
cli_task_given = args.task is not None
bootstrap_state: dict[str, str] = {}
if args.dataset_repo_id is not None:
bootstrap_state = _bootstrap_state_from_dataset(
@@ -1461,15 +1522,17 @@ def main(argv: list[str] | None = None) -> int:
)
# Startup mode prompt — choose action (run the robot) vs question
# (VQA only) *before* the task picker, so the operator sets intent
# up front. It can still be flipped any time with /action /question.
startup_mode = _select_mode_interactively()
# (VQA only) *before* the task picker. Skipped when ``--mode`` was
# passed on the CLI. Can still be flipped at runtime with
# /action /question.
startup_mode = args.mode or _select_mode_interactively()
# Always offer the startup task picker on an interactive terminal:
# list the dataset's tasks (the canonical / --task one shown as the
# default) so the operator can pick another or type a custom task.
# Non-TTY runs keep the "first stdin line is the task" path.
args.task = _select_task_interactively(ds_meta, args.task)
# Startup task picker — list the dataset's tasks so the operator can
# pick one or type a custom task. Skipped when ``--task`` was passed
# explicitly on the CLI. Non-TTY runs keep the "first stdin line is
# the task" path.
if not cli_task_given:
args.task = _select_task_interactively(ds_meta, args.task)
observation_provider: Callable[[], dict | None] | None = None
robot_executor: Callable[[Any], None] | None = None