From a3c0a8ca1a34ef51411eba0ad0f08bd93001e370 Mon Sep 17 00:00:00 2001 From: Roman Shtylman Date: Thu, 18 Jun 2026 18:35:45 -0700 Subject: [PATCH] Make Foxglove dataset playback loop the sole frame emitter Address review: the listener no longer emits frames, it only mutates playback state and queues a one-shot seek index that the playback loop services. The loop is now the only caller of emit_frame, so concurrent random access into the on-disk dataset / video decoder never overlaps. Also remove the dead server_holder and tighten the _foxglove_safe_name docstring to state what it does and why. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/lerobot/utils/visualization_utils.py | 74 ++++++++++++++---------- 1 file changed, 45 insertions(+), 29 deletions(-) diff --git a/src/lerobot/utils/visualization_utils.py b/src/lerobot/utils/visualization_utils.py index d6d477a84..7f9d92d54 100644 --- a/src/lerobot/utils/visualization_utils.py +++ b/src/lerobot/utils/visualization_utils.py @@ -139,10 +139,10 @@ def _ensure_blueprint(observation_paths: set[str], action_paths: set[str], image def _foxglove_safe_name(name: str) -> str: - """Make a feature name usable as an unquoted Foxglove message path / topic segment. + """Replace ``.`` with ``_`` so a feature name is a single Foxglove topic-path segment. - Foxglove message paths treat ``.`` as a field separator, so ``shoulder_pan.pos`` would have to be - written as ``"shoulder_pan.pos"`` when plotting. Replacing ``.`` with ``_`` avoids the quoting. + Foxglove treats ``.`` as a path separator, so an unsanitized name like ``observation.images.front`` + would split into nested segments instead of naming one topic. """ return name.replace(".", "_") @@ -495,22 +495,29 @@ def serve_foxglove_dataset_playback( lock = threading.Lock() stop_event = threading.Event() - server_holder: dict = {} - # Shared playback state, guarded by ``lock``. - state = {"status": PlaybackStatus.Paused, "cursor": first_ns, "speed": 1.0, "last_idx": -1} + # Shared playback state, guarded by ``lock``. ``seek_idx`` is a one-shot request set by the + # listener and serviced by the playback loop, which is the *only* thread that emits frames (so + # concurrent random access into the on-disk dataset / video decoder never overlaps). + state = { + "status": PlaybackStatus.Paused, + "cursor": first_ns, + "speed": 1.0, + "last_idx": -1, + "seek_idx": None, + } def index_at(t_ns: int) -> int: return max(0, min(n_frames - 1, bisect.bisect_right(times_ns, t_ns) - 1)) class _PlaybackListener(ServerListener): def on_playback_control_request(self, req: PlaybackControlRequest): - emit_idx = None + # Only mutate state here; the playback loop performs all frame emission. with lock: did_seek = False if req.seek_time is not None: cursor = max(first_ns, min(last_ns, req.seek_time)) state["cursor"] = cursor - emit_idx = state["last_idx"] = index_at(cursor) + state["last_idx"] = state["seek_idx"] = index_at(cursor) did_seek = True if req.playback_speed and req.playback_speed > 0: state["speed"] = req.playback_speed @@ -518,15 +525,13 @@ def serve_foxglove_dataset_playback( # Restarting from the end replays from the beginning. if state["cursor"] >= last_ns: state["cursor"] = first_ns - emit_idx = state["last_idx"] = 0 + state["last_idx"] = state["seek_idx"] = 0 did_seek = True state["status"] = PlaybackStatus.Playing elif req.playback_command == PlaybackCommand.Pause: state["status"] = PlaybackStatus.Paused status, cursor, speed = state["status"], state["cursor"], state["speed"] request_id = req.request_id or "" - if emit_idx is not None: - emit_frame(emit_idx) return PlaybackState(status, cursor, speed, did_seek, request_id) server = foxglove.start_server( @@ -537,45 +542,56 @@ def serve_foxglove_dataset_playback( server_listener=_PlaybackListener(), playback_time_range=(first_ns, last_ns), ) - server_holder["server"] = server def playback_loop() -> None: prev = time.monotonic() while not stop_event.is_set(): time.sleep(1.0 / 60.0) + ended = False + speed = 1.0 with lock: now = time.monotonic() dt = now - prev prev = now - if state["status"] != PlaybackStatus.Playing: + # A queued seek is always serviced, even while paused, so scrubbing updates the view. + work = [] + seek_idx = state["seek_idx"] + if seek_idx is not None: + state["seek_idx"] = None + work.append(seek_idx) + if state["status"] == PlaybackStatus.Playing: + cursor = state["cursor"] + int(dt * 1e9 * state["speed"]) + start_idx = state["last_idx"] + 1 + if cursor >= last_ns: + cursor, target, ended = last_ns, n_frames - 1, True + else: + target = index_at(cursor) + state["cursor"] = cursor + work.extend(range(start_idx, target + 1)) + state["last_idx"] = max(state["last_idx"], target) + if ended: + state["status"] = PlaybackStatus.Ended + if not work: continue - cursor = state["cursor"] + int(dt * 1e9 * state["speed"]) - start_idx = state["last_idx"] + 1 - if cursor >= last_ns: - cursor, target, ended = last_ns, n_frames - 1, True - else: - target, ended = index_at(cursor), False - state["cursor"] = cursor - state["last_idx"] = max(state["last_idx"], target) - if ended: - state["status"] = PlaybackStatus.Ended - speed = state["speed"] - for i in range(start_idx, target + 1): + cursor, speed = state["cursor"], state["speed"] + # Emit outside the lock; this is the only thread that calls emit_frame. + for i in work: emit_frame(i) server.broadcast_time(cursor) if ended: server.broadcast_playback_state(PlaybackState(PlaybackStatus.Ended, cursor, speed, False, "")) - thread = threading.Thread(target=playback_loop, name="foxglove-playback", daemon=True) - thread.start() - - # Emit the first frame so channels are advertised and the viewer isn't blank before playback. + # Emit the first frame so channels are advertised (done before the loop starts, so emission stays + # single-threaded). Late-connecting clients re-receive frames once they seek/play. emit_frame(0) with lock: state["last_idx"] = 0 server.broadcast_time(first_ns) server.broadcast_playback_state(PlaybackState(PlaybackStatus.Paused, first_ns, 1.0, True, "")) + thread = threading.Thread(target=playback_loop, name="foxglove-playback", daemon=True) + thread.start() + print(f"Foxglove server running. Connect the Foxglove app to ws://{host}:{port}") print("Use the playback controls in Foxglove to play/pause and scrub the episode. Ctrl-C to exit.") try: