debug(scripts): simplify record with processors (#1918)

Co-authored-by: Adil Zouitine <adilzouitinegm@gmail.com>
This commit is contained in:
Steven Palma
2025-09-11 18:20:39 +02:00
committed by GitHub
parent efde42d4a9
commit cd0098a5f7
4 changed files with 63 additions and 67 deletions
+1 -3
View File
@@ -93,9 +93,8 @@ robot_joints_to_ee_pose_processor = RobotProcessorPipeline[dict[str, Any], EnvTr
# Build dataset action and gripper features
action_ee_and_gripper = aggregate_pipeline_dataset_features(
pipeline=robot_ee_to_joints_processor,
initial_features=create_initial_features(),
initial_features=create_initial_features(action=robot.action_features),
use_videos=True,
patterns=["action.ee", "action.gripper.pos", "observation.state.gripper.pos"],
) # Get all ee action features + gripper pos action features
# Build dataset observation features
@@ -103,7 +102,6 @@ obs_ee = aggregate_pipeline_dataset_features(
pipeline=robot_joints_to_ee_pose_processor,
initial_features=create_initial_features(observation=robot.observation_features),
use_videos=True,
patterns=["observation.state.ee"],
) # Get all ee observation features
dataset_features = combine_feature_dicts(obs_ee, action_ee_and_gripper)
+1 -4
View File
@@ -125,15 +125,13 @@ action_ee = aggregate_pipeline_dataset_features(
pipeline=phone_to_robot_ee_pose_processor,
initial_features=create_initial_features(action=phone.action_features),
use_videos=True,
patterns=["action.ee"],
)
# Get gripper pos action features
gripper = aggregate_pipeline_dataset_features(
pipeline=robot_ee_to_joints_processor,
initial_features=create_initial_features(),
initial_features=create_initial_features(action=robot.action_features),
use_videos=True,
patterns=["action.gripper.pos", "observation.state.gripper.pos"],
)
# Build dataset ee observation features
@@ -141,7 +139,6 @@ observation_ee = aggregate_pipeline_dataset_features(
pipeline=robot_joints_to_ee_pose,
initial_features=create_initial_features(observation=robot.observation_features),
use_videos=True,
patterns=["observation.state.ee"],
)
dataset_features = combine_feature_dicts(action_ee, gripper, observation_ee)
+1
View File
@@ -381,6 +381,7 @@ def merge_transitions(transitions: Sequence[EnvTransition] | EnvTransition) -> E
return result
# TODO(Steven): Currently unused consider removing after testing
def transition_to_dataset_frame(
transitions_or_transition: EnvTransition | Sequence[EnvTransition], features: dict[str, dict]
) -> dict[str, Any]:
+60 -60
View File
@@ -73,7 +73,8 @@ from lerobot.configs import parser
from lerobot.configs.policies import PreTrainedConfig
from lerobot.datasets.image_writer import safe_stop_image_writer
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.utils import hw_to_dataset_features
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from lerobot.datasets.utils import build_dataset_frame, combine_feature_dicts
from lerobot.datasets.video_utils import VideoEncodingManager
from lerobot.policies.factory import make_policy, make_pre_post_processors
from lerobot.policies.pretrained import PreTrainedPolicy
@@ -90,7 +91,6 @@ from lerobot.processor.converters import (
identity_transition,
observation_to_transition,
robot_action_to_transition,
transition_to_dataset_frame,
transition_to_robot_action,
)
from lerobot.processor.rename_processor import rename_stats
@@ -242,46 +242,18 @@ def record_loop(
robot: Robot,
events: dict,
fps: int,
teleop_action_processor: RobotProcessorPipeline[RobotAction, EnvTransition], # runs after teleop
robot_action_processor: RobotProcessorPipeline[EnvTransition, RobotAction], # runs before robot
robot_observation_processor: RobotProcessorPipeline[dict[str, Any], EnvTransition], # runs after robot
dataset: LeRobotDataset | None = None,
teleop: Teleoperator | list[Teleoperator] | None = None,
policy: PreTrainedPolicy | None = None,
preprocessor: PolicyProcessorPipeline[dict[str, Any], dict[str, Any]] | None = None,
postprocessor: PolicyProcessorPipeline[PolicyAction, PolicyAction] | None = None,
control_time_s: int | None = None,
teleop_action_processor: RobotProcessorPipeline[RobotAction, EnvTransition]
| None = None, # runs after teleop
robot_action_processor: RobotProcessorPipeline[EnvTransition, RobotAction]
| None = None, # runs before robot
robot_observation_processor: RobotProcessorPipeline[dict[str, Any], EnvTransition]
| None = None, # runs after robot
single_task: str | None = None,
display_data: bool = False,
):
teleop_action_processor: RobotProcessorPipeline[RobotAction, EnvTransition] = (
teleop_action_processor
or RobotProcessorPipeline[RobotAction, EnvTransition](
steps=[IdentityProcessorStep()],
to_transition=robot_action_to_transition,
to_output=identity_transition,
)
)
robot_action_processor: RobotProcessorPipeline[EnvTransition, RobotAction] = (
robot_action_processor
or RobotProcessorPipeline[EnvTransition, RobotAction](
steps=[IdentityProcessorStep()],
to_transition=identity_transition,
to_output=transition_to_robot_action,
)
)
robot_observation_processor: RobotProcessorPipeline[dict[str, Any], EnvTransition] = (
robot_observation_processor
or RobotProcessorPipeline[dict[str, Any], EnvTransition](
steps=[IdentityProcessorStep()],
to_transition=observation_to_transition,
to_output=identity_transition,
)
)
if dataset is not None and dataset.fps != fps:
raise ValueError(f"The dataset fps should be equal to requested fps ({dataset.fps} != {fps}).")
@@ -315,11 +287,6 @@ def record_loop(
preprocessor.reset()
postprocessor.reset()
# Reset custom pipelines
teleop_action_processor.reset()
robot_action_processor.reset()
robot_observation_processor.reset()
policy_transition = None
teleop_transition = None
obs_transition = None
@@ -339,13 +306,14 @@ def record_loop(
# Applies a pipeline to the raw robot observation, default is IdentityProcessor
obs_transition = robot_observation_processor(obs)
if policy is not None or dataset is not None:
# TODO(Steven): We might be able to get rid of this
observation_frame = build_dataset_frame(
dataset.features, obs_transition[TransitionKey.OBSERVATION], prefix="observation"
)
# Get action from either policy or teleop
if policy is not None and preprocessor is not None and postprocessor is not None:
if dataset is not None:
observation_frame = transition_to_dataset_frame(
obs_transition, dataset.features
) # Convert the observation to the dataset format
action_values = predict_action(
observation=observation_frame,
policy=policy,
@@ -358,7 +326,7 @@ def record_loop(
)
action_names = dataset.features["action"]["names"]
policy_action = {f"action.{name}": float(action_values[i]) for i, name in enumerate(action_names)}
policy_action = {f"{name}": float(action_values[i]) for i, name in enumerate(action_names)}
policy_transition = {
TransitionKey.ACTION: policy_action,
TransitionKey.COMPLEMENTARY_DATA: {},
@@ -397,21 +365,12 @@ def record_loop(
# Action can eventually be clipped using `max_relative_target`,
# so action actually sent is saved in the dataset. action = postprocessor.process(action)
# TODO(pepijn, adil): we should use a pipeline step to clip the action, so the sent action is the action that we input to the robot.
_ = robot.send_action(robot_action_to_send)
sent_action = robot.send_action(robot_action_to_send)
# Write to dataset
if dataset is not None:
# If transition_to_dataset_frame is provided, use it to merge the transitions.
merged = []
if obs_transition is not None: # The observation from the robot
merged.append(obs_transition)
if teleop_transition is not None: # The action from teleop
merged.append(teleop_transition)
if policy_transition is not None: # The action from policy
merged.append(policy_transition)
frame = transition_to_dataset_frame(
merged if len(merged) > 1 else merged[0], dataset.features
) # Convert the observation to the dataset format
action_frame = build_dataset_frame(dataset.features, sent_action, prefix="action")
frame = {**observation_frame, **action_frame}
dataset.add_frame(frame, task=single_task)
if display_data:
@@ -435,17 +394,52 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
robot = make_robot_from_config(cfg.robot)
teleop = make_teleoperator_from_config(cfg.teleop) if cfg.teleop is not None else None
action_features = hw_to_dataset_features(robot.action_features, "action", cfg.dataset.video)
obs_features = hw_to_dataset_features(robot.observation_features, "observation", cfg.dataset.video)
teleop_action_processor: RobotProcessorPipeline[RobotAction, EnvTransition] = RobotProcessorPipeline[
RobotAction, EnvTransition
](
steps=[IdentityProcessorStep()],
to_transition=robot_action_to_transition,
to_output=identity_transition,
)
robot_action_processor: RobotProcessorPipeline[EnvTransition, RobotAction] = RobotProcessorPipeline[
EnvTransition, RobotAction
](
steps=[IdentityProcessorStep()],
to_transition=identity_transition,
to_output=transition_to_robot_action,
)
robot_observation_processor: RobotProcessorPipeline[dict[str, Any], EnvTransition] = (
RobotProcessorPipeline[dict[str, Any], EnvTransition](
steps=[IdentityProcessorStep()],
to_transition=observation_to_transition,
to_output=identity_transition,
)
)
atf = aggregate_pipeline_dataset_features(
pipeline=teleop_action_processor,
initial_features=create_initial_features(action=teleop.action_features, observation=None),
use_videos=cfg.dataset.video,
)
arf = aggregate_pipeline_dataset_features(
pipeline=robot_action_processor,
initial_features=create_initial_features(action=robot.action_features, observation=None),
use_videos=cfg.dataset.video,
)
of = aggregate_pipeline_dataset_features(
pipeline=robot_observation_processor,
initial_features=create_initial_features(observation=robot.observation_features, action=None),
use_videos=cfg.dataset.video,
)
# Add next.* features that are generated during recording
transition_features = {
_transition_features = {
"next.reward": {"dtype": "float32", "shape": (1,), "names": None},
"next.done": {"dtype": "bool", "shape": (1,), "names": None},
"next.truncated": {"dtype": "bool", "shape": (1,), "names": None},
}
dataset_features = {**action_features, **obs_features, **transition_features}
dataset_features = {**combine_feature_dicts(atf, arf, of)} # , **transition_features}
if cfg.resume:
dataset = LeRobotDataset(
@@ -504,6 +498,9 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
robot=robot,
events=events,
fps=cfg.dataset.fps,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
teleop=teleop,
policy=policy,
preprocessor=preprocessor,
@@ -524,6 +521,9 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
robot=robot,
events=events,
fps=cfg.dataset.fps,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
teleop=teleop,
control_time_s=cfg.dataset.reset_time_s,
single_task=cfg.dataset.single_task,