mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-17 01:30:14 +00:00
fix(annotate): lock-protect per-line writes for parallel server streams
8 server-streaming threads writing chars unsynchronized cause UTF-8 sequences from different servers to interleave mid-byte, garbling the terminal output. Switch to line-buffered reads with a single shared print lock — output stays readable, ready-marker detection still works on the line containing 'Uvicorn running' / 'Application startup complete'. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -456,6 +456,9 @@ def _spawn_parallel_inference_servers(config: VlmConfig) -> list[str]:
|
||||
procs: list[subprocess.Popen] = []
|
||||
ready_events: list[threading.Event] = []
|
||||
ready_markers = ("Uvicorn running", "Application startup complete")
|
||||
# Single lock for all server-stream threads so multibyte chars from
|
||||
# different servers don't interleave and tear UTF-8 sequences.
|
||||
print_lock = threading.Lock()
|
||||
|
||||
base_cmd = config.serve_command or (
|
||||
f"vllm serve {shlex.quote(config.model_id)} "
|
||||
@@ -489,24 +492,17 @@ def _spawn_parallel_inference_servers(config: VlmConfig) -> list[str]:
|
||||
ready_events.append(ready)
|
||||
|
||||
def _stream(idx: int, p: subprocess.Popen, ev: threading.Event) -> None:
|
||||
# Read whole lines and emit each line atomically under the
|
||||
# shared print_lock so output from N servers stays readable.
|
||||
assert p.stdout is not None
|
||||
buf = ""
|
||||
tag_emitted = False
|
||||
while True:
|
||||
ch = p.stdout.read(1)
|
||||
if ch == "":
|
||||
return
|
||||
if not tag_emitted:
|
||||
sys.stdout.write(f"[server-{idx}] ")
|
||||
tag_emitted = True
|
||||
sys.stdout.write(ch)
|
||||
sys.stdout.flush()
|
||||
buf += ch
|
||||
if ch in ("\n", "\r"):
|
||||
if any(m in buf for m in ready_markers):
|
||||
ev.set()
|
||||
buf = ""
|
||||
tag_emitted = False
|
||||
for line in iter(p.stdout.readline, ""):
|
||||
with print_lock:
|
||||
sys.stdout.write(f"[server-{idx}] {line}")
|
||||
if not line.endswith(("\n", "\r")):
|
||||
sys.stdout.write("\n")
|
||||
sys.stdout.flush()
|
||||
if any(m in line for m in ready_markers):
|
||||
ev.set()
|
||||
|
||||
threading.Thread(target=_stream, args=(i, proc, ready), daemon=True).start()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user