mirror of
https://github.com/huggingface/lerobot.git
synced 2026-06-19 01:07:18 +00:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| bf03414b38 | |||
| 9860f794cf | |||
| 7940bfad52 | |||
| a2246a650b |
@@ -188,7 +188,7 @@ jobs:
|
||||
- name: Verify GPU availability
|
||||
run: |
|
||||
nvidia-smi
|
||||
python -c "import torch; print(f'PyTorch version: {torch.__version__}'); print(f'PyTorch CUDA available: {torch.cuda.is_available()}'); print(f'Number of GPUs: {torch.cuda.device_count()}')"
|
||||
python -c "import torch; print(f'PyTorch CUDA available: {torch.cuda.is_available()}'); print(f'Number of GPUs: {torch.cuda.device_count()}')"
|
||||
|
||||
- name: Run multi-GPU training tests
|
||||
# TODO(Steven): Investigate why motors tests are failing in multi-GPU setup
|
||||
|
||||
@@ -19,8 +19,6 @@
|
||||
title: Multi GPU training
|
||||
- local: peft_training
|
||||
title: Training with PEFT (e.g., LoRA)
|
||||
- local: rename_map
|
||||
title: Using Rename Map and Empty Cameras
|
||||
title: "Tutorials"
|
||||
- sections:
|
||||
- local: lerobot-dataset-v3
|
||||
|
||||
@@ -1,145 +0,0 @@
|
||||
# Understanding the Rename Map and Empty Cameras
|
||||
|
||||
When you train or evaluate a robot policy, your **dataset** or **environment** hands you observations under one set of keys (e.g. `observation.images.front`, `observation.images.eagle`), while your **policy** was built to expect another (e.g. `observation.images.image`, `observation.images.image2`). The rename map is how you bridge that gap without changing the policy or the data source.
|
||||
|
||||
This guide explains why it exists, how to use it in training and evaluation, and when to use **empty cameras** so you can fine-tune multi-camera policies on datasets that have fewer views.
|
||||
|
||||
---
|
||||
|
||||
## Why observation keys don’t always match
|
||||
|
||||
Policies have a fixed set of **input feature names** (often coming from a pretrained config). For example:
|
||||
|
||||
- **XVLA-base** expects three image keys: `observation.images.image`, `observation.images.image2`, `observation.images.image3`.
|
||||
- **pi0-fast-libero** might expect `observation.images.base_0_rgb` and `observation.images.left_wrist_0_rgb`.
|
||||
|
||||
Your dataset or sim might use completely different names: `observation.images.front`, `observation.images.eagle`, `observation.images.glove` (e.g. [svla_so100_sorting](https://huggingface.co/datasets/lerobot/svla_so100_sorting)). Or your eval env (e.g. LIBERO) might return `observation.images.image` and `observation.images.image2`.
|
||||
|
||||
Rather than renaming columns in the dataset or editing the policy code, LeRobot lets you pass a **rename map**: a dictionary that says “when you see this key in the data, treat it as this key for the policy.” Renaming is applied in the preprocessing pipeline so the policy always receives the keys it expects.
|
||||
|
||||
---
|
||||
|
||||
## How the rename map works
|
||||
|
||||
The rename map is a dictionary:
|
||||
|
||||
- **Keys** = observation keys as produced by your **dataset** (training) or **environment** (evaluation).
|
||||
- **Values** = the observation keys your **policy** expects.
|
||||
|
||||
Only keys listed in the map are renamed; everything else is left as-is. Under the hood, the [RenameObservationsProcessorStep](https://github.com/huggingface/lerobot/blob/main/src/lerobot/processor/rename_processor.py) runs in the preprocessor and rewrites observation keys (and keeps normalization stats aligned) so the batch matches the policy’s `input_features`.
|
||||
|
||||
You can use the same idea for **training** (dataset → policy) and **evaluation** (env → policy).
|
||||
|
||||
<p align="center">
|
||||
<img
|
||||
src="https://huggingface.co/datasets/jadechoghari/images/resolve/main/rename-map.png"
|
||||
alt="Rename map: mapping dataset or environment observation keys to policy input keys"
|
||||
style="max-width: 100%; height: auto;"
|
||||
/>
|
||||
</p>
|
||||
|
||||
---
|
||||
|
||||
## Option 1: Use a rename map (recommended)
|
||||
|
||||
You pass the mapping on the command line so dataset/env keys are renamed to what the policy expects. No need to change the policy repo or the data.
|
||||
|
||||
### Training example: XVLA on a dataset with different camera names
|
||||
|
||||
Suppose you fine-tune [lerobot/xvla-base](https://huggingface.co/lerobot/xvla-base) on a dataset whose images are stored under `observation.images.front`, `observation.images.eagle`, and `observation.images.glove`. XVLA expects `observation.images.image`, `observation.images.image2`, and `observation.images.image3`. Map the dataset keys to the policy keys:
|
||||
|
||||
```bash
|
||||
lerobot-train \
|
||||
--dataset.repo_id=YOUR_DATASET \
|
||||
--output_dir=./outputs/xvla_training \
|
||||
--job_name=xvla_training \
|
||||
--policy.path="lerobot/xvla-base" \
|
||||
--policy.repo_id="HF_USER/xvla-your-robot" \
|
||||
--policy.dtype=bfloat16 \
|
||||
--policy.action_mode=auto \
|
||||
--steps=20000 \
|
||||
--policy.device=cuda \
|
||||
--policy.freeze_vision_encoder=false \
|
||||
--policy.freeze_language_encoder=false \
|
||||
--policy.train_policy_transformer=true \
|
||||
--policy.train_soft_prompts=true \
|
||||
--rename_map='{"observation.images.front": "observation.images.image", "observation.images.eagle": "observation.images.image2", "observation.images.glove": "observation.images.image3"}'
|
||||
```
|
||||
|
||||
Order of entries in the map doesn’t matter; each dataset key is renamed to the corresponding policy key.
|
||||
|
||||
### Evaluation example: Policy trained on different camera names than the env
|
||||
|
||||
You trained (or downloaded) a policy that expects `observation.images.base_0_rgb` and `observation.images.left_wrist_0_rgb` (e.g. [pi0fast-libero](https://huggingface.co/lerobot/pi0fast-libero)), but your evaluation environment (e.g. LIBERO) returns `observation.images.image` and `observation.images.image2`. Tell the eval script how to rename env keys to policy keys:
|
||||
|
||||
```bash
|
||||
lerobot-eval \
|
||||
--policy.path=lerobot/pi0fast-libero \
|
||||
--env.type=libero \
|
||||
... \
|
||||
--rename_map='{"observation.images.image": "observation.images.base_0_rgb", "observation.images.image2": "observation.images.left_wrist_0_rgb"}'
|
||||
```
|
||||
|
||||
So: **key = what the env gives, value = what the policy expects.** Same convention as in training.
|
||||
|
||||
---
|
||||
|
||||
## Option 2: Change the policy config (no rename map)
|
||||
|
||||
If you prefer not to pass a rename map every time, you can **edit the policy’s `config.json`** so that its expected observation keys match your dataset or environment. For example, change the policy’s visual input keys to `observation.images.front`, `observation.images.eagle`, `observation.images.glove` to match your dataset, or to `observation.images.image` / `observation.images.image2` to match LIBERO.
|
||||
|
||||
- **Training:** If the dataset’s camera keys match the (modified) policy config, you don’t need a rename map.
|
||||
- **Evaluation:** If the env’s keys match the (modified) policy config, you don’t need a rename map for eval either.
|
||||
|
||||
The tradeoff: you’re changing the policy repo or your local checkpoint. That’s fine if you’re only ever using that one dataset or env; a rename map keeps the same policy usable across multiple data sources without touching the config.
|
||||
|
||||
---
|
||||
|
||||
## When you have fewer cameras than the policy expects: empty cameras
|
||||
|
||||
Some policies (e.g. XVLA) are built for a fixed number of image inputs (e.g. three). Your dataset might only have **two** cameras. You still want to fine-tune without changing the model architecture.
|
||||
|
||||
LeRobot supports this with **empty cameras**: the config declares extra “slots” that the policy expects, but the dataset (or env) does not provide. Those slots are filled with placeholder keys and typically zero or masked inputs so the policy can run with fewer real views.
|
||||
|
||||
<p align="center">
|
||||
<img
|
||||
src="https://huggingface.co/datasets/jadechoghari/images/resolve/main/empty_cam.png"
|
||||
alt="Empty cameras: using placeholder slots when the dataset has fewer views than the policy expects"
|
||||
style="max-width: 100%; height: auto;"
|
||||
/>
|
||||
</p>
|
||||
|
||||
- In the policy config (e.g. [xvla-base config.json](https://huggingface.co/lerobot/xvla-base/blob/main/config.json)), `empty_cameras` is the number of these extra slots (default `0`).
|
||||
- For each slot, the config adds an observation key of the form:
|
||||
`observation.images.empty_camera_0`, `observation.images.empty_camera_1`, …
|
||||
|
||||
Example: XVLA-base has three visual inputs and `empty_cameras=0`. Your dataset has only two images. Set **`empty_cameras=1`**. Then:
|
||||
|
||||
1. The config gains a third visual key: `observation.images.empty_camera_0`.
|
||||
2. You still use the rename map (or matching config keys) for the two real cameras.
|
||||
3. The third view is treated as “empty” (no corresponding dataset key); the policy ignores or masks it as needed.
|
||||
|
||||
So you fine-tune on two observations only, and the third visual input is effectively unused. You do **not** need to add a fake third image to your dataset.
|
||||
|
||||
---
|
||||
|
||||
## Where the rename map is used in the codebase
|
||||
|
||||
- **Training** ([`lerobot_train.py`](https://github.com/huggingface/lerobot/blob/main/src/lerobot/scripts/lerobot_train.py)): `rename_map` is passed into `make_policy(..., rename_map=cfg.rename_map)` and into the preprocessor as `rename_observations_processor: {"rename_map": cfg.rename_map}`. Batches from the dataset are renamed before being fed to the policy.
|
||||
- **Evaluation** ([`lerobot_eval.py`](https://github.com/huggingface/lerobot/blob/main/src/lerobot/scripts/lerobot_eval.py)): Same idea—`rename_map` is passed to `make_policy` and to the preprocessor so env observations are renamed before the policy sees them.
|
||||
- **Processor** ([`rename_processor.py`](https://github.com/huggingface/lerobot/blob/main/src/lerobot/processor/rename_processor.py)): `RenameObservationsProcessorStep` does the actual key renaming and updates feature metadata so normalization stats stay consistent with the renamed keys.
|
||||
|
||||
If you see a feature mismatch error (“Missing features” / “Extra features”), the error message suggests using `--rename_map` with a mapping from your data’s keys to the policy’s expected keys.
|
||||
|
||||
---
|
||||
|
||||
## Quick reference
|
||||
|
||||
| Goal | What to do |
|
||||
| ------------------------------------- | ---------------------------------------------------------------------------------------------------------- |
|
||||
| Dataset keys ≠ policy keys (training) | `--rename_map='{"dataset_key": "policy_key", ...}'` |
|
||||
| Env keys ≠ policy keys (eval) | `--rename_map='{"env_key": "policy_key", ...}'` |
|
||||
| Fewer cameras than policy expects | Set `empty_cameras` in the policy config (e.g. `1` when you have 2 real cameras and the policy expects 3). |
|
||||
| Avoid passing a rename map | Edit the policy’s `config.json` so its observation keys match your dataset or env. |
|
||||
|
||||
The rename map keeps your pipeline flexible: one policy, many data sources, no code changes—just a small dictionary on the command line or in your config.
|
||||
@@ -18,7 +18,6 @@ from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.datasets.utils import hw_to_dataset_features
|
||||
from lerobot.policies.act.modeling_act import ACTPolicy
|
||||
from lerobot.policies.factory import make_pre_post_processors
|
||||
from lerobot.processor import make_default_processors
|
||||
from lerobot.robots.lekiwi import LeKiwiClient, LeKiwiClientConfig
|
||||
from lerobot.scripts.lerobot_record import record_loop
|
||||
from lerobot.utils.constants import ACTION, OBS_STR
|
||||
@@ -71,9 +70,6 @@ def main():
|
||||
# To connect you already should have this script running on LeKiwi: `python -m lerobot.robots.lekiwi.lekiwi_host --robot.id=my_awesome_kiwi`
|
||||
robot.connect()
|
||||
|
||||
# TODO(Steven): Update this example to use pipelines
|
||||
teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
|
||||
|
||||
# Initialize the keyboard listener and rerun visualization
|
||||
listener, events = init_keyboard_listener()
|
||||
init_rerun(session_name="lekiwi_evaluate")
|
||||
@@ -99,9 +95,6 @@ def main():
|
||||
control_time_s=EPISODE_TIME_SEC,
|
||||
single_task=TASK_DESCRIPTION,
|
||||
display_data=True,
|
||||
teleop_action_processor=teleop_action_processor,
|
||||
robot_action_processor=robot_action_processor,
|
||||
robot_observation_processor=robot_observation_processor,
|
||||
)
|
||||
|
||||
# Reset the environment if not stopping or re-recording
|
||||
@@ -116,9 +109,6 @@ def main():
|
||||
control_time_s=EPISODE_TIME_SEC,
|
||||
single_task=TASK_DESCRIPTION,
|
||||
display_data=True,
|
||||
teleop_action_processor=teleop_action_processor,
|
||||
robot_action_processor=robot_action_processor,
|
||||
robot_observation_processor=robot_observation_processor,
|
||||
)
|
||||
|
||||
if events["rerecord_episode"]:
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.datasets.utils import hw_to_dataset_features
|
||||
from lerobot.processor import make_default_processors
|
||||
from lerobot.robots.lekiwi.config_lekiwi import LeKiwiClientConfig
|
||||
from lerobot.robots.lekiwi.lekiwi_client import LeKiwiClient
|
||||
from lerobot.scripts.lerobot_record import record_loop
|
||||
@@ -46,9 +45,6 @@ def main():
|
||||
leader_arm = SO100Leader(leader_arm_config)
|
||||
keyboard = KeyboardTeleop(keyboard_config)
|
||||
|
||||
# TODO(Steven): Update this example to use pipelines
|
||||
teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
|
||||
|
||||
# Configure the dataset features
|
||||
action_features = hw_to_dataset_features(robot.action_features, ACTION)
|
||||
obs_features = hw_to_dataset_features(robot.observation_features, OBS_STR)
|
||||
@@ -93,9 +89,6 @@ def main():
|
||||
control_time_s=EPISODE_TIME_SEC,
|
||||
single_task=TASK_DESCRIPTION,
|
||||
display_data=True,
|
||||
teleop_action_processor=teleop_action_processor,
|
||||
robot_action_processor=robot_action_processor,
|
||||
robot_observation_processor=robot_observation_processor,
|
||||
)
|
||||
|
||||
# Reset the environment if not stopping or re-recording
|
||||
@@ -111,9 +104,6 @@ def main():
|
||||
control_time_s=RESET_TIME_SEC,
|
||||
single_task=TASK_DESCRIPTION,
|
||||
display_data=True,
|
||||
teleop_action_processor=teleop_action_processor,
|
||||
robot_action_processor=robot_action_processor,
|
||||
robot_observation_processor=robot_observation_processor,
|
||||
)
|
||||
|
||||
if events["rerecord_episode"]:
|
||||
|
||||
@@ -17,30 +17,16 @@
|
||||
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
|
||||
from lerobot.configs.types import FeatureType, PolicyFeature
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
|
||||
from lerobot.datasets.utils import combine_feature_dicts
|
||||
from lerobot.model.kinematics import RobotKinematics
|
||||
from lerobot.policies.act.modeling_act import ACTPolicy
|
||||
from lerobot.policies.factory import make_pre_post_processors
|
||||
from lerobot.processor import (
|
||||
RobotAction,
|
||||
RobotObservation,
|
||||
RobotProcessorPipeline,
|
||||
make_default_teleop_action_processor,
|
||||
)
|
||||
from lerobot.processor.converters import (
|
||||
observation_to_transition,
|
||||
robot_action_observation_to_transition,
|
||||
transition_to_observation,
|
||||
transition_to_robot_action,
|
||||
)
|
||||
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
|
||||
from lerobot.robots.so_follower.robot_kinematic_processor import (
|
||||
ForwardKinematicsJointsToEE,
|
||||
InverseKinematicsEEToJoints,
|
||||
from lerobot.robots.so_follower.pipelines import (
|
||||
make_so10x_fk_observation_pipeline,
|
||||
make_so10x_ik_action_pipeline,
|
||||
)
|
||||
from lerobot.scripts.lerobot_record import record_loop
|
||||
from lerobot.utils.control_utils import init_keyboard_listener
|
||||
from lerobot.utils.pipeline_utils import build_dataset_features
|
||||
from lerobot.utils.utils import log_say
|
||||
from lerobot.utils.visualization_utils import init_rerun
|
||||
|
||||
@@ -51,6 +37,10 @@ TASK_DESCRIPTION = "My task description"
|
||||
HF_MODEL_ID = "<hf_username>/<model_repo_id>"
|
||||
HF_DATASET_ID = "<hf_username>/<dataset_repo_id>"
|
||||
|
||||
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo:
|
||||
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
|
||||
URDF_PATH = "./SO101/so101_new_calib.urdf"
|
||||
|
||||
|
||||
def main():
|
||||
# Create the robot configuration & robot
|
||||
@@ -64,68 +54,31 @@ def main():
|
||||
|
||||
robot = SO100Follower(robot_config)
|
||||
|
||||
# Create policy
|
||||
policy = ACTPolicy.from_pretrained(HF_MODEL_ID)
|
||||
# Attach FK/IK pipelines so the robot works in EE space
|
||||
motor_names = list(robot.bus.motors.keys())
|
||||
robot.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
|
||||
robot.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names))
|
||||
|
||||
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
|
||||
kinematics_solver = RobotKinematics(
|
||||
urdf_path="./SO101/so101_new_calib.urdf",
|
||||
target_frame_name="gripper_frame_link",
|
||||
joint_names=list(robot.bus.motors.keys()),
|
||||
)
|
||||
|
||||
# Build pipeline to convert EE action to joints action
|
||||
robot_ee_to_joints_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
|
||||
steps=[
|
||||
InverseKinematicsEEToJoints(
|
||||
kinematics=kinematics_solver,
|
||||
motor_names=list(robot.bus.motors.keys()),
|
||||
initial_guess_current_joints=True,
|
||||
),
|
||||
],
|
||||
to_transition=robot_action_observation_to_transition,
|
||||
to_output=transition_to_robot_action,
|
||||
)
|
||||
|
||||
# Build pipeline to convert joints observation to EE observation
|
||||
robot_joints_to_ee_pose_processor = RobotProcessorPipeline[RobotObservation, RobotObservation](
|
||||
steps=[
|
||||
ForwardKinematicsJointsToEE(
|
||||
kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())
|
||||
)
|
||||
],
|
||||
to_transition=observation_to_transition,
|
||||
to_output=transition_to_observation,
|
||||
)
|
||||
|
||||
# Create the dataset
|
||||
# Create the dataset — obs auto-derived from FK pipeline, EE action spec explicit
|
||||
dataset = LeRobotDataset.create(
|
||||
repo_id=HF_DATASET_ID,
|
||||
fps=FPS,
|
||||
features=combine_feature_dicts(
|
||||
aggregate_pipeline_dataset_features(
|
||||
pipeline=robot_joints_to_ee_pose_processor,
|
||||
initial_features=create_initial_features(observation=robot.observation_features),
|
||||
use_videos=True,
|
||||
),
|
||||
# User for now should be explicit on the feature keys that were used for record
|
||||
# Alternatively, the user can pass the processor step that has the right features
|
||||
aggregate_pipeline_dataset_features(
|
||||
pipeline=make_default_teleop_action_processor(),
|
||||
initial_features=create_initial_features(
|
||||
action={
|
||||
f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
|
||||
for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
|
||||
}
|
||||
),
|
||||
use_videos=True,
|
||||
),
|
||||
features=build_dataset_features(
|
||||
robot,
|
||||
use_videos=True,
|
||||
action_features={
|
||||
f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
|
||||
for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
|
||||
},
|
||||
),
|
||||
robot_type=robot.name,
|
||||
use_videos=True,
|
||||
image_writer_threads=4,
|
||||
)
|
||||
|
||||
# Create policy
|
||||
policy = ACTPolicy.from_pretrained(HF_MODEL_ID)
|
||||
|
||||
# Build Policy Processors
|
||||
preprocessor, postprocessor = make_pre_post_processors(
|
||||
policy_cfg=policy,
|
||||
@@ -151,21 +104,18 @@ def main():
|
||||
for episode_idx in range(NUM_EPISODES):
|
||||
log_say(f"Running inference, recording eval episode {episode_idx + 1} of {NUM_EPISODES}")
|
||||
|
||||
# Main record loop
|
||||
# Main record loop — pipelines applied internally by robot
|
||||
record_loop(
|
||||
robot=robot,
|
||||
events=events,
|
||||
fps=FPS,
|
||||
policy=policy,
|
||||
preprocessor=preprocessor, # Pass the pre and post policy processors
|
||||
preprocessor=preprocessor,
|
||||
postprocessor=postprocessor,
|
||||
dataset=dataset,
|
||||
control_time_s=EPISODE_TIME_SEC,
|
||||
single_task=TASK_DESCRIPTION,
|
||||
display_data=True,
|
||||
teleop_action_processor=make_default_teleop_action_processor(),
|
||||
robot_action_processor=robot_ee_to_joints_processor,
|
||||
robot_observation_processor=robot_joints_to_ee_pose_processor,
|
||||
)
|
||||
|
||||
# Reset the environment if not stopping or re-recording
|
||||
@@ -180,9 +130,6 @@ def main():
|
||||
control_time_s=EPISODE_TIME_SEC,
|
||||
single_task=TASK_DESCRIPTION,
|
||||
display_data=True,
|
||||
teleop_action_processor=make_default_teleop_action_processor(),
|
||||
robot_action_processor=robot_ee_to_joints_processor,
|
||||
robot_observation_processor=robot_joints_to_ee_pose_processor,
|
||||
)
|
||||
|
||||
if events["rerecord_episode"]:
|
||||
|
||||
@@ -16,21 +16,17 @@
|
||||
|
||||
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
|
||||
from lerobot.datasets.utils import combine_feature_dicts
|
||||
from lerobot.model.kinematics import RobotKinematics
|
||||
from lerobot.processor import RobotAction, RobotObservation, RobotProcessorPipeline
|
||||
from lerobot.processor.converters import (
|
||||
observation_to_transition,
|
||||
robot_action_observation_to_transition,
|
||||
transition_to_observation,
|
||||
robot_action_to_transition,
|
||||
transition_to_robot_action,
|
||||
)
|
||||
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
|
||||
from lerobot.robots.so_follower.pipelines import make_so10x_fk_observation_pipeline
|
||||
from lerobot.robots.so_follower.robot_kinematic_processor import (
|
||||
EEBoundsAndSafety,
|
||||
EEReferenceAndDelta,
|
||||
ForwardKinematicsJointsToEE,
|
||||
GripperVelocityToJoint,
|
||||
InverseKinematicsEEToJoints,
|
||||
)
|
||||
@@ -39,6 +35,7 @@ from lerobot.teleoperators.phone.config_phone import PhoneConfig, PhoneOS
|
||||
from lerobot.teleoperators.phone.phone_processor import MapPhoneActionToRobotAction
|
||||
from lerobot.teleoperators.phone.teleop_phone import Phone
|
||||
from lerobot.utils.control_utils import init_keyboard_listener
|
||||
from lerobot.utils.pipeline_utils import build_dataset_features
|
||||
from lerobot.utils.utils import log_say
|
||||
from lerobot.utils.visualization_utils import init_rerun
|
||||
|
||||
@@ -49,6 +46,10 @@ RESET_TIME_SEC = 30
|
||||
TASK_DESCRIPTION = "My task description"
|
||||
HF_REPO_ID = "<hf_username>/<dataset_repo_id>"
|
||||
|
||||
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo:
|
||||
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
|
||||
URDF_PATH = "./SO101/so101_new_calib.urdf"
|
||||
|
||||
|
||||
def main():
|
||||
# Create the robot and teleoperator configurations
|
||||
@@ -65,77 +66,59 @@ def main():
|
||||
robot = SO100Follower(robot_config)
|
||||
phone = Phone(teleop_config)
|
||||
|
||||
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
|
||||
motor_names = list(robot.bus.motors.keys())
|
||||
|
||||
from lerobot.model.kinematics import RobotKinematics
|
||||
|
||||
kinematics_solver = RobotKinematics(
|
||||
urdf_path="./SO101/so101_new_calib.urdf",
|
||||
urdf_path=URDF_PATH,
|
||||
target_frame_name="gripper_frame_link",
|
||||
joint_names=list(robot.bus.motors.keys()),
|
||||
joint_names=motor_names,
|
||||
)
|
||||
|
||||
# Build pipeline to convert phone action to EE action
|
||||
phone_to_robot_ee_pose_processor = RobotProcessorPipeline[
|
||||
tuple[RobotAction, RobotObservation], RobotAction
|
||||
](
|
||||
steps=[
|
||||
MapPhoneActionToRobotAction(platform=teleop_config.phone_os),
|
||||
EEReferenceAndDelta(
|
||||
kinematics=kinematics_solver,
|
||||
end_effector_step_sizes={"x": 0.5, "y": 0.5, "z": 0.5},
|
||||
motor_names=list(robot.bus.motors.keys()),
|
||||
use_latched_reference=True,
|
||||
),
|
||||
EEBoundsAndSafety(
|
||||
end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
|
||||
max_ee_step_m=0.20,
|
||||
),
|
||||
GripperVelocityToJoint(speed_factor=20.0),
|
||||
],
|
||||
to_transition=robot_action_observation_to_transition,
|
||||
to_output=transition_to_robot_action,
|
||||
# Phone output pipeline: map raw phone gesture to EE delta (no robot obs needed)
|
||||
phone.set_output_pipeline(
|
||||
RobotProcessorPipeline[RobotAction, RobotAction](
|
||||
steps=[MapPhoneActionToRobotAction(platform=teleop_config.phone_os)],
|
||||
to_transition=robot_action_to_transition,
|
||||
to_output=transition_to_robot_action,
|
||||
)
|
||||
)
|
||||
|
||||
# Build pipeline to convert EE action to joints action
|
||||
robot_ee_to_joints_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
|
||||
steps=[
|
||||
InverseKinematicsEEToJoints(
|
||||
kinematics=kinematics_solver,
|
||||
motor_names=list(robot.bus.motors.keys()),
|
||||
initial_guess_current_joints=True,
|
||||
),
|
||||
],
|
||||
to_transition=robot_action_observation_to_transition,
|
||||
to_output=transition_to_robot_action,
|
||||
# Robot FK observation pipeline: joints → EE pose
|
||||
robot.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
|
||||
|
||||
# Robot input pipeline: EE delta + current robot obs → joint commands
|
||||
robot.set_input_pipeline(
|
||||
RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
|
||||
steps=[
|
||||
EEReferenceAndDelta(
|
||||
kinematics=kinematics_solver,
|
||||
end_effector_step_sizes={"x": 0.5, "y": 0.5, "z": 0.5},
|
||||
motor_names=motor_names,
|
||||
use_latched_reference=True,
|
||||
),
|
||||
EEBoundsAndSafety(
|
||||
end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
|
||||
max_ee_step_m=0.20,
|
||||
),
|
||||
GripperVelocityToJoint(speed_factor=20.0),
|
||||
InverseKinematicsEEToJoints(
|
||||
kinematics=kinematics_solver,
|
||||
motor_names=motor_names,
|
||||
initial_guess_current_joints=True,
|
||||
),
|
||||
],
|
||||
to_transition=robot_action_observation_to_transition,
|
||||
to_output=transition_to_robot_action,
|
||||
)
|
||||
)
|
||||
|
||||
# Build pipeline to convert joint observation to EE observation
|
||||
robot_joints_to_ee_pose = RobotProcessorPipeline[RobotObservation, RobotObservation](
|
||||
steps=[
|
||||
ForwardKinematicsJointsToEE(
|
||||
kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())
|
||||
)
|
||||
],
|
||||
to_transition=observation_to_transition,
|
||||
to_output=transition_to_observation,
|
||||
)
|
||||
|
||||
# Create the dataset
|
||||
# Dataset features auto-derived from robot's FK obs pipeline and phone's mapped action pipeline
|
||||
dataset = LeRobotDataset.create(
|
||||
repo_id=HF_REPO_ID,
|
||||
fps=FPS,
|
||||
features=combine_feature_dicts(
|
||||
# Run the feature contract of the pipelines
|
||||
# This tells you how the features would look like after the pipeline steps
|
||||
aggregate_pipeline_dataset_features(
|
||||
pipeline=phone_to_robot_ee_pose_processor,
|
||||
initial_features=create_initial_features(action=phone.action_features),
|
||||
use_videos=True,
|
||||
),
|
||||
aggregate_pipeline_dataset_features(
|
||||
pipeline=robot_joints_to_ee_pose,
|
||||
initial_features=create_initial_features(observation=robot.observation_features),
|
||||
use_videos=True,
|
||||
),
|
||||
),
|
||||
features=build_dataset_features(robot, phone, use_videos=True),
|
||||
robot_type=robot.name,
|
||||
use_videos=True,
|
||||
image_writer_threads=4,
|
||||
@@ -158,7 +141,7 @@ def main():
|
||||
while episode_idx < NUM_EPISODES and not events["stop_recording"]:
|
||||
log_say(f"Recording episode {episode_idx + 1} of {NUM_EPISODES}")
|
||||
|
||||
# Main record loop
|
||||
# Main record loop — pipelines applied internally by robot and phone
|
||||
record_loop(
|
||||
robot=robot,
|
||||
events=events,
|
||||
@@ -168,9 +151,6 @@ def main():
|
||||
control_time_s=EPISODE_TIME_SEC,
|
||||
single_task=TASK_DESCRIPTION,
|
||||
display_data=True,
|
||||
teleop_action_processor=phone_to_robot_ee_pose_processor,
|
||||
robot_action_processor=robot_ee_to_joints_processor,
|
||||
robot_observation_processor=robot_joints_to_ee_pose,
|
||||
)
|
||||
|
||||
# Reset the environment if not stopping or re-recording
|
||||
@@ -186,9 +166,6 @@ def main():
|
||||
control_time_s=RESET_TIME_SEC,
|
||||
single_task=TASK_DESCRIPTION,
|
||||
display_data=True,
|
||||
teleop_action_processor=phone_to_robot_ee_pose_processor,
|
||||
robot_action_processor=robot_ee_to_joints_processor,
|
||||
robot_observation_processor=robot_joints_to_ee_pose,
|
||||
)
|
||||
|
||||
if events["rerecord_episode"]:
|
||||
|
||||
@@ -87,8 +87,8 @@ from lerobot.policies.rtc.action_queue import ActionQueue
|
||||
from lerobot.policies.rtc.configuration_rtc import RTCConfig
|
||||
from lerobot.policies.rtc.latency_tracker import LatencyTracker
|
||||
from lerobot.processor.factory import (
|
||||
make_default_robot_action_processor,
|
||||
make_default_robot_observation_processor,
|
||||
_make_identity_observation_pipeline as make_default_robot_observation_processor,
|
||||
_make_identity_robot_action_pipeline as make_default_robot_action_processor,
|
||||
)
|
||||
from lerobot.rl.process import ProcessSignalHandler
|
||||
from lerobot.robots import ( # noqa: F401
|
||||
|
||||
@@ -17,30 +17,16 @@
|
||||
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
|
||||
from lerobot.configs.types import FeatureType, PolicyFeature
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
|
||||
from lerobot.datasets.utils import combine_feature_dicts
|
||||
from lerobot.model.kinematics import RobotKinematics
|
||||
from lerobot.policies.act.modeling_act import ACTPolicy
|
||||
from lerobot.policies.factory import make_pre_post_processors
|
||||
from lerobot.processor import (
|
||||
RobotAction,
|
||||
RobotObservation,
|
||||
RobotProcessorPipeline,
|
||||
make_default_teleop_action_processor,
|
||||
)
|
||||
from lerobot.processor.converters import (
|
||||
observation_to_transition,
|
||||
robot_action_observation_to_transition,
|
||||
transition_to_observation,
|
||||
transition_to_robot_action,
|
||||
)
|
||||
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
|
||||
from lerobot.robots.so_follower.robot_kinematic_processor import (
|
||||
ForwardKinematicsJointsToEE,
|
||||
InverseKinematicsEEToJoints,
|
||||
from lerobot.robots.so_follower.pipelines import (
|
||||
make_so10x_fk_observation_pipeline,
|
||||
make_so10x_ik_action_pipeline,
|
||||
)
|
||||
from lerobot.scripts.lerobot_record import record_loop
|
||||
from lerobot.utils.control_utils import init_keyboard_listener
|
||||
from lerobot.utils.pipeline_utils import build_dataset_features
|
||||
from lerobot.utils.utils import log_say
|
||||
from lerobot.utils.visualization_utils import init_rerun
|
||||
|
||||
@@ -51,6 +37,10 @@ TASK_DESCRIPTION = "My task description"
|
||||
HF_MODEL_ID = "<hf_username>/<model_repo_id>"
|
||||
HF_DATASET_ID = "<hf_username>/<dataset_repo_id>"
|
||||
|
||||
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo:
|
||||
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
|
||||
URDF_PATH = "./SO101/so101_new_calib.urdf"
|
||||
|
||||
|
||||
def main():
|
||||
# Create the robot configuration & robot
|
||||
@@ -64,68 +54,31 @@ def main():
|
||||
|
||||
robot = SO100Follower(robot_config)
|
||||
|
||||
# Create policy
|
||||
policy = ACTPolicy.from_pretrained(HF_MODEL_ID)
|
||||
# Attach FK/IK pipelines so the robot works in EE space
|
||||
motor_names = list(robot.bus.motors.keys())
|
||||
robot.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
|
||||
robot.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names))
|
||||
|
||||
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
|
||||
kinematics_solver = RobotKinematics(
|
||||
urdf_path="./SO101/so101_new_calib.urdf",
|
||||
target_frame_name="gripper_frame_link",
|
||||
joint_names=list(robot.bus.motors.keys()),
|
||||
)
|
||||
|
||||
# Build pipeline to convert EE action to joints action
|
||||
robot_ee_to_joints_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
|
||||
steps=[
|
||||
InverseKinematicsEEToJoints(
|
||||
kinematics=kinematics_solver,
|
||||
motor_names=list(robot.bus.motors.keys()),
|
||||
initial_guess_current_joints=True,
|
||||
),
|
||||
],
|
||||
to_transition=robot_action_observation_to_transition,
|
||||
to_output=transition_to_robot_action,
|
||||
)
|
||||
|
||||
# Build pipeline to convert joints observation to EE observation
|
||||
robot_joints_to_ee_pose_processor = RobotProcessorPipeline[RobotObservation, RobotObservation](
|
||||
steps=[
|
||||
ForwardKinematicsJointsToEE(
|
||||
kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())
|
||||
)
|
||||
],
|
||||
to_transition=observation_to_transition,
|
||||
to_output=transition_to_observation,
|
||||
)
|
||||
|
||||
# Create the dataset
|
||||
# Create the dataset — obs auto-derived from FK pipeline, EE action spec explicit
|
||||
dataset = LeRobotDataset.create(
|
||||
repo_id=HF_DATASET_ID,
|
||||
fps=FPS,
|
||||
features=combine_feature_dicts(
|
||||
aggregate_pipeline_dataset_features(
|
||||
pipeline=robot_joints_to_ee_pose_processor,
|
||||
initial_features=create_initial_features(observation=robot.observation_features),
|
||||
use_videos=True,
|
||||
),
|
||||
# User for now should be explicit on the feature keys that were used for record
|
||||
# Alternatively, the user can pass the processor step that has the right features
|
||||
aggregate_pipeline_dataset_features(
|
||||
pipeline=make_default_teleop_action_processor(),
|
||||
initial_features=create_initial_features(
|
||||
action={
|
||||
f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
|
||||
for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
|
||||
}
|
||||
),
|
||||
use_videos=True,
|
||||
),
|
||||
features=build_dataset_features(
|
||||
robot,
|
||||
use_videos=True,
|
||||
action_features={
|
||||
f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
|
||||
for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
|
||||
},
|
||||
),
|
||||
robot_type=robot.name,
|
||||
use_videos=True,
|
||||
image_writer_threads=4,
|
||||
)
|
||||
|
||||
# Create policy
|
||||
policy = ACTPolicy.from_pretrained(HF_MODEL_ID)
|
||||
|
||||
# Build Policy Processors
|
||||
preprocessor, postprocessor = make_pre_post_processors(
|
||||
policy_cfg=policy,
|
||||
@@ -135,7 +88,7 @@ def main():
|
||||
preprocessor_overrides={"device_processor": {"device": str(policy.config.device)}},
|
||||
)
|
||||
|
||||
# Connect the robot and teleoperator
|
||||
# Connect the robot
|
||||
robot.connect()
|
||||
|
||||
# Initialize the keyboard listener and rerun visualization
|
||||
@@ -151,21 +104,18 @@ def main():
|
||||
for episode_idx in range(NUM_EPISODES):
|
||||
log_say(f"Running inference, recording eval episode {episode_idx + 1} of {NUM_EPISODES}")
|
||||
|
||||
# Main record loop
|
||||
# Main record loop — pipelines applied internally by robot
|
||||
record_loop(
|
||||
robot=robot,
|
||||
events=events,
|
||||
fps=FPS,
|
||||
policy=policy,
|
||||
preprocessor=preprocessor, # Pass the pre and post policy processors
|
||||
preprocessor=preprocessor,
|
||||
postprocessor=postprocessor,
|
||||
dataset=dataset,
|
||||
control_time_s=EPISODE_TIME_SEC,
|
||||
single_task=TASK_DESCRIPTION,
|
||||
display_data=True,
|
||||
teleop_action_processor=make_default_teleop_action_processor(),
|
||||
robot_action_processor=robot_ee_to_joints_processor,
|
||||
robot_observation_processor=robot_joints_to_ee_pose_processor,
|
||||
)
|
||||
|
||||
# Reset the environment if not stopping or re-recording
|
||||
@@ -180,9 +130,6 @@ def main():
|
||||
control_time_s=EPISODE_TIME_SEC,
|
||||
single_task=TASK_DESCRIPTION,
|
||||
display_data=True,
|
||||
teleop_action_processor=make_default_teleop_action_processor(),
|
||||
robot_action_processor=robot_ee_to_joints_processor,
|
||||
robot_observation_processor=robot_joints_to_ee_pose_processor,
|
||||
)
|
||||
|
||||
if events["rerecord_episode"]:
|
||||
|
||||
@@ -17,25 +17,20 @@
|
||||
|
||||
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
|
||||
from lerobot.datasets.utils import combine_feature_dicts
|
||||
from lerobot.model.kinematics import RobotKinematics
|
||||
from lerobot.processor import RobotAction, RobotObservation, RobotProcessorPipeline
|
||||
from lerobot.processor.converters import (
|
||||
observation_to_transition,
|
||||
robot_action_observation_to_transition,
|
||||
transition_to_observation,
|
||||
transition_to_robot_action,
|
||||
)
|
||||
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
|
||||
from lerobot.robots.so_follower.robot_kinematic_processor import (
|
||||
EEBoundsAndSafety,
|
||||
ForwardKinematicsJointsToEE,
|
||||
InverseKinematicsEEToJoints,
|
||||
from lerobot.robots.so_follower.pipelines import (
|
||||
make_so10x_fk_observation_pipeline,
|
||||
make_so10x_ik_action_pipeline,
|
||||
)
|
||||
from lerobot.scripts.lerobot_record import record_loop
|
||||
from lerobot.teleoperators.so_leader import SO100Leader, SO100LeaderConfig
|
||||
from lerobot.teleoperators.so_leader.pipelines import make_so10x_leader_fk_pipeline
|
||||
from lerobot.utils.control_utils import init_keyboard_listener
|
||||
from lerobot.utils.pipeline_utils import (
|
||||
build_dataset_features,
|
||||
check_action_space_compatibility,
|
||||
check_observation_space_compatibility,
|
||||
)
|
||||
from lerobot.utils.utils import log_say
|
||||
from lerobot.utils.visualization_utils import init_rerun
|
||||
|
||||
@@ -46,6 +41,10 @@ RESET_TIME_SEC = 30
|
||||
TASK_DESCRIPTION = "My task description"
|
||||
HF_REPO_ID = "<hf_username>/<dataset_repo_id>"
|
||||
|
||||
# NOTE: Use the URDF from the SO-ARM100 repo:
|
||||
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
|
||||
URDF_PATH = "./SO101/so101_new_calib.urdf"
|
||||
|
||||
|
||||
def main():
|
||||
# Create the robot and teleoperator configurations
|
||||
@@ -62,77 +61,17 @@ def main():
|
||||
follower = SO100Follower(follower_config)
|
||||
leader = SO100Leader(leader_config)
|
||||
|
||||
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
|
||||
follower_kinematics_solver = RobotKinematics(
|
||||
urdf_path="./SO101/so101_new_calib.urdf",
|
||||
target_frame_name="gripper_frame_link",
|
||||
joint_names=list(follower.bus.motors.keys()),
|
||||
)
|
||||
# Attach EE-space pipelines to the objects
|
||||
motor_names = list(follower.bus.motors.keys())
|
||||
follower.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
|
||||
follower.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names))
|
||||
leader.set_output_pipeline(make_so10x_leader_fk_pipeline(URDF_PATH, list(leader.bus.motors.keys())))
|
||||
|
||||
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
|
||||
leader_kinematics_solver = RobotKinematics(
|
||||
urdf_path="./SO101/so101_new_calib.urdf",
|
||||
target_frame_name="gripper_frame_link",
|
||||
joint_names=list(leader.bus.motors.keys()),
|
||||
)
|
||||
|
||||
# Build pipeline to convert follower joints to EE observation
|
||||
follower_joints_to_ee = RobotProcessorPipeline[RobotObservation, RobotObservation](
|
||||
steps=[
|
||||
ForwardKinematicsJointsToEE(
|
||||
kinematics=follower_kinematics_solver, motor_names=list(follower.bus.motors.keys())
|
||||
),
|
||||
],
|
||||
to_transition=observation_to_transition,
|
||||
to_output=transition_to_observation,
|
||||
)
|
||||
|
||||
# Build pipeline to convert leader joints to EE action
|
||||
leader_joints_to_ee = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
|
||||
steps=[
|
||||
ForwardKinematicsJointsToEE(
|
||||
kinematics=leader_kinematics_solver, motor_names=list(leader.bus.motors.keys())
|
||||
),
|
||||
],
|
||||
to_transition=robot_action_observation_to_transition,
|
||||
to_output=transition_to_robot_action,
|
||||
)
|
||||
|
||||
# Build pipeline to convert EE action to follower joints
|
||||
ee_to_follower_joints = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
|
||||
[
|
||||
EEBoundsAndSafety(
|
||||
end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
|
||||
max_ee_step_m=0.10,
|
||||
),
|
||||
InverseKinematicsEEToJoints(
|
||||
kinematics=follower_kinematics_solver,
|
||||
motor_names=list(follower.bus.motors.keys()),
|
||||
initial_guess_current_joints=True,
|
||||
),
|
||||
],
|
||||
to_transition=robot_action_observation_to_transition,
|
||||
to_output=transition_to_robot_action,
|
||||
)
|
||||
|
||||
# Create the dataset
|
||||
# Dataset features are derived automatically from robot/teleop pipelines
|
||||
dataset = LeRobotDataset.create(
|
||||
repo_id=HF_REPO_ID,
|
||||
fps=FPS,
|
||||
features=combine_feature_dicts(
|
||||
# Run the feature contract of the pipelines
|
||||
# This tells you how the features would look like after the pipeline steps
|
||||
aggregate_pipeline_dataset_features(
|
||||
pipeline=leader_joints_to_ee,
|
||||
initial_features=create_initial_features(action=leader.action_features),
|
||||
use_videos=True,
|
||||
),
|
||||
aggregate_pipeline_dataset_features(
|
||||
pipeline=follower_joints_to_ee,
|
||||
initial_features=create_initial_features(observation=follower.observation_features),
|
||||
use_videos=True,
|
||||
),
|
||||
),
|
||||
features=build_dataset_features(follower, leader, use_videos=True),
|
||||
robot_type=follower.name,
|
||||
use_videos=True,
|
||||
image_writer_threads=4,
|
||||
@@ -142,9 +81,13 @@ def main():
|
||||
leader.connect()
|
||||
follower.connect()
|
||||
|
||||
# Verify action/observation space alignment (warns on mismatch)
|
||||
check_action_space_compatibility(leader, follower)
|
||||
check_observation_space_compatibility(follower, leader)
|
||||
|
||||
# Initialize the keyboard listener and rerun visualization
|
||||
listener, events = init_keyboard_listener()
|
||||
init_rerun(session_name="recording_phone")
|
||||
init_rerun(session_name="recording_ee")
|
||||
|
||||
try:
|
||||
if not leader.is_connected or not follower.is_connected:
|
||||
@@ -155,7 +98,8 @@ def main():
|
||||
while episode_idx < NUM_EPISODES and not events["stop_recording"]:
|
||||
log_say(f"Recording episode {episode_idx + 1} of {NUM_EPISODES}")
|
||||
|
||||
# Main record loop
|
||||
# Pipelines applied automatically inside robot.get_observation(),
|
||||
# teleop.get_action(), and robot.send_action()
|
||||
record_loop(
|
||||
robot=follower,
|
||||
events=events,
|
||||
@@ -165,9 +109,6 @@ def main():
|
||||
control_time_s=EPISODE_TIME_SEC,
|
||||
single_task=TASK_DESCRIPTION,
|
||||
display_data=True,
|
||||
teleop_action_processor=leader_joints_to_ee,
|
||||
robot_action_processor=ee_to_follower_joints,
|
||||
robot_observation_processor=follower_joints_to_ee,
|
||||
)
|
||||
|
||||
# Reset the environment if not stopping or re-recording
|
||||
@@ -183,9 +124,6 @@ def main():
|
||||
control_time_s=RESET_TIME_SEC,
|
||||
single_task=TASK_DESCRIPTION,
|
||||
display_data=True,
|
||||
teleop_action_processor=leader_joints_to_ee,
|
||||
robot_action_processor=ee_to_follower_joints,
|
||||
robot_observation_processor=follower_joints_to_ee,
|
||||
)
|
||||
|
||||
if events["rerecord_episode"]:
|
||||
|
||||
@@ -14,27 +14,23 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import time
|
||||
|
||||
from lerobot.model.kinematics import RobotKinematics
|
||||
from lerobot.processor import RobotAction, RobotObservation, RobotProcessorPipeline
|
||||
from lerobot.processor.converters import (
|
||||
robot_action_observation_to_transition,
|
||||
robot_action_to_transition,
|
||||
transition_to_robot_action,
|
||||
)
|
||||
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
|
||||
from lerobot.robots.so_follower.robot_kinematic_processor import (
|
||||
EEBoundsAndSafety,
|
||||
ForwardKinematicsJointsToEE,
|
||||
InverseKinematicsEEToJoints,
|
||||
from lerobot.robots.so_follower.pipelines import (
|
||||
make_so10x_fk_observation_pipeline,
|
||||
make_so10x_ik_action_pipeline,
|
||||
)
|
||||
from lerobot.scripts.lerobot_teleoperate import teleop_loop
|
||||
from lerobot.teleoperators.so_leader import SO100Leader, SO100LeaderConfig
|
||||
from lerobot.utils.robot_utils import precise_sleep
|
||||
from lerobot.utils.visualization_utils import init_rerun, log_rerun_data
|
||||
from lerobot.teleoperators.so_leader.pipelines import make_so10x_leader_fk_pipeline
|
||||
from lerobot.utils.pipeline_utils import check_action_space_compatibility
|
||||
from lerobot.utils.visualization_utils import init_rerun
|
||||
|
||||
FPS = 30
|
||||
|
||||
# NOTE: Use the URDF from the SO-ARM100 repo:
|
||||
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
|
||||
URDF_PATH = "./SO101/so101_new_calib.urdf"
|
||||
|
||||
|
||||
def main():
|
||||
# Initialize the robot and teleoperator config
|
||||
@@ -47,47 +43,14 @@ def main():
|
||||
follower = SO100Follower(follower_config)
|
||||
leader = SO100Leader(leader_config)
|
||||
|
||||
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
|
||||
follower_kinematics_solver = RobotKinematics(
|
||||
urdf_path="./SO101/so101_new_calib.urdf",
|
||||
target_frame_name="gripper_frame_link",
|
||||
joint_names=list(follower.bus.motors.keys()),
|
||||
)
|
||||
# Attach EE-space pipelines to the objects
|
||||
motor_names = list(follower.bus.motors.keys())
|
||||
follower.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
|
||||
follower.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names))
|
||||
leader.set_output_pipeline(make_so10x_leader_fk_pipeline(URDF_PATH, list(leader.bus.motors.keys())))
|
||||
|
||||
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
|
||||
leader_kinematics_solver = RobotKinematics(
|
||||
urdf_path="./SO101/so101_new_calib.urdf",
|
||||
target_frame_name="gripper_frame_link",
|
||||
joint_names=list(leader.bus.motors.keys()),
|
||||
)
|
||||
|
||||
# Build pipeline to convert teleop joints to EE action
|
||||
leader_to_ee = RobotProcessorPipeline[RobotAction, RobotAction](
|
||||
steps=[
|
||||
ForwardKinematicsJointsToEE(
|
||||
kinematics=leader_kinematics_solver, motor_names=list(leader.bus.motors.keys())
|
||||
),
|
||||
],
|
||||
to_transition=robot_action_to_transition,
|
||||
to_output=transition_to_robot_action,
|
||||
)
|
||||
|
||||
# build pipeline to convert EE action to robot joints
|
||||
ee_to_follower_joints = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
|
||||
[
|
||||
EEBoundsAndSafety(
|
||||
end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
|
||||
max_ee_step_m=0.10,
|
||||
),
|
||||
InverseKinematicsEEToJoints(
|
||||
kinematics=follower_kinematics_solver,
|
||||
motor_names=list(follower.bus.motors.keys()),
|
||||
initial_guess_current_joints=False,
|
||||
),
|
||||
],
|
||||
to_transition=robot_action_observation_to_transition,
|
||||
to_output=transition_to_robot_action,
|
||||
)
|
||||
# Verify action space alignment (warns if leader EE ≠ follower action_features)
|
||||
check_action_space_compatibility(leader, follower)
|
||||
|
||||
# Connect to the robot and teleoperator
|
||||
follower.connect()
|
||||
@@ -97,28 +60,12 @@ def main():
|
||||
init_rerun(session_name="so100_so100_EE_teleop")
|
||||
|
||||
print("Starting teleop loop...")
|
||||
while True:
|
||||
t0 = time.perf_counter()
|
||||
|
||||
# Get robot observation
|
||||
robot_obs = follower.get_observation()
|
||||
|
||||
# Get teleop observation
|
||||
leader_joints_obs = leader.get_action()
|
||||
|
||||
# teleop joints -> teleop EE action
|
||||
leader_ee_act = leader_to_ee(leader_joints_obs)
|
||||
|
||||
# teleop EE -> robot joints
|
||||
follower_joints_act = ee_to_follower_joints((leader_ee_act, robot_obs))
|
||||
|
||||
# Send action to robot
|
||||
_ = follower.send_action(follower_joints_act)
|
||||
|
||||
# Visualize
|
||||
log_rerun_data(observation=leader_ee_act, action=follower_joints_act)
|
||||
|
||||
precise_sleep(max(1.0 / FPS - (time.perf_counter() - t0), 0.0))
|
||||
try:
|
||||
# Pipelines applied automatically inside teleop.get_action() and robot.send_action()
|
||||
teleop_loop(teleop=leader, robot=follower, fps=FPS, display_data=True)
|
||||
finally:
|
||||
follower.disconnect()
|
||||
leader.disconnect()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
+1
-1
@@ -76,7 +76,7 @@ dependencies = [
|
||||
"pyserial>=3.5,<4.0",
|
||||
"wandb>=0.24.0,<0.25.0",
|
||||
|
||||
"torch==2.10.0",
|
||||
"torch>=2.2.1,<2.11.0", # TODO: Bump dependency
|
||||
"torchcodec>=0.2.1,<0.11.0; sys_platform != 'win32' and (sys_platform != 'linux' or (platform_machine != 'aarch64' and platform_machine != 'arm64' and platform_machine != 'armv7l')) and (sys_platform != 'darwin' or platform_machine != 'x86_64')", # TODO: Bump dependency
|
||||
"torchvision>=0.21.0,<0.26.0", # TODO: Bump dependency
|
||||
|
||||
|
||||
@@ -12,14 +12,14 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import re
|
||||
from collections.abc import Sequence
|
||||
from typing import Any
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from lerobot.configs.types import PipelineFeatureType
|
||||
from lerobot.datasets.utils import hw_to_dataset_features
|
||||
from lerobot.processor import DataProcessorPipeline, RobotAction, RobotObservation
|
||||
from lerobot.utils.constants import ACTION, OBS_IMAGES, OBS_STATE, OBS_STR
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from lerobot.processor import RobotAction, RobotObservation
|
||||
|
||||
|
||||
def create_initial_features(
|
||||
@@ -41,99 +41,3 @@ def create_initial_features(
|
||||
if observation:
|
||||
features[PipelineFeatureType.OBSERVATION] = observation
|
||||
return features
|
||||
|
||||
|
||||
# Helper to filter state/action keys based on regex patterns.
|
||||
def should_keep(key: str, patterns: tuple[str]) -> bool:
|
||||
if patterns is None:
|
||||
return True
|
||||
return any(re.search(pat, key) for pat in patterns)
|
||||
|
||||
|
||||
def strip_prefix(key: str, prefixes_to_strip: tuple[str]) -> str:
|
||||
for prefix in prefixes_to_strip:
|
||||
if key.startswith(prefix):
|
||||
return key[len(prefix) :]
|
||||
return key
|
||||
|
||||
|
||||
# Define prefixes to strip from feature keys for clean names.
|
||||
# Handles both fully qualified (e.g., "action.state") and short (e.g., "state") forms.
|
||||
PREFIXES_TO_STRIP = tuple(
|
||||
f"{token}." for const in (ACTION, OBS_STATE, OBS_IMAGES) for token in (const, const.split(".")[-1])
|
||||
)
|
||||
|
||||
|
||||
def aggregate_pipeline_dataset_features(
|
||||
pipeline: DataProcessorPipeline,
|
||||
initial_features: dict[PipelineFeatureType, dict[str, Any]],
|
||||
*,
|
||||
use_videos: bool = True,
|
||||
patterns: Sequence[str] | None = None,
|
||||
) -> dict[str, dict]:
|
||||
"""
|
||||
Aggregates and filters pipeline features to create a dataset-ready features dictionary.
|
||||
|
||||
This function transforms initial features using the pipeline, categorizes them as action or observations
|
||||
(image or state), filters them based on `use_videos` and `patterns`, and finally
|
||||
formats them for use with a Hugging Face LeRobot Dataset.
|
||||
|
||||
Args:
|
||||
pipeline: The DataProcessorPipeline to apply.
|
||||
initial_features: A dictionary of raw feature specs for actions and observations.
|
||||
use_videos: If False, image features are excluded.
|
||||
patterns: A sequence of regex patterns to filter action and state features.
|
||||
Image features are not affected by this filter.
|
||||
|
||||
Returns:
|
||||
A dictionary of features formatted for a Hugging Face LeRobot Dataset.
|
||||
"""
|
||||
all_features = pipeline.transform_features(initial_features)
|
||||
|
||||
# Intermediate storage for categorized and filtered features.
|
||||
processed_features: dict[str, dict[str, Any]] = {
|
||||
ACTION: {},
|
||||
OBS_STR: {},
|
||||
}
|
||||
images_token = OBS_IMAGES.split(".")[-1]
|
||||
|
||||
# Iterate through all features transformed by the pipeline.
|
||||
for ptype, feats in all_features.items():
|
||||
if ptype not in [PipelineFeatureType.ACTION, PipelineFeatureType.OBSERVATION]:
|
||||
continue
|
||||
|
||||
for key, value in feats.items():
|
||||
# 1. Categorize the feature.
|
||||
is_action = ptype == PipelineFeatureType.ACTION
|
||||
# Observations are classified as images if their key matches image-related tokens or if the shape of the feature is 3.
|
||||
# All other observations are treated as state.
|
||||
is_image = not is_action and (
|
||||
(isinstance(value, tuple) and len(value) == 3)
|
||||
or (
|
||||
key.startswith(f"{OBS_IMAGES}.")
|
||||
or key.startswith(f"{images_token}.")
|
||||
or f".{images_token}." in key
|
||||
)
|
||||
)
|
||||
|
||||
# 2. Apply filtering rules.
|
||||
if is_image and not use_videos:
|
||||
continue
|
||||
if not is_image and not should_keep(key, patterns):
|
||||
continue
|
||||
|
||||
# 3. Add the feature to the appropriate group with a clean name.
|
||||
name = strip_prefix(key, PREFIXES_TO_STRIP)
|
||||
if is_action:
|
||||
processed_features[ACTION][name] = value
|
||||
else:
|
||||
processed_features[OBS_STR][name] = value
|
||||
|
||||
# Convert the processed features into the final dataset format.
|
||||
dataset_features = {}
|
||||
if processed_features[ACTION]:
|
||||
dataset_features.update(hw_to_dataset_features(processed_features[ACTION], ACTION, use_videos))
|
||||
if processed_features[OBS_STR]:
|
||||
dataset_features.update(hw_to_dataset_features(processed_features[OBS_STR], OBS_STR, use_videos))
|
||||
|
||||
return dataset_features
|
||||
|
||||
@@ -30,12 +30,6 @@ from .core import (
|
||||
)
|
||||
from .delta_action_processor import MapDeltaActionToRobotActionStep, MapTensorToDeltaActionDictStep
|
||||
from .device_processor import DeviceProcessorStep
|
||||
from .factory import (
|
||||
make_default_processors,
|
||||
make_default_robot_action_processor,
|
||||
make_default_robot_observation_processor,
|
||||
make_default_teleop_action_processor,
|
||||
)
|
||||
from .gym_action_processor import (
|
||||
Numpy2TorchActionProcessorStep,
|
||||
Torch2NumpyActionProcessorStep,
|
||||
@@ -95,11 +89,7 @@ __all__ = [
|
||||
"ImageCropResizeProcessorStep",
|
||||
"InfoProcessorStep",
|
||||
"InterventionActionProcessorStep",
|
||||
"make_default_processors",
|
||||
"make_default_teleop_action_processor",
|
||||
"make_default_robot_action_processor",
|
||||
"make_default_robot_observation_processor",
|
||||
"MapDeltaActionToRobotActionStep",
|
||||
"MapDeltaActionToRobotActionStep",
|
||||
"MapTensorToDeltaActionDictStep",
|
||||
"NormalizerProcessorStep",
|
||||
"Numpy2TorchActionProcessorStep",
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
from .converters import (
|
||||
observation_to_transition,
|
||||
robot_action_observation_to_transition,
|
||||
robot_action_to_transition,
|
||||
transition_to_observation,
|
||||
transition_to_robot_action,
|
||||
)
|
||||
@@ -24,39 +25,44 @@ from .core import RobotAction, RobotObservation
|
||||
from .pipeline import IdentityProcessorStep, RobotProcessorPipeline
|
||||
|
||||
|
||||
def make_default_teleop_action_processor() -> RobotProcessorPipeline[
|
||||
tuple[RobotAction, RobotObservation], RobotAction
|
||||
]:
|
||||
teleop_action_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
|
||||
steps=[IdentityProcessorStep()],
|
||||
to_transition=robot_action_observation_to_transition,
|
||||
to_output=transition_to_robot_action,
|
||||
)
|
||||
return teleop_action_processor
|
||||
# ── Internal identity pipeline helpers (used by Robot/Teleoperator base classes) ──────────────────
|
||||
|
||||
|
||||
def make_default_robot_action_processor() -> RobotProcessorPipeline[
|
||||
tuple[RobotAction, RobotObservation], RobotAction
|
||||
]:
|
||||
robot_action_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
|
||||
steps=[IdentityProcessorStep()],
|
||||
to_transition=robot_action_observation_to_transition,
|
||||
to_output=transition_to_robot_action,
|
||||
)
|
||||
return robot_action_processor
|
||||
|
||||
|
||||
def make_default_robot_observation_processor() -> RobotProcessorPipeline[RobotObservation, RobotObservation]:
|
||||
robot_observation_processor = RobotProcessorPipeline[RobotObservation, RobotObservation](
|
||||
def _make_identity_observation_pipeline() -> RobotProcessorPipeline[RobotObservation, RobotObservation]:
|
||||
"""Identity pipeline for robot observations (get_observation output pipeline)."""
|
||||
return RobotProcessorPipeline[RobotObservation, RobotObservation](
|
||||
steps=[IdentityProcessorStep()],
|
||||
to_transition=observation_to_transition,
|
||||
to_output=transition_to_observation,
|
||||
)
|
||||
|
||||
|
||||
def _make_identity_robot_action_pipeline() -> RobotProcessorPipeline[
|
||||
tuple[RobotAction, RobotObservation], RobotAction
|
||||
]:
|
||||
"""Identity pipeline for robot action input (send_action input pipeline, takes (action, obs) tuple)."""
|
||||
return RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
|
||||
steps=[IdentityProcessorStep()],
|
||||
to_transition=robot_action_observation_to_transition,
|
||||
to_output=transition_to_robot_action,
|
||||
)
|
||||
|
||||
|
||||
def _make_identity_teleop_action_pipeline() -> RobotProcessorPipeline[RobotAction, RobotAction]:
|
||||
"""Identity pipeline for teleop action output (get_action output pipeline, takes just action)."""
|
||||
return RobotProcessorPipeline[RobotAction, RobotAction](
|
||||
steps=[IdentityProcessorStep()],
|
||||
to_transition=robot_action_to_transition,
|
||||
to_output=transition_to_robot_action,
|
||||
)
|
||||
|
||||
|
||||
def _make_identity_feedback_pipeline() -> RobotProcessorPipeline[dict, dict]:
|
||||
"""Identity pipeline for teleop feedback input (send_feedback input pipeline)."""
|
||||
return RobotProcessorPipeline[dict, dict](
|
||||
steps=[IdentityProcessorStep()],
|
||||
to_transition=observation_to_transition,
|
||||
to_output=transition_to_observation,
|
||||
)
|
||||
return robot_observation_processor
|
||||
|
||||
|
||||
def make_default_processors():
|
||||
teleop_action_processor = make_default_teleop_action_processor()
|
||||
robot_action_processor = make_default_robot_action_processor()
|
||||
robot_observation_processor = make_default_robot_observation_processor()
|
||||
return (teleop_action_processor, robot_action_processor, robot_observation_processor)
|
||||
|
||||
@@ -19,15 +19,17 @@ from __future__ import annotations
|
||||
|
||||
from copy import deepcopy
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
import torch
|
||||
from torch import Tensor
|
||||
|
||||
from lerobot.configs.types import FeatureType, NormalizationMode, PipelineFeatureType, PolicyFeature
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.utils.constants import ACTION
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
|
||||
from .converters import from_tensor_to_numpy, to_tensor
|
||||
from .core import EnvTransition, PolicyAction, TransitionKey
|
||||
from .pipeline import PolicyProcessorPipeline, ProcessorStep, ProcessorStepRegistry, RobotObservation
|
||||
|
||||
@@ -43,12 +43,9 @@ from lerobot.utils.import_utils import _transformers_available
|
||||
from .core import EnvTransition, RobotObservation, TransitionKey
|
||||
from .pipeline import ActionProcessorStep, ObservationProcessorStep, ProcessorStepRegistry
|
||||
|
||||
# Conditional import for type checking and lazy loading
|
||||
if TYPE_CHECKING or _transformers_available:
|
||||
# Type-checking only import — do NOT import transformers at module level (it loads TF which blocks)
|
||||
if TYPE_CHECKING:
|
||||
from transformers import AutoProcessor, AutoTokenizer
|
||||
else:
|
||||
AutoProcessor = None
|
||||
AutoTokenizer = None
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -106,8 +103,7 @@ class TokenizerProcessorStep(ObservationProcessorStep):
|
||||
# Use provided tokenizer object directly
|
||||
self.input_tokenizer = self.tokenizer
|
||||
elif self.tokenizer_name is not None:
|
||||
if AutoTokenizer is None:
|
||||
raise ImportError("AutoTokenizer is not available")
|
||||
from transformers import AutoTokenizer # lazy import to avoid TF deadlock at module load
|
||||
self.input_tokenizer = AutoTokenizer.from_pretrained(self.tokenizer_name)
|
||||
else:
|
||||
raise ValueError(
|
||||
@@ -370,12 +366,12 @@ class ActionTokenizerProcessorStep(ActionProcessorStep):
|
||||
"Please install it with `pip install 'lerobot[transformers-dep]'` to use ActionTokenizerProcessorStep."
|
||||
)
|
||||
|
||||
from transformers import AutoProcessor, AutoTokenizer # lazy import to avoid TF deadlock at module load
|
||||
|
||||
if self.action_tokenizer_input_object is not None:
|
||||
self.action_tokenizer = self.action_tokenizer_input_object
|
||||
|
||||
elif self.action_tokenizer_name is not None:
|
||||
if AutoProcessor is None:
|
||||
raise ImportError("AutoProcessor is not available")
|
||||
self.action_tokenizer = AutoProcessor.from_pretrained(
|
||||
self.action_tokenizer_name, trust_remote_code=self.trust_remote_code
|
||||
)
|
||||
|
||||
@@ -102,11 +102,11 @@ class BiOpenArmFollower(Robot):
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
def raw_observation_features(self) -> dict[str, type | tuple]:
|
||||
return {**self._motors_ft, **self._cameras_ft}
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
return self._motors_ft
|
||||
|
||||
@property
|
||||
@@ -136,7 +136,7 @@ class BiOpenArmFollower(Robot):
|
||||
)
|
||||
|
||||
@check_if_not_connected
|
||||
def get_observation(self) -> RobotObservation:
|
||||
def _get_observation(self) -> RobotObservation:
|
||||
obs_dict = {}
|
||||
|
||||
# Add "left_" prefix
|
||||
@@ -150,7 +150,7 @@ class BiOpenArmFollower(Robot):
|
||||
return obs_dict
|
||||
|
||||
@check_if_not_connected
|
||||
def send_action(
|
||||
def _send_action(
|
||||
self,
|
||||
action: RobotAction,
|
||||
custom_kp: dict[str, float] | None = None,
|
||||
|
||||
@@ -86,11 +86,11 @@ class BiSOFollower(Robot):
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
def raw_observation_features(self) -> dict[str, type | tuple]:
|
||||
return {**self._motors_ft, **self._cameras_ft}
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
return self._motors_ft
|
||||
|
||||
@property
|
||||
@@ -119,7 +119,7 @@ class BiSOFollower(Robot):
|
||||
self.right_arm.setup_motors()
|
||||
|
||||
@check_if_not_connected
|
||||
def get_observation(self) -> RobotObservation:
|
||||
def _get_observation(self) -> RobotObservation:
|
||||
obs_dict = {}
|
||||
|
||||
# Add "left_" prefix
|
||||
@@ -133,7 +133,7 @@ class BiSOFollower(Robot):
|
||||
return obs_dict
|
||||
|
||||
@check_if_not_connected
|
||||
def send_action(self, action: RobotAction) -> RobotAction:
|
||||
def _send_action(self, action: RobotAction) -> RobotAction:
|
||||
# Remove "left_" prefix
|
||||
left_action = {
|
||||
key.removeprefix("left_"): value for key, value in action.items() if key.startswith("left_")
|
||||
|
||||
@@ -147,7 +147,7 @@ class EarthRoverMiniPlus(Robot):
|
||||
pass
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
def raw_observation_features(self) -> dict[str, type | tuple]:
|
||||
"""Define the observation space for dataset recording.
|
||||
|
||||
Returns:
|
||||
@@ -184,7 +184,7 @@ class EarthRoverMiniPlus(Robot):
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
"""Define the action space.
|
||||
|
||||
Returns:
|
||||
@@ -198,7 +198,7 @@ class EarthRoverMiniPlus(Robot):
|
||||
}
|
||||
|
||||
@check_if_not_connected
|
||||
def get_observation(self) -> RobotObservation:
|
||||
def _get_observation(self) -> RobotObservation:
|
||||
"""Get current robot observation from SDK.
|
||||
|
||||
Returns:
|
||||
@@ -255,7 +255,7 @@ class EarthRoverMiniPlus(Robot):
|
||||
return observation
|
||||
|
||||
@check_if_not_connected
|
||||
def send_action(self, action: RobotAction) -> RobotAction:
|
||||
def _send_action(self, action: RobotAction) -> RobotAction:
|
||||
"""Send action to robot via SDK.
|
||||
|
||||
Args:
|
||||
|
||||
@@ -71,11 +71,11 @@ class HopeJrArm(Robot):
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
def raw_observation_features(self) -> dict[str, type | tuple]:
|
||||
return {**self._motors_ft, **self._cameras_ft}
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
return self._motors_ft
|
||||
|
||||
@property
|
||||
@@ -128,7 +128,7 @@ class HopeJrArm(Robot):
|
||||
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
|
||||
|
||||
@check_if_not_connected
|
||||
def get_observation(self) -> RobotObservation:
|
||||
def _get_observation(self) -> RobotObservation:
|
||||
# Read arm position
|
||||
start = time.perf_counter()
|
||||
obs_dict = self.bus.sync_read("Present_Position", self.other_motors)
|
||||
@@ -147,7 +147,7 @@ class HopeJrArm(Robot):
|
||||
return obs_dict
|
||||
|
||||
@check_if_not_connected
|
||||
def send_action(self, action: RobotAction) -> RobotAction:
|
||||
def _send_action(self, action: RobotAction) -> RobotAction:
|
||||
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
|
||||
|
||||
# Cap goal position when too far away from present position.
|
||||
|
||||
@@ -107,11 +107,11 @@ class HopeJrHand(Robot):
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
def raw_observation_features(self) -> dict[str, type | tuple]:
|
||||
return {**self._motors_ft, **self._cameras_ft}
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
return self._motors_ft
|
||||
|
||||
@property
|
||||
@@ -158,7 +158,7 @@ class HopeJrHand(Robot):
|
||||
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
|
||||
|
||||
@check_if_not_connected
|
||||
def get_observation(self) -> RobotObservation:
|
||||
def _get_observation(self) -> RobotObservation:
|
||||
obs_dict = {}
|
||||
|
||||
# Read hand position
|
||||
@@ -178,7 +178,7 @@ class HopeJrHand(Robot):
|
||||
return obs_dict
|
||||
|
||||
@check_if_not_connected
|
||||
def send_action(self, action: RobotAction) -> RobotAction:
|
||||
def _send_action(self, action: RobotAction) -> RobotAction:
|
||||
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
|
||||
self.bus.sync_write("Goal_Position", goal_pos)
|
||||
return action
|
||||
|
||||
@@ -73,11 +73,11 @@ class KochFollower(Robot):
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
def raw_observation_features(self) -> dict[str, type | tuple]:
|
||||
return {**self._motors_ft, **self._cameras_ft}
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
return self._motors_ft
|
||||
|
||||
@property
|
||||
@@ -182,7 +182,7 @@ class KochFollower(Robot):
|
||||
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
|
||||
|
||||
@check_if_not_connected
|
||||
def get_observation(self) -> RobotObservation:
|
||||
def _get_observation(self) -> RobotObservation:
|
||||
# Read arm position
|
||||
start = time.perf_counter()
|
||||
obs_dict = self.bus.sync_read("Present_Position")
|
||||
@@ -200,7 +200,7 @@ class KochFollower(Robot):
|
||||
return obs_dict
|
||||
|
||||
@check_if_not_connected
|
||||
def send_action(self, action: RobotAction) -> RobotAction:
|
||||
def _send_action(self, action: RobotAction) -> RobotAction:
|
||||
"""Command arm to move to a target joint configuration.
|
||||
|
||||
The relative action magnitude may be clipped depending on the configuration parameter
|
||||
|
||||
@@ -98,11 +98,11 @@ class LeKiwi(Robot):
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
def raw_observation_features(self) -> dict[str, type | tuple]:
|
||||
return {**self._state_ft, **self._cameras_ft}
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
return self._state_ft
|
||||
|
||||
@property
|
||||
@@ -338,7 +338,7 @@ class LeKiwi(Robot):
|
||||
} # m/s and deg/s
|
||||
|
||||
@check_if_not_connected
|
||||
def get_observation(self) -> RobotObservation:
|
||||
def _get_observation(self) -> RobotObservation:
|
||||
# Read actuators position for arm and vel for base
|
||||
start = time.perf_counter()
|
||||
arm_pos = self.bus.sync_read("Present_Position", self.arm_motors)
|
||||
@@ -367,7 +367,7 @@ class LeKiwi(Robot):
|
||||
return obs_dict
|
||||
|
||||
@check_if_not_connected
|
||||
def send_action(self, action: RobotAction) -> RobotAction:
|
||||
def _send_action(self, action: RobotAction) -> RobotAction:
|
||||
"""Command lekiwi to move to a target joint configuration.
|
||||
|
||||
The relative action magnitude may be clipped depending on the configuration parameter
|
||||
|
||||
@@ -98,11 +98,11 @@ class LeKiwiClient(Robot):
|
||||
return {name: (cfg.height, cfg.width, 3) for name, cfg in self.config.cameras.items()}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
def raw_observation_features(self) -> dict[str, type | tuple]:
|
||||
return {**self._state_ft, **self._cameras_ft}
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
return self._state_ft
|
||||
|
||||
@property
|
||||
@@ -250,7 +250,7 @@ class LeKiwiClient(Robot):
|
||||
return new_frames, new_state
|
||||
|
||||
@check_if_not_connected
|
||||
def get_observation(self) -> RobotObservation:
|
||||
def _get_observation(self) -> RobotObservation:
|
||||
"""
|
||||
Capture observations from the remote robot: current follower arm positions,
|
||||
present wheel speeds (converted to body-frame velocities: x, y, theta),
|
||||
@@ -304,7 +304,7 @@ class LeKiwiClient(Robot):
|
||||
pass
|
||||
|
||||
@check_if_not_connected
|
||||
def send_action(self, action: RobotAction) -> RobotAction:
|
||||
def _send_action(self, action: RobotAction) -> RobotAction:
|
||||
"""Command lekiwi to move to a target joint configuration. Translates to motor space + sends over ZMQ
|
||||
|
||||
Args:
|
||||
|
||||
@@ -73,11 +73,11 @@ class OmxFollower(Robot):
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
def raw_observation_features(self) -> dict[str, type | tuple]:
|
||||
return {**self._motors_ft, **self._cameras_ft}
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
return self._motors_ft
|
||||
|
||||
@property
|
||||
@@ -165,7 +165,7 @@ class OmxFollower(Robot):
|
||||
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
|
||||
|
||||
@check_if_not_connected
|
||||
def get_observation(self) -> RobotObservation:
|
||||
def _get_observation(self) -> RobotObservation:
|
||||
# Read arm position
|
||||
start = time.perf_counter()
|
||||
obs_dict = self.bus.sync_read("Present_Position")
|
||||
@@ -183,7 +183,7 @@ class OmxFollower(Robot):
|
||||
return obs_dict
|
||||
|
||||
@check_if_not_connected
|
||||
def send_action(self, action: RobotAction) -> RobotAction:
|
||||
def _send_action(self, action: RobotAction) -> RobotAction:
|
||||
"""Command arm to move to a target joint configuration.
|
||||
|
||||
The relative action magnitude may be clipped depending on the configuration parameter
|
||||
|
||||
@@ -105,12 +105,12 @@ class OpenArmFollower(Robot):
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
def raw_observation_features(self) -> dict[str, type | tuple]:
|
||||
"""Combined observation features from motors and cameras."""
|
||||
return {**self._motors_ft, **self._cameras_ft}
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
"""Action features."""
|
||||
return self._motors_ft
|
||||
|
||||
@@ -219,7 +219,7 @@ class OpenArmFollower(Robot):
|
||||
)
|
||||
|
||||
@check_if_not_connected
|
||||
def get_observation(self) -> RobotObservation:
|
||||
def _get_observation(self) -> RobotObservation:
|
||||
"""
|
||||
Get current observation from robot including position, velocity, and torque.
|
||||
|
||||
@@ -251,7 +251,7 @@ class OpenArmFollower(Robot):
|
||||
return obs_dict
|
||||
|
||||
@check_if_not_connected
|
||||
def send_action(
|
||||
def _send_action(
|
||||
self,
|
||||
action: RobotAction,
|
||||
custom_kp: dict[str, float] | None = None,
|
||||
|
||||
@@ -95,11 +95,11 @@ class Reachy2Robot(Robot):
|
||||
self.joints_dict: dict[str, str] = self._generate_joints_dict()
|
||||
|
||||
@property
|
||||
def observation_features(self) -> dict[str, Any]:
|
||||
def raw_observation_features(self) -> dict[str, Any]:
|
||||
return {**self.motors_features, **self.camera_features}
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
return self.motors_features
|
||||
|
||||
@property
|
||||
@@ -170,7 +170,7 @@ class Reachy2Robot(Robot):
|
||||
else:
|
||||
return {}
|
||||
|
||||
def get_observation(self) -> RobotObservation:
|
||||
def _get_observation(self) -> RobotObservation:
|
||||
obs_dict: RobotObservation = {}
|
||||
|
||||
# Read Reachy 2 state
|
||||
@@ -184,7 +184,7 @@ class Reachy2Robot(Robot):
|
||||
|
||||
return obs_dict
|
||||
|
||||
def send_action(self, action: RobotAction) -> RobotAction:
|
||||
def _send_action(self, action: RobotAction) -> RobotAction:
|
||||
if self.reachy is not None:
|
||||
if not self.is_connected:
|
||||
raise ConnectionError()
|
||||
|
||||
+158
-34
@@ -18,8 +18,11 @@ from pathlib import Path
|
||||
|
||||
import draccus
|
||||
|
||||
from lerobot.configs.types import PipelineFeatureType
|
||||
from lerobot.motors import MotorCalibration
|
||||
from lerobot.processor import RobotAction, RobotObservation
|
||||
from lerobot.processor.core import RobotAction, RobotObservation
|
||||
from lerobot.processor.factory import _make_identity_observation_pipeline, _make_identity_robot_action_pipeline
|
||||
from lerobot.processor.pipeline import RobotProcessorPipeline
|
||||
from lerobot.utils.constants import HF_LEROBOT_CALIBRATION, ROBOTS
|
||||
|
||||
from .config import RobotConfig
|
||||
@@ -34,6 +37,10 @@ class Robot(abc.ABC):
|
||||
This class provides a standardized interface for interacting with physical robots.
|
||||
Subclasses must implement all abstract methods and properties to be usable.
|
||||
|
||||
Pipelines are first-class citizens: every robot carries an optional output pipeline
|
||||
(applied in get_observation()) and an optional input pipeline (applied in send_action()).
|
||||
Both default to identity (no-op), so existing robots work without any changes.
|
||||
|
||||
Attributes:
|
||||
config_class (RobotConfig): The expected configuration class for this robot.
|
||||
name (str): The unique robot name used to identify this robot type.
|
||||
@@ -55,6 +62,12 @@ class Robot(abc.ABC):
|
||||
if self.calibration_fpath.is_file():
|
||||
self._load_calibration()
|
||||
|
||||
# Pipeline interface — default to identity (no-op), swap via set_output/input_pipeline()
|
||||
self._output_pipeline: RobotProcessorPipeline = _make_identity_observation_pipeline()
|
||||
self._input_pipeline: RobotProcessorPipeline = _make_identity_robot_action_pipeline()
|
||||
# Cache of most recent raw observation; used by input_pipeline for IK initial guess
|
||||
self._last_raw_obs: RobotObservation = {}
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"{self.id} {self.__class__.__name__}"
|
||||
|
||||
@@ -84,40 +97,117 @@ class Robot(abc.ABC):
|
||||
except Exception: # nosec B110
|
||||
pass
|
||||
|
||||
# TODO(aliberts): create a proper Feature class for this that links with datasets
|
||||
# ── Pipeline interface ────────────────────────────────────────────────────
|
||||
|
||||
def output_pipeline(self) -> RobotProcessorPipeline:
|
||||
"""
|
||||
Pipeline applied inside get_observation() to transform raw hardware observations.
|
||||
Default: identity (no-op). Override via set_output_pipeline() or subclassing.
|
||||
|
||||
Example: set a forward-kinematics pipeline to convert joint positions to EE pose.
|
||||
"""
|
||||
return self._output_pipeline
|
||||
|
||||
def input_pipeline(self) -> RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction]:
|
||||
"""
|
||||
Pipeline applied inside send_action() to transform incoming actions before hardware write.
|
||||
Default: identity (no-op). Override via set_input_pipeline() or subclassing.
|
||||
|
||||
The pipeline receives a (action, last_raw_obs) tuple so IK solvers can use the
|
||||
current joint configuration as an initial guess.
|
||||
|
||||
Example: set an inverse-kinematics pipeline to convert EE commands to joint positions.
|
||||
"""
|
||||
return self._input_pipeline
|
||||
|
||||
def set_output_pipeline(self, pipeline: RobotProcessorPipeline) -> None:
|
||||
"""Set the observation output pipeline (applied in get_observation())."""
|
||||
self._output_pipeline = pipeline
|
||||
|
||||
def set_input_pipeline(self, pipeline: RobotProcessorPipeline) -> None:
|
||||
"""Set the action input pipeline (applied in send_action())."""
|
||||
self._input_pipeline = pipeline
|
||||
|
||||
# ── Feature properties ────────────────────────────────────────────────────
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def observation_features(self) -> dict:
|
||||
"""
|
||||
A dictionary describing the structure and types of the observations produced by the robot.
|
||||
Its structure (keys) should match the structure of what is returned by :pymeth:`get_observation`.
|
||||
Values for the dict should either be:
|
||||
- The type of the value if it's a simple value, e.g. `float` for single proprioceptive value (a joint's position/velocity)
|
||||
- A tuple representing the shape if it's an array-type value, e.g. `(height, width, channel)` for images
|
||||
Pipeline-transformed observation features.
|
||||
|
||||
Note: this property should be able to be called regardless of whether the robot is connected or not.
|
||||
Applies output_pipeline().transform_features() to raw_observation_features so the
|
||||
returned dict matches what get_observation() actually returns to callers.
|
||||
|
||||
Use raw_observation_features to inspect hardware-level feature shapes.
|
||||
|
||||
Note: this property should be able to be called regardless of whether the robot
|
||||
is connected or not.
|
||||
"""
|
||||
from lerobot.datasets.pipeline_features import create_initial_features # lazy import
|
||||
|
||||
initial = create_initial_features(observation=self.raw_observation_features)
|
||||
transformed = self.output_pipeline().transform_features(initial)
|
||||
return transformed.get(PipelineFeatureType.OBSERVATION, {})
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def raw_observation_features(self) -> dict:
|
||||
"""
|
||||
Hardware-level observation features (before any pipeline transformation).
|
||||
|
||||
A dictionary describing the structure and types of the observations produced
|
||||
directly by the robot hardware. Its structure (keys) should match the structure
|
||||
of what is returned by :pymeth:`_get_observation`. Values should be:
|
||||
- The type if it's a simple value, e.g. ``float`` for joint position
|
||||
- A tuple representing the shape for array values, e.g. ``(H, W, C)`` for images
|
||||
|
||||
Note: this property should be able to be called regardless of whether the robot
|
||||
is connected or not.
|
||||
"""
|
||||
pass
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def action_features(self) -> dict:
|
||||
def raw_action_features(self) -> dict:
|
||||
"""
|
||||
A dictionary describing the structure and types of the actions expected by the robot. Its structure
|
||||
(keys) should match the structure of what is passed to :pymeth:`send_action`. Values for the dict
|
||||
should be the type of the value if it's a simple value, e.g. `float` for single proprioceptive value
|
||||
(a joint's goal position/velocity)
|
||||
Hardware-level action features (before any pipeline transformation).
|
||||
|
||||
Note: this property should be able to be called regardless of whether the robot is connected or not.
|
||||
A dictionary describing the structure and types of the actions accepted directly
|
||||
by the robot hardware (i.e. what :pymeth:`_send_action` receives). Its structure
|
||||
(keys) should match the structure of what is expected by :pymeth:`_send_action`.
|
||||
Values should be the type of the value if it's a simple value, e.g. ``float`` for
|
||||
single proprioceptive value (a joint's goal position/velocity).
|
||||
|
||||
Note: this property should be able to be called regardless of whether the robot
|
||||
is connected or not.
|
||||
"""
|
||||
pass
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict:
|
||||
"""
|
||||
Pipeline-transformed action features.
|
||||
|
||||
Applies input_pipeline().transform_features() to raw_action_features so the
|
||||
returned dict reflects what the input pipeline outputs to hardware.
|
||||
|
||||
Use raw_action_features to inspect hardware-level action feature shapes.
|
||||
|
||||
Note: this property should be able to be called regardless of whether the robot
|
||||
is connected or not.
|
||||
"""
|
||||
from lerobot.datasets.pipeline_features import create_initial_features # lazy import
|
||||
|
||||
initial = create_initial_features(action=self.raw_action_features)
|
||||
transformed = self.input_pipeline().transform_features(initial)
|
||||
return transformed.get(PipelineFeatureType.ACTION, {})
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def is_connected(self) -> bool:
|
||||
"""
|
||||
Whether the robot is currently connected or not. If `False`, calling :pymeth:`get_observation` or
|
||||
:pymeth:`send_action` should raise an error.
|
||||
Whether the robot is currently connected or not. If ``False``, calling
|
||||
:pymeth:`get_observation` or :pymeth:`send_action` should raise an error.
|
||||
"""
|
||||
pass
|
||||
|
||||
@@ -135,7 +225,7 @@ class Robot(abc.ABC):
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def is_calibrated(self) -> bool:
|
||||
"""Whether the robot is currently calibrated or not. Should be always `True` if not applicable"""
|
||||
"""Whether the robot is currently calibrated or not. Should be always ``True`` if not applicable"""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
@@ -153,7 +243,7 @@ class Robot(abc.ABC):
|
||||
Helper to load calibration data from the specified file.
|
||||
|
||||
Args:
|
||||
fpath (Path | None): Optional path to the calibration file. Defaults to `self.calibration_fpath`.
|
||||
fpath (Path | None): Optional path to the calibration file. Defaults to ``self.calibration_fpath``.
|
||||
"""
|
||||
fpath = self.calibration_fpath if fpath is None else fpath
|
||||
with open(fpath) as f, draccus.config_type("json"):
|
||||
@@ -164,7 +254,7 @@ class Robot(abc.ABC):
|
||||
Helper to save calibration data to the specified file.
|
||||
|
||||
Args:
|
||||
fpath (Path | None): Optional path to save the calibration file. Defaults to `self.calibration_fpath`.
|
||||
fpath (Path | None): Optional path to save the calibration file. Defaults to ``self.calibration_fpath``.
|
||||
"""
|
||||
fpath = self.calibration_fpath if fpath is None else fpath
|
||||
with open(fpath, "w") as f, draccus.config_type("json"):
|
||||
@@ -178,30 +268,64 @@ class Robot(abc.ABC):
|
||||
"""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
# ── Template methods (concrete, call pipeline internally) ─────────────────
|
||||
|
||||
def get_observation(self) -> RobotObservation:
|
||||
"""
|
||||
Retrieve the current observation from the robot.
|
||||
Retrieve the current observation from the robot and apply the output pipeline.
|
||||
|
||||
Calls :pymeth:`_get_observation` to get raw hardware data, caches it for use as
|
||||
IK initial guess in :pymeth:`send_action`, then applies :pymeth:`output_pipeline`.
|
||||
|
||||
Returns:
|
||||
RobotObservation: A flat dictionary representing the robot's current sensory state. Its structure
|
||||
should match :pymeth:`observation_features`.
|
||||
RobotObservation: Pipeline-transformed observation. With the default identity
|
||||
pipeline this equals the raw observation from :pymeth:`_get_observation`.
|
||||
"""
|
||||
|
||||
pass
|
||||
raw = self._get_observation()
|
||||
self._last_raw_obs = raw
|
||||
return self.output_pipeline()(raw)
|
||||
|
||||
@abc.abstractmethod
|
||||
def send_action(self, action: RobotAction) -> RobotAction:
|
||||
def _get_observation(self) -> RobotObservation:
|
||||
"""
|
||||
Send an action command to the robot.
|
||||
|
||||
Args:
|
||||
action (RobotAction): Dictionary representing the desired action. Its structure should match
|
||||
:pymeth:`action_features`.
|
||||
Retrieve the raw observation directly from robot hardware.
|
||||
|
||||
Returns:
|
||||
RobotAction: The action actually sent to the motors potentially clipped or modified, e.g. by
|
||||
safety limits on velocity.
|
||||
RobotObservation: A flat dictionary representing the robot's current sensory
|
||||
state. Its structure should match :pymeth:`raw_observation_features`.
|
||||
"""
|
||||
pass
|
||||
|
||||
def send_action(self, action: RobotAction) -> RobotAction:
|
||||
"""
|
||||
Apply the input pipeline and send the resulting action to robot hardware.
|
||||
|
||||
The input pipeline receives ``(action, last_raw_obs)`` so IK solvers can use the
|
||||
cached joint configuration as an initial guess. With the default identity pipeline,
|
||||
the action is forwarded unchanged.
|
||||
|
||||
Args:
|
||||
action (RobotAction): Dictionary representing the desired action. Its structure
|
||||
should match :pymeth:`action_features`.
|
||||
|
||||
Returns:
|
||||
RobotAction: The action actually sent to the motors, potentially clipped or
|
||||
modified by the pipeline or hardware safety limits.
|
||||
"""
|
||||
transformed = self.input_pipeline()((action, self._last_raw_obs))
|
||||
return self._send_action(transformed)
|
||||
|
||||
@abc.abstractmethod
|
||||
def _send_action(self, action: RobotAction) -> RobotAction:
|
||||
"""
|
||||
Send an action command directly to robot hardware.
|
||||
|
||||
Args:
|
||||
action (RobotAction): Dictionary of motor-level commands. Its structure should
|
||||
match what the hardware expects (typically motor positions/velocities).
|
||||
|
||||
Returns:
|
||||
RobotAction: The action actually sent, potentially clipped by safety limits.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from .ee_space import make_so10x_fk_observation_pipeline, make_so10x_ik_action_pipeline
|
||||
|
||||
__all__ = ["make_so10x_fk_observation_pipeline", "make_so10x_ik_action_pipeline"]
|
||||
@@ -0,0 +1,147 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
End-effector space pipelines for SO-100/101 follower robots.
|
||||
|
||||
These factory functions return ready-to-use pipelines that convert between joint space
|
||||
and Cartesian end-effector space. Attach them to a robot with ``set_output_pipeline`` /
|
||||
``set_input_pipeline`` to enable EE-space recording and teleoperation.
|
||||
|
||||
Example::
|
||||
|
||||
from lerobot.robots.so_follower.pipelines import (
|
||||
make_so10x_fk_observation_pipeline,
|
||||
make_so10x_ik_action_pipeline,
|
||||
)
|
||||
|
||||
motor_names = list(follower.bus.motors.keys())
|
||||
follower.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
|
||||
follower.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names))
|
||||
"""
|
||||
|
||||
from lerobot.model.kinematics import RobotKinematics
|
||||
from lerobot.processor import RobotAction, RobotObservation, RobotProcessorPipeline
|
||||
from lerobot.processor.converters import (
|
||||
observation_to_transition,
|
||||
robot_action_observation_to_transition,
|
||||
transition_to_observation,
|
||||
transition_to_robot_action,
|
||||
)
|
||||
from lerobot.robots.so_follower.robot_kinematic_processor import (
|
||||
EEBoundsAndSafety,
|
||||
ForwardKinematicsJointsToEE,
|
||||
InverseKinematicsEEToJoints,
|
||||
)
|
||||
|
||||
_DEFAULT_EE_BOUNDS = {"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]}
|
||||
_DEFAULT_GRIPPER_FRAME = "gripper_frame_link"
|
||||
|
||||
|
||||
def make_so10x_fk_observation_pipeline(
|
||||
urdf_path: str,
|
||||
motor_names: list[str],
|
||||
*,
|
||||
target_frame_name: str = _DEFAULT_GRIPPER_FRAME,
|
||||
) -> RobotProcessorPipeline[RobotObservation, RobotObservation]:
|
||||
"""
|
||||
Create a forward-kinematics observation pipeline for SO-100/101 follower robots.
|
||||
|
||||
Converts raw joint positions (observation) into end-effector pose (position + orientation).
|
||||
Attach this to a follower robot via ``set_output_pipeline`` so that ``get_observation()``
|
||||
returns EE coordinates instead of raw joint angles.
|
||||
|
||||
Args:
|
||||
urdf_path: Path to the SO-100/101 URDF file used for kinematics.
|
||||
motor_names: Ordered list of motor names matching the URDF joint names.
|
||||
target_frame_name: Name of the end-effector frame in the URDF.
|
||||
|
||||
Returns:
|
||||
A RobotProcessorPipeline that maps joint observations to EE observations.
|
||||
|
||||
Example::
|
||||
|
||||
follower.set_output_pipeline(
|
||||
make_so10x_fk_observation_pipeline("./so101.urdf", motor_names)
|
||||
)
|
||||
obs = follower.get_observation() # now contains ee.x, ee.y, ee.z, ...
|
||||
"""
|
||||
kinematics = RobotKinematics(
|
||||
urdf_path=urdf_path,
|
||||
target_frame_name=target_frame_name,
|
||||
joint_names=motor_names,
|
||||
)
|
||||
return RobotProcessorPipeline[RobotObservation, RobotObservation](
|
||||
steps=[ForwardKinematicsJointsToEE(kinematics=kinematics, motor_names=motor_names)],
|
||||
to_transition=observation_to_transition,
|
||||
to_output=transition_to_observation,
|
||||
)
|
||||
|
||||
|
||||
def make_so10x_ik_action_pipeline(
|
||||
urdf_path: str,
|
||||
motor_names: list[str],
|
||||
*,
|
||||
target_frame_name: str = _DEFAULT_GRIPPER_FRAME,
|
||||
end_effector_bounds: dict | None = None,
|
||||
max_ee_step_m: float = 0.10,
|
||||
) -> RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction]:
|
||||
"""
|
||||
Create an inverse-kinematics action pipeline for SO-100/101 follower robots.
|
||||
|
||||
Converts incoming end-effector pose commands into joint positions, applying safety
|
||||
bounds and step-size limits before solving IK. The current joint positions are used
|
||||
as the IK initial guess (taken from the cached ``_last_raw_obs``).
|
||||
|
||||
Attach this to a follower robot via ``set_input_pipeline`` so that ``send_action()``
|
||||
receives EE commands and translates them to motor positions before the hardware write.
|
||||
|
||||
Args:
|
||||
urdf_path: Path to the SO-100/101 URDF file used for kinematics.
|
||||
motor_names: Ordered list of motor names matching the URDF joint names.
|
||||
target_frame_name: Name of the end-effector frame in the URDF.
|
||||
end_effector_bounds: Dict with ``"min"`` and ``"max"`` lists (3D position bounds in metres).
|
||||
Defaults to ``{"min": [-1, -1, -1], "max": [1, 1, 1]}``.
|
||||
max_ee_step_m: Maximum allowed EE position change per step in metres.
|
||||
|
||||
Returns:
|
||||
A RobotProcessorPipeline that maps (EE action, raw obs) to joint action.
|
||||
|
||||
Example::
|
||||
|
||||
follower.set_input_pipeline(
|
||||
make_so10x_ik_action_pipeline("./so101.urdf", motor_names)
|
||||
)
|
||||
# send_action() now accepts ee.x, ee.y, ee.z, ee.wx, ee.wy, ee.wz, ee.gripper_vel
|
||||
"""
|
||||
kinematics = RobotKinematics(
|
||||
urdf_path=urdf_path,
|
||||
target_frame_name=target_frame_name,
|
||||
joint_names=motor_names,
|
||||
)
|
||||
bounds = end_effector_bounds or _DEFAULT_EE_BOUNDS
|
||||
return RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
|
||||
steps=[
|
||||
EEBoundsAndSafety(end_effector_bounds=bounds, max_ee_step_m=max_ee_step_m),
|
||||
InverseKinematicsEEToJoints(
|
||||
kinematics=kinematics,
|
||||
motor_names=motor_names,
|
||||
initial_guess_current_joints=True,
|
||||
),
|
||||
],
|
||||
to_transition=robot_action_observation_to_transition,
|
||||
to_output=transition_to_robot_action,
|
||||
)
|
||||
@@ -74,11 +74,11 @@ class SOFollower(Robot):
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
def raw_observation_features(self) -> dict[str, type | tuple]:
|
||||
return {**self._motors_ft, **self._cameras_ft}
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
return self._motors_ft
|
||||
|
||||
@property
|
||||
@@ -176,7 +176,7 @@ class SOFollower(Robot):
|
||||
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
|
||||
|
||||
@check_if_not_connected
|
||||
def get_observation(self) -> RobotObservation:
|
||||
def _get_observation(self) -> RobotObservation:
|
||||
# Read arm position
|
||||
start = time.perf_counter()
|
||||
obs_dict = self.bus.sync_read("Present_Position")
|
||||
@@ -194,7 +194,7 @@ class SOFollower(Robot):
|
||||
return obs_dict
|
||||
|
||||
@check_if_not_connected
|
||||
def send_action(self, action: RobotAction) -> RobotAction:
|
||||
def _send_action(self, action: RobotAction) -> RobotAction:
|
||||
"""Command arm to move to a target joint configuration.
|
||||
|
||||
The relative action magnitude may be clipped depending on the configuration parameter
|
||||
|
||||
@@ -170,7 +170,7 @@ class UnitreeG1(Robot):
|
||||
time.sleep(sleep_time)
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
return {f"{G1_29_JointIndex(motor).name}.q": float for motor in G1_29_JointIndex}
|
||||
|
||||
def calibrate(self) -> None: # robot is already calibrated
|
||||
@@ -273,7 +273,7 @@ class UnitreeG1(Robot):
|
||||
for cam in self._cameras.values():
|
||||
cam.disconnect()
|
||||
|
||||
def get_observation(self) -> RobotObservation:
|
||||
def _get_observation(self) -> RobotObservation:
|
||||
lowstate = self._lowstate
|
||||
if lowstate is None:
|
||||
return {}
|
||||
@@ -351,10 +351,10 @@ class UnitreeG1(Robot):
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
def raw_observation_features(self) -> dict[str, type | tuple]:
|
||||
return {**self._motors_ft, **self._cameras_ft}
|
||||
|
||||
def send_action(self, action: RobotAction) -> RobotAction:
|
||||
def _send_action(self, action: RobotAction) -> RobotAction:
|
||||
for motor in G1_29_JointIndex:
|
||||
key = f"{motor.name}.q"
|
||||
if key in action:
|
||||
@@ -421,7 +421,7 @@ class UnitreeG1(Robot):
|
||||
num_steps = int(total_time / control_dt)
|
||||
|
||||
# get current state
|
||||
obs = self.get_observation()
|
||||
obs = self._get_observation()
|
||||
|
||||
# record current positions
|
||||
init_dof_pos = np.zeros(29, dtype=np.float32)
|
||||
@@ -439,7 +439,7 @@ class UnitreeG1(Robot):
|
||||
interp_pos = init_dof_pos[motor.value] * (1 - alpha) + target_pos * alpha
|
||||
action_dict[f"{motor.name}.q"] = float(interp_pos)
|
||||
|
||||
self.send_action(action_dict)
|
||||
self._send_action(action_dict)
|
||||
|
||||
# Maintain constant control rate
|
||||
elapsed = time.time() - start_time
|
||||
|
||||
@@ -74,6 +74,8 @@ from pathlib import Path
|
||||
from pprint import pformat
|
||||
from typing import Any
|
||||
|
||||
import torch
|
||||
|
||||
from lerobot.cameras import ( # noqa: F401
|
||||
CameraConfig, # noqa: F401
|
||||
)
|
||||
@@ -85,19 +87,16 @@ from lerobot.configs import parser
|
||||
from lerobot.configs.policies import PreTrainedConfig
|
||||
from lerobot.datasets.image_writer import safe_stop_image_writer
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
|
||||
from lerobot.datasets.utils import build_dataset_frame, combine_feature_dicts
|
||||
from lerobot.datasets.utils import build_dataset_frame
|
||||
from lerobot.datasets.video_utils import VideoEncodingManager
|
||||
from lerobot.policies.factory import make_policy, make_pre_post_processors
|
||||
from lerobot.policies.pretrained import PreTrainedPolicy
|
||||
from lerobot.policies.rtc import ActionInterpolator
|
||||
from lerobot.policies.utils import make_robot_action
|
||||
from lerobot.processor import (
|
||||
PolicyAction,
|
||||
PolicyProcessorPipeline,
|
||||
RobotAction,
|
||||
RobotObservation,
|
||||
RobotProcessorPipeline,
|
||||
make_default_processors,
|
||||
)
|
||||
from lerobot.processor.rename_processor import rename_stats
|
||||
from lerobot.robots import ( # noqa: F401
|
||||
@@ -140,6 +139,11 @@ from lerobot.utils.control_utils import (
|
||||
sanity_check_dataset_robot_compatibility,
|
||||
)
|
||||
from lerobot.utils.import_utils import register_third_party_plugins
|
||||
from lerobot.utils.pipeline_utils import (
|
||||
build_dataset_features,
|
||||
check_action_space_compatibility,
|
||||
check_observation_space_compatibility,
|
||||
)
|
||||
from lerobot.utils.robot_utils import precise_sleep
|
||||
from lerobot.utils.utils import (
|
||||
get_safe_torch_device,
|
||||
@@ -155,7 +159,7 @@ class DatasetRecordConfig:
|
||||
repo_id: str
|
||||
# A short but accurate description of the task performed during the recording (e.g. "Pick the Lego block and drop it in the box on the right.")
|
||||
single_task: str
|
||||
# Root directory where the dataset will be stored (e.g. 'dataset/path'). If None, defaults to $HF_LEROBOT_HOME/repo_id.
|
||||
# Root directory where the dataset will be stored (e.g. 'dataset/path').
|
||||
root: str | Path | None = None
|
||||
# Limit the frames per second.
|
||||
fps: int = 30
|
||||
@@ -226,6 +230,9 @@ class RecordConfig:
|
||||
play_sounds: bool = True
|
||||
# Resume recording on an existing dataset.
|
||||
resume: bool = False
|
||||
# Action interpolation multiplier for smoother policy control (1=off, 2=2x, 3=3x)
|
||||
# Only applies when using a policy (not teleop)
|
||||
interpolation_multiplier: int = 1
|
||||
|
||||
def __post_init__(self):
|
||||
# HACK: We parse again the cli args here to get the pretrained path if there was one.
|
||||
@@ -249,28 +256,23 @@ class RecordConfig:
|
||||
""" --------------- record_loop() data flow --------------------------
|
||||
[ Robot ]
|
||||
V
|
||||
[ robot.get_observation() ] ---> raw_obs
|
||||
V
|
||||
[ robot_observation_processor ] ---> processed_obs
|
||||
[ robot.get_observation() ] → applies output_pipeline internally → obs
|
||||
V
|
||||
.-----( ACTION LOGIC )------------------.
|
||||
V V
|
||||
[ From Teleoperator ] [ From Policy ]
|
||||
| |
|
||||
| [teleop.get_action] -> raw_action | [predict_action]
|
||||
| | | |
|
||||
| V | V
|
||||
| [teleop_action_processor] | |
|
||||
| | | |
|
||||
'---> processed_teleop_action '---> processed_policy_action
|
||||
| teleop.get_action() | predict_action(obs)
|
||||
| (output_pipeline applied internally) | |
|
||||
| | | V
|
||||
'----> action '---> policy_action_dict
|
||||
| |
|
||||
'-------------------------.-------------'
|
||||
V
|
||||
[ robot_action_processor ] --> robot_action_to_send
|
||||
[ robot.send_action(action) ]
|
||||
(input_pipeline applied internally)
|
||||
V
|
||||
[ robot.send_action() ] -- (Robot Executes)
|
||||
V
|
||||
( Save to Dataset )
|
||||
( Save action + obs to Dataset )
|
||||
V
|
||||
( Rerun Log / Loop Wait )
|
||||
"""
|
||||
@@ -281,15 +283,6 @@ def record_loop(
|
||||
robot: Robot,
|
||||
events: dict,
|
||||
fps: int,
|
||||
teleop_action_processor: RobotProcessorPipeline[
|
||||
tuple[RobotAction, RobotObservation], RobotAction
|
||||
], # runs after teleop
|
||||
robot_action_processor: RobotProcessorPipeline[
|
||||
tuple[RobotAction, RobotObservation], RobotAction
|
||||
], # runs before robot
|
||||
robot_observation_processor: RobotProcessorPipeline[
|
||||
RobotObservation, RobotObservation
|
||||
], # runs after robot
|
||||
dataset: LeRobotDataset | None = None,
|
||||
teleop: Teleoperator | list[Teleoperator] | None = None,
|
||||
policy: PreTrainedPolicy | None = None,
|
||||
@@ -298,8 +291,30 @@ def record_loop(
|
||||
control_time_s: int | None = None,
|
||||
single_task: str | None = None,
|
||||
display_data: bool = False,
|
||||
interpolator: ActionInterpolator | None = None,
|
||||
display_compressed_images: bool = False,
|
||||
):
|
||||
"""
|
||||
Core recording loop. Robot and teleoperator pipelines are applied internally —
|
||||
no explicit processor arguments are needed.
|
||||
|
||||
Args:
|
||||
robot: The robot instance. Its output_pipeline() transforms observations and
|
||||
its input_pipeline() transforms actions before hardware write.
|
||||
events: Control events dict (exit_early, stop_recording, rerecord_episode).
|
||||
fps: Target control loop frequency.
|
||||
dataset: If provided, frames are written here each step.
|
||||
teleop: Teleoperator or list of teleoperators. Its output_pipeline() transforms
|
||||
actions (e.g., joint → EE) before they are sent to the robot.
|
||||
policy: Optional pre-trained policy for closed-loop control.
|
||||
preprocessor: Policy input pre-processor.
|
||||
postprocessor: Policy output post-processor.
|
||||
control_time_s: Episode duration in seconds.
|
||||
single_task: Task description string saved with each frame.
|
||||
display_data: If True, log observations and actions to Rerun.
|
||||
interpolator: Optional action interpolator for smoother policy control.
|
||||
display_compressed_images: If True, compress images before Rerun display.
|
||||
"""
|
||||
if dataset is not None and dataset.fps != fps:
|
||||
raise ValueError(f"The dataset fps should be equal to requested fps ({dataset.fps} != {fps}).")
|
||||
|
||||
@@ -334,6 +349,16 @@ def record_loop(
|
||||
preprocessor.reset()
|
||||
postprocessor.reset()
|
||||
|
||||
# Reset interpolator if provided
|
||||
if interpolator is not None:
|
||||
interpolator.reset()
|
||||
|
||||
# Calculate control interval based on interpolation
|
||||
use_interpolation = interpolator is not None and interpolator.enabled and policy is not None
|
||||
control_interval = interpolator.get_control_interval(fps) if interpolator else 1 / fps
|
||||
# Pre-compute once — action features don't change during a recording episode
|
||||
action_keys = sorted(robot.action_features) if use_interpolation else []
|
||||
|
||||
no_action_count = 0
|
||||
timestamp = 0
|
||||
start_episode_t = time.perf_counter()
|
||||
@@ -344,43 +369,75 @@ def record_loop(
|
||||
events["exit_early"] = False
|
||||
break
|
||||
|
||||
# Get robot observation
|
||||
# Get robot observation (output_pipeline applied internally)
|
||||
obs = robot.get_observation()
|
||||
|
||||
# Applies a pipeline to the raw robot observation, default is IdentityProcessor
|
||||
obs_processed = robot_observation_processor(obs)
|
||||
|
||||
if policy is not None or dataset is not None:
|
||||
observation_frame = build_dataset_frame(dataset.features, obs_processed, prefix=OBS_STR)
|
||||
observation_frame = build_dataset_frame(dataset.features, obs, prefix=OBS_STR)
|
||||
|
||||
# Get action from either policy or teleop
|
||||
if policy is not None and preprocessor is not None and postprocessor is not None:
|
||||
action_values = predict_action(
|
||||
observation=observation_frame,
|
||||
policy=policy,
|
||||
device=get_safe_torch_device(policy.config.device),
|
||||
preprocessor=preprocessor,
|
||||
postprocessor=postprocessor,
|
||||
use_amp=policy.config.use_amp,
|
||||
task=single_task,
|
||||
robot_type=robot.robot_type,
|
||||
)
|
||||
# With interpolation: only call policy when interpolator needs new action
|
||||
if use_interpolation:
|
||||
if interpolator.needs_new_action():
|
||||
action_values = predict_action(
|
||||
observation=observation_frame,
|
||||
policy=policy,
|
||||
device=get_safe_torch_device(policy.config.device),
|
||||
preprocessor=preprocessor,
|
||||
postprocessor=postprocessor,
|
||||
use_amp=policy.config.use_amp,
|
||||
task=single_task,
|
||||
robot_type=robot.robot_type,
|
||||
)
|
||||
act_processed_policy: RobotAction = make_robot_action(action_values, dataset.features)
|
||||
# send_action applies input_pipeline (e.g. IK) internally;
|
||||
# capture the actually-sent joint action for interpolation
|
||||
sent_joint_action = robot.send_action(act_processed_policy)
|
||||
|
||||
act_processed_policy: RobotAction = make_robot_action(action_values, dataset.features)
|
||||
# Build interpolation tensor from the motor-level joint action
|
||||
action_tensor = torch.tensor([sent_joint_action[k] for k in action_keys])
|
||||
interpolator.add(action_tensor)
|
||||
|
||||
# Get interpolated action (in joint/motor space)
|
||||
interp_action = interpolator.get()
|
||||
if interp_action is not None:
|
||||
action_values = {k: interp_action[i].item() for i, k in enumerate(action_keys)}
|
||||
# Interpolated values are already in joint space; bypass IK pipeline
|
||||
robot._send_action(action_values)
|
||||
else:
|
||||
# No action available yet, skip this iteration
|
||||
continue
|
||||
else:
|
||||
action_values = predict_action(
|
||||
observation=observation_frame,
|
||||
policy=policy,
|
||||
device=get_safe_torch_device(policy.config.device),
|
||||
preprocessor=preprocessor,
|
||||
postprocessor=postprocessor,
|
||||
use_amp=policy.config.use_amp,
|
||||
task=single_task,
|
||||
robot_type=robot.robot_type,
|
||||
)
|
||||
act_processed_policy: RobotAction = make_robot_action(action_values, dataset.features)
|
||||
# send_action applies input_pipeline (e.g. IK) internally
|
||||
robot.send_action(act_processed_policy)
|
||||
action_values = act_processed_policy
|
||||
|
||||
elif policy is None and isinstance(teleop, Teleoperator):
|
||||
act = teleop.get_action()
|
||||
|
||||
# Applies a pipeline to the raw teleop action, default is IdentityProcessor
|
||||
act_processed_teleop = teleop_action_processor((act, obs))
|
||||
# get_action applies output_pipeline (e.g. FK) internally
|
||||
action_values = teleop.get_action()
|
||||
# send_action applies input_pipeline (e.g. IK) internally
|
||||
robot.send_action(action_values)
|
||||
|
||||
elif policy is None and isinstance(teleop, list):
|
||||
arm_action = teleop_arm.get_action()
|
||||
# LeKiwi multi-teleop path
|
||||
arm_action = teleop_arm.get_action() # output_pipeline applied internally
|
||||
arm_action = {f"arm_{k}": v for k, v in arm_action.items()}
|
||||
keyboard_action = teleop_keyboard.get_action()
|
||||
base_action = robot._from_keyboard_to_base_action(keyboard_action)
|
||||
act = {**arm_action, **base_action} if len(base_action) > 0 else arm_action
|
||||
act_processed_teleop = teleop_action_processor((act, obs))
|
||||
action_values = {**arm_action, **base_action} if len(base_action) > 0 else arm_action
|
||||
robot.send_action(action_values) # input_pipeline applied internally
|
||||
else:
|
||||
no_action_count += 1
|
||||
if no_action_count == 1 or no_action_count % 10 == 0:
|
||||
@@ -391,20 +448,6 @@ def record_loop(
|
||||
)
|
||||
continue
|
||||
|
||||
# Applies a pipeline to the action, default is IdentityProcessor
|
||||
if policy is not None and act_processed_policy is not None:
|
||||
action_values = act_processed_policy
|
||||
robot_action_to_send = robot_action_processor((act_processed_policy, obs))
|
||||
else:
|
||||
action_values = act_processed_teleop
|
||||
robot_action_to_send = robot_action_processor((act_processed_teleop, obs))
|
||||
|
||||
# Send action to robot
|
||||
# Action can eventually be clipped using `max_relative_target`,
|
||||
# so action actually sent is saved in the dataset. action = postprocessor.process(action)
|
||||
# TODO(steven, pepijn, adil): we should use a pipeline step to clip the action, so the sent action is the action that we input to the robot.
|
||||
_sent_action = robot.send_action(robot_action_to_send)
|
||||
|
||||
# Write to dataset
|
||||
if dataset is not None:
|
||||
action_frame = build_dataset_frame(dataset.features, action_values, prefix=ACTION)
|
||||
@@ -413,12 +456,12 @@ def record_loop(
|
||||
|
||||
if display_data:
|
||||
log_rerun_data(
|
||||
observation=obs_processed, action=action_values, compress_images=display_compressed_images
|
||||
observation=obs, action=action_values, compress_images=display_compressed_images
|
||||
)
|
||||
|
||||
dt_s = time.perf_counter() - start_loop_t
|
||||
|
||||
sleep_time_s: float = 1 / fps - dt_s
|
||||
sleep_time_s: float = control_interval - dt_s
|
||||
if sleep_time_s < 0:
|
||||
logging.warning(
|
||||
f"Record loop is running slower ({1 / dt_s:.1f} Hz) than the target FPS ({fps} Hz). Dataset frames might be dropped and robot control might be unstable. Common causes are: 1) Camera FPS not keeping up 2) Policy inference taking too long 3) CPU starvation"
|
||||
@@ -444,22 +487,9 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
|
||||
robot = make_robot_from_config(cfg.robot)
|
||||
teleop = make_teleoperator_from_config(cfg.teleop) if cfg.teleop is not None else None
|
||||
|
||||
teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
|
||||
|
||||
dataset_features = combine_feature_dicts(
|
||||
aggregate_pipeline_dataset_features(
|
||||
pipeline=teleop_action_processor,
|
||||
initial_features=create_initial_features(
|
||||
action=robot.action_features
|
||||
), # TODO(steven, pepijn): in future this should be come from teleop or policy
|
||||
use_videos=cfg.dataset.video,
|
||||
),
|
||||
aggregate_pipeline_dataset_features(
|
||||
pipeline=robot_observation_processor,
|
||||
initial_features=create_initial_features(observation=robot.observation_features),
|
||||
use_videos=cfg.dataset.video,
|
||||
),
|
||||
)
|
||||
# Dataset features derived automatically from robot/teleop pipelines.
|
||||
# When teleop is None (policy-only recording), only observation features are included.
|
||||
dataset_features = build_dataset_features(robot, teleop, use_videos=cfg.dataset.video)
|
||||
|
||||
dataset = None
|
||||
listener = None
|
||||
@@ -505,6 +535,7 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
|
||||
policy = None if cfg.policy is None else make_policy(cfg.policy, ds_meta=dataset.meta)
|
||||
preprocessor = None
|
||||
postprocessor = None
|
||||
interpolator = None
|
||||
if cfg.policy is not None:
|
||||
preprocessor, postprocessor = make_pre_post_processors(
|
||||
policy_cfg=cfg.policy,
|
||||
@@ -515,11 +546,19 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
|
||||
"rename_observations_processor": {"rename_map": cfg.dataset.rename_map},
|
||||
},
|
||||
)
|
||||
# Create interpolator for smoother policy control
|
||||
if cfg.interpolation_multiplier > 1:
|
||||
interpolator = ActionInterpolator(multiplier=cfg.interpolation_multiplier)
|
||||
logging.info(f"Action interpolation enabled: {cfg.interpolation_multiplier}x control rate")
|
||||
|
||||
robot.connect()
|
||||
if teleop is not None:
|
||||
teleop.connect()
|
||||
|
||||
if teleop is not None:
|
||||
check_action_space_compatibility(teleop, robot)
|
||||
check_observation_space_compatibility(robot, teleop)
|
||||
|
||||
listener, events = init_keyboard_listener()
|
||||
|
||||
if not cfg.dataset.streaming_encoding:
|
||||
@@ -535,9 +574,6 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
|
||||
robot=robot,
|
||||
events=events,
|
||||
fps=cfg.dataset.fps,
|
||||
teleop_action_processor=teleop_action_processor,
|
||||
robot_action_processor=robot_action_processor,
|
||||
robot_observation_processor=robot_observation_processor,
|
||||
teleop=teleop,
|
||||
policy=policy,
|
||||
preprocessor=preprocessor,
|
||||
@@ -546,6 +582,7 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
|
||||
control_time_s=cfg.dataset.episode_time_s,
|
||||
single_task=cfg.dataset.single_task,
|
||||
display_data=cfg.display_data,
|
||||
interpolator=interpolator,
|
||||
display_compressed_images=display_compressed_images,
|
||||
)
|
||||
|
||||
@@ -564,9 +601,6 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
|
||||
robot=robot,
|
||||
events=events,
|
||||
fps=cfg.dataset.fps,
|
||||
teleop_action_processor=teleop_action_processor,
|
||||
robot_action_processor=robot_action_processor,
|
||||
robot_observation_processor=robot_observation_processor,
|
||||
teleop=teleop,
|
||||
control_time_s=cfg.dataset.reset_time_s,
|
||||
single_task=cfg.dataset.single_task,
|
||||
|
||||
@@ -47,9 +47,6 @@ from pprint import pformat
|
||||
|
||||
from lerobot.configs import parser
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.processor import (
|
||||
make_default_robot_action_processor,
|
||||
)
|
||||
from lerobot.robots import ( # noqa: F401
|
||||
Robot,
|
||||
RobotConfig,
|
||||
@@ -99,8 +96,6 @@ def replay(cfg: ReplayConfig):
|
||||
init_logging()
|
||||
logging.info(pformat(asdict(cfg)))
|
||||
|
||||
robot_action_processor = make_default_robot_action_processor()
|
||||
|
||||
robot = make_robot_from_config(cfg.robot)
|
||||
dataset = LeRobotDataset(cfg.dataset.repo_id, root=cfg.dataset.root, episodes=[cfg.dataset.episode])
|
||||
|
||||
@@ -120,11 +115,10 @@ def replay(cfg: ReplayConfig):
|
||||
for i, name in enumerate(dataset.features[ACTION]["names"]):
|
||||
action[name] = action_array[i]
|
||||
|
||||
robot_obs = robot.get_observation()
|
||||
# Update cached observation so the robot's input pipeline can use it (e.g. for IK)
|
||||
robot.get_observation()
|
||||
|
||||
processed_action = robot_action_processor((action, robot_obs))
|
||||
|
||||
_ = robot.send_action(processed_action)
|
||||
_ = robot.send_action(action)
|
||||
|
||||
dt_s = time.perf_counter() - start_episode_t
|
||||
precise_sleep(max(1 / dataset.fps - dt_s, 0.0))
|
||||
|
||||
@@ -61,12 +61,6 @@ import rerun as rr
|
||||
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401
|
||||
from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401
|
||||
from lerobot.configs import parser
|
||||
from lerobot.processor import (
|
||||
RobotAction,
|
||||
RobotObservation,
|
||||
RobotProcessorPipeline,
|
||||
make_default_processors,
|
||||
)
|
||||
from lerobot.robots import ( # noqa: F401
|
||||
Robot,
|
||||
RobotConfig,
|
||||
@@ -100,6 +94,7 @@ from lerobot.teleoperators import ( # noqa: F401
|
||||
unitree_g1,
|
||||
)
|
||||
from lerobot.utils.import_utils import register_third_party_plugins
|
||||
from lerobot.utils.pipeline_utils import check_action_space_compatibility, check_observation_space_compatibility
|
||||
from lerobot.utils.robot_utils import precise_sleep
|
||||
from lerobot.utils.utils import init_logging, move_cursor_up
|
||||
from lerobot.utils.visualization_utils import init_rerun, log_rerun_data
|
||||
@@ -127,28 +122,28 @@ def teleop_loop(
|
||||
teleop: Teleoperator,
|
||||
robot: Robot,
|
||||
fps: int,
|
||||
teleop_action_processor: RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction],
|
||||
robot_action_processor: RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction],
|
||||
robot_observation_processor: RobotProcessorPipeline[RobotObservation, RobotObservation],
|
||||
display_data: bool = False,
|
||||
duration: float | None = None,
|
||||
display_compressed_images: bool = False,
|
||||
):
|
||||
"""
|
||||
This function continuously reads actions from a teleoperation device, processes them through optional
|
||||
pipelines, sends them to a robot, and optionally displays the robot's state. The loop runs at a
|
||||
specified frequency until a set duration is reached or it is manually interrupted.
|
||||
Continuously reads actions from a teleoperation device, sends them to a robot,
|
||||
and optionally displays the robot's state. Pipelines are applied internally by
|
||||
the robot and teleoperator objects.
|
||||
|
||||
The loop runs at the specified frequency until a set duration is reached or it
|
||||
is manually interrupted.
|
||||
|
||||
Args:
|
||||
teleop: The teleoperator device instance providing control actions.
|
||||
robot: The robot instance being controlled.
|
||||
fps: The target frequency for the control loop in frames per second.
|
||||
display_data: If True, fetches robot observations and displays them in the console and Rerun.
|
||||
display_compressed_images: If True, compresses images before sending them to Rerun for display.
|
||||
duration: The maximum duration of the teleoperation loop in seconds. If None, the loop runs indefinitely.
|
||||
teleop_action_processor: An optional pipeline to process raw actions from the teleoperator.
|
||||
robot_action_processor: An optional pipeline to process actions before they are sent to the robot.
|
||||
robot_observation_processor: An optional pipeline to process raw observations from the robot.
|
||||
display_data: If True, fetches robot observations and displays them in the
|
||||
console and Rerun.
|
||||
display_compressed_images: If True, compresses images before sending them
|
||||
to Rerun for display.
|
||||
duration: The maximum duration of the teleoperation loop in seconds.
|
||||
If None, the loop runs indefinitely.
|
||||
"""
|
||||
|
||||
display_len = max(len(key) for key in robot.action_features)
|
||||
@@ -157,40 +152,29 @@ def teleop_loop(
|
||||
while True:
|
||||
loop_start = time.perf_counter()
|
||||
|
||||
# Get robot observation
|
||||
# Not really needed for now other than for visualization
|
||||
# teleop_action_processor can take None as an observation
|
||||
# given that it is the identity processor as default
|
||||
obs = robot.get_observation()
|
||||
# Get teleop action (output_pipeline applied internally)
|
||||
action = teleop.get_action()
|
||||
|
||||
# Get teleop action
|
||||
raw_action = teleop.get_action()
|
||||
|
||||
# Process teleop action through pipeline
|
||||
teleop_action = teleop_action_processor((raw_action, obs))
|
||||
|
||||
# Process action for robot through pipeline
|
||||
robot_action_to_send = robot_action_processor((teleop_action, obs))
|
||||
|
||||
# Send processed action to robot (robot_action_processor.to_output should return RobotAction)
|
||||
_ = robot.send_action(robot_action_to_send)
|
||||
# Send action to robot (input_pipeline applied internally)
|
||||
robot_action_sent = robot.send_action(action)
|
||||
|
||||
if display_data:
|
||||
# Process robot observation through pipeline
|
||||
obs_transition = robot_observation_processor(obs)
|
||||
# Get robot observation (output_pipeline applied internally)
|
||||
obs = robot.get_observation()
|
||||
teleop.send_feedback(obs)
|
||||
|
||||
log_rerun_data(
|
||||
observation=obs_transition,
|
||||
action=teleop_action,
|
||||
observation=obs,
|
||||
action=action,
|
||||
compress_images=display_compressed_images,
|
||||
)
|
||||
|
||||
print("\n" + "-" * (display_len + 10))
|
||||
print(f"{'NAME':<{display_len}} | {'NORM':>7}")
|
||||
# Display the final robot action that was sent
|
||||
for motor, value in robot_action_to_send.items():
|
||||
print(f"{motor:<{display_len}} | {value:>7.2f}")
|
||||
move_cursor_up(len(robot_action_to_send) + 3)
|
||||
for motor, value in robot_action_sent.items():
|
||||
if isinstance(value, float | int):
|
||||
print(f"{motor:<{display_len}} | {value:>7.2f}")
|
||||
move_cursor_up(len(robot_action_sent) + 3)
|
||||
|
||||
dt_s = time.perf_counter() - loop_start
|
||||
precise_sleep(max(1 / fps - dt_s, 0.0))
|
||||
@@ -216,11 +200,13 @@ def teleoperate(cfg: TeleoperateConfig):
|
||||
|
||||
teleop = make_teleoperator_from_config(cfg.teleop)
|
||||
robot = make_robot_from_config(cfg.robot)
|
||||
teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
|
||||
|
||||
teleop.connect()
|
||||
robot.connect()
|
||||
|
||||
check_action_space_compatibility(teleop, robot)
|
||||
check_observation_space_compatibility(robot, teleop)
|
||||
|
||||
try:
|
||||
teleop_loop(
|
||||
teleop=teleop,
|
||||
@@ -228,9 +214,6 @@ def teleoperate(cfg: TeleoperateConfig):
|
||||
fps=cfg.fps,
|
||||
display_data=cfg.display_data,
|
||||
duration=cfg.teleop_time_s,
|
||||
teleop_action_processor=teleop_action_processor,
|
||||
robot_action_processor=robot_action_processor,
|
||||
robot_observation_processor=robot_observation_processor,
|
||||
display_compressed_images=display_compressed_images,
|
||||
)
|
||||
except KeyboardInterrupt:
|
||||
|
||||
@@ -72,9 +72,9 @@ class BiOpenArmLeader(Teleoperator):
|
||||
self.right_arm = OpenArmLeader(right_arm_config)
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
left_arm_features = self.left_arm.action_features
|
||||
right_arm_features = self.right_arm.action_features
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
left_arm_features = self.left_arm.raw_action_features
|
||||
right_arm_features = self.right_arm.raw_action_features
|
||||
|
||||
return {
|
||||
**{f"left_{k}": v for k, v in left_arm_features.items()},
|
||||
@@ -82,7 +82,7 @@ class BiOpenArmLeader(Teleoperator):
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def feedback_features(self) -> dict[str, type]:
|
||||
def raw_feedback_features(self) -> dict[str, type]:
|
||||
return {}
|
||||
|
||||
@property
|
||||
@@ -112,7 +112,7 @@ class BiOpenArmLeader(Teleoperator):
|
||||
)
|
||||
|
||||
@check_if_not_connected
|
||||
def get_action(self) -> RobotAction:
|
||||
def _get_action(self) -> RobotAction:
|
||||
action_dict = {}
|
||||
|
||||
# Add "left_" prefix
|
||||
@@ -125,7 +125,7 @@ class BiOpenArmLeader(Teleoperator):
|
||||
|
||||
return action_dict
|
||||
|
||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
def _send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
# TODO: Implement force feedback
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@@ -55,9 +55,9 @@ class BiSOLeader(Teleoperator):
|
||||
self.right_arm = SOLeader(right_arm_config)
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
left_arm_features = self.left_arm.action_features
|
||||
right_arm_features = self.right_arm.action_features
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
left_arm_features = self.left_arm.raw_action_features
|
||||
right_arm_features = self.right_arm.raw_action_features
|
||||
|
||||
return {
|
||||
**{f"left_{k}": v for k, v in left_arm_features.items()},
|
||||
@@ -65,7 +65,7 @@ class BiSOLeader(Teleoperator):
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def feedback_features(self) -> dict[str, type]:
|
||||
def raw_feedback_features(self) -> dict[str, type]:
|
||||
return {}
|
||||
|
||||
@property
|
||||
@@ -94,7 +94,7 @@ class BiSOLeader(Teleoperator):
|
||||
self.right_arm.setup_motors()
|
||||
|
||||
@check_if_not_connected
|
||||
def get_action(self) -> dict[str, float]:
|
||||
def _get_action(self) -> dict[str, float]:
|
||||
action_dict = {}
|
||||
|
||||
# Add "left_" prefix
|
||||
@@ -107,7 +107,7 @@ class BiSOLeader(Teleoperator):
|
||||
|
||||
return action_dict
|
||||
|
||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
def _send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
# TODO: Implement force feedback
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@@ -57,7 +57,7 @@ class GamepadTeleop(Teleoperator):
|
||||
self.gamepad = None
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict:
|
||||
def raw_action_features(self) -> dict:
|
||||
if self.config.use_gripper:
|
||||
return {
|
||||
"dtype": "float32",
|
||||
@@ -72,7 +72,7 @@ class GamepadTeleop(Teleoperator):
|
||||
}
|
||||
|
||||
@property
|
||||
def feedback_features(self) -> dict:
|
||||
def raw_feedback_features(self) -> dict:
|
||||
return {}
|
||||
|
||||
def connect(self) -> None:
|
||||
@@ -87,7 +87,7 @@ class GamepadTeleop(Teleoperator):
|
||||
self.gamepad.start()
|
||||
|
||||
@check_if_not_connected
|
||||
def get_action(self) -> RobotAction:
|
||||
def _get_action(self) -> RobotAction:
|
||||
# Update the controller to get fresh inputs
|
||||
self.gamepad.update()
|
||||
|
||||
@@ -180,7 +180,7 @@ class GamepadTeleop(Teleoperator):
|
||||
# No additional configuration needed
|
||||
pass
|
||||
|
||||
def send_feedback(self, feedback: dict) -> None:
|
||||
def _send_feedback(self, feedback: dict) -> None:
|
||||
"""Send feedback to the gamepad."""
|
||||
# Gamepad doesn't support feedback
|
||||
pass
|
||||
|
||||
@@ -81,11 +81,11 @@ class HomunculusArm(Teleoperator):
|
||||
self.state_lock = threading.Lock()
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict:
|
||||
def raw_action_features(self) -> dict:
|
||||
return {f"{joint}.pos": float for joint in self.joints}
|
||||
|
||||
@property
|
||||
def feedback_features(self) -> dict:
|
||||
def raw_feedback_features(self) -> dict:
|
||||
return {}
|
||||
|
||||
@property
|
||||
@@ -298,11 +298,11 @@ class HomunculusArm(Teleoperator):
|
||||
logger.debug(f"Error reading frame in background thread for {self}: {e}")
|
||||
|
||||
@check_if_not_connected
|
||||
def get_action(self) -> dict[str, float]:
|
||||
def _get_action(self) -> dict[str, float]:
|
||||
joint_positions = self._read()
|
||||
return {f"{joint}.pos": pos for joint, pos in joint_positions.items()}
|
||||
|
||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
def _send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
@check_if_not_connected
|
||||
|
||||
@@ -107,11 +107,11 @@ class HomunculusGlove(Teleoperator):
|
||||
self.state_lock = threading.Lock()
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict:
|
||||
def raw_action_features(self) -> dict:
|
||||
return {f"{joint}.pos": float for joint in self.joints}
|
||||
|
||||
@property
|
||||
def feedback_features(self) -> dict:
|
||||
def raw_feedback_features(self) -> dict:
|
||||
return {}
|
||||
|
||||
@property
|
||||
@@ -324,13 +324,13 @@ class HomunculusGlove(Teleoperator):
|
||||
logger.debug(f"Error reading frame in background thread for {self}: {e}")
|
||||
|
||||
@check_if_not_connected
|
||||
def get_action(self) -> dict[str, float]:
|
||||
def _get_action(self) -> dict[str, float]:
|
||||
joint_positions = self._read()
|
||||
return homunculus_glove_to_hope_jr_hand(
|
||||
{f"{joint}.pos": pos for joint, pos in joint_positions.items()}
|
||||
)
|
||||
|
||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
def _send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
@check_if_not_connected
|
||||
|
||||
@@ -67,7 +67,7 @@ class KeyboardTeleop(Teleoperator):
|
||||
self.logs = {}
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict:
|
||||
def raw_action_features(self) -> dict:
|
||||
return {
|
||||
"dtype": "float32",
|
||||
"shape": (len(self.arm),),
|
||||
@@ -75,7 +75,7 @@ class KeyboardTeleop(Teleoperator):
|
||||
}
|
||||
|
||||
@property
|
||||
def feedback_features(self) -> dict:
|
||||
def raw_feedback_features(self) -> dict:
|
||||
return {}
|
||||
|
||||
@property
|
||||
@@ -122,7 +122,7 @@ class KeyboardTeleop(Teleoperator):
|
||||
pass
|
||||
|
||||
@check_if_not_connected
|
||||
def get_action(self) -> RobotAction:
|
||||
def _get_action(self) -> RobotAction:
|
||||
before_read_t = time.perf_counter()
|
||||
|
||||
self._drain_pressed_keys()
|
||||
@@ -133,7 +133,7 @@ class KeyboardTeleop(Teleoperator):
|
||||
|
||||
return dict.fromkeys(action, None)
|
||||
|
||||
def send_feedback(self, feedback: dict[str, Any]) -> None:
|
||||
def _send_feedback(self, feedback: dict[str, Any]) -> None:
|
||||
pass
|
||||
|
||||
@check_if_not_connected
|
||||
@@ -157,7 +157,7 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop):
|
||||
self.misc_keys_queue = Queue()
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict:
|
||||
def raw_action_features(self) -> dict:
|
||||
if self.config.use_gripper:
|
||||
return {
|
||||
"dtype": "float32",
|
||||
@@ -172,7 +172,7 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop):
|
||||
}
|
||||
|
||||
@check_if_not_connected
|
||||
def get_action(self) -> RobotAction:
|
||||
def _get_action(self) -> RobotAction:
|
||||
self._drain_pressed_keys()
|
||||
delta_x = 0.0
|
||||
delta_y = 0.0
|
||||
@@ -338,7 +338,7 @@ class KeyboardRoverTeleop(KeyboardTeleop):
|
||||
self.current_angular_speed = config.angular_speed
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict:
|
||||
def raw_action_features(self) -> dict:
|
||||
"""Return action format for rover (linear and angular velocities)."""
|
||||
return {
|
||||
"linear.vel": float,
|
||||
@@ -361,7 +361,7 @@ class KeyboardRoverTeleop(KeyboardTeleop):
|
||||
self.current_pressed.pop(key_char, None)
|
||||
|
||||
@check_if_not_connected
|
||||
def get_action(self) -> RobotAction:
|
||||
def _get_action(self) -> RobotAction:
|
||||
"""
|
||||
Get the current action based on pressed keys.
|
||||
|
||||
|
||||
@@ -58,11 +58,11 @@ class KochLeader(Teleoperator):
|
||||
)
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
return {f"{motor}.pos": float for motor in self.bus.motors}
|
||||
|
||||
@property
|
||||
def feedback_features(self) -> dict[str, type]:
|
||||
def raw_feedback_features(self) -> dict[str, type]:
|
||||
return {}
|
||||
|
||||
@property
|
||||
@@ -160,7 +160,7 @@ class KochLeader(Teleoperator):
|
||||
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
|
||||
|
||||
@check_if_not_connected
|
||||
def get_action(self) -> dict[str, float]:
|
||||
def _get_action(self) -> dict[str, float]:
|
||||
start = time.perf_counter()
|
||||
action = self.bus.sync_read("Present_Position")
|
||||
action = {f"{motor}.pos": val for motor, val in action.items()}
|
||||
@@ -168,7 +168,7 @@ class KochLeader(Teleoperator):
|
||||
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
|
||||
return action
|
||||
|
||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
def _send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
# TODO(rcadene, aliberts): Implement force feedback
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@@ -57,11 +57,11 @@ class OmxLeader(Teleoperator):
|
||||
)
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
return {f"{motor}.pos": float for motor in self.bus.motors}
|
||||
|
||||
@property
|
||||
def feedback_features(self) -> dict[str, type]:
|
||||
def raw_feedback_features(self) -> dict[str, type]:
|
||||
return {}
|
||||
|
||||
@property
|
||||
@@ -149,7 +149,7 @@ class OmxLeader(Teleoperator):
|
||||
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
|
||||
|
||||
@check_if_not_connected
|
||||
def get_action(self) -> dict[str, float]:
|
||||
def _get_action(self) -> dict[str, float]:
|
||||
start = time.perf_counter()
|
||||
action = self.bus.sync_read("Present_Position")
|
||||
action = {f"{motor}.pos": val for motor, val in action.items()}
|
||||
@@ -157,7 +157,7 @@ class OmxLeader(Teleoperator):
|
||||
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
|
||||
return action
|
||||
|
||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
def _send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
# TODO(rcadene, aliberts): Implement force feedback
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@@ -65,7 +65,7 @@ class OpenArmLeader(Teleoperator):
|
||||
)
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
"""Features produced by this teleoperator."""
|
||||
features: dict[str, type] = {}
|
||||
for motor in self.bus.motors:
|
||||
@@ -75,7 +75,7 @@ class OpenArmLeader(Teleoperator):
|
||||
return features
|
||||
|
||||
@property
|
||||
def feedback_features(self) -> dict[str, type]:
|
||||
def raw_feedback_features(self) -> dict[str, type]:
|
||||
"""Feedback features (not implemented for OpenArms)."""
|
||||
return {}
|
||||
|
||||
@@ -183,7 +183,7 @@ class OpenArmLeader(Teleoperator):
|
||||
)
|
||||
|
||||
@check_if_not_connected
|
||||
def get_action(self) -> RobotAction:
|
||||
def _get_action(self) -> RobotAction:
|
||||
"""
|
||||
Get current action from the leader arm.
|
||||
|
||||
@@ -209,7 +209,7 @@ class OpenArmLeader(Teleoperator):
|
||||
|
||||
return action_dict
|
||||
|
||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
def _send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
raise NotImplementedError("Feedback is not yet implemented for OpenArm leader.")
|
||||
|
||||
@check_if_not_connected
|
||||
|
||||
@@ -31,9 +31,17 @@ from .config_openarm_mini import OpenArmMiniConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Motors whose direction is inverted during readout
|
||||
RIGHT_MOTORS_TO_FLIP = ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5"]
|
||||
LEFT_MOTORS_TO_FLIP = ["joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"]
|
||||
# Motors whose direction is inverted on the leader side.
|
||||
LEFT_MOTORS_TO_FLIP = {"joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_7"}
|
||||
RIGHT_MOTORS_TO_FLIP = {"joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"}
|
||||
# Leader(OpenArmMini) -> Follower(OpenArms) joint remap
|
||||
JOINT_REMAP_TO_OPENARMS = {"joint_6": "joint_7", "joint_7": "joint_6"}
|
||||
# Follower(OpenArms) -> Leader(OpenArmMini) joint remap
|
||||
JOINT_REMAP_TO_MINI = {"joint_7": "joint_6", "joint_6": "joint_7"}
|
||||
OPENARMS_GRIPPER_MIN = -65.0
|
||||
OPENARMS_GRIPPER_MAX = 0.0
|
||||
MINI_GRIPPER_MIN = 0.0
|
||||
MINI_GRIPPER_MAX = 100.0
|
||||
|
||||
|
||||
class OpenArmMini(Teleoperator):
|
||||
@@ -93,8 +101,28 @@ class OpenArmMini(Teleoperator):
|
||||
calibration=cal_left,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _mini_gripper_to_openarms(value: float) -> float:
|
||||
"""Convert OpenArmMini gripper range [0, 100] to OpenArms gripper range [-65, 0]."""
|
||||
mapped = OPENARMS_GRIPPER_MAX + (
|
||||
(value - MINI_GRIPPER_MIN)
|
||||
* (OPENARMS_GRIPPER_MIN - OPENARMS_GRIPPER_MAX)
|
||||
/ (MINI_GRIPPER_MAX - MINI_GRIPPER_MIN)
|
||||
)
|
||||
return max(min(mapped, OPENARMS_GRIPPER_MAX), OPENARMS_GRIPPER_MIN)
|
||||
|
||||
@staticmethod
|
||||
def _openarms_gripper_to_mini(value: float) -> float:
|
||||
"""Convert OpenArms gripper range [-65, 0] to OpenArmMini gripper range [0, 100]."""
|
||||
clipped = max(min(value, OPENARMS_GRIPPER_MAX), OPENARMS_GRIPPER_MIN)
|
||||
return MINI_GRIPPER_MIN + (
|
||||
(OPENARMS_GRIPPER_MAX - clipped)
|
||||
* (MINI_GRIPPER_MAX - MINI_GRIPPER_MIN)
|
||||
/ (OPENARMS_GRIPPER_MAX - OPENARMS_GRIPPER_MIN)
|
||||
)
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
features: dict[str, type] = {}
|
||||
for motor in self.bus_right.motors:
|
||||
features[f"right_{motor}.pos"] = float
|
||||
@@ -103,7 +131,7 @@ class OpenArmMini(Teleoperator):
|
||||
return features
|
||||
|
||||
@property
|
||||
def feedback_features(self) -> dict[str, type]:
|
||||
def raw_feedback_features(self) -> dict[str, type]:
|
||||
return {}
|
||||
|
||||
@property
|
||||
@@ -269,7 +297,7 @@ class OpenArmMini(Teleoperator):
|
||||
print(f"LEFT '{motor}' motor id set to {self.bus_left.motors[motor].id}")
|
||||
|
||||
@check_if_not_connected
|
||||
def get_action(self) -> RobotAction:
|
||||
def _get_action(self) -> RobotAction:
|
||||
"""Get current action from both arms (read positions from all motors)."""
|
||||
start = time.perf_counter()
|
||||
|
||||
@@ -278,16 +306,64 @@ class OpenArmMini(Teleoperator):
|
||||
|
||||
action: dict[str, Any] = {}
|
||||
for motor, val in right_positions.items():
|
||||
action[f"right_{motor}.pos"] = -val if motor in RIGHT_MOTORS_TO_FLIP else val
|
||||
target_motor = JOINT_REMAP_TO_OPENARMS.get(motor, motor)
|
||||
mapped_val = -val if motor in RIGHT_MOTORS_TO_FLIP else val
|
||||
if target_motor == "gripper":
|
||||
mapped_val = self._mini_gripper_to_openarms(mapped_val)
|
||||
action[f"right_{target_motor}.pos"] = mapped_val
|
||||
for motor, val in left_positions.items():
|
||||
action[f"left_{motor}.pos"] = -val if motor in LEFT_MOTORS_TO_FLIP else val
|
||||
target_motor = JOINT_REMAP_TO_OPENARMS.get(motor, motor)
|
||||
mapped_val = -val if motor in LEFT_MOTORS_TO_FLIP else val
|
||||
if target_motor == "gripper":
|
||||
mapped_val = self._mini_gripper_to_openarms(mapped_val)
|
||||
action[f"left_{target_motor}.pos"] = mapped_val
|
||||
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
|
||||
return action
|
||||
|
||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
raise NotImplementedError("Feedback is not yet implemented for OpenArm Mini.")
|
||||
@check_if_not_connected
|
||||
def enable_torque(self) -> None:
|
||||
"""Enable torque on both arms for active motion commands."""
|
||||
self.bus_right.enable_torque()
|
||||
self.bus_left.enable_torque()
|
||||
|
||||
@check_if_not_connected
|
||||
def disable_torque(self) -> None:
|
||||
"""Disable torque on both arms for manual teleoperation."""
|
||||
self.bus_right.disable_torque()
|
||||
self.bus_left.disable_torque()
|
||||
|
||||
@check_if_not_connected
|
||||
def write_goal_positions(self, action: dict[str, float]) -> None:
|
||||
"""Send normalized bilateral goal positions to the underlying Feetech buses."""
|
||||
right_goals: dict[str, float] = {}
|
||||
left_goals: dict[str, float] = {}
|
||||
|
||||
for key, value in action.items():
|
||||
if not key.endswith(".pos"):
|
||||
continue
|
||||
|
||||
if key.startswith("right_"):
|
||||
openarms_motor = key.removeprefix("right_").removesuffix(".pos")
|
||||
mini_motor = JOINT_REMAP_TO_MINI.get(openarms_motor, openarms_motor)
|
||||
mapped_val = self._openarms_gripper_to_mini(value) if openarms_motor == "gripper" else value
|
||||
right_goals[mini_motor] = -mapped_val if mini_motor in RIGHT_MOTORS_TO_FLIP else mapped_val
|
||||
elif key.startswith("left_"):
|
||||
openarms_motor = key.removeprefix("left_").removesuffix(".pos")
|
||||
mini_motor = JOINT_REMAP_TO_MINI.get(openarms_motor, openarms_motor)
|
||||
mapped_val = self._openarms_gripper_to_mini(value) if openarms_motor == "gripper" else value
|
||||
left_goals[mini_motor] = -mapped_val if mini_motor in LEFT_MOTORS_TO_FLIP else mapped_val
|
||||
|
||||
if right_goals:
|
||||
self.bus_right.sync_write("Goal_Position", right_goals)
|
||||
if left_goals:
|
||||
self.bus_left.sync_write("Goal_Position", left_goals)
|
||||
|
||||
@check_if_not_connected
|
||||
def _send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
"""Route feedback position commands through the same OpenArms/OpenArmMini mapping."""
|
||||
self.write_goal_positions(feedback)
|
||||
|
||||
@check_if_not_connected
|
||||
def disconnect(self) -> None:
|
||||
|
||||
@@ -47,7 +47,7 @@ class BasePhone:
|
||||
return (self._calib_pos is not None) and (self._calib_rot_inv is not None)
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
return {
|
||||
"phone.pos": np.ndarray, # shape (3,)
|
||||
"phone.rot": Rotation, # scipy.spatial.transform.Rotation
|
||||
@@ -56,15 +56,15 @@ class BasePhone:
|
||||
}
|
||||
|
||||
@property
|
||||
def feedback_features(self) -> dict[str, type]:
|
||||
def raw_feedback_features(self) -> dict[str, type]:
|
||||
# No haptic or other feedback implemented yet
|
||||
pass
|
||||
return {}
|
||||
|
||||
def configure(self) -> None:
|
||||
# No additional configuration required for phone teleop
|
||||
pass
|
||||
|
||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
def _send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
# We could add haptic feedback (vibrations) here, but it's not implemented yet
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -163,7 +163,7 @@ class IOSPhone(BasePhone, Teleoperator):
|
||||
return True, pos, rot, pose
|
||||
|
||||
@check_if_not_connected
|
||||
def get_action(self) -> dict:
|
||||
def _get_action(self) -> dict:
|
||||
has_pose, raw_position, raw_rotation, fb_pose = self._read_current_pose()
|
||||
if not has_pose or not self.is_calibrated:
|
||||
return {}
|
||||
@@ -314,7 +314,7 @@ class AndroidPhone(BasePhone, Teleoperator):
|
||||
self._latest_message = message
|
||||
|
||||
@check_if_not_connected
|
||||
def get_action(self) -> dict:
|
||||
def _get_action(self) -> dict:
|
||||
ok, raw_pos, raw_rot, pose = self._read_current_pose()
|
||||
if not ok or not self.is_calibrated:
|
||||
return {}
|
||||
@@ -395,21 +395,21 @@ class Phone(Teleoperator):
|
||||
return self._phone_impl.is_calibrated
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
return self._phone_impl.action_features
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
return self._phone_impl.raw_action_features
|
||||
|
||||
@property
|
||||
def feedback_features(self) -> dict[str, type]:
|
||||
return self._phone_impl.feedback_features
|
||||
def raw_feedback_features(self) -> dict[str, type]:
|
||||
return self._phone_impl.raw_feedback_features
|
||||
|
||||
def configure(self) -> None:
|
||||
return self._phone_impl.configure()
|
||||
|
||||
def get_action(self) -> dict:
|
||||
return self._phone_impl.get_action()
|
||||
def _get_action(self) -> dict:
|
||||
return self._phone_impl._get_action()
|
||||
|
||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
return self._phone_impl.send_feedback(feedback)
|
||||
def _send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
return self._phone_impl._send_feedback(feedback)
|
||||
|
||||
def disconnect(self) -> None:
|
||||
return self._phone_impl.disconnect()
|
||||
|
||||
@@ -104,7 +104,7 @@ class Reachy2Teleoperator(Teleoperator):
|
||||
return joints
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
if self.config.with_mobile_base:
|
||||
return {
|
||||
**dict.fromkeys(
|
||||
@@ -120,7 +120,7 @@ class Reachy2Teleoperator(Teleoperator):
|
||||
return dict.fromkeys(self.joints_dict.keys(), float)
|
||||
|
||||
@property
|
||||
def feedback_features(self) -> dict[str, type]:
|
||||
def raw_feedback_features(self) -> dict[str, type]:
|
||||
return {}
|
||||
|
||||
@property
|
||||
@@ -146,7 +146,7 @@ class Reachy2Teleoperator(Teleoperator):
|
||||
pass
|
||||
|
||||
@check_if_not_connected
|
||||
def get_action(self) -> dict[str, float]:
|
||||
def _get_action(self) -> dict[str, float]:
|
||||
start = time.perf_counter()
|
||||
|
||||
joint_action: dict[str, float] = {}
|
||||
@@ -168,7 +168,7 @@ class Reachy2Teleoperator(Teleoperator):
|
||||
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
|
||||
return {**joint_action, **vel_action}
|
||||
|
||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
def _send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def disconnect(self) -> None:
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from .ee_space import make_so10x_leader_fk_pipeline
|
||||
|
||||
__all__ = ["make_so10x_leader_fk_pipeline"]
|
||||
@@ -0,0 +1,82 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Forward-kinematics pipeline for SO-100/101 leader (teleoperator) arm.
|
||||
|
||||
Converts raw leader joint positions into end-effector pose. Attach this to a leader
|
||||
via ``set_output_pipeline`` so that ``get_action()`` returns EE coordinates instead of
|
||||
raw joint angles.
|
||||
|
||||
Example::
|
||||
|
||||
from lerobot.teleoperators.so_leader.pipelines import make_so10x_leader_fk_pipeline
|
||||
|
||||
motor_names = list(leader.bus.motors.keys())
|
||||
leader.set_output_pipeline(make_so10x_leader_fk_pipeline(URDF_PATH, motor_names))
|
||||
action = leader.get_action() # now contains ee.x, ee.y, ee.z, ...
|
||||
"""
|
||||
|
||||
from lerobot.model.kinematics import RobotKinematics
|
||||
from lerobot.processor import RobotAction, RobotProcessorPipeline
|
||||
from lerobot.processor.converters import (
|
||||
robot_action_to_transition,
|
||||
transition_to_robot_action,
|
||||
)
|
||||
from lerobot.robots.so_follower.robot_kinematic_processor import ForwardKinematicsJointsToEE
|
||||
|
||||
_DEFAULT_GRIPPER_FRAME = "gripper_frame_link"
|
||||
|
||||
|
||||
def make_so10x_leader_fk_pipeline(
|
||||
urdf_path: str,
|
||||
motor_names: list[str],
|
||||
*,
|
||||
target_frame_name: str = _DEFAULT_GRIPPER_FRAME,
|
||||
) -> RobotProcessorPipeline[RobotAction, RobotAction]:
|
||||
"""
|
||||
Create a forward-kinematics action pipeline for SO-100/101 leader teleoperators.
|
||||
|
||||
Converts raw leader joint positions (action) into end-effector pose (position +
|
||||
orientation + gripper). Attach this to a leader via ``set_output_pipeline`` so that
|
||||
``get_action()`` returns EE coordinates instead of raw joint angles.
|
||||
|
||||
Args:
|
||||
urdf_path: Path to the SO-100/101 URDF file used for kinematics.
|
||||
motor_names: Ordered list of motor names matching the URDF joint names.
|
||||
target_frame_name: Name of the end-effector frame in the URDF.
|
||||
|
||||
Returns:
|
||||
A RobotProcessorPipeline that maps joint actions to EE actions.
|
||||
|
||||
Example::
|
||||
|
||||
motor_names = list(leader.bus.motors.keys())
|
||||
leader.set_output_pipeline(
|
||||
make_so10x_leader_fk_pipeline("./so101.urdf", motor_names)
|
||||
)
|
||||
action = leader.get_action() # returns ee.x, ee.y, ee.z, ee.wx, ee.wy, ee.wz, ee.gripper_vel
|
||||
"""
|
||||
kinematics = RobotKinematics(
|
||||
urdf_path=urdf_path,
|
||||
target_frame_name=target_frame_name,
|
||||
joint_names=motor_names,
|
||||
)
|
||||
return RobotProcessorPipeline[RobotAction, RobotAction](
|
||||
steps=[ForwardKinematicsJointsToEE(kinematics=kinematics, motor_names=motor_names)],
|
||||
to_transition=robot_action_to_transition,
|
||||
to_output=transition_to_robot_action,
|
||||
)
|
||||
@@ -55,11 +55,11 @@ class SOLeader(Teleoperator):
|
||||
)
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
return {f"{motor}.pos": float for motor in self.bus.motors}
|
||||
|
||||
@property
|
||||
def feedback_features(self) -> dict[str, type]:
|
||||
def raw_feedback_features(self) -> dict[str, type]:
|
||||
return {}
|
||||
|
||||
@property
|
||||
@@ -138,7 +138,7 @@ class SOLeader(Teleoperator):
|
||||
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
|
||||
|
||||
@check_if_not_connected
|
||||
def get_action(self) -> dict[str, float]:
|
||||
def _get_action(self) -> dict[str, float]:
|
||||
start = time.perf_counter()
|
||||
action = self.bus.sync_read("Present_Position")
|
||||
action = {f"{motor}.pos": val for motor, val in action.items()}
|
||||
@@ -146,7 +146,7 @@ class SOLeader(Teleoperator):
|
||||
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
|
||||
return action
|
||||
|
||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
def _send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
# TODO: Implement force feedback
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@@ -12,17 +12,23 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import abc
|
||||
import builtins
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
import draccus
|
||||
|
||||
from lerobot.configs.types import PipelineFeatureType
|
||||
from lerobot.motors.motors_bus import MotorCalibration
|
||||
from lerobot.processor import RobotAction
|
||||
from lerobot.utils.constants import HF_LEROBOT_CALIBRATION, TELEOPERATORS
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from lerobot.processor.core import RobotAction
|
||||
from lerobot.processor.pipeline import RobotProcessorPipeline
|
||||
|
||||
from .config import TeleoperatorConfig
|
||||
|
||||
|
||||
@@ -33,6 +39,10 @@ class Teleoperator(abc.ABC):
|
||||
This class provides a standardized interface for interacting with physical teleoperators.
|
||||
Subclasses must implement all abstract methods and properties to be usable.
|
||||
|
||||
Pipelines are first-class citizens: every teleoperator carries an optional output pipeline
|
||||
(applied in get_action()) and an optional input pipeline (applied in send_feedback()).
|
||||
Both default to identity (no-op), so existing teleoperators work without any changes.
|
||||
|
||||
Attributes:
|
||||
config_class (RobotConfig): The expected configuration class for this teleoperator.
|
||||
name (str): The unique name used to identify this teleoperator type.
|
||||
@@ -55,6 +65,14 @@ class Teleoperator(abc.ABC):
|
||||
if self.calibration_fpath.is_file():
|
||||
self._load_calibration()
|
||||
|
||||
# Pipeline interface — default to identity (no-op), swap via set_output/input_pipeline()
|
||||
# Lazy import: factory is in lerobot.processor which loads after teleoperators at module init time,
|
||||
# but __init__ runs at instance-creation time when lerobot.processor is fully loaded.
|
||||
from lerobot.processor.factory import _make_identity_feedback_pipeline, _make_identity_teleop_action_pipeline
|
||||
|
||||
self._output_pipeline: RobotProcessorPipeline = _make_identity_teleop_action_pipeline()
|
||||
self._input_pipeline: RobotProcessorPipeline = _make_identity_feedback_pipeline()
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"{self.id} {self.__class__.__name__}"
|
||||
|
||||
@@ -84,38 +102,114 @@ class Teleoperator(abc.ABC):
|
||||
except Exception: # nosec B110
|
||||
pass
|
||||
|
||||
# ── Pipeline interface ────────────────────────────────────────────────────
|
||||
|
||||
def output_pipeline(self) -> RobotProcessorPipeline:
|
||||
"""
|
||||
Pipeline applied inside get_action() to transform raw hardware actions.
|
||||
Default: identity (no-op). Override via set_output_pipeline() or subclassing.
|
||||
|
||||
Example: set a forward-kinematics pipeline to convert leader joint positions to EE pose.
|
||||
"""
|
||||
return self._output_pipeline
|
||||
|
||||
def input_pipeline(self) -> RobotProcessorPipeline:
|
||||
"""
|
||||
Pipeline applied inside send_feedback() to transform incoming feedback.
|
||||
Default: identity (no-op). Override via set_input_pipeline() or subclassing.
|
||||
"""
|
||||
return self._input_pipeline
|
||||
|
||||
def set_output_pipeline(self, pipeline: RobotProcessorPipeline) -> None:
|
||||
"""Set the action output pipeline (applied in get_action())."""
|
||||
self._output_pipeline = pipeline
|
||||
|
||||
def set_input_pipeline(self, pipeline: RobotProcessorPipeline) -> None:
|
||||
"""Set the feedback input pipeline (applied in send_feedback())."""
|
||||
self._input_pipeline = pipeline
|
||||
|
||||
# ── Feature properties ────────────────────────────────────────────────────
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def action_features(self) -> dict:
|
||||
"""
|
||||
A dictionary describing the structure and types of the actions produced by the teleoperator. Its
|
||||
structure (keys) should match the structure of what is returned by :pymeth:`get_action`. Values for
|
||||
the dict should be the type of the value if it's a simple value, e.g. `float` for single
|
||||
proprioceptive value (a joint's goal position/velocity)
|
||||
Pipeline-transformed action features.
|
||||
|
||||
Note: this property should be able to be called regardless of whether the robot is connected or not.
|
||||
Applies output_pipeline().transform_features() to raw_action_features so the
|
||||
returned dict matches what get_action() actually produces for callers.
|
||||
|
||||
Use raw_action_features to inspect hardware-level feature shapes.
|
||||
|
||||
Note: this property should be able to be called regardless of whether the
|
||||
teleoperator is connected or not.
|
||||
"""
|
||||
from lerobot.datasets.pipeline_features import create_initial_features # lazy import
|
||||
|
||||
initial = create_initial_features(action=self.raw_action_features)
|
||||
transformed = self.output_pipeline().transform_features(initial)
|
||||
return transformed.get(PipelineFeatureType.ACTION, {})
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def raw_action_features(self) -> dict:
|
||||
"""
|
||||
Hardware-level action features (before any pipeline transformation).
|
||||
|
||||
A dictionary describing the structure and types of the actions produced
|
||||
directly by the teleoperator hardware. Its structure (keys) should match
|
||||
the structure of what is returned by :pymeth:`_get_action`. Values should be
|
||||
the type of the value if it's a simple value, e.g. ``float`` for single
|
||||
proprioceptive value (a joint's goal position/velocity).
|
||||
|
||||
Note: this property should be able to be called regardless of whether the
|
||||
teleoperator is connected or not.
|
||||
"""
|
||||
pass
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def feedback_features(self) -> dict:
|
||||
def raw_feedback_features(self) -> dict:
|
||||
"""
|
||||
A dictionary describing the structure and types of the feedback actions expected by the robot. Its
|
||||
structure (keys) should match the structure of what is passed to :pymeth:`send_feedback`. Values for
|
||||
the dict should be the type of the value if it's a simple value, e.g. `float` for single
|
||||
proprioceptive value (a joint's goal position/velocity)
|
||||
Hardware-level feedback features (before any pipeline transformation).
|
||||
|
||||
Note: this property should be able to be called regardless of whether the robot is connected or not.
|
||||
A dictionary describing the structure and types of the feedback accepted directly
|
||||
by the teleoperator hardware (i.e. what :pymeth:`_send_feedback` receives). Its
|
||||
structure (keys) should match the structure of what is expected by
|
||||
:pymeth:`_send_feedback`. Values should be the type of the value if it's a simple
|
||||
value, e.g. ``float`` for single proprioceptive value.
|
||||
|
||||
Return an empty dict if this teleoperator does not support feedback.
|
||||
|
||||
Note: this property should be able to be called regardless of whether the
|
||||
teleoperator is connected or not.
|
||||
"""
|
||||
pass
|
||||
|
||||
@property
|
||||
def feedback_features(self) -> dict:
|
||||
"""
|
||||
Pipeline-transformed feedback features.
|
||||
|
||||
Applies input_pipeline().transform_features() to raw_feedback_features so the
|
||||
returned dict reflects what the input pipeline outputs to the teleoperator hardware.
|
||||
|
||||
Use raw_feedback_features to inspect hardware-level feedback feature shapes.
|
||||
|
||||
Note: this property should be able to be called regardless of whether the
|
||||
teleoperator is connected or not.
|
||||
"""
|
||||
from lerobot.datasets.pipeline_features import create_initial_features # lazy import
|
||||
|
||||
initial = create_initial_features(observation=self.raw_feedback_features)
|
||||
transformed = self.input_pipeline().transform_features(initial)
|
||||
return transformed.get(PipelineFeatureType.OBSERVATION, {})
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def is_connected(self) -> bool:
|
||||
"""
|
||||
Whether the teleoperator is currently connected or not. If `False`, calling :pymeth:`get_action`
|
||||
or :pymeth:`send_feedback` should raise an error.
|
||||
Whether the teleoperator is currently connected or not. If ``False``, calling
|
||||
:pymeth:`get_action` or :pymeth:`send_feedback` should raise an error.
|
||||
"""
|
||||
pass
|
||||
|
||||
@@ -133,7 +227,7 @@ class Teleoperator(abc.ABC):
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def is_calibrated(self) -> bool:
|
||||
"""Whether the teleoperator is currently calibrated or not. Should be always `True` if not applicable"""
|
||||
"""Whether the teleoperator is currently calibrated or not. Should be always ``True`` if not applicable"""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
@@ -151,7 +245,7 @@ class Teleoperator(abc.ABC):
|
||||
Helper to load calibration data from the specified file.
|
||||
|
||||
Args:
|
||||
fpath (Path | None): Optional path to the calibration file. Defaults to `self.calibration_fpath`.
|
||||
fpath (Path | None): Optional path to the calibration file. Defaults to ``self.calibration_fpath``.
|
||||
"""
|
||||
fpath = self.calibration_fpath if fpath is None else fpath
|
||||
with open(fpath) as f, draccus.config_type("json"):
|
||||
@@ -162,7 +256,7 @@ class Teleoperator(abc.ABC):
|
||||
Helper to save calibration data to the specified file.
|
||||
|
||||
Args:
|
||||
fpath (Path | None): Optional path to save the calibration file. Defaults to `self.calibration_fpath`.
|
||||
fpath (Path | None): Optional path to save the calibration file. Defaults to ``self.calibration_fpath``.
|
||||
"""
|
||||
fpath = self.calibration_fpath if fpath is None else fpath
|
||||
with open(fpath, "w") as f, draccus.config_type("json"):
|
||||
@@ -176,29 +270,51 @@ class Teleoperator(abc.ABC):
|
||||
"""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
# ── Template methods (concrete, call pipeline internally) ─────────────────
|
||||
|
||||
def get_action(self) -> RobotAction:
|
||||
"""
|
||||
Retrieve the current action from the teleoperator.
|
||||
Retrieve the current action from the teleoperator and apply the output pipeline.
|
||||
|
||||
Calls :pymeth:`_get_action` to get raw hardware data, then applies
|
||||
:pymeth:`output_pipeline`.
|
||||
|
||||
Returns:
|
||||
RobotAction: A flat dictionary representing the teleoperator's current actions. Its
|
||||
structure should match :pymeth:`observation_features`.
|
||||
RobotAction: Pipeline-transformed action. With the default identity pipeline
|
||||
this equals the raw action from :pymeth:`_get_action`.
|
||||
"""
|
||||
raw = self._get_action()
|
||||
return self.output_pipeline()(raw)
|
||||
|
||||
@abc.abstractmethod
|
||||
def _get_action(self) -> RobotAction:
|
||||
"""
|
||||
Retrieve the raw action directly from teleoperator hardware.
|
||||
|
||||
Returns:
|
||||
RobotAction: A flat dictionary representing the teleoperator's current actions.
|
||||
Its structure should match :pymeth:`raw_action_features`.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def send_feedback(self, feedback: dict[str, Any]) -> None:
|
||||
"""
|
||||
Send a feedback action command to the teleoperator.
|
||||
Apply the input pipeline and send the resulting feedback to teleoperator hardware.
|
||||
|
||||
Args:
|
||||
feedback (dict[str, Any]): Dictionary representing the desired feedback. Its structure should match
|
||||
:pymeth:`feedback_features`.
|
||||
feedback (dict[str, Any]): Dictionary representing the desired feedback.
|
||||
Its structure should match :pymeth:`feedback_features`.
|
||||
"""
|
||||
transformed = self.input_pipeline()(feedback)
|
||||
self._send_feedback(transformed)
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: The action actually sent to the motors potentially clipped or modified, e.g. by
|
||||
safety limits on velocity.
|
||||
@abc.abstractmethod
|
||||
def _send_feedback(self, feedback: dict[str, Any]) -> None:
|
||||
"""
|
||||
Send feedback directly to teleoperator hardware.
|
||||
|
||||
Args:
|
||||
feedback (dict[str, Any]): Dictionary of hardware-level feedback commands.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
@@ -72,11 +72,11 @@ class UnitreeG1Teleoperator(Teleoperator):
|
||||
self.ik_helper: ExoskeletonIKHelper | None = None
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
return {f"{name}.q": float for name in self._g1_joint_names}
|
||||
|
||||
@cached_property
|
||||
def feedback_features(self) -> dict[str, type]:
|
||||
def raw_feedback_features(self) -> dict[str, type]:
|
||||
return {}
|
||||
|
||||
@property
|
||||
@@ -114,12 +114,12 @@ class UnitreeG1Teleoperator(Teleoperator):
|
||||
def configure(self) -> None:
|
||||
pass
|
||||
|
||||
def get_action(self) -> dict[str, float]:
|
||||
def _get_action(self) -> dict[str, float]:
|
||||
left_angles = self.left_arm.get_angles()
|
||||
right_angles = self.right_arm.get_angles()
|
||||
return self.ik_helper.compute_g1_joints_from_exo(left_angles, right_angles)
|
||||
|
||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
def _send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
raise NotImplementedError("Exoskeleton arms do not support feedback")
|
||||
|
||||
def disconnect(self) -> None:
|
||||
|
||||
@@ -0,0 +1,212 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Utilities for building dataset features from robot/teleoperator pipelines and for
|
||||
checking action/observation space compatibility between teleops and robots.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from collections.abc import Sequence
|
||||
|
||||
from lerobot.datasets.utils import combine_feature_dicts, hw_to_dataset_features
|
||||
from lerobot.utils.constants import ACTION, OBS_IMAGES, OBS_STATE, OBS_STR
|
||||
|
||||
# Prefixes stripped from feature keys to produce clean dataset names.
|
||||
# Handles both fully-qualified (e.g. "observation.state.ee.x") and short (e.g. "state.ee.x") forms.
|
||||
_PREFIXES_TO_STRIP = tuple(
|
||||
f"{token}."
|
||||
for const in (ACTION, OBS_STATE, OBS_IMAGES)
|
||||
for token in (const, const.split(".")[-1])
|
||||
)
|
||||
|
||||
_IMAGES_TOKEN = OBS_IMAGES.split(".")[-1]
|
||||
|
||||
|
||||
def _should_keep(key: str, patterns: Sequence[str] | None) -> bool:
|
||||
if patterns is None:
|
||||
return True
|
||||
return any(re.search(pat, key) for pat in patterns)
|
||||
|
||||
|
||||
def _strip_prefix(key: str) -> str:
|
||||
for prefix in _PREFIXES_TO_STRIP:
|
||||
if key.startswith(prefix):
|
||||
return key[len(prefix) :]
|
||||
return key
|
||||
|
||||
|
||||
def _features_to_dataset_spec(
|
||||
features: dict,
|
||||
*,
|
||||
is_action: bool,
|
||||
use_videos: bool,
|
||||
patterns: Sequence[str] | None = None,
|
||||
) -> dict:
|
||||
"""
|
||||
Convert a flat feature dict (as returned by ``robot.observation_features`` or
|
||||
``teleop.action_features``) into a LeRobot dataset feature specification.
|
||||
|
||||
Args:
|
||||
features: Flat dict mapping feature key → type or shape.
|
||||
is_action: True when ``features`` describes actions; False for observations.
|
||||
use_videos: When False, image observation features are excluded entirely.
|
||||
patterns: Optional regex patterns to filter state/action features.
|
||||
Image features are not affected by this filter.
|
||||
|
||||
Returns:
|
||||
A dict suitable for passing to ``LeRobotDataset.create(..., features=...)``.
|
||||
"""
|
||||
categorized: dict = {}
|
||||
for key, value in features.items():
|
||||
is_image = not is_action and (
|
||||
(isinstance(value, tuple) and len(value) == 3)
|
||||
or key.startswith(f"{OBS_IMAGES}.")
|
||||
or key.startswith(f"{_IMAGES_TOKEN}.")
|
||||
or f".{_IMAGES_TOKEN}." in key
|
||||
)
|
||||
|
||||
if is_image and not use_videos:
|
||||
continue
|
||||
if not is_image and not _should_keep(key, patterns):
|
||||
continue
|
||||
|
||||
categorized[_strip_prefix(key)] = value
|
||||
|
||||
if not categorized:
|
||||
return {}
|
||||
|
||||
prefix = ACTION if is_action else OBS_STR
|
||||
return hw_to_dataset_features(categorized, prefix, use_videos)
|
||||
|
||||
|
||||
def build_dataset_features(
|
||||
robot,
|
||||
teleop=None,
|
||||
*,
|
||||
use_videos: bool = True,
|
||||
action_features: dict | None = None,
|
||||
) -> dict:
|
||||
"""
|
||||
Derive dataset feature specifications from robot and teleoperator pipelines.
|
||||
|
||||
Reads ``robot.observation_features`` (which already reflects the robot's output
|
||||
pipeline transformation) and, when provided, ``teleop.action_features`` or an
|
||||
explicit ``action_features`` dict to determine what the dataset will store.
|
||||
|
||||
This replaces the old pattern of manually calling ``aggregate_pipeline_dataset_features``
|
||||
with explicit processor objects.
|
||||
|
||||
Args:
|
||||
robot: The robot instance (must have ``observation_features``).
|
||||
teleop: The teleoperator instance. When ``None`` and ``action_features`` is also
|
||||
``None`` (policy-only recording), only observation features are returned.
|
||||
use_videos: If True, image observations are included as video features.
|
||||
action_features: Explicit action feature dict, used when no teleop is available
|
||||
(e.g. evaluate/inference mode) but the dataset must match a specific action
|
||||
space (e.g. EE coordinates from a previously recorded dataset).
|
||||
|
||||
Returns:
|
||||
A combined feature dict suitable for passing to ``LeRobotDataset.create(..., features=...)``.
|
||||
|
||||
Example::
|
||||
|
||||
# Teleop recording
|
||||
features = build_dataset_features(follower, leader, use_videos=True)
|
||||
|
||||
# Policy-only recording (no teleop)
|
||||
features = build_dataset_features(robot, use_videos=True)
|
||||
|
||||
# Evaluate with explicit EE action space
|
||||
features = build_dataset_features(
|
||||
robot,
|
||||
use_videos=True,
|
||||
action_features={
|
||||
f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
|
||||
for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
|
||||
},
|
||||
)
|
||||
"""
|
||||
obs_ds = _features_to_dataset_spec(robot.observation_features, is_action=False, use_videos=use_videos)
|
||||
|
||||
if action_features is not None:
|
||||
act_ds = _features_to_dataset_spec(action_features, is_action=True, use_videos=False)
|
||||
elif teleop is not None:
|
||||
act_ds = _features_to_dataset_spec(teleop.action_features, is_action=True, use_videos=False)
|
||||
else:
|
||||
return obs_ds
|
||||
|
||||
return combine_feature_dicts(act_ds, obs_ds)
|
||||
|
||||
|
||||
def check_action_space_compatibility(teleop, robot) -> None:
|
||||
"""
|
||||
Warn if the teleoperator's pipeline-transformed action features don't match the robot's
|
||||
declared ``action_features``.
|
||||
|
||||
This is a soft check — a mismatch produces a warning but does not raise. It is intended
|
||||
to catch obvious misconfigurations (e.g., sending EE actions to a robot expecting joints)
|
||||
before the control loop starts.
|
||||
|
||||
Args:
|
||||
teleop: The teleoperator whose ``action_features`` describe what it sends.
|
||||
robot: The robot whose ``action_features`` describe what it expects.
|
||||
"""
|
||||
teleop_out = set(teleop.action_features.keys())
|
||||
robot_in = set(robot.action_features.keys())
|
||||
if teleop_out != robot_in:
|
||||
import warnings
|
||||
|
||||
warnings.warn(
|
||||
f"Action space mismatch between teleop and robot.\n"
|
||||
f" Teleop sends: {sorted(teleop_out)}\n"
|
||||
f" Robot expects: {sorted(robot_in)}\n"
|
||||
"Ensure pipelines map between these spaces correctly.",
|
||||
UserWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
else:
|
||||
logging.debug("Action space compatibility check passed.")
|
||||
|
||||
|
||||
def check_observation_space_compatibility(robot, teleop) -> None:
|
||||
"""
|
||||
Warn if the robot's observation features don't cover what the teleoperator's
|
||||
``feedback_features`` expects.
|
||||
|
||||
A non-empty ``feedback_features`` that is not a subset of the robot's observation keys
|
||||
will produce a warning.
|
||||
|
||||
Args:
|
||||
robot: The robot whose ``observation_features`` describe what it produces.
|
||||
teleop: The teleoperator whose ``feedback_features`` describe what it expects as feedback.
|
||||
"""
|
||||
robot_obs = set(robot.observation_features.keys())
|
||||
teleop_feedback = set(teleop.feedback_features.keys())
|
||||
if teleop_feedback and not teleop_feedback.issubset(robot_obs):
|
||||
import warnings
|
||||
|
||||
warnings.warn(
|
||||
f"Observation/feedback space mismatch.\n"
|
||||
f" Robot obs: {sorted(robot_obs)}\n"
|
||||
f" Teleop feedback expects: {sorted(teleop_feedback)}\n"
|
||||
"Ensure the robot observation pipeline covers all feedback keys.",
|
||||
UserWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
else:
|
||||
logging.debug("Observation/feedback space compatibility check passed.")
|
||||
@@ -87,7 +87,7 @@ class MockRobot(Robot):
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
def raw_observation_features(self) -> dict[str, type | tuple]:
|
||||
return {**self._motors_ft, **self._cameras_ft}
|
||||
|
||||
@cached_property
|
||||
@@ -116,7 +116,7 @@ class MockRobot(Robot):
|
||||
pass
|
||||
|
||||
@check_if_not_connected
|
||||
def get_observation(self) -> RobotObservation:
|
||||
def _get_observation(self) -> RobotObservation:
|
||||
if self.config.random_values:
|
||||
return {f"{motor}.pos": random.uniform(-100, 100) for motor in self.motors}
|
||||
else:
|
||||
@@ -125,7 +125,7 @@ class MockRobot(Robot):
|
||||
}
|
||||
|
||||
@check_if_not_connected
|
||||
def send_action(self, action: RobotAction) -> RobotAction:
|
||||
def _send_action(self, action: RobotAction) -> RobotAction:
|
||||
return action
|
||||
|
||||
@check_if_not_connected
|
||||
|
||||
@@ -57,7 +57,7 @@ class MockTeleop(Teleoperator):
|
||||
self.motors = [f"motor_{i + 1}" for i in range(config.n_motors)]
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
def raw_action_features(self) -> dict[str, type]:
|
||||
return {f"{motor}.pos": float for motor in self.motors}
|
||||
|
||||
@cached_property
|
||||
@@ -86,7 +86,7 @@ class MockTeleop(Teleoperator):
|
||||
pass
|
||||
|
||||
@check_if_not_connected
|
||||
def get_action(self) -> RobotAction:
|
||||
def _get_action(self) -> RobotAction:
|
||||
if self.config.random_values:
|
||||
return {f"{motor}.pos": random.uniform(-100, 100) for motor in self.motors}
|
||||
else:
|
||||
@@ -95,7 +95,7 @@ class MockTeleop(Teleoperator):
|
||||
}
|
||||
|
||||
@check_if_not_connected
|
||||
def send_feedback(self, feedback: dict[str, Any]) -> None: ...
|
||||
def _send_feedback(self, feedback: dict[str, Any]) -> None: ...
|
||||
|
||||
@check_if_not_connected
|
||||
def disconnect(self) -> None:
|
||||
|
||||
@@ -26,7 +26,7 @@ import torch
|
||||
import torch.nn as nn
|
||||
|
||||
from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature
|
||||
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features
|
||||
from lerobot.utils.pipeline_utils import _features_to_dataset_spec
|
||||
from lerobot.processor import (
|
||||
DataProcessorPipeline,
|
||||
EnvTransition,
|
||||
@@ -2040,102 +2040,68 @@ def test_features_remove_from_initial(policy_feature_factory):
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class AddActionEEAndJointFeatures(ProcessorStep):
|
||||
"""Adds both EE and JOINT action features."""
|
||||
|
||||
def __call__(self, tr):
|
||||
return tr
|
||||
|
||||
def transform_features(
|
||||
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
|
||||
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
|
||||
# EE features
|
||||
features[PipelineFeatureType.ACTION]["action.ee.x"] = float
|
||||
features[PipelineFeatureType.ACTION]["action.ee.y"] = float
|
||||
# JOINT features
|
||||
features[PipelineFeatureType.ACTION]["action.j1.pos"] = float
|
||||
features[PipelineFeatureType.ACTION]["action.j2.pos"] = float
|
||||
return features
|
||||
# ── Tests for _features_to_dataset_spec ──────────────────────────────────────────────────────────
|
||||
# These replace the old aggregate_pipeline_dataset_features tests, covering the same categorisation
|
||||
# / filtering / prefix-stripping / HF-format logic via the private helper directly.
|
||||
|
||||
|
||||
@dataclass
|
||||
class AddObservationStateFeatures(ProcessorStep):
|
||||
"""Adds state features (and optionally an image spec to test precedence)."""
|
||||
|
||||
add_front_image: bool = False
|
||||
front_image_shape: tuple = (240, 320, 3)
|
||||
|
||||
def __call__(self, tr):
|
||||
return tr
|
||||
|
||||
def transform_features(
|
||||
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
|
||||
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
|
||||
# State features (mix EE and a joint state)
|
||||
features[PipelineFeatureType.OBSERVATION][f"{OBS_STATE}.ee.x"] = float
|
||||
features[PipelineFeatureType.OBSERVATION][f"{OBS_STATE}.j1.pos"] = float
|
||||
if self.add_front_image:
|
||||
features[PipelineFeatureType.OBSERVATION][f"{OBS_IMAGES}.front"] = self.front_image_shape
|
||||
return features
|
||||
|
||||
|
||||
def test_aggregate_joint_action_only():
|
||||
rp = DataProcessorPipeline([AddActionEEAndJointFeatures()])
|
||||
initial = {PipelineFeatureType.OBSERVATION: {"front": (480, 640, 3)}, PipelineFeatureType.ACTION: {}}
|
||||
|
||||
out = aggregate_pipeline_dataset_features(
|
||||
pipeline=rp,
|
||||
initial_features=initial,
|
||||
use_videos=True,
|
||||
patterns=["action.j1.pos", "action.j2.pos"],
|
||||
def test_dataset_spec_action_with_patterns():
|
||||
"""Action features are filtered by pattern; unmatched keys are excluded."""
|
||||
features = {
|
||||
"action.ee.x": float,
|
||||
"action.ee.y": float,
|
||||
"action.j1.pos": float,
|
||||
"action.j2.pos": float,
|
||||
}
|
||||
out = _features_to_dataset_spec(
|
||||
features, is_action=True, use_videos=True, patterns=["action.j1.pos", "action.j2.pos"]
|
||||
)
|
||||
|
||||
# Expect only ACTION with joint names
|
||||
assert ACTION in out and OBS_STATE not in out
|
||||
assert ACTION in out
|
||||
assert out[ACTION]["dtype"] == "float32"
|
||||
assert set(out[ACTION]["names"]) == {"j1.pos", "j2.pos"}
|
||||
assert out[ACTION]["shape"] == (len(out[ACTION]["names"]),)
|
||||
assert OBS_STATE not in out
|
||||
|
||||
|
||||
def test_aggregate_ee_action_and_observation_with_videos():
|
||||
rp = DataProcessorPipeline([AddActionEEAndJointFeatures(), AddObservationStateFeatures()])
|
||||
initial = {"front": (480, 640, 3), "side": (720, 1280, 3)}
|
||||
def test_dataset_spec_action_and_observation_with_videos():
|
||||
"""EE action + state obs + image obs; all appear with correct dtypes."""
|
||||
action_features = {"action.ee.x": float, "action.ee.y": float}
|
||||
obs_features = {
|
||||
f"{OBS_STATE}.ee.x": float,
|
||||
f"{OBS_STATE}.j1.pos": float,
|
||||
"front": (480, 640, 3),
|
||||
"side": (720, 1280, 3),
|
||||
}
|
||||
|
||||
out = aggregate_pipeline_dataset_features(
|
||||
pipeline=rp,
|
||||
initial_features={PipelineFeatureType.OBSERVATION: initial, PipelineFeatureType.ACTION: {}},
|
||||
use_videos=True,
|
||||
patterns=["action.ee", OBS_STATE],
|
||||
)
|
||||
act_out = _features_to_dataset_spec(action_features, is_action=True, use_videos=False)
|
||||
obs_out = _features_to_dataset_spec(obs_features, is_action=False, use_videos=True)
|
||||
|
||||
# Action should pack only EE names
|
||||
assert ACTION in out
|
||||
assert set(out[ACTION]["names"]) == {"ee.x", "ee.y"}
|
||||
assert out[ACTION]["dtype"] == "float32"
|
||||
assert ACTION in act_out
|
||||
assert set(act_out[ACTION]["names"]) == {"ee.x", "ee.y"}
|
||||
assert act_out[ACTION]["dtype"] == "float32"
|
||||
|
||||
# Observation state should pack both ee.x and j1.pos as a vector
|
||||
assert OBS_STATE in out
|
||||
assert set(out[OBS_STATE]["names"]) == {"ee.x", "j1.pos"}
|
||||
assert out[OBS_STATE]["dtype"] == "float32"
|
||||
assert OBS_STATE in obs_out
|
||||
assert set(obs_out[OBS_STATE]["names"]) == {"ee.x", "j1.pos"}
|
||||
assert obs_out[OBS_STATE]["dtype"] == "float32"
|
||||
|
||||
# Cameras from initial_features appear as videos
|
||||
for cam in ("front", "side"):
|
||||
for cam, shape in [("front", (480, 640, 3)), ("side", (720, 1280, 3))]:
|
||||
key = f"{OBS_IMAGES}.{cam}"
|
||||
assert key in out
|
||||
assert out[key]["dtype"] == "video"
|
||||
assert out[key]["shape"] == initial[cam]
|
||||
assert out[key]["names"] == ["height", "width", "channels"]
|
||||
assert key in obs_out, f"missing camera key {key}"
|
||||
assert obs_out[key]["dtype"] == "video"
|
||||
assert obs_out[key]["shape"] == shape
|
||||
assert obs_out[key]["names"] == ["height", "width", "channels"]
|
||||
|
||||
|
||||
def test_aggregate_both_action_types():
|
||||
rp = DataProcessorPipeline([AddActionEEAndJointFeatures()])
|
||||
out = aggregate_pipeline_dataset_features(
|
||||
pipeline=rp,
|
||||
initial_features={PipelineFeatureType.ACTION: {}, PipelineFeatureType.OBSERVATION: {}},
|
||||
use_videos=True,
|
||||
patterns=["action.ee", "action.j1", "action.j2.pos"],
|
||||
)
|
||||
def test_dataset_spec_all_action_types():
|
||||
"""EE and joint action features are both included when no pattern filter."""
|
||||
features = {
|
||||
"action.ee.x": float,
|
||||
"action.ee.y": float,
|
||||
"action.j1.pos": float,
|
||||
"action.j2.pos": float,
|
||||
}
|
||||
out = _features_to_dataset_spec(features, is_action=True, use_videos=True, patterns=None)
|
||||
|
||||
assert ACTION in out
|
||||
expected = {"ee.x", "ee.y", "j1.pos", "j2.pos"}
|
||||
@@ -2143,58 +2109,40 @@ def test_aggregate_both_action_types():
|
||||
assert out[ACTION]["shape"] == (len(expected),)
|
||||
|
||||
|
||||
def test_aggregate_images_when_use_videos_false():
|
||||
rp = DataProcessorPipeline([AddObservationStateFeatures(add_front_image=True)])
|
||||
initial = {"back": (480, 640, 3)}
|
||||
def test_dataset_spec_images_excluded_when_no_videos():
|
||||
"""Image observation features are dropped entirely when use_videos=False."""
|
||||
obs_features = {
|
||||
f"{OBS_STATE}.j1.pos": float,
|
||||
"back": (480, 640, 3),
|
||||
f"{OBS_IMAGES}.front": (240, 320, 3),
|
||||
}
|
||||
out = _features_to_dataset_spec(obs_features, is_action=False, use_videos=False)
|
||||
|
||||
out = aggregate_pipeline_dataset_features(
|
||||
pipeline=rp,
|
||||
initial_features={PipelineFeatureType.ACTION: {}, PipelineFeatureType.OBSERVATION: initial},
|
||||
use_videos=False, # expect "image" dtype
|
||||
patterns=None,
|
||||
)
|
||||
|
||||
key = f"{OBS_IMAGES}.back"
|
||||
key_front = f"{OBS_IMAGES}.front"
|
||||
assert key not in out
|
||||
assert key_front not in out
|
||||
assert f"{OBS_IMAGES}.back" not in out
|
||||
assert f"{OBS_IMAGES}.front" not in out
|
||||
# Non-image state feature is still present
|
||||
assert OBS_STATE in out
|
||||
assert "j1.pos" in out[OBS_STATE]["names"]
|
||||
|
||||
|
||||
def test_aggregate_images_when_use_videos_true():
|
||||
rp = DataProcessorPipeline([AddObservationStateFeatures(add_front_image=True)])
|
||||
initial = {"back": (480, 640, 3)}
|
||||
def test_dataset_spec_images_included_when_use_videos():
|
||||
"""Image features appear as video entries when use_videos=True."""
|
||||
obs_features = {
|
||||
"back": (480, 640, 3),
|
||||
f"{OBS_IMAGES}.front": (240, 320, 3),
|
||||
}
|
||||
out = _features_to_dataset_spec(obs_features, is_action=False, use_videos=True)
|
||||
|
||||
out = aggregate_pipeline_dataset_features(
|
||||
pipeline=rp,
|
||||
initial_features={PipelineFeatureType.OBSERVATION: initial, PipelineFeatureType.ACTION: {}},
|
||||
use_videos=True,
|
||||
patterns=None,
|
||||
)
|
||||
assert f"{OBS_IMAGES}.back" in out
|
||||
assert out[f"{OBS_IMAGES}.back"]["dtype"] == "video"
|
||||
assert out[f"{OBS_IMAGES}.back"]["shape"] == (480, 640, 3)
|
||||
|
||||
key = f"{OBS_IMAGES}.front"
|
||||
key_back = f"{OBS_IMAGES}.back"
|
||||
assert key in out
|
||||
assert key_back in out
|
||||
assert out[key]["dtype"] == "video"
|
||||
assert out[key_back]["dtype"] == "video"
|
||||
assert out[key_back]["shape"] == initial["back"]
|
||||
assert f"{OBS_IMAGES}.front" in out
|
||||
assert out[f"{OBS_IMAGES}.front"]["dtype"] == "video"
|
||||
assert out[f"{OBS_IMAGES}.front"]["shape"] == (240, 320, 3)
|
||||
|
||||
|
||||
def test_initial_camera_not_overridden_by_step_image():
|
||||
# Step explicitly sets a different front image shape; initial has another shape.
|
||||
# aggregate_pipeline_dataset_features should keep the step's value (setdefault behavior on initial cams).
|
||||
rp = DataProcessorPipeline(
|
||||
[AddObservationStateFeatures(add_front_image=True, front_image_shape=(240, 320, 3))]
|
||||
)
|
||||
initial = {"front": (480, 640, 3)} # should NOT override the step-provided (240, 320, 3)
|
||||
|
||||
out = aggregate_pipeline_dataset_features(
|
||||
pipeline=rp,
|
||||
initial_features={PipelineFeatureType.ACTION: {}, PipelineFeatureType.OBSERVATION: initial},
|
||||
use_videos=True,
|
||||
patterns=[f"{OBS_IMAGES}.front"],
|
||||
)
|
||||
|
||||
key = f"{OBS_IMAGES}.front"
|
||||
assert key in out
|
||||
assert out[key]["shape"] == (240, 320, 3) # from the step, not from initial
|
||||
def test_dataset_spec_empty_features_returns_empty():
|
||||
"""Empty feature dict returns an empty output dict."""
|
||||
assert _features_to_dataset_spec({}, is_action=True, use_videos=True) == {}
|
||||
assert _features_to_dataset_spec({}, is_action=False, use_videos=True) == {}
|
||||
|
||||
@@ -0,0 +1,108 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Integration tests for loading robot/teleop pipelines from the Hugging Face Hub.
|
||||
|
||||
These tests require network access and are marked with ``@pytest.mark.integration``.
|
||||
Run with::
|
||||
|
||||
pytest tests/test_pipeline_hub.py -m integration -v
|
||||
|
||||
The tests verify the full end-to-end flow of:
|
||||
1. Loading a pipeline from the Hub via ``RobotProcessorPipeline.from_pretrained(...)``
|
||||
2. Attaching it to a robot or teleoperator via ``set_output_pipeline`` / ``set_input_pipeline``
|
||||
3. Verifying that ``observation_features`` / ``action_features`` differ from the raw versions
|
||||
|
||||
Note: The Hub repos referenced below are placeholders. Update them once actual pipelines
|
||||
are published to the Hub.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# ─── Shared mock infrastructure (mirrors test_robot_pipeline.py) ──────────────
|
||||
|
||||
try:
|
||||
from tests.test_robot_pipeline import MockRobot, MockTeleop # type: ignore[import]
|
||||
except ImportError:
|
||||
# Fallback if tests are run from a different working directory
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
from test_robot_pipeline import MockRobot, MockTeleop
|
||||
|
||||
|
||||
# ─── Integration tests ────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_load_robot_pipeline_from_hub(tmp_path):
|
||||
"""
|
||||
Full end-to-end: load a FK observation pipeline for SO-101 from the Hub,
|
||||
attach it to a robot, and verify that observation_features are transformed.
|
||||
|
||||
Prerequisites:
|
||||
- A pipeline must be published at ``lerobot/so101-fk-observation-pipeline`` on the Hub.
|
||||
- A URDF file must be available locally (update ``local_urdf_path`` to point to it).
|
||||
"""
|
||||
pytest.importorskip("huggingface_hub")
|
||||
from lerobot.processor.pipeline import RobotProcessorPipeline
|
||||
|
||||
local_urdf_path = tmp_path / "so101.urdf"
|
||||
# NOTE: In a real test environment, provide an actual URDF or mock the kinematics.
|
||||
# For now, this test validates the Hub loading mechanism only if a URDF is provided.
|
||||
if not local_urdf_path.exists():
|
||||
pytest.skip("URDF not available; skipping Hub loading test")
|
||||
|
||||
pipeline = RobotProcessorPipeline.from_pretrained(
|
||||
"lerobot/so101-fk-observation-pipeline",
|
||||
overrides={"step_0": {"urdf_path": str(local_urdf_path)}},
|
||||
)
|
||||
robot = MockRobot()
|
||||
robot.set_output_pipeline(pipeline)
|
||||
|
||||
# Pipeline-transformed features should differ from raw features (EE vs joints)
|
||||
assert robot.observation_features != robot.raw_observation_features
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_load_teleop_pipeline_from_hub(tmp_path):
|
||||
"""
|
||||
Full end-to-end: load a FK action pipeline for SO-101 leader from the Hub,
|
||||
attach it to a teleoperator, and verify that action_features are transformed.
|
||||
|
||||
Prerequisites:
|
||||
- A pipeline must be published at ``lerobot/so101-leader-fk-action-pipeline`` on the Hub.
|
||||
- A URDF file must be available locally (update ``local_urdf_path`` to point to it).
|
||||
"""
|
||||
pytest.importorskip("huggingface_hub")
|
||||
from lerobot.processor.pipeline import RobotProcessorPipeline
|
||||
|
||||
local_urdf_path = tmp_path / "so101.urdf"
|
||||
if not local_urdf_path.exists():
|
||||
pytest.skip("URDF not available; skipping Hub loading test")
|
||||
|
||||
pipeline = RobotProcessorPipeline.from_pretrained(
|
||||
"lerobot/so101-leader-fk-action-pipeline",
|
||||
overrides={"step_0": {"urdf_path": str(local_urdf_path)}},
|
||||
)
|
||||
teleop = MockTeleop()
|
||||
teleop.set_output_pipeline(pipeline)
|
||||
|
||||
# Pipeline-transformed features should differ from raw features (EE vs joints)
|
||||
assert teleop.action_features != teleop.raw_action_features
|
||||
@@ -0,0 +1,433 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Unit tests for the robot/teleoperator pipeline interface.
|
||||
|
||||
Tests cover:
|
||||
- Default (identity) pipeline behaviour
|
||||
- Custom pipeline attachment via set_output_pipeline / set_input_pipeline
|
||||
- Auto-derived observation_features / action_features via pipelines
|
||||
- Compatibility checks
|
||||
- build_dataset_features utility
|
||||
"""
|
||||
|
||||
import warnings
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from lerobot.configs.types import PipelineFeatureType
|
||||
from lerobot.processor import RobotAction, RobotObservation
|
||||
from lerobot.processor.converters import (
|
||||
observation_to_transition,
|
||||
robot_action_observation_to_transition,
|
||||
robot_action_to_transition,
|
||||
transition_to_observation,
|
||||
transition_to_robot_action,
|
||||
)
|
||||
from lerobot.processor.factory import (
|
||||
_make_identity_feedback_pipeline,
|
||||
_make_identity_observation_pipeline,
|
||||
_make_identity_robot_action_pipeline,
|
||||
_make_identity_teleop_action_pipeline,
|
||||
)
|
||||
from lerobot.processor.pipeline import (
|
||||
IdentityProcessorStep,
|
||||
ObservationProcessorStep,
|
||||
RobotActionProcessorStep,
|
||||
RobotProcessorPipeline,
|
||||
)
|
||||
from lerobot.robots.robot import Robot
|
||||
from lerobot.teleoperators.teleoperator import Teleoperator
|
||||
from lerobot.utils.pipeline_utils import (
|
||||
build_dataset_features,
|
||||
check_action_space_compatibility,
|
||||
check_observation_space_compatibility,
|
||||
)
|
||||
|
||||
|
||||
# ─── Mock hardware classes ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@dataclass
|
||||
class MockRobotConfig:
|
||||
id: str = "mock_robot"
|
||||
calibration_dir: Path | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class MockTeleopConfig:
|
||||
id: str = "mock_teleop"
|
||||
calibration_dir: Path | None = None
|
||||
|
||||
|
||||
_JOINT_NAMES = ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll", "gripper"]
|
||||
_JOINT_FEATURES = {f"{j}.pos": float for j in _JOINT_NAMES}
|
||||
_EE_FEATURES = {"ee.x": float, "ee.y": float, "ee.z": float, "ee.wx": float, "ee.wy": float, "ee.wz": float, "ee.gripper_vel": float}
|
||||
|
||||
|
||||
class MockRobot(Robot):
|
||||
"""Minimal Robot that stores last action for assertion."""
|
||||
|
||||
config_class = MockRobotConfig
|
||||
name = "mock_robot"
|
||||
|
||||
def __init__(self):
|
||||
# bypass filesystem calibration setup; initialize with identity pipelines directly
|
||||
self._output_pipeline = _make_identity_observation_pipeline()
|
||||
self._input_pipeline = _make_identity_robot_action_pipeline()
|
||||
self._last_raw_obs: RobotObservation = {}
|
||||
self._last_sent: RobotAction = {}
|
||||
|
||||
@property
|
||||
def raw_observation_features(self) -> dict:
|
||||
return {**_JOINT_FEATURES, "camera": (480, 640, 3)}
|
||||
|
||||
@property
|
||||
def raw_action_features(self) -> dict:
|
||||
return _JOINT_FEATURES
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return True
|
||||
|
||||
def connect(self, calibrate=True):
|
||||
pass
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
return True
|
||||
|
||||
def calibrate(self):
|
||||
pass
|
||||
|
||||
def configure(self):
|
||||
pass
|
||||
|
||||
def _get_observation(self) -> RobotObservation:
|
||||
return {f"{j}.pos": float(i) for i, j in enumerate(_JOINT_NAMES)} | {"camera": None}
|
||||
|
||||
def _send_action(self, action: RobotAction) -> RobotAction:
|
||||
self._last_sent = action
|
||||
return action
|
||||
|
||||
def disconnect(self):
|
||||
pass
|
||||
|
||||
|
||||
class MockTeleop(Teleoperator):
|
||||
"""Minimal Teleoperator."""
|
||||
|
||||
config_class = MockTeleopConfig
|
||||
name = "mock_teleop"
|
||||
|
||||
def __init__(self):
|
||||
# bypass filesystem calibration setup; initialize with identity pipelines directly
|
||||
self._output_pipeline = _make_identity_teleop_action_pipeline()
|
||||
self._input_pipeline = _make_identity_feedback_pipeline()
|
||||
|
||||
@property
|
||||
def raw_action_features(self) -> dict:
|
||||
return _JOINT_FEATURES
|
||||
|
||||
@property
|
||||
def raw_feedback_features(self) -> dict:
|
||||
return {}
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return True
|
||||
|
||||
def connect(self, calibrate=True):
|
||||
pass
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
return True
|
||||
|
||||
def calibrate(self):
|
||||
pass
|
||||
|
||||
def configure(self):
|
||||
pass
|
||||
|
||||
def _get_action(self) -> RobotAction:
|
||||
return {f"{j}.pos": float(i) for i, j in enumerate(_JOINT_NAMES)}
|
||||
|
||||
def _send_feedback(self, feedback):
|
||||
pass
|
||||
|
||||
def disconnect(self):
|
||||
pass
|
||||
|
||||
|
||||
# ─── Simple transform step (doubles all float values) ────────────────────────
|
||||
|
||||
|
||||
class DoubleActionStep(RobotActionProcessorStep):
|
||||
"""Doubles all float action values."""
|
||||
|
||||
def action(self, action: RobotAction) -> RobotAction:
|
||||
return {k: v * 2 for k, v in action.items()}
|
||||
|
||||
def transform_features(self, features):
|
||||
return features
|
||||
|
||||
|
||||
class RenameToEEObsStep(ObservationProcessorStep):
|
||||
"""Renames joint obs keys to EE-like keys for testing transform_features."""
|
||||
|
||||
def observation(self, obs: RobotObservation) -> RobotObservation:
|
||||
return {f"ee.{i}": v for i, v in enumerate(obs.values()) if isinstance(v, float)}
|
||||
|
||||
def transform_features(self, features):
|
||||
obs = features.get(PipelineFeatureType.OBSERVATION, {})
|
||||
new_obs = {f"ee.{i}": float for i in range(len([v for v in obs.values() if v == float]))}
|
||||
return {**features, PipelineFeatureType.OBSERVATION: new_obs}
|
||||
|
||||
|
||||
# ─── Tests: Robot pipeline interface ─────────────────────────────────────────
|
||||
|
||||
|
||||
def test_robot_default_pipeline_is_identity():
|
||||
"""With no custom pipeline, get_observation returns the same as _get_observation."""
|
||||
robot = MockRobot()
|
||||
raw = robot._get_observation()
|
||||
obs = robot.get_observation()
|
||||
assert obs == raw
|
||||
|
||||
|
||||
def test_robot_observation_caches_last_raw():
|
||||
"""get_observation caches raw result for IK use in send_action."""
|
||||
robot = MockRobot()
|
||||
robot.get_observation()
|
||||
assert robot._last_raw_obs is not None
|
||||
assert "shoulder_pan.pos" in robot._last_raw_obs
|
||||
|
||||
|
||||
def test_robot_default_send_action_is_identity():
|
||||
"""With no custom pipeline, send_action passes action unchanged to _send_action."""
|
||||
robot = MockRobot()
|
||||
robot.get_observation() # populate _last_raw_obs
|
||||
action = {f"{j}.pos": 1.0 for j in _JOINT_NAMES}
|
||||
sent = robot.send_action(action)
|
||||
assert sent == action
|
||||
assert robot._last_sent == action
|
||||
|
||||
|
||||
def test_robot_custom_output_pipeline_applied():
|
||||
"""A custom action pipeline is applied to the action before _send_action."""
|
||||
robot = MockRobot()
|
||||
double_pipeline = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
|
||||
steps=[DoubleActionStep()],
|
||||
to_transition=robot_action_observation_to_transition,
|
||||
to_output=transition_to_robot_action,
|
||||
)
|
||||
robot.set_input_pipeline(double_pipeline)
|
||||
robot.get_observation() # populate _last_raw_obs
|
||||
action = {f"{j}.pos": 1.0 for j in _JOINT_NAMES}
|
||||
robot.send_action(action)
|
||||
assert all(v == 2.0 for v in robot._last_sent.values())
|
||||
|
||||
|
||||
def test_robot_observation_features_identity_matches_raw():
|
||||
"""observation_features equals raw_observation_features with identity pipeline."""
|
||||
robot = MockRobot()
|
||||
assert robot.observation_features == robot.raw_observation_features
|
||||
|
||||
|
||||
def test_robot_raw_observation_features_unchanged_after_pipeline():
|
||||
"""raw_observation_features is unaffected by the output pipeline."""
|
||||
robot = MockRobot()
|
||||
# Even with an FK-like renaming pipeline, raw_observation_features stays the same
|
||||
transform_pipeline = RobotProcessorPipeline[RobotObservation, RobotObservation](
|
||||
steps=[RenameToEEObsStep()],
|
||||
to_transition=observation_to_transition,
|
||||
to_output=transition_to_observation,
|
||||
)
|
||||
robot.set_output_pipeline(transform_pipeline)
|
||||
# raw should still be joints + camera
|
||||
raw = robot.raw_observation_features
|
||||
assert "shoulder_pan.pos" in raw
|
||||
assert "camera" in raw
|
||||
|
||||
|
||||
def test_robot_action_features_identity_matches_raw():
|
||||
"""action_features equals raw_action_features with identity input pipeline."""
|
||||
robot = MockRobot()
|
||||
assert robot.action_features == robot.raw_action_features
|
||||
|
||||
|
||||
def test_robot_raw_action_features_unchanged_after_pipeline():
|
||||
"""raw_action_features is unaffected by any pipeline."""
|
||||
robot = MockRobot()
|
||||
double_pipeline = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
|
||||
steps=[DoubleActionStep()],
|
||||
to_transition=robot_action_observation_to_transition,
|
||||
to_output=transition_to_robot_action,
|
||||
)
|
||||
robot.set_input_pipeline(double_pipeline)
|
||||
assert robot.raw_action_features == _JOINT_FEATURES
|
||||
|
||||
|
||||
def test_robot_set_output_pipeline_replaces_identity():
|
||||
"""set_output_pipeline replaces the default identity."""
|
||||
robot = MockRobot()
|
||||
p = _make_identity_observation_pipeline()
|
||||
robot.set_output_pipeline(p)
|
||||
assert robot._output_pipeline is p
|
||||
|
||||
|
||||
def test_robot_set_input_pipeline_replaces_identity():
|
||||
robot = MockRobot()
|
||||
p = _make_identity_robot_action_pipeline()
|
||||
robot.set_input_pipeline(p)
|
||||
assert robot._input_pipeline is p
|
||||
|
||||
|
||||
# ─── Tests: Teleoperator pipeline interface ───────────────────────────────────
|
||||
|
||||
|
||||
def test_teleop_default_get_action_is_identity():
|
||||
"""With no custom pipeline, get_action returns the same as _get_action."""
|
||||
teleop = MockTeleop()
|
||||
raw = teleop._get_action()
|
||||
action = teleop.get_action()
|
||||
assert action == raw
|
||||
|
||||
|
||||
def test_teleop_action_features_identity_matches_raw():
|
||||
"""action_features equals raw_action_features with identity pipeline."""
|
||||
teleop = MockTeleop()
|
||||
assert teleop.action_features == teleop.raw_action_features
|
||||
|
||||
|
||||
def test_teleop_feedback_features_identity_matches_raw():
|
||||
"""feedback_features equals raw_feedback_features with identity input pipeline."""
|
||||
teleop = MockTeleop()
|
||||
assert teleop.feedback_features == teleop.raw_feedback_features
|
||||
|
||||
|
||||
def test_teleop_feedback_features_empty_when_raw_empty():
|
||||
"""feedback_features returns empty dict when raw_feedback_features is empty."""
|
||||
teleop = MockTeleop()
|
||||
assert teleop.feedback_features == {}
|
||||
|
||||
|
||||
def test_teleop_set_output_pipeline():
|
||||
teleop = MockTeleop()
|
||||
p = _make_identity_teleop_action_pipeline()
|
||||
teleop.set_output_pipeline(p)
|
||||
assert teleop._output_pipeline is p
|
||||
|
||||
|
||||
def test_teleop_send_feedback_calls_send_feedback_impl():
|
||||
"""send_feedback applies identity pipeline and delegates to _send_feedback."""
|
||||
teleop = MockTeleop()
|
||||
received = {}
|
||||
|
||||
def capture(fb):
|
||||
received.update(fb)
|
||||
|
||||
teleop._send_feedback = capture
|
||||
teleop.send_feedback({"key": 1.0})
|
||||
assert received == {"key": 1.0}
|
||||
|
||||
|
||||
# ─── Tests: Compatibility checks ─────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_check_action_space_compatibility_matching():
|
||||
"""No warning when teleop output and robot action features match."""
|
||||
teleop = MockTeleop()
|
||||
robot = MockRobot()
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("error")
|
||||
check_action_space_compatibility(teleop, robot) # should not warn
|
||||
|
||||
|
||||
def test_check_action_space_compatibility_mismatch_warns():
|
||||
"""Warning issued when teleop and robot action features differ."""
|
||||
|
||||
class EETeleop(MockTeleop):
|
||||
@property
|
||||
def raw_action_features(self):
|
||||
return _EE_FEATURES
|
||||
|
||||
teleop = EETeleop()
|
||||
robot = MockRobot() # still returns joint features
|
||||
with pytest.warns(UserWarning, match="Action space mismatch"):
|
||||
check_action_space_compatibility(teleop, robot)
|
||||
|
||||
|
||||
def test_check_observation_space_compatibility_no_feedback():
|
||||
"""No warning when teleop has empty feedback_features."""
|
||||
robot = MockRobot()
|
||||
teleop = MockTeleop()
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("error")
|
||||
check_observation_space_compatibility(robot, teleop) # empty feedback → no warning
|
||||
|
||||
|
||||
# ─── Tests: build_dataset_features ───────────────────────────────────────────
|
||||
|
||||
|
||||
def test_build_dataset_features_identity():
|
||||
"""With identity pipelines, dataset features contain joint keys."""
|
||||
robot = MockRobot()
|
||||
teleop = MockTeleop()
|
||||
features = build_dataset_features(robot, teleop, use_videos=False)
|
||||
# Should contain action features (joint names)
|
||||
action_keys = {k for k in features if "action" in k or any(j in k for j in _JOINT_NAMES)}
|
||||
assert len(action_keys) > 0
|
||||
|
||||
|
||||
def test_build_dataset_features_includes_images_when_use_videos_true():
|
||||
"""Image features are included when use_videos=True."""
|
||||
robot = MockRobot()
|
||||
teleop = MockTeleop()
|
||||
feats_with = build_dataset_features(robot, teleop, use_videos=True)
|
||||
feats_without = build_dataset_features(robot, teleop, use_videos=False)
|
||||
# With videos should have more features (camera)
|
||||
assert len(feats_with) >= len(feats_without)
|
||||
|
||||
|
||||
# ─── Tests: Factory identity pipeline helpers ─────────────────────────────────
|
||||
|
||||
|
||||
def test_make_identity_observation_pipeline_is_noop():
|
||||
pipeline = _make_identity_observation_pipeline()
|
||||
obs = {"shoulder_pan.pos": 1.0, "camera": None}
|
||||
result = pipeline(obs)
|
||||
assert result == obs
|
||||
|
||||
|
||||
def test_make_identity_robot_action_pipeline_is_noop():
|
||||
pipeline = _make_identity_robot_action_pipeline()
|
||||
action = {"shoulder_pan.pos": 1.0}
|
||||
obs = {"shoulder_pan.pos": 0.0}
|
||||
result = pipeline((action, obs))
|
||||
assert result == action
|
||||
|
||||
|
||||
def test_make_identity_teleop_action_pipeline_is_noop():
|
||||
pipeline = _make_identity_teleop_action_pipeline()
|
||||
action = {"shoulder_pan.pos": 1.0}
|
||||
result = pipeline(action)
|
||||
assert result == action
|
||||
Reference in New Issue
Block a user