diff --git a/src/lerobot/annotations/steerable_pipeline/vlm_client.py b/src/lerobot/annotations/steerable_pipeline/vlm_client.py index 5dbbb3cb7..e55e9bae4 100644 --- a/src/lerobot/annotations/steerable_pipeline/vlm_client.py +++ b/src/lerobot/annotations/steerable_pipeline/vlm_client.py @@ -456,6 +456,9 @@ def _spawn_parallel_inference_servers(config: VlmConfig) -> list[str]: procs: list[subprocess.Popen] = [] ready_events: list[threading.Event] = [] ready_markers = ("Uvicorn running", "Application startup complete") + # Single lock for all server-stream threads so multibyte chars from + # different servers don't interleave and tear UTF-8 sequences. + print_lock = threading.Lock() base_cmd = config.serve_command or ( f"vllm serve {shlex.quote(config.model_id)} " @@ -489,24 +492,17 @@ def _spawn_parallel_inference_servers(config: VlmConfig) -> list[str]: ready_events.append(ready) def _stream(idx: int, p: subprocess.Popen, ev: threading.Event) -> None: + # Read whole lines and emit each line atomically under the + # shared print_lock so output from N servers stays readable. assert p.stdout is not None - buf = "" - tag_emitted = False - while True: - ch = p.stdout.read(1) - if ch == "": - return - if not tag_emitted: - sys.stdout.write(f"[server-{idx}] ") - tag_emitted = True - sys.stdout.write(ch) - sys.stdout.flush() - buf += ch - if ch in ("\n", "\r"): - if any(m in buf for m in ready_markers): - ev.set() - buf = "" - tag_emitted = False + for line in iter(p.stdout.readline, ""): + with print_lock: + sys.stdout.write(f"[server-{idx}] {line}") + if not line.endswith(("\n", "\r")): + sys.stdout.write("\n") + sys.stdout.flush() + if any(m in line for m in ready_markers): + ev.set() threading.Thread(target=_stream, args=(i, proc, ready), daemon=True).start()