From cd0098a5f7a1be4b5700acbbd46276cf7bb46a12 Mon Sep 17 00:00:00 2001
From: Steven Palma
Date: Thu, 11 Sep 2025 18:20:39 +0200
Subject: [PATCH] debug(scripts): simplify record with processors (#1918)

Co-authored-by: Adil Zouitine
---
 examples/phone_to_so100/evaluate.py |   4 +-
 examples/phone_to_so100/record.py   |   5 +-
 src/lerobot/processor/converters.py |   1 +
 src/lerobot/record.py               | 120 ++++++++++++++--------------
 4 files changed, 63 insertions(+), 67 deletions(-)

diff --git a/examples/phone_to_so100/evaluate.py b/examples/phone_to_so100/evaluate.py
index 1715412b2..2c4c14cfb 100644
--- a/examples/phone_to_so100/evaluate.py
+++ b/examples/phone_to_so100/evaluate.py
@@ -93,9 +93,8 @@ robot_joints_to_ee_pose_processor = RobotProcessorPipeline[dict[str, Any], EnvTr
 # Build dataset action and gripper features
 action_ee_and_gripper = aggregate_pipeline_dataset_features(
     pipeline=robot_ee_to_joints_processor,
-    initial_features=create_initial_features(),
+    initial_features=create_initial_features(action=robot.action_features),
     use_videos=True,
-    patterns=["action.ee", "action.gripper.pos", "observation.state.gripper.pos"],
 )  # Get all ee action features + gripper pos action features
 
 # Build dataset observation features
@@ -103,7 +102,6 @@ obs_ee = aggregate_pipeline_dataset_features(
     pipeline=robot_joints_to_ee_pose_processor,
     initial_features=create_initial_features(observation=robot.observation_features),
     use_videos=True,
-    patterns=["observation.state.ee"],
 )  # Get all ee observation features
 
 dataset_features = combine_feature_dicts(obs_ee, action_ee_and_gripper)
diff --git a/examples/phone_to_so100/record.py b/examples/phone_to_so100/record.py
index f0aa0e5ad..4c9a8f1aa 100644
--- a/examples/phone_to_so100/record.py
+++ b/examples/phone_to_so100/record.py
@@ -125,15 +125,13 @@ action_ee = aggregate_pipeline_dataset_features(
     pipeline=phone_to_robot_ee_pose_processor,
     initial_features=create_initial_features(action=phone.action_features),
     use_videos=True,
-    patterns=["action.ee"],
 )
 
 # Get gripper pos action features
 gripper = aggregate_pipeline_dataset_features(
     pipeline=robot_ee_to_joints_processor,
-    initial_features=create_initial_features(),
+    initial_features=create_initial_features(action=robot.action_features),
     use_videos=True,
-    patterns=["action.gripper.pos", "observation.state.gripper.pos"],
 )
 
 # Build dataset ee observation features
@@ -141,7 +139,6 @@ observation_ee = aggregate_pipeline_dataset_features(
     pipeline=robot_joints_to_ee_pose,
     initial_features=create_initial_features(observation=robot.observation_features),
     use_videos=True,
-    patterns=["observation.state.ee"],
 )
 
 dataset_features = combine_feature_dicts(action_ee, gripper, observation_ee)
diff --git a/src/lerobot/processor/converters.py b/src/lerobot/processor/converters.py
index 3ae846056..d4ad710ce 100644
--- a/src/lerobot/processor/converters.py
+++ b/src/lerobot/processor/converters.py
@@ -381,6 +381,7 @@ def merge_transitions(transitions: Sequence[EnvTransition] | EnvTransition) -> E
     return result
 
 
+# TODO(Steven): Currently unused consider removing after testing
 def transition_to_dataset_frame(
     transitions_or_transition: EnvTransition | Sequence[EnvTransition], features: dict[str, dict]
 ) -> dict[str, Any]:
diff --git a/src/lerobot/record.py b/src/lerobot/record.py
index 71de5dd5f..b5a93e34c 100644
--- a/src/lerobot/record.py
+++ b/src/lerobot/record.py
@@ -73,7 +73,8 @@ from lerobot.configs import parser
 from lerobot.configs.policies import PreTrainedConfig
 from lerobot.datasets.image_writer import safe_stop_image_writer
 from lerobot.datasets.lerobot_dataset import LeRobotDataset
-from lerobot.datasets.utils import hw_to_dataset_features
+from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
+from lerobot.datasets.utils import build_dataset_frame, combine_feature_dicts
 from lerobot.datasets.video_utils import VideoEncodingManager
 from lerobot.policies.factory import make_policy, make_pre_post_processors
 from lerobot.policies.pretrained import PreTrainedPolicy
@@ -90,7 +91,6 @@ from lerobot.processor.converters import (
     identity_transition,
     observation_to_transition,
     robot_action_to_transition,
-    transition_to_dataset_frame,
     transition_to_robot_action,
 )
 from lerobot.processor.rename_processor import rename_stats
@@ -242,46 +242,18 @@ def record_loop(
     robot: Robot,
     events: dict,
     fps: int,
+    teleop_action_processor: RobotProcessorPipeline[RobotAction, EnvTransition],  # runs after teleop
+    robot_action_processor: RobotProcessorPipeline[EnvTransition, RobotAction],  # runs before robot
+    robot_observation_processor: RobotProcessorPipeline[dict[str, Any], EnvTransition],  # runs after robot
     dataset: LeRobotDataset | None = None,
     teleop: Teleoperator | list[Teleoperator] | None = None,
     policy: PreTrainedPolicy | None = None,
     preprocessor: PolicyProcessorPipeline[dict[str, Any], dict[str, Any]] | None = None,
     postprocessor: PolicyProcessorPipeline[PolicyAction, PolicyAction] | None = None,
     control_time_s: int | None = None,
-    teleop_action_processor: RobotProcessorPipeline[RobotAction, EnvTransition]
-    | None = None,  # runs after teleop
-    robot_action_processor: RobotProcessorPipeline[EnvTransition, RobotAction]
-    | None = None,  # runs before robot
-    robot_observation_processor: RobotProcessorPipeline[dict[str, Any], EnvTransition]
-    | None = None,  # runs after robot
     single_task: str | None = None,
     display_data: bool = False,
 ):
-    teleop_action_processor: RobotProcessorPipeline[RobotAction, EnvTransition] = (
-        teleop_action_processor
-        or RobotProcessorPipeline[RobotAction, EnvTransition](
-            steps=[IdentityProcessorStep()],
-            to_transition=robot_action_to_transition,
-            to_output=identity_transition,
-        )
-    )
-    robot_action_processor: RobotProcessorPipeline[EnvTransition, RobotAction] = (
-        robot_action_processor
-        or RobotProcessorPipeline[EnvTransition, RobotAction](
-            steps=[IdentityProcessorStep()],
-            to_transition=identity_transition,
-            to_output=transition_to_robot_action,
-        )
-    )
-    robot_observation_processor: RobotProcessorPipeline[dict[str, Any], EnvTransition] = (
-        robot_observation_processor
-        or RobotProcessorPipeline[dict[str, Any], EnvTransition](
-            steps=[IdentityProcessorStep()],
-            to_transition=observation_to_transition,
-            to_output=identity_transition,
-        )
-    )
-
     if dataset is not None and dataset.fps != fps:
         raise ValueError(f"The dataset fps should be equal to requested fps ({dataset.fps} != {fps}).")
 
@@ -315,11 +287,6 @@
         preprocessor.reset()
         postprocessor.reset()
 
-    # Reset custom pipelines
-    teleop_action_processor.reset()
-    robot_action_processor.reset()
-    robot_observation_processor.reset()
-
     policy_transition = None
     teleop_transition = None
     obs_transition = None
@@ -339,13 +306,14 @@
         # Applies a pipeline to the raw robot observation, default is IdentityProcessor
         obs_transition = robot_observation_processor(obs)
 
+        if policy is not None or dataset is not None:
+            # TODO(Steven): We might be able to get rid of this
+            observation_frame = build_dataset_frame(
+                dataset.features, obs_transition[TransitionKey.OBSERVATION], prefix="observation"
+            )
+
         # Get action from either policy or teleop
         if policy is not None and preprocessor is not None and postprocessor is not None:
-            if dataset is not None:
-                observation_frame = transition_to_dataset_frame(
-                    obs_transition, dataset.features
-                )  # Convert the observation to the dataset format
-
             action_values = predict_action(
                 observation=observation_frame,
                 policy=policy,
@@ -358,7 +326,7 @@
             )
 
             action_names = dataset.features["action"]["names"]
-            policy_action = {f"action.{name}": float(action_values[i]) for i, name in enumerate(action_names)}
+            policy_action = {f"{name}": float(action_values[i]) for i, name in enumerate(action_names)}
             policy_transition = {
                 TransitionKey.ACTION: policy_action,
                 TransitionKey.COMPLEMENTARY_DATA: {},
@@ -397,21 +365,12 @@
         # Action can eventually be clipped using `max_relative_target`,
         # so action actually sent is saved in the dataset.
         action = postprocessor.process(action)  # TODO(pepijn, adil): we should use a pipeline step to clip the action, so the sent action is the action that we input to the robot.
-        _ = robot.send_action(robot_action_to_send)
+        sent_action = robot.send_action(robot_action_to_send)
 
         # Write to dataset
         if dataset is not None:
-            # If transition_to_dataset_frame is provided, use it to merge the transitions.
-            merged = []
-            if obs_transition is not None:  # The observation from the robot
-                merged.append(obs_transition)
-            if teleop_transition is not None:  # The action from teleop
-                merged.append(teleop_transition)
-            if policy_transition is not None:  # The action from policy
-                merged.append(policy_transition)
-            frame = transition_to_dataset_frame(
-                merged if len(merged) > 1 else merged[0], dataset.features
-            )  # Convert the observation to the dataset format
+            action_frame = build_dataset_frame(dataset.features, sent_action, prefix="action")
+            frame = {**observation_frame, **action_frame}
             dataset.add_frame(frame, task=single_task)
 
         if display_data:
@@ -435,17 +394,52 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
     robot = make_robot_from_config(cfg.robot)
     teleop = make_teleoperator_from_config(cfg.teleop) if cfg.teleop is not None else None
 
-    action_features = hw_to_dataset_features(robot.action_features, "action", cfg.dataset.video)
-    obs_features = hw_to_dataset_features(robot.observation_features, "observation", cfg.dataset.video)
+    teleop_action_processor: RobotProcessorPipeline[RobotAction, EnvTransition] = RobotProcessorPipeline[
+        RobotAction, EnvTransition
+    ](
+        steps=[IdentityProcessorStep()],
+        to_transition=robot_action_to_transition,
+        to_output=identity_transition,
+    )
+    robot_action_processor: RobotProcessorPipeline[EnvTransition, RobotAction] = RobotProcessorPipeline[
+        EnvTransition, RobotAction
+    ](
+        steps=[IdentityProcessorStep()],
+        to_transition=identity_transition,
+        to_output=transition_to_robot_action,
+    )
+    robot_observation_processor: RobotProcessorPipeline[dict[str, Any], EnvTransition] = (
+        RobotProcessorPipeline[dict[str, Any], EnvTransition](
+            steps=[IdentityProcessorStep()],
+            to_transition=observation_to_transition,
+            to_output=identity_transition,
+        )
+    )
+
+    atf = aggregate_pipeline_dataset_features(
+        pipeline=teleop_action_processor,
+        initial_features=create_initial_features(action=teleop.action_features, observation=None),
+        use_videos=cfg.dataset.video,
+    )
+    arf = aggregate_pipeline_dataset_features(
+        pipeline=robot_action_processor,
+        initial_features=create_initial_features(action=robot.action_features, observation=None),
+        use_videos=cfg.dataset.video,
+    )
+    of = aggregate_pipeline_dataset_features(
+        pipeline=robot_observation_processor,
+        initial_features=create_initial_features(observation=robot.observation_features, action=None),
+        use_videos=cfg.dataset.video,
+    )
 
     # Add next.* features that are generated during recording
-    transition_features = {
+    _transition_features = {
         "next.reward": {"dtype": "float32", "shape": (1,), "names": None},
         "next.done": {"dtype": "bool", "shape": (1,), "names": None},
         "next.truncated": {"dtype": "bool", "shape": (1,), "names": None},
     }
-    dataset_features = {**action_features, **obs_features, **transition_features}
+    dataset_features = {**combine_feature_dicts(atf, arf, of)}  # , **transition_features}
 
     if cfg.resume:
         dataset = LeRobotDataset(
@@ -504,6 +498,9 @@
                 robot=robot,
                 events=events,
                 fps=cfg.dataset.fps,
+                teleop_action_processor=teleop_action_processor,
+                robot_action_processor=robot_action_processor,
+                robot_observation_processor=robot_observation_processor,
                 teleop=teleop,
                 policy=policy,
                 preprocessor=preprocessor,
@@ -524,6 +521,9 @@
                 robot=robot,
                 events=events,
                 fps=cfg.dataset.fps,
+                teleop_action_processor=teleop_action_processor,
+                robot_action_processor=robot_action_processor,
+                robot_observation_processor=robot_observation_processor,
                 teleop=teleop,
                 control_time_s=cfg.dataset.reset_time_s,
                 single_task=cfg.dataset.single_task,
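
Taken together, the record.py changes above replace the merge-based transition_to_dataset_frame path with dataset features and frames derived directly from the processor pipelines. The sketch below restates that flow using only calls visible in this diff; it is a hedged example, not part of the commit. The `lerobot.processor` import path for RobotProcessorPipeline and IdentityProcessorStep is assumed (the hunks do not show it), the generic type parameters used in the patch are omitted for brevity, and `robot`, `teleop`, `dataset`, `obs`, `sent_action`, `TransitionKey`, and `single_task` stand in for objects and names defined elsewhere in record.py.

    from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
    from lerobot.datasets.utils import build_dataset_frame, combine_feature_dicts
    from lerobot.processor import IdentityProcessorStep, RobotProcessorPipeline  # assumed import path
    from lerobot.processor.converters import (
        identity_transition,
        observation_to_transition,
        robot_action_to_transition,
        transition_to_robot_action,
    )

    # Identity pipelines: teleop actions and robot observations pass through unchanged,
    # but the same objects now define both runtime processing and the dataset schema.
    teleop_action_processor = RobotProcessorPipeline(
        steps=[IdentityProcessorStep()],
        to_transition=robot_action_to_transition,
        to_output=identity_transition,
    )
    robot_action_processor = RobotProcessorPipeline(
        steps=[IdentityProcessorStep()],
        to_transition=identity_transition,
        to_output=transition_to_robot_action,
    )
    robot_observation_processor = RobotProcessorPipeline(
        steps=[IdentityProcessorStep()],
        to_transition=observation_to_transition,
        to_output=identity_transition,
    )

    # Dataset features are aggregated from the pipelines instead of hw_to_dataset_features().
    dataset_features = combine_feature_dicts(
        aggregate_pipeline_dataset_features(
            pipeline=teleop_action_processor,
            initial_features=create_initial_features(action=teleop.action_features, observation=None),
            use_videos=True,
        ),
        aggregate_pipeline_dataset_features(
            pipeline=robot_action_processor,
            initial_features=create_initial_features(action=robot.action_features, observation=None),
            use_videos=True,
        ),
        aggregate_pipeline_dataset_features(
            pipeline=robot_observation_processor,
            initial_features=create_initial_features(observation=robot.observation_features, action=None),
            use_videos=True,
        ),
    )

    # Inside the record loop, observation and action frames are built from the same
    # features dict, keyed by prefix, and merged into a single dataset frame.
    obs_transition = robot_observation_processor(obs)
    observation_frame = build_dataset_frame(
        dataset.features, obs_transition[TransitionKey.OBSERVATION], prefix="observation"
    )
    action_frame = build_dataset_frame(dataset.features, sent_action, prefix="action")
    frame = {**observation_frame, **action_frame}
    dataset.add_frame(frame, task=single_task)

Because the dataset schema and the per-step frames come from the same pipeline objects, the schema written by record() cannot drift from what the loop actually logs, which is what allows this patch to retire transition_to_dataset_frame in the record path and drop the per-example `patterns` lists.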