refactor(utils): simplify log_rerun_data function (#1864)

* refactor(logging): enhance log_rerun_data to handle observation and action separately

- Updated the `log_rerun_data` function to accept and log observation and action data more clearly, improving readability and maintainability.
- Refactored the `record_loop` and `teleop_loop` functions to extract and pass observation and action data to `log_rerun_data`, ensuring consistent logging format.

* refactor(tests): update test_log_rerun_data to align with log_rerun_data changes

- Modified test cases in `test_visualization_utils.py` to extract and pass observation and action data separately to `log_rerun_data`, improving clarity and consistency with recent function updates.
- Ensured that the tests reflect the new structure of `log_rerun_data` for better maintainability.

* refactor(processors): simplify calls to log_rerun + replace lambda functions with identity_transition

---------

Co-authored-by: Steven Palma <steven.palma@huggingface.co>
This commit is contained in:
Adil Zouitine
2025-09-04 19:25:51 +02:00
committed by GitHub
parent f247aa0701
commit 888a5b6249
7 changed files with 109 additions and 111 deletions
+3 -2
View File
@@ -23,6 +23,7 @@ from lerobot.policies.act.modeling_act import ACTPolicy
from lerobot.policies.factory import make_pre_post_processors from lerobot.policies.factory import make_pre_post_processors
from lerobot.processor import RobotProcessorPipeline from lerobot.processor import RobotProcessorPipeline
from lerobot.processor.converters import ( from lerobot.processor.converters import (
identity_transition,
observation_to_transition, observation_to_transition,
transition_to_robot_action, transition_to_robot_action,
) )
@@ -74,7 +75,7 @@ robot_ee_to_joints_processor = RobotProcessorPipeline(
initial_guess_current_joints=True, initial_guess_current_joints=True,
), ),
], ],
to_transition=lambda tr: tr, to_transition=identity_transition,
to_output=transition_to_robot_action, to_output=transition_to_robot_action,
) )
@@ -84,7 +85,7 @@ robot_joints_to_ee_pose_processor = RobotProcessorPipeline(
ForwardKinematicsJointsToEE(kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())) ForwardKinematicsJointsToEE(kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys()))
], ],
to_transition=observation_to_transition, to_transition=observation_to_transition,
to_output=lambda tr: tr, to_output=identity_transition,
) )
# Build dataset action and gripper features # Build dataset action and gripper features
+4 -3
View File
@@ -23,6 +23,7 @@ from lerobot.model.kinematics import RobotKinematics
from lerobot.processor import RobotProcessorPipeline from lerobot.processor import RobotProcessorPipeline
from lerobot.processor.converters import ( from lerobot.processor.converters import (
action_to_transition, action_to_transition,
identity_transition,
observation_to_transition, observation_to_transition,
transition_to_robot_action, transition_to_robot_action,
) )
@@ -89,7 +90,7 @@ phone_to_robot_ee_pose_processor = RobotProcessorPipeline(
), ),
], ],
to_transition=action_to_transition, to_transition=action_to_transition,
to_output=lambda tr: tr, to_output=identity_transition,
) )
# Build pipeline to convert ee pose action to joint action # Build pipeline to convert ee pose action to joint action
@@ -105,7 +106,7 @@ robot_ee_to_joints_processor = RobotProcessorPipeline(
speed_factor=20.0, speed_factor=20.0,
), ),
], ],
to_transition=lambda tr: tr, to_transition=identity_transition,
to_output=transition_to_robot_action, to_output=transition_to_robot_action,
) )
@@ -115,7 +116,7 @@ robot_joints_to_ee_pose = RobotProcessorPipeline(
ForwardKinematicsJointsToEE(kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())) ForwardKinematicsJointsToEE(kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys()))
], ],
to_transition=observation_to_transition, to_transition=observation_to_transition,
to_output=lambda tr: tr, to_output=identity_transition,
) )
# Build dataset ee action features # Build dataset ee action features
+4
View File
@@ -479,3 +479,7 @@ def transition_to_batch(transition: EnvTransition) -> dict[str, Any]:
batch.update(observation) batch.update(observation)
return batch return batch
def identity_transition(tr: EnvTransition) -> EnvTransition:
return tr
+35 -17
View File
@@ -62,6 +62,7 @@ import time
from dataclasses import asdict, dataclass, field from dataclasses import asdict, dataclass, field
from pathlib import Path from pathlib import Path
from pprint import pformat from pprint import pformat
from typing import Any
from lerobot.cameras import ( # noqa: F401 from lerobot.cameras import ( # noqa: F401
CameraConfig, # noqa: F401 CameraConfig, # noqa: F401
@@ -77,6 +78,7 @@ from lerobot.datasets.video_utils import VideoEncodingManager
from lerobot.policies.factory import make_policy, make_pre_post_processors from lerobot.policies.factory import make_policy, make_pre_post_processors
from lerobot.policies.pretrained import PreTrainedPolicy from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.processor import ( from lerobot.processor import (
EnvTransition,
IdentityProcessorStep, IdentityProcessorStep,
PolicyProcessorPipeline, PolicyProcessorPipeline,
RobotProcessorPipeline, RobotProcessorPipeline,
@@ -84,6 +86,7 @@ from lerobot.processor import (
) )
from lerobot.processor.converters import ( from lerobot.processor.converters import (
action_to_transition, action_to_transition,
identity_transition,
observation_to_transition, observation_to_transition,
transition_to_dataset_frame, transition_to_dataset_frame,
transition_to_robot_action, transition_to_robot_action,
@@ -243,22 +246,33 @@ def record_loop(
preprocessor: PolicyProcessorPipeline | None = None, preprocessor: PolicyProcessorPipeline | None = None,
postprocessor: PolicyProcessorPipeline | None = None, postprocessor: PolicyProcessorPipeline | None = None,
control_time_s: int | None = None, control_time_s: int | None = None,
teleop_action_processor: RobotProcessorPipeline | None = None, # runs after teleop teleop_action_processor: RobotProcessorPipeline[EnvTransition] | None = None, # runs after teleop
robot_action_processor: RobotProcessorPipeline | None = None, # runs before robot robot_action_processor: RobotProcessorPipeline[dict[str, Any]] | None = None, # runs before robot
robot_observation_processor: RobotProcessorPipeline | None = None, # runs after robot robot_observation_processor: RobotProcessorPipeline[EnvTransition] | None = None, # runs after robot
single_task: str | None = None, single_task: str | None = None,
display_data: bool = False, display_data: bool = False,
): ):
teleop_action_processor = teleop_action_processor or RobotProcessorPipeline( teleop_action_processor: RobotProcessorPipeline[EnvTransition] = (
steps=[IdentityProcessorStep()], to_transition=action_to_transition, to_output=lambda tr: tr teleop_action_processor
or RobotProcessorPipeline[EnvTransition](
steps=[IdentityProcessorStep()], to_transition=action_to_transition, to_output=identity_transition
)
) )
robot_action_processor = robot_action_processor or RobotProcessorPipeline( robot_action_processor: RobotProcessorPipeline[dict[str, Any]] = (
steps=[IdentityProcessorStep()], to_transition=lambda tr: tr, to_output=transition_to_robot_action robot_action_processor
or RobotProcessorPipeline[dict[str, Any]](
steps=[IdentityProcessorStep()],
to_transition=identity_transition,
to_output=transition_to_robot_action,
)
) )
robot_observation_processor = robot_observation_processor or RobotProcessorPipeline( robot_observation_processor: RobotProcessorPipeline[EnvTransition] = (
steps=[IdentityProcessorStep()], robot_observation_processor
to_transition=observation_to_transition, or RobotProcessorPipeline[EnvTransition](
to_output=lambda tr: tr, steps=[IdentityProcessorStep()],
to_transition=observation_to_transition,
to_output=identity_transition,
)
) )
if dataset is not None and dataset.fps != fps: if dataset is not None and dataset.fps != fps:
@@ -309,7 +323,7 @@ def record_loop(
obs = robot.get_observation() obs = robot.get_observation()
# Applies a pipeline to the raw robot observation, default is IdentityProcessor # Applies a pipeline to the raw robot observation, default is IdentityProcessor
obs_transition = robot_observation_processor(obs) obs_transition: EnvTransition = robot_observation_processor(obs)
# Get action from either policy or teleop # Get action from either policy or teleop
if policy is not None and preprocessor is not None and postprocessor is not None: if policy is not None and preprocessor is not None and postprocessor is not None:
@@ -340,7 +354,9 @@ def record_loop(
act = teleop.get_action() act = teleop.get_action()
# Applies a pipeline to the raw teleop action, default is IdentityProcessor # Applies a pipeline to the raw teleop action, default is IdentityProcessor
teleop_transition = teleop_action_processor(act) # TODO(Steven): This assumes that the processor passed by the user should have identity_transition as to_output.
# TODO(Steven): Why is this not automatically typed as EnvTransition?
teleop_transition: EnvTransition = teleop_action_processor(act)
elif isinstance(teleop, list): elif isinstance(teleop, list):
arm_action = teleop_arm.get_action() arm_action = teleop_arm.get_action()
@@ -348,7 +364,7 @@ def record_loop(
keyboard_action = teleop_keyboard.get_action() keyboard_action = teleop_keyboard.get_action()
base_action = robot._from_keyboard_to_base_action(keyboard_action) base_action = robot._from_keyboard_to_base_action(keyboard_action)
act = {**arm_action, **base_action} if len(base_action) > 0 else arm_action act = {**arm_action, **base_action} if len(base_action) > 0 else arm_action
teleop_transition = teleop_action_processor(act) teleop_transition: EnvTransition = teleop_action_processor(act)
else: else:
logging.info( logging.info(
"No policy or teleoperator provided, skipping action generation. " "No policy or teleoperator provided, skipping action generation. "
@@ -360,9 +376,9 @@ def record_loop(
# Applies a pipeline to the action, default is IdentityProcessor # Applies a pipeline to the action, default is IdentityProcessor
# IMPORTANT: action_pipeline.to_output must return a dict suitable for robot.send_action() # IMPORTANT: action_pipeline.to_output must return a dict suitable for robot.send_action()
if policy is not None and policy_transition is not None: if policy is not None and policy_transition is not None:
robot_action_to_send = robot_action_processor(policy_transition) robot_action_to_send: dict[str, Any] = robot_action_processor(policy_transition)
else: else:
robot_action_to_send = robot_action_processor(teleop_transition) robot_action_to_send: dict[str, Any] = robot_action_processor(teleop_transition)
# Send action to robot # Send action to robot
# Action can eventually be clipped using `max_relative_target`, # Action can eventually be clipped using `max_relative_target`,
@@ -386,7 +402,9 @@ def record_loop(
dataset.add_frame(frame, task=single_task) dataset.add_frame(frame, task=single_task)
if display_data: if display_data:
log_rerun_data([obs_transition, teleop_transition or policy_transition]) log_rerun_data(
observation=obs_transition.get(TransitionKey.OBSERVATION), action=robot_action_to_send
)
dt_s = time.perf_counter() - start_loop_t dt_s = time.perf_counter() - start_loop_t
busy_wait(1 / fps - dt_s) busy_wait(1 / fps - dt_s)
+20 -14
View File
@@ -55,15 +55,17 @@ import logging
import time import time
from dataclasses import asdict, dataclass from dataclasses import asdict, dataclass
from pprint import pformat from pprint import pformat
from typing import Any
import rerun as rr import rerun as rr
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401 from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401
from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401 from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401
from lerobot.configs import parser from lerobot.configs import parser
from lerobot.processor import IdentityProcessorStep, RobotProcessorPipeline from lerobot.processor import EnvTransition, IdentityProcessorStep, RobotProcessorPipeline, TransitionKey
from lerobot.processor.converters import ( from lerobot.processor.converters import (
action_to_transition, action_to_transition,
identity_transition,
observation_to_transition, observation_to_transition,
transition_to_robot_action, transition_to_robot_action,
) )
@@ -115,23 +117,23 @@ def teleop_loop(
fps: int, fps: int,
display_data: bool = False, display_data: bool = False,
duration: float | None = None, duration: float | None = None,
teleop_action_processor: RobotProcessorPipeline | None = None, teleop_action_processor: RobotProcessorPipeline[EnvTransition] | None = None,
robot_action_processor: RobotProcessorPipeline | None = None, robot_action_processor: RobotProcessorPipeline[dict[str, Any]] | None = None,
robot_observation_processor: RobotProcessorPipeline | None = None, robot_observation_processor: RobotProcessorPipeline[EnvTransition] | None = None,
): ):
# Initialize processors with defaults if not provided # Initialize processors with defaults if not provided
teleop_action_processor = teleop_action_processor or RobotProcessorPipeline( teleop_action_processor = teleop_action_processor or RobotProcessorPipeline[EnvTransition](
steps=[IdentityProcessorStep()], to_transition=action_to_transition, to_output=lambda tr: tr steps=[IdentityProcessorStep()], to_transition=action_to_transition, to_output=identity_transition
) )
robot_action_processor = robot_action_processor or RobotProcessorPipeline( robot_action_processor = robot_action_processor or RobotProcessorPipeline[dict[str, Any]](
steps=[IdentityProcessorStep()], steps=[IdentityProcessorStep()],
to_transition=lambda tr: tr, to_transition=identity_transition,
to_output=transition_to_robot_action, # type: ignore[arg-type] to_output=transition_to_robot_action, # type: ignore[arg-type]
) )
robot_observation_processor = robot_observation_processor or RobotProcessorPipeline( robot_observation_processor = robot_observation_processor or RobotProcessorPipeline[EnvTransition](
steps=[IdentityProcessorStep()], steps=[IdentityProcessorStep()],
to_transition=observation_to_transition, to_transition=observation_to_transition,
to_output=lambda tr: tr, to_output=identity_transition,
) )
# Reset processors # Reset processors
@@ -149,10 +151,10 @@ def teleop_loop(
raw_action = teleop.get_action() raw_action = teleop.get_action()
# Process teleop action through pipeline # Process teleop action through pipeline
teleop_transition = teleop_action_processor(raw_action) teleop_transition: EnvTransition = teleop_action_processor(raw_action)
# Process action for robot through pipeline # Process action for robot through pipeline
robot_action_to_send = robot_action_processor(teleop_transition) robot_action_to_send: dict[str, Any] = robot_action_processor(teleop_transition)
# Send processed action to robot (robot_action_processor.to_output should return dict[str, Any]) # Send processed action to robot (robot_action_processor.to_output should return dict[str, Any])
robot.send_action(robot_action_to_send) # type: ignore[arg-type] robot.send_action(robot_action_to_send) # type: ignore[arg-type]
@@ -161,8 +163,12 @@ def teleop_loop(
# Get robot observation # Get robot observation
obs = robot.get_observation() obs = robot.get_observation()
# Process robot observation through pipeline # Process robot observation through pipeline
obs_transition = robot_observation_processor(obs) obs_transition: EnvTransition = robot_observation_processor(obs)
log_rerun_data([obs_transition, teleop_transition])
log_rerun_data(
observation=obs_transition.get(TransitionKey.OBSERVATION),
action=teleop_transition.get(TransitionKey.ACTION),
)
print("\n" + "-" * (display_len + 10)) print("\n" + "-" * (display_len + 10))
print(f"{'NAME':<{display_len}} | {'NORM':>7}") print(f"{'NAME':<{display_len}} | {'NORM':>7}")
+36 -72
View File
@@ -19,8 +19,6 @@ from typing import Any
import numpy as np import numpy as np
import rerun as rr import rerun as rr
from lerobot.processor import EnvTransition, TransitionKey
def _init_rerun(session_name: str = "lerobot_control_loop") -> None: def _init_rerun(session_name: str = "lerobot_control_loop") -> None:
"""Initializes the Rerun SDK for visualizing the control loop.""" """Initializes the Rerun SDK for visualizing the control loop."""
@@ -33,85 +31,51 @@ def _init_rerun(session_name: str = "lerobot_control_loop") -> None:
def _is_scalar(x): def _is_scalar(x):
return ( return (
isinstance(x, numbers.Real) isinstance(x, float)
or isinstance(x, numbers.Real)
or isinstance(x, (np.integer, np.floating)) or isinstance(x, (np.integer, np.floating))
or (isinstance(x, np.ndarray) and x.ndim == 0) or (isinstance(x, np.ndarray) and x.ndim == 0)
) )
def log_rerun_data( def log_rerun_data(
data: list[dict[str | Any] | EnvTransition] | dict[str | Any] | EnvTransition | None = None,
*,
observation: dict[str, Any] | None = None, observation: dict[str, Any] | None = None,
action: dict[str, Any] | None = None, action: dict[str, Any] | None = None,
) -> None: ) -> None:
items = data if isinstance(data, list) else ([data] if data is not None else []) """Log observation and action data to Rerun for visualization."""
if observation:
for k, v in observation.items():
if v is None:
continue
key = k if str(k).startswith("observation.") else f"observation.{k}"
obs = {} if observation is None else dict(observation) if _is_scalar(v):
act = {} if action is None else dict(action) rr.log(key, rr.Scalar(float(v)))
elif isinstance(v, np.ndarray):
arr = v
# Convert CHW -> HWC when needed
if arr.ndim == 3 and arr.shape[0] in (1, 3, 4) and arr.shape[-1] not in (1, 3, 4):
arr = np.transpose(arr, (1, 2, 0))
if arr.ndim == 1:
for i, vi in enumerate(arr):
rr.log(f"{key}_{i}", rr.Scalar(float(vi)))
else:
rr.log(key, rr.Image(arr), static=True)
for idx, item in enumerate(items): if action:
if not isinstance(item, dict): for k, v in action.items():
continue if v is None:
continue
key = k if str(k).startswith("action.") else f"action.{k}"
if any(isinstance(k, TransitionKey) for k in item.keys()): if _is_scalar(v):
o = item.get(TransitionKey.OBSERVATION) or {} rr.log(key, rr.Scalar(float(v)))
a = item.get(TransitionKey.ACTION) or {} elif isinstance(v, np.ndarray):
if isinstance(o, dict): if v.ndim == 1:
obs.update(o) for i, vi in enumerate(v):
if isinstance(a, dict): rr.log(f"{key}_{i}", rr.Scalar(float(vi)))
act.update(a) else:
continue # Fall back to flattening higher-dimensional arrays
flat = v.flatten()
keys = list(item.keys()) for i, vi in enumerate(flat):
has_obs = any(str(k).startswith("observation.") for k in keys) rr.log(f"{key}_{i}", rr.Scalar(float(vi)))
has_act = any(str(k).startswith("action.") for k in keys)
if has_obs or has_act:
if has_obs:
obs.update(item)
if has_act:
act.update(item)
else:
# No prefixes: assume first is observation, second is action, others are observation
if idx == 0:
obs.update(item)
elif idx == 1:
act.update(item)
else:
obs.update(item)
for k, v in obs.items():
if v is None:
continue
key = k if str(k).startswith("observation.") else f"observation.{k}"
if _is_scalar(v):
rr.log(key, rr.Scalar(float(v)))
elif isinstance(v, np.ndarray):
arr = v
# Convert CHW -> HWC when needed
if arr.ndim == 3 and arr.shape[0] in (1, 3, 4) and arr.shape[-1] not in (1, 3, 4):
arr = np.transpose(arr, (1, 2, 0))
if arr.ndim == 1:
for i, vi in enumerate(arr):
rr.log(f"{key}_{i}", rr.Scalar(float(vi)))
else:
rr.log(key, rr.Image(arr), static=True)
for k, v in act.items():
if v is None:
continue
key = k if str(k).startswith("action.") else f"action.{k}"
if _is_scalar(v):
rr.log(key, rr.Scalar(float(v)))
elif isinstance(v, np.ndarray):
if v.ndim == 1:
for i, vi in enumerate(v):
rr.log(f"{key}_{i}", rr.Scalar(float(vi)))
else:
# Fall back to flattening higher-dimensional arrays
flat = v.flatten()
for i, vi in enumerate(flat):
rr.log(f"{key}_{i}", rr.Scalar(float(vi)))
+7 -3
View File
@@ -86,7 +86,10 @@ def test_log_rerun_data_envtransition_scalars_and_image(mock_rerun):
TransitionKey.ACTION: act, TransitionKey.ACTION: act,
} }
vu.log_rerun_data(transition) # Extract observation and action data from transition like in the real call sites
obs_data = transition.get(TransitionKey.OBSERVATION, {})
action_data = transition.get(TransitionKey.ACTION, {})
vu.log_rerun_data(observation=obs_data, action=action_data)
# We expect: # We expect:
# - observation.state.temperature -> Scalar # - observation.state.temperature -> Scalar
@@ -141,7 +144,9 @@ def test_log_rerun_data_plain_list_ordering_and_prefixes(mock_rerun):
"vec": np.array([9, 8, 7], dtype=np.float32), "vec": np.array([9, 8, 7], dtype=np.float32),
} }
vu.log_rerun_data([obs_plain, act_plain]) # Extract observation and action data from list like the old function logic did
# First dict was treated as observation, second as action
vu.log_rerun_data(observation=obs_plain, action=act_plain)
# Expected keys with auto-prefixes # Expected keys with auto-prefixes
expected = { expected = {
@@ -181,7 +186,6 @@ def test_log_rerun_data_kwargs_only(mock_rerun):
vu, calls = mock_rerun vu, calls = mock_rerun
vu.log_rerun_data( vu.log_rerun_data(
None,
observation={"observation.temp": 10.0, "observation.gray": np.zeros((8, 8, 1), dtype=np.uint8)}, observation={"observation.temp": 10.0, "observation.gray": np.zeros((8, 8, 1), dtype=np.uint8)},
action={"action.a": 1.0}, action={"action.a": 1.0},
) )