[skip ci] feat(visualize audio): adding audio recordings visualization in rerun

This commit is contained in:
CarolinePascal
2025-05-09 18:00:30 +02:00
parent 421fdcce96
commit af0294198a
4 changed files with 97 additions and 3 deletions
+1 -1
View File
@@ -43,7 +43,7 @@ def main():
keyboard.connect() keyboard.connect()
# Init rerun viewer # Init rerun viewer
init_rerun(session_name="lekiwi_teleop") init_rerun(session_name="lekiwi_teleop", robot=robot, reset_time=True)
if not robot.is_connected or not leader_arm.is_connected or not keyboard.is_connected: if not robot.is_connected or not leader_arm.is_connected or not keyboard.is_connected:
raise ValueError("Robot or teleop is not connected!") raise ValueError("Robot or teleop is not connected!")
+7
View File
@@ -285,6 +285,13 @@ def record_loop(
display_data: bool = False, display_data: bool = False,
display_compressed_images: bool = False, display_compressed_images: bool = False,
): ):
if display_data:
init_rerun(
session_name="recording",
robot=robot,
reset_time=True,
)
if dataset is not None and dataset.fps != fps: if dataset is not None and dataset.fps != fps:
raise ValueError(f"The dataset fps should be equal to requested fps ({dataset.fps} != {fps}).") raise ValueError(f"The dataset fps should be equal to requested fps ({dataset.fps} != {fps}).")
@@ -143,6 +143,12 @@ def teleop_loop(
robot_action_processor: An optional pipeline to process actions before they are sent to the robot. robot_action_processor: An optional pipeline to process actions before they are sent to the robot.
robot_observation_processor: An optional pipeline to process raw observations from the robot. robot_observation_processor: An optional pipeline to process raw observations from the robot.
""" """
if display_data:
init_rerun(
session_name="teleoperation",
robot=robot,
reset_time=True,
)
display_len = max(len(key) for key in robot.action_features) display_len = max(len(key) for key in robot.action_features)
+83 -2
View File
@@ -14,17 +14,24 @@
import numbers import numbers
import os import os
from uuid import uuid4
import numpy as np import numpy as np
import rerun as rr import rerun as rr
from lerobot.datasets.utils import DEFAULT_AUDIO_CHUNK_DURATION
from lerobot.processor import RobotAction, RobotObservation from lerobot.processor import RobotAction, RobotObservation
from lerobot.robots import Robot
from .constants import ACTION, ACTION_PREFIX, OBS_PREFIX, OBS_STR from .constants import ACTION, ACTION_PREFIX, OBS_PREFIX, OBS_STR
def init_rerun( def init_rerun(
session_name: str = "lerobot_control_loop", ip: str | None = None, port: int | None = None session_name: str = "lerobot_control_loop",
ip: str | None = None,
port: int | None = None,
robot: Robot | None = None,
reset_time: bool = False,
) -> None: ) -> None:
""" """
Initializes the Rerun SDK for visualizing the control loop. Initializes the Rerun SDK for visualizing the control loop.
@@ -33,16 +40,25 @@ def init_rerun(
session_name: Name of the Rerun session. session_name: Name of the Rerun session.
ip: Optional IP for connecting to a Rerun server. ip: Optional IP for connecting to a Rerun server.
port: Optional port for connecting to a Rerun server. port: Optional port for connecting to a Rerun server.
robot: A Robot object. If provided, Rerun will be initialized with a blueprint that includes the object's cameras and microphones.
reset_time: Whether to reset the timer "episode_time" to 0.
""" """
batch_size = os.getenv("RERUN_FLUSH_NUM_BYTES", "8000") batch_size = os.getenv("RERUN_FLUSH_NUM_BYTES", "8000")
os.environ["RERUN_FLUSH_NUM_BYTES"] = batch_size os.environ["RERUN_FLUSH_NUM_BYTES"] = batch_size
rr.init(session_name) rr.init(
application_id=session_name,
recording_id=uuid4(),
default_blueprint=build_rerun_blueprint(robot) if robot is not None else None,
)
memory_limit = os.getenv("LEROBOT_RERUN_MEMORY_LIMIT", "10%") memory_limit = os.getenv("LEROBOT_RERUN_MEMORY_LIMIT", "10%")
if ip and port: if ip and port:
rr.connect_grpc(url=f"rerun+http://{ip}:{port}/proxy") rr.connect_grpc(url=f"rerun+http://{ip}:{port}/proxy")
else: else:
rr.spawn(memory_limit=memory_limit) rr.spawn(memory_limit=memory_limit)
if reset_time:
rr.set_time_seconds("episode_time", seconds=0.0)
def _is_scalar(x): def _is_scalar(x):
return isinstance(x, (float | numbers.Real | np.integer | np.floating)) or ( return isinstance(x, (float | numbers.Real | np.integer | np.floating)) or (
@@ -50,10 +66,47 @@ def _is_scalar(x):
) )
def build_rerun_blueprint(robot: Robot) -> rr.blueprint.Grid:
    """
    Builds a Rerun blueprint for optimized visualization of the robot's observations and actions:
    - One time series view for all scalar observations and actions (e.g. position, velocity, torque, etc.).
    - One time series view for all microphone observations, only when the robot has microphones.
    - One spatial 2D view per camera, only when the robot has cameras.

    Args:
        robot: A Robot object. Its `microphones` and `cameras` attributes are read if present;
            a missing or empty attribute simply results in no corresponding view.

    Returns:
        A Rerun grid container holding the views listed above.
    """
    views = [
        rr.blueprint.TimeSeriesView(
            origin="states_actions",
            plot_legend=rr.blueprint.PlotLegend(visible=True),
        )
    ]

    # Not every robot necessarily exposes microphones/cameras: use getattr so that a missing
    # attribute is treated the same way as an empty one instead of raising AttributeError.
    if getattr(robot, "microphones", None):
        # NOTE(review): log_rerun_data logs audio under "audio/<key>" - confirm that the
        # "microphones" origin used here matches the entity paths that are actually logged.
        views.append(
            rr.blueprint.TimeSeriesView(
                origin="microphones",
                plot_legend=rr.blueprint.PlotLegend(visible=True),
            )
        )

    cameras = getattr(robot, "cameras", None)
    if cameras:
        # Iterating over the cameras is expected to yield the camera names, each of which is
        # used as the origin of its own 2D view - TODO confirm against the Robot implementation.
        views.extend(rr.blueprint.Spatial2DView(origin=camera_name) for camera_name in cameras)

    # The children must be passed through the `contents` keyword (or unpacked as positional
    # arguments): a list passed as a single positional argument is treated as one child.
    return rr.blueprint.Grid(contents=views)
def log_rerun_data( def log_rerun_data(
observation: RobotObservation | None = None, observation: RobotObservation | None = None,
action: RobotAction | None = None, action: RobotAction | None = None,
compress_images: bool = False, compress_images: bool = False,
log_time: float | None = None,
) -> None: ) -> None:
""" """
Logs observation and action data to Rerun for real-time visualization. Logs observation and action data to Rerun for real-time visualization.
@@ -72,7 +125,12 @@ def log_rerun_data(
observation: An optional dictionary containing observation data to log. observation: An optional dictionary containing observation data to log.
action: An optional dictionary containing action data to log. action: An optional dictionary containing action data to log.
compress_images: Whether to compress images before logging to save bandwidth & memory in exchange for cpu and quality. compress_images: Whether to compress images before logging to save bandwidth & memory in exchange for cpu and quality.
log_time: The time to log the data in the "episode_time" timeline.
If None, the current time is used in Rerun's default timeline.
""" """
if log_time is not None:
rr.set_time_seconds("episode_time", seconds=log_time)
if observation: if observation:
for k, v in observation.items(): for k, v in observation.items():
if v is None: if v is None:
@@ -86,9 +144,32 @@ def log_rerun_data(
# Convert CHW -> HWC when needed # Convert CHW -> HWC when needed
if arr.ndim == 3 and arr.shape[0] in (1, 3, 4) and arr.shape[-1] not in (1, 3, 4): if arr.ndim == 3 and arr.shape[0] in (1, 3, 4) and arr.shape[-1] not in (1, 3, 4):
arr = np.transpose(arr, (1, 2, 0)) arr = np.transpose(arr, (1, 2, 0))
# Convert channel x samples -> samples x channel when needed
elif arr.ndim == 2 and arr.shape[0] < arr.shape[1]:
arr = np.transpose(arr, (1, 0))
if arr.ndim == 1: if arr.ndim == 1:
for i, vi in enumerate(arr): for i, vi in enumerate(arr):
rr.log(f"{key}_{i}", rr.Scalars(float(vi))) rr.log(f"{key}_{i}", rr.Scalars(float(vi)))
elif arr.ndim == 2:
rr.send_columns(
"audio/" + key,
indexes=[
rr.TimeSecondsColumn(
"episode_time",
times=log_time
+ np.linspace(
-DEFAULT_AUDIO_CHUNK_DURATION,
0,
len(observation[key]),
endpoint=False,
),
)
],
columns=rr.Scalar.columns(scalar=observation[key]),
)
elif arr.ndim == 3:
rr.log(key, rr.Image(arr), static=True)
else: else:
img_entity = rr.Image(arr).compress() if compress_images else rr.Image(arr) img_entity = rr.Image(arr).compress() if compress_images else rr.Image(arr)
rr.log(key, entity=img_entity, static=True) rr.log(key, entity=img_entity, static=True)