Make Foxglove dataset playback loop the sole frame emitter

Address review: the listener no longer emits frames, it only mutates
playback state and queues a one-shot seek index that the playback loop
services. The loop is now the only caller of emit_frame, so concurrent
random access into the on-disk dataset / video decoder never overlaps.

Also remove the dead server_holder and tighten the _foxglove_safe_name
docstring to state what it does and why.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Roman Shtylman
2026-06-18 18:35:45 -07:00
committed by CarolinePascal
parent 77c9983a7d
commit a3c0a8ca1a
+45 -29
View File
@@ -139,10 +139,10 @@ def _ensure_blueprint(observation_paths: set[str], action_paths: set[str], image
def _foxglove_safe_name(name: str) -> str:
"""Make a feature name usable as an unquoted Foxglove message path / topic segment.
"""Replace ``.`` with ``_`` so a feature name is a single Foxglove topic-path segment.
Foxglove message paths treat ``.`` as a field separator, so ``shoulder_pan.pos`` would have to be
written as ``"shoulder_pan.pos"`` when plotting. Replacing ``.`` with ``_`` avoids the quoting.
Foxglove treats ``.`` as a path separator, so an unsanitized name like ``observation.images.front``
would split into nested segments instead of naming one topic.
"""
return name.replace(".", "_")
@@ -495,22 +495,29 @@ def serve_foxglove_dataset_playback(
lock = threading.Lock()
stop_event = threading.Event()
server_holder: dict = {}
# Shared playback state, guarded by ``lock``.
state = {"status": PlaybackStatus.Paused, "cursor": first_ns, "speed": 1.0, "last_idx": -1}
# Shared playback state, guarded by ``lock``. ``seek_idx`` is a one-shot request set by the
# listener and serviced by the playback loop, which is the *only* thread that emits frames (so
# concurrent random access into the on-disk dataset / video decoder never overlaps).
state = {
"status": PlaybackStatus.Paused,
"cursor": first_ns,
"speed": 1.0,
"last_idx": -1,
"seek_idx": None,
}
def index_at(t_ns: int) -> int:
return max(0, min(n_frames - 1, bisect.bisect_right(times_ns, t_ns) - 1))
class _PlaybackListener(ServerListener):
def on_playback_control_request(self, req: PlaybackControlRequest):
emit_idx = None
# Only mutate state here; the playback loop performs all frame emission.
with lock:
did_seek = False
if req.seek_time is not None:
cursor = max(first_ns, min(last_ns, req.seek_time))
state["cursor"] = cursor
emit_idx = state["last_idx"] = index_at(cursor)
state["last_idx"] = state["seek_idx"] = index_at(cursor)
did_seek = True
if req.playback_speed and req.playback_speed > 0:
state["speed"] = req.playback_speed
@@ -518,15 +525,13 @@ def serve_foxglove_dataset_playback(
# Restarting from the end replays from the beginning.
if state["cursor"] >= last_ns:
state["cursor"] = first_ns
emit_idx = state["last_idx"] = 0
state["last_idx"] = state["seek_idx"] = 0
did_seek = True
state["status"] = PlaybackStatus.Playing
elif req.playback_command == PlaybackCommand.Pause:
state["status"] = PlaybackStatus.Paused
status, cursor, speed = state["status"], state["cursor"], state["speed"]
request_id = req.request_id or ""
if emit_idx is not None:
emit_frame(emit_idx)
return PlaybackState(status, cursor, speed, did_seek, request_id)
server = foxglove.start_server(
@@ -537,45 +542,56 @@ def serve_foxglove_dataset_playback(
server_listener=_PlaybackListener(),
playback_time_range=(first_ns, last_ns),
)
server_holder["server"] = server
def playback_loop() -> None:
prev = time.monotonic()
while not stop_event.is_set():
time.sleep(1.0 / 60.0)
ended = False
speed = 1.0
with lock:
now = time.monotonic()
dt = now - prev
prev = now
if state["status"] != PlaybackStatus.Playing:
# A queued seek is always serviced, even while paused, so scrubbing updates the view.
work = []
seek_idx = state["seek_idx"]
if seek_idx is not None:
state["seek_idx"] = None
work.append(seek_idx)
if state["status"] == PlaybackStatus.Playing:
cursor = state["cursor"] + int(dt * 1e9 * state["speed"])
start_idx = state["last_idx"] + 1
if cursor >= last_ns:
cursor, target, ended = last_ns, n_frames - 1, True
else:
target = index_at(cursor)
state["cursor"] = cursor
work.extend(range(start_idx, target + 1))
state["last_idx"] = max(state["last_idx"], target)
if ended:
state["status"] = PlaybackStatus.Ended
if not work:
continue
cursor = state["cursor"] + int(dt * 1e9 * state["speed"])
start_idx = state["last_idx"] + 1
if cursor >= last_ns:
cursor, target, ended = last_ns, n_frames - 1, True
else:
target, ended = index_at(cursor), False
state["cursor"] = cursor
state["last_idx"] = max(state["last_idx"], target)
if ended:
state["status"] = PlaybackStatus.Ended
speed = state["speed"]
for i in range(start_idx, target + 1):
cursor, speed = state["cursor"], state["speed"]
# Emit outside the lock; this is the only thread that calls emit_frame.
for i in work:
emit_frame(i)
server.broadcast_time(cursor)
if ended:
server.broadcast_playback_state(PlaybackState(PlaybackStatus.Ended, cursor, speed, False, ""))
thread = threading.Thread(target=playback_loop, name="foxglove-playback", daemon=True)
thread.start()
# Emit the first frame so channels are advertised and the viewer isn't blank before playback.
# Emit the first frame so channels are advertised (done before the loop starts, so emission stays
# single-threaded). Late-connecting clients re-receive frames once they seek/play.
emit_frame(0)
with lock:
state["last_idx"] = 0
server.broadcast_time(first_ns)
server.broadcast_playback_state(PlaybackState(PlaybackStatus.Paused, first_ns, 1.0, True, ""))
thread = threading.Thread(target=playback_loop, name="foxglove-playback", daemon=True)
thread.start()
print(f"Foxglove server running. Connect the Foxglove app to ws://{host}:{port}")
print("Use the playback controls in Foxglove to play/pause and scrub the episode. Ctrl-C to exit.")
try: