diff --git a/examples/lekiwi/evaluate.py b/examples/lekiwi/evaluate.py
index a3144a442..1458d0ce8 100644
--- a/examples/lekiwi/evaluate.py
+++ b/examples/lekiwi/evaluate.py
@@ -18,7 +18,6 @@ from lerobot.datasets.lerobot_dataset import LeRobotDataset
 from lerobot.datasets.utils import hw_to_dataset_features
 from lerobot.policies.act.modeling_act import ACTPolicy
 from lerobot.policies.factory import make_pre_post_processors
-from lerobot.processor import make_default_processors
 from lerobot.robots.lekiwi import LeKiwiClient, LeKiwiClientConfig
 from lerobot.scripts.lerobot_record import record_loop
 from lerobot.utils.constants import ACTION, OBS_STR
@@ -71,9 +70,6 @@ def main():
     # To connect you already should have this script running on LeKiwi: `python -m lerobot.robots.lekiwi.lekiwi_host --robot.id=my_awesome_kiwi`
     robot.connect()

-    # TODO(Steven): Update this example to use pipelines
-    teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
-
     # Initialize the keyboard listener and rerun visualization
     listener, events = init_keyboard_listener()
     init_rerun(session_name="lekiwi_evaluate")
@@ -99,9 +95,6 @@
         control_time_s=EPISODE_TIME_SEC,
         single_task=TASK_DESCRIPTION,
         display_data=True,
-        teleop_action_processor=teleop_action_processor,
-        robot_action_processor=robot_action_processor,
-        robot_observation_processor=robot_observation_processor,
     )

     # Reset the environment if not stopping or re-recording
@@ -116,9 +109,6 @@
         control_time_s=EPISODE_TIME_SEC,
         single_task=TASK_DESCRIPTION,
         display_data=True,
-        teleop_action_processor=teleop_action_processor,
-        robot_action_processor=robot_action_processor,
-        robot_observation_processor=robot_observation_processor,
     )

     if events["rerecord_episode"]:
diff --git a/examples/lekiwi/record.py b/examples/lekiwi/record.py
index 9292157f7..8efdcb66c 100644
--- a/examples/lekiwi/record.py
+++ b/examples/lekiwi/record.py
@@ -16,7 +16,6 @@ from lerobot.datasets.lerobot_dataset import LeRobotDataset
 from lerobot.datasets.utils import hw_to_dataset_features
-from lerobot.processor import make_default_processors
 from lerobot.robots.lekiwi.config_lekiwi import LeKiwiClientConfig
 from lerobot.robots.lekiwi.lekiwi_client import LeKiwiClient
 from lerobot.scripts.lerobot_record import record_loop
@@ -46,9 +45,6 @@ def main():
     leader_arm = SO100Leader(leader_arm_config)
     keyboard = KeyboardTeleop(keyboard_config)

-    # TODO(Steven): Update this example to use pipelines
-    teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
-
     # Configure the dataset features
     action_features = hw_to_dataset_features(robot.action_features, ACTION)
     obs_features = hw_to_dataset_features(robot.observation_features, OBS_STR)
@@ -93,9 +89,6 @@
         control_time_s=EPISODE_TIME_SEC,
         single_task=TASK_DESCRIPTION,
         display_data=True,
-        teleop_action_processor=teleop_action_processor,
-        robot_action_processor=robot_action_processor,
-        robot_observation_processor=robot_observation_processor,
     )

     # Reset the environment if not stopping or re-recording
@@ -111,9 +104,6 @@
         control_time_s=RESET_TIME_SEC,
         single_task=TASK_DESCRIPTION,
         display_data=True,
-        teleop_action_processor=teleop_action_processor,
-        robot_action_processor=robot_action_processor,
-        robot_observation_processor=robot_observation_processor,
     )

     if events["rerecord_episode"]:
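
Both lekiwi examples now call `record_loop` with no processor arguments at all. A minimal sketch of the resulting call site, under the assumption (implied by the deletions above, not shown in this diff) that `record_loop` falls back to identity action/observation handling when the pipeline arguments are omitted:

```python
# Sketch only: names (robot, events, dataset, FPS, ...) come from the examples above.
# Assumption: omitting the processor arguments gives identity (pass-through) behavior.
record_loop(
    robot=robot,
    events=events,
    fps=FPS,
    dataset=dataset,
    control_time_s=EPISODE_TIME_SEC,
    single_task=TASK_DESCRIPTION,
    display_data=True,
)
```
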
diff --git a/examples/phone_to_so100/evaluate.py b/examples/phone_to_so100/evaluate.py
index 837217eda..f839d883a 100644
--- a/examples/phone_to_so100/evaluate.py
+++ b/examples/phone_to_so100/evaluate.py
@@ -17,30 +17,16 @@ from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
 from lerobot.configs.types import FeatureType, PolicyFeature
 from lerobot.datasets.lerobot_dataset import LeRobotDataset
-from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
-from lerobot.datasets.utils import combine_feature_dicts
-from lerobot.model.kinematics import RobotKinematics
 from lerobot.policies.act.modeling_act import ACTPolicy
 from lerobot.policies.factory import make_pre_post_processors
-from lerobot.processor import (
-    RobotAction,
-    RobotObservation,
-    RobotProcessorPipeline,
-    make_default_teleop_action_processor,
-)
-from lerobot.processor.converters import (
-    observation_to_transition,
-    robot_action_observation_to_transition,
-    transition_to_observation,
-    transition_to_robot_action,
-)
 from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
-from lerobot.robots.so_follower.robot_kinematic_processor import (
-    ForwardKinematicsJointsToEE,
-    InverseKinematicsEEToJoints,
+from lerobot.robots.so_follower.pipelines import (
+    make_so10x_fk_observation_pipeline,
+    make_so10x_ik_action_pipeline,
 )
 from lerobot.scripts.lerobot_record import record_loop
 from lerobot.utils.control_utils import init_keyboard_listener
+from lerobot.utils.pipeline_utils import build_dataset_features
 from lerobot.utils.utils import log_say
 from lerobot.utils.visualization_utils import init_rerun
@@ -51,6 +37,10 @@ TASK_DESCRIPTION = "My task description"
 HF_MODEL_ID = "/"
 HF_DATASET_ID = "/"

+# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo:
+# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
+URDF_PATH = "./SO101/so101_new_calib.urdf"
+

 def main():
     # Create the robot configuration & robot
@@ -64,68 +54,31 @@ def main():
     robot = SO100Follower(robot_config)

-    # Create policy
-    policy = ACTPolicy.from_pretrained(HF_MODEL_ID)
+    # Attach FK/IK pipelines so the robot works in EE space
+    motor_names = list(robot.bus.motors.keys())
+    robot.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
+    robot.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names))

-    # NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
-    kinematics_solver = RobotKinematics(
-        urdf_path="./SO101/so101_new_calib.urdf",
-        target_frame_name="gripper_frame_link",
-        joint_names=list(robot.bus.motors.keys()),
-    )
-
-    # Build pipeline to convert EE action to joints action
-    robot_ee_to_joints_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
-        steps=[
-            InverseKinematicsEEToJoints(
-                kinematics=kinematics_solver,
-                motor_names=list(robot.bus.motors.keys()),
-                initial_guess_current_joints=True,
-            ),
-        ],
-        to_transition=robot_action_observation_to_transition,
-        to_output=transition_to_robot_action,
-    )
-
-    # Build pipeline to convert joints observation to EE observation
-    robot_joints_to_ee_pose_processor = RobotProcessorPipeline[RobotObservation, RobotObservation](
-        steps=[
-            ForwardKinematicsJointsToEE(
-                kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())
-            )
-        ],
-        to_transition=observation_to_transition,
-        to_output=transition_to_observation,
-    )
-
-    # Create the dataset
+    # Create the dataset — obs auto-derived from FK pipeline, EE action spec explicit
     dataset = LeRobotDataset.create(
         repo_id=HF_DATASET_ID,
         fps=FPS,
-        features=combine_feature_dicts(
-            aggregate_pipeline_dataset_features(
-                pipeline=robot_joints_to_ee_pose_processor,
-                initial_features=create_initial_features(observation=robot.observation_features),
-                use_videos=True,
-            ),
-            # User for now should be explicit on the feature keys that were used for record
-            # Alternatively, the user can pass the processor step that has the right features
-            aggregate_pipeline_dataset_features(
-                pipeline=make_default_teleop_action_processor(),
-                initial_features=create_initial_features(
-                    action={
-                        f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
-                        for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
-                    }
-                ),
-                use_videos=True,
-            ),
+        features=build_dataset_features(
+            robot,
+            use_videos=True,
+            action_features={
+                f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
+                for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
+            },
         ),
         robot_type=robot.name,
         use_videos=True,
         image_writer_threads=4,
     )

+    # Create policy
+    policy = ACTPolicy.from_pretrained(HF_MODEL_ID)
+
     # Build Policy Processors
     preprocessor, postprocessor = make_pre_post_processors(
         policy_cfg=policy,
@@ -151,21 +104,18 @@
     for episode_idx in range(NUM_EPISODES):
         log_say(f"Running inference, recording eval episode {episode_idx + 1} of {NUM_EPISODES}")

-        # Main record loop
+        # Main record loop — pipelines applied internally by robot
         record_loop(
             robot=robot,
             events=events,
             fps=FPS,
             policy=policy,
-            preprocessor=preprocessor,  # Pass the pre and post policy processors
+            preprocessor=preprocessor,
             postprocessor=postprocessor,
             dataset=dataset,
             control_time_s=EPISODE_TIME_SEC,
             single_task=TASK_DESCRIPTION,
             display_data=True,
-            teleop_action_processor=make_default_teleop_action_processor(),
-            robot_action_processor=robot_ee_to_joints_processor,
-            robot_observation_processor=robot_joints_to_ee_pose_processor,
         )

         # Reset the environment if not stopping or re-recording
@@ -180,9 +130,6 @@
             control_time_s=EPISODE_TIME_SEC,
             single_task=TASK_DESCRIPTION,
             display_data=True,
-            teleop_action_processor=make_default_teleop_action_processor(),
-            robot_action_processor=robot_ee_to_joints_processor,
-            robot_observation_processor=robot_joints_to_ee_pose_processor,
         )

         if events["rerecord_episode"]:
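
The core of this change, condensed. The `set_output_pipeline`/`set_input_pipeline` methods and the `make_so10x_*` factories are introduced by this PR; the sketch below only restates the call sites above and labels what each attachment does (signatures inferred from those call sites, so treat them as assumptions):

```python
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.robots.so_follower.pipelines import (
    make_so10x_fk_observation_pipeline,
    make_so10x_ik_action_pipeline,
)
from lerobot.utils.pipeline_utils import build_dataset_features

# `robot` and URDF_PATH as constructed in the example above.
motor_names = list(robot.bus.motors.keys())

# Observations leave the robot as EE poses (FK); incoming EE actions become joints (IK).
robot.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
robot.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names))

# observation_features now already reflects the FK output, so no pipeline objects are
# passed here; only the EE action space must be spelled out when there is no teleop.
features = build_dataset_features(
    robot,
    use_videos=True,
    action_features={
        f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
        for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
    },
)
```
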
diff --git a/examples/phone_to_so100/record.py b/examples/phone_to_so100/record.py
index 1f5005db9..a98cdb8d0 100644
--- a/examples/phone_to_so100/record.py
+++ b/examples/phone_to_so100/record.py
@@ -16,21 +16,17 @@ from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
 from lerobot.datasets.lerobot_dataset import LeRobotDataset
-from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
-from lerobot.datasets.utils import combine_feature_dicts
-from lerobot.model.kinematics import RobotKinematics
 from lerobot.processor import RobotAction, RobotObservation, RobotProcessorPipeline
 from lerobot.processor.converters import (
-    observation_to_transition,
     robot_action_observation_to_transition,
-    transition_to_observation,
+    robot_action_to_transition,
     transition_to_robot_action,
 )
 from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
+from lerobot.robots.so_follower.pipelines import make_so10x_fk_observation_pipeline
 from lerobot.robots.so_follower.robot_kinematic_processor import (
     EEBoundsAndSafety,
     EEReferenceAndDelta,
-    ForwardKinematicsJointsToEE,
     GripperVelocityToJoint,
     InverseKinematicsEEToJoints,
 )
@@ -39,6 +35,7 @@ from lerobot.teleoperators.phone.config_phone import PhoneConfig, PhoneOS
 from lerobot.teleoperators.phone.phone_processor import MapPhoneActionToRobotAction
 from lerobot.teleoperators.phone.teleop_phone import Phone
 from lerobot.utils.control_utils import init_keyboard_listener
+from lerobot.utils.pipeline_utils import build_dataset_features
 from lerobot.utils.utils import log_say
 from lerobot.utils.visualization_utils import init_rerun
@@ -49,6 +46,10 @@ RESET_TIME_SEC = 30
 TASK_DESCRIPTION = "My task description"
 HF_REPO_ID = "/"

+# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo:
+# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
+URDF_PATH = "./SO101/so101_new_calib.urdf"
+

 def main():
     # Create the robot and teleoperator configurations
@@ -65,77 +66,59 @@ def main():
     robot = SO100Follower(robot_config)
     phone = Phone(teleop_config)

-    # NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
+    motor_names = list(robot.bus.motors.keys())
+
+    from lerobot.model.kinematics import RobotKinematics
+
     kinematics_solver = RobotKinematics(
-        urdf_path="./SO101/so101_new_calib.urdf",
+        urdf_path=URDF_PATH,
         target_frame_name="gripper_frame_link",
-        joint_names=list(robot.bus.motors.keys()),
+        joint_names=motor_names,
     )

-    # Build pipeline to convert phone action to EE action
-    phone_to_robot_ee_pose_processor = RobotProcessorPipeline[
-        tuple[RobotAction, RobotObservation], RobotAction
-    ](
-        steps=[
-            MapPhoneActionToRobotAction(platform=teleop_config.phone_os),
-            EEReferenceAndDelta(
-                kinematics=kinematics_solver,
-                end_effector_step_sizes={"x": 0.5, "y": 0.5, "z": 0.5},
-                motor_names=list(robot.bus.motors.keys()),
-                use_latched_reference=True,
-            ),
-            EEBoundsAndSafety(
-                end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
-                max_ee_step_m=0.20,
-            ),
-            GripperVelocityToJoint(speed_factor=20.0),
-        ],
-        to_transition=robot_action_observation_to_transition,
-        to_output=transition_to_robot_action,
+    # Phone output pipeline: map raw phone gesture to EE delta (no robot obs needed)
+    phone.set_output_pipeline(
+        RobotProcessorPipeline[RobotAction, RobotAction](
+            steps=[MapPhoneActionToRobotAction(platform=teleop_config.phone_os)],
+            to_transition=robot_action_to_transition,
+            to_output=transition_to_robot_action,
+        )
     )

-    # Build pipeline to convert EE action to joints action
-    robot_ee_to_joints_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
-        steps=[
-            InverseKinematicsEEToJoints(
-                kinematics=kinematics_solver,
-                motor_names=list(robot.bus.motors.keys()),
-                initial_guess_current_joints=True,
-            ),
-        ],
-        to_transition=robot_action_observation_to_transition,
-        to_output=transition_to_robot_action,
+    # Robot FK observation pipeline: joints → EE pose
+    robot.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
+
+    # Robot input pipeline: EE delta + current robot obs → joint commands
+    robot.set_input_pipeline(
+        RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
+            steps=[
+                EEReferenceAndDelta(
+                    kinematics=kinematics_solver,
+                    end_effector_step_sizes={"x": 0.5, "y": 0.5, "z": 0.5},
+                    motor_names=motor_names,
+                    use_latched_reference=True,
+                ),
+                EEBoundsAndSafety(
+                    end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
+                    max_ee_step_m=0.20,
+                ),
+                GripperVelocityToJoint(speed_factor=20.0),
+                InverseKinematicsEEToJoints(
+                    kinematics=kinematics_solver,
+                    motor_names=motor_names,
+                    initial_guess_current_joints=True,
+                ),
+            ],
+            to_transition=robot_action_observation_to_transition,
+            to_output=transition_to_robot_action,
+        )
     )

-    # Build pipeline to convert joint observation to EE observation
-    robot_joints_to_ee_pose = RobotProcessorPipeline[RobotObservation, RobotObservation](
-        steps=[
-            ForwardKinematicsJointsToEE(
-                kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())
-            )
-        ],
-        to_transition=observation_to_transition,
-        to_output=transition_to_observation,
-    )
-
-    # Create the dataset
+    # Dataset features auto-derived from robot's FK obs pipeline and phone's mapped action pipeline
     dataset = LeRobotDataset.create(
         repo_id=HF_REPO_ID,
         fps=FPS,
-        features=combine_feature_dicts(
-            # Run the feature contract of the pipelines
-            # This tells you how the features would look like after the pipeline steps
-            aggregate_pipeline_dataset_features(
-                pipeline=phone_to_robot_ee_pose_processor,
-                initial_features=create_initial_features(action=phone.action_features),
-                use_videos=True,
-            ),
-            aggregate_pipeline_dataset_features(
-                pipeline=robot_joints_to_ee_pose,
-                initial_features=create_initial_features(observation=robot.observation_features),
-                use_videos=True,
-            ),
-        ),
+        features=build_dataset_features(robot, phone, use_videos=True),
         robot_type=robot.name,
         use_videos=True,
         image_writer_threads=4,
@@ -158,7 +141,7 @@ def main():
     while episode_idx < NUM_EPISODES and not events["stop_recording"]:
         log_say(f"Recording episode {episode_idx + 1} of {NUM_EPISODES}")

-        # Main record loop
+        # Main record loop — pipelines applied internally by robot and phone
         record_loop(
             robot=robot,
             events=events,
@@ -168,9 +151,6 @@
             control_time_s=EPISODE_TIME_SEC,
             single_task=TASK_DESCRIPTION,
             display_data=True,
-            teleop_action_processor=phone_to_robot_ee_pose_processor,
-            robot_action_processor=robot_ee_to_joints_processor,
-            robot_observation_processor=robot_joints_to_ee_pose,
         )

         # Reset the environment if not stopping or re-recording
@@ -186,9 +166,6 @@
             control_time_s=RESET_TIME_SEC,
             single_task=TASK_DESCRIPTION,
             display_data=True,
-            teleop_action_processor=phone_to_robot_ee_pose_processor,
-            robot_action_processor=robot_ee_to_joints_processor,
-            robot_observation_processor=robot_joints_to_ee_pose,
         )

         if events["rerecord_episode"]:
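
One design point worth calling out in the robot input pipeline above: safety clamping runs while the target is still an EE pose, and IK is the last step. A sketch of the same steps with the ordering annotated (classes and constructor arguments exactly as imported and used in the file above):

```python
steps = [
    # 1. Resolve the phone's EE delta against a latched reference pose.
    EEReferenceAndDelta(
        kinematics=kinematics_solver,
        end_effector_step_sizes={"x": 0.5, "y": 0.5, "z": 0.5},
        motor_names=motor_names,
        use_latched_reference=True,
    ),
    # 2. Clamp while the target is still metric (EE space), not joint space.
    EEBoundsAndSafety(
        end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
        max_ee_step_m=0.20,
    ),
    # 3. Integrate gripper velocity into a gripper joint target.
    GripperVelocityToJoint(speed_factor=20.0),
    # 4. Convert the bounded EE pose to joint commands last.
    InverseKinematicsEEToJoints(
        kinematics=kinematics_solver,
        motor_names=motor_names,
        initial_guess_current_joints=True,
    ),
]
```
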
diff --git a/examples/rtc/eval_with_real_robot.py b/examples/rtc/eval_with_real_robot.py
index 4c803eb7e..f8888d3b8 100644
--- a/examples/rtc/eval_with_real_robot.py
+++ b/examples/rtc/eval_with_real_robot.py
@@ -87,8 +87,8 @@ from lerobot.policies.rtc.action_queue import ActionQueue
 from lerobot.policies.rtc.configuration_rtc import RTCConfig
 from lerobot.policies.rtc.latency_tracker import LatencyTracker
 from lerobot.processor.factory import (
-    make_default_robot_action_processor,
-    make_default_robot_observation_processor,
+    _make_identity_observation_pipeline as make_default_robot_observation_processor,
+    _make_identity_robot_action_pipeline as make_default_robot_action_processor,
 )
 from lerobot.rl.process import ProcessSignalHandler
 from lerobot.robots import (  # noqa: F401
diff --git a/examples/so100_to_so100_EE/evaluate.py b/examples/so100_to_so100_EE/evaluate.py
index b614b89f2..ef51104c4 100644
--- a/examples/so100_to_so100_EE/evaluate.py
+++ b/examples/so100_to_so100_EE/evaluate.py
@@ -17,30 +17,16 @@ from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
 from lerobot.configs.types import FeatureType, PolicyFeature
 from lerobot.datasets.lerobot_dataset import LeRobotDataset
-from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
-from lerobot.datasets.utils import combine_feature_dicts
-from lerobot.model.kinematics import RobotKinematics
 from lerobot.policies.act.modeling_act import ACTPolicy
 from lerobot.policies.factory import make_pre_post_processors
-from lerobot.processor import (
-    RobotAction,
-    RobotObservation,
-    RobotProcessorPipeline,
-    make_default_teleop_action_processor,
-)
-from lerobot.processor.converters import (
-    observation_to_transition,
-    robot_action_observation_to_transition,
-    transition_to_observation,
-    transition_to_robot_action,
-)
 from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
-from lerobot.robots.so_follower.robot_kinematic_processor import (
-    ForwardKinematicsJointsToEE,
-    InverseKinematicsEEToJoints,
+from lerobot.robots.so_follower.pipelines import (
+    make_so10x_fk_observation_pipeline,
+    make_so10x_ik_action_pipeline,
 )
 from lerobot.scripts.lerobot_record import record_loop
 from lerobot.utils.control_utils import init_keyboard_listener
+from lerobot.utils.pipeline_utils import build_dataset_features
 from lerobot.utils.utils import log_say
 from lerobot.utils.visualization_utils import init_rerun
@@ -51,6 +37,10 @@ TASK_DESCRIPTION = "My task description"
 HF_MODEL_ID = "/"
 HF_DATASET_ID = "/"

+# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo:
+# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
+URDF_PATH = "./SO101/so101_new_calib.urdf"
+

 def main():
     # Create the robot configuration & robot
@@ -64,68 +54,31 @@ def main():
     robot = SO100Follower(robot_config)

-    # Create policy
-    policy = ACTPolicy.from_pretrained(HF_MODEL_ID)
+    # Attach FK/IK pipelines so the robot works in EE space
+    motor_names = list(robot.bus.motors.keys())
+    robot.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
+    robot.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names))

-    # NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
-    kinematics_solver = RobotKinematics(
-        urdf_path="./SO101/so101_new_calib.urdf",
-        target_frame_name="gripper_frame_link",
-        joint_names=list(robot.bus.motors.keys()),
-    )
-
-    # Build pipeline to convert EE action to joints action
-    robot_ee_to_joints_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
-        steps=[
-            InverseKinematicsEEToJoints(
-                kinematics=kinematics_solver,
-                motor_names=list(robot.bus.motors.keys()),
-                initial_guess_current_joints=True,
-            ),
-        ],
-        to_transition=robot_action_observation_to_transition,
-        to_output=transition_to_robot_action,
-    )
-
-    # Build pipeline to convert joints observation to EE observation
-    robot_joints_to_ee_pose_processor = RobotProcessorPipeline[RobotObservation, RobotObservation](
-        steps=[
-            ForwardKinematicsJointsToEE(
-                kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())
-            )
-        ],
-        to_transition=observation_to_transition,
-        to_output=transition_to_observation,
-    )
-
-    # Create the dataset
+    # Create the dataset — obs auto-derived from FK pipeline, EE action spec explicit
     dataset = LeRobotDataset.create(
         repo_id=HF_DATASET_ID,
         fps=FPS,
-        features=combine_feature_dicts(
-            aggregate_pipeline_dataset_features(
-                pipeline=robot_joints_to_ee_pose_processor,
-                initial_features=create_initial_features(observation=robot.observation_features),
-                use_videos=True,
-            ),
-            # User for now should be explicit on the feature keys that were used for record
-            # Alternatively, the user can pass the processor step that has the right features
-            aggregate_pipeline_dataset_features(
-                pipeline=make_default_teleop_action_processor(),
-                initial_features=create_initial_features(
-                    action={
-                        f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
-                        for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
-                    }
-                ),
-                use_videos=True,
-            ),
+        features=build_dataset_features(
+            robot,
+            use_videos=True,
+            action_features={
+                f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
+                for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
+            },
         ),
         robot_type=robot.name,
         use_videos=True,
         image_writer_threads=4,
     )

+    # Create policy
+    policy = ACTPolicy.from_pretrained(HF_MODEL_ID)
+
     # Build Policy Processors
     preprocessor, postprocessor = make_pre_post_processors(
         policy_cfg=policy,
@@ -135,7 +88,7 @@
         preprocessor_overrides={"device_processor": {"device": str(policy.config.device)}},
     )

-    # Connect the robot and teleoperator
+    # Connect the robot
     robot.connect()

     # Initialize the keyboard listener and rerun visualization
@@ -151,21 +104,18 @@
     for episode_idx in range(NUM_EPISODES):
         log_say(f"Running inference, recording eval episode {episode_idx + 1} of {NUM_EPISODES}")

-        # Main record loop
+        # Main record loop — pipelines applied internally by robot
         record_loop(
             robot=robot,
             events=events,
             fps=FPS,
             policy=policy,
-            preprocessor=preprocessor,  # Pass the pre and post policy processors
+            preprocessor=preprocessor,
             postprocessor=postprocessor,
             dataset=dataset,
             control_time_s=EPISODE_TIME_SEC,
             single_task=TASK_DESCRIPTION,
             display_data=True,
-            teleop_action_processor=make_default_teleop_action_processor(),
-            robot_action_processor=robot_ee_to_joints_processor,
-            robot_observation_processor=robot_joints_to_ee_pose_processor,
         )

         # Reset the environment if not stopping or re-recording
@@ -180,9 +130,6 @@
             control_time_s=EPISODE_TIME_SEC,
             single_task=TASK_DESCRIPTION,
             display_data=True,
-            teleop_action_processor=make_default_teleop_action_processor(),
-            robot_action_processor=robot_ee_to_joints_processor,
-            robot_observation_processor=robot_joints_to_ee_pose_processor,
         )

         if events["rerecord_episode"]:
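
For intuition, the `features` dict built above should come out roughly like the following. The values are illustrative, not captured from a run; the format (dtype/shape/names, and the `video` dtype for cameras) is the `hw_to_dataset_features` contract exercised by the tests at the end of this diff:

```python
# Illustrative only; camera names and resolutions depend on the robot config.
features = {
    "action": {
        "dtype": "float32",
        "shape": (7,),
        "names": ["ee.x", "ee.y", "ee.z", "ee.wx", "ee.wy", "ee.wz", "ee.gripper_pos"],
    },
    "observation.state": {
        "dtype": "float32",
        "shape": (7,),
        "names": ["ee.x", "ee.y", "ee.z", "ee.wx", "ee.wy", "ee.wz", "ee.gripper_pos"],
    },
    "observation.images.front": {
        "dtype": "video",
        "shape": (480, 640, 3),
        "names": ["height", "width", "channels"],
    },
}
```
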
diff --git a/src/lerobot/datasets/pipeline_features.py b/src/lerobot/datasets/pipeline_features.py
index 9aa815732..b2b494f2e 100644
--- a/src/lerobot/datasets/pipeline_features.py
+++ b/src/lerobot/datasets/pipeline_features.py
@@ -14,16 +14,12 @@
 from __future__ import annotations

-import re
-from collections.abc import Sequence
 from typing import TYPE_CHECKING, Any

 from lerobot.configs.types import PipelineFeatureType
-from lerobot.datasets.utils import hw_to_dataset_features
-from lerobot.utils.constants import ACTION, OBS_IMAGES, OBS_STATE, OBS_STR

 if TYPE_CHECKING:
-    from lerobot.processor import DataProcessorPipeline, RobotAction, RobotObservation
+    from lerobot.processor import RobotAction, RobotObservation


 def create_initial_features(
@@ -45,99 +41,3 @@ def create_initial_features(
     if observation:
         features[PipelineFeatureType.OBSERVATION] = observation
     return features
-
-
-# Helper to filter state/action keys based on regex patterns.
-def should_keep(key: str, patterns: tuple[str]) -> bool:
-    if patterns is None:
-        return True
-    return any(re.search(pat, key) for pat in patterns)
-
-
-def strip_prefix(key: str, prefixes_to_strip: tuple[str]) -> str:
-    for prefix in prefixes_to_strip:
-        if key.startswith(prefix):
-            return key[len(prefix) :]
-    return key
-
-
-# Define prefixes to strip from feature keys for clean names.
-# Handles both fully qualified (e.g., "action.state") and short (e.g., "state") forms.
-PREFIXES_TO_STRIP = tuple(
-    f"{token}." for const in (ACTION, OBS_STATE, OBS_IMAGES) for token in (const, const.split(".")[-1])
-)
-
-
-def aggregate_pipeline_dataset_features(
-    pipeline: DataProcessorPipeline,
-    initial_features: dict[PipelineFeatureType, dict[str, Any]],
-    *,
-    use_videos: bool = True,
-    patterns: Sequence[str] | None = None,
-) -> dict[str, dict]:
-    """
-    Aggregates and filters pipeline features to create a dataset-ready features dictionary.
-
-    This function transforms initial features using the pipeline, categorizes them as action or observations
-    (image or state), filters them based on `use_videos` and `patterns`, and finally
-    formats them for use with a Hugging Face LeRobot Dataset.
-
-    Args:
-        pipeline: The DataProcessorPipeline to apply.
-        initial_features: A dictionary of raw feature specs for actions and observations.
-        use_videos: If False, image features are excluded.
-        patterns: A sequence of regex patterns to filter action and state features.
-            Image features are not affected by this filter.
-
-    Returns:
-        A dictionary of features formatted for a Hugging Face LeRobot Dataset.
-    """
-    all_features = pipeline.transform_features(initial_features)
-
-    # Intermediate storage for categorized and filtered features.
-    processed_features: dict[str, dict[str, Any]] = {
-        ACTION: {},
-        OBS_STR: {},
-    }
-    images_token = OBS_IMAGES.split(".")[-1]
-
-    # Iterate through all features transformed by the pipeline.
-    for ptype, feats in all_features.items():
-        if ptype not in [PipelineFeatureType.ACTION, PipelineFeatureType.OBSERVATION]:
-            continue
-
-        for key, value in feats.items():
-            # 1. Categorize the feature.
-            is_action = ptype == PipelineFeatureType.ACTION
-            # Observations are classified as images if their key matches image-related tokens or if the shape of the feature is 3.
-            # All other observations are treated as state.
-            is_image = not is_action and (
-                (isinstance(value, tuple) and len(value) == 3)
-                or (
-                    key.startswith(f"{OBS_IMAGES}.")
-                    or key.startswith(f"{images_token}.")
-                    or f".{images_token}." in key
-                )
-            )
-
-            # 2. Apply filtering rules.
-            if is_image and not use_videos:
-                continue
-            if not is_image and not should_keep(key, patterns):
-                continue
-
-            # 3. Add the feature to the appropriate group with a clean name.
-            name = strip_prefix(key, PREFIXES_TO_STRIP)
-            if is_action:
-                processed_features[ACTION][name] = value
-            else:
-                processed_features[OBS_STR][name] = value
-
-    # Convert the processed features into the final dataset format.
-    dataset_features = {}
-    if processed_features[ACTION]:
-        dataset_features.update(hw_to_dataset_features(processed_features[ACTION], ACTION, use_videos))
-    if processed_features[OBS_STR]:
-        dataset_features.update(hw_to_dataset_features(processed_features[OBS_STR], OBS_STR, use_videos))
-
-    return dataset_features
diff --git a/src/lerobot/processor/__init__.py b/src/lerobot/processor/__init__.py
index 0b63e1606..42db35e76 100644
--- a/src/lerobot/processor/__init__.py
+++ b/src/lerobot/processor/__init__.py
@@ -30,12 +30,6 @@ from .core import (
 )
 from .delta_action_processor import MapDeltaActionToRobotActionStep, MapTensorToDeltaActionDictStep
 from .device_processor import DeviceProcessorStep
-from .factory import (
-    make_default_processors,
-    make_default_robot_action_processor,
-    make_default_robot_observation_processor,
-    make_default_teleop_action_processor,
-)
 from .gym_action_processor import (
     Numpy2TorchActionProcessorStep,
     Torch2NumpyActionProcessorStep,
@@ -95,11 +89,7 @@ __all__ = [
     "ImageCropResizeProcessorStep",
     "InfoProcessorStep",
     "InterventionActionProcessorStep",
-    "make_default_processors",
-    "make_default_teleop_action_processor",
-    "make_default_robot_action_processor",
-    "make_default_robot_observation_processor",
-    "MapDeltaActionToRobotActionStep",
+    "MapDeltaActionToRobotActionStep",
     "MapTensorToDeltaActionDictStep",
     "NormalizerProcessorStep",
     "Numpy2TorchActionProcessorStep",
diff --git a/src/lerobot/processor/factory.py b/src/lerobot/processor/factory.py
index ffac4cb35..02da28146 100644
--- a/src/lerobot/processor/factory.py
+++ b/src/lerobot/processor/factory.py
@@ -66,42 +66,3 @@ def _make_identity_feedback_pipeline() -> RobotProcessorPipeline[dict, dict]:
     )


-# ── Public factory functions (kept for backward compatibility and gym/HIL paths) ──────────────────
-
-
-def make_default_teleop_action_processor() -> RobotProcessorPipeline[
-    tuple[RobotAction, RobotObservation], RobotAction
-]:
-    teleop_action_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
-        steps=[IdentityProcessorStep()],
-        to_transition=robot_action_observation_to_transition,
-        to_output=transition_to_robot_action,
-    )
-    return teleop_action_processor
-
-
-def make_default_robot_action_processor() -> RobotProcessorPipeline[
-    tuple[RobotAction, RobotObservation], RobotAction
-]:
-    robot_action_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
-        steps=[IdentityProcessorStep()],
-        to_transition=robot_action_observation_to_transition,
-        to_output=transition_to_robot_action,
-    )
-    return robot_action_processor
-
-
-def make_default_robot_observation_processor() -> RobotProcessorPipeline[RobotObservation, RobotObservation]:
-    robot_observation_processor = RobotProcessorPipeline[RobotObservation, RobotObservation](
-        steps=[IdentityProcessorStep()],
-        to_transition=observation_to_transition,
-        to_output=transition_to_observation,
-    )
-    return robot_observation_processor
-
-
-def make_default_processors():
-    teleop_action_processor = make_default_teleop_action_processor()
-    robot_action_processor = make_default_robot_action_processor()
-    robot_observation_processor = make_default_robot_observation_processor()
-    return (teleop_action_processor, robot_action_processor, robot_observation_processor)
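
factory.py keeps only the private identity helpers, which the RTC example above aliases to the old public names. Their bodies are not shown in this diff; judging from the deleted public factories, each presumably wraps a single `IdentityProcessorStep`. A sketch of the observation variant under that assumption:

```python
from lerobot.processor import IdentityProcessorStep, RobotObservation, RobotProcessorPipeline
from lerobot.processor.converters import observation_to_transition, transition_to_observation


def _make_identity_observation_pipeline() -> RobotProcessorPipeline[RobotObservation, RobotObservation]:
    # Pass-through: convert to a transition, apply the identity step, convert back.
    return RobotProcessorPipeline[RobotObservation, RobotObservation](
        steps=[IdentityProcessorStep()],
        to_transition=observation_to_transition,
        to_output=transition_to_observation,
    )
```
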
diff --git a/src/lerobot/scripts/lerobot_replay.py b/src/lerobot/scripts/lerobot_replay.py
index 7c0b5b96b..95499ce6e 100644
--- a/src/lerobot/scripts/lerobot_replay.py
+++ b/src/lerobot/scripts/lerobot_replay.py
@@ -47,9 +47,6 @@ from pprint import pformat

 from lerobot.configs import parser
 from lerobot.datasets.lerobot_dataset import LeRobotDataset
-from lerobot.processor import (
-    make_default_robot_action_processor,
-)
 from lerobot.robots import (  # noqa: F401
     Robot,
     RobotConfig,
@@ -99,8 +96,6 @@ def replay(cfg: ReplayConfig):
     init_logging()
     logging.info(pformat(asdict(cfg)))

-    robot_action_processor = make_default_robot_action_processor()
-
     robot = make_robot_from_config(cfg.robot)
     dataset = LeRobotDataset(cfg.dataset.repo_id, root=cfg.dataset.root, episodes=[cfg.dataset.episode])
@@ -120,11 +115,10 @@ def replay(cfg: ReplayConfig):
         for i, name in enumerate(dataset.features[ACTION]["names"]):
             action[name] = action_array[i]

-        robot_obs = robot.get_observation()
+        # Update cached observation so the robot's input pipeline can use it (e.g. for IK)
+        robot.get_observation()

-        processed_action = robot_action_processor((action, robot_obs))
-
-        _ = robot.send_action(processed_action)
+        _ = robot.send_action(action)

         dt_s = time.perf_counter() - start_episode_t
         precise_sleep(max(1 / dataset.fps - dt_s, 0.0))
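
The replay loop now sends the stored action as-is and leans on a side effect: calling `robot.get_observation()` refreshes whatever observation cache the robot's input pipeline reads. A sketch of the assumed contract (inferred from the new comment in the hunk above, not from code shown in this diff):

```python
# Assumed contract:
#   1. get_observation() reads the hardware and caches the latest observation.
#   2. send_action() runs the robot's input pipeline, which may consult that
#      cache (e.g. InverseKinematicsEEToJoints with initial_guess_current_joints=True)
#      before commanding the motors.
robot.get_observation()    # refresh the cache; the return value is unused here
robot.send_action(action)  # the input pipeline sees the fresh observation
```
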
diff --git a/src/lerobot/utils/pipeline_utils.py b/src/lerobot/utils/pipeline_utils.py
index 119aeb14b..6f4d356a6 100644
--- a/src/lerobot/utils/pipeline_utils.py
+++ b/src/lerobot/utils/pipeline_utils.py
@@ -20,9 +20,78 @@ checking action/observation space compatibility between teleops and robots.
 """

 import logging
+import re
+from collections.abc import Sequence

-from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
-from lerobot.datasets.utils import combine_feature_dicts
+from lerobot.datasets.utils import combine_feature_dicts, hw_to_dataset_features
+from lerobot.utils.constants import ACTION, OBS_IMAGES, OBS_STATE, OBS_STR
+
+# Prefixes stripped from feature keys to produce clean dataset names.
+# Handles both fully-qualified (e.g. "observation.state.ee.x") and short (e.g. "state.ee.x") forms.
+_PREFIXES_TO_STRIP = tuple(
+    f"{token}."
+    for const in (ACTION, OBS_STATE, OBS_IMAGES)
+    for token in (const, const.split(".")[-1])
+)
+
+_IMAGES_TOKEN = OBS_IMAGES.split(".")[-1]
+
+
+def _should_keep(key: str, patterns: Sequence[str] | None) -> bool:
+    if patterns is None:
+        return True
+    return any(re.search(pat, key) for pat in patterns)
+
+
+def _strip_prefix(key: str) -> str:
+    for prefix in _PREFIXES_TO_STRIP:
+        if key.startswith(prefix):
+            return key[len(prefix) :]
+    return key
+
+
+def _features_to_dataset_spec(
+    features: dict,
+    *,
+    is_action: bool,
+    use_videos: bool,
+    patterns: Sequence[str] | None = None,
+) -> dict:
+    """
+    Convert a flat feature dict (as returned by ``robot.observation_features`` or
+    ``teleop.action_features``) into a LeRobot dataset feature specification.
+
+    Args:
+        features: Flat dict mapping feature key → type or shape.
+        is_action: True when ``features`` describes actions; False for observations.
+        use_videos: When False, image observation features are excluded entirely.
+        patterns: Optional regex patterns to filter state/action features.
+            Image features are not affected by this filter.
+
+    Returns:
+        A dict suitable for passing to ``LeRobotDataset.create(..., features=...)``.
+    """
+    categorized: dict = {}
+    for key, value in features.items():
+        is_image = not is_action and (
+            (isinstance(value, tuple) and len(value) == 3)
+            or key.startswith(f"{OBS_IMAGES}.")
+            or key.startswith(f"{_IMAGES_TOKEN}.")
+            or f".{_IMAGES_TOKEN}." in key
+        )
+
+        if is_image and not use_videos:
+            continue
+        if not is_image and not _should_keep(key, patterns):
+            continue
+
+        categorized[_strip_prefix(key)] = value
+
+    if not categorized:
+        return {}
+
+    prefix = ACTION if is_action else OBS_STR
+    return hw_to_dataset_features(categorized, prefix, use_videos)


 def build_dataset_features(
@@ -30,22 +99,26 @@
     teleop=None,
     *,
     use_videos: bool = True,
+    action_features: dict | None = None,
 ) -> dict:
     """
     Derive dataset feature specifications from robot and teleoperator pipelines.

-    Uses the robot's ``output_pipeline`` and ``raw_observation_features`` to determine
-    what the dataset will store as observations, and (when provided) the teleoperator's
-    ``output_pipeline`` and ``raw_action_features`` to determine what will be stored as actions.
+    Reads ``robot.observation_features`` (which already reflects the robot's output
+    pipeline transformation) and, when provided, ``teleop.action_features`` or an
+    explicit ``action_features`` dict to determine what the dataset will store.

     This replaces the old pattern of manually calling ``aggregate_pipeline_dataset_features``
     with explicit processor objects.

     Args:
-        robot: The robot instance (must have ``output_pipeline()`` and ``raw_observation_features``).
-        teleop: The teleoperator instance. When ``None`` (policy-only recording), only observation
-            features are returned.
+        robot: The robot instance (must have ``observation_features``).
+        teleop: The teleoperator instance. When ``None`` and ``action_features`` is also
+            ``None`` (policy-only recording), only observation features are returned.
         use_videos: If True, image observations are included as video features.
+        action_features: Explicit action feature dict, used when no teleop is available
+            (e.g. evaluate/inference mode) but the dataset must match a specific action
+            space (e.g. EE coordinates from a previously recorded dataset).

     Returns:
         A combined feature dict suitable for passing to ``LeRobotDataset.create(..., features=...)``.
@@ -57,20 +130,27 @@

         # Policy-only recording (no teleop)
         features = build_dataset_features(robot, use_videos=True)
+
+        # Evaluate with explicit EE action space
+        features = build_dataset_features(
+            robot,
+            use_videos=True,
+            action_features={
+                f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
+                for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
+            },
+        )
     """
-    obs_features = aggregate_pipeline_dataset_features(
-        pipeline=robot.output_pipeline(),
-        initial_features=create_initial_features(observation=robot.raw_observation_features),
-        use_videos=use_videos,
-    )
-    if teleop is None:
-        return obs_features
-    action_features = aggregate_pipeline_dataset_features(
-        pipeline=teleop.output_pipeline(),
-        initial_features=create_initial_features(action=teleop.raw_action_features),
-        use_videos=False,
-    )
-    return combine_feature_dicts(action_features, obs_features)
+    obs_ds = _features_to_dataset_spec(robot.observation_features, is_action=False, use_videos=use_videos)
+
+    if action_features is not None:
+        act_ds = _features_to_dataset_spec(action_features, is_action=True, use_videos=False)
+    elif teleop is not None:
+        act_ds = _features_to_dataset_spec(teleop.action_features, is_action=True, use_videos=False)
+    else:
+        return obs_ds
+
+    return combine_feature_dicts(act_ds, obs_ds)


 def check_action_space_compatibility(teleop, robot) -> None:
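
A worked example of the classification rules in `_features_to_dataset_spec` above (inputs invented for illustration; the expected outputs follow the behaviour pinned down by the tests in the next file):

```python
from lerobot.utils.pipeline_utils import _features_to_dataset_spec

obs = {
    "observation.state.ee.x": float,             # not an image -> packed into the state vector
    "front": (480, 640, 3),                      # 3-tuple shape -> classified as an image
    "observation.images.wrist": (240, 320, 3),   # image by key prefix as well
}
spec = _features_to_dataset_spec(obs, is_action=False, use_videos=True)

# Per the tests below, this is expected to yield:
#   spec["observation.state"]["names"] == ["ee.x"]          (prefix stripped)
#   spec["observation.images.front"]["dtype"] == "video"
#   spec["observation.images.wrist"]["shape"] == (240, 320, 3)
```
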
diff --git a/tests/processor/test_pipeline.py b/tests/processor/test_pipeline.py
index 58a83fe69..053070096 100644
--- a/tests/processor/test_pipeline.py
+++ b/tests/processor/test_pipeline.py
@@ -26,7 +26,7 @@ import torch
 import torch.nn as nn

 from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature
-from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features
+from lerobot.utils.pipeline_utils import _features_to_dataset_spec
 from lerobot.processor import (
     DataProcessorPipeline,
     EnvTransition,
@@ -2040,102 +2040,68 @@ def test_features_remove_from_initial(policy_feature_factory):
     )


-@dataclass
-class AddActionEEAndJointFeatures(ProcessorStep):
-    """Adds both EE and JOINT action features."""
-
-    def __call__(self, tr):
-        return tr
-
-    def transform_features(
-        self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
-    ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
-        # EE features
-        features[PipelineFeatureType.ACTION]["action.ee.x"] = float
-        features[PipelineFeatureType.ACTION]["action.ee.y"] = float
-        # JOINT features
-        features[PipelineFeatureType.ACTION]["action.j1.pos"] = float
-        features[PipelineFeatureType.ACTION]["action.j2.pos"] = float
-        return features
+# ── Tests for _features_to_dataset_spec ──────────────────────────────────────────────────────────
+# These replace the old aggregate_pipeline_dataset_features tests, covering the same categorisation
+# / filtering / prefix-stripping / HF-format logic via the private helper directly.


-@dataclass
-class AddObservationStateFeatures(ProcessorStep):
-    """Adds state features (and optionally an image spec to test precedence)."""
-
-    add_front_image: bool = False
-    front_image_shape: tuple = (240, 320, 3)
-
-    def __call__(self, tr):
-        return tr
-
-    def transform_features(
-        self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
-    ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
-        # State features (mix EE and a joint state)
-        features[PipelineFeatureType.OBSERVATION][f"{OBS_STATE}.ee.x"] = float
-        features[PipelineFeatureType.OBSERVATION][f"{OBS_STATE}.j1.pos"] = float
-        if self.add_front_image:
-            features[PipelineFeatureType.OBSERVATION][f"{OBS_IMAGES}.front"] = self.front_image_shape
-        return features
-
-
-def test_aggregate_joint_action_only():
-    rp = DataProcessorPipeline([AddActionEEAndJointFeatures()])
-    initial = {PipelineFeatureType.OBSERVATION: {"front": (480, 640, 3)}, PipelineFeatureType.ACTION: {}}
-
-    out = aggregate_pipeline_dataset_features(
-        pipeline=rp,
-        initial_features=initial,
-        use_videos=True,
-        patterns=["action.j1.pos", "action.j2.pos"],
+def test_dataset_spec_action_with_patterns():
+    """Action features are filtered by pattern; unmatched keys are excluded."""
+    features = {
+        "action.ee.x": float,
+        "action.ee.y": float,
+        "action.j1.pos": float,
+        "action.j2.pos": float,
+    }
+    out = _features_to_dataset_spec(
+        features, is_action=True, use_videos=True, patterns=["action.j1.pos", "action.j2.pos"]
     )

-    # Expect only ACTION with joint names
-    assert ACTION in out and OBS_STATE not in out
+    assert ACTION in out
     assert out[ACTION]["dtype"] == "float32"
     assert set(out[ACTION]["names"]) == {"j1.pos", "j2.pos"}
     assert out[ACTION]["shape"] == (len(out[ACTION]["names"]),)
+    assert OBS_STATE not in out


-def test_aggregate_ee_action_and_observation_with_videos():
-    rp = DataProcessorPipeline([AddActionEEAndJointFeatures(), AddObservationStateFeatures()])
-    initial = {"front": (480, 640, 3), "side": (720, 1280, 3)}
+def test_dataset_spec_action_and_observation_with_videos():
+    """EE action + state obs + image obs; all appear with correct dtypes."""
+    action_features = {"action.ee.x": float, "action.ee.y": float}
+    obs_features = {
+        f"{OBS_STATE}.ee.x": float,
+        f"{OBS_STATE}.j1.pos": float,
+        "front": (480, 640, 3),
+        "side": (720, 1280, 3),
+    }

-    out = aggregate_pipeline_dataset_features(
-        pipeline=rp,
-        initial_features={PipelineFeatureType.OBSERVATION: initial, PipelineFeatureType.ACTION: {}},
-        use_videos=True,
-        patterns=["action.ee", OBS_STATE],
-    )
+    act_out = _features_to_dataset_spec(action_features, is_action=True, use_videos=False)
+    obs_out = _features_to_dataset_spec(obs_features, is_action=False, use_videos=True)

-    # Action should pack only EE names
-    assert ACTION in out
-    assert set(out[ACTION]["names"]) == {"ee.x", "ee.y"}
-    assert out[ACTION]["dtype"] == "float32"
+    assert ACTION in act_out
+    assert set(act_out[ACTION]["names"]) == {"ee.x", "ee.y"}
+    assert act_out[ACTION]["dtype"] == "float32"

-    # Observation state should pack both ee.x and j1.pos as a vector
-    assert OBS_STATE in out
-    assert set(out[OBS_STATE]["names"]) == {"ee.x", "j1.pos"}
-    assert out[OBS_STATE]["dtype"] == "float32"
+    assert OBS_STATE in obs_out
+    assert set(obs_out[OBS_STATE]["names"]) == {"ee.x", "j1.pos"}
+    assert obs_out[OBS_STATE]["dtype"] == "float32"

-    # Cameras from initial_features appear as videos
-    for cam in ("front", "side"):
+    for cam, shape in [("front", (480, 640, 3)), ("side", (720, 1280, 3))]:
         key = f"{OBS_IMAGES}.{cam}"
-        assert key in out
-        assert out[key]["dtype"] == "video"
-        assert out[key]["shape"] == initial[cam]
-        assert out[key]["names"] == ["height", "width", "channels"]
+        assert key in obs_out, f"missing camera key {key}"
+        assert obs_out[key]["dtype"] == "video"
+        assert obs_out[key]["shape"] == shape
+        assert obs_out[key]["names"] == ["height", "width", "channels"]


-def test_aggregate_both_action_types():
-    rp = DataProcessorPipeline([AddActionEEAndJointFeatures()])
-    out = aggregate_pipeline_dataset_features(
-        pipeline=rp,
-        initial_features={PipelineFeatureType.ACTION: {}, PipelineFeatureType.OBSERVATION: {}},
-        use_videos=True,
-        patterns=["action.ee", "action.j1", "action.j2.pos"],
-    )
+def test_dataset_spec_all_action_types():
+    """EE and joint action features are both included when no pattern filter."""
+    features = {
+        "action.ee.x": float,
+        "action.ee.y": float,
+        "action.j1.pos": float,
+        "action.j2.pos": float,
+    }
+    out = _features_to_dataset_spec(features, is_action=True, use_videos=True, patterns=None)

     assert ACTION in out
     expected = {"ee.x", "ee.y", "j1.pos", "j2.pos"}
@@ -2143,58 +2109,40 @@
     assert out[ACTION]["shape"] == (len(expected),)


-def test_aggregate_images_when_use_videos_false():
-    rp = DataProcessorPipeline([AddObservationStateFeatures(add_front_image=True)])
-    initial = {"back": (480, 640, 3)}
+def test_dataset_spec_images_excluded_when_no_videos():
+    """Image observation features are dropped entirely when use_videos=False."""
+    obs_features = {
+        f"{OBS_STATE}.j1.pos": float,
+        "back": (480, 640, 3),
+        f"{OBS_IMAGES}.front": (240, 320, 3),
+    }
+    out = _features_to_dataset_spec(obs_features, is_action=False, use_videos=False)

-    out = aggregate_pipeline_dataset_features(
-        pipeline=rp,
-        initial_features={PipelineFeatureType.ACTION: {}, PipelineFeatureType.OBSERVATION: initial},
-        use_videos=False,  # expect "image" dtype
-        patterns=None,
-    )
-
-    key = f"{OBS_IMAGES}.back"
-    key_front = f"{OBS_IMAGES}.front"
-    assert key not in out
-    assert key_front not in out
+    assert f"{OBS_IMAGES}.back" not in out
+    assert f"{OBS_IMAGES}.front" not in out
+    # Non-image state feature is still present
+    assert OBS_STATE in out
+    assert "j1.pos" in out[OBS_STATE]["names"]


-def test_aggregate_images_when_use_videos_true():
-    rp = DataProcessorPipeline([AddObservationStateFeatures(add_front_image=True)])
-    initial = {"back": (480, 640, 3)}
+def test_dataset_spec_images_included_when_use_videos():
+    """Image features appear as video entries when use_videos=True."""
+    obs_features = {
+        "back": (480, 640, 3),
+        f"{OBS_IMAGES}.front": (240, 320, 3),
+    }
+    out = _features_to_dataset_spec(obs_features, is_action=False, use_videos=True)

-    out = aggregate_pipeline_dataset_features(
-        pipeline=rp,
-        initial_features={PipelineFeatureType.OBSERVATION: initial, PipelineFeatureType.ACTION: {}},
-        use_videos=True,
-        patterns=None,
-    )
+    assert f"{OBS_IMAGES}.back" in out
+    assert out[f"{OBS_IMAGES}.back"]["dtype"] == "video"
+    assert out[f"{OBS_IMAGES}.back"]["shape"] == (480, 640, 3)

-    key = f"{OBS_IMAGES}.front"
-    key_back = f"{OBS_IMAGES}.back"
-    assert key in out
-    assert key_back in out
-    assert out[key]["dtype"] == "video"
-    assert out[key_back]["dtype"] == "video"
-    assert out[key_back]["shape"] == initial["back"]
+    assert f"{OBS_IMAGES}.front" in out
+    assert out[f"{OBS_IMAGES}.front"]["dtype"] == "video"
+    assert out[f"{OBS_IMAGES}.front"]["shape"] == (240, 320, 3)


-def test_initial_camera_not_overridden_by_step_image():
-    # Step explicitly sets a different front image shape; initial has another shape.
-    # aggregate_pipeline_dataset_features should keep the step's value (setdefault behavior on initial cams).
-    rp = DataProcessorPipeline(
-        [AddObservationStateFeatures(add_front_image=True, front_image_shape=(240, 320, 3))]
-    )
-    initial = {"front": (480, 640, 3)}  # should NOT override the step-provided (240, 320, 3)
-
-    out = aggregate_pipeline_dataset_features(
-        pipeline=rp,
-        initial_features={PipelineFeatureType.ACTION: {}, PipelineFeatureType.OBSERVATION: initial},
-        use_videos=True,
-        patterns=[f"{OBS_IMAGES}.front"],
-    )
-
-    key = f"{OBS_IMAGES}.front"
-    assert key in out
-    assert out[key]["shape"] == (240, 320, 3)  # from the step, not from initial
+def test_dataset_spec_empty_features_returns_empty():
+    """Empty feature dict returns an empty output dict."""
+    assert _features_to_dataset_spec({}, is_action=True, use_videos=True) == {}
+    assert _features_to_dataset_spec({}, is_action=False, use_videos=True) == {}
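
To exercise just the replacement tests, standard pytest keyword selection should do (the new tests all share the `dataset_spec` name fragment):

```python
# Equivalent to running: pytest tests/processor/test_pipeline.py -k dataset_spec
import pytest

raise SystemExit(pytest.main(["tests/processor/test_pipeline.py", "-k", "dataset_spec"]))
```
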