Also refactor and remove the use of aggregate_pipeline_dataset_features(), as we already aggregate the expected features on the robot and teleop classes

Pepijn
2026-03-02 21:05:58 +01:00
parent 7940bfad52
commit 9860f794cf
12 changed files with 286 additions and 562 deletions
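
For orientation, the heart of this refactor is how dataset features are derived. A condensed before/after sketch based on the phone-teleop example further down (all names are taken from this diff; surrounding setup is elided):

    # Before: run each pipeline's feature contract and merge the results.
    from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
    from lerobot.datasets.utils import combine_feature_dicts

    features = combine_feature_dicts(
        aggregate_pipeline_dataset_features(
            pipeline=robot_joints_to_ee_pose,  # joints -> EE observation pipeline
            initial_features=create_initial_features(observation=robot.observation_features),
            use_videos=True,
        ),
        aggregate_pipeline_dataset_features(
            pipeline=phone_to_robot_ee_pose_processor,  # phone gesture -> EE action pipeline
            initial_features=create_initial_features(action=phone.action_features),
            use_videos=True,
        ),
    )

    # After: robot and teleop classes already aggregate their expected features,
    # so one helper call replaces the per-pipeline bookkeeping.
    from lerobot.utils.pipeline_utils import build_dataset_features

    features = build_dataset_features(robot, phone, use_videos=True)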
-10
@@ -18,7 +18,6 @@ from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.utils import hw_to_dataset_features
from lerobot.policies.act.modeling_act import ACTPolicy
from lerobot.policies.factory import make_pre_post_processors
from lerobot.processor import make_default_processors
from lerobot.robots.lekiwi import LeKiwiClient, LeKiwiClientConfig
from lerobot.scripts.lerobot_record import record_loop
from lerobot.utils.constants import ACTION, OBS_STR
@@ -71,9 +70,6 @@ def main():
# To connect, you should already have this script running on LeKiwi: `python -m lerobot.robots.lekiwi.lekiwi_host --robot.id=my_awesome_kiwi`
robot.connect()
# TODO(Steven): Update this example to use pipelines
teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
# Initialize the keyboard listener and rerun visualization
listener, events = init_keyboard_listener()
init_rerun(session_name="lekiwi_evaluate")
@@ -99,9 +95,6 @@ def main():
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
)
# Reset the environment if not stopping or re-recording
@@ -116,9 +109,6 @@ def main():
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
)
if events["rerecord_episode"]:
-10
@@ -16,7 +16,6 @@
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.utils import hw_to_dataset_features
from lerobot.processor import make_default_processors
from lerobot.robots.lekiwi.config_lekiwi import LeKiwiClientConfig
from lerobot.robots.lekiwi.lekiwi_client import LeKiwiClient
from lerobot.scripts.lerobot_record import record_loop
@@ -46,9 +45,6 @@ def main():
leader_arm = SO100Leader(leader_arm_config)
keyboard = KeyboardTeleop(keyboard_config)
# TODO(Steven): Update this example to use pipelines
teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
# Configure the dataset features
action_features = hw_to_dataset_features(robot.action_features, ACTION)
obs_features = hw_to_dataset_features(robot.observation_features, OBS_STR)
@@ -93,9 +89,6 @@ def main():
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
)
# Reset the environment if not stopping or re-recording
@@ -111,9 +104,6 @@ def main():
control_time_s=RESET_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
)
if events["rerecord_episode"]:
+25 -78
@@ -17,30 +17,16 @@
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from lerobot.datasets.utils import combine_feature_dicts
from lerobot.model.kinematics import RobotKinematics
from lerobot.policies.act.modeling_act import ACTPolicy
from lerobot.policies.factory import make_pre_post_processors
from lerobot.processor import (
RobotAction,
RobotObservation,
RobotProcessorPipeline,
make_default_teleop_action_processor,
)
from lerobot.processor.converters import (
observation_to_transition,
robot_action_observation_to_transition,
transition_to_observation,
transition_to_robot_action,
)
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
from lerobot.robots.so_follower.robot_kinematic_processor import (
ForwardKinematicsJointsToEE,
InverseKinematicsEEToJoints,
from lerobot.robots.so_follower.pipelines import (
make_so10x_fk_observation_pipeline,
make_so10x_ik_action_pipeline,
)
from lerobot.scripts.lerobot_record import record_loop
from lerobot.utils.control_utils import init_keyboard_listener
from lerobot.utils.pipeline_utils import build_dataset_features
from lerobot.utils.utils import log_say
from lerobot.utils.visualization_utils import init_rerun
@@ -51,6 +37,10 @@ TASK_DESCRIPTION = "My task description"
HF_MODEL_ID = "<hf_username>/<model_repo_id>"
HF_DATASET_ID = "<hf_username>/<dataset_repo_id>"
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo:
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
URDF_PATH = "./SO101/so101_new_calib.urdf"
def main():
# Create the robot configuration & robot
@@ -64,68 +54,31 @@ def main():
robot = SO100Follower(robot_config)
# Create policy
policy = ACTPolicy.from_pretrained(HF_MODEL_ID)
# Attach FK/IK pipelines so the robot works in EE space
motor_names = list(robot.bus.motors.keys())
robot.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
robot.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names))
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
kinematics_solver = RobotKinematics(
urdf_path="./SO101/so101_new_calib.urdf",
target_frame_name="gripper_frame_link",
joint_names=list(robot.bus.motors.keys()),
)
# Build pipeline to convert EE action to joints action
robot_ee_to_joints_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
steps=[
InverseKinematicsEEToJoints(
kinematics=kinematics_solver,
motor_names=list(robot.bus.motors.keys()),
initial_guess_current_joints=True,
),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)
# Build pipeline to convert joints observation to EE observation
robot_joints_to_ee_pose_processor = RobotProcessorPipeline[RobotObservation, RobotObservation](
steps=[
ForwardKinematicsJointsToEE(
kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())
)
],
to_transition=observation_to_transition,
to_output=transition_to_observation,
)
# Create the dataset
# Create the dataset — obs auto-derived from FK pipeline, EE action spec explicit
dataset = LeRobotDataset.create(
repo_id=HF_DATASET_ID,
fps=FPS,
features=combine_feature_dicts(
aggregate_pipeline_dataset_features(
pipeline=robot_joints_to_ee_pose_processor,
initial_features=create_initial_features(observation=robot.observation_features),
use_videos=True,
),
# User for now should be explicit on the feature keys that were used for record
# Alternatively, the user can pass the processor step that has the right features
aggregate_pipeline_dataset_features(
pipeline=make_default_teleop_action_processor(),
initial_features=create_initial_features(
action={
f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
}
),
use_videos=True,
),
features=build_dataset_features(
robot,
use_videos=True,
action_features={
f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
},
),
robot_type=robot.name,
use_videos=True,
image_writer_threads=4,
)
# Create policy
policy = ACTPolicy.from_pretrained(HF_MODEL_ID)
# Build Policy Processors
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=policy,
@@ -151,21 +104,18 @@ def main():
for episode_idx in range(NUM_EPISODES):
log_say(f"Running inference, recording eval episode {episode_idx + 1} of {NUM_EPISODES}")
# Main record loop
# Main record loop — pipelines applied internally by robot
record_loop(
robot=robot,
events=events,
fps=FPS,
policy=policy,
preprocessor=preprocessor, # Pass the pre and post policy processors
preprocessor=preprocessor,
postprocessor=postprocessor,
dataset=dataset,
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=make_default_teleop_action_processor(),
robot_action_processor=robot_ee_to_joints_processor,
robot_observation_processor=robot_joints_to_ee_pose_processor,
)
# Reset the environment if not stopping or re-recording
@@ -180,9 +130,6 @@ def main():
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=make_default_teleop_action_processor(),
robot_action_processor=robot_ee_to_joints_processor,
robot_observation_processor=robot_joints_to_ee_pose_processor,
)
if events["rerecord_episode"]:
+50 -73
@@ -16,21 +16,17 @@
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from lerobot.datasets.utils import combine_feature_dicts
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor import RobotAction, RobotObservation, RobotProcessorPipeline
from lerobot.processor.converters import (
observation_to_transition,
robot_action_observation_to_transition,
transition_to_observation,
robot_action_to_transition,
transition_to_robot_action,
)
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
from lerobot.robots.so_follower.pipelines import make_so10x_fk_observation_pipeline
from lerobot.robots.so_follower.robot_kinematic_processor import (
EEBoundsAndSafety,
EEReferenceAndDelta,
ForwardKinematicsJointsToEE,
GripperVelocityToJoint,
InverseKinematicsEEToJoints,
)
@@ -39,6 +35,7 @@ from lerobot.teleoperators.phone.config_phone import PhoneConfig, PhoneOS
from lerobot.teleoperators.phone.phone_processor import MapPhoneActionToRobotAction
from lerobot.teleoperators.phone.teleop_phone import Phone
from lerobot.utils.control_utils import init_keyboard_listener
from lerobot.utils.pipeline_utils import build_dataset_features
from lerobot.utils.utils import log_say
from lerobot.utils.visualization_utils import init_rerun
@@ -49,6 +46,10 @@ RESET_TIME_SEC = 30
TASK_DESCRIPTION = "My task description"
HF_REPO_ID = "<hf_username>/<dataset_repo_id>"
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo:
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
URDF_PATH = "./SO101/so101_new_calib.urdf"
def main():
# Create the robot and teleoperator configurations
@@ -65,77 +66,59 @@ def main():
robot = SO100Follower(robot_config)
phone = Phone(teleop_config)
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
motor_names = list(robot.bus.motors.keys())
from lerobot.model.kinematics import RobotKinematics
kinematics_solver = RobotKinematics(
urdf_path="./SO101/so101_new_calib.urdf",
urdf_path=URDF_PATH,
target_frame_name="gripper_frame_link",
joint_names=list(robot.bus.motors.keys()),
joint_names=motor_names,
)
# Build pipeline to convert phone action to EE action
phone_to_robot_ee_pose_processor = RobotProcessorPipeline[
tuple[RobotAction, RobotObservation], RobotAction
](
steps=[
MapPhoneActionToRobotAction(platform=teleop_config.phone_os),
EEReferenceAndDelta(
kinematics=kinematics_solver,
end_effector_step_sizes={"x": 0.5, "y": 0.5, "z": 0.5},
motor_names=list(robot.bus.motors.keys()),
use_latched_reference=True,
),
EEBoundsAndSafety(
end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
max_ee_step_m=0.20,
),
GripperVelocityToJoint(speed_factor=20.0),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
# Phone output pipeline: map raw phone gesture to EE delta (no robot obs needed)
phone.set_output_pipeline(
RobotProcessorPipeline[RobotAction, RobotAction](
steps=[MapPhoneActionToRobotAction(platform=teleop_config.phone_os)],
to_transition=robot_action_to_transition,
to_output=transition_to_robot_action,
)
)
# Build pipeline to convert EE action to joints action
robot_ee_to_joints_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
steps=[
InverseKinematicsEEToJoints(
kinematics=kinematics_solver,
motor_names=list(robot.bus.motors.keys()),
initial_guess_current_joints=True,
),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
# Robot FK observation pipeline: joints → EE pose
robot.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
# Robot input pipeline: EE delta + current robot obs → joint commands
robot.set_input_pipeline(
RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
steps=[
EEReferenceAndDelta(
kinematics=kinematics_solver,
end_effector_step_sizes={"x": 0.5, "y": 0.5, "z": 0.5},
motor_names=motor_names,
use_latched_reference=True,
),
EEBoundsAndSafety(
end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
max_ee_step_m=0.20,
),
GripperVelocityToJoint(speed_factor=20.0),
InverseKinematicsEEToJoints(
kinematics=kinematics_solver,
motor_names=motor_names,
initial_guess_current_joints=True,
),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)
)
# Build pipeline to convert joint observation to EE observation
robot_joints_to_ee_pose = RobotProcessorPipeline[RobotObservation, RobotObservation](
steps=[
ForwardKinematicsJointsToEE(
kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())
)
],
to_transition=observation_to_transition,
to_output=transition_to_observation,
)
# Create the dataset
# Dataset features auto-derived from robot's FK obs pipeline and phone's mapped action pipeline
dataset = LeRobotDataset.create(
repo_id=HF_REPO_ID,
fps=FPS,
features=combine_feature_dicts(
# Run the feature contract of the pipelines
# This tells you how the features would look like after the pipeline steps
aggregate_pipeline_dataset_features(
pipeline=phone_to_robot_ee_pose_processor,
initial_features=create_initial_features(action=phone.action_features),
use_videos=True,
),
aggregate_pipeline_dataset_features(
pipeline=robot_joints_to_ee_pose,
initial_features=create_initial_features(observation=robot.observation_features),
use_videos=True,
),
),
features=build_dataset_features(robot, phone, use_videos=True),
robot_type=robot.name,
use_videos=True,
image_writer_threads=4,
@@ -158,7 +141,7 @@ def main():
while episode_idx < NUM_EPISODES and not events["stop_recording"]:
log_say(f"Recording episode {episode_idx + 1} of {NUM_EPISODES}")
# Main record loop
# Main record loop — pipelines applied internally by robot and phone
record_loop(
robot=robot,
events=events,
@@ -168,9 +151,6 @@ def main():
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=phone_to_robot_ee_pose_processor,
robot_action_processor=robot_ee_to_joints_processor,
robot_observation_processor=robot_joints_to_ee_pose,
)
# Reset the environment if not stopping or re-recording
@@ -186,9 +166,6 @@ def main():
control_time_s=RESET_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=phone_to_robot_ee_pose_processor,
robot_action_processor=robot_ee_to_joints_processor,
robot_observation_processor=robot_joints_to_ee_pose,
)
if events["rerecord_episode"]:
+2 -2
@@ -87,8 +87,8 @@ from lerobot.policies.rtc.action_queue import ActionQueue
from lerobot.policies.rtc.configuration_rtc import RTCConfig
from lerobot.policies.rtc.latency_tracker import LatencyTracker
from lerobot.processor.factory import (
make_default_robot_action_processor,
make_default_robot_observation_processor,
_make_identity_observation_pipeline as make_default_robot_observation_processor,
_make_identity_robot_action_pipeline as make_default_robot_action_processor,
)
from lerobot.rl.process import ProcessSignalHandler
from lerobot.robots import ( # noqa: F401
+26 -79
@@ -17,30 +17,16 @@
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from lerobot.datasets.utils import combine_feature_dicts
from lerobot.model.kinematics import RobotKinematics
from lerobot.policies.act.modeling_act import ACTPolicy
from lerobot.policies.factory import make_pre_post_processors
from lerobot.processor import (
RobotAction,
RobotObservation,
RobotProcessorPipeline,
make_default_teleop_action_processor,
)
from lerobot.processor.converters import (
observation_to_transition,
robot_action_observation_to_transition,
transition_to_observation,
transition_to_robot_action,
)
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
from lerobot.robots.so_follower.robot_kinematic_processor import (
ForwardKinematicsJointsToEE,
InverseKinematicsEEToJoints,
from lerobot.robots.so_follower.pipelines import (
make_so10x_fk_observation_pipeline,
make_so10x_ik_action_pipeline,
)
from lerobot.scripts.lerobot_record import record_loop
from lerobot.utils.control_utils import init_keyboard_listener
from lerobot.utils.pipeline_utils import build_dataset_features
from lerobot.utils.utils import log_say
from lerobot.utils.visualization_utils import init_rerun
@@ -51,6 +37,10 @@ TASK_DESCRIPTION = "My task description"
HF_MODEL_ID = "<hf_username>/<model_repo_id>"
HF_DATASET_ID = "<hf_username>/<dataset_repo_id>"
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo:
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
URDF_PATH = "./SO101/so101_new_calib.urdf"
def main():
# Create the robot configuration & robot
@@ -64,68 +54,31 @@ def main():
robot = SO100Follower(robot_config)
# Create policy
policy = ACTPolicy.from_pretrained(HF_MODEL_ID)
# Attach FK/IK pipelines so the robot works in EE space
motor_names = list(robot.bus.motors.keys())
robot.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
robot.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names))
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
kinematics_solver = RobotKinematics(
urdf_path="./SO101/so101_new_calib.urdf",
target_frame_name="gripper_frame_link",
joint_names=list(robot.bus.motors.keys()),
)
# Build pipeline to convert EE action to joints action
robot_ee_to_joints_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
steps=[
InverseKinematicsEEToJoints(
kinematics=kinematics_solver,
motor_names=list(robot.bus.motors.keys()),
initial_guess_current_joints=True,
),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)
# Build pipeline to convert joints observation to EE observation
robot_joints_to_ee_pose_processor = RobotProcessorPipeline[RobotObservation, RobotObservation](
steps=[
ForwardKinematicsJointsToEE(
kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())
)
],
to_transition=observation_to_transition,
to_output=transition_to_observation,
)
# Create the dataset
# Create the dataset — obs auto-derived from FK pipeline, EE action spec explicit
dataset = LeRobotDataset.create(
repo_id=HF_DATASET_ID,
fps=FPS,
features=combine_feature_dicts(
aggregate_pipeline_dataset_features(
pipeline=robot_joints_to_ee_pose_processor,
initial_features=create_initial_features(observation=robot.observation_features),
use_videos=True,
),
# User for now should be explicit on the feature keys that were used for record
# Alternatively, the user can pass the processor step that has the right features
aggregate_pipeline_dataset_features(
pipeline=make_default_teleop_action_processor(),
initial_features=create_initial_features(
action={
f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
}
),
use_videos=True,
),
features=build_dataset_features(
robot,
use_videos=True,
action_features={
f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
},
),
robot_type=robot.name,
use_videos=True,
image_writer_threads=4,
)
# Create policy
policy = ACTPolicy.from_pretrained(HF_MODEL_ID)
# Build Policy Processors
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=policy,
@@ -135,7 +88,7 @@ def main():
preprocessor_overrides={"device_processor": {"device": str(policy.config.device)}},
)
# Connect the robot and teleoperator
# Connect the robot
robot.connect()
# Initialize the keyboard listener and rerun visualization
@@ -151,21 +104,18 @@ def main():
for episode_idx in range(NUM_EPISODES):
log_say(f"Running inference, recording eval episode {episode_idx + 1} of {NUM_EPISODES}")
# Main record loop
# Main record loop — pipelines applied internally by robot
record_loop(
robot=robot,
events=events,
fps=FPS,
policy=policy,
preprocessor=preprocessor, # Pass the pre and post policy processors
preprocessor=preprocessor,
postprocessor=postprocessor,
dataset=dataset,
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=make_default_teleop_action_processor(),
robot_action_processor=robot_ee_to_joints_processor,
robot_observation_processor=robot_joints_to_ee_pose_processor,
)
# Reset the environment if not stopping or re-recording
@@ -180,9 +130,6 @@ def main():
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=make_default_teleop_action_processor(),
robot_action_processor=robot_ee_to_joints_processor,
robot_observation_processor=robot_joints_to_ee_pose_processor,
)
if events["rerecord_episode"]:
+1 -101
@@ -14,16 +14,12 @@
from __future__ import annotations
import re
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any
from lerobot.configs.types import PipelineFeatureType
from lerobot.datasets.utils import hw_to_dataset_features
from lerobot.utils.constants import ACTION, OBS_IMAGES, OBS_STATE, OBS_STR
if TYPE_CHECKING:
from lerobot.processor import DataProcessorPipeline, RobotAction, RobotObservation
from lerobot.processor import RobotAction, RobotObservation
def create_initial_features(
@@ -45,99 +41,3 @@ def create_initial_features(
if observation:
features[PipelineFeatureType.OBSERVATION] = observation
return features
# Helper to filter state/action keys based on regex patterns.
def should_keep(key: str, patterns: tuple[str]) -> bool:
if patterns is None:
return True
return any(re.search(pat, key) for pat in patterns)
def strip_prefix(key: str, prefixes_to_strip: tuple[str]) -> str:
for prefix in prefixes_to_strip:
if key.startswith(prefix):
return key[len(prefix) :]
return key
# Define prefixes to strip from feature keys for clean names.
# Handles both fully qualified (e.g., "action.state") and short (e.g., "state") forms.
PREFIXES_TO_STRIP = tuple(
f"{token}." for const in (ACTION, OBS_STATE, OBS_IMAGES) for token in (const, const.split(".")[-1])
)
def aggregate_pipeline_dataset_features(
pipeline: DataProcessorPipeline,
initial_features: dict[PipelineFeatureType, dict[str, Any]],
*,
use_videos: bool = True,
patterns: Sequence[str] | None = None,
) -> dict[str, dict]:
"""
Aggregates and filters pipeline features to create a dataset-ready features dictionary.
This function transforms initial features using the pipeline, categorizes them as action or observations
(image or state), filters them based on `use_videos` and `patterns`, and finally
formats them for use with a Hugging Face LeRobot Dataset.
Args:
pipeline: The DataProcessorPipeline to apply.
initial_features: A dictionary of raw feature specs for actions and observations.
use_videos: If False, image features are excluded.
patterns: A sequence of regex patterns to filter action and state features.
Image features are not affected by this filter.
Returns:
A dictionary of features formatted for a Hugging Face LeRobot Dataset.
"""
all_features = pipeline.transform_features(initial_features)
# Intermediate storage for categorized and filtered features.
processed_features: dict[str, dict[str, Any]] = {
ACTION: {},
OBS_STR: {},
}
images_token = OBS_IMAGES.split(".")[-1]
# Iterate through all features transformed by the pipeline.
for ptype, feats in all_features.items():
if ptype not in [PipelineFeatureType.ACTION, PipelineFeatureType.OBSERVATION]:
continue
for key, value in feats.items():
# 1. Categorize the feature.
is_action = ptype == PipelineFeatureType.ACTION
# Observations are classified as images if their key matches image-related tokens or if the shape of the feature is 3.
# All other observations are treated as state.
is_image = not is_action and (
(isinstance(value, tuple) and len(value) == 3)
or (
key.startswith(f"{OBS_IMAGES}.")
or key.startswith(f"{images_token}.")
or f".{images_token}." in key
)
)
# 2. Apply filtering rules.
if is_image and not use_videos:
continue
if not is_image and not should_keep(key, patterns):
continue
# 3. Add the feature to the appropriate group with a clean name.
name = strip_prefix(key, PREFIXES_TO_STRIP)
if is_action:
processed_features[ACTION][name] = value
else:
processed_features[OBS_STR][name] = value
# Convert the processed features into the final dataset format.
dataset_features = {}
if processed_features[ACTION]:
dataset_features.update(hw_to_dataset_features(processed_features[ACTION], ACTION, use_videos))
if processed_features[OBS_STR]:
dataset_features.update(hw_to_dataset_features(processed_features[OBS_STR], OBS_STR, use_videos))
return dataset_features
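
Note that the key-normalization logic deleted here survives in lerobot/utils/pipeline_utils (see the hunk further down). A tiny sketch of the prefix-stripping contract, assuming the usual constant values (ACTION = "action", OBS_STATE = "observation.state", OBS_IMAGES = "observation.images"):

    # Both fully-qualified and short forms normalize to the same clean name:
    strip_prefix("observation.state.ee.x", PREFIXES_TO_STRIP)  # -> "ee.x"
    strip_prefix("state.ee.x", PREFIXES_TO_STRIP)              # -> "ee.x"
    # Keys without a known prefix pass through unchanged:
    strip_prefix("front", PREFIXES_TO_STRIP)                   # -> "front"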
+1 -11
@@ -30,12 +30,6 @@ from .core import (
)
from .delta_action_processor import MapDeltaActionToRobotActionStep, MapTensorToDeltaActionDictStep
from .device_processor import DeviceProcessorStep
from .factory import (
make_default_processors,
make_default_robot_action_processor,
make_default_robot_observation_processor,
make_default_teleop_action_processor,
)
from .gym_action_processor import (
Numpy2TorchActionProcessorStep,
Torch2NumpyActionProcessorStep,
@@ -95,11 +89,7 @@ __all__ = [
"ImageCropResizeProcessorStep",
"InfoProcessorStep",
"InterventionActionProcessorStep",
"make_default_processors",
"make_default_teleop_action_processor",
"make_default_robot_action_processor",
"make_default_robot_observation_processor",
"MapDeltaActionToRobotActionStep",
"MapDeltaActionToRobotActionStep",
"MapTensorToDeltaActionDictStep",
"NormalizerProcessorStep",
"Numpy2TorchActionProcessorStep",
-39
@@ -66,42 +66,3 @@ def _make_identity_feedback_pipeline() -> RobotProcessorPipeline[dict, dict]:
)
# ── Public factory functions (kept for backward compatibility and gym/HIL paths) ──────────────────
def make_default_teleop_action_processor() -> RobotProcessorPipeline[
tuple[RobotAction, RobotObservation], RobotAction
]:
teleop_action_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
steps=[IdentityProcessorStep()],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)
return teleop_action_processor
def make_default_robot_action_processor() -> RobotProcessorPipeline[
tuple[RobotAction, RobotObservation], RobotAction
]:
robot_action_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
steps=[IdentityProcessorStep()],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)
return robot_action_processor
def make_default_robot_observation_processor() -> RobotProcessorPipeline[RobotObservation, RobotObservation]:
robot_observation_processor = RobotProcessorPipeline[RobotObservation, RobotObservation](
steps=[IdentityProcessorStep()],
to_transition=observation_to_transition,
to_output=transition_to_observation,
)
return robot_observation_processor
def make_default_processors():
teleop_action_processor = make_default_teleop_action_processor()
robot_action_processor = make_default_robot_action_processor()
robot_observation_processor = make_default_robot_observation_processor()
return (teleop_action_processor, robot_action_processor, robot_observation_processor)
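
The deleted public factories each wrapped a single IdentityProcessorStep; the private _make_identity_* helpers that the RTC script now aliases (see the earlier hunk) presumably keep the same shape, matching the _make_identity_feedback_pipeline already in this file. A minimal sketch, assuming the same imports the deleted code used:

    def _make_identity_robot_action_pipeline() -> RobotProcessorPipeline[
        tuple[RobotAction, RobotObservation], RobotAction
    ]:
        # No-op pipeline: convert to a transition, apply the identity step, convert back.
        return RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
            steps=[IdentityProcessorStep()],
            to_transition=robot_action_observation_to_transition,
            to_output=transition_to_robot_action,
        )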
+3 -9
@@ -47,9 +47,6 @@ from pprint import pformat
from lerobot.configs import parser
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.processor import (
make_default_robot_action_processor,
)
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
@@ -99,8 +96,6 @@ def replay(cfg: ReplayConfig):
init_logging()
logging.info(pformat(asdict(cfg)))
robot_action_processor = make_default_robot_action_processor()
robot = make_robot_from_config(cfg.robot)
dataset = LeRobotDataset(cfg.dataset.repo_id, root=cfg.dataset.root, episodes=[cfg.dataset.episode])
@@ -120,11 +115,10 @@ def replay(cfg: ReplayConfig):
for i, name in enumerate(dataset.features[ACTION]["names"]):
action[name] = action_array[i]
robot_obs = robot.get_observation()
# Update cached observation so the robot's input pipeline can use it (e.g. for IK)
robot.get_observation()
processed_action = robot_action_processor((action, robot_obs))
_ = robot.send_action(processed_action)
_ = robot.send_action(action)
dt_s = time.perf_counter() - start_episode_t
precise_sleep(max(1 / dataset.fps - dt_s, 0.0))
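
The diff never shows the Robot base-class side of set_input_pipeline / set_output_pipeline, so here is a rough mental model consistent with the call sites above. The method names come from this diff; the internals are hypothetical:

    class Robot:
        # Hypothetical internals, inferred from how this diff uses the API.
        def set_output_pipeline(self, pipeline):
            self._output_pipeline = pipeline  # raw joint obs -> e.g. EE obs

        def set_input_pipeline(self, pipeline):
            self._input_pipeline = pipeline  # (EE action, cached obs) -> joint action

        def get_observation(self):
            raw = self._read_hardware_observation()
            self._last_obs = raw  # cached so send_action's pipeline can use it (e.g. for IK)
            return self._output_pipeline(raw) if self._output_pipeline else raw

        def send_action(self, action):
            if self._input_pipeline:
                action = self._input_pipeline((action, self._last_obs))
            return self._write_hardware_action(action)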
+101 -21
@@ -20,9 +20,78 @@ checking action/observation space compatibility between teleops and robots.
"""
import logging
import re
from collections.abc import Sequence
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from lerobot.datasets.utils import combine_feature_dicts
from lerobot.datasets.utils import combine_feature_dicts, hw_to_dataset_features
from lerobot.utils.constants import ACTION, OBS_IMAGES, OBS_STATE, OBS_STR
# Prefixes stripped from feature keys to produce clean dataset names.
# Handles both fully-qualified (e.g. "observation.state.ee.x") and short (e.g. "state.ee.x") forms.
_PREFIXES_TO_STRIP = tuple(
f"{token}."
for const in (ACTION, OBS_STATE, OBS_IMAGES)
for token in (const, const.split(".")[-1])
)
_IMAGES_TOKEN = OBS_IMAGES.split(".")[-1]
def _should_keep(key: str, patterns: Sequence[str] | None) -> bool:
if patterns is None:
return True
return any(re.search(pat, key) for pat in patterns)
def _strip_prefix(key: str) -> str:
for prefix in _PREFIXES_TO_STRIP:
if key.startswith(prefix):
return key[len(prefix) :]
return key
def _features_to_dataset_spec(
features: dict,
*,
is_action: bool,
use_videos: bool,
patterns: Sequence[str] | None = None,
) -> dict:
"""
Convert a flat feature dict (as returned by ``robot.observation_features`` or
``teleop.action_features``) into a LeRobot dataset feature specification.
Args:
features: Flat dict mapping each feature key to its type or shape.
is_action: True when ``features`` describes actions; False for observations.
use_videos: When False, image observation features are excluded entirely.
patterns: Optional regex patterns to filter state/action features.
Image features are not affected by this filter.
Returns:
A dict suitable for passing to ``LeRobotDataset.create(..., features=...)``.
"""
categorized: dict = {}
for key, value in features.items():
is_image = not is_action and (
(isinstance(value, tuple) and len(value) == 3)
or key.startswith(f"{OBS_IMAGES}.")
or key.startswith(f"{_IMAGES_TOKEN}.")
or f".{_IMAGES_TOKEN}." in key
)
if is_image and not use_videos:
continue
if not is_image and not _should_keep(key, patterns):
continue
categorized[_strip_prefix(key)] = value
if not categorized:
return {}
prefix = ACTION if is_action else OBS_STR
return hw_to_dataset_features(categorized, prefix, use_videos)
def build_dataset_features(
@@ -30,22 +99,26 @@ def build_dataset_features(
teleop=None,
*,
use_videos: bool = True,
action_features: dict | None = None,
) -> dict:
"""
Derive dataset feature specifications from robot and teleoperator pipelines.
Uses the robot's ``output_pipeline`` and ``raw_observation_features`` to determine
what the dataset will store as observations, and (when provided) the teleoperator's
``output_pipeline`` and ``raw_action_features`` to determine what will be stored as actions.
Reads ``robot.observation_features`` (which already reflects the robot's output
pipeline transformation) and, when provided, ``teleop.action_features`` or an
explicit ``action_features`` dict to determine what the dataset will store.
This replaces the old pattern of manually calling ``aggregate_pipeline_dataset_features``
with explicit processor objects.
Args:
robot: The robot instance (must have ``output_pipeline()`` and ``raw_observation_features``).
teleop: The teleoperator instance. When ``None`` (policy-only recording), only observation
features are returned.
robot: The robot instance (must have ``observation_features``).
teleop: The teleoperator instance. When ``None`` and ``action_features`` is also
``None`` (policy-only recording), only observation features are returned.
use_videos: If True, image observations are included as video features.
action_features: Explicit action feature dict, used when no teleop is available
(e.g. evaluate/inference mode) but the dataset must match a specific action
space (e.g. EE coordinates from a previously recorded dataset).
Returns:
A combined feature dict suitable for passing to ``LeRobotDataset.create(..., features=...)``.
@@ -57,20 +130,27 @@ def build_dataset_features(
# Policy-only recording (no teleop)
features = build_dataset_features(robot, use_videos=True)
# Evaluate with explicit EE action space
features = build_dataset_features(
robot,
use_videos=True,
action_features={
f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
},
)
"""
obs_features = aggregate_pipeline_dataset_features(
pipeline=robot.output_pipeline(),
initial_features=create_initial_features(observation=robot.raw_observation_features),
use_videos=use_videos,
)
if teleop is None:
return obs_features
action_features = aggregate_pipeline_dataset_features(
pipeline=teleop.output_pipeline(),
initial_features=create_initial_features(action=teleop.raw_action_features),
use_videos=False,
)
return combine_feature_dicts(action_features, obs_features)
obs_ds = _features_to_dataset_spec(robot.observation_features, is_action=False, use_videos=use_videos)
if action_features is not None:
act_ds = _features_to_dataset_spec(action_features, is_action=True, use_videos=False)
elif teleop is not None:
act_ds = _features_to_dataset_spec(teleop.action_features, is_action=True, use_videos=False)
else:
return obs_ds
return combine_feature_dicts(act_ds, obs_ds)
def check_action_space_compatibility(teleop, robot) -> None:
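
One detail worth an example: _features_to_dataset_spec filters state/action keys with re.search, so patterns can be literal substrings or anchored regexes (the tests below exercise the literal form). A sketch:

    # Keep only joint-position actions; EE keys are filtered out, prefixes stripped.
    spec = _features_to_dataset_spec(
        {"action.ee.x": float, "action.j1.pos": float, "action.j2.pos": float},
        is_action=True,
        use_videos=False,
        patterns=[r"\.pos$"],
    )
    # set(spec[ACTION]["names"]) == {"j1.pos", "j2.pos"}, packed as one float32 vector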
+77 -129
@@ -26,7 +26,7 @@ import torch
import torch.nn as nn
from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features
from lerobot.utils.pipeline_utils import _features_to_dataset_spec
from lerobot.processor import (
DataProcessorPipeline,
EnvTransition,
@@ -2040,102 +2040,68 @@ def test_features_remove_from_initial(policy_feature_factory):
)
@dataclass
class AddActionEEAndJointFeatures(ProcessorStep):
"""Adds both EE and JOINT action features."""
def __call__(self, tr):
return tr
def transform_features(
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
# EE features
features[PipelineFeatureType.ACTION]["action.ee.x"] = float
features[PipelineFeatureType.ACTION]["action.ee.y"] = float
# JOINT features
features[PipelineFeatureType.ACTION]["action.j1.pos"] = float
features[PipelineFeatureType.ACTION]["action.j2.pos"] = float
return features
# ── Tests for _features_to_dataset_spec ──────────────────────────────────────────────────────────
# These replace the old aggregate_pipeline_dataset_features tests, covering the same categorisation
# / filtering / prefix-stripping / HF-format logic via the private helper directly.
@dataclass
class AddObservationStateFeatures(ProcessorStep):
"""Adds state features (and optionally an image spec to test precedence)."""
add_front_image: bool = False
front_image_shape: tuple = (240, 320, 3)
def __call__(self, tr):
return tr
def transform_features(
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
# State features (mix EE and a joint state)
features[PipelineFeatureType.OBSERVATION][f"{OBS_STATE}.ee.x"] = float
features[PipelineFeatureType.OBSERVATION][f"{OBS_STATE}.j1.pos"] = float
if self.add_front_image:
features[PipelineFeatureType.OBSERVATION][f"{OBS_IMAGES}.front"] = self.front_image_shape
return features
def test_aggregate_joint_action_only():
rp = DataProcessorPipeline([AddActionEEAndJointFeatures()])
initial = {PipelineFeatureType.OBSERVATION: {"front": (480, 640, 3)}, PipelineFeatureType.ACTION: {}}
out = aggregate_pipeline_dataset_features(
pipeline=rp,
initial_features=initial,
use_videos=True,
patterns=["action.j1.pos", "action.j2.pos"],
def test_dataset_spec_action_with_patterns():
"""Action features are filtered by pattern; unmatched keys are excluded."""
features = {
"action.ee.x": float,
"action.ee.y": float,
"action.j1.pos": float,
"action.j2.pos": float,
}
out = _features_to_dataset_spec(
features, is_action=True, use_videos=True, patterns=["action.j1.pos", "action.j2.pos"]
)
# Expect only ACTION with joint names
assert ACTION in out and OBS_STATE not in out
assert ACTION in out
assert out[ACTION]["dtype"] == "float32"
assert set(out[ACTION]["names"]) == {"j1.pos", "j2.pos"}
assert out[ACTION]["shape"] == (len(out[ACTION]["names"]),)
assert OBS_STATE not in out
def test_aggregate_ee_action_and_observation_with_videos():
rp = DataProcessorPipeline([AddActionEEAndJointFeatures(), AddObservationStateFeatures()])
initial = {"front": (480, 640, 3), "side": (720, 1280, 3)}
def test_dataset_spec_action_and_observation_with_videos():
"""EE action + state obs + image obs; all appear with correct dtypes."""
action_features = {"action.ee.x": float, "action.ee.y": float}
obs_features = {
f"{OBS_STATE}.ee.x": float,
f"{OBS_STATE}.j1.pos": float,
"front": (480, 640, 3),
"side": (720, 1280, 3),
}
out = aggregate_pipeline_dataset_features(
pipeline=rp,
initial_features={PipelineFeatureType.OBSERVATION: initial, PipelineFeatureType.ACTION: {}},
use_videos=True,
patterns=["action.ee", OBS_STATE],
)
act_out = _features_to_dataset_spec(action_features, is_action=True, use_videos=False)
obs_out = _features_to_dataset_spec(obs_features, is_action=False, use_videos=True)
# Action should pack only EE names
assert ACTION in out
assert set(out[ACTION]["names"]) == {"ee.x", "ee.y"}
assert out[ACTION]["dtype"] == "float32"
assert ACTION in act_out
assert set(act_out[ACTION]["names"]) == {"ee.x", "ee.y"}
assert act_out[ACTION]["dtype"] == "float32"
# Observation state should pack both ee.x and j1.pos as a vector
assert OBS_STATE in out
assert set(out[OBS_STATE]["names"]) == {"ee.x", "j1.pos"}
assert out[OBS_STATE]["dtype"] == "float32"
assert OBS_STATE in obs_out
assert set(obs_out[OBS_STATE]["names"]) == {"ee.x", "j1.pos"}
assert obs_out[OBS_STATE]["dtype"] == "float32"
# Cameras from initial_features appear as videos
for cam in ("front", "side"):
for cam, shape in [("front", (480, 640, 3)), ("side", (720, 1280, 3))]:
key = f"{OBS_IMAGES}.{cam}"
assert key in out
assert out[key]["dtype"] == "video"
assert out[key]["shape"] == initial[cam]
assert out[key]["names"] == ["height", "width", "channels"]
assert key in obs_out, f"missing camera key {key}"
assert obs_out[key]["dtype"] == "video"
assert obs_out[key]["shape"] == shape
assert obs_out[key]["names"] == ["height", "width", "channels"]
def test_aggregate_both_action_types():
rp = DataProcessorPipeline([AddActionEEAndJointFeatures()])
out = aggregate_pipeline_dataset_features(
pipeline=rp,
initial_features={PipelineFeatureType.ACTION: {}, PipelineFeatureType.OBSERVATION: {}},
use_videos=True,
patterns=["action.ee", "action.j1", "action.j2.pos"],
)
def test_dataset_spec_all_action_types():
"""EE and joint action features are both included when no pattern filter."""
features = {
"action.ee.x": float,
"action.ee.y": float,
"action.j1.pos": float,
"action.j2.pos": float,
}
out = _features_to_dataset_spec(features, is_action=True, use_videos=True, patterns=None)
assert ACTION in out
expected = {"ee.x", "ee.y", "j1.pos", "j2.pos"}
@@ -2143,58 +2109,40 @@ def test_aggregate_both_action_types():
assert out[ACTION]["shape"] == (len(expected),)
def test_aggregate_images_when_use_videos_false():
rp = DataProcessorPipeline([AddObservationStateFeatures(add_front_image=True)])
initial = {"back": (480, 640, 3)}
def test_dataset_spec_images_excluded_when_no_videos():
"""Image observation features are dropped entirely when use_videos=False."""
obs_features = {
f"{OBS_STATE}.j1.pos": float,
"back": (480, 640, 3),
f"{OBS_IMAGES}.front": (240, 320, 3),
}
out = _features_to_dataset_spec(obs_features, is_action=False, use_videos=False)
out = aggregate_pipeline_dataset_features(
pipeline=rp,
initial_features={PipelineFeatureType.ACTION: {}, PipelineFeatureType.OBSERVATION: initial},
use_videos=False, # expect "image" dtype
patterns=None,
)
key = f"{OBS_IMAGES}.back"
key_front = f"{OBS_IMAGES}.front"
assert key not in out
assert key_front not in out
assert f"{OBS_IMAGES}.back" not in out
assert f"{OBS_IMAGES}.front" not in out
# Non-image state feature is still present
assert OBS_STATE in out
assert "j1.pos" in out[OBS_STATE]["names"]
def test_aggregate_images_when_use_videos_true():
rp = DataProcessorPipeline([AddObservationStateFeatures(add_front_image=True)])
initial = {"back": (480, 640, 3)}
def test_dataset_spec_images_included_when_use_videos():
"""Image features appear as video entries when use_videos=True."""
obs_features = {
"back": (480, 640, 3),
f"{OBS_IMAGES}.front": (240, 320, 3),
}
out = _features_to_dataset_spec(obs_features, is_action=False, use_videos=True)
out = aggregate_pipeline_dataset_features(
pipeline=rp,
initial_features={PipelineFeatureType.OBSERVATION: initial, PipelineFeatureType.ACTION: {}},
use_videos=True,
patterns=None,
)
assert f"{OBS_IMAGES}.back" in out
assert out[f"{OBS_IMAGES}.back"]["dtype"] == "video"
assert out[f"{OBS_IMAGES}.back"]["shape"] == (480, 640, 3)
key = f"{OBS_IMAGES}.front"
key_back = f"{OBS_IMAGES}.back"
assert key in out
assert key_back in out
assert out[key]["dtype"] == "video"
assert out[key_back]["dtype"] == "video"
assert out[key_back]["shape"] == initial["back"]
assert f"{OBS_IMAGES}.front" in out
assert out[f"{OBS_IMAGES}.front"]["dtype"] == "video"
assert out[f"{OBS_IMAGES}.front"]["shape"] == (240, 320, 3)
def test_initial_camera_not_overridden_by_step_image():
# Step explicitly sets a different front image shape; initial has another shape.
# aggregate_pipeline_dataset_features should keep the step's value (setdefault behavior on initial cams).
rp = DataProcessorPipeline(
[AddObservationStateFeatures(add_front_image=True, front_image_shape=(240, 320, 3))]
)
initial = {"front": (480, 640, 3)} # should NOT override the step-provided (240, 320, 3)
out = aggregate_pipeline_dataset_features(
pipeline=rp,
initial_features={PipelineFeatureType.ACTION: {}, PipelineFeatureType.OBSERVATION: initial},
use_videos=True,
patterns=[f"{OBS_IMAGES}.front"],
)
key = f"{OBS_IMAGES}.front"
assert key in out
assert out[key]["shape"] == (240, 320, 3) # from the step, not from initial
def test_dataset_spec_empty_features_returns_empty():
"""Empty feature dict returns an empty output dict."""
assert _features_to_dataset_spec({}, is_action=True, use_videos=True) == {}
assert _features_to_dataset_spec({}, is_action=False, use_videos=True) == {}