From 25bb0f6d6ae53a2596e45bfdd3a7307cfc2a7bf3 Mon Sep 17 00:00:00 2001 From: Pepijn Date: Tue, 28 Apr 2026 23:19:37 +0200 Subject: [PATCH] fix(annotate): lock-protect per-line writes for parallel server streams MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 8 server-streaming threads writing chars unsynchronized cause UTF-8 sequences from different servers to interleave mid-sequence, garbling the terminal output. Switch to line-buffered reads with a single shared print lock — output stays readable, ready-marker detection still works on the line containing 'Uvicorn running' / 'Application startup complete'. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../steerable_pipeline/vlm_client.py | 30 ++++++++----------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/src/lerobot/annotations/steerable_pipeline/vlm_client.py b/src/lerobot/annotations/steerable_pipeline/vlm_client.py index 5dbbb3cb7..e55e9bae4 100644 --- a/src/lerobot/annotations/steerable_pipeline/vlm_client.py +++ b/src/lerobot/annotations/steerable_pipeline/vlm_client.py @@ -456,6 +456,9 @@ def _spawn_parallel_inference_servers(config: VlmConfig) -> list[str]: procs: list[subprocess.Popen] = [] ready_events: list[threading.Event] = [] ready_markers = ("Uvicorn running", "Application startup complete") + # Single lock for all server-stream threads so multibyte chars from + # different servers don't interleave and tear UTF-8 sequences. + print_lock = threading.Lock() base_cmd = config.serve_command or ( f"vllm serve {shlex.quote(config.model_id)} " @@ -489,24 +492,17 @@ def _spawn_parallel_inference_servers(config: VlmConfig) -> list[str]: ready_events.append(ready) def _stream(idx: int, p: subprocess.Popen, ev: threading.Event) -> None: + # Read whole lines and emit each line atomically under the + # shared print_lock so output from N servers stays readable. 
assert p.stdout is not None - buf = "" - tag_emitted = False - while True: - ch = p.stdout.read(1) - if ch == "": - return - if not tag_emitted: - sys.stdout.write(f"[server-{idx}] ") - tag_emitted = True - sys.stdout.write(ch) - sys.stdout.flush() - buf += ch - if ch in ("\n", "\r"): - if any(m in buf for m in ready_markers): - ev.set() - buf = "" - tag_emitted = False + for line in iter(p.stdout.readline, ""): + with print_lock: + sys.stdout.write(f"[server-{idx}] {line}") + if not line.endswith(("\n", "\r")): + sys.stdout.write("\n") + sys.stdout.flush() + if any(m in line for m in ready_markers): + ev.set() threading.Thread(target=_stream, args=(i, proc, ready), daemon=True).start()