From 33a4b4a5a0134b36298491c23639393821ea2955 Mon Sep 17 00:00:00 2001 From: Pepijn Date: Wed, 6 May 2026 18:30:56 +0200 Subject: [PATCH] feat(smolvla2): autonomous robot mode in lerobot-smolvla2-runtime MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The runtime CLI was deliberately scoped to dry-run only: it hard-coded ``robot_executor=None`` and printed a "real-robot integration is a follow-up" warning even when ``--no_robot`` was omitted. The runtime *engine* was already structured for real-robot operation (separate ``LowLevelForward`` chunk-rate generation + ``DispatchAction`` ctrl-rate dispatch with a ``robot_executor`` hook); only the wiring was missing. Add the wiring: * ``_load_policy_and_preprocessor`` now also returns the postprocessor (action denormaliser). * ``--robot.type`` / ``--robot.port`` / ``--robot.id`` / ``--robot.cameras`` (JSON) build a ``Robot`` via ``make_robot_from_config`` and connect it. * ``_build_robot_observation_provider`` reads ``robot.get_observation()`` each call, drops the language columns (runtime drives messages itself), and runs the policy's preprocessor (rename → batch → device → normalise). * ``_build_robot_action_executor`` postprocesses the policy's action tensor (denormalise), converts to the ``{joint: value}`` dict via ``make_robot_action(action, ds_meta.features)``, and calls ``robot.send_action(...)``. Optional ``--max_action_norm`` safety clip rejects ticks whose action L2 norm exceeds the threshold (kill-switch when bringing up a new robot). * ``_run_autonomous`` runs ``runtime.run()`` in a background thread (the policy must keep generating chunks at chunk_hz and dispatching at ctrl_hz regardless of stdin) and handles user interjections / VQA queries from the foreground stdin loop. Confirmation prompt before start (skip with ``--auto_start``); Ctrl+C stops the thread and disconnects the robot cleanly. * Autonomous mode requires ``--dataset.repo_id`` for action stats / feature shapes — pass the same dataset the policy was trained on. The bootstrap path that pulls canonical task / plan / memory runs in both REPL and autonomous modes so the model's first prompt matches training distribution. Dry-run REPL behaviour is unchanged when ``--robot.type`` is not passed. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../scripts/lerobot_smolvla2_runtime.py | 446 +++++++++++++++--- 1 file changed, 377 insertions(+), 69 deletions(-) diff --git a/src/lerobot/scripts/lerobot_smolvla2_runtime.py b/src/lerobot/scripts/lerobot_smolvla2_runtime.py index 45e0cf23d..fc3db0e78 100644 --- a/src/lerobot/scripts/lerobot_smolvla2_runtime.py +++ b/src/lerobot/scripts/lerobot_smolvla2_runtime.py @@ -131,6 +131,75 @@ def _parse_args(argv: list[str] | None = None) -> argparse.Namespace: action="store_true", help="Skip robot connection — language-only / dry-run mode.", ) + # --- Real-robot mode args ---------------------------------------- + # Setting ``--robot.type`` flips the runtime into autonomous mode: + # it connects to the robot, builds an observation provider that + # reads ``robot.get_observation()`` instead of dataset frames, and + # an action executor that postprocesses (denormalises) the policy's + # output and calls ``robot.send_action(...)`` at ``--ctrl_hz``. The + # high-level REPL-style stdin still works in a background thread + # for interjections / VQA. + p.add_argument( + "--robot.type", + dest="robot_type", + type=str, + default=None, + help=( + "Robot config choice (e.g. ``so101``, ``so101_follower``). " + "When set, the runtime drives the actual robot at " + "``--ctrl_hz`` instead of running the dataset-driven dry-run " + "REPL. Implies ``--autonomous`` unless ``--no_robot`` is also " + "passed (in which case the flag is ignored). See " + "``lerobot.robots`` for available choices." + ), + ) + p.add_argument( + "--robot.port", + dest="robot_port", + type=str, + default=None, + help="Serial port for the robot (e.g. ``/dev/tty.usbmodem...``).", + ) + p.add_argument( + "--robot.id", + dest="robot_id", + type=str, + default=None, + help="Optional robot identifier (passed through to ``RobotConfig.id``).", + ) + p.add_argument( + "--robot.cameras", + dest="robot_cameras", + type=str, + default=None, + help=( + "Optional JSON dict describing camera configs to attach to " + "the robot (e.g. ``'{\"top\": {\"type\": \"opencv\", \"index\": 0}}'``). " + "Camera keys MUST match the ``observation.images.*`` features " + "the policy was trained on." + ), + ) + p.add_argument( + "--max_action_norm", + dest="max_action_norm", + type=float, + default=None, + help=( + "Safety clip: reject any individual action whose L2 norm " + "exceeds this value. Default ``None`` = no clipping. Useful " + "as a kill-switch when bringing up a new robot/task pair." + ), + ) + p.add_argument( + "--auto_start", + action="store_true", + help=( + "Skip the ``Press ENTER to start`` confirmation prompt before " + "the autonomous control loop begins. Off by default — having " + "to confirm catches a lot of stupid mistakes (wrong policy, " + "wrong robot, robot not at home pose)." + ), + ) p.add_argument( "--no_tts", action="store_true", @@ -168,21 +237,13 @@ def _parse_args(argv: list[str] | None = None) -> argparse.Namespace: def _load_policy_and_preprocessor( policy_path: str, dataset_repo_id: str | None, -) -> tuple[Any, Any, Any]: +) -> tuple[Any, Any, Any, Any]: """Load a SmolVLA2 checkpoint (local path or Hub repo id). - When ``dataset_repo_id`` is provided, the dataset's metadata is used - to derive policy features (matching the standard - ``make_policy(cfg, ds_meta=...)`` flow used by ``lerobot-train`` and - ``lerobot-record``). When it isn't, we fall back to instantiating - the policy directly via ``from_pretrained`` — this skips the - feature-derivation path that ``make_policy`` insists on, but also - means we can't load the saved preprocessor pipeline (which depends - on ``input_features`` / ``output_features``). For inference-only - dry-runs this is fine; the policy still loads. - - Returns ``(policy, preprocessor, ds_meta)`` where ``preprocessor`` - and ``ds_meta`` may be ``None`` if no dataset was provided. + Returns ``(policy, preprocessor, postprocessor, ds_meta)``. + ``preprocessor`` / ``postprocessor`` / ``ds_meta`` are ``None`` + when no dataset is provided (rare — needed for autonomous robot + mode to have action-denormalisation stats). """ from lerobot.configs import PreTrainedConfig # noqa: PLC0415 from lerobot.policies.factory import make_policy, make_pre_post_processors # noqa: PLC0415 @@ -192,34 +253,22 @@ def _load_policy_and_preprocessor( ds_meta = None preprocessor = None + postprocessor = None if dataset_repo_id is not None: from lerobot.datasets.lerobot_dataset import LeRobotDatasetMetadata # noqa: PLC0415 ds_meta = LeRobotDatasetMetadata(dataset_repo_id) policy = make_policy(cfg, ds_meta=ds_meta) - # NOTE: we deliberately pass ``pretrained_path=None`` here even - # though the checkpoint ships a ``policy_preprocessor.json``. - # ``RenderMessagesStep`` carries a ``TrainingRecipe`` field that - # isn't faithfully serialized into that JSON, so the saved - # pipeline can't currently be round-tripped via - # ``PolicyProcessorPipeline.from_pretrained`` — it crashes with - # ``RenderMessagesStep.__init__() missing 1 required argument: - # 'recipe'``. Building fresh from ``cfg`` re-runs - # ``make_smolvla2_pre_post_processors``, which loads the recipe - # YAML referenced by ``cfg.recipe_path`` and wires it back into - # ``RenderMessagesStep`` correctly. Normalization stats come - # from ``ds_meta.stats`` (the same dataset the user is feeding - # into the runtime), so no quality loss in practice. - preprocessor, _ = make_pre_post_processors( + # ``pretrained_path=None`` rebuilds fresh — the saved + # ``policy_preprocessor.json`` doesn't round-trip + # ``RenderMessagesStep.recipe``. Stats come from the dataset + # the user is feeding through, so normalisation is consistent. + preprocessor, postprocessor = make_pre_post_processors( cfg, pretrained_path=None, dataset_stats=ds_meta.stats, ) else: - # No dataset: instantiate the policy class directly so we don't - # need ds_meta. This bypasses ``make_policy``'s feature-shape - # derivation, which is fine for a pretrained checkpoint where - # the saved config already carries those shapes. from lerobot.policies.factory import get_policy_class # noqa: PLC0415 policy_cls = get_policy_class(cfg.type) @@ -227,7 +276,7 @@ def _load_policy_and_preprocessor( policy.to(cfg.device) policy.eval() - return policy, preprocessor, ds_meta + return policy, preprocessor, postprocessor, ds_meta def _build_observation_provider( @@ -372,6 +421,230 @@ def _bootstrap_state_from_dataset( return out +def _build_robot( + *, + robot_type: str, + robot_port: str | None, + robot_id: str | None, + robot_cameras_json: str | None, +): + """Build and connect a robot from CLI args. + + Mirrors how ``lerobot-record`` builds a robot but takes the args + flat from argparse instead of through draccus, so the runtime + keeps its plain ``--key=value`` CLI surface. + """ + import json # noqa: PLC0415 + + from lerobot.robots import ( # noqa: PLC0415 + RobotConfig, + make_robot_from_config, + ) + + cls = RobotConfig.get_choice_class(robot_type) + kwargs: dict[str, Any] = {} + if robot_port: + kwargs["port"] = robot_port + if robot_id: + kwargs["id"] = robot_id + if robot_cameras_json: + try: + kwargs["cameras"] = json.loads(robot_cameras_json) + except json.JSONDecodeError as exc: + raise ValueError( + f"--robot.cameras must be a JSON object, got {robot_cameras_json!r}: {exc}" + ) from exc + cfg = cls(**kwargs) + robot = make_robot_from_config(cfg) + robot.connect() + return robot + + +def _build_robot_observation_provider( + *, + robot, + preprocessor: Any, + device: str, + task: str | None, +) -> Callable[[], dict | None]: + """Closure that reads from the robot, runs the policy preprocessor. + + Each call: ``robot.get_observation()`` → wrap as a flat sample dict + → drop language columns (the runtime drives messages itself) → + preprocessor (rename, batch dim, normalise, device-place) → return + the observation batch ready for ``policy.select_action`` and + ``policy.select_message``. + """ + import torch # noqa: PLC0415 + + def _provider() -> dict | None: + try: + raw = robot.get_observation() + except Exception as exc: # noqa: BLE001 + logger.warning("robot.get_observation failed: %s", exc) + return None + + sample: dict[str, Any] = dict(raw) + if task: + sample.setdefault("task", task) + # The render step expects either both language columns or + # neither — runtime supplies messages itself, so make sure + # nothing leaks through. + for k in ("language_persistent", "language_events"): + sample.pop(k, None) + + if preprocessor is not None: + try: + sample = preprocessor(sample) + except Exception as exc: # noqa: BLE001 + logger.warning("preprocessor failed on robot observation: %s", exc) + return None + + observation = { + k: v + for k, v in sample.items() + if isinstance(k, str) and k.startswith("observation.") + } + for k, v in list(observation.items()): + if isinstance(v, torch.Tensor): + observation[k] = v.to(device) + return observation + + return _provider + + +def _build_robot_action_executor( + *, + robot, + postprocessor: Any, + ds_meta: Any, + max_action_norm: float | None, +) -> Callable[[Any], None]: + """Closure that postprocesses an action and dispatches to the robot. + + Mirrors ``lerobot-record``'s ``predict_action`` tail: postprocess + (denormalise) → ``make_robot_action`` (tensor → ``{joint: value}`` + dict) → ``robot.send_action(...)``. Optional safety clip on the + action's L2 norm acts as a kill switch when bringing up a new + robot/task pair. + """ + import torch # noqa: PLC0415 + + from lerobot.policies.utils import make_robot_action # noqa: PLC0415 + + def _executor(action: Any) -> None: + try: + if postprocessor is not None: + action = postprocessor(action) + if isinstance(action, torch.Tensor): + if max_action_norm is not None: + norm = float(action.float().norm().item()) + if norm > max_action_norm: + logger.warning( + "action norm %.3f > max_action_norm=%.3f — " + "rejecting tick", + norm, max_action_norm, + ) + return + if action.ndim > 1 and action.shape[0] == 1: + action = action.squeeze(0) + action_dict = make_robot_action(action, ds_meta.features) + elif isinstance(action, dict): + action_dict = action + else: + logger.warning("unsupported action type %r — skipping", type(action)) + return + robot.send_action(action_dict) + except Exception as exc: # noqa: BLE001 + logger.error("robot.send_action failed: %s", exc, exc_info=True) + + return _executor + + +def _run_autonomous( + runtime: Any, + *, + robot, + auto_start: bool, + initial_task: str | None, + max_ticks: int | None, +) -> int: + """Drive the runtime continuously at ``ctrl_hz`` while accepting + stdin events in the foreground. + + Different from ``_run_repl`` (dataset dry-run): the policy needs + to keep generating action chunks at ``chunk_hz`` and dispatching + them at ``ctrl_hz`` regardless of whether the user is typing, so + ``runtime.run()`` runs in a background thread and stdin handling + happens here in the main thread. + """ + import threading # noqa: PLC0415 + import time # noqa: PLC0415 + + if not auto_start: + try: + input( + "[smolvla2] Robot connected. Press ENTER to start the autonomous " + "control loop, Ctrl+C to abort. " + ) + except (EOFError, KeyboardInterrupt): + print("\n[smolvla2] aborted before start", flush=True) + return 130 + + if initial_task: + runtime.set_task(initial_task) + + thread = threading.Thread( + target=runtime.run, + kwargs={"max_ticks": max_ticks}, + name="smolvla2-runtime-loop", + daemon=True, + ) + thread.start() + print( + "[smolvla2] autonomous loop running. Type interjections / " + "questions on stdin (Ctrl+C to stop).", + flush=True, + ) + + try: + while thread.is_alive(): + try: + line = input("> ").strip() + except EOFError: + break + if not line: + continue + lower = line.lower() + if lower in {"stop", "quit", "exit"}: + break + if not runtime.state.get("task"): + runtime.set_task(line[5:].strip() if lower.startswith("task:") else line) + continue + if lower.endswith("?"): + runtime.state["recent_vqa_query"] = line + runtime.state.setdefault("events_this_tick", []).append("user_vqa_query") + else: + runtime.state["recent_interjection"] = line + runtime.state.setdefault("events_this_tick", []).append("user_interjection") + except KeyboardInterrupt: + print("\n[smolvla2] interrupt — stopping", flush=True) + finally: + runtime.stop() + # Give the loop a moment to drain. + for _ in range(10): + if not thread.is_alive(): + break + time.sleep(0.1) + try: + robot.disconnect() + print("[smolvla2] robot disconnected", flush=True) + except Exception as exc: # noqa: BLE001 + print(f"[smolvla2] WARNING: robot.disconnect raised {exc}", flush=True) + + return 0 + + def _build_tools(no_tts: bool, tts_voice: str) -> dict[str, Any]: """Instantiate the tools declared on this dataset/policy.""" if no_tts: @@ -423,14 +696,69 @@ def main(argv: list[str] | None = None) -> int: ) _silence_noisy_loggers() + autonomous_mode = bool(args.robot_type) and not args.no_robot + if autonomous_mode and not args.dataset_repo_id: + print( + "[smolvla2] ERROR: autonomous robot mode requires --dataset.repo_id " + "for action-denormalisation stats and feature shapes. Pass the " + "same dataset the policy was trained on.", + file=sys.stderr, + ) + return 2 + print(f"[smolvla2] loading policy from {args.policy_path}", flush=True) - policy, preprocessor, _ds_meta = _load_policy_and_preprocessor( + policy, preprocessor, postprocessor, ds_meta = _load_policy_and_preprocessor( args.policy_path, args.dataset_repo_id ) - observation_provider: Callable[[], dict | None] | None = None + # Bootstrap canonical task / plan / memory / subtask from the + # dataset whenever one is provided — both REPL dry-run and + # autonomous robot mode benefit, since the model is memorised on + # the exact training prompts and matching wording is what gets + # recall to fire. bootstrap_state: dict[str, str] = {} if args.dataset_repo_id is not None: + bootstrap_state = _bootstrap_state_from_dataset( + dataset_repo_id=args.dataset_repo_id, + episode=args.dataset_episode, + start_frame=args.dataset_start_frame, + ) + if bootstrap_state.get("task") and not args.task: + args.task = bootstrap_state["task"] + print( + f"[smolvla2] using canonical task from dataset: {args.task!r}", + flush=True, + ) + + observation_provider: Callable[[], dict | None] | None = None + robot_executor: Callable[[Any], None] | None = None + robot = None + + if autonomous_mode: + print( + f"[smolvla2] connecting to robot.type={args.robot_type} " + f"port={args.robot_port}", + flush=True, + ) + robot = _build_robot( + robot_type=args.robot_type, + robot_port=args.robot_port, + robot_id=args.robot_id, + robot_cameras_json=args.robot_cameras, + ) + observation_provider = _build_robot_observation_provider( + robot=robot, + preprocessor=preprocessor, + device=str(getattr(policy.config, "device", "cpu")), + task=args.task, + ) + robot_executor = _build_robot_action_executor( + robot=robot, + postprocessor=postprocessor, + ds_meta=ds_meta, + max_action_norm=args.max_action_norm, + ) + elif args.dataset_repo_id is not None: print( f"[smolvla2] streaming observations from {args.dataset_repo_id} " f"episode={args.dataset_episode} " @@ -445,38 +773,11 @@ def main(argv: list[str] | None = None) -> int: preprocessor=preprocessor, device=str(getattr(policy.config, "device", "cpu")), ) - # Pull the dataset's canonical task + the persistent atoms in - # force at the chosen start frame. The model is heavily - # memorised on the *exact* training prompts (task wording, - # current plan, current memory) — feeding ad-hoc user - # alternatives gives it nothing to recall against, so it - # collapses to its dominant training mode (VQA JSON). Reading - # the canonical state straight from the dataset gives the - # runtime a starting point that lines up with training. - bootstrap_state = _bootstrap_state_from_dataset( - dataset_repo_id=args.dataset_repo_id, - episode=args.dataset_episode, - start_frame=args.dataset_start_frame, - ) - if bootstrap_state.get("task") and not args.task: - args.task = bootstrap_state["task"] - print( - f"[smolvla2] using canonical task from dataset: {args.task!r}", - flush=True, - ) tools = _build_tools(args.no_tts, args.tts_voice) if tools: print(f"[smolvla2] tools loaded: {list(tools)}", flush=True) - robot_executor = None - if not args.no_robot: - print( - "[smolvla2] WARNING: real-robot integration is a follow-up. " - "Running in dry-run mode for now (no actions executed).", - flush=True, - ) - from lerobot.policies.smolvla2.inference import SmolVLA2Runtime # noqa: PLC0415 runtime = SmolVLA2Runtime( @@ -485,10 +786,9 @@ def main(argv: list[str] | None = None) -> int: observation_provider=observation_provider, robot_executor=robot_executor, # No background event collector — the REPL drives ticks - # synchronously after each user input. The runtime's own - # ``run()`` loop is bypassed here in favour of ``step_once()`` - # so the input prompt and the live state panel co-exist - # cleanly. + # synchronously after each user input (REPL mode). Autonomous + # mode runs ``runtime.run()`` in a thread; stdin events are + # injected from the foreground. event_collector=None, chunk_hz=args.chunk_hz, ctrl_hz=args.ctrl_hz, @@ -496,10 +796,10 @@ def main(argv: list[str] | None = None) -> int: ) if args.task: runtime.set_task(args.task) - # Bootstrap plan/memory from the dataset so the first prompt the - # runtime builds matches what training rendered (task + active - # plan + active memory). Without this the runtime starts with - # plan/memory empty, which only matched the very-early frames in + # Seed plan/memory/subtask so the first prompt the runtime builds + # mirrors what training rendered (task + active plan + active + # memory + optional current subtask). Without this the runtime + # starts empty, which only matched the very-early frames during # training and is an out-of-distribution prompt for the rest. if bootstrap_state.get("plan"): runtime.state["current_plan"] = bootstrap_state["plan"] @@ -508,6 +808,14 @@ def main(argv: list[str] | None = None) -> int: if bootstrap_state.get("subtask"): runtime.state["current_subtask"] = bootstrap_state["subtask"] + if autonomous_mode: + return _run_autonomous( + runtime, + robot=robot, + auto_start=args.auto_start, + initial_task=args.task, + max_ticks=args.max_ticks, + ) return _run_repl(runtime, initial_task=args.task, max_ticks=args.max_ticks)