From 888a5b6249935f38969e894f07bb26a3e3463ff8 Mon Sep 17 00:00:00 2001 From: Adil Zouitine Date: Thu, 4 Sep 2025 19:25:51 +0200 Subject: [PATCH] refactor(utils): simplify log_rerun_data function (#1864) * refactor(logging): enhance log_rerun_data to handle observation and action separately - Updated the `log_rerun_data` function to accept and log observation and action data more clearly, improving readability and maintainability. - Refactored the `record_loop` and `teleop_loop` functions to extract and pass observation and action data to `log_rerun_data`, ensuring consistent logging format. * refactor(tests): update test_log_rerun_data to align with log_rerun_data changes - Modified test cases in `test_visualization_utils.py` to extract and pass observation and action data separately to `log_rerun_data`, improving clarity and consistency with recent function updates. - Ensured that the tests reflect the new structure of `log_rerun_data` for better maintainability. * refactor(processors): simplify calls to log_rerun + replace lambda functions with identity_transition --------- Co-authored-by: Steven Palma --- examples/phone_to_so100/evaluate.py | 5 +- examples/phone_to_so100/record.py | 7 +- src/lerobot/processor/converters.py | 4 + src/lerobot/record.py | 52 +++++++---- src/lerobot/teleoperate.py | 34 ++++--- src/lerobot/utils/visualization_utils.py | 108 ++++++++--------------- tests/utils/test_visualization_utils.py | 10 ++- 7 files changed, 109 insertions(+), 111 deletions(-) diff --git a/examples/phone_to_so100/evaluate.py b/examples/phone_to_so100/evaluate.py index a3ed4722b..9e6ef7a57 100644 --- a/examples/phone_to_so100/evaluate.py +++ b/examples/phone_to_so100/evaluate.py @@ -23,6 +23,7 @@ from lerobot.policies.act.modeling_act import ACTPolicy from lerobot.policies.factory import make_pre_post_processors from lerobot.processor import RobotProcessorPipeline from lerobot.processor.converters import ( + identity_transition, observation_to_transition, transition_to_robot_action, ) @@ -74,7 +75,7 @@ robot_ee_to_joints_processor = RobotProcessorPipeline( initial_guess_current_joints=True, ), ], - to_transition=lambda tr: tr, + to_transition=identity_transition, to_output=transition_to_robot_action, ) @@ -84,7 +85,7 @@ robot_joints_to_ee_pose_processor = RobotProcessorPipeline( ForwardKinematicsJointsToEE(kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())) ], to_transition=observation_to_transition, - to_output=lambda tr: tr, + to_output=identity_transition, ) # Build dataset action and gripper features diff --git a/examples/phone_to_so100/record.py b/examples/phone_to_so100/record.py index fef612692..e2bd5541a 100644 --- a/examples/phone_to_so100/record.py +++ b/examples/phone_to_so100/record.py @@ -23,6 +23,7 @@ from lerobot.model.kinematics import RobotKinematics from lerobot.processor import RobotProcessorPipeline from lerobot.processor.converters import ( action_to_transition, + identity_transition, observation_to_transition, transition_to_robot_action, ) @@ -89,7 +90,7 @@ phone_to_robot_ee_pose_processor = RobotProcessorPipeline( ), ], to_transition=action_to_transition, - to_output=lambda tr: tr, + to_output=identity_transition, ) # Build pipeline to convert ee pose action to joint action @@ -105,7 +106,7 @@ robot_ee_to_joints_processor = RobotProcessorPipeline( speed_factor=20.0, ), ], - to_transition=lambda tr: tr, + to_transition=identity_transition, to_output=transition_to_robot_action, ) @@ -115,7 +116,7 @@ robot_joints_to_ee_pose = RobotProcessorPipeline( ForwardKinematicsJointsToEE(kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())) ], to_transition=observation_to_transition, - to_output=lambda tr: tr, + to_output=identity_transition, ) # Build dataset ee action features diff --git a/src/lerobot/processor/converters.py b/src/lerobot/processor/converters.py index 7a8decb12..ceec211aa 100644 --- a/src/lerobot/processor/converters.py +++ b/src/lerobot/processor/converters.py @@ -479,3 +479,7 @@ def transition_to_batch(transition: EnvTransition) -> dict[str, Any]: batch.update(observation) return batch + + +def identity_transition(tr: EnvTransition) -> EnvTransition: + return tr diff --git a/src/lerobot/record.py b/src/lerobot/record.py index b86b1613d..eac1b6c23 100644 --- a/src/lerobot/record.py +++ b/src/lerobot/record.py @@ -62,6 +62,7 @@ import time from dataclasses import asdict, dataclass, field from pathlib import Path from pprint import pformat +from typing import Any from lerobot.cameras import ( # noqa: F401 CameraConfig, # noqa: F401 @@ -77,6 +78,7 @@ from lerobot.datasets.video_utils import VideoEncodingManager from lerobot.policies.factory import make_policy, make_pre_post_processors from lerobot.policies.pretrained import PreTrainedPolicy from lerobot.processor import ( + EnvTransition, IdentityProcessorStep, PolicyProcessorPipeline, RobotProcessorPipeline, @@ -84,6 +86,7 @@ from lerobot.processor import ( ) from lerobot.processor.converters import ( action_to_transition, + identity_transition, observation_to_transition, transition_to_dataset_frame, transition_to_robot_action, @@ -243,22 +246,33 @@ def record_loop( preprocessor: PolicyProcessorPipeline | None = None, postprocessor: PolicyProcessorPipeline | None = None, control_time_s: int | None = None, - teleop_action_processor: RobotProcessorPipeline | None = None, # runs after teleop - robot_action_processor: RobotProcessorPipeline | None = None, # runs before robot - robot_observation_processor: RobotProcessorPipeline | None = None, # runs after robot + teleop_action_processor: RobotProcessorPipeline[EnvTransition] | None = None, # runs after teleop + robot_action_processor: RobotProcessorPipeline[dict[str, Any]] | None = None, # runs before robot + robot_observation_processor: RobotProcessorPipeline[EnvTransition] | None = None, # runs after robot single_task: str | None = None, display_data: bool = False, ): - teleop_action_processor = teleop_action_processor or RobotProcessorPipeline( - steps=[IdentityProcessorStep()], to_transition=action_to_transition, to_output=lambda tr: tr + teleop_action_processor: RobotProcessorPipeline[EnvTransition] = ( + teleop_action_processor + or RobotProcessorPipeline[EnvTransition]( + steps=[IdentityProcessorStep()], to_transition=action_to_transition, to_output=identity_transition + ) ) - robot_action_processor = robot_action_processor or RobotProcessorPipeline( - steps=[IdentityProcessorStep()], to_transition=lambda tr: tr, to_output=transition_to_robot_action + robot_action_processor: RobotProcessorPipeline[dict[str, Any]] = ( + robot_action_processor + or RobotProcessorPipeline[dict[str, Any]]( + steps=[IdentityProcessorStep()], + to_transition=identity_transition, + to_output=transition_to_robot_action, + ) ) - robot_observation_processor = robot_observation_processor or RobotProcessorPipeline( - steps=[IdentityProcessorStep()], - to_transition=observation_to_transition, - to_output=lambda tr: tr, + robot_observation_processor: RobotProcessorPipeline[EnvTransition] = ( + robot_observation_processor + or RobotProcessorPipeline[EnvTransition]( + steps=[IdentityProcessorStep()], + to_transition=observation_to_transition, + to_output=identity_transition, + ) ) if dataset is not None and dataset.fps != fps: @@ -309,7 +323,7 @@ def record_loop( obs = robot.get_observation() # Applies a pipeline to the raw robot observation, default is IdentityProcessor - obs_transition = robot_observation_processor(obs) + obs_transition: EnvTransition = robot_observation_processor(obs) # Get action from either policy or teleop if policy is not None and preprocessor is not None and postprocessor is not None: @@ -340,7 +354,9 @@ def record_loop( act = teleop.get_action() # Applies a pipeline to the raw teleop action, default is IdentityProcessor - teleop_transition = teleop_action_processor(act) + # TODO(Steven): This assumes that the processor passed by the user should have identity_transition as to_output. + # TODO(Steven): Why is this not automatically typed as EnvTransition? + teleop_transition: EnvTransition = teleop_action_processor(act) elif isinstance(teleop, list): arm_action = teleop_arm.get_action() @@ -348,7 +364,7 @@ def record_loop( keyboard_action = teleop_keyboard.get_action() base_action = robot._from_keyboard_to_base_action(keyboard_action) act = {**arm_action, **base_action} if len(base_action) > 0 else arm_action - teleop_transition = teleop_action_processor(act) + teleop_transition: EnvTransition = teleop_action_processor(act) else: logging.info( "No policy or teleoperator provided, skipping action generation. " @@ -360,9 +376,9 @@ def record_loop( # Applies a pipeline to the action, default is IdentityProcessor # IMPORTANT: action_pipeline.to_output must return a dict suitable for robot.send_action() if policy is not None and policy_transition is not None: - robot_action_to_send = robot_action_processor(policy_transition) + robot_action_to_send: dict[str, Any] = robot_action_processor(policy_transition) else: - robot_action_to_send = robot_action_processor(teleop_transition) + robot_action_to_send: dict[str, Any] = robot_action_processor(teleop_transition) # Send action to robot # Action can eventually be clipped using `max_relative_target`, @@ -386,7 +402,9 @@ def record_loop( dataset.add_frame(frame, task=single_task) if display_data: - log_rerun_data([obs_transition, teleop_transition or policy_transition]) + log_rerun_data( + observation=obs_transition.get(TransitionKey.OBSERVATION), action=robot_action_to_send + ) dt_s = time.perf_counter() - start_loop_t busy_wait(1 / fps - dt_s) diff --git a/src/lerobot/teleoperate.py b/src/lerobot/teleoperate.py index a89872502..eb7f4bc1b 100644 --- a/src/lerobot/teleoperate.py +++ b/src/lerobot/teleoperate.py @@ -55,15 +55,17 @@ import logging import time from dataclasses import asdict, dataclass from pprint import pformat +from typing import Any import rerun as rr from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401 from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401 from lerobot.configs import parser -from lerobot.processor import IdentityProcessorStep, RobotProcessorPipeline +from lerobot.processor import EnvTransition, IdentityProcessorStep, RobotProcessorPipeline, TransitionKey from lerobot.processor.converters import ( action_to_transition, + identity_transition, observation_to_transition, transition_to_robot_action, ) @@ -115,23 +117,23 @@ def teleop_loop( fps: int, display_data: bool = False, duration: float | None = None, - teleop_action_processor: RobotProcessorPipeline | None = None, - robot_action_processor: RobotProcessorPipeline | None = None, - robot_observation_processor: RobotProcessorPipeline | None = None, + teleop_action_processor: RobotProcessorPipeline[EnvTransition] | None = None, + robot_action_processor: RobotProcessorPipeline[dict[str, Any]] | None = None, + robot_observation_processor: RobotProcessorPipeline[EnvTransition] | None = None, ): # Initialize processors with defaults if not provided - teleop_action_processor = teleop_action_processor or RobotProcessorPipeline( - steps=[IdentityProcessorStep()], to_transition=action_to_transition, to_output=lambda tr: tr + teleop_action_processor = teleop_action_processor or RobotProcessorPipeline[EnvTransition]( + steps=[IdentityProcessorStep()], to_transition=action_to_transition, to_output=identity_transition ) - robot_action_processor = robot_action_processor or RobotProcessorPipeline( + robot_action_processor = robot_action_processor or RobotProcessorPipeline[dict[str, Any]]( steps=[IdentityProcessorStep()], - to_transition=lambda tr: tr, + to_transition=identity_transition, to_output=transition_to_robot_action, # type: ignore[arg-type] ) - robot_observation_processor = robot_observation_processor or RobotProcessorPipeline( + robot_observation_processor = robot_observation_processor or RobotProcessorPipeline[EnvTransition]( steps=[IdentityProcessorStep()], to_transition=observation_to_transition, - to_output=lambda tr: tr, + to_output=identity_transition, ) # Reset processors @@ -149,10 +151,10 @@ def teleop_loop( raw_action = teleop.get_action() # Process teleop action through pipeline - teleop_transition = teleop_action_processor(raw_action) + teleop_transition: EnvTransition = teleop_action_processor(raw_action) # Process action for robot through pipeline - robot_action_to_send = robot_action_processor(teleop_transition) + robot_action_to_send: dict[str, Any] = robot_action_processor(teleop_transition) # Send processed action to robot (robot_action_processor.to_output should return dict[str, Any]) robot.send_action(robot_action_to_send) # type: ignore[arg-type] @@ -161,8 +163,12 @@ def teleop_loop( # Get robot observation obs = robot.get_observation() # Process robot observation through pipeline - obs_transition = robot_observation_processor(obs) - log_rerun_data([obs_transition, teleop_transition]) + obs_transition: EnvTransition = robot_observation_processor(obs) + + log_rerun_data( + observation=obs_transition.get(TransitionKey.OBSERVATION), + action=teleop_transition.get(TransitionKey.ACTION), + ) print("\n" + "-" * (display_len + 10)) print(f"{'NAME':<{display_len}} | {'NORM':>7}") diff --git a/src/lerobot/utils/visualization_utils.py b/src/lerobot/utils/visualization_utils.py index 1bb5e65cd..c4f273506 100644 --- a/src/lerobot/utils/visualization_utils.py +++ b/src/lerobot/utils/visualization_utils.py @@ -19,8 +19,6 @@ from typing import Any import numpy as np import rerun as rr -from lerobot.processor import EnvTransition, TransitionKey - def _init_rerun(session_name: str = "lerobot_control_loop") -> None: """Initializes the Rerun SDK for visualizing the control loop.""" @@ -33,85 +31,51 @@ def _init_rerun(session_name: str = "lerobot_control_loop") -> None: def _is_scalar(x): return ( - isinstance(x, numbers.Real) + isinstance(x, float) + or isinstance(x, numbers.Real) or isinstance(x, (np.integer, np.floating)) or (isinstance(x, np.ndarray) and x.ndim == 0) ) def log_rerun_data( - data: list[dict[str | Any] | EnvTransition] | dict[str | Any] | EnvTransition | None = None, - *, observation: dict[str, Any] | None = None, action: dict[str, Any] | None = None, ) -> None: - items = data if isinstance(data, list) else ([data] if data is not None else []) + """Log observation and action data to Rerun for visualization.""" + if observation: + for k, v in observation.items(): + if v is None: + continue + key = k if str(k).startswith("observation.") else f"observation.{k}" - obs = {} if observation is None else dict(observation) - act = {} if action is None else dict(action) + if _is_scalar(v): + rr.log(key, rr.Scalar(float(v))) + elif isinstance(v, np.ndarray): + arr = v + # Convert CHW -> HWC when needed + if arr.ndim == 3 and arr.shape[0] in (1, 3, 4) and arr.shape[-1] not in (1, 3, 4): + arr = np.transpose(arr, (1, 2, 0)) + if arr.ndim == 1: + for i, vi in enumerate(arr): + rr.log(f"{key}_{i}", rr.Scalar(float(vi))) + else: + rr.log(key, rr.Image(arr), static=True) - for idx, item in enumerate(items): - if not isinstance(item, dict): - continue + if action: + for k, v in action.items(): + if v is None: + continue + key = k if str(k).startswith("action.") else f"action.{k}" - if any(isinstance(k, TransitionKey) for k in item.keys()): - o = item.get(TransitionKey.OBSERVATION) or {} - a = item.get(TransitionKey.ACTION) or {} - if isinstance(o, dict): - obs.update(o) - if isinstance(a, dict): - act.update(a) - continue - - keys = list(item.keys()) - has_obs = any(str(k).startswith("observation.") for k in keys) - has_act = any(str(k).startswith("action.") for k in keys) - - if has_obs or has_act: - if has_obs: - obs.update(item) - if has_act: - act.update(item) - else: - # No prefixes: assume first is observation, second is action, others are observation - if idx == 0: - obs.update(item) - elif idx == 1: - act.update(item) - else: - obs.update(item) - - for k, v in obs.items(): - if v is None: - continue - key = k if str(k).startswith("observation.") else f"observation.{k}" - - if _is_scalar(v): - rr.log(key, rr.Scalar(float(v))) - elif isinstance(v, np.ndarray): - arr = v - # Convert CHW -> HWC when needed - if arr.ndim == 3 and arr.shape[0] in (1, 3, 4) and arr.shape[-1] not in (1, 3, 4): - arr = np.transpose(arr, (1, 2, 0)) - if arr.ndim == 1: - for i, vi in enumerate(arr): - rr.log(f"{key}_{i}", rr.Scalar(float(vi))) - else: - rr.log(key, rr.Image(arr), static=True) - - for k, v in act.items(): - if v is None: - continue - key = k if str(k).startswith("action.") else f"action.{k}" - - if _is_scalar(v): - rr.log(key, rr.Scalar(float(v))) - elif isinstance(v, np.ndarray): - if v.ndim == 1: - for i, vi in enumerate(v): - rr.log(f"{key}_{i}", rr.Scalar(float(vi))) - else: - # Fall back to flattening higher-dimensional arrays - flat = v.flatten() - for i, vi in enumerate(flat): - rr.log(f"{key}_{i}", rr.Scalar(float(vi))) + if _is_scalar(v): + rr.log(key, rr.Scalar(float(v))) + elif isinstance(v, np.ndarray): + if v.ndim == 1: + for i, vi in enumerate(v): + rr.log(f"{key}_{i}", rr.Scalar(float(vi))) + else: + # Fall back to flattening higher-dimensional arrays + flat = v.flatten() + for i, vi in enumerate(flat): + rr.log(f"{key}_{i}", rr.Scalar(float(vi))) diff --git a/tests/utils/test_visualization_utils.py b/tests/utils/test_visualization_utils.py index bcdfff5e2..29b7bf70a 100644 --- a/tests/utils/test_visualization_utils.py +++ b/tests/utils/test_visualization_utils.py @@ -86,7 +86,10 @@ def test_log_rerun_data_envtransition_scalars_and_image(mock_rerun): TransitionKey.ACTION: act, } - vu.log_rerun_data(transition) + # Extract observation and action data from transition like in the real call sites + obs_data = transition.get(TransitionKey.OBSERVATION, {}) + action_data = transition.get(TransitionKey.ACTION, {}) + vu.log_rerun_data(observation=obs_data, action=action_data) # We expect: # - observation.state.temperature -> Scalar @@ -141,7 +144,9 @@ def test_log_rerun_data_plain_list_ordering_and_prefixes(mock_rerun): "vec": np.array([9, 8, 7], dtype=np.float32), } - vu.log_rerun_data([obs_plain, act_plain]) + # Extract observation and action data from list like the old function logic did + # First dict was treated as observation, second as action + vu.log_rerun_data(observation=obs_plain, action=act_plain) # Expected keys with auto-prefixes expected = { @@ -181,7 +186,6 @@ def test_log_rerun_data_kwargs_only(mock_rerun): vu, calls = mock_rerun vu.log_rerun_data( - None, observation={"observation.temp": 10.0, "observation.gray": np.zeros((8, 8, 1), dtype=np.uint8)}, action={"action.a": 1.0}, )