From 15934d8d08f3c62ec730fb814087619e6c987995 Mon Sep 17 00:00:00 2001 From: Pepijn <138571049+pkooij@users.noreply.github.com> Date: Wed, 1 Apr 2026 12:59:12 +0200 Subject: [PATCH] feat(policies): add relative action support for pi0, pi0.5, and pi0_fast (#2970) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add option for pi family models to train with relative actions (relative to state) * formatting * add recomputation of stats and option to compute delta stats * normalize after delta conversion * only recompute state for stats * calculate chunk-based stats * sample 100k * load from parquet * sample 1m * stats per chunk * fix * use quantiles * stats for entire dataset * fix * max 1m frames * compute before dist * fix multi gpu processor bug * Fix RTC with delta actions and OpenArms motor_type wiring * feat: align pi0_fast delta actions with pi0/pi05 and add RTC integration tests - Add delta_exclude_joints and action_feature_names to PI0FastConfig - Move to_absolute_actions from modeling to processor pipeline for pi0_fast - Add delta action detection and logging to eval_with_real_robot.py - Add delta actions documentation to pi0 and pi05 READMEs - Fix ruff lint issues in test_delta_actions.py - Add test_rtc_delta_actions.py (24 tests) covering: - ActionQueue with delta vs absolute actions - RTC denoise step with delta leftovers - Full pipeline roundtrip (delta → RTC → absolute) - State rebasing approximation bounds - Non-delta policy compatibility - Multi-chunk consistency * chore: clean up test comments, add OpenPI attribution, remove debug logging - Replace decorative comment separators in test files with plain section headers - Add attribution comments for 1e-6 epsilon in normalize_processor.py (from OpenPI) - Remove debug logging blocks from lerobot_train.py * refactor: extract compute_delta_action_stats into compute_stats.py Move the ~70-line inline delta action stats block from lerobot_train.py into a dedicated function in compute_stats.py, where all other stats computation already lives. The training script now calls it in 6 lines. * refactor: remove unused get_processed_left_over from ActionQueue This method was never called outside of tests. Leftover actions for RTC guidance are always retrieved via get_left_over() (delta/original space). * revert: remove logging-only changes from eval_with_real_robot.py The delta actions detection helper and log message added no functional value — the script already handles delta policies correctly via the processor pipeline. * refactor: use ACTION/OBS_STATE constants instead of hardcoded strings Replace hardcoded "action" and "observation.state" with ACTION and OBS_STATE from utils.constants in compute_stats.py, dataset_tools.py, and lerobot_train.py.
* style: remove stray blank lines in training loop * refactor: move delta action stats to preprocessing step, remove on-the-fly computation - Remove on-the-fly compute_delta_action_stats from lerobot_train.py - Rewrite recompute_stats to delegate action stats to compute_delta_action_stats (chunk-based sampling matching what the model sees during training) - Add chunk_size parameter to recompute_stats for delta action computation - Add delta actions documentation to pi0.mdx and pi05.mdx * feat: add recompute_stats CLI operation to lerobot-edit-dataset * fix(tests): relax quantile normalization test tolerance for 1e-6 epsilon * chore: remove agents_memory/pr_details.md from repo * refactor: rename delta actions to relative actions throughout What OpenPI calls "DeltaActions" is actually UMI's "relative trajectory" representation: each action in the chunk is an offset from the current state, not from the previous action. This avoids error accumulation. Renamed across all source, tests, docs, and CLI: - DeltaActionsProcessorStep → RelativeActionsProcessorStep - to_delta_actions → to_relative_actions - use_delta_actions → use_relative_actions - delta_exclude_joints → relative_exclude_joints - compute_delta_action_stats → compute_relative_action_stats - delta_action_processor.py → relative_action_processor.py - test_delta_actions.py → test_relative_actions.py Kept as-is: AbsoluteActionsProcessorStep (converts TO absolute), registry ID "delta_actions_processor" (backward compat), and unrelated delta references (IK pipeline, Robosuite, RA-BC metrics, gym envs). * docs: add Action Representations guide Dedicated page explaining absolute, relative, and delta actions with numerical examples, joint vs EE space, and how to use kinematics pipelines and the relative action processor. References UMI paper (Chi et al., 2024) for the terminology. * docs: remove redundant OpenPI naming note from action representations * docs: remove opinionated OpenPI reference from delta actions section * docs: replace ASCII diagram with UMI paper figure * docs: remove OpenPI reference from action representations * docs: use HF-hosted image instead of local asset * docs: clarify figure attribution * revert: restore original normalization epsilon behavior The 1e-6 unconditional epsilon change perturbed all normalized values, breaking backward compatibility tests. The original approach (1e-8 eps for MEAN_STD, conditional torch.where for QUANTILES) already handles division by zero correctly without affecting non-degenerate cases. * fix: restore delta_action_processor.py used by phone/RL teleop The rename commit incorrectly deleted delta_action_processor.py and duplicated its classes into relative_action_processor.py. Restore the original file and import from it instead. 
* fix(processor): address PR #2970 review comments - Remove shebang from relative_action_processor.py (library module, not script) - Add device alignment in to_relative_actions/to_absolute_actions so _last_state on CPU doesn't cause cross-device errors when actions are on CUDA - Rename delta_step → relative_step in AbsoluteActionsProcessorStep for naming consistency; update factory.py, all processor files, and tests - Expand _reconnect_relative_absolute_steps docstring to explain why post-hoc rewiring is needed after deserialization - Fix off-by-one in compute_stats.py: sample_upper_bound = total_frames - chunk_size + 1 so last valid start index is included and total_frames == chunk_size is not rejected - Remove redundant NOTE comment in processor_pi05.py (duplicated two lines below) - Fix pi0_fast processor ordering: move relative_step before NormalizerProcessorStep so normalizer sees delta actions (matching pi0/pi05); flip postprocessor to unnormalize → absolute accordingly. Relative stats are now required for all pi models - Revert use_relative_joint_actions_aloha → use_delta_joint_actions_aloha in configuration_smolvla.py (preserve existing public API) - Update action_representations.mdx: add missing joint to 6-DOF example, fix 'based on a figure', clarify pi family ordering, add RTC compatibility section * update rtc link * feat: compute relative action stats over full dataset with optional parallelism Remove the 100k sample cap from compute_relative_action_stats and process all valid chunks. Vectorize with numpy (pre-load actions/states, fancy indexing + broadcasting) for a large speedup over the per-index HF dataset loop. Add num_workers param for thread-based parallelism (numpy releases the GIL). Update docs to show --push_to_hub for recompute_stats. 
* style: apply ruff formatting to compute_stats.py * testing on real robot * style: fix ruff format and remove redundant .keys() calls --- docs/source/_toctree.yml | 2 + docs/source/action_representations.mdx | 223 +++++++ docs/source/pi0.mdx | 40 ++ docs/source/pi05.mdx | 40 ++ examples/rtc/eval_with_real_robot.py | 114 +++- src/lerobot/datasets/compute_stats.py | 143 +++++ src/lerobot/datasets/dataset_tools.py | 116 +++- src/lerobot/policies/factory.py | 66 +- src/lerobot/policies/pi0/README.md | 59 ++ src/lerobot/policies/pi0/configuration_pi0.py | 7 + src/lerobot/policies/pi0/processor_pi0.py | 12 +- src/lerobot/policies/pi05/README.md | 42 ++ .../policies/pi05/configuration_pi05.py | 7 + src/lerobot/policies/pi05/processor_pi05.py | 12 +- .../pi0_fast/configuration_pi0_fast.py | 7 + .../policies/pi0_fast/processor_pi0_fast.py | 21 +- .../policies/smolvla/configuration_smolvla.py | 2 +- src/lerobot/processor/__init__.py | 10 + .../processor/relative_action_processor.py | 208 ++++++ .../bi_openarm_follower.py | 39 +- .../config_bi_openarm_follower.py | 6 +- src/lerobot/scripts/lerobot_edit_dataset.py | 59 +- src/lerobot/scripts/lerobot_train.py | 18 +- .../scripts/lerobot_train_tokenizer.py | 79 +-- tests/policies/rtc/test_action_queue.py | 30 +- .../policies/rtc/test_rtc_relative_actions.py | 607 ++++++++++++++++++ tests/policies/test_relative_actions.py | 346 ++++++++++ 27 files changed, 2215 insertions(+), 100 deletions(-) create mode 100644 docs/source/action_representations.mdx create mode 100644 src/lerobot/processor/relative_action_processor.py create mode 100644 tests/policies/rtc/test_rtc_relative_actions.py create mode 100644 tests/policies/test_relative_actions.py diff --git a/docs/source/_toctree.yml b/docs/source/_toctree.yml index 650a21184..cc41ddb51 100644 --- a/docs/source/_toctree.yml +++ b/docs/source/_toctree.yml @@ -87,6 +87,8 @@ title: Processors for Robots and Teleoperators - local: env_processor title: Environment Processors + - local: action_representations + title: Action Representations title: "Robot Processors" - sections: - local: so101 diff --git a/docs/source/action_representations.mdx b/docs/source/action_representations.mdx new file mode 100644 index 000000000..1604ed467 --- /dev/null +++ b/docs/source/action_representations.mdx @@ -0,0 +1,223 @@ +# Action Representations + +This guide explains the different ways robot actions can be represented in LeRobot, how they relate to each other, and when to use each one. + +## Joint Space vs End-Effector Space + +Before discussing action representations, it helps to understand the two coordinate spaces actions can live in. + +### Joint Space + +Joint-space actions directly specify target positions for each motor. For a 6-DOF arm with a gripper, a joint-space action might look like: + +``` +action = [shoulder_pan: 45.0, shoulder_lift: -20.0, elbow: -30.0, wrist_pitch: 10.0, wrist_roll: 0.0, wrist_yaw: 5.0, gripper: 0.8] +``` + +Joint space is the default in LeRobot. It is simple, requires no kinematics model, and maps directly to motor commands. Most beginner setups (SO-100, Koch) use joint-space actions. 
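+In code, a joint-space action is usually just a flat mapping from motor names to target positions. A minimal sketch (the motor names are illustrative; real names come from your robot's `action_features`):
+
+```python
+# Illustrative motor names for a 6-DOF arm with a gripper.
+motor_names = [
+    "shoulder_pan", "shoulder_lift", "elbow",
+    "wrist_pitch", "wrist_roll", "wrist_yaw", "gripper",
+]
+targets = [45.0, -20.0, -30.0, 10.0, 0.0, 5.0, 0.8]
+
+# Each entry maps directly to a position command for one motor.
+action = dict(zip(motor_names, targets))
+```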
+ +### End-Effector (EE) Space + +End-effector-space actions specify the desired position and orientation of the robot's tool tip (gripper) in Cartesian coordinates: + +``` +action = [x: 0.25, y: -0.10, z: 0.15, wx: 0.0, wy: 0.0, wz: 0.1, gripper: 0.8] +``` + +EE space is more intuitive for tasks like pick-and-place because it directly describes where the gripper should go, but it requires a kinematics model (URDF) to convert between EE poses and joint angles. + +### Converting Between Spaces + +LeRobot provides processor steps for converting between joint and EE spaces using forward and inverse kinematics. These are built on top of `RobotKinematics`, which loads a URDF model of your robot. + +```python +from lerobot.model.kinematics import RobotKinematics +from lerobot.robots.so_follower.robot_kinematic_processor import ( + ForwardKinematicsJointsToEE, + InverseKinematicsEEToJoints, +) + +kinematics = RobotKinematics( + urdf_path="./SO101/so101_new_calib.urdf", + target_frame_name="gripper_frame_link", + joint_names=["shoulder", "elbow", "wrist_pitch", "wrist_roll", "wrist_yaw"], +) + +# Joints → EE (for observations: "where is my gripper?") +fk_step = ForwardKinematicsJointsToEE(kinematics=kinematics, motor_names=[...]) + +# EE → Joints (for actions: "move my gripper here") +ik_step = InverseKinematicsEEToJoints(kinematics=kinematics, motor_names=[...]) +``` + +See [`examples/so100_to_so100_EE/`](https://github.com/huggingface/lerobot/tree/main/examples/so100_to_so100_EE) for a complete working example of recording, replaying, and evaluating with EE-space actions on an SO-100 arm. + +## Absolute, Relative, and Delta Actions + +Regardless of whether you work in joint space or EE space, the action values can be expressed in three different ways. The terminology follows [UMI (Chi et al., 2024)](https://arxiv.org/abs/2402.10329). + +### Absolute Actions (LeRobot default) + +Each action specifies the target position directly. + +**Example** (joint space, chunk of 4): + +``` +current_state = [45.0, -30.0, 10.0] + +action_chunk = [ + [46.0, -29.0, 11.0], # go to 46, -29, 11 + [47.5, -27.0, 12.0], # go to 47.5, -27, 12 + [49.0, -25.0, 13.5], # go to 49, -25, 13.5 + [50.0, -24.0, 15.0], # go to 50, -24, 15 +] +``` + +Each value is a target position in the robot's coordinate frame. Simple and direct, but requires a consistent global coordinate frame. This is the default in LeRobot. + +### Relative Actions (used by OpenPI / pi0) + +Each action in the chunk is an offset from the **current state at the moment of prediction**. All actions in the chunk share the same reference point: + +``` +current_state = [45.0, -30.0, 10.0] + +relative_chunk = [ + [1.0, 1.0, 1.0], # +1 from current → target 46, -29, 11 + [2.5, 3.0, 2.0], # +2.5 from current → target 47.5, -27, 12 + [4.0, 5.0, 3.5], # +4 from current → target 49, -25, 13.5 + [5.0, 6.0, 5.0], # +5 from current → target 50, -24, 15 +] +``` + +The conversion is straightforward: `relative = absolute - current_state`. To recover absolute: `absolute = relative + current_state`. + +**Why use relative actions?** The model learns to predict offsets centered around zero, which is easier to normalize and leads to more stable training. Because every chunk references the same current state, there is no error accumulation across chunks. 
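+The round trip is a one-liner with broadcasting. A quick sketch using the numbers above:
+
+```python
+import numpy as np
+
+current_state = np.array([45.0, -30.0, 10.0])
+absolute_chunk = np.array([
+    [46.0, -29.0, 11.0],
+    [47.5, -27.0, 12.0],
+    [49.0, -25.0, 13.5],
+    [50.0, -24.0, 15.0],
+])
+
+# Every action in the chunk is offset from the same current state.
+relative_chunk = absolute_chunk - current_state
+
+# Recovering absolute targets needs only the current state, not previous actions.
+assert np.allclose(relative_chunk + current_state, absolute_chunk)
+```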
+ +### Delta Actions (sequential differences) + +Each action is an offset from the **previous action** (or from the current state for the first step): + +``` +current_state = [45.0, -30.0, 10.0] + +delta_chunk = [ + [1.0, 1.0, 1.0], # current → 46, -29, 11 + [1.5, 2.0, 1.0], # previous action → 47.5, -27, 12 + [1.5, 2.0, 1.5], # previous action → 49, -25, 13.5 + [1.0, 1.0, 1.5], # previous action → 50, -24, 15 +] +``` + +Here each step is relative to the one before it. To recover absolute positions you must sum all previous deltas, which means errors accumulate over time. UMI explicitly argues against this representation for this reason. + +### Visual Comparison + +The figure below (adapted from [UMI, Chi et al., 2024](https://arxiv.org/abs/2402.10329)) illustrates the key difference. With **relative trajectory**, every action in the chunk points back to the same origin (current state), so a new inference step cleanly resets the reference. With **delta**, each action depends on the previous one, so errors accumulate. **Absolute** actions require a consistent global coordinate frame. + +Relative Trajectory as Action Representation (UMI, Chi et al., 2024) + +## Using Relative Actions in LeRobot + +LeRobot provides `RelativeActionsProcessorStep` to convert between absolute and relative actions inside the processor pipeline. This is how pi0, pi0.5, and pi0_fast support relative actions. + +> **Note:** All pi models (pi0, pi0.5, pi0_fast) apply relative conversion *before* normalization (`relative → normalize`), so the normalizer always sees relative values. This means **relative action stats are required** for all of them when training with `use_relative_actions=true`. In pi0_fast the `RelativeActionsProcessorStep` only modifies the action — the state observation is unchanged — so `NormalizerProcessorStep` still runs before the state tokenizer and the tokenizer continues to receive normalized state as expected. + +### How it works + +During **training** (preprocessing), actions are converted from absolute to relative before the model sees them: + +``` +raw absolute action → RelativeActionsProcessorStep → normalize → model +``` + +During **inference** (postprocessing), model predictions are converted back to absolute before being sent to the robot: + +``` +model output → unnormalize → AbsoluteActionsProcessorStep → robot +``` + +The `AbsoluteActionsProcessorStep` reads the cached current state from its paired `RelativeActionsProcessorStep`, so the two must be wired together (handled automatically by the policy factory). + +### Enabling relative actions for the pi family (pi0, pi0.5, pi0_fast) + +**Step 1**: Precompute relative action statistics for your dataset: + +```bash +lerobot-edit-dataset \ + --repo_id your_dataset \ + --operation.type recompute_stats \ + --operation.relative_action true \ + --operation.chunk_size 50 \ + --operation.relative_exclude_joints "['gripper']" +``` + +**Step 2**: Train with relative actions enabled: + +```bash +lerobot-train \ + --dataset.repo_id=your_dataset \ + --policy.type=pi0 \ + --policy.use_relative_actions=true \ + --policy.relative_exclude_joints='["gripper"]' +``` + +The `relative_exclude_joints` parameter specifies joints that should remain in absolute space. For example, gripper commands are typically binary (open/close) and don't benefit from relative encoding.
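+Under the hood, the policy factory pairs the two steps roughly like this (a sketch; the wiring is handled for you, and the joint names are illustrative):
+
+```python
+from lerobot.processor import (
+    AbsoluteActionsProcessorStep,
+    RelativeActionsProcessorStep,
+)
+
+relative_step = RelativeActionsProcessorStep(
+    enabled=True,
+    exclude_joints=["gripper"],  # matched by substring against action_names
+    action_names=["shoulder_pan", "shoulder_lift", "elbow", "wrist_pitch", "wrist_roll", "wrist_yaw", "gripper"],
+)
+# The postprocessing step reads the state cached by its paired relative step.
+absolute_step = AbsoluteActionsProcessorStep(enabled=True, relative_step=relative_step)
+```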
+ +### Combining relative actions with RTC + +[RTC](https://arxiv.org/abs/2506.07339) (real-time chunking) overlaps inference with execution: it predicts the next action chunk while the current one is still executing, so the robot never pauses to wait for a full inference pass. Relative actions and RTC are fully compatible: because every chunk in relative mode references the **same** current state (captured at the start of inference), each predicted action in the chunk remains a valid offset even if the robot has already moved. No extra configuration is needed: `RelativeActionsProcessorStep` caches the state once per inference call, `AbsoluteActionsProcessorStep` applies it to every action in the streamed output, and the RTC example script re-anchors leftover actions from the previous chunk to the new current state before feeding them back as the guidance prefix (see `examples/rtc/eval_with_real_robot.py`). + +### Combining relative actions with EE space + +Relative actions work in both joint space and EE space. For example, if your dataset stores EE actions, relative encoding converts them to offsets from the current EE pose: + +``` +current_ee_state = [x: 0.25, y: -0.10, z: 0.15, gripper: 0.8] + +absolute_ee_chunk = [ + [0.26, -0.09, 0.16, 0.8], + [0.28, -0.07, 0.18, 0.8], +] + +relative_ee_chunk = [ + [0.01, 0.01, 0.01, 0.0], # offset from current EE pose + [0.03, 0.03, 0.03, 0.0], # offset from current EE pose +] +``` + +## Processing Pipeline Summary + +Here is how the different processors compose. Each arrow is a processor step, and they can be chained in a `RobotProcessorPipeline` or `PolicyProcessorPipeline`: + +``` + ┌─────────────────────────────────────────┐ + Action Space │ Joint Space ←──IK──→ EE Space │ + │ ForwardKinematicsJointsToEE │ + │ InverseKinematicsEEToJoints │ + └─────────────────────────────────────────┘ + + ┌─────────────────────────────────────────┐ + Representation │ Absolute ←────→ Relative │ + │ RelativeActionsProcessorStep (pre) │ + │ AbsoluteActionsProcessorStep (post) │ + └─────────────────────────────────────────┘ + + ┌─────────────────────────────────────────┐ + Normalization │ Raw ←────→ Normalized │ + │ NormalizerProcessorStep (pre) │ + │ UnnormalizerProcessorStep (post) │ + └─────────────────────────────────────────┘ +``` + +A typical training preprocessor might chain: `raw absolute joint actions → relative → normalize`. A typical inference postprocessor: `unnormalize → absolute → (optionally IK to joints)`. + +## References + +- [Universal Manipulation Interface (UMI)](https://arxiv.org/abs/2402.10329) - Chi et al., 2024. Defines the relative trajectory action representation and compares it with absolute and delta actions. +- [Introduction to Processors](./introduction_processors) - How processor pipelines work in LeRobot. +- [`examples/so100_to_so100_EE/`](https://github.com/huggingface/lerobot/tree/main/examples/so100_to_so100_EE) - Complete example of recording and evaluating with EE-space actions. diff --git a/docs/source/pi0.mdx b/docs/source/pi0.mdx index be7792b28..980490163 100644 --- a/docs/source/pi0.mdx +++ b/docs/source/pi0.mdx @@ -91,6 +91,46 @@ lerobot-train \ **💡 Tip**: Setting `train_expert_only=true` freezes the VLM and trains only the action expert and projections, allowing finetuning with reduced memory usage. +## Relative Actions + +By default, π₀ predicts absolute actions. You can enable **relative actions** so the model predicts offsets relative to the current robot state. This can improve training stability for certain setups.
+ +To use relative actions, first recompute your dataset stats in relative space via the CLI: + +```bash +lerobot-edit-dataset \ + --repo_id your_dataset \ + --operation.type recompute_stats \ + --operation.relative_action true \ + --operation.chunk_size 50 \ + --operation.relative_exclude_joints "['gripper']" \ + --push_to_hub true +``` + +Or equivalently in Python: + +```python +from lerobot.datasets.lerobot_dataset import LeRobotDataset +from lerobot.datasets.dataset_tools import recompute_stats + +dataset = LeRobotDataset("your_dataset") +recompute_stats(dataset, relative_action=True, chunk_size=50, relative_exclude_joints=["gripper"]) +dataset.push_to_hub() +``` + +The `chunk_size` should match your policy's `chunk_size` (default 50 for π₀). `relative_exclude_joints` lists joint names that should remain in absolute space (e.g. gripper commands). Use `--push_to_hub true` to upload the updated stats to the Hub. + +Then train with relative actions enabled: + +```bash +lerobot-train \ + --dataset.repo_id=your_dataset \ + --policy.type=pi0 \ + --policy.use_relative_actions=true \ + --policy.relative_exclude_joints='["gripper"]' \ + ... +``` + ## License This model follows the **Apache 2.0 License**, consistent with the original [OpenPI repository](https://github.com/Physical-Intelligence/openpi). diff --git a/docs/source/pi05.mdx b/docs/source/pi05.mdx index f586f0dc1..74e056efa 100644 --- a/docs/source/pi05.mdx +++ b/docs/source/pi05.mdx @@ -97,6 +97,46 @@ python src/lerobot/datasets/v30/augment_dataset_quantile_stats.py \ Or train pi05 with this normalization mapping: `--policy.normalization_mapping='{"ACTION": "MEAN_STD", "STATE": "MEAN_STD", "VISUAL": "IDENTITY"}'` +## Relative Actions + +By default, π₀.₅ predicts absolute actions. You can enable **relative actions** so the model predicts offsets relative to the current robot state. This can improve training stability for certain setups. + +To use relative actions, first recompute your dataset stats in relative space via the CLI: + +```bash +lerobot-edit-dataset \ + --repo_id your_dataset \ + --operation.type recompute_stats \ + --operation.relative_action true \ + --operation.chunk_size 50 \ + --operation.relative_exclude_joints "['gripper']" \ + --push_to_hub true +``` + +Or equivalently in Python: + +```python +from lerobot.datasets.lerobot_dataset import LeRobotDataset +from lerobot.datasets.dataset_tools import recompute_stats + +dataset = LeRobotDataset("your_dataset") +recompute_stats(dataset, relative_action=True, chunk_size=50, relative_exclude_joints=["gripper"]) +dataset.push_to_hub() +``` + +The `chunk_size` should match your policy's `chunk_size` (default 50 for π₀.₅). `relative_exclude_joints` lists joint names that should remain in absolute space (e.g. gripper commands). Use `--push_to_hub true` to upload the updated stats to the Hub. + +Then train with relative actions enabled: + +```bash +lerobot-train \ + --dataset.repo_id=your_dataset \ + --policy.type=pi05 \ + --policy.use_relative_actions=true \ + --policy.relative_exclude_joints='["gripper"]' \ + ... 
+``` + ## Performance Results ### Libero Benchmark Results diff --git a/examples/rtc/eval_with_real_robot.py b/examples/rtc/eval_with_real_robot.py index 36da88e1b..65cd33224 100644 --- a/examples/rtc/eval_with_real_robot.py +++ b/examples/rtc/eval_with_real_robot.py @@ -63,6 +63,26 @@ Usage: --robot.cameras="{ gripper: {type: opencv, index_or_path: 0, width: 640, height: 480, fps: 30}, front: {type: opencv, index_or_path: 1, width: 640, height: 480, fps: 30}}" \ --task="Move green small object into the purple platform" \ --duration=120 + + # Run RTC with bi_openarm_follower (dual-arm OpenArms) and pi0.5 policy + python examples/rtc/eval_with_real_robot.py \ + --policy.path=lerobot-data-collection/folding_final \ + --robot.type=bi_openarm_follower \ + --robot.cameras='{left_wrist: {type: opencv, index_or_path: "/dev/video4", width: 1280, height: 720, fps: 30}, base: {type: opencv, index_or_path: "/dev/video2", width: 640, height: 480, fps: 30}, right_wrist: {type: opencv, index_or_path: "/dev/video0", width: 1280, height: 720, fps: 30}}' \ + --robot.left_arm_config.port=can1 \ + --robot.left_arm_config.side=left \ + --robot.left_arm_config.can_interface=socketcan \ + --robot.right_arm_config.port=can0 \ + --robot.right_arm_config.side=right \ + --robot.right_arm_config.can_interface=socketcan \ + --task="Fold the T-shirt properly" \ + --fps=30 \ + --duration=2000 \ + --rtc.enabled=true \ + --rtc.execution_horizon=20 \ + --rtc.max_guidance_weight=5.0 \ + --rtc.prefix_attention_schedule=LINEAR \ + --device=cuda """ import logging @@ -87,21 +107,29 @@ from lerobot.policies.factory import get_policy_class, make_pre_post_processors from lerobot.policies.rtc.action_queue import ActionQueue from lerobot.policies.rtc.configuration_rtc import RTCConfig from lerobot.policies.rtc.latency_tracker import LatencyTracker +from lerobot.processor import ( + NormalizerProcessorStep, + RelativeActionsProcessorStep, + TransitionKey, + create_transition, +) from lerobot.processor.factory import ( make_default_robot_action_processor, make_default_robot_observation_processor, ) +from lerobot.processor.relative_action_processor import to_relative_actions from lerobot.rl.process import ProcessSignalHandler from lerobot.robots import ( # noqa: F401 Robot, RobotConfig, + bi_openarm_follower, bi_so_follower, koch_follower, so_follower, unitree_g1, ) from lerobot.robots.utils import make_robot_from_config -from lerobot.utils.constants import OBS_IMAGES +from lerobot.utils.constants import OBS_IMAGES, OBS_STATE from lerobot.utils.hub import HubMixin from lerobot.utils.utils import init_logging @@ -212,6 +240,35 @@ def is_image_key(k: str) -> bool: return k.startswith(OBS_IMAGES) +def _reanchor_relative_rtc_prefix( + prev_actions_absolute: Tensor, + current_state: Tensor, + relative_step: RelativeActionsProcessorStep, + normalizer_step: NormalizerProcessorStep | None, + policy_device: torch.device | str, +) -> Tensor: + """Convert absolute leftovers into model-space for relative-action RTC policies. + + When a policy uses relative actions, the RTC prefix (leftover actions from + the previous chunk) is stored in absolute space. Before feeding it back to + the policy we need to re-express it relative to the *current* robot state + and then re-normalize. 
+ """ + state = current_state.detach().cpu() + if state.dim() == 1: + state = state.unsqueeze(0) + + action_cpu = prev_actions_absolute.detach().cpu() + mask = relative_step._build_mask(action_cpu.shape[-1]) + relative_actions = to_relative_actions(action_cpu, state, mask) + + transition = create_transition(action=relative_actions) + if normalizer_step is not None: + transition = normalizer_step(transition) + + return transition[TransitionKey.ACTION].to(policy_device) + + def get_actions( policy, robot: RobotWrapper, @@ -237,7 +294,15 @@ def get_actions( fps = cfg.fps time_per_chunk = 1.0 / fps - dataset_features = hw_to_dataset_features(robot.observation_features(), "observation") + # Only keep .pos joints + camera streams if the policy was trained on positions, + # not the full pos/vel/torque state the robot exposes. + observation_features_hw = { + key: value + for key, value in robot.observation_features().items() + if key.endswith(".pos") or isinstance(value, tuple) + } + + dataset_features = hw_to_dataset_features(observation_features_hw, "observation") policy_device = policy.config.device # Load preprocessor and postprocessor from pretrained files @@ -255,6 +320,25 @@ def get_actions( logger.info("[GET_ACTIONS] Preprocessor/postprocessor loaded successfully with embedded stats") + relative_step = next( + (s for s in preprocessor.steps if isinstance(s, RelativeActionsProcessorStep) and s.enabled), + None, + ) + normalizer_step = next( + (s for s in preprocessor.steps if isinstance(s, NormalizerProcessorStep)), + None, + ) + if relative_step is not None: + if relative_step.action_names is None: + cfg_names = getattr(cfg.policy, "action_feature_names", None) + if cfg_names: + relative_step.action_names = list(cfg_names) + else: + relative_step.action_names = [ + k for k in robot.robot.action_features if k.endswith(".pos") + ] + logger.info("[GET_ACTIONS] Relative actions enabled: will re-anchor RTC prefix") + get_actions_threshold = cfg.action_queue_size_to_get_new_actions if not cfg.rtc.enabled: @@ -297,6 +381,28 @@ def get_actions( preproceseded_obs = preprocessor(obs_with_policy_features) + # Re-anchor leftover actions for relative-action policies. + # We need the *postprocessed* (absolute) leftover, not the original + # (normalized/relative) one that get_left_over() returns. 
+ if ( + prev_actions is not None + and relative_step is not None + and OBS_STATE in obs_with_policy_features + ): + with action_queue.lock: + if action_queue.queue is not None: + prev_actions_abs = action_queue.queue[action_queue.last_index :].clone() + else: + prev_actions_abs = None + if prev_actions_abs is not None and prev_actions_abs.numel() > 0: + prev_actions = _reanchor_relative_rtc_prefix( + prev_actions_absolute=prev_actions_abs, + current_state=obs_with_policy_features[OBS_STATE], + relative_step=relative_step, + normalizer_step=normalizer_step, + policy_device=policy_device, + ) + # Generate actions WITH RTC actions = policy.predict_action_chunk( preproceseded_obs, @@ -352,6 +458,8 @@ def actor_control( try: logger.info("[ACTOR] Starting actor thread") + action_keys = [k for k in robot.action_features() if k.endswith(".pos")] + action_count = 0 action_interval = 1.0 / cfg.fps @@ -363,7 +471,7 @@ def actor_control( if action is not None: action = action.cpu() - action_dict = {key: action[i].item() for i, key in enumerate(robot.action_features())} + action_dict = {key: action[i].item() for i, key in enumerate(action_keys)} action_processed = robot_action_processor((action_dict, None)) robot.send_action(action_processed) diff --git a/src/lerobot/datasets/compute_stats.py b/src/lerobot/datasets/compute_stats.py index 5bd95810b..03eefe40e 100644 --- a/src/lerobot/datasets/compute_stats.py +++ b/src/lerobot/datasets/compute_stats.py @@ -13,9 +13,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + +import logging + import numpy as np from lerobot.datasets.io_utils import load_image_as_numpy +from lerobot.utils.constants import ACTION, OBS_STATE DEFAULT_QUANTILES = [0.01, 0.10, 0.50, 0.90, 0.99] @@ -624,3 +629,141 @@ def aggregate_stats(stats_list: list[dict[str, dict]]) -> dict[str, dict[str, np aggregated_stats[key] = aggregate_feature_stats(stats_with_key) return aggregated_stats + + +def _get_valid_chunk_starts(episode_indices: np.ndarray, chunk_size: int) -> np.ndarray: + """Return all start indices where a chunk of ``chunk_size`` stays within one episode.""" + total = len(episode_indices) + if total < chunk_size: + return np.array([], dtype=np.int64) + max_start = total - chunk_size + starts = np.arange(max_start + 1) + valid = episode_indices[starts] == episode_indices[starts + chunk_size - 1] + return starts[valid] + + +def _compute_relative_chunk_batch( + start_indices: np.ndarray, + all_actions: np.ndarray, + all_states: np.ndarray, + chunk_size: int, + relative_mask: np.ndarray, +) -> np.ndarray: + """Vectorised relative-action computation for a batch of start indices. + + Returns an ``(N * chunk_size, action_dim)`` float32 array. 
+ """ + if len(start_indices) == 0: + return np.empty((0, all_actions.shape[1]), dtype=np.float32) + offsets = np.arange(chunk_size) + frame_idx = start_indices[:, None] + offsets[None, :] + chunks = all_actions[frame_idx].copy() + states = all_states[start_indices] + mask_dim = len(relative_mask) + chunks[:, :, :mask_dim] -= states[:, None, :mask_dim] * relative_mask[None, None, :] + return chunks.reshape(-1, all_actions.shape[1]) + + +def compute_relative_action_stats( + hf_dataset, + features: dict, + chunk_size: int, + exclude_joints: list[str] | None = None, + num_workers: int = 0, +) -> dict[str, np.ndarray]: + """Compute normalization statistics for relative actions over the full dataset. + + Iterates *all* valid action chunks (within single episodes), converts them to + relative actions (action − current_state), and computes per-dimension + statistics suitable for normalization. + + Args: + hf_dataset: The underlying HuggingFace dataset with "action", + "observation.state", and "episode_index" columns. + features: Dataset feature metadata (must contain "action" with "shape" + and optionally "names"). + chunk_size: Number of consecutive frames per action chunk. + exclude_joints: Joint names whose dimensions should remain absolute + (not converted to relative actions). + num_workers: Number of parallel threads for computation. Values ≤1 + mean single-threaded. Numpy releases the GIL so threads give + real parallelism here. + + Returns: + Statistics dict with keys "mean", "std", "min", "max", "q01", …, "q99". + + Raises: + ValueError: If the dataset has fewer frames than ``chunk_size``. + RuntimeError: If no valid (single-episode) chunks are found. + """ + from lerobot.processor.relative_action_processor import RelativeActionsProcessorStep + + if exclude_joints is None: + exclude_joints = [] + + action_dim = features[ACTION]["shape"][0] + action_names = features.get(ACTION, {}).get("names") + mask_step = RelativeActionsProcessorStep( + enabled=True, + exclude_joints=exclude_joints, + action_names=action_names, + ) + relative_mask = np.array(mask_step._build_mask(action_dim), dtype=np.float32) + + logging.info("Loading action/state data for relative action stats...") + all_actions = np.array(hf_dataset[ACTION], dtype=np.float32) + all_states = np.array(hf_dataset[OBS_STATE], dtype=np.float32) + episode_indices = np.array(hf_dataset["episode_index"]) + + valid_starts = _get_valid_chunk_starts(episode_indices, chunk_size) + if len(valid_starts) == 0: + raise RuntimeError( + f"No valid chunks found (total_frames={len(episode_indices)}, chunk_size={chunk_size})" + ) + + effective_workers = max(num_workers, 1) + logging.info( + f"Computing relative action stats from {len(valid_starts)} chunks " + f"(chunk_size={chunk_size}, workers={effective_workers})" + ) + + batch_size = 50_000 + batches = [valid_starts[i : i + batch_size] for i in range(0, len(valid_starts), batch_size)] + + running_stats = RunningQuantileStats() + + if num_workers > 1: + from concurrent.futures import ThreadPoolExecutor, as_completed + + with ThreadPoolExecutor(max_workers=num_workers) as pool: + futures = [ + pool.submit( + _compute_relative_chunk_batch, + batch, + all_actions, + all_states, + chunk_size, + relative_mask, + ) + for batch in batches + ] + for future in as_completed(futures): + running_stats.update(future.result()) + else: + for batch in batches: + running_stats.update( + _compute_relative_chunk_batch(batch, all_actions, all_states, chunk_size, relative_mask) + ) + + stats = 
running_stats.get_statistics() + + excluded_dims = int(len(relative_mask) - relative_mask.sum()) + total_frames = len(valid_starts) * chunk_size + logging.info( + f"Relative action stats ({len(valid_starts)} chunks, {total_frames} frames): " + f"relative_dims={int(relative_mask.sum())}/{len(relative_mask)} (excluded={excluded_dims}), " + f"mean={np.abs(stats['mean']).mean():.4f}, std={stats['std'].mean():.4f}, " + f"q01={stats['q01'].mean():.4f}, q99={stats['q99'].mean():.4f}" + ) + + return stats diff --git a/src/lerobot/datasets/dataset_tools.py b/src/lerobot/datasets/dataset_tools.py index cd2b9fc7c..16bf24822 100644 --- a/src/lerobot/datasets/dataset_tools.py +++ b/src/lerobot/datasets/dataset_tools.py @@ -37,7 +37,11 @@ import torch from tqdm import tqdm from lerobot.datasets.aggregate import aggregate_datasets -from lerobot.datasets.compute_stats import aggregate_stats +from lerobot.datasets.compute_stats import ( + aggregate_stats, + compute_episode_stats, + compute_relative_action_stats, +) from lerobot.datasets.dataset_metadata import LeRobotDatasetMetadata from lerobot.datasets.io_utils import ( get_parquet_file_size_in_mb, @@ -56,7 +60,7 @@ from lerobot.datasets.utils import ( update_chunk_file_indices, ) from lerobot.datasets.video_utils import encode_video_frames, get_video_info -from lerobot.utils.constants import HF_LEROBOT_HOME, OBS_IMAGE +from lerobot.utils.constants import ACTION, HF_LEROBOT_HOME, OBS_IMAGE, OBS_STATE def _load_episode_with_stats(src_dataset: LeRobotDataset, episode_idx: int) -> dict: @@ -1533,6 +1537,114 @@ def modify_tasks( return dataset +def recompute_stats( + dataset: LeRobotDataset, + skip_image_video: bool = True, + relative_action: bool = False, + relative_exclude_joints: list[str] | None = None, + chunk_size: int = 50, + num_workers: int = 0, +) -> LeRobotDataset: + """Recompute stats.json from scratch by iterating all episodes. + + Args: + dataset: The LeRobotDataset to recompute stats for. + skip_image_video: If True (default), only recompute stats for numeric features + (action, state, etc.) and keep existing image/video stats unchanged. + relative_action: If True, compute action stats in relative space by + iterating all valid action chunks and subtracting the current state. + This matches the normalization distribution the model sees during + training with ``use_relative_actions=True``. + relative_exclude_joints: Joint names to exclude from relative conversion when + relative_action=True. These dims keep absolute stats. + chunk_size: Action chunk size used for relative stats computation. Should match + ``policy.chunk_size``. Only used when ``relative_action=True``. + num_workers: Number of parallel threads for relative action stats computation. + Values ≤1 mean single-threaded. Only used when ``relative_action=True``. + + Returns: + The same dataset with updated stats. + """ + features = dataset.meta.features + meta_keys = {"index", "episode_index", "task_index", "frame_index", "timestamp"} + numeric_features = { + k: v + for k, v in features.items() + if v["dtype"] not in ["image", "video", "string"] and k not in meta_keys + } + + if skip_image_video: + features_to_compute = numeric_features + else: + features_to_compute = { + k: v for k, v in features.items() if v["dtype"] != "string" and k not in meta_keys + } + + # When relative_action is enabled, compute action stats via chunk-based sampling + # (matching what the model sees during training) and skip action in the + # per-episode pass below. 
+ relative_action_stats = None + if relative_action and ACTION in features and OBS_STATE in features: + if relative_exclude_joints is None: + relative_exclude_joints = ["gripper"] + relative_action_stats = compute_relative_action_stats( + hf_dataset=dataset.hf_dataset, + features=features, + chunk_size=chunk_size, + exclude_joints=relative_exclude_joints, + num_workers=num_workers, + ) + features_to_compute.pop(ACTION, None) + + logging.info(f"Recomputing stats for features: {list(features_to_compute.keys())}") + + data_dir = dataset.root / DATA_DIR + parquet_files = sorted(data_dir.glob("*/*.parquet")) + if not parquet_files: + raise ValueError(f"No parquet files found in {data_dir}") + + all_episode_stats = [] + numeric_keys = [k for k, v in features_to_compute.items() if v["dtype"] not in ["image", "video"]] + + for parquet_path in tqdm(parquet_files, desc="Computing stats from data files"): + df = pd.read_parquet(parquet_path) + + for ep_idx in sorted(df["episode_index"].unique()): + ep_df = df[df["episode_index"] == ep_idx] + episode_data = {} + for key in numeric_keys: + if key in ep_df.columns: + values = ep_df[key].values + if hasattr(values[0], "__len__"): + episode_data[key] = np.stack(values) + else: + episode_data[key] = np.array(values) + + ep_stats = compute_episode_stats(episode_data, features_to_compute) + all_episode_stats.append(ep_stats) + + if features_to_compute and not all_episode_stats: + logging.warning("No episode stats computed") + return dataset + + new_stats = aggregate_stats(all_episode_stats) if all_episode_stats else {} + + if relative_action_stats is not None: + new_stats[ACTION] = relative_action_stats + + # Merge: keep existing stats for features we didn't recompute + if dataset.meta.stats: + for key, value in dataset.meta.stats.items(): + if key not in new_stats: + new_stats[key] = value + + write_stats(new_stats, dataset.root) + dataset.meta.stats = new_stats + + logging.info("Stats recomputed successfully") + return dataset + + def convert_image_to_video_dataset( dataset: LeRobotDataset, output_dir: Path | None = None, diff --git a/src/lerobot/policies/factory.py b/src/lerobot/policies/factory.py index 146924502..501dd7af1 100644 --- a/src/lerobot/policies/factory.py +++ b/src/lerobot/policies/factory.py @@ -59,6 +59,29 @@ from lerobot.utils.constants import ( ) +def _reconnect_relative_absolute_steps( + preprocessor: PolicyProcessorPipeline, postprocessor: PolicyProcessorPipeline +) -> None: + """Wire AbsoluteActionsProcessorStep.relative_step to the RelativeActionsProcessorStep after deserialization. + + After a policy is loaded from disk, the preprocessor and postprocessor are reconstructed + independently from their configs. AbsoluteActionsProcessorStep needs a live reference to + the RelativeActionsProcessorStep so it can read the cached state at inference time. + That reference is not serializable, so we re-establish it here after loading. + """ + from lerobot.processor.relative_action_processor import ( + AbsoluteActionsProcessorStep, + RelativeActionsProcessorStep, + ) + + relative_step = next((s for s in preprocessor.steps if isinstance(s, RelativeActionsProcessorStep)), None) + if relative_step is None: + return + for step in postprocessor.steps: + if isinstance(step, AbsoluteActionsProcessorStep) and step.relative_step is None: + step.relative_step = relative_step + + def get_policy_class(name: str) -> type[PreTrainedPolicy]: """ Retrieves a policy class by its registered name. 
@@ -269,26 +292,26 @@ def make_pre_post_processors( kwargs["preprocessor_overrides"] = preprocessor_overrides kwargs["postprocessor_overrides"] = postprocessor_overrides - return ( - PolicyProcessorPipeline.from_pretrained( - pretrained_model_name_or_path=pretrained_path, - config_filename=kwargs.get( - "preprocessor_config_filename", f"{POLICY_PREPROCESSOR_DEFAULT_NAME}.json" - ), - overrides=kwargs.get("preprocessor_overrides", {}), - to_transition=batch_to_transition, - to_output=transition_to_batch, - ), - PolicyProcessorPipeline.from_pretrained( - pretrained_model_name_or_path=pretrained_path, - config_filename=kwargs.get( - "postprocessor_config_filename", f"{POLICY_POSTPROCESSOR_DEFAULT_NAME}.json" - ), - overrides=kwargs.get("postprocessor_overrides", {}), - to_transition=policy_action_to_transition, - to_output=transition_to_policy_action, + preprocessor = PolicyProcessorPipeline.from_pretrained( + pretrained_model_name_or_path=pretrained_path, + config_filename=kwargs.get( + "preprocessor_config_filename", f"{POLICY_PREPROCESSOR_DEFAULT_NAME}.json" ), + overrides=kwargs.get("preprocessor_overrides", {}), + to_transition=batch_to_transition, + to_output=transition_to_batch, ) + postprocessor = PolicyProcessorPipeline.from_pretrained( + pretrained_model_name_or_path=pretrained_path, + config_filename=kwargs.get( + "postprocessor_config_filename", f"{POLICY_POSTPROCESSOR_DEFAULT_NAME}.json" + ), + overrides=kwargs.get("postprocessor_overrides", {}), + to_transition=policy_action_to_transition, + to_output=transition_to_policy_action, + ) + _reconnect_relative_absolute_steps(preprocessor, postprocessor) + return preprocessor, postprocessor # Create a new processor based on policy type if isinstance(policy_cfg, TDMPCConfig): @@ -486,6 +509,13 @@ def make_policy( cfg.output_features = {key: ft for key, ft in features.items() if ft.type is FeatureType.ACTION} if not cfg.input_features: cfg.input_features = {key: ft for key, ft in features.items() if key not in cfg.output_features} + + # Store action feature names for relative_exclude_joints support + if ds_meta is not None and hasattr(cfg, "action_feature_names"): + action_names = ds_meta.features.get(ACTION, {}).get("names") + if action_names is not None: + cfg.action_feature_names = list(action_names) + kwargs["config"] = cfg # Pass dataset_stats to the policy if available (needed for some policies like SARM) diff --git a/src/lerobot/policies/pi0/README.md b/src/lerobot/policies/pi0/README.md index 65b331e51..312c3cb3d 100644 --- a/src/lerobot/policies/pi0/README.md +++ b/src/lerobot/policies/pi0/README.md @@ -17,6 +17,65 @@ It is designed as a **Vision-Language-Action model for general robot control**. --- +## Relative Actions + +π₀ supports training with **relative actions**, where the model learns relative offsets +from the current robot state instead of absolute joint positions. This mirrors the +relative-action transform in OpenPI (`DeltaActions`) and can improve performance. + +### How it works + +1. **During preprocessing**, absolute actions are converted to relative offsets: + `relative = action - state` (for selected joints). +2. The relative actions are normalized using statistics computed from the relative distribution. +3. **During postprocessing**, predicted relative actions are converted back to absolute: + `absolute = relative + state`. + +Joints listed in `relative_exclude_joints` (e.g., gripper) are kept absolute. 
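+A minimal sketch of the transform using the helper functions exported from
+`lerobot.processor` (shapes follow the docstrings of `to_relative_actions` /
+`to_absolute_actions`; the values are illustrative):
+
+```python
+import torch
+
+from lerobot.processor import to_absolute_actions, to_relative_actions
+
+state = torch.tensor([[45.0, -30.0, 10.0, 0.8]])  # (B, state_dim)
+chunk = torch.tensor([[[46.0, -29.0, 11.0, 0.8],
+                       [47.5, -27.0, 12.0, 0.8]]])  # (B, T, action_dim)
+mask = [True, True, True, False]  # keep the gripper dimension absolute
+
+relative = to_relative_actions(chunk, state, mask)
+# relative[0, 0] == [1.0, 1.0, 1.0, 0.8]; the gripper value passes through unchanged.
+absolute = to_absolute_actions(relative, state, mask)
+assert torch.allclose(absolute, chunk)
+```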
+ +### Configuration + +| Parameter | Type | Default | Description | +| ------------------------- | ----------- | ------------- | ---------------------------------------------------------------- | +| `use_relative_actions` | `bool` | `False` | Enable relative-action training | +| `relative_exclude_joints` | `list[str]` | `["gripper"]` | Joint names to keep absolute (matched by substring) | +| `action_feature_names` | `list[str]` | `None` | Auto-populated from dataset metadata at runtime by `make_policy` | + +### Training example + +```bash +python -m lerobot.scripts.lerobot_train \ + --policy.type=pi0 \ + --dataset.repo_id=your_org/your_dataset \ + --policy.use_relative_actions=true \ + --policy.relative_exclude_joints='["gripper"]' +``` + +When `use_relative_actions=true`, the training script automatically: + +- Computes relative action statistics from the dataset (sampled chunk-level relative actions) +- Replaces the standard action stats with relative stats for normalization +- Broadcasts these stats across all ranks in distributed training + +### Recomputing stats for an existing dataset + +If you want to precompute relative action stats offline, use `recompute_stats` from +`lerobot.datasets.dataset_tools`: + +```python +from lerobot.datasets.lerobot_dataset import LeRobotDataset +from lerobot.datasets.dataset_tools import recompute_stats + +dataset = LeRobotDataset("your_org/your_dataset") +dataset = recompute_stats( + dataset, + relative_action=True, + relative_exclude_joints=["gripper"], +) +``` + +--- + ## Citation If you use this work, please cite both **OpenPI** and the π₀ paper: diff --git a/src/lerobot/policies/pi0/configuration_pi0.py b/src/lerobot/policies/pi0/configuration_pi0.py index be9b4530f..cf4b636a3 100644 --- a/src/lerobot/policies/pi0/configuration_pi0.py +++ b/src/lerobot/policies/pi0/configuration_pi0.py @@ -50,6 +50,13 @@ class PI0Config(PreTrainedConfig): min_period: float = 4e-3 max_period: float = 4.0 + # Relative actions: converts absolute actions to relative (relative to state). + use_relative_actions: bool = False + # Joint names to exclude from relative (kept absolute). Empty list = all dims relative. + relative_exclude_joints: list[str] = field(default_factory=lambda: ["gripper"]) + # Populated at runtime from dataset metadata by make_policy. + action_feature_names: list[str] | None = None + # Real-Time Chunking (RTC) configuration rtc_config: RTCConfig | None = None diff --git a/src/lerobot/policies/pi0/processor_pi0.py b/src/lerobot/policies/pi0/processor_pi0.py index 50f5dec83..0302876a1 100644 --- a/src/lerobot/policies/pi0/processor_pi0.py +++ b/src/lerobot/policies/pi0/processor_pi0.py @@ -21,6 +21,7 @@ import torch from lerobot.configs.types import PipelineFeatureType, PolicyFeature from lerobot.policies.pi0.configuration_pi0 import PI0Config from lerobot.processor import ( + AbsoluteActionsProcessorStep, AddBatchDimensionProcessorStep, ComplementaryDataProcessorStep, DeviceProcessorStep, @@ -29,6 +30,7 @@ from lerobot.processor import ( PolicyProcessorPipeline, ProcessorStep, ProcessorStepRegistry, + RelativeActionsProcessorStep, RenameObservationsProcessorStep, TokenizerProcessorStep, UnnormalizerProcessorStep, @@ -126,7 +128,13 @@ def make_pi0_pre_post_processors( A tuple containing the configured pre-processor and post-processor pipelines. 
""" - # Add remaining processors + relative_step = RelativeActionsProcessorStep( + enabled=config.use_relative_actions, + exclude_joints=getattr(config, "relative_exclude_joints", []), + action_names=getattr(config, "action_feature_names", None), + ) + + # OpenPI order: raw → relative → normalize → model → unnormalize → absolute input_steps: list[ProcessorStep] = [ RenameObservationsProcessorStep(rename_map={}), # To mimic the same processor as pretrained one AddBatchDimensionProcessorStep(), @@ -138,6 +146,7 @@ def make_pi0_pre_post_processors( padding="max_length", ), DeviceProcessorStep(device=config.device), + relative_step, NormalizerProcessorStep( features={**config.input_features, **config.output_features}, norm_map=config.normalization_mapping, @@ -149,6 +158,7 @@ def make_pi0_pre_post_processors( UnnormalizerProcessorStep( features=config.output_features, norm_map=config.normalization_mapping, stats=dataset_stats ), + AbsoluteActionsProcessorStep(enabled=config.use_relative_actions, relative_step=relative_step), DeviceProcessorStep(device="cpu"), ] diff --git a/src/lerobot/policies/pi05/README.md b/src/lerobot/policies/pi05/README.md index 2ae69d978..9abec99fa 100644 --- a/src/lerobot/policies/pi05/README.md +++ b/src/lerobot/policies/pi05/README.md @@ -17,6 +17,48 @@ It is designed as a **Vision-Language-Action model with open-world generalizatio --- +## Relative Actions + +π₀.₅ supports training with **relative actions**, where the model learns relative offsets +from the current robot state instead of absolute joint positions. This mirrors the +relative-action transform in OpenPI (`DeltaActions`) and can improve performance. + +### How it works + +1. **During preprocessing**, absolute actions are converted to relative offsets: + `relative = action - state` (for selected joints). +2. The relative actions are normalized using statistics computed from the relative distribution. +3. **During postprocessing**, predicted relative actions are converted back to absolute: + `absolute = relative + state`. + +Joints listed in `relative_exclude_joints` (e.g., gripper) are kept absolute. 
+ +### Configuration + +| Parameter | Type | Default | Description | +| ------------------------- | ----------- | ------------- | ---------------------------------------------------------------- | +| `use_relative_actions` | `bool` | `False` | Enable relative-action training | +| `relative_exclude_joints` | `list[str]` | `["gripper"]` | Joint names to keep absolute (matched by substring) | +| `action_feature_names` | `list[str]` | `None` | Auto-populated from dataset metadata at runtime by `make_policy` | + +### Training example + +```bash +python -m lerobot.scripts.lerobot_train \ + --policy.type=pi05 \ + --dataset.repo_id=your_org/your_dataset \ + --policy.use_relative_actions=true \ + --policy.relative_exclude_joints='["gripper"]' +``` + +When `use_relative_actions=true`, the training script automatically: + +- Computes relative action statistics from the dataset (sampled chunk-level relative actions) +- Replaces the standard action stats with relative stats for normalization +- Broadcasts these stats across all ranks in distributed training + +--- + ## Citation If you use this work, please cite both **OpenPI** and the π₀.₅ paper: diff --git a/src/lerobot/policies/pi05/configuration_pi05.py b/src/lerobot/policies/pi05/configuration_pi05.py index b96e6d196..6760be0a2 100644 --- a/src/lerobot/policies/pi05/configuration_pi05.py +++ b/src/lerobot/policies/pi05/configuration_pi05.py @@ -50,6 +50,13 @@ class PI05Config(PreTrainedConfig): min_period: float = 4e-3 max_period: float = 4.0 + # Relative actions: converts absolute actions to relative (relative to state). + use_relative_actions: bool = False + # Joint names to exclude from relative (kept absolute). Empty list = all dims relative. + relative_exclude_joints: list[str] = field(default_factory=lambda: ["gripper"]) + # Populated at runtime from dataset metadata by make_policy. + action_feature_names: list[str] | None = None + # Real-Time Chunking (RTC) configuration rtc_config: RTCConfig | None = None diff --git a/src/lerobot/policies/pi05/processor_pi05.py b/src/lerobot/policies/pi05/processor_pi05.py index 425a85577..cb616af87 100644 --- a/src/lerobot/policies/pi05/processor_pi05.py +++ b/src/lerobot/policies/pi05/processor_pi05.py @@ -24,6 +24,7 @@ import torch from lerobot.configs.types import PipelineFeatureType, PolicyFeature from lerobot.policies.pi05.configuration_pi05 import PI05Config from lerobot.processor import ( + AbsoluteActionsProcessorStep, AddBatchDimensionProcessorStep, DeviceProcessorStep, NormalizerProcessorStep, @@ -31,6 +32,7 @@ from lerobot.processor import ( PolicyProcessorPipeline, ProcessorStep, ProcessorStepRegistry, + RelativeActionsProcessorStep, RenameObservationsProcessorStep, TokenizerProcessorStep, UnnormalizerProcessorStep, @@ -125,10 +127,17 @@ def make_pi05_pre_post_processors( A tuple containing the configured pre-processor and post-processor pipelines. 
""" - # Add remaining processors + relative_step = RelativeActionsProcessorStep( + enabled=config.use_relative_actions, + exclude_joints=getattr(config, "relative_exclude_joints", []), + action_names=getattr(config, "action_feature_names", None), + ) + + # OpenPI order: raw → relative → normalize → model → unnormalize → absolute input_steps: list[ProcessorStep] = [ RenameObservationsProcessorStep(rename_map={}), # To mimic the same processor as pretrained one AddBatchDimensionProcessorStep(), + relative_step, # NOTE: NormalizerProcessorStep MUST come before Pi05PrepareStateTokenizerProcessorStep # because the tokenizer step expects normalized state in [-1, 1] range for discretization NormalizerProcessorStep( @@ -150,6 +159,7 @@ def make_pi05_pre_post_processors( UnnormalizerProcessorStep( features=config.output_features, norm_map=config.normalization_mapping, stats=dataset_stats ), + AbsoluteActionsProcessorStep(enabled=config.use_relative_actions, relative_step=relative_step), DeviceProcessorStep(device="cpu"), ] diff --git a/src/lerobot/policies/pi0_fast/configuration_pi0_fast.py b/src/lerobot/policies/pi0_fast/configuration_pi0_fast.py index e12522833..6a645fae1 100644 --- a/src/lerobot/policies/pi0_fast/configuration_pi0_fast.py +++ b/src/lerobot/policies/pi0_fast/configuration_pi0_fast.py @@ -41,6 +41,13 @@ class PI0FastConfig(PreTrainedConfig): max_action_dim: int = 32 max_action_tokens: int = 256 + # Relative actions: converts absolute actions to relative (relative to state). + use_relative_actions: bool = False + # Joint names to exclude from relative (kept absolute). Empty list = all dims relative. + relative_exclude_joints: list[str] = field(default_factory=lambda: ["gripper"]) + # Populated at runtime from dataset metadata by make_policy. + action_feature_names: list[str] | None = None + # Real-Time Chunking (RTC) configuration rtc_config: RTCConfig | None = None diff --git a/src/lerobot/policies/pi0_fast/processor_pi0_fast.py b/src/lerobot/policies/pi0_fast/processor_pi0_fast.py index 46e54432a..c4a510615 100644 --- a/src/lerobot/policies/pi0_fast/processor_pi0_fast.py +++ b/src/lerobot/policies/pi0_fast/processor_pi0_fast.py @@ -24,6 +24,7 @@ import torch from lerobot.configs.types import PipelineFeatureType, PolicyFeature from lerobot.policies.pi0_fast.configuration_pi0_fast import PI0FastConfig from lerobot.processor import ( + AbsoluteActionsProcessorStep, ActionTokenizerProcessorStep, AddBatchDimensionProcessorStep, DeviceProcessorStep, @@ -32,6 +33,7 @@ from lerobot.processor import ( PolicyProcessorPipeline, ProcessorStep, ProcessorStepRegistry, + RelativeActionsProcessorStep, RenameObservationsProcessorStep, TokenizerProcessorStep, UnnormalizerProcessorStep, @@ -125,12 +127,24 @@ def make_pi0_fast_pre_post_processors( Returns: A tuple containing the configured pre-processor and post-processor pipelines. """ - # Add remaining processors + relative_step = RelativeActionsProcessorStep( + enabled=config.use_relative_actions, + exclude_joints=getattr(config, "relative_exclude_joints", []), + action_names=getattr(config, "action_feature_names", None), + ) + + # Pi0Fast order: relative → normalize → tokenize → model → unnormalize → absolute + # This matches pi0/pi0.5: RelativeActionsProcessorStep runs first on raw absolute actions, + # caching the raw state. NormalizerProcessorStep then normalizes the raw relative actions, + # so the normalizer (and action tokenizer) sees delta values — relative stats are required. 
+ # NOTE: RelativeActionsProcessorStep only modifies the action in the transition; it reads + # state from the observation but does not change it. NormalizerProcessorStep still runs + # before Pi0FastPrepareStateAndLanguageTokenizerProcessorStep, so the state tokenizer + # continues to receive normalized state in [-1, 1] as expected. input_steps: list[ProcessorStep] = [ RenameObservationsProcessorStep(rename_map={}), # To mimic the same processor as pretrained one AddBatchDimensionProcessorStep(), - # NOTE: NormalizerProcessorStep MUST come before Pi0FastPrepareStateAndLanguageTokenizerProcessorStep - # because the tokenizer step expects normalized state in [-1, 1] range for discretization + relative_step, NormalizerProcessorStep( features={**config.input_features, **config.output_features}, norm_map=config.normalization_mapping, @@ -156,6 +170,7 @@ def make_pi0_fast_pre_post_processors( UnnormalizerProcessorStep( features=config.output_features, norm_map=config.normalization_mapping, stats=dataset_stats ), + AbsoluteActionsProcessorStep(enabled=config.use_relative_actions, relative_step=relative_step), DeviceProcessorStep(device="cpu"), ] diff --git a/src/lerobot/policies/smolvla/configuration_smolvla.py b/src/lerobot/policies/smolvla/configuration_smolvla.py index b861b856b..5007abbb4 100644 --- a/src/lerobot/policies/smolvla/configuration_smolvla.py +++ b/src/lerobot/policies/smolvla/configuration_smolvla.py @@ -55,7 +55,7 @@ class SmolVLAConfig(PreTrainedConfig): # the space used by the pi internal runtime which was used to train the base model. adapt_to_pi_aloha: bool = False - # Converts joint dimensions to deltas with respect to the current state before passing to the model. + # Converts joint dimensions to relative values with respect to the current state before passing to the model. # Gripper dimensions will remain in absolute values. use_delta_joint_actions_aloha: bool = False diff --git a/src/lerobot/processor/__init__.py b/src/lerobot/processor/__init__.py index 12dcf0c6d..122b3533c 100644 --- a/src/lerobot/processor/__init__.py +++ b/src/lerobot/processor/__init__.py @@ -75,6 +75,12 @@ from .policy_robot_bridge import ( PolicyActionToRobotActionProcessorStep, RobotActionToPolicyActionProcessorStep, ) +from .relative_action_processor import ( + AbsoluteActionsProcessorStep, + RelativeActionsProcessorStep, + to_absolute_actions, + to_relative_actions, +) from .rename_processor import RenameObservationsProcessorStep from .tokenizer_processor import ActionTokenizerProcessorStep, TokenizerProcessorStep @@ -100,6 +106,8 @@ __all__ = [ "make_default_teleop_action_processor", "make_default_robot_action_processor", "make_default_robot_observation_processor", + "AbsoluteActionsProcessorStep", + "RelativeActionsProcessorStep", "MapDeltaActionToRobotActionStep", "MapTensorToDeltaActionDictStep", "NormalizerProcessorStep", @@ -129,6 +137,8 @@ __all__ = [ "transition_to_batch", "TransitionKey", "TruncatedProcessorStep", + "to_absolute_actions", + "to_relative_actions", "UnnormalizerProcessorStep", "VanillaObservationProcessorStep", ] diff --git a/src/lerobot/processor/relative_action_processor.py b/src/lerobot/processor/relative_action_processor.py new file mode 100644 index 000000000..e00d26e98 --- /dev/null +++ b/src/lerobot/processor/relative_action_processor.py @@ -0,0 +1,208 @@ +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections.abc import Sequence +from dataclasses import dataclass, field +from typing import Any + +import torch +from torch import Tensor + +from lerobot.configs.types import PipelineFeatureType, PolicyFeature +from lerobot.types import EnvTransition, TransitionKey +from lerobot.utils.constants import OBS_STATE + +from .delta_action_processor import MapDeltaActionToRobotActionStep, MapTensorToDeltaActionDictStep +from .pipeline import ProcessorStep, ProcessorStepRegistry + +# Re-export for backward compatibility +__all__ = [ + "MapDeltaActionToRobotActionStep", + "MapTensorToDeltaActionDictStep", + "RelativeActionsProcessorStep", + "AbsoluteActionsProcessorStep", + "to_relative_actions", + "to_absolute_actions", +] + + +def to_relative_actions(actions: Tensor, state: Tensor, mask: Sequence[bool]) -> Tensor: + """Convert absolute actions to relative: relative = action - state (for masked dims). + + Args: + actions: (B, T, action_dim) or (B, action_dim). + state: (B, state_dim). Broadcast across time dimension. + mask: Which dims to convert. Can be shorter than action_dim. + """ + mask_t = torch.tensor(mask, dtype=actions.dtype, device=actions.device) + dims = mask_t.shape[0] + # Align state to the same device/dtype as actions. _last_state is cached before + # DeviceProcessorStep moves the transition, so it can be on CPU while actions are on CUDA. + if state.device != actions.device or state.dtype != actions.dtype: + state = state.to(device=actions.device, dtype=actions.dtype) + state_offset = state[..., :dims] * mask_t + if actions.ndim == 3: + state_offset = state_offset.unsqueeze(-2) + actions = actions.clone() + actions[..., :dims] -= state_offset + return actions + + +def to_absolute_actions(actions: Tensor, state: Tensor, mask: Sequence[bool]) -> Tensor: + """Convert relative actions back to absolute: absolute = relative + state (for masked dims). + + Args: + actions: (B, T, action_dim) or (B, action_dim). + state: (B, state_dim). Broadcast across time dimension. + mask: Which dims to convert. Can be shorter than action_dim. + """ + mask_t = torch.tensor(mask, dtype=actions.dtype, device=actions.device) + dims = mask_t.shape[0] + # Align state to the same device/dtype as actions. _last_state is cached before + # DeviceProcessorStep moves the transition, so it can be on CPU while actions are on CUDA. + if state.device != actions.device or state.dtype != actions.dtype: + state = state.to(device=actions.device, dtype=actions.dtype) + state_offset = state[..., :dims] * mask_t + if actions.ndim == 3: + state_offset = state_offset.unsqueeze(-2) + actions = actions.clone() + actions[..., :dims] += state_offset + return actions + + +@ProcessorStepRegistry.register("delta_actions_processor") +@dataclass +class RelativeActionsProcessorStep(ProcessorStep): + """Converts absolute actions to relative actions (action -= state) for masked dimensions. + + Mirrors OpenPI's DeltaActions transform. Applied during preprocessing so the model + trains on relative offsets instead of absolute positions. 
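+
+    Example (illustrative values): with exclude_joints=["gripper"] and a state of
+    [0.5, 0.1, 1.0], the absolute action [0.7, 0.2, 1.0] becomes the relative
+    action [0.2, 0.1, 1.0]: masked joints are offset by the state while the
+    gripper dimension stays absolute.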
+ Caches the last seen state so a paired AbsoluteActionsProcessorStep can reverse + the conversion during postprocessing. + + Attributes: + enabled: Whether to apply the relative conversion. + exclude_joints: Joint names to keep absolute (not converted to relative). + action_names: Action dimension names from dataset metadata, used to build + the mask from exclude_joints. If None, all dims are converted. + """ + + enabled: bool = False + exclude_joints: list[str] = field(default_factory=list) + action_names: list[str] | None = None + _last_state: torch.Tensor | None = field(default=None, init=False, repr=False) + + def _build_mask(self, action_dim: int) -> list[bool]: + if not self.exclude_joints or self.action_names is None: + return [True] * action_dim + + exclude_tokens = [str(name).lower() for name in self.exclude_joints if name] + if not exclude_tokens: + return [True] * action_dim + + mask = [] + for name in self.action_names[:action_dim]: + action_name = str(name).lower() + is_excluded = any(token == action_name or token in action_name for token in exclude_tokens) + mask.append(not is_excluded) + + if len(mask) < action_dim: + mask.extend([True] * (action_dim - len(mask))) + + return mask + + def __call__(self, transition: EnvTransition) -> EnvTransition: + observation = transition.get(TransitionKey.OBSERVATION, {}) + state = observation.get(OBS_STATE) if observation else None + + # Always cache state for the paired AbsoluteActionsProcessorStep + if state is not None: + self._last_state = state + + if not self.enabled: + return transition + + new_transition = transition.copy() + action = new_transition.get(TransitionKey.ACTION) + if action is None or state is None: + return new_transition + + mask = self._build_mask(action.shape[-1]) + new_transition[TransitionKey.ACTION] = to_relative_actions(action, state, mask) + return new_transition + + def get_config(self) -> dict[str, Any]: + return { + "enabled": self.enabled, + "exclude_joints": self.exclude_joints, + "action_names": self.action_names, + } + + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: + return features + + +@ProcessorStepRegistry.register("absolute_actions_processor") +@dataclass +class AbsoluteActionsProcessorStep(ProcessorStep): + """Converts relative actions back to absolute actions (action += state) for all dimensions. + + Mirrors OpenPI's AbsoluteActions transform. Applied during postprocessing so + predicted relative offsets are converted back to absolute positions for execution. + Reads the cached state from its paired RelativeActionsProcessorStep. + + Attributes: + enabled: Whether to apply the absolute conversion. + relative_step: Reference to the paired RelativeActionsProcessorStep that caches state. + """ + + enabled: bool = False + relative_step: RelativeActionsProcessorStep | None = field(default=None, repr=False) + + def __call__(self, transition: EnvTransition) -> EnvTransition: + if not self.enabled: + return transition + + if self.relative_step is None: + raise RuntimeError( + "AbsoluteActionsProcessorStep requires a paired RelativeActionsProcessorStep " + "but relative_step is None. Ensure relative_step is set when constructing the postprocessor." + ) + + if self.relative_step._last_state is None: + raise RuntimeError( + "AbsoluteActionsProcessorStep requires state from RelativeActionsProcessorStep " + "but no state has been cached. Ensure the preprocessor runs before the postprocessor." 
+            )
+
+        new_transition = transition.copy()
+        action = new_transition.get(TransitionKey.ACTION)
+        if action is None:
+            return new_transition
+
+        mask = self.relative_step._build_mask(action.shape[-1])
+        new_transition[TransitionKey.ACTION] = to_absolute_actions(
+            action, self.relative_step._last_state, mask
+        )
+        return new_transition
+
+    def get_config(self) -> dict[str, Any]:
+        return {"enabled": self.enabled}
+
+    def transform_features(
+        self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
+    ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
+        return features
diff --git a/src/lerobot/robots/bi_openarm_follower/bi_openarm_follower.py b/src/lerobot/robots/bi_openarm_follower/bi_openarm_follower.py
index 7f5e92271..11da75d18 100644
--- a/src/lerobot/robots/bi_openarm_follower/bi_openarm_follower.py
+++ b/src/lerobot/robots/bi_openarm_follower/bi_openarm_follower.py
@@ -39,13 +39,23 @@ class BiOpenArmFollower(Robot):
         super().__init__(config)
         self.config = config
 
+        # Top-level cameras take precedence: the left arm opens all of them and
+        # the right arm opens none, so each shared camera is opened exactly once.
+        # Per-arm cameras are used as a fallback when top-level cameras are empty.
+        if config.cameras:
+            left_cameras = config.cameras
+            right_cameras = {}
+        else:
+            left_cameras = config.left_arm_config.cameras
+            right_cameras = config.right_arm_config.cameras
+
         left_arm_config = OpenArmFollowerConfig(
             id=f"{config.id}_left" if config.id else None,
             calibration_dir=config.calibration_dir,
             port=config.left_arm_config.port,
             disable_torque_on_disconnect=config.left_arm_config.disable_torque_on_disconnect,
             max_relative_target=config.left_arm_config.max_relative_target,
-            cameras=config.left_arm_config.cameras,
+            cameras=left_cameras,
             side=config.left_arm_config.side,
             can_interface=config.left_arm_config.can_interface,
             use_can_fd=config.left_arm_config.use_can_fd,
@@ -63,7 +73,7 @@ class BiOpenArmFollower(Robot):
             port=config.right_arm_config.port,
             disable_torque_on_disconnect=config.right_arm_config.disable_torque_on_disconnect,
             max_relative_target=config.right_arm_config.max_relative_target,
-            cameras=config.right_arm_config.cameras,
+            cameras=right_cameras,
             side=config.right_arm_config.side,
             can_interface=config.right_arm_config.can_interface,
             use_can_fd=config.right_arm_config.use_can_fd,
@@ -93,13 +103,10 @@ class BiOpenArmFollower(Robot):
 
     @property
     def _cameras_ft(self) -> dict[str, tuple]:
-        left_arm_cameras_ft = self.left_arm._cameras_ft
-        right_arm_cameras_ft = self.right_arm._cameras_ft
-
-        return {
-            **{f"left_{k}": v for k, v in left_arm_cameras_ft.items()},
-            **{f"right_{k}": v for k, v in right_arm_cameras_ft.items()},
-        }
+        # Cameras already have unique user-chosen names (e.g. "left_wrist", "base",
+        # "right_wrist"), so we merge them directly — unlike motors which need the
+        # left_/right_ prefix to disambiguate identical per-arm joint names.
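+        # Illustrative (camera names are user-defined): a left arm exposing
+        # {"left_wrist": ...} and a right arm exposing {"base": ..., "right_wrist": ...}
+        # merge into one dict with keys "left_wrist", "base", "right_wrist".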
+ return {**self.left_arm._cameras_ft, **self.right_arm._cameras_ft} @cached_property def observation_features(self) -> dict[str, type | tuple]: @@ -139,13 +146,17 @@ class BiOpenArmFollower(Robot): def get_observation(self) -> RobotObservation: obs_dict = {} - # Add "left_" prefix - left_obs = self.left_arm.get_observation() - obs_dict.update({f"left_{key}": value for key, value in left_obs.items()}) + # Camera keys that should NOT get the arm prefix (they already have unique names) + left_cam_keys = set(self.left_arm.cameras.keys()) + right_cam_keys = set(self.right_arm.cameras.keys()) + + left_obs = self.left_arm.get_observation() + for key, value in left_obs.items(): + obs_dict[key if key in left_cam_keys else f"left_{key}"] = value - # Add "right_" prefix right_obs = self.right_arm.get_observation() - obs_dict.update({f"right_{key}": value for key, value in right_obs.items()}) + for key, value in right_obs.items(): + obs_dict[key if key in right_cam_keys else f"right_{key}"] = value return obs_dict diff --git a/src/lerobot/robots/bi_openarm_follower/config_bi_openarm_follower.py b/src/lerobot/robots/bi_openarm_follower/config_bi_openarm_follower.py index 9d11f7b4e..1a7b7059d 100644 --- a/src/lerobot/robots/bi_openarm_follower/config_bi_openarm_follower.py +++ b/src/lerobot/robots/bi_openarm_follower/config_bi_openarm_follower.py @@ -14,8 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from dataclasses import dataclass +from dataclasses import dataclass, field +from lerobot.cameras import CameraConfig from lerobot.robots.openarm_follower import OpenArmFollowerConfigBase from ..config import RobotConfig @@ -28,3 +29,6 @@ class BiOpenArmFollowerConfig(RobotConfig): left_arm_config: OpenArmFollowerConfigBase right_arm_config: OpenArmFollowerConfigBase + + # Top-level cameras shared across both arms. + cameras: dict[str, CameraConfig] = field(default_factory=dict) diff --git a/src/lerobot/scripts/lerobot_edit_dataset.py b/src/lerobot/scripts/lerobot_edit_dataset.py index 49825317d..db06f90c6 100644 --- a/src/lerobot/scripts/lerobot_edit_dataset.py +++ b/src/lerobot/scripts/lerobot_edit_dataset.py @@ -18,7 +18,7 @@ Edit LeRobot datasets using various transformation tools. This script allows you to delete episodes, split datasets, merge datasets, -remove features, modify tasks, and convert image datasets to video format. +remove features, modify tasks, recompute stats, and convert image datasets to video format. When new_repo_id is specified, creates a new dataset. 
Path semantics (v2): --root and --new_root are exact dataset folders containing @@ -148,6 +148,21 @@ Show dataset information without feature details: --operation.type info \ --operation.show_features false +Recompute dataset statistics: + lerobot-edit-dataset \ + --repo_id lerobot/pusht \ + --operation.type recompute_stats + +Recompute stats for relative actions and push to hub: + lerobot-edit-dataset \ + --repo_id lerobot/pusht \ + --operation.type recompute_stats \ + --operation.relative_action true \ + --operation.chunk_size 50 \ + --operation.relative_exclude_joints "['gripper']" \ + --operation.num_workers 4 \ + --push_to_hub true + Using JSON config file: lerobot-edit-dataset \ --config_path path/to/edit_config.json @@ -168,6 +183,7 @@ from lerobot.datasets.dataset_tools import ( delete_episodes, merge_datasets, modify_tasks, + recompute_stats, remove_feature, split_dataset, ) @@ -230,6 +246,16 @@ class ConvertImageToVideoConfig(OperationConfig): max_frames_per_batch: int | None = None +@OperationConfig.register_subclass("recompute_stats") +@dataclass +class RecomputeStatsConfig(OperationConfig): + skip_image_video: bool = True + relative_action: bool = False + relative_exclude_joints: list[str] | None = None + chunk_size: int = 50 + num_workers: int = 0 + + @OperationConfig.register_subclass("info") @dataclass class InfoConfig(OperationConfig): @@ -525,6 +551,35 @@ def handle_convert_image_to_video(cfg: EditDatasetConfig) -> None: logging.info("Dataset saved locally (not pushed to hub)") +def handle_recompute_stats(cfg: EditDatasetConfig) -> None: + if not isinstance(cfg.operation, RecomputeStatsConfig): + raise ValueError("Operation config must be RecomputeStatsConfig") + + dataset = LeRobotDataset(cfg.repo_id, root=cfg.root) + + logging.info(f"Recomputing stats for {cfg.repo_id}") + if cfg.operation.relative_action: + logging.info( + f"Relative action stats enabled (chunk_size={cfg.operation.chunk_size}, " + f"exclude_joints={cfg.operation.relative_exclude_joints})" + ) + + recompute_stats( + dataset, + skip_image_video=cfg.operation.skip_image_video, + relative_action=cfg.operation.relative_action, + relative_exclude_joints=cfg.operation.relative_exclude_joints, + chunk_size=cfg.operation.chunk_size, + num_workers=cfg.operation.num_workers, + ) + + logging.info(f"Stats written to {dataset.root}") + + if cfg.push_to_hub: + logging.info(f"Pushing to hub as {dataset.meta.repo_id}...") + dataset.push_to_hub() + + def _get_dataset_size(repo_path): import os @@ -596,6 +651,8 @@ def edit_dataset(cfg: EditDatasetConfig) -> None: handle_modify_tasks(cfg) elif operation_type == "convert_image_to_video": handle_convert_image_to_video(cfg) + elif operation_type == "recompute_stats": + handle_recompute_stats(cfg) elif operation_type == "info": handle_info(cfg) else: diff --git a/src/lerobot/scripts/lerobot_train.py b/src/lerobot/scripts/lerobot_train.py index 1fed3bee4..0a7212911 100644 --- a/src/lerobot/scripts/lerobot_train.py +++ b/src/lerobot/scripts/lerobot_train.py @@ -252,10 +252,22 @@ def train(cfg: TrainPipelineConfig, accelerator: Accelerator | None = None): # Wait for all processes to finish policy creation before continuing accelerator.wait_for_everyone() + processor_pretrained_path = cfg.policy.pretrained_path + if ( + getattr(cfg.policy, "use_relative_actions", False) + and processor_pretrained_path is not None + and not cfg.resume + ): + logging.warning( + "use_relative_actions=true with pretrained processors can skip relative transforms if " + "the checkpoint processors do 
not define them. Building processors from current policy config." + ) + processor_pretrained_path = None + # Create processors - only provide dataset_stats if not resuming from saved processors processor_kwargs = {} postprocessor_kwargs = {} - if (cfg.policy.pretrained_path and not cfg.resume) or not cfg.policy.pretrained_path: + if (processor_pretrained_path and not cfg.resume) or not processor_pretrained_path: # Only provide dataset_stats when not resuming from saved processor state processor_kwargs["dataset_stats"] = dataset.meta.stats @@ -263,7 +275,7 @@ def train(cfg: TrainPipelineConfig, accelerator: Accelerator | None = None): if cfg.policy.type == "sarm": processor_kwargs["dataset_meta"] = dataset.meta - if cfg.policy.pretrained_path is not None: + if processor_pretrained_path is not None: processor_kwargs["preprocessor_overrides"] = { "device_processor": {"device": device.type}, "normalizer_processor": { @@ -285,7 +297,7 @@ def train(cfg: TrainPipelineConfig, accelerator: Accelerator | None = None): preprocessor, postprocessor = make_pre_post_processors( policy_cfg=cfg.policy, - pretrained_path=cfg.policy.pretrained_path, + pretrained_path=processor_pretrained_path, **processor_kwargs, **postprocessor_kwargs, ) diff --git a/src/lerobot/scripts/lerobot_train_tokenizer.py b/src/lerobot/scripts/lerobot_train_tokenizer.py index 70185fc51..35c2b60cd 100644 --- a/src/lerobot/scripts/lerobot_train_tokenizer.py +++ b/src/lerobot/scripts/lerobot_train_tokenizer.py @@ -15,7 +15,7 @@ This script: 1. Loads action chunks from LeRobotDataset (with episode sampling) -2. Optionally applies delta transforms (relative vs absolute actions) +2. Optionally applies relative transforms (relative vs absolute actions) 3. Extracts specified action dimensions for encoding 4. Applies normalization (MEAN_STD, MIN_MAX, QUANTILES, or other modes) 5. Trains FAST tokenizer (BPE on DCT coefficients) on the action chunks @@ -32,8 +32,8 @@ lerobot-train-tokenizer \ --max_episodes=100 \ --sample_fraction=0.1 \ --encoded_dims="0:6" \ - --delta_dims="0,1,2,3,4,5" \ - --use_delta_transform=true \ + --relative_dims="0,1,2,3,4,5" \ + --use_relative_transform=true \ --state_key="observation.state" \ --normalization_mode="QUANTILES" \ --vocab_size=1024 \ @@ -82,10 +82,10 @@ class TokenizerTrainingConfig: sample_fraction: float = 0.1 # Comma-separated dimension ranges to encode (e.g., "0:6,7:23") encoded_dims: str = "0:6,7:23" - # Comma-separated dimension indices for delta transform (e.g., "0,1,2,3,4,5") - delta_dims: str | None = None - # Whether to apply delta transform (relative actions vs absolute actions) - use_delta_transform: bool = False + # Comma-separated dimension indices for relative transform (e.g., "0,1,2,3,4,5") + relative_dims: str | None = None + # Whether to apply relative transform (relative actions vs absolute actions) + use_relative_transform: bool = False # Dataset key for state observations (default: "observation.state") state_key: str = OBS_STATE # Normalization mode (MEAN_STD, MIN_MAX, QUANTILES, QUANTILE10, IDENTITY) @@ -104,25 +104,27 @@ class TokenizerTrainingConfig: hub_private: bool = False -def apply_delta_transform(state: np.ndarray, actions: np.ndarray, delta_dims: list[int] | None) -> np.ndarray: - """Apply delta transform to specified dimensions. +def apply_relative_transform( + state: np.ndarray, actions: np.ndarray, relative_dims: list[int] | None +) -> np.ndarray: + """Apply relative transform to specified dimensions. 
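+
+    For each index d in relative_dims this computes
+    relative_actions[d] = actions[d] - state[d]; dimensions not listed in
+    relative_dims pass through unchanged.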
Args: state: Current state [D] actions: Future actions [D] - delta_dims: List of dimension indices to apply delta transform to + relative_dims: List of dimension indices to apply relative transform to Returns: Transformed actions [D] """ - if delta_dims is None or len(delta_dims) == 0: + if relative_dims is None or len(relative_dims) == 0: return actions - delta_actions = actions.copy() - for dim in delta_dims: - delta_actions[dim] = actions[dim] - state[dim] + relative_actions = actions.copy() + for dim in relative_dims: + relative_actions[dim] = actions[dim] - state[dim] - return delta_actions + return relative_actions def apply_normalization( @@ -185,7 +187,7 @@ def apply_normalization( def process_episode(args): """Process single episode and return action chunks.""" - dataset, ep_idx, action_horizon, delta_dims, sample_fraction, state_key, use_delta_transform = args + dataset, ep_idx, action_horizon, relative_dims, sample_fraction, state_key, use_relative_transform = args try: # get episode info @@ -222,7 +224,7 @@ def process_episode(args): else np.array(frame[state_key]) ) else: - # if no state key, use zeros (no delta transform) + # if no state key, use zeros (no relative transform) state = np.zeros_like( frame[ACTION].numpy() if torch.is_tensor(frame[ACTION]) else np.array(frame[ACTION]) ) @@ -243,18 +245,18 @@ def process_episode(args): current_state = states[i] # First state in chunk future_absolute_actions = actions[i : i + action_horizon] - if use_delta_transform: + if use_relative_transform: # relative actions - delta_chunk = np.zeros_like(future_absolute_actions) + relative_chunk = np.zeros_like(future_absolute_actions) for t in range(action_horizon): - delta_chunk[t] = apply_delta_transform( + relative_chunk[t] = apply_relative_transform( current_state, future_absolute_actions[t], - delta_dims, + relative_dims, ) - action_chunks.append(delta_chunk) + action_chunks.append(relative_chunk) else: - # absolute actions (no delta) + # absolute actions (no relative transform) action_chunks.append(future_absolute_actions) if len(action_chunks) == 0: @@ -407,17 +409,20 @@ def train_tokenizer(cfg: TokenizerTrainingConfig): total_encoded_dims = sum(end - start for start, end in encoded_dim_ranges) print(f"Encoding {total_encoded_dims} dimensions: {cfg.encoded_dims}") - # parse delta dimensions - delta_dim_list = None - if cfg.delta_dims is not None and cfg.delta_dims.strip(): - delta_dim_list = [int(d.strip()) for d in cfg.delta_dims.split(",")] - print(f"Delta dimensions: {delta_dim_list}") + # parse relative dimensions + relative_dim_list = None + if cfg.relative_dims is not None and cfg.relative_dims.strip(): + relative_dim_list = [int(d.strip()) for d in cfg.relative_dims.split(",")] + print(f"Relative dimensions: {relative_dim_list}") else: - print("No delta dimensions specified") + print("No relative dimensions specified") - print(f"Use delta transform: {cfg.use_delta_transform}") - if cfg.use_delta_transform and (delta_dim_list is None or len(delta_dim_list) == 0): - print("Warning: use_delta_transform=True but no delta_dims specified. No delta will be applied.") + print(f"Use relative transform: {cfg.use_relative_transform}") + if cfg.use_relative_transform and (relative_dim_list is None or len(relative_dim_list) == 0): + print( + "Warning: use_relative_transform=True but no relative_dims specified. " + "No relative transform will be applied." 
+ ) print(f"Action horizon: {cfg.action_horizon}") print(f"State key: {cfg.state_key}") @@ -440,10 +445,10 @@ def train_tokenizer(cfg: TokenizerTrainingConfig): dataset, ep_idx, cfg.action_horizon, - delta_dim_list, + relative_dim_list, cfg.sample_fraction, cfg.state_key, - cfg.use_delta_transform, + cfg.use_relative_transform, ) ) if chunks is not None: @@ -544,9 +549,9 @@ def train_tokenizer(cfg: TokenizerTrainingConfig): "encoded_dims": cfg.encoded_dims, "encoded_dim_ranges": encoded_dim_ranges, "total_encoded_dims": total_encoded_dims, - "delta_dims": cfg.delta_dims, - "delta_dim_list": delta_dim_list, - "use_delta_transform": cfg.use_delta_transform, + "relative_dims": cfg.relative_dims, + "relative_dim_list": relative_dim_list, + "use_relative_transform": cfg.use_relative_transform, "state_key": cfg.state_key, "normalization_mode": norm_mode.value, "action_horizon": cfg.action_horizon, diff --git a/tests/policies/rtc/test_action_queue.py b/tests/policies/rtc/test_action_queue.py index 2f9b84384..460df97fe 100644 --- a/tests/policies/rtc/test_action_queue.py +++ b/tests/policies/rtc/test_action_queue.py @@ -25,7 +25,7 @@ import torch from lerobot.policies.rtc.action_queue import ActionQueue from lerobot.policies.rtc.configuration_rtc import RTCConfig -# ====================== Fixtures ====================== +# Fixtures @pytest.fixture @@ -63,7 +63,7 @@ def action_queue_rtc_disabled(rtc_config_disabled): return ActionQueue(rtc_config_disabled) -# ====================== Initialization Tests ====================== +# Initialization tests def test_action_queue_initialization_rtc_enabled(rtc_config_enabled): @@ -84,7 +84,7 @@ def test_action_queue_initialization_rtc_disabled(rtc_config_disabled): assert queue.cfg.enabled is False -# ====================== get() Tests ====================== +# get() tests def test_get_returns_none_when_empty(action_queue_rtc_enabled): @@ -136,7 +136,7 @@ def test_get_increments_last_index(action_queue_rtc_enabled, sample_actions): assert action_queue_rtc_enabled.last_index == 2 -# ====================== qsize() Tests ====================== +# qsize() tests def test_qsize_returns_zero_when_empty(action_queue_rtc_enabled): @@ -167,7 +167,7 @@ def test_qsize_after_exhaustion(action_queue_rtc_enabled, sample_actions): assert action_queue_rtc_enabled.qsize() == 0 -# ====================== empty() Tests ====================== +# empty() tests def test_empty_returns_true_when_empty(action_queue_rtc_enabled): @@ -202,7 +202,7 @@ def test_empty_after_full_consumption(action_queue_rtc_enabled, sample_actions): assert action_queue_rtc_enabled.empty() is True -# ====================== get_action_index() Tests ====================== +# get_action_index() tests def test_get_action_index_initial_value(action_queue_rtc_enabled): @@ -222,7 +222,7 @@ def test_get_action_index_after_consumption(action_queue_rtc_enabled, sample_act assert action_queue_rtc_enabled.get_action_index() == 3 -# ====================== get_left_over() Tests ====================== +# get_left_over() tests def test_get_left_over_returns_none_when_empty(action_queue_rtc_enabled): @@ -269,7 +269,7 @@ def test_get_left_over_returns_empty_after_exhaustion(action_queue_rtc_enabled, assert leftover.shape == (0, 6) -# ====================== merge() with RTC Enabled Tests ====================== +# merge() with RTC enabled tests def test_merge_replaces_queue_when_rtc_enabled(action_queue_rtc_enabled, sample_actions): @@ -336,7 +336,7 @@ def test_merge_with_large_delay(action_queue_rtc_enabled, sample_actions): 
assert action_queue_rtc_enabled.qsize() == 0 -# ====================== merge() with RTC Disabled Tests ====================== +# merge() with RTC disabled tests def test_merge_appends_when_rtc_disabled(action_queue_rtc_disabled, sample_actions): @@ -402,7 +402,7 @@ def test_merge_first_call_with_rtc_disabled(action_queue_rtc_disabled, sample_ac assert action_queue_rtc_disabled.last_index == 0 -# ====================== merge() with Different Action Shapes Tests ====================== +# merge() with different action shapes tests def test_merge_with_different_action_dims(): @@ -431,7 +431,7 @@ def test_merge_with_different_lengths(): assert queue.qsize() == 35 -# ====================== merge() Delay Validation Tests ====================== +# merge() delay validation tests def test_merge_validates_delay_consistency(action_queue_rtc_enabled, sample_actions, caplog): @@ -509,7 +509,7 @@ def test_merge_skips_validation_when_action_index_none(action_queue_rtc_enabled, assert "Indexes diff is not equal to real delay" not in caplog.text -# ====================== Thread Safety Tests ====================== +# Thread safety tests def test_get_is_thread_safe(action_queue_rtc_enabled, sample_actions): @@ -621,7 +621,7 @@ def test_concurrent_get_and_merge(action_queue_rtc_disabled, sample_actions): assert consumed_count[0] <= 200 -# ====================== get_left_over() Thread Safety Tests ====================== +# get_left_over() thread safety tests def test_get_left_over_is_thread_safe(action_queue_rtc_enabled, sample_actions): @@ -670,7 +670,7 @@ def test_get_left_over_is_thread_safe(action_queue_rtc_enabled, sample_actions): assert len(leftovers) > 0 -# ====================== Edge Cases Tests ====================== +# Edge cases tests def test_queue_with_single_action(action_queue_rtc_enabled): @@ -773,7 +773,7 @@ def test_qsize_with_none_queue(action_queue_rtc_enabled): assert action_queue_rtc_enabled.qsize() == 0 -# ====================== Integration Tests ====================== +# Integration tests def test_typical_rtc_workflow(action_queue_rtc_enabled, sample_actions): diff --git a/tests/policies/rtc/test_rtc_relative_actions.py b/tests/policies/rtc/test_rtc_relative_actions.py new file mode 100644 index 000000000..fa888ec05 --- /dev/null +++ b/tests/policies/rtc/test_rtc_relative_actions.py @@ -0,0 +1,607 @@ +"""Tests for RTC + relative actions integration. + +Validates that Real-Time Chunking (RTC) works correctly when the policy uses +relative actions. The key invariant: RTC guidance operates in model space +(normalized relative actions), while the robot receives absolute actions after postprocessing. 
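+In code terms (names from this module): ActionQueue.merge(relative, absolute,
+real_delay) stores both spaces, queue.get() hands the robot absolute actions,
+and queue.get_left_over() returns the relative ones used for RTC guidance.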
+ +Flow under test: + Preprocessor: raw obs → relative step caches state → normalizer + Model: generates normalized relative actions (guided by RTC using leftover relative actions) + Postprocessor: unnormalize → absolute step (relative + cached state) → robot actions +""" + +import importlib.util +import sys +from pathlib import Path + +import torch + +from lerobot.configs.types import ( + FeatureType, + NormalizationMode, + PolicyFeature, + RTCAttentionSchedule, +) +from lerobot.processor import TransitionKey, batch_to_transition +from lerobot.processor.normalize_processor import NormalizerProcessorStep, UnnormalizerProcessorStep +from lerobot.processor.relative_action_processor import ( + AbsoluteActionsProcessorStep, + RelativeActionsProcessorStep, + to_relative_actions, +) +from lerobot.utils.constants import ACTION, OBS_STATE + + +def _import_rtc_module(module_name: str, filename: str): + """Import an RTC module directly from its file path, bypassing lerobot.policies.__init__.""" + rtc_dir = Path(__file__).resolve().parents[3] / "src" / "lerobot" / "policies" / "rtc" + spec = importlib.util.spec_from_file_location(module_name, rtc_dir / filename) + mod = importlib.util.module_from_spec(spec) + sys.modules[module_name] = mod + spec.loader.exec_module(mod) + return mod + + +_rtc_cfg_mod = _import_rtc_module("lerobot.policies.rtc.configuration_rtc", "configuration_rtc.py") +RTCConfig = _rtc_cfg_mod.RTCConfig + +_action_queue_mod = _import_rtc_module("lerobot.policies.rtc.action_queue", "action_queue.py") +ActionQueue = _action_queue_mod.ActionQueue + +_rtc_debug_mod = _import_rtc_module("lerobot.policies.rtc.debug_tracker", "debug_tracker.py") +_rtc_mod = _import_rtc_module("lerobot.policies.rtc.modeling_rtc", "modeling_rtc.py") +RTCProcessor = _rtc_mod.RTCProcessor + +ACTION_DIM = 6 +CHUNK_SIZE = 50 +EXECUTION_HORIZON = 10 + + +def _make_rtc_config(enabled=True): + return RTCConfig( + enabled=enabled, + execution_horizon=EXECUTION_HORIZON, + max_guidance_weight=10.0, + prefix_attention_schedule=RTCAttentionSchedule.EXP, + ) + + +def _make_relative_pipeline(action_dim=ACTION_DIM, norm_mode=NormalizationMode.MEAN_STD): + """Build paired relative/absolute processor steps and normalizer/unnormalizer.""" + features = {ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(action_dim,))} + norm_map = {FeatureType.ACTION: norm_mode} + + stats = { + ACTION: { + "mean": torch.zeros(action_dim).numpy(), + "std": torch.ones(action_dim).numpy(), + "q01": (-2 * torch.ones(action_dim)).numpy(), + "q99": (2 * torch.ones(action_dim)).numpy(), + "min": (-3 * torch.ones(action_dim)).numpy(), + "max": (3 * torch.ones(action_dim)).numpy(), + } + } + + relative_step = RelativeActionsProcessorStep(enabled=True) + normalizer = NormalizerProcessorStep(features=features, norm_map=norm_map, stats=stats) + unnormalizer = UnnormalizerProcessorStep(features=features, norm_map=norm_map, stats=stats) + absolute_step = AbsoluteActionsProcessorStep(enabled=True, relative_step=relative_step) + return relative_step, normalizer, unnormalizer, absolute_step + + +class TestActionQueueRelativeActions: + """Verify ActionQueue stores model-space (relative) actions for RTC and absolute for robot.""" + + def test_left_over_returns_relative_actions(self): + """get_left_over() should return the original (relative-space) actions.""" + cfg = _make_rtc_config() + queue = ActionQueue(cfg) + + relative_actions = torch.randn(CHUNK_SIZE, ACTION_DIM) + absolute_actions = torch.randn(CHUNK_SIZE, ACTION_DIM) + queue.merge(relative_actions, 
absolute_actions, real_delay=0) + + for _ in range(5): + queue.get() + + leftover = queue.get_left_over() + torch.testing.assert_close(leftover, relative_actions[5:]) + + def test_robot_receives_absolute_actions(self): + """The robot (via get()) should receive postprocessed absolute actions.""" + cfg = _make_rtc_config() + queue = ActionQueue(cfg) + + relative_actions = torch.randn(CHUNK_SIZE, ACTION_DIM) + absolute_actions = torch.randn(CHUNK_SIZE, ACTION_DIM) + queue.merge(relative_actions, absolute_actions, real_delay=0) + + first_action = queue.get() + torch.testing.assert_close(first_action, absolute_actions[0]) + + +class TestRTCDenoiseWithRelativeLeftovers: + """Verify RTC denoise_step correctly handles relative-space prev_chunk_left_over.""" + + def test_first_chunk_no_guidance(self): + """First chunk (no leftovers) should return v_t without guidance.""" + rtc = RTCProcessor(_make_rtc_config()) + x_t = torch.randn(1, CHUNK_SIZE, ACTION_DIM) + + def mock_denoise(x): + return torch.ones_like(x) + + result = rtc.denoise_step( + x_t=x_t, + prev_chunk_left_over=None, + inference_delay=0, + time=0.5, + original_denoise_step_partial=mock_denoise, + ) + torch.testing.assert_close(result, torch.ones_like(x_t)) + + def test_relative_leftovers_shape_preserved(self): + """RTC output should have the same shape as input regardless of leftover shape.""" + rtc = RTCProcessor(_make_rtc_config()) + x_t = torch.randn(1, CHUNK_SIZE, ACTION_DIM) + shorter_leftover = torch.randn(1, 20, ACTION_DIM) + + def mock_denoise(x): + return torch.zeros_like(x) + + result = rtc.denoise_step( + x_t=x_t, + prev_chunk_left_over=shorter_leftover, + inference_delay=5, + time=0.5, + original_denoise_step_partial=mock_denoise, + ) + assert result.shape == x_t.shape + + def test_guidance_steers_toward_previous_relative_actions(self): + """RTC guidance should push x1_t toward prev_chunk_left_over in relative space.""" + rtc = RTCProcessor(_make_rtc_config()) + x_t = torch.randn(1, CHUNK_SIZE, ACTION_DIM) + prev_relatives = torch.randn(1, CHUNK_SIZE, ACTION_DIM) + + def mock_denoise(x): + return torch.zeros_like(x) + + result_without_guidance = rtc.denoise_step( + x_t=x_t.clone(), + prev_chunk_left_over=None, + inference_delay=5, + time=0.5, + original_denoise_step_partial=mock_denoise, + ) + + result_with_guidance = rtc.denoise_step( + x_t=x_t.clone(), + prev_chunk_left_over=prev_relatives, + inference_delay=5, + time=0.5, + original_denoise_step_partial=mock_denoise, + ) + + assert not torch.allclose(result_with_guidance, result_without_guidance, atol=1e-6) + + +class TestFullPipelineRelativeRTC: + """End-to-end test of the RTC + relative actions pipeline matching eval_with_real_robot.py flow.""" + + def test_preprocessor_caches_state_for_postprocessor(self): + """Preprocessor's relative step should cache state so postprocessor can convert back.""" + relative_step, normalizer, unnormalizer, absolute_step = _make_relative_pipeline() + + state = torch.randn(1, ACTION_DIM) + actions = torch.randn(1, CHUNK_SIZE, ACTION_DIM) + + batch = {ACTION: actions, OBS_STATE: state} + transition = batch_to_transition(batch) + + relative_step(transition) + assert relative_step._last_state is not None + torch.testing.assert_close(relative_step._last_state, state) + + def test_preprocessor_caches_state_without_actions(self): + """During inference, preprocessor receives only observations (no actions). 
+ Relative step should still cache state for the postprocessor.""" + relative_step, _, _, _ = _make_relative_pipeline() + + state = torch.randn(1, ACTION_DIM) + batch = {OBS_STATE: state} + transition = batch_to_transition(batch) + + relative_step(transition) + assert relative_step._last_state is not None + torch.testing.assert_close(relative_step._last_state, state) + + def test_roundtrip_with_identity_normalization(self): + """Actions → relative → normalize → [model] → unnormalize → absolute should recover originals. + Using mean=0, std=1 normalization (identity).""" + relative_step, normalizer, unnormalizer, absolute_step = _make_relative_pipeline() + + state = torch.randn(1, ACTION_DIM) + actions = torch.randn(1, CHUNK_SIZE, ACTION_DIM) + + batch = {ACTION: actions.clone(), OBS_STATE: state} + transition = batch_to_transition(batch) + + t1 = relative_step(transition) + t2 = normalizer(t1) + + model_output = t2[TransitionKey.ACTION].clone() + + model_transition = {TransitionKey.ACTION: model_output} + t3 = unnormalizer(model_transition) + t4 = absolute_step(t3) + + recovered = t4[TransitionKey.ACTION] + torch.testing.assert_close(recovered, actions, atol=1e-5, rtol=1e-5) + + def test_eval_loop_simulation(self): + """Simulate the eval_with_real_robot.py loop with relative actions. + + Iteration 1: No leftovers → model generates relative actions → store for RTC + Iteration 2: Use leftovers as RTC guidance → model generates new relative actions + Both iterations: postprocessor converts relative actions to absolute for robot + """ + relative_step, normalizer, unnormalizer, absolute_step = _make_relative_pipeline() + rtc = RTCProcessor(_make_rtc_config()) + queue = ActionQueue(_make_rtc_config()) + + def mock_model(prev_chunk_left_over, inference_delay, state): + """Simulate model generating relative actions with RTC.""" + x_t = torch.randn(1, CHUNK_SIZE, ACTION_DIM) + + def denoise(x): + return -0.1 * x + + guided_v = rtc.denoise_step( + x_t=x_t, + prev_chunk_left_over=prev_chunk_left_over, + inference_delay=inference_delay, + time=0.5, + original_denoise_step_partial=denoise, + ) + return x_t - 0.5 * guided_v + + # --- Iteration 1: first chunk, no leftovers --- + state_1 = torch.randn(1, ACTION_DIM) + obs_batch_1 = {OBS_STATE: state_1} + relative_step(batch_to_transition(obs_batch_1)) + + model_relatives_1 = mock_model(prev_chunk_left_over=None, inference_delay=0, state=state_1) + original_actions_1 = model_relatives_1.squeeze(0) + + model_transition_1 = {TransitionKey.ACTION: model_relatives_1} + postprocessed_1 = absolute_step(unnormalizer(model_transition_1))[TransitionKey.ACTION].squeeze(0) + + queue.merge(original_actions_1, postprocessed_1, real_delay=0) + + # Consume some actions (simulate robot executing) + for _ in range(5): + action = queue.get() + assert action is not None + + # --- Iteration 2: use leftovers for RTC --- + prev_actions = queue.get_left_over() + assert prev_actions is not None + assert prev_actions.shape[0] == CHUNK_SIZE - 5 + + state_2 = state_1 + 0.01 * torch.randn(1, ACTION_DIM) + obs_batch_2 = {OBS_STATE: state_2} + relative_step(batch_to_transition(obs_batch_2)) + + model_relatives_2 = mock_model( + prev_chunk_left_over=prev_actions.unsqueeze(0), inference_delay=3, state=state_2 + ) + original_actions_2 = model_relatives_2.squeeze(0) + + model_transition_2 = {TransitionKey.ACTION: model_relatives_2} + postprocessed_2 = absolute_step(unnormalizer(model_transition_2))[TransitionKey.ACTION].squeeze(0) + + queue.merge(original_actions_2, postprocessed_2, 
real_delay=3) + + # Postprocessed actions should be in absolute space + action = queue.get() + assert action is not None + assert action.shape == (ACTION_DIM,) + + # Verify leftovers are in relative space (original_queue stores relative actions) + leftover_relatives = queue.get_left_over() + assert leftover_relatives is not None + assert leftover_relatives.shape[1] == ACTION_DIM + + def test_postprocessor_uses_correct_state_per_iteration(self): + """Each iteration's postprocessor should use the state from that iteration's preprocessor, + not a stale state from a previous iteration.""" + relative_step, _, unnormalizer, absolute_step = _make_relative_pipeline() + + state_1 = torch.tensor([[1.0, 2.0, 3.0, 4.0, 5.0, 6.0]]) + state_2 = torch.tensor([[10.0, 20.0, 30.0, 40.0, 50.0, 60.0]]) + relatives = torch.zeros(1, 5, ACTION_DIM) + + # Iteration 1: cache state_1 + relative_step(batch_to_transition({OBS_STATE: state_1})) + result_1 = absolute_step(unnormalizer({TransitionKey.ACTION: relatives.clone()}))[ + TransitionKey.ACTION + ] + # relative=0 + state_1 should give state_1 + for t in range(5): + torch.testing.assert_close(result_1[0, t], state_1[0], atol=1e-5, rtol=1e-5) + + # Iteration 2: cache state_2 + relative_step(batch_to_transition({OBS_STATE: state_2})) + result_2 = absolute_step(unnormalizer({TransitionKey.ACTION: relatives.clone()}))[ + TransitionKey.ACTION + ] + for t in range(5): + torch.testing.assert_close(result_2[0, t], state_2[0], atol=1e-5, rtol=1e-5) + + +class TestStateRebasingApproximation: + """Verify that the approximation from not rebasing leftover relative actions is small + when state changes between inference calls are small (real-time control regime).""" + + def test_small_state_change_produces_small_error(self): + """With small state changes (typical in real-time control), + using stale relative actions for RTC guidance introduces negligible error.""" + state_old = torch.randn(1, ACTION_DIM) + state_new = state_old + 0.01 * torch.randn(1, ACTION_DIM) + + actions_absolute = torch.randn(1, CHUNK_SIZE, ACTION_DIM) + mask = [True] * ACTION_DIM + + relatives_old = to_relative_actions(actions_absolute, state_old, mask) + relatives_new = to_relative_actions(actions_absolute, state_new, mask) + + error = (relatives_old - relatives_new).abs().mean() + state_change = (state_old - state_new).abs().mean() + + # Error should be proportional to state change + assert error < 0.1, ( + f"Relative-action error {error:.4f} should be small for small state change {state_change:.4f}" + ) + + def test_large_state_change_produces_proportional_error(self): + """With large state changes, stale relative actions diverge more (but RTC guidance decays).""" + state_old = torch.randn(1, ACTION_DIM) + state_new = state_old + 10.0 * torch.randn(1, ACTION_DIM) + + actions_absolute = torch.randn(1, CHUNK_SIZE, ACTION_DIM) + mask = [True] * ACTION_DIM + + relatives_old = to_relative_actions(actions_absolute, state_old, mask) + relatives_new = to_relative_actions(actions_absolute, state_new, mask) + + error = (relatives_old - relatives_new).abs().mean() + state_change = (state_old - state_new).abs().mean() + + # Error should be roughly equal to state change + torch.testing.assert_close( + error.clone().detach(), state_change.clone().detach(), atol=1e-5, rtol=1e-5 + ) + + def test_excluded_joints_not_affected_by_state_change(self): + """Joints excluded from relative conversion should not contribute rebasing error.""" + state_old = torch.randn(1, ACTION_DIM) + state_new = state_old.clone() + 
state_new[0, -1] = state_old[0, -1] + 100.0 + + actions = torch.randn(1, CHUNK_SIZE, ACTION_DIM) + mask = [True] * (ACTION_DIM - 1) + [False] + + relatives_old = to_relative_actions(actions, state_old, mask) + relatives_new = to_relative_actions(actions, state_new, mask) + + # Last dim (excluded) should have zero error + error_excluded = (relatives_old[..., -1] - relatives_new[..., -1]).abs().max() + assert error_excluded < 1e-6, f"Excluded joint should have zero error, got {error_excluded}" + + +def _detect_relative_actions(preprocessor) -> bool: + """Mirror of the helper in eval_with_real_robot.py for testing without importing it.""" + return any(isinstance(step, RelativeActionsProcessorStep) and step.enabled for step in preprocessor.steps) + + +class TestDetectRelativeActions: + """Test the _detect_relative_actions helper logic used by eval_with_real_robot.py.""" + + def test_detects_enabled_relative_step(self): + class FakePipeline: + steps = [RelativeActionsProcessorStep(enabled=True)] + + assert _detect_relative_actions(FakePipeline()) is True + + def test_ignores_disabled_relative_step(self): + class FakePipeline: + steps = [RelativeActionsProcessorStep(enabled=False)] + + assert _detect_relative_actions(FakePipeline()) is False + + def test_returns_false_when_no_relative_step(self): + class FakePipeline: + steps = [] + + assert _detect_relative_actions(FakePipeline()) is False + + +class TestNonRelativePolicy: + """Verify the same pipeline works when relative actions are disabled (standard absolute policy).""" + + def test_disabled_relative_step_is_noop(self): + relative_step = RelativeActionsProcessorStep(enabled=False) + absolute_step = AbsoluteActionsProcessorStep(enabled=False, relative_step=relative_step) + + state = torch.randn(1, ACTION_DIM) + actions = torch.randn(1, CHUNK_SIZE, ACTION_DIM) + + transition = batch_to_transition({ACTION: actions.clone(), OBS_STATE: state}) + t1 = relative_step(transition) + torch.testing.assert_close(t1[TransitionKey.ACTION], actions) + + t2 = absolute_step({TransitionKey.ACTION: actions.clone()}) + torch.testing.assert_close(t2[TransitionKey.ACTION], actions) + + def test_eval_loop_without_relative_actions(self): + """Full eval loop simulation with relative actions disabled: original and processed actions are identical.""" + features = {ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(ACTION_DIM,))} + norm_map = {FeatureType.ACTION: NormalizationMode.MEAN_STD} + stats = { + ACTION: { + "mean": torch.zeros(ACTION_DIM).numpy(), + "std": torch.ones(ACTION_DIM).numpy(), + } + } + + relative_step = RelativeActionsProcessorStep(enabled=False) + unnormalizer = UnnormalizerProcessorStep(features=features, norm_map=norm_map, stats=stats) + absolute_step = AbsoluteActionsProcessorStep(enabled=False, relative_step=relative_step) + + rtc = RTCProcessor(_make_rtc_config()) + queue = ActionQueue(_make_rtc_config()) + + state = torch.randn(1, ACTION_DIM) + relative_step(batch_to_transition({OBS_STATE: state})) + + model_output = torch.randn(1, CHUNK_SIZE, ACTION_DIM) + post = absolute_step(unnormalizer({TransitionKey.ACTION: model_output.clone()}))[ + TransitionKey.ACTION + ].squeeze(0) + original = model_output.squeeze(0) + + # With identity norm and no relative-action transform, original and postprocessed should match + torch.testing.assert_close(original, post, atol=1e-5, rtol=1e-5) + + queue.merge(original, post, real_delay=0) + + for _ in range(5): + queue.get() + + prev_actions = queue.get_left_over() + assert prev_actions is not None + + # RTC 
guidance works the same way (absolute space) + x_t = torch.randn(1, CHUNK_SIZE, ACTION_DIM) + result = rtc.denoise_step( + x_t=x_t, + prev_chunk_left_over=prev_actions.unsqueeze(0), + inference_delay=3, + time=0.5, + original_denoise_step_partial=lambda x: torch.zeros_like(x), + ) + assert result.shape == x_t.shape + + def test_detect_relative_returns_false_when_disabled(self): + class FakePipeline: + steps = [RelativeActionsProcessorStep(enabled=False)] + + assert not _detect_relative_actions(FakePipeline()) + + def test_detect_relative_returns_false_when_absent(self): + class FakePipeline: + steps = [] + + assert not _detect_relative_actions(FakePipeline()) + + +class TestMultiChunkConsistency: + """Test multiple RTC iterations with relative actions maintain consistency.""" + + def test_three_iteration_pipeline(self): + """Simulate three consecutive RTC iterations and verify queue state consistency.""" + relative_step, normalizer, unnormalizer, absolute_step = _make_relative_pipeline() + queue = ActionQueue(_make_rtc_config()) + + states = [torch.randn(1, ACTION_DIM) + i * 0.01 for i in range(3)] + + for i in range(3): + queue.get_left_over() + + relative_step(batch_to_transition({OBS_STATE: states[i]})) + + model_output = torch.randn(1, CHUNK_SIZE, ACTION_DIM) + post_transition = absolute_step(unnormalizer({TransitionKey.ACTION: model_output.clone()})) + postprocessed = post_transition[TransitionKey.ACTION].squeeze(0) + original = model_output.squeeze(0) + + delay = min(i * 2, CHUNK_SIZE - 1) + queue.merge(original, postprocessed, real_delay=delay) + + for _ in range(5): + action = queue.get() + assert action is not None + assert action.shape == (ACTION_DIM,) + + # After 3 iterations, queue should still be in valid state + assert queue.qsize() > 0 + leftover = queue.get_left_over() + assert leftover is not None + assert leftover.ndim == 2 + assert leftover.shape[1] == ACTION_DIM + + def test_leftover_and_processed_differ_when_relative_enabled(self): + """With relative actions enabled, original leftovers (relative) must differ from processed (absolute).""" + relative_step, _, unnormalizer, absolute_step = _make_relative_pipeline() + queue = ActionQueue(_make_rtc_config()) + + state = torch.tensor([[1.0, 2.0, 3.0, 4.0, 5.0, 6.0]]) + relative_step(batch_to_transition({OBS_STATE: state})) + + model_relatives = torch.randn(1, CHUNK_SIZE, ACTION_DIM) + post = absolute_step(unnormalizer({TransitionKey.ACTION: model_relatives.clone()}))[ + TransitionKey.ACTION + ].squeeze(0) + original = model_relatives.squeeze(0) + + queue.merge(original, post, real_delay=0) + + relative_leftover = queue.get_left_over() + + # Leftovers (relative) must differ from the postprocessed absolute actions + assert not torch.allclose(relative_leftover, post, atol=1e-3) + state_expanded = state.squeeze(0).unsqueeze(0).expand_as(relative_leftover) + torch.testing.assert_close(post, relative_leftover + state_expanded, atol=1e-5, rtol=1e-5) + + def test_rtc_guidance_uses_relative_space(self): + """Verify that RTC denoise_step receives relative-space leftovers, not absolute.""" + relative_step, _, unnormalizer, absolute_step = _make_relative_pipeline() + rtc = RTCProcessor(_make_rtc_config()) + queue = ActionQueue(_make_rtc_config()) + + state = torch.tensor([[10.0, 20.0, 30.0, 40.0, 50.0, 60.0]]) + relative_step(batch_to_transition({OBS_STATE: state})) + + model_relatives = 0.1 * torch.randn(1, CHUNK_SIZE, ACTION_DIM) + post = absolute_step(unnormalizer({TransitionKey.ACTION: model_relatives.clone()}))[ + 
TransitionKey.ACTION + ].squeeze(0) + original = model_relatives.squeeze(0) + + queue.merge(original, post, real_delay=0) + + for _ in range(5): + queue.get() + + prev_left_over = queue.get_left_over() + + # prev_left_over should be small relative offsets (around 0.1 * randn), not large absolute values + assert prev_left_over.abs().mean() < 5.0, ( + f"Leftover should be small relative offsets, got mean abs {prev_left_over.abs().mean():.2f}" + ) + + x_t = torch.randn(1, CHUNK_SIZE, ACTION_DIM) + + def denoise(x): + return torch.zeros_like(x) + + result = rtc.denoise_step( + x_t=x_t, + prev_chunk_left_over=prev_left_over.unsqueeze(0), + inference_delay=3, + time=0.5, + original_denoise_step_partial=denoise, + ) + + assert result.shape == x_t.shape diff --git a/tests/policies/test_relative_actions.py b/tests/policies/test_relative_actions.py new file mode 100644 index 000000000..64c2ee9c4 --- /dev/null +++ b/tests/policies/test_relative_actions.py @@ -0,0 +1,346 @@ +"""Tests for relative action transforms — full pipeline validation. + +Tests the complete flow matching OpenPI: + raw actions → RelativeActions → Normalize(relative_stats) → model → Unnormalize → AbsoluteActions + +Uses real dataset: lerobot-data-collection/dagger_final_1_21 +""" + +import numpy as np +import pytest +import torch + +from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature +from lerobot.datasets.compute_stats import get_feature_stats +from lerobot.datasets.lerobot_dataset import LeRobotDataset +from lerobot.processor import TransitionKey, batch_to_transition +from lerobot.processor.normalize_processor import NormalizerProcessorStep, UnnormalizerProcessorStep +from lerobot.processor.relative_action_processor import ( + AbsoluteActionsProcessorStep, + RelativeActionsProcessorStep, + to_absolute_actions, + to_relative_actions, +) +from lerobot.utils.constants import ACTION, OBS_STATE + +CHUNK_SIZE = 10 +REPO_ID = "lerobot-data-collection/dagger_final_1_21" + + +@pytest.fixture(scope="module") +def dataset(): + return LeRobotDataset(REPO_ID, episodes=[0]) + + +@pytest.fixture(scope="module") +def action_dim(dataset): + return dataset.meta.features["action"]["shape"][0] + + +def _build_action_chunks(dataset, chunk_size, max_chunks=50): + """Build action chunks from hf_dataset, like the training script does.""" + hf = dataset.hf_dataset + total = len(hf) + all_ep = torch.tensor([int(hf[i]["episode_index"]) for i in range(total)]) + chunks, states = [], [] + for i in range(total - chunk_size + 1): + if all_ep[i] != all_ep[i + chunk_size - 1]: + continue + chunk_actions = torch.stack([hf[i + k]["action"] for k in range(chunk_size)]).float() + state = hf[i]["observation.state"].float() + chunks.append(chunk_actions) + states.append(state) + if len(chunks) >= max_chunks: + break + assert len(chunks) > 0, f"No valid chunks found. 
total={total}, ep_indices={all_ep.tolist()}" + return torch.stack(chunks), torch.stack(states) + + +def _compute_relative_chunk_stats(action_chunks, states, mask): + all_chunks = [] + for actions, state in zip(action_chunks, states, strict=True): + relative = to_relative_actions(actions.unsqueeze(0), state.unsqueeze(0), mask).squeeze(0) + all_chunks.append(relative.numpy()) + all_relative = np.concatenate(all_chunks, axis=0) + return get_feature_stats(all_relative, axis=0, keepdims=all_relative.ndim == 1) + + +# Basic roundtrip tests + + +def test_roundtrip_3d(action_dim): + actions = torch.randn(4, CHUNK_SIZE, action_dim) + state = torch.randn(4, action_dim) + mask = [True] * action_dim + recovered = to_absolute_actions(to_relative_actions(actions, state, mask), state, mask) + torch.testing.assert_close(recovered, actions) + + +def test_roundtrip_2d(action_dim): + actions = torch.randn(4, action_dim) + state = torch.randn(4, action_dim) + mask = [True] * action_dim + recovered = to_absolute_actions(to_relative_actions(actions, state, mask), state, mask) + torch.testing.assert_close(recovered, actions) + + +def test_no_mutation(action_dim): + actions = torch.randn(2, CHUNK_SIZE, action_dim) + original = actions.clone() + state = torch.randn(2, action_dim) + to_relative_actions(actions, state, [True] * action_dim) + torch.testing.assert_close(actions, original) + + +def test_exclude_joints_supports_partial_name_matching(): + names = [ + "right_joint_1.pos", + "right_gripper.pos", + "left_joint_1.pos", + "left_gripper.pos", + ] + step = RelativeActionsProcessorStep(enabled=True, exclude_joints=["gripper"], action_names=names) + assert step._build_mask(len(names)) == [True, False, True, False] + + +# Chunk-level relative stats test + + +def test_chunk_stats_have_larger_std_than_frame_stats(dataset, action_dim): + """Chunk-level relative stats should have larger std than per-frame relative stats.""" + action_chunks, states = _build_action_chunks(dataset, CHUNK_SIZE) + mask = [True] * action_dim + + chunk_stats = _compute_relative_chunk_stats(action_chunks, states, mask) + + # Per-frame stats + hf = dataset.hf_dataset + n = min(500, len(hf)) + frame_actions = torch.stack([hf[i]["action"] for i in range(n)]).float() + frame_states = torch.stack([hf[i]["observation.state"] for i in range(n)]).float() + frame_relatives = to_relative_actions(frame_actions, frame_states, mask).numpy() + frame_stats = get_feature_stats(frame_relatives, axis=0, keepdims=frame_relatives.ndim == 1) + + assert chunk_stats["std"].mean() >= frame_stats["std"].mean(), ( + f"Chunk std ({chunk_stats['std'].mean():.4f}) should be >= " + f"frame std ({frame_stats['std'].mean():.4f})" + ) + + +# Full pipeline roundtrip: relative → normalize → unnormalize → absolute + + +def test_full_pipeline_roundtrip(dataset, action_dim): + """Test the complete OpenPI pipeline: relative → normalize → unnormalize → absolute.""" + action_chunks, states = _build_action_chunks(dataset, CHUNK_SIZE) + mask = [True] * action_dim + + relative_stats = _compute_relative_chunk_stats(action_chunks, states, mask) + stats = {ACTION: dict(relative_stats.items())} + + features = {ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(action_dim,))} + norm_map = {FeatureType.ACTION: NormalizationMode.MEAN_STD} + + relative_step = RelativeActionsProcessorStep(enabled=True) + normalizer = NormalizerProcessorStep(features=features, norm_map=norm_map, stats=stats) + unnormalizer = UnnormalizerProcessorStep(features=features, norm_map=norm_map, stats=stats) + 
+
+
+def test_normalized_relative_values_are_reasonable(dataset, action_dim):
+    """With correct chunk stats, normalized relative actions should be in a reasonable range."""
+    action_chunks, states = _build_action_chunks(dataset, CHUNK_SIZE)
+    mask = [True] * action_dim
+
+    relative_stats = _compute_relative_chunk_stats(action_chunks, states, mask)
+    mean = torch.tensor(relative_stats["mean"]).float()
+    std = torch.tensor(relative_stats["std"]).float()
+
+    all_normalized = []
+    for actions, state in zip(action_chunks, states, strict=True):
+        relative = to_relative_actions(actions.unsqueeze(0), state.unsqueeze(0), mask).squeeze(0)
+        normalized = (relative - mean) / (std + 1e-6)
+        all_normalized.append(normalized)
+
+    all_normalized = torch.cat(all_normalized, dim=0)
+
+    pct_in_range = (all_normalized.abs() < 5).float().mean()
+    assert pct_in_range > 0.9, (
+        f"Only {pct_in_range * 100:.1f}% of normalized values in [-5, 5], expected >90%"
+    )
+
+    assert all_normalized.mean().abs() < 1.0, (
+        f"Mean of normalized relative actions is {all_normalized.mean():.2f}, expected near 0"
+    )
+
+
+def test_processor_step_roundtrip(dataset, action_dim):
+    """RelativeActionsProcessorStep applies relative offsets; to_absolute_actions recovers the original."""
+    hf = dataset.hf_dataset
+    batch = {
+        ACTION: torch.stack([hf[i]["action"] for i in range(4)]),
+        OBS_STATE: torch.stack([hf[i]["observation.state"] for i in range(4)]),
+    }
+    original_actions = batch[ACTION].clone()
+    transition = batch_to_transition(batch)
+
+    step = RelativeActionsProcessorStep(enabled=True)
+    relative_transition = step(transition)
+    assert not torch.allclose(relative_transition[TransitionKey.ACTION], original_actions)
+
+    state = transition[TransitionKey.OBSERVATION][OBS_STATE]
+    mask = [True] * action_dim
+    recovered = to_absolute_actions(relative_transition[TransitionKey.ACTION], state, mask)
+    torch.testing.assert_close(recovered, original_actions)
+
+
+def test_processor_step_disabled_is_noop(dataset, action_dim):
+    """enabled=False should be a no-op."""
+    hf = dataset.hf_dataset
+    batch = {
+        ACTION: torch.stack([hf[i]["action"] for i in range(2)]),
+        OBS_STATE: torch.stack([hf[i]["observation.state"] for i in range(2)]),
+    }
+    original = batch[ACTION].clone()
+    transition = batch_to_transition(batch)
+    result = RelativeActionsProcessorStep(enabled=False)(transition)
+    torch.testing.assert_close(result[TransitionKey.ACTION], original)
+
+
+# Training batch shape validation
+
+
+def test_relative_with_action_chunks(dataset, action_dim):
+    """Verify relative actions work correctly with (B, chunk_size, action_dim) shaped actions."""
+    action_chunks, states = _build_action_chunks(dataset, CHUNK_SIZE)
+
+    # Simulate a training batch: actions=(B, chunk_size, action_dim), state=(B, state_dim)
+    batch_actions = action_chunks[:4]  # (4, chunk_size, action_dim)
+    batch_states = states[:4]  # (4, state_dim)
+
+    mask = [True] * action_dim
+    relative = to_relative_actions(batch_actions, batch_states, mask)
+
+    # The first action in each chunk should be close to zero (action[t] - state[t] is small)
+    first_relatives = relative[:, 0, :]  # (B, action_dim)
+    assert first_relatives.abs().mean() < relative.abs().mean(), (
+        f"First action in chunk should have a smaller relative offset than the average. "
+        f"First: {first_relatives.abs().mean():.4f}, Average: {relative.abs().mean():.4f}"
+    )
+
+    # Later actions should have larger relative offsets
+    last_relatives = relative[:, -1, :]  # (B, action_dim)
+    assert last_relatives.abs().mean() >= first_relatives.abs().mean(), (
+        f"Last action in chunk should have an offset >= the first's. "
+        f"Last: {last_relatives.abs().mean():.4f}, First: {first_relatives.abs().mean():.4f}"
+    )
+
+    # Roundtrip
+    recovered = to_absolute_actions(relative, batch_states, mask)
+    torch.testing.assert_close(recovered, batch_actions)
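+
+
+# Editor's sketch (not part of the original patch): why a state-anchored representation is
+# benign under state error. If the rebase state is off by a constant, every recovered action
+# shifts by that same constant; nothing compounds across the chunk the way integrating
+# frame-to-frame deltas would.
+def test_state_rebase_error_is_constant_not_accumulating():
+    actions = torch.randn(1, CHUNK_SIZE, 3)
+    state = torch.randn(1, 3)
+    mask = [True, True, True]
+    relative = to_relative_actions(actions, state, mask)
+    recovered = to_absolute_actions(relative, state + 0.01, mask)
+    # A constant 0.01 bias at every timestep, with no growth along the chunk
+    torch.testing.assert_close(recovered - actions, torch.full_like(recovered, 0.01))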
+
+
+def test_relative_stats_match_actual_data_distribution(dataset, action_dim):
+    """Verify computed stats match the actual relative-action distribution."""
+    action_chunks, states = _build_action_chunks(dataset, CHUNK_SIZE)
+    mask = [True] * action_dim
+
+    # Compute stats like the training script does
+    relative_stats = _compute_relative_chunk_stats(action_chunks, states, mask)
+
+    # Also compute directly
+    all_relatives = []
+    for actions, state in zip(action_chunks, states, strict=True):
+        rel = to_relative_actions(actions.unsqueeze(0), state.unsqueeze(0), mask).squeeze(0)
+        all_relatives.append(rel)
+    all_relatives_tensor = torch.cat(all_relatives, dim=0)
+
+    # Compare mean
+    actual_mean = all_relatives_tensor.mean(dim=0).numpy()
+    np.testing.assert_allclose(relative_stats["mean"], actual_mean, atol=0.01)
+
+    # Compare std
+    actual_std = all_relatives_tensor.std(dim=0).numpy()
+    np.testing.assert_allclose(relative_stats["std"], actual_std, atol=0.1)
+
+    # Verify q01 < mean < q99
+    assert (relative_stats["q01"] < relative_stats["mean"]).all(), "q01 should be < mean"
+    assert (relative_stats["mean"] < relative_stats["q99"]).all(), "mean should be < q99"
+
+
+def test_quantile_normalization_roundtrip(dataset, action_dim):
+    """Full roundtrip with QUANTILES normalization (what OpenPI uses for pi05)."""
+    action_chunks, states = _build_action_chunks(dataset, CHUNK_SIZE)
+    mask = [True] * action_dim
+
+    relative_stats = _compute_relative_chunk_stats(action_chunks, states, mask)
+    stats = {ACTION: dict(relative_stats.items())}
+
+    features = {ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(action_dim,))}
+    norm_map = {FeatureType.ACTION: NormalizationMode.QUANTILES}
+
+    relative_step = RelativeActionsProcessorStep(enabled=True)
+    normalizer = NormalizerProcessorStep(features=features, norm_map=norm_map, stats=stats)
+    unnormalizer = UnnormalizerProcessorStep(features=features, norm_map=norm_map, stats=stats)
+    absolute_step = AbsoluteActionsProcessorStep(enabled=True, relative_step=relative_step)
+
+    original_actions = action_chunks[0].unsqueeze(0)
+    state = states[0].unsqueeze(0)
+
+    batch = {ACTION: original_actions, OBS_STATE: state}
+    transition = batch_to_transition(batch)
+
+    # Forward: relative → quantile normalize
+    t1 = relative_step(transition)
+    t2 = normalizer(t1)
+
+    normalized = t2[TransitionKey.ACTION]
+    # Most values should land in [-1, 1] under quantile normalization; the assert allows the looser [-2, 2]
+    pct_in_range = (normalized.abs() < 2).float().mean()
+    assert pct_in_range > 0.5, f"Only {pct_in_range * 100:.1f}% in [-2, 2] after quantile norm, expected >50%"
+
+    # Reverse: unnormalize → absolute
+    t3 = unnormalizer(t2)
+    t4 = absolute_step(t3)
+
+    recovered = t4[TransitionKey.ACTION]
+    torch.testing.assert_close(recovered, original_actions, atol=1e-3, rtol=1e-3)
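+
+
+# Editor's note (sketch, not in the original patch): the QUANTILES roundtrip above only needs
+# the normalization to be an invertible affine map of [q01, q99]. Assuming the common form
+# mapping [q01, q99] to [-1, 1] (the exact formula lives in NormalizerProcessorStep), the
+# inverse is plain algebra:
+def test_quantile_affine_inverse_algebra():
+    x = torch.randn(100, 4)
+    q01 = torch.quantile(x, 0.01, dim=0)
+    q99 = torch.quantile(x, 0.99, dim=0)
+    normalized = 2 * (x - q01) / (q99 - q01) - 1
+    torch.testing.assert_close((normalized + 1) / 2 * (q99 - q01) + q01, x)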
+
+
+def test_state_not_modified_by_relative_processor(dataset, action_dim):
+    """State should never be modified by the relative-action processor."""
+    hf = dataset.hf_dataset
+    batch = {
+        ACTION: torch.stack([hf[i]["action"] for i in range(4)]),
+        OBS_STATE: torch.stack([hf[i]["observation.state"] for i in range(4)]),
+    }
+    original_state = batch[OBS_STATE].clone()
+    transition = batch_to_transition(batch)
+
+    step = RelativeActionsProcessorStep(enabled=True)
+    result = step(transition)
+
+    result_state = result[TransitionKey.OBSERVATION][OBS_STATE]
+    torch.testing.assert_close(result_state, original_state)
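+
+
+# Editor's usage sketch (illustrative, not part of the original patch): the four steps the
+# pipeline tests above exercise, chained by hand. In practice these steps are wired up by the
+# policy's processor pipeline rather than called directly like this.
+def example_manual_relative_pipeline(batch, stats, features, norm_map):
+    relative_step = RelativeActionsProcessorStep(enabled=True)
+    normalizer = NormalizerProcessorStep(features=features, norm_map=norm_map, stats=stats)
+    unnormalizer = UnnormalizerProcessorStep(features=features, norm_map=norm_map, stats=stats)
+    absolute_step = AbsoluteActionsProcessorStep(enabled=True, relative_step=relative_step)
+    t = batch_to_transition(batch)
+    t = normalizer(relative_step(t))  # training-side view: normalized relative actions
+    t = absolute_step(unnormalizer(t))  # inference-side view: absolute actions to execute
+    return t[TransitionKey.ACTION]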