mirror of
https://github.com/huggingface/lerobot.git
synced 2026-07-04 00:27:15 +00:00
Add Foxglove display mode for teleoperate
Add a --display_mode flag (rerun|foxglove) to lerobot-teleoperate. When set to foxglove, stream observations/actions over a Foxglove WebSocket server: images as RawImage/CompressedImage, scalars as typed JSON channels with schemas generated from the feature names (sanitized so paths don't need quoting). Adds a `foxglove` extra. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
committed by
CarolinePascal
parent
2f2b567951
commit
3062bf0548
@@ -126,6 +126,9 @@ hardware = [
|
||||
viz = [
|
||||
"rerun-sdk>=0.24.0,<0.34.0",
|
||||
]
|
||||
foxglove = [
|
||||
"foxglove-sdk>=0.24.0,<1.0.0",
|
||||
]
|
||||
# ── User-facing composite extras (map to CLI scripts) ─────
|
||||
# lerobot-record, lerobot-replay, lerobot-calibrate, lerobot-teleoperate, etc.
|
||||
core_scripts = ["lerobot[dataset]", "lerobot[hardware]", "lerobot[viz]"]
|
||||
|
||||
@@ -31,6 +31,22 @@ lerobot-teleoperate \
|
||||
--display_data=true
|
||||
```
|
||||
|
||||
To stream the data to Foxglove instead of Rerun, add ``--display_mode=foxglove`` (then connect the
|
||||
Foxglove app to ``ws://127.0.0.1:8765``; override the port with ``--display_port=<port>``):
|
||||
|
||||
```shell
|
||||
lerobot-teleoperate \
|
||||
--robot.type=so101_follower \
|
||||
--robot.port=/dev/tty.usbmodem58760431541 \
|
||||
--robot.cameras="{ front: {type: opencv, index_or_path: 0, width: 1920, height: 1080, fps: 30}}" \
|
||||
--robot.id=black \
|
||||
--teleop.type=so101_leader \
|
||||
--teleop.port=/dev/tty.usbmodem58760431551 \
|
||||
--teleop.id=blue \
|
||||
--display_data=true \
|
||||
--display_mode=foxglove
|
||||
```
|
||||
|
||||
Example teleoperation with bimanual so100:
|
||||
|
||||
```shell
|
||||
@@ -108,7 +124,11 @@ from lerobot.teleoperators import ( # noqa: F401
|
||||
from lerobot.utils.import_utils import register_third_party_plugins
|
||||
from lerobot.utils.robot_utils import precise_sleep
|
||||
from lerobot.utils.utils import init_logging, move_cursor_up
|
||||
from lerobot.utils.visualization_utils import init_rerun, log_rerun_data, shutdown_rerun
|
||||
from lerobot.utils.visualization_utils import (
|
||||
init_visualization,
|
||||
log_visualization_data,
|
||||
shutdown_visualization,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -121,11 +141,14 @@ class TeleoperateConfig:
|
||||
teleop_time_s: float | None = None
|
||||
# Display all cameras on screen
|
||||
display_data: bool = False
|
||||
# Display data on a remote Rerun server
|
||||
# Visualization backend used when display_data is True: "rerun" or "foxglove".
|
||||
# "foxglove" starts a WebSocket server (default ws://127.0.0.1:8765) to stream data to the Foxglove app.
|
||||
display_mode: str = "rerun"
|
||||
# For "rerun": IP of a remote Rerun server to connect to. Unused by "foxglove".
|
||||
display_ip: str | None = None
|
||||
# Port of the remote Rerun server
|
||||
# For "rerun": port of the remote Rerun server. For "foxglove": port to bind the WebSocket server to.
|
||||
display_port: int | None = None
|
||||
# Whether to display compressed images in Rerun
|
||||
# Whether to display compressed (JPEG) images instead of raw frames
|
||||
display_compressed_images: bool = False
|
||||
|
||||
|
||||
@@ -137,6 +160,7 @@ def teleop_loop(
|
||||
robot_action_processor: RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction],
|
||||
robot_observation_processor: RobotProcessorPipeline[RobotObservation, RobotObservation],
|
||||
display_data: bool = False,
|
||||
display_mode: str = "rerun",
|
||||
duration: float | None = None,
|
||||
display_compressed_images: bool = False,
|
||||
):
|
||||
@@ -149,8 +173,10 @@ def teleop_loop(
|
||||
teleop: The teleoperator device instance providing control actions.
|
||||
robot: The robot instance being controlled.
|
||||
fps: The target frequency for the control loop in frames per second.
|
||||
display_data: If True, fetches robot observations and displays them in the console and Rerun.
|
||||
display_compressed_images: If True, compresses images before sending them to Rerun for display.
|
||||
display_data: If True, fetches robot observations and displays them in the console and the
|
||||
visualization backend.
|
||||
display_mode: Visualization backend to use when display_data is True ("rerun" or "foxglove").
|
||||
display_compressed_images: If True, compresses images before sending them to the backend for display.
|
||||
duration: The maximum duration of the teleoperation loop in seconds. If None, the loop runs indefinitely.
|
||||
teleop_action_processor: An optional pipeline to process raw actions from the teleoperator.
|
||||
robot_action_processor: An optional pipeline to process actions before they are sent to the robot.
|
||||
@@ -187,7 +213,8 @@ def teleop_loop(
|
||||
# Process robot observation through pipeline
|
||||
obs_transition = robot_observation_processor(obs)
|
||||
|
||||
log_rerun_data(
|
||||
log_visualization_data(
|
||||
display_mode,
|
||||
observation=obs_transition,
|
||||
action=teleop_action,
|
||||
compress_images=display_compressed_images,
|
||||
@@ -215,7 +242,9 @@ def teleoperate(cfg: TeleoperateConfig):
|
||||
init_logging()
|
||||
logging.info(pformat(asdict(cfg)))
|
||||
if cfg.display_data:
|
||||
init_rerun(session_name="teleoperation", ip=cfg.display_ip, port=cfg.display_port)
|
||||
init_visualization(
|
||||
cfg.display_mode, session_name="teleoperation", ip=cfg.display_ip, port=cfg.display_port
|
||||
)
|
||||
display_compressed_images = (
|
||||
True
|
||||
if (cfg.display_data and cfg.display_ip is not None and cfg.display_port is not None)
|
||||
@@ -235,6 +264,7 @@ def teleoperate(cfg: TeleoperateConfig):
|
||||
robot=robot,
|
||||
fps=cfg.fps,
|
||||
display_data=cfg.display_data,
|
||||
display_mode=cfg.display_mode,
|
||||
duration=cfg.teleop_time_s,
|
||||
teleop_action_processor=teleop_action_processor,
|
||||
robot_action_processor=robot_action_processor,
|
||||
@@ -245,7 +275,7 @@ def teleoperate(cfg: TeleoperateConfig):
|
||||
pass
|
||||
finally:
|
||||
if cfg.display_data:
|
||||
shutdown_rerun()
|
||||
shutdown_visualization(cfg.display_mode)
|
||||
teleop.disconnect()
|
||||
robot.disconnect()
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
import numbers
|
||||
import os
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
|
||||
@@ -22,6 +23,12 @@ from lerobot.types import RobotAction, RobotObservation
|
||||
from .constants import ACTION, ACTION_PREFIX, OBS_PREFIX, OBS_STR
|
||||
from .import_utils import require_package
|
||||
|
||||
# Module-level Foxglove state. A single WebSocket server is shared for the
|
||||
# process lifetime, and channels (image and scalar) are cached by topic (the Foxglove SDK
|
||||
# requires reusing one channel per topic).
|
||||
_foxglove_server = None
|
||||
_foxglove_channels: dict = {}
|
||||
|
||||
|
||||
def init_rerun(
|
||||
session_name: str = "lerobot_control_loop", ip: str | None = None, port: int | None = None
|
||||
@@ -59,6 +66,37 @@ def shutdown_rerun() -> None:
|
||||
rr.rerun_shutdown()
|
||||
|
||||
|
||||
def init_foxglove(host: str = "127.0.0.1", port: int | None = 8765) -> None:
    """
    Starts a Foxglove WebSocket server for visualizing the control loop.

    Connect to it from the Foxglove app at ``ws://<host>:<port>``. Calling this
    more than once is a no-op while a server is already running.

    Args:
        host: Host interface to bind the WebSocket server to.
        port: Port to bind the WebSocket server to. ``None`` selects the default (8765); any other
            value, including ``0``, is forwarded to the Foxglove SDK unchanged.
    """

    require_package("foxglove-sdk", extra="foxglove", import_name="foxglove")
    import foxglove

    global _foxglove_server
    if _foxglove_server is not None:
        # A server is already running; keep it instead of binding a second one.
        return

    # Test against None rather than truthiness so an explicit port of 0 is not silently
    # replaced by the default.
    _foxglove_server = foxglove.start_server(host=host, port=8765 if port is None else port)
|
||||
|
||||
|
||||
def shutdown_foxglove() -> None:
    """Stops the Foxglove WebSocket server and clears cached channels.

    Safe to call when no server is running. The module-level server handle and the channel cache
    are reset even if stopping the server raises, so a later ``init_foxglove()`` starts from a
    clean state; the exception itself is still propagated to the caller.
    """

    global _foxglove_server

    # Detach the handle first so the module never keeps a reference to a half-stopped server.
    server, _foxglove_server = _foxglove_server, None
    try:
        if server is not None:
            server.stop()
    finally:
        _foxglove_channels.clear()
|
||||
|
||||
|
||||
def _is_scalar(x):
|
||||
return isinstance(x, (float | numbers.Real | np.integer | np.floating)) or (
|
||||
isinstance(x, np.ndarray) and x.ndim == 0
|
||||
@@ -100,6 +138,41 @@ def _ensure_blueprint(observation_paths: set[str], action_paths: set[str], image
|
||||
rr.send_blueprint(blueprint)
|
||||
|
||||
|
||||
def _foxglove_safe_name(name: str) -> str:
|
||||
"""Make a feature name usable as an unquoted Foxglove message path / topic segment.
|
||||
|
||||
Foxglove message paths treat ``.`` as a field separator, so ``shoulder_pan.pos`` would have to be
|
||||
written as ``"shoulder_pan.pos"`` when plotting. Replacing ``.`` with ``_`` avoids the quoting.
|
||||
"""
|
||||
|
||||
return name.replace(".", "_")
|
||||
|
||||
|
||||
def _log_foxglove_scalars(topic: str, schema_name: str, values: dict[str, float]) -> None:
|
||||
"""Log a flat dict of scalars on a typed JSON channel, building the schema on first use.
|
||||
|
||||
The schema is derived from the keys of the first message (stable for a given robot/session) so
|
||||
Foxglove offers message-path autocomplete. ``additionalProperties`` keeps it permissive if a later
|
||||
message carries extra keys.
|
||||
"""
|
||||
|
||||
if not values:
|
||||
return
|
||||
|
||||
import foxglove
|
||||
|
||||
channel = _foxglove_channels.get(topic)
|
||||
if channel is None:
|
||||
schema = {
|
||||
"type": "object",
|
||||
"title": schema_name,
|
||||
"properties": {name: {"type": "number"} for name in values},
|
||||
"additionalProperties": {"type": "number"},
|
||||
}
|
||||
channel = _foxglove_channels[topic] = foxglove.Channel(topic, schema=schema, message_encoding="json")
|
||||
channel.log(values)
|
||||
|
||||
|
||||
def log_rerun_data(
|
||||
observation: RobotObservation | None = None,
|
||||
action: RobotAction | None = None,
|
||||
@@ -175,3 +248,162 @@ def log_rerun_data(
|
||||
action_paths.add(key)
|
||||
|
||||
_ensure_blueprint(observation_paths, action_paths, image_paths)
|
||||
|
||||
|
||||
def log_foxglove_data(
    observation: RobotObservation | None = None,
    action: RobotAction | None = None,
    compress_images: bool = False,
) -> None:
    """
    Logs observation and action data to a Foxglove WebSocket server for real-time visualization.

    Mirrors :func:`log_rerun_data` but emits Foxglove messages over the server started by
    :func:`init_foxglove`. Data is mapped as follows:
    - Scalars (and elements of 1D arrays) are accumulated per source and logged on the
      ``/observation/state`` and ``/action/state`` topics as typed JSON messages. Each topic gets a
      schema generated from its field names so Foxglove provides message-path autocomplete. Field names
      are sanitized (``.`` -> ``_``) so they don't need quoting when plotting.
    - 2D and 3D observation arrays are treated as images: 3D arrays are transposed from CHW to HWC
      when needed and logged on a per-source topic (e.g. ``/observation/images/front``) as a
      ``RawImage`` (or a JPEG ``CompressedImage`` when ``compress_images`` is True and the image has
      three channels). Arrays with another rank or an unsupported channel count are skipped.
    - Action arrays of any rank are flattened and logged element-wise as scalars.

    Entries whose value is ``None``, and values that are neither scalars nor NumPy arrays, are ignored.

    Args:
        observation: An optional dictionary containing observation data to log.
        action: An optional dictionary containing action data to log.
        compress_images: Whether to JPEG-compress images before logging to save bandwidth in exchange
            for CPU and quality.

    Raises:
        RuntimeError: If :func:`init_foxglove` has not been called first.
    """

    require_package("foxglove-sdk", extra="foxglove", import_name="foxglove")
    from foxglove.channels import CompressedImageChannel, RawImageChannel
    from foxglove.messages import CompressedImage, RawImage, Timestamp

    if _foxglove_server is None:
        raise RuntimeError("init_foxglove() must be called before log_foxglove_data().")

    # One timestamp per call so every image logged from this step carries the same time.
    now = time.time_ns()
    timestamp = Timestamp(sec=now // 1_000_000_000, nsec=now % 1_000_000_000)

    def image_channel(topic: str, channel_cls):
        # Reuse the channel cached for this topic, creating it on first use.
        channel = _foxglove_channels.get(topic)
        if channel is None:
            channel = _foxglove_channels[topic] = channel_cls(topic=topic)
        return channel

    def log_image(topic: str, frame_id: str, arr: np.ndarray) -> None:
        # Anything that is not HW or HWC/CHW cannot be described by width/height/step; sending it
        # would produce a RawImage whose data size does not match its declared geometry.
        if arr.ndim not in (2, 3):
            return

        # Convert CHW -> HWC when needed (mirrors log_rerun_data).
        if arr.ndim == 3 and arr.shape[0] in (1, 3, 4) and arr.shape[-1] not in (1, 3, 4):
            arr = np.transpose(arr, (1, 2, 0))
        height, width = arr.shape[0], arr.shape[1]
        channels = 1 if arr.ndim == 2 else arr.shape[2]

        encoding = {1: "mono8", 3: "rgb8", 4: "rgba8"}.get(channels)
        if encoding is None:
            return

        # Both paths below need 8-bit, C-contiguous pixels (the transpose above yields a
        # non-contiguous view).
        arr = np.ascontiguousarray(arr, dtype=np.uint8)

        if compress_images and channels == 3:
            import cv2

            # Camera frames are RGB; cv2.imencode assumes BGR, so swap to keep colors correct.
            ok, buf = cv2.imencode(".jpg", cv2.cvtColor(arr, cv2.COLOR_RGB2BGR))
            if not ok:
                # Skip this frame instead of publishing an invalid JPEG payload.
                return
            image_channel(topic, CompressedImageChannel).log(
                CompressedImage(timestamp=timestamp, frame_id=frame_id, data=buf.tobytes(), format="jpeg")
            )
            return

        image_channel(topic, RawImageChannel).log(
            RawImage(
                timestamp=timestamp,
                frame_id=frame_id,
                width=width,
                height=height,
                encoding=encoding,
                step=width * channels,
                data=arr.tobytes(),
            )
        )

    if observation:
        obs_scalars: dict[str, float] = {}
        for k, v in observation.items():
            if v is None:
                continue
            key = _foxglove_safe_name(str(k).removeprefix(OBS_PREFIX))
            if _is_scalar(v):
                obs_scalars[key] = float(v)
            elif isinstance(v, np.ndarray):
                if v.ndim == 1:
                    for i, vi in enumerate(v):
                        obs_scalars[f"{key}_{i}"] = float(vi)
                else:
                    log_image(f"/{OBS_STR}/images/{key}", key, v)
        _log_foxglove_scalars(f"/{OBS_STR}/state", "lerobot.Observation", obs_scalars)

    if action:
        action_scalars: dict[str, float] = {}
        for k, v in action.items():
            if v is None:
                continue
            key = _foxglove_safe_name(str(k).removeprefix(ACTION_PREFIX))
            if _is_scalar(v):
                action_scalars[key] = float(v)
            elif isinstance(v, np.ndarray):
                for i, vi in enumerate(v.flatten()):
                    action_scalars[f"{key}_{i}"] = float(vi)
        _log_foxglove_scalars(f"/{ACTION}/state", "lerobot.Action", action_scalars)
|
||||
|
||||
|
||||
# ── Backend-agnostic dispatch ─────────────────────────────────────────────
# These let callers select a visualization backend at runtime via a string
# (e.g. a `--display_mode` CLI flag) without branching on the backend everywhere.

VISUALIZATION_MODES = ("rerun", "foxglove")


def _require_known_display_mode(display_mode: str) -> None:
    """Raise ``ValueError`` unless ``display_mode`` is one of ``VISUALIZATION_MODES``."""

    if display_mode not in VISUALIZATION_MODES:
        raise ValueError(f"Unknown display_mode '{display_mode}'. Expected one of {VISUALIZATION_MODES}.")


def init_visualization(
    display_mode: str,
    *,
    session_name: str = "lerobot_control_loop",
    ip: str | None = None,
    port: int | None = None,
) -> None:
    """Initializes the visualization backend selected by ``display_mode``.

    For ``"rerun"``, ``ip``/``port`` point at an optional remote Rerun server. For ``"foxglove"``,
    ``port`` is the local WebSocket server port (``ip`` is ignored; the server binds locally).

    Args:
        display_mode: Backend name, one of ``VISUALIZATION_MODES``.
        session_name: Session name forwarded to the Rerun backend; unused by Foxglove.
        ip: Remote Rerun server IP; unused by Foxglove.
        port: Remote Rerun server port, or the Foxglove WebSocket port to bind.

    Raises:
        ValueError: If ``display_mode`` is not a known backend.
    """

    _require_known_display_mode(display_mode)
    if display_mode == "rerun":
        init_rerun(session_name=session_name, ip=ip, port=port)
    elif display_mode == "foxglove":
        init_foxglove(port=port)


def log_visualization_data(
    display_mode: str,
    observation: RobotObservation | None = None,
    action: RobotAction | None = None,
    compress_images: bool = False,
) -> None:
    """Logs observation/action data to the backend selected by ``display_mode``.

    Args:
        display_mode: Backend name, one of ``VISUALIZATION_MODES``.
        observation: An optional dictionary containing observation data to log.
        action: An optional dictionary containing action data to log.
        compress_images: Forwarded to the backend's logging function unchanged.

    Raises:
        ValueError: If ``display_mode`` is not a known backend.
    """

    _require_known_display_mode(display_mode)
    if display_mode == "rerun":
        log_rerun_data(observation=observation, action=action, compress_images=compress_images)
    elif display_mode == "foxglove":
        log_foxglove_data(observation=observation, action=action, compress_images=compress_images)


def shutdown_visualization(display_mode: str) -> None:
    """Shuts down the backend selected by ``display_mode``.

    Raises:
        ValueError: If ``display_mode`` is not a known backend.
    """

    _require_known_display_mode(display_mode)
    if display_mode == "rerun":
        shutdown_rerun()
    elif display_mode == "foxglove":
        shutdown_foxglove()
|
||||
|
||||
Reference in New Issue
Block a user