Merge branch 'main' into feature/add-multitask-dit

This commit is contained in:
Bryson Jones
2026-01-28 09:20:43 -08:00
committed by GitHub
37 changed files with 3332 additions and 45 deletions
+2
View File
@@ -103,6 +103,8 @@
title: Earth Rover Mini
- local: omx
title: OMX
- local: openarm
title: OpenArm
title: "Robots"
- sections:
- local: phone_teleop
+276
View File
@@ -0,0 +1,276 @@
# OpenArm
[OpenArm](https://openarm.dev) is an open-source 7DOF humanoid arm designed for physical AI research and deployment.
To get your OpenArm, assembled or DIY, and join the global community, browse verified and certified manufacturers worldwide at [openarm.dev](https://openarm.dev).
## What's Unique?
- **Human-Scale Design**: OpenArm is designed with human-like proportions, scaled for a person around 160-165cm tall. This provides an optimal balance between practical reach and manageable inertia for safe, responsive operation.
- **Safety-First Architecture**: Built with QDD backdrivable motors and high compliance, OpenArm prioritizes safe human-robot interaction while maintaining practical payload capabilities (6.0kg peak / 4.1kg nominal) for real-world tasks.
- **Built for Durability**: Critical structural components use aluminum and stainless steel construction, ensuring robust performance for repetitive data collection and continuous research use.
- **Fully Accessible & Buildable**: Every component, from CNC parts and 3D-printed casings to electrical wiring is designed to be purchasable and buildable by individual researchers and labs, with complete fabrication data provided.
- **Practical & Affordable**: At $6,500 USD for a complete bimanual system, OpenArm delivers research-grade capabilities at a fraction of traditional humanoid robot costs.
## Platform Requirements
<Tip warning={true}>
**Linux Only**: OpenArm currently only works on Linux. The CAN bus USB adapter
does not have macOS drivers and has not been tested on Windows.
</Tip>
## Safety Guide
Before operating OpenArm, please read the [official safety guide](https://docs.openarm.dev/getting-started/safety-guide). Key points:
- **Secure installation**: Fasten the arm to a flat, stable surface with screws or clamps
- **Safe distance**: Keep body parts and objects outside the range of motion during operation
- **Protective equipment**: Always wear safety goggles; use additional PPE as needed
- **Payload limits**: Do not exceed specified payload limits (6.0kg peak / 4.1kg nominal per arm)
- **Emergency stop**: Know the location and operation of the emergency stop device
- **Regular inspection**: Check for loose screws, damaged mechanical limits, unusual noises, and wiring damage
## Hardware Setup
Follow the official [OpenArm hardware documentation](https://docs.openarm.dev) for:
- Bill of materials and sourcing
- 3D printing instructions
- Mechanical assembly
- Electrical wiring
The hardware repositories are available at [github.com/enactic/openarm](https://github.com/enactic/openarm).
## CAN Bus Setup
OpenArm uses CAN bus communication with Damiao motors. Once you have the CAN bus USB adapter plugged into your Linux PC, follow the [Damiao Motors and CAN Bus guide](./damiao) to configure the interface.
Quick setup:
```bash
# Setup CAN interfaces
lerobot-setup-can --mode=setup --interfaces=can0,can1
# Test motor communication
lerobot-setup-can --mode=test --interfaces=can0,can1
```
## Install LeRobot 🤗
Follow our [Installation Guide](./installation), then install the Damiao motor support:
```bash
pip install -e ".[damiao]"
```
## Usage
### Follower Arm (Robot)
<hfoptions id="follower">
<hfoption id="Command">
```bash
lerobot-calibrate \
--robot.type=openarm_follower \
--robot.port=can0 \
--robot.side=right \
--robot.id=my_openarm_follower
```
</hfoption>
<hfoption id="API example">
```python
from lerobot.robots.openarm_follower import OpenArmFollower, OpenArmFollowerConfig
config = OpenArmFollowerConfig(
port="can0",
side="right", # or "left" for left arm
id="my_openarm_follower",
)
follower = OpenArmFollower(config)
follower.connect()
# Read current state
obs = follower.get_observation()
print(obs)
# Send action (position in degrees)
action = {
"joint_1.pos": 0.0,
"joint_2.pos": 0.0,
"joint_3.pos": 0.0,
"joint_4.pos": 45.0,
"joint_5.pos": 0.0,
"joint_6.pos": 0.0,
"joint_7.pos": 0.0,
"gripper.pos": 0.0,
}
follower.send_action(action)
follower.disconnect()
```
</hfoption>
</hfoptions>
### Leader Arm (Teleoperator)
The leader arm is used for teleoperation - manually moving it to control the follower arm.
<hfoptions id="leader">
<hfoption id="Command">
```bash
lerobot-calibrate \
--teleop.type=openarm_leader \
--teleop.port=can1 \
--teleop.id=my_openarm_leader
```
</hfoption>
<hfoption id="API example">
```python
from lerobot.teleoperators.openarm_leader import OpenArmLeader, OpenArmLeaderConfig
config = OpenArmLeaderConfig(
port="can1",
id="my_openarm_leader",
manual_control=True, # Disable torque for manual movement
)
leader = OpenArmLeader(config)
leader.connect()
# Read current position (as action to send to follower)
action = leader.get_action()
print(action)
leader.disconnect()
```
</hfoption>
</hfoptions>
### Teleoperation
To teleoperate OpenArm with leader-follower control:
```bash
lerobot-teleoperate \
--robot.type=openarm_follower \
--robot.port=can0 \
--robot.side=right \
--robot.id=my_follower \
--teleop.type=openarm_leader \
--teleop.port=can1 \
--teleop.id=my_leader
```
### Bimanual Teleoperation
To teleoperate a bimanual OpenArm setup with two leader and two follower arms:
```bash
lerobot-teleoperate \
--robot.type=bi_openarm_follower \
--robot.left_arm_config.port=can0 \
--robot.left_arm_config.side=left \
--robot.right_arm_config.port=can1 \
--robot.right_arm_config.side=right \
--robot.id=my_bimanual_follower \
--teleop.type=bi_openarm_leader \
--teleop.left_arm_config.port=can2 \
--teleop.right_arm_config.port=can3 \
--teleop.id=my_bimanual_leader
```
### Recording Data
To record a dataset during teleoperation:
```bash
lerobot-record \
--robot.type=openarm_follower \
--robot.port=can0 \
--robot.side=right \
--robot.id=my_follower \
--teleop.type=openarm_leader \
--teleop.port=can1 \
--teleop.id=my_leader \
--repo-id=my_hf_username/my_openarm_dataset \
--fps=30 \
--num-episodes=10
```
## Configuration Options
### Follower Configuration
| Parameter | Default | Description |
| --------------------- | --------- | ---------------------------------------------------------- |
| `port` | - | CAN interface (e.g., `can0`) |
| `side` | `None` | Arm side: `"left"`, `"right"`, or `None` for custom limits |
| `use_can_fd` | `True` | Enable CAN FD for higher data rates |
| `can_bitrate` | `1000000` | Nominal bitrate (1 Mbps) |
| `can_data_bitrate` | `5000000` | CAN FD data bitrate (5 Mbps) |
| `max_relative_target` | `None` | Safety limit for relative target positions |
| `position_kp` | Per-joint | Position control proportional gains |
| `position_kd` | Per-joint | Position control derivative gains |
### Leader Configuration
| Parameter | Default | Description |
| ------------------ | --------- | ----------------------------------- |
| `port` | - | CAN interface (e.g., `can1`) |
| `manual_control` | `True` | Disable torque for manual movement |
| `use_can_fd` | `True` | Enable CAN FD for higher data rates |
| `can_bitrate` | `1000000` | Nominal bitrate (1 Mbps) |
| `can_data_bitrate` | `5000000` | CAN FD data bitrate (5 Mbps) |
## Motor Configuration
OpenArm uses Damiao motors with the following default configuration:
| Joint | Motor Type | Send ID | Recv ID |
| --------------------------- | ---------- | ------- | ------- |
| joint_1 (Shoulder pan) | DM8009 | 0x01 | 0x11 |
| joint_2 (Shoulder lift) | DM8009 | 0x02 | 0x12 |
| joint_3 (Shoulder rotation) | DM4340 | 0x03 | 0x13 |
| joint_4 (Elbow flex) | DM4340 | 0x04 | 0x14 |
| joint_5 (Wrist roll) | DM4310 | 0x05 | 0x15 |
| joint_6 (Wrist pitch) | DM4310 | 0x06 | 0x16 |
| joint_7 (Wrist rotation) | DM4310 | 0x07 | 0x17 |
| gripper | DM4310 | 0x08 | 0x18 |
## Troubleshooting
### No Response from Motors
1. Check power supply connections
2. Verify CAN wiring (CAN-H, CAN-L, GND)
3. Run diagnostics: `lerobot-setup-can --mode=test --interfaces=can0`
4. See the [Damiao troubleshooting guide](./damiao#troubleshooting) for more details
### CAN Interface Not Found
Ensure the CAN interface is configured:
```bash
ip link show can0
```
## Resources
- [OpenArm Website](https://openarm.dev)
- [OpenArm Documentation](https://docs.openarm.dev)
- [OpenArm GitHub](https://github.com/enactic/openarm)
- [Safety Guide](https://docs.openarm.dev/getting-started/safety-guide)
- [Damiao Motors and CAN Bus](./damiao)
+99 -1
View File
@@ -188,7 +188,105 @@ Press `Ctrl+C` to stop the policy.
## Running in Simulation Mode (MuJoCo)
You can now test policies before unleashing them on the physical robot using MuJoCo. To do so simply set `is_simulation=True` in config.
You can test policies before deploying on the physical robot using MuJoCo simulation. Set `is_simulation=True` in config or pass `--robot.is_simulation=true` via CLI.
### Calibrate Exoskeleton Teleoperator
```bash
lerobot-calibrate \
--teleop.type=unitree_g1 \
--teleop.left_arm_config.port=/dev/ttyACM1 \
--teleop.right_arm_config.port=/dev/ttyACM0 \
--teleop.id=exo
```
### Teleoperate in Simulation
```bash
lerobot-teleoperate \
--robot.type=unitree_g1 \
--robot.is_simulation=true \
--teleop.type=unitree_g1 \
--teleop.left_arm_config.port=/dev/ttyACM1 \
--teleop.right_arm_config.port=/dev/ttyACM0 \
--teleop.id=exo \
--fps=100
```
### Record Dataset in Simulation
```bash
python -m lerobot.scripts.lerobot_record \
--robot.type=unitree_g1 \
--robot.is_simulation=true \
--robot.cameras='{"global_view": {"type": "zmq", "server_address": "localhost", "port": 5555, "camera_name": "head_camera", "width": 640, "height": 480, "fps": 30}}' \
--teleop.type=unitree_g1 \
--teleop.left_arm_config.port=/dev/ttyACM1 \
--teleop.right_arm_config.port=/dev/ttyACM0 \
--teleop.id=exo \
--dataset.repo_id=your-username/dataset-name \
--dataset.single_task="Test" \
--dataset.num_episodes=2 \
--dataset.episode_time_s=5 \
--dataset.reset_time_s=5 \
--dataset.push_to_hub=true
```
Example simulation dataset: [nepyope/teleop_test_sim](https://huggingface.co/datasets/nepyope/teleop_test_sim)
---
## Running on Real Robot
Once the robot server is running on the G1 (see Part 3), you can teleoperate and record on the real robot.
### Start the Camera Server
On the robot, start the ZMQ image server:
```bash
python src/lerobot/cameras/zmq/image_server.py
```
Keep this running in a separate terminal for camera streaming during recording.
### Teleoperate Real Robot
```bash
lerobot-teleoperate \
--robot.type=unitree_g1 \
--robot.is_simulation=false \
--teleop.type=unitree_g1 \
--teleop.left_arm_config.port=/dev/ttyACM1 \
--teleop.right_arm_config.port=/dev/ttyACM0 \
--teleop.id=exo \
--fps=100
```
### Record Dataset on Real Robot
```bash
python -m lerobot.scripts.lerobot_record \
--robot.type=unitree_g1 \
--robot.is_simulation=false \
--robot.cameras='{"global_view": {"type": "zmq", "server_address": "172.18.129.215", "port": 5555, "camera_name": "head_camera", "width": 640, "height": 480, "fps": 30}}' \
--teleop.type=unitree_g1 \
--teleop.left_arm_config.port=/dev/ttyACM1 \
--teleop.right_arm_config.port=/dev/ttyACM0 \
--teleop.id=exo \
--dataset.repo_id=your-username/dataset-name \
--dataset.single_task="Test" \
--dataset.num_episodes=2 \
--dataset.episode_time_s=5 \
--dataset.reset_time_s=5 \
--dataset.push_to_hub=true
```
**Note**: Update `server_address` to match your robot's camera server IP.
Example real robot dataset: [nepyope/teleop_test_real](https://huggingface.co/datasets/nepyope/teleop_test_real)
---
## Additional Resources
+6 -1
View File
@@ -105,12 +105,17 @@ dynamixel = ["dynamixel-sdk>=3.7.31,<3.9.0"]
damiao = ["python-can>=4.2.0,<5.0.0"]
# Robots
openarms = ["lerobot[damiao]"]
gamepad = ["lerobot[pygame-dep]", "hidapi>=0.14.0,<0.15.0"]
hopejr = ["lerobot[feetech]", "lerobot[pygame-dep]"]
lekiwi = ["lerobot[feetech]", "pyzmq>=26.2.1,<28.0.0"]
unitree_g1 = [
"pyzmq>=26.2.1,<28.0.0",
"onnxruntime>=1.16.0,<2.0.0"
"onnxruntime>=1.16.0,<2.0.0",
"pin>=3.0.0,<4.0.0",
"meshcat>=0.3.0,<0.4.0",
"matplotlib>=3.9.0,<4.0.0",
"casadi>=3.6.0,<4.0.0",
]
reachy2 = ["reachy2_sdk>=1.0.15,<1.1.0"]
kinematics = ["lerobot[placo-dep]"]
+80 -16
View File
@@ -116,6 +116,9 @@ def update_meta_data(
Adjusts all indices and timestamps to account for previously aggregated
data and videos in the destination dataset.
For data file indices, uses the 'src_to_dst' mapping from aggregate_data()
to correctly map source file indices to their destination locations.
Args:
df: DataFrame containing the metadata to be updated.
dst_meta: Destination dataset metadata.
@@ -129,6 +132,48 @@ def update_meta_data(
df["meta/episodes/chunk_index"] = df["meta/episodes/chunk_index"] + meta_idx["chunk"]
df["meta/episodes/file_index"] = df["meta/episodes/file_index"] + meta_idx["file"]
# Update data file indices using source-to-destination mapping
# This is critical for handling datasets that are already results of a merge
data_src_to_dst = data_idx.get("src_to_dst", {})
if data_src_to_dst:
# Store original indices for lookup
df["_orig_data_chunk"] = df["data/chunk_index"].copy()
df["_orig_data_file"] = df["data/file_index"].copy()
# Vectorized mapping from (src_chunk, src_file) to (dst_chunk, dst_file)
# This is much faster than per-row iteration for large metadata tables
mapping_index = pd.MultiIndex.from_tuples(
list(data_src_to_dst.keys()),
names=["chunk_index", "file_index"],
)
mapping_values = list(data_src_to_dst.values())
mapping_df = pd.DataFrame(
mapping_values,
index=mapping_index,
columns=["dst_chunk", "dst_file"],
)
# Construct a MultiIndex for each row based on original data indices
row_index = pd.MultiIndex.from_arrays(
[df["_orig_data_chunk"], df["_orig_data_file"]],
names=["chunk_index", "file_index"],
)
# Align mapping to rows; missing keys fall back to the default destination
reindexed = mapping_df.reindex(row_index)
reindexed[["dst_chunk", "dst_file"]] = reindexed[["dst_chunk", "dst_file"]].fillna(
{"dst_chunk": data_idx["chunk"], "dst_file": data_idx["file"]}
)
# Assign mapped destination indices back to the DataFrame
df["data/chunk_index"] = reindexed["dst_chunk"].to_numpy()
df["data/file_index"] = reindexed["dst_file"].to_numpy()
# Clean up temporary columns
df = df.drop(columns=["_orig_data_chunk", "_orig_data_file"])
else:
# Fallback to simple offset (backward compatibility for single-file sources)
df["data/chunk_index"] = df["data/chunk_index"] + data_idx["chunk"]
df["data/file_index"] = df["data/file_index"] + data_idx["file"]
for key, video_idx in videos_idx.items():
@@ -146,8 +191,7 @@ def update_meta_data(
if src_to_dst:
# Map each episode to its correct destination file and apply offset
for idx in df.index:
# Convert to Python int to avoid numpy type mismatch in dict lookup
src_key = (int(df.at[idx, "_orig_chunk"]), int(df.at[idx, "_orig_file"]))
src_key = (df.at[idx, "_orig_chunk"], df.at[idx, "_orig_file"])
# Get destination chunk/file for this source file
dst_chunk, dst_file = src_to_dst.get(src_key, (video_idx["chunk"], video_idx["file"]))
@@ -163,8 +207,7 @@ def update_meta_data(
df[orig_chunk_col] = video_idx["chunk"]
df[orig_file_col] = video_idx["file"]
for idx in df.index:
# Convert to Python int to avoid numpy type mismatch in dict lookup
src_key = (int(df.at[idx, "_orig_chunk"]), int(df.at[idx, "_orig_file"]))
src_key = (df.at[idx, "_orig_chunk"], df.at[idx, "_orig_file"])
offset = src_to_offset.get(src_key, 0)
df.at[idx, f"videos/{key}/from_timestamp"] += offset
df.at[idx, f"videos/{key}/to_timestamp"] += offset
@@ -262,6 +305,10 @@ def aggregate_datasets(
meta_idx = aggregate_metadata(src_meta, dst_meta, meta_idx, data_idx, videos_idx)
# Clear the src_to_dst mapping after processing each source dataset
# to avoid interference between different source datasets
data_idx.pop("src_to_dst", None)
dst_meta.info["total_episodes"] += src_meta.total_episodes
dst_meta.info["total_frames"] += src_meta.total_frames
@@ -312,10 +359,6 @@ def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chu
dst_file_durations = video_idx["dst_file_durations"]
for src_chunk_idx, src_file_idx in unique_chunk_file_pairs:
# Convert to Python int to ensure consistent dict keys
src_chunk_idx = int(src_chunk_idx)
src_file_idx = int(src_file_idx)
src_path = src_meta.root / DEFAULT_VIDEO_PATH.format(
video_key=key,
chunk_index=src_chunk_idx,
@@ -388,10 +431,16 @@ def aggregate_data(src_meta, dst_meta, data_idx, data_files_size_in_mb, chunk_si
Reads source data files, updates indices to match the aggregated dataset,
and writes them to the destination with proper file rotation.
Tracks a `src_to_dst` mapping from source (chunk, file) to destination (chunk, file)
which is critical for correctly updating episode metadata when source datasets
have multiple data files (e.g., from a previous merge operation).
Args:
src_meta: Source dataset metadata.
dst_meta: Destination dataset metadata.
data_idx: Dictionary tracking data chunk and file indices.
data_files_size_in_mb: Maximum size for data files in MB.
chunk_size: Maximum number of files per chunk.
Returns:
dict: Updated data_idx with current chunk and file indices.
@@ -409,6 +458,10 @@ def aggregate_data(src_meta, dst_meta, data_idx, data_files_size_in_mb, chunk_si
# retrieve features schema for proper image typing in parquet
hf_features = get_hf_features_from_features(dst_meta.features) if contains_images else None
# Track source to destination file mapping for metadata update
# This is critical for handling datasets that are already results of a merge
src_to_dst: dict[tuple[int, int], tuple[int, int]] = {}
for src_chunk_idx, src_file_idx in unique_chunk_file_ids:
src_path = src_meta.root / DEFAULT_DATA_PATH.format(
chunk_index=src_chunk_idx, file_index=src_file_idx
@@ -421,7 +474,9 @@ def aggregate_data(src_meta, dst_meta, data_idx, data_files_size_in_mb, chunk_si
df = pd.read_parquet(src_path)
df = update_data_df(df, src_meta, dst_meta)
data_idx = append_or_create_parquet_file(
# Write data and get the actual destination file it was written to
# This avoids duplicating the rotation logic here
data_idx, (dst_chunk, dst_file) = append_or_create_parquet_file(
df,
src_path,
data_idx,
@@ -433,6 +488,12 @@ def aggregate_data(src_meta, dst_meta, data_idx, data_files_size_in_mb, chunk_si
hf_features=hf_features,
)
# Record the mapping from source to actual destination
src_to_dst[(src_chunk_idx, src_file_idx)] = (dst_chunk, dst_file)
# Add the mapping to data_idx for use in metadata update
data_idx["src_to_dst"] = src_to_dst
return data_idx
@@ -473,7 +534,7 @@ def aggregate_metadata(src_meta, dst_meta, meta_idx, data_idx, videos_idx):
videos_idx,
)
meta_idx = append_or_create_parquet_file(
meta_idx, _ = append_or_create_parquet_file(
df,
src_path,
meta_idx,
@@ -501,7 +562,7 @@ def append_or_create_parquet_file(
contains_images: bool = False,
aggr_root: Path = None,
hf_features: datasets.Features | None = None,
):
) -> tuple[dict[str, int], tuple[int, int]]:
"""Appends data to an existing parquet file or creates a new one based on size constraints.
Manages file rotation when size limits are exceeded to prevent individual files
@@ -519,9 +580,11 @@ def append_or_create_parquet_file(
hf_features: Optional HuggingFace Features schema for proper image typing.
Returns:
dict: Updated index dictionary with current chunk and file indices.
tuple: (updated_idx, (dst_chunk, dst_file)) where updated_idx is the index dict
and (dst_chunk, dst_file) is the actual destination file the data was written to.
"""
dst_path = aggr_root / default_path.format(chunk_index=idx["chunk"], file_index=idx["file"])
dst_chunk, dst_file = idx["chunk"], idx["file"]
dst_path = aggr_root / default_path.format(chunk_index=dst_chunk, file_index=dst_file)
if not dst_path.exists():
dst_path.parent.mkdir(parents=True, exist_ok=True)
@@ -529,14 +592,15 @@ def append_or_create_parquet_file(
to_parquet_with_hf_images(df, dst_path, features=hf_features)
else:
df.to_parquet(dst_path)
return idx
return idx, (dst_chunk, dst_file)
src_size = get_parquet_file_size_in_mb(src_path)
dst_size = get_parquet_file_size_in_mb(dst_path)
if dst_size + src_size >= max_mb:
idx["chunk"], idx["file"] = update_chunk_file_indices(idx["chunk"], idx["file"], chunk_size)
new_path = aggr_root / default_path.format(chunk_index=idx["chunk"], file_index=idx["file"])
dst_chunk, dst_file = idx["chunk"], idx["file"]
new_path = aggr_root / default_path.format(chunk_index=dst_chunk, file_index=dst_file)
new_path.parent.mkdir(parents=True, exist_ok=True)
final_df = df
target_path = new_path
@@ -555,7 +619,7 @@ def append_or_create_parquet_file(
else:
final_df.to_parquet(target_path)
return idx
return idx, (dst_chunk, dst_file)
def finalize_aggregation(aggr_meta, all_metadata):
+38 -13
View File
@@ -28,8 +28,11 @@ from lerobot.utils.import_utils import _can_available
if TYPE_CHECKING or _can_available:
import can
else:
can.Message = object
can.interface = None
class can: # noqa: N801
Message = object
interface = None
import numpy as np
@@ -206,11 +209,31 @@ class DamiaoMotorsBus(MotorsBusBase):
Raises ConnectionError if any motor fails to respond.
"""
logger.info("Starting handshake with motors...")
missing_motors = []
# Drain any pending messages
while self.canbus.recv(timeout=0.01):
pass
missing_motors = []
for motor_name in self.motors:
msg = self._refresh_motor(motor_name)
if msg is None:
motor_id = self._get_motor_id(motor_name)
recv_id = self._get_motor_recv_id(motor_name)
# Send enable command
data = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, CAN_CMD_ENABLE]
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd)
self.canbus.send(msg)
# Wait for response with longer timeout
response = None
start_time = time.time()
while time.time() - start_time < 0.1:
response = self.canbus.recv(timeout=0.1)
if response and response.arbitration_id == recv_id:
break
response = None
if response is None:
missing_motors.append(motor_name)
else:
self._process_response(motor_name, msg)
@@ -259,7 +282,7 @@ class DamiaoMotorsBus(MotorsBusBase):
motor_name = self._get_motor_name(motor)
recv_id = self._get_motor_recv_id(motor)
data = [0xFF] * 7 + [command_byte]
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False)
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd)
self.canbus.send(msg)
if msg := self._recv_motor_response(expected_recv_id=recv_id):
self._process_response(motor_name, msg)
@@ -317,7 +340,7 @@ class DamiaoMotorsBus(MotorsBusBase):
motor_id = self._get_motor_id(motor)
recv_id = self._get_motor_recv_id(motor)
data = [motor_id & 0xFF, (motor_id >> 8) & 0xFF, CAN_CMD_REFRESH, 0, 0, 0, 0, 0]
msg = can.Message(arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False)
msg = can.Message(arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False, is_fd=self.use_can_fd)
self.canbus.send(msg)
return self._recv_motor_response(expected_recv_id=recv_id)
@@ -439,7 +462,7 @@ class DamiaoMotorsBus(MotorsBusBase):
motor_type = self._motor_types[motor_name]
data = self._encode_mit_packet(motor_type, kp, kd, position_degrees, velocity_deg_per_sec, torque)
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False)
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd)
self.canbus.send(msg)
recv_id = self._get_motor_recv_id(motor)
@@ -472,7 +495,7 @@ class DamiaoMotorsBus(MotorsBusBase):
motor_type = self._motor_types[motor_name]
data = self._encode_mit_packet(motor_type, kp, kd, position_degrees, velocity_deg_per_sec, torque)
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False)
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd)
self.canbus.send(msg)
recv_id_to_motor[self._get_motor_recv_id(motor)] = motor_name
@@ -637,10 +660,10 @@ class DamiaoMotorsBus(MotorsBusBase):
for motor in motors:
motor_id = self._get_motor_id(motor)
data = [motor_id & 0xFF, (motor_id >> 8) & 0xFF, CAN_CMD_REFRESH, 0, 0, 0, 0, 0]
msg = can.Message(arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False)
msg = can.Message(
arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False, is_fd=self.use_can_fd
)
self.canbus.send(msg)
# Small delay to reduce bus congestion if necessary, though removed in sync_read previously
# precise_sleep(PRECISE_SLEEP_SEC)
# Collect responses
expected_recv_ids = [self._get_motor_recv_id(m) for m in motors]
@@ -676,7 +699,9 @@ class DamiaoMotorsBus(MotorsBusBase):
kd = self._gains[motor]["kd"]
data = self._encode_mit_packet(motor_type, kp, kd, float(value_degrees), 0.0, 0.0)
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False)
msg = can.Message(
arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd
)
self.canbus.send(msg)
precise_sleep(PRECISE_TIMEOUT_SEC)
+7 -5
View File
@@ -18,16 +18,18 @@
import math
import time
from dataclasses import dataclass
from typing import Any, Protocol, TypeVar, runtime_checkable
from typing import TYPE_CHECKING, Any, Protocol, TypeVar, runtime_checkable
import numpy as np
import torch
import torchvision.transforms.functional as F # noqa: N812
from lerobot.configs.types import PipelineFeatureType, PolicyFeature
from lerobot.teleoperators.teleoperator import Teleoperator
from lerobot.teleoperators.utils import TeleopEvents
if TYPE_CHECKING:
from lerobot.teleoperators.teleoperator import Teleoperator
from .core import EnvTransition, PolicyAction, TransitionKey
from .pipeline import (
ComplementaryDataProcessorStep,
@@ -69,10 +71,10 @@ class HasTeleopEvents(Protocol):
# Type variable constrained to Teleoperator subclasses that also implement events
TeleopWithEvents = TypeVar("TeleopWithEvents", bound=Teleoperator)
TeleopWithEvents = TypeVar("TeleopWithEvents", bound="Teleoperator")
def _check_teleop_with_events(teleop: Teleoperator) -> None:
def _check_teleop_with_events(teleop: "Teleoperator") -> None:
"""
Runtime check that a teleoperator implements the `HasTeleopEvents` protocol.
@@ -103,7 +105,7 @@ class AddTeleopActionAsComplimentaryDataStep(ComplementaryDataProcessorStep):
teleop_device: The teleoperator instance to get the action from.
"""
teleop_device: Teleoperator
teleop_device: "Teleoperator"
def complementary_data(self, complementary_data: dict) -> dict:
"""
@@ -0,0 +1,20 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .bi_openarm_follower import BiOpenArmFollower
from .config_bi_openarm_follower import BiOpenArmFollowerConfig
__all__ = ["BiOpenArmFollower", "BiOpenArmFollowerConfig"]
@@ -0,0 +1,175 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from functools import cached_property
from lerobot.processor import RobotAction, RobotObservation
from lerobot.robots.openarm_follower import OpenArmFollower, OpenArmFollowerConfig
from ..robot import Robot
from .config_bi_openarm_follower import BiOpenArmFollowerConfig
logger = logging.getLogger(__name__)
class BiOpenArmFollower(Robot):
"""
Bimanual OpenArm Follower Arms
"""
config_class = BiOpenArmFollowerConfig
name = "bi_openarm_follower"
def __init__(self, config: BiOpenArmFollowerConfig):
super().__init__(config)
self.config = config
left_arm_config = OpenArmFollowerConfig(
id=f"{config.id}_left" if config.id else None,
calibration_dir=config.calibration_dir,
port=config.left_arm_config.port,
disable_torque_on_disconnect=config.left_arm_config.disable_torque_on_disconnect,
max_relative_target=config.left_arm_config.max_relative_target,
cameras=config.left_arm_config.cameras,
side=config.left_arm_config.side,
can_interface=config.left_arm_config.can_interface,
use_can_fd=config.left_arm_config.use_can_fd,
can_bitrate=config.left_arm_config.can_bitrate,
can_data_bitrate=config.left_arm_config.can_data_bitrate,
motor_config=config.left_arm_config.motor_config,
position_kd=config.left_arm_config.position_kd,
position_kp=config.left_arm_config.position_kp,
joint_limits=config.left_arm_config.joint_limits,
)
right_arm_config = OpenArmFollowerConfig(
id=f"{config.id}_right" if config.id else None,
calibration_dir=config.calibration_dir,
port=config.right_arm_config.port,
disable_torque_on_disconnect=config.right_arm_config.disable_torque_on_disconnect,
max_relative_target=config.right_arm_config.max_relative_target,
cameras=config.right_arm_config.cameras,
side=config.right_arm_config.side,
can_interface=config.right_arm_config.can_interface,
use_can_fd=config.right_arm_config.use_can_fd,
can_bitrate=config.right_arm_config.can_bitrate,
can_data_bitrate=config.right_arm_config.can_data_bitrate,
motor_config=config.right_arm_config.motor_config,
position_kd=config.right_arm_config.position_kd,
position_kp=config.right_arm_config.position_kp,
joint_limits=config.right_arm_config.joint_limits,
)
self.left_arm = OpenArmFollower(left_arm_config)
self.right_arm = OpenArmFollower(right_arm_config)
# Only for compatibility with other parts of the codebase that expect a `robot.cameras` attribute
self.cameras = {**self.left_arm.cameras, **self.right_arm.cameras}
@property
def _motors_ft(self) -> dict[str, type]:
left_arm_motors_ft = self.left_arm._motors_ft
right_arm_motors_ft = self.right_arm._motors_ft
return {
**{f"left_{k}": v for k, v in left_arm_motors_ft.items()},
**{f"right_{k}": v for k, v in right_arm_motors_ft.items()},
}
@property
def _cameras_ft(self) -> dict[str, tuple]:
left_arm_cameras_ft = self.left_arm._cameras_ft
right_arm_cameras_ft = self.right_arm._cameras_ft
return {
**{f"left_{k}": v for k, v in left_arm_cameras_ft.items()},
**{f"right_{k}": v for k, v in right_arm_cameras_ft.items()},
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
return self._motors_ft
@property
def is_connected(self) -> bool:
return self.left_arm.is_connected and self.right_arm.is_connected
def connect(self, calibrate: bool = True) -> None:
self.left_arm.connect(calibrate)
self.right_arm.connect(calibrate)
@property
def is_calibrated(self) -> bool:
return self.left_arm.is_calibrated and self.right_arm.is_calibrated
def calibrate(self) -> None:
self.left_arm.calibrate()
self.right_arm.calibrate()
def configure(self) -> None:
self.left_arm.configure()
self.right_arm.configure()
def setup_motors(self) -> None:
raise NotImplementedError(
"Motor ID configuration is typically done via manufacturer tools for CAN motors."
)
def get_observation(self) -> RobotObservation:
obs_dict = {}
# Add "left_" prefix
left_obs = self.left_arm.get_observation()
obs_dict.update({f"left_{key}": value for key, value in left_obs.items()})
# Add "right_" prefix
right_obs = self.right_arm.get_observation()
obs_dict.update({f"right_{key}": value for key, value in right_obs.items()})
return obs_dict
def send_action(
self,
action: RobotAction,
custom_kp: dict[str, float] | None = None,
custom_kd: dict[str, float] | None = None,
) -> RobotAction:
# Remove "left_" prefix
left_action = {
key.removeprefix("left_"): value for key, value in action.items() if key.startswith("left_")
}
# Remove "right_" prefix
right_action = {
key.removeprefix("right_"): value for key, value in action.items() if key.startswith("right_")
}
sent_action_left = self.left_arm.send_action(left_action, custom_kp, custom_kd)
sent_action_right = self.right_arm.send_action(right_action, custom_kp, custom_kd)
# Add prefixes back
prefixed_sent_action_left = {f"left_{key}": value for key, value in sent_action_left.items()}
prefixed_sent_action_right = {f"right_{key}": value for key, value in sent_action_right.items()}
return {**prefixed_sent_action_left, **prefixed_sent_action_right}
def disconnect(self):
self.left_arm.disconnect()
self.right_arm.disconnect()
@@ -0,0 +1,30 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from lerobot.robots.openarm_follower import OpenArmFollowerConfigBase
from ..config import RobotConfig
@RobotConfig.register_subclass("bi_openarm_follower")
@dataclass
class BiOpenArmFollowerConfig(RobotConfig):
"""Configuration class for Bi OpenArm Follower robots."""
left_arm_config: OpenArmFollowerConfigBase
right_arm_config: OpenArmFollowerConfigBase
@@ -0,0 +1,20 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .config_openarm_follower import OpenArmFollowerConfig, OpenArmFollowerConfigBase
from .openarm_follower import OpenArmFollower
__all__ = ["OpenArmFollower", "OpenArmFollowerConfig", "OpenArmFollowerConfigBase"]
@@ -0,0 +1,122 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.cameras import CameraConfig
from ..config import RobotConfig
LEFT_DEFAULT_JOINTS_LIMITS: dict[str, tuple[float, float]] = {
"joint_1": (-75.0, 75.0),
"joint_2": (-90.0, 9.0),
"joint_3": (-85.0, 85.0),
"joint_4": (0.0, 135.0),
"joint_5": (-85.0, 85.0),
"joint_6": (-40.0, 40.0),
"joint_7": (-80.0, 80.0),
"gripper": (-65.0, 0.0),
}
RIGHT_DEFAULT_JOINTS_LIMITS: dict[str, tuple[float, float]] = {
"joint_1": (-75.0, 75.0),
"joint_2": (-9.0, 90.0),
"joint_3": (-85.0, 85.0),
"joint_4": (0.0, 135.0),
"joint_5": (-85.0, 85.0),
"joint_6": (-40.0, 40.0),
"joint_7": (-80.0, 80.0),
"gripper": (-65.0, 0.0),
}
@dataclass
class OpenArmFollowerConfigBase:
"""Base configuration for the OpenArms follower robot with Damiao motors."""
# CAN interfaces - one per arm
# arm CAN interface (e.g., "can1")
# Linux: "can0", "can1", etc.
port: str
# side of the arm: "left" or "right". If "None" default values will be used
side: str | None = None
# CAN interface type: "socketcan" (Linux), "slcan" (serial), or "auto" (auto-detect)
can_interface: str = "socketcan"
# CAN FD settings (OpenArms uses CAN FD by default)
use_can_fd: bool = True
can_bitrate: int = 1000000 # Nominal bitrate (1 Mbps)
can_data_bitrate: int = 5000000 # Data bitrate for CAN FD (5 Mbps)
# Whether to disable torque when disconnecting
disable_torque_on_disconnect: bool = True
# Safety limit for relative target positions
# Set to a positive scalar for all motors, or a dict mapping motor names to limits
max_relative_target: float | dict[str, float] | None = None
# Camera configurations
cameras: dict[str, CameraConfig] = field(default_factory=dict)
# Motor configuration for OpenArms (7 DOF per arm)
# Maps motor names to (send_can_id, recv_can_id, motor_type)
# Based on: https://docs.openarm.dev/software/setup/configure-test
# OpenArms uses 4 types of motors:
# - DM8009 (DM-J8009P-2EC) for shoulders (high torque)
# - DM4340P and DM4340 for shoulder rotation and elbow
# - DM4310 (DM-J4310-2EC V1.1) for wrist and gripper
motor_config: dict[str, tuple[int, int, str]] = field(
default_factory=lambda: {
"joint_1": (0x01, 0x11, "dm8009"), # J1 - Shoulder pan (DM8009)
"joint_2": (0x02, 0x12, "dm8009"), # J2 - Shoulder lift (DM8009)
"joint_3": (0x03, 0x13, "dm4340"), # J3 - Shoulder rotation (DM4340)
"joint_4": (0x04, 0x14, "dm4340"), # J4 - Elbow flex (DM4340)
"joint_5": (0x05, 0x15, "dm4310"), # J5 - Wrist roll (DM4310)
"joint_6": (0x06, 0x16, "dm4310"), # J6 - Wrist pitch (DM4310)
"joint_7": (0x07, 0x17, "dm4310"), # J7 - Wrist rotation (DM4310)
"gripper": (0x08, 0x18, "dm4310"), # J8 - Gripper (DM4310)
}
)
# MIT control parameters for position control (used in send_action)
# List of 8 values: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper]
position_kp: list[float] = field(
default_factory=lambda: [240.0, 240.0, 240.0, 240.0, 24.0, 31.0, 25.0, 25.0]
)
position_kd: list[float] = field(default_factory=lambda: [5.0, 5.0, 3.0, 5.0, 0.3, 0.3, 0.3, 0.3])
# Values for joint limits. Can be overridden via CLI (for custom values) or by setting config.side to either 'left' or 'right'.
# If config.side is left set to None and no CLI values are passed, the default joint limit values are small for safety.
joint_limits: dict[str, tuple[float, float]] = field(
default_factory=lambda: {
"joint_1": (-5.0, 5.0),
"joint_2": (-5.0, 5.0),
"joint_3": (-5.0, 5.0),
"joint_4": (0.0, 5.0),
"joint_5": (-5.0, 5.0),
"joint_6": (-5.0, 5.0),
"joint_7": (-5.0, 5.0),
"gripper": (-5.0, 0.0),
}
)
@RobotConfig.register_subclass("openarm_follower")
@dataclass
class OpenArmFollowerConfig(RobotConfig, OpenArmFollowerConfigBase):
pass
@@ -0,0 +1,348 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from functools import cached_property
from typing import Any
from lerobot.cameras.utils import make_cameras_from_configs
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.motors.damiao import DamiaoMotorsBus
from lerobot.processor import RobotAction, RobotObservation
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..robot import Robot
from ..utils import ensure_safe_goal_position
from .config_openarm_follower import (
LEFT_DEFAULT_JOINTS_LIMITS,
RIGHT_DEFAULT_JOINTS_LIMITS,
OpenArmFollowerConfig,
)
logger = logging.getLogger(__name__)
class OpenArmFollower(Robot):
"""
OpenArms Follower Robot which uses CAN bus communication to control 7 DOF arm with a gripper.
The arm uses Damiao motors in MIT control mode.
"""
config_class = OpenArmFollowerConfig
name = "openarm_follower"
def __init__(self, config: OpenArmFollowerConfig):
super().__init__(config)
self.config = config
# Arm motors
motors: dict[str, Motor] = {}
for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items():
motor = Motor(
send_id, motor_type_str, MotorNormMode.DEGREES
) # Always use degrees for Damiao motors
motor.recv_id = recv_id
motor.motor_type_str = motor_type_str
motors[motor_name] = motor
self.bus = DamiaoMotorsBus(
port=self.config.port,
motors=motors,
calibration=self.calibration,
can_interface=self.config.can_interface,
use_can_fd=self.config.use_can_fd,
bitrate=self.config.can_bitrate,
data_bitrate=self.config.can_data_bitrate if self.config.use_can_fd else None,
)
if config.side is not None:
if config.side == "left":
config.joint_limits = LEFT_DEFAULT_JOINTS_LIMITS
elif config.side == "right":
config.joint_limits = RIGHT_DEFAULT_JOINTS_LIMITS
else:
raise ValueError(
"config.side must be either 'left', 'right' (for default values) or 'None' (for CLI values)"
)
else:
logger.info(
"Set config.side to either 'left' or 'right' to use pre-configured values for joint limits."
)
logger.info(f"Values used for joint limits: {config.joint_limits}.")
# Initialize cameras
self.cameras = make_cameras_from_configs(config.cameras)
@property
def _motors_ft(self) -> dict[str, type]:
"""Motor features for observation and action spaces."""
features: dict[str, type] = {}
for motor in self.bus.motors:
features[f"{motor}.pos"] = float
features[f"{motor}.vel"] = float # Add this
features[f"{motor}.torque"] = float # Add this
return features
@property
def _cameras_ft(self) -> dict[str, tuple]:
"""Camera features for observation space."""
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
"""Combined observation features from motors and cameras."""
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
"""Action features."""
return self._motors_ft
@property
def is_connected(self) -> bool:
"""Check if robot is connected."""
return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values())
def connect(self, calibrate: bool = True) -> None:
"""
Connect to the robot and optionally calibrate.
We assume that at connection time, the arms are in a safe rest position,
and torque can be safely disabled to run calibration if needed.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
# Connect to CAN bus
logger.info(f"Connecting arm on {self.config.port}...")
self.bus.connect()
# Run calibration if needed
if not self.is_calibrated and calibrate:
logger.info(
"Mismatch between calibration values in the motor and the calibration file or no calibration file found"
)
self.calibrate()
for cam in self.cameras.values():
cam.connect()
self.configure()
if self.is_calibrated:
self.bus.set_zero_position()
self.bus.enable_torque()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
"""Check if robot is calibrated."""
return self.bus.is_calibrated
def calibrate(self) -> None:
"""
Run calibration procedure for OpenArms robot.
The calibration procedure:
1. Disable torque
2. Ask user to position arms in hanging position with grippers closed
3. Set this as zero position
4. Record range of motion for each joint
5. Save calibration
"""
if self.calibration:
# Calibration file exists, ask user whether to use it or run new calibration
user_input = input(
f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: "
)
if user_input.strip().lower() != "c":
logger.info(f"Writing calibration file associated with the id {self.id} to the motors")
self.bus.write_calibration(self.calibration)
return
logger.info(f"\nRunning calibration for {self}")
self.bus.disable_torque()
# Step 1: Set zero position
input(
"\nCalibration: Set Zero Position)\n"
"Position the arm in the following configuration:\n"
" - Arm hanging straight down\n"
" - Gripper closed\n"
"Press ENTER when ready..."
)
# Set current position as zero for all motors
self.bus.set_zero_position()
logger.info("Arm zero position set.")
logger.info("Setting range: -90° to +90° for safety by default for all joints")
for motor_name, motor in self.bus.motors.items():
self.calibration[motor_name] = MotorCalibration(
id=motor.id,
drive_mode=0,
homing_offset=0,
range_min=-90,
range_max=90,
)
self.bus.write_calibration(self.calibration)
self._save_calibration()
print(f"Calibration saved to {self.calibration_fpath}")
def configure(self) -> None:
"""Configure motors with appropriate settings."""
# TODO(Steven, Pepijn): Slightly different from what it is happening in the leader
with self.bus.torque_disabled():
self.bus.configure_motors()
def setup_motors(self) -> None:
raise NotImplementedError(
"Motor ID configuration is typically done via manufacturer tools for CAN motors."
)
def get_observation(self) -> RobotObservation:
"""
Get current observation from robot including position, velocity, and torque.
Reads all motor states (pos/vel/torque) in one CAN refresh cycle
instead of 3 separate reads.
"""
start = time.perf_counter()
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
obs_dict: dict[str, Any] = {}
states = self.bus.sync_read_all_states()
for motor in self.bus.motors:
state = states.get(motor, {})
obs_dict[f"{motor}.pos"] = state.get("position", 0.0)
obs_dict[f"{motor}.vel"] = state.get("velocity", 0.0)
obs_dict[f"{motor}.torque"] = state.get("torque", 0.0)
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} get_observation took: {dt_ms:.1f}ms")
return obs_dict
def send_action(
self,
action: RobotAction,
custom_kp: dict[str, float] | None = None,
custom_kd: dict[str, float] | None = None,
) -> RobotAction:
"""
Send action command to robot.
The action magnitude may be clipped based on safety limits.
Args:
action: Dictionary with motor positions (e.g., "joint_1.pos", "joint_2.pos")
custom_kp: Optional custom kp gains per motor (e.g., {"joint_1": 120.0, "joint_2": 150.0})
custom_kd: Optional custom kd gains per motor (e.g., {"joint_1": 1.5, "joint_2": 2.0})
Returns:
The action actually sent (potentially clipped)
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
# Apply joint limit clipping to arm
for motor_name, position in goal_pos.items():
if motor_name in self.config.joint_limits:
min_limit, max_limit = self.config.joint_limits[motor_name]
clipped_position = max(min_limit, min(max_limit, position))
if clipped_position != position:
logger.debug(f"Clipped {motor_name} from {position:.2f}° to {clipped_position:.2f}°")
goal_pos[motor_name] = clipped_position
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.bus.sync_read("Present_Position")
goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
# TODO(Steven, Pepijn): Refactor writing
# Motor name to index mapping for gains
motor_index = {
"joint_1": 0,
"joint_2": 1,
"joint_3": 2,
"joint_4": 3,
"joint_5": 4,
"joint_6": 5,
"joint_7": 6,
"gripper": 7,
}
# Use batch MIT control for arm (sends all commands, then collects responses)
commands = {}
for motor_name, position_degrees in goal_pos.items():
idx = motor_index.get(motor_name, 0)
# Use custom gains if provided, otherwise use config defaults
if custom_kp is not None and motor_name in custom_kp:
kp = custom_kp[motor_name]
else:
kp = (
self.config.position_kp[idx]
if isinstance(self.config.position_kp, list)
else self.config.position_kp
)
if custom_kd is not None and motor_name in custom_kd:
kd = custom_kd[motor_name]
else:
kd = (
self.config.position_kd[idx]
if isinstance(self.config.position_kd, list)
else self.config.position_kd
)
commands[motor_name] = (kp, kd, position_degrees, 0.0, 0.0)
self.bus._mit_control_batch(commands)
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
def disconnect(self):
"""Disconnect from robot."""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Disconnect CAN bus
self.bus.disconnect(self.config.disable_torque_on_disconnect)
# Disconnect cameras
for cam in self.cameras.values():
cam.disconnect()
logger.info(f"{self} disconnected.")
@@ -65,3 +65,6 @@ class UnitreeG1Config(RobotConfig):
# Cameras (ZMQ-based remote cameras)
cameras: dict[str, CameraConfig] = field(default_factory=dict)
# Compensates for gravity on the unitree's arms using the arm ik solver
gravity_compensation: bool = False
+1 -1
View File
@@ -18,7 +18,7 @@ from enum import IntEnum
# ruff: noqa: N801, N815
NUM_MOTORS = 35
NUM_MOTORS = 29
class G1_29_JointArmIndex(IntEnum):
@@ -0,0 +1,313 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import sys
import numpy as np
logger = logging.getLogger(__name__)
parent2_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(parent2_dir)
class WeightedMovingFilter:
def __init__(self, weights, data_size=14):
self._window_size = len(weights)
self._weights = np.array(weights)
self._data_size = data_size
self._filtered_data = np.zeros(self._data_size)
self._data_queue = []
def _apply_filter(self):
if len(self._data_queue) < self._window_size:
return self._data_queue[-1]
data_array = np.array(self._data_queue)
temp_filtered_data = np.zeros(self._data_size)
for i in range(self._data_size):
temp_filtered_data[i] = np.convolve(data_array[:, i], self._weights, mode="valid")[-1]
return temp_filtered_data
def add_data(self, new_data):
assert len(new_data) == self._data_size
if len(self._data_queue) > 0 and np.array_equal(
new_data, self._data_queue[-1]
): # skip duplicate data
return
if len(self._data_queue) >= self._window_size:
self._data_queue.pop(0)
self._data_queue.append(new_data)
self._filtered_data = self._apply_filter()
@property
def filtered_data(self):
return self._filtered_data
class G1_29_ArmIK: # noqa: N801
def __init__(self, unit_test=False):
import casadi
import pinocchio as pin
from huggingface_hub import snapshot_download
from pinocchio import casadi as cpin
self._pin = pin
np.set_printoptions(precision=5, suppress=True, linewidth=200)
self.unit_test = unit_test
self.repo_path = snapshot_download("lerobot/unitree-g1-mujoco")
urdf_path = os.path.join(self.repo_path, "assets", "g1_body29_hand14.urdf")
mesh_dir = os.path.join(self.repo_path, "assets")
self.robot = self._pin.RobotWrapper.BuildFromURDF(urdf_path, mesh_dir)
self.mixed_jointsToLockIDs = [
"left_hip_pitch_joint",
"left_hip_roll_joint",
"left_hip_yaw_joint",
"left_knee_joint",
"left_ankle_pitch_joint",
"left_ankle_roll_joint",
"right_hip_pitch_joint",
"right_hip_roll_joint",
"right_hip_yaw_joint",
"right_knee_joint",
"right_ankle_pitch_joint",
"right_ankle_roll_joint",
"waist_yaw_joint",
"waist_roll_joint",
"waist_pitch_joint",
"left_hand_thumb_0_joint",
"left_hand_thumb_1_joint",
"left_hand_thumb_2_joint",
"left_hand_middle_0_joint",
"left_hand_middle_1_joint",
"left_hand_index_0_joint",
"left_hand_index_1_joint",
"right_hand_thumb_0_joint",
"right_hand_thumb_1_joint",
"right_hand_thumb_2_joint",
"right_hand_index_0_joint",
"right_hand_index_1_joint",
"right_hand_middle_0_joint",
"right_hand_middle_1_joint",
]
self.reduced_robot = self.robot.buildReducedRobot(
list_of_joints_to_lock=self.mixed_jointsToLockIDs,
reference_configuration=np.array([0.0] * self.robot.model.nq),
)
# Arm joint names in G1 motor order (G1_29_JointArmIndex)
self._arm_joint_names_g1 = [
"left_shoulder_pitch_joint",
"left_shoulder_roll_joint",
"left_shoulder_yaw_joint",
"left_elbow_joint",
"left_wrist_roll_joint",
"left_wrist_pitch_joint",
"left_wrist_yaw_joint",
"right_shoulder_pitch_joint",
"right_shoulder_roll_joint",
"right_shoulder_yaw_joint",
"right_elbow_joint",
"right_wrist_roll_joint",
"right_wrist_pitch_joint",
"right_wrist_yaw_joint",
]
# Pinocchio uses its own joint order in q; build index mapping.
self._arm_joint_names_pin = sorted(
self._arm_joint_names_g1,
key=lambda name: self.reduced_robot.model.idx_qs[self.reduced_robot.model.getJointId(name)],
)
logger.info(f"Pinocchio arm joint order: {self._arm_joint_names_pin}")
self._arm_reorder_g1_to_pin = [
self._arm_joint_names_g1.index(name) for name in self._arm_joint_names_pin
]
# Inverse mapping to return tau in G1 motor order.
self._arm_reorder_pin_to_g1 = np.argsort(self._arm_reorder_g1_to_pin)
self.reduced_robot.model.addFrame(
self._pin.Frame(
"L_ee",
self.reduced_robot.model.getJointId("left_wrist_yaw_joint"),
self._pin.SE3(np.eye(3), np.array([0.05, 0, 0]).T),
self._pin.FrameType.OP_FRAME,
)
)
self.reduced_robot.model.addFrame(
self._pin.Frame(
"R_ee",
self.reduced_robot.model.getJointId("right_wrist_yaw_joint"),
self._pin.SE3(np.eye(3), np.array([0.05, 0, 0]).T),
self._pin.FrameType.OP_FRAME,
)
)
# Creating Casadi models and data for symbolic computing
self.cmodel = cpin.Model(self.reduced_robot.model)
self.cdata = self.cmodel.createData()
# Creating symbolic variables
self.cq = casadi.SX.sym("q", self.reduced_robot.model.nq, 1)
self.cTf_l = casadi.SX.sym("tf_l", 4, 4)
self.cTf_r = casadi.SX.sym("tf_r", 4, 4)
cpin.framesForwardKinematics(self.cmodel, self.cdata, self.cq)
# Get the hand joint ID and define the error function
self.L_hand_id = self.reduced_robot.model.getFrameId("L_ee")
self.R_hand_id = self.reduced_robot.model.getFrameId("R_ee")
self.translational_error = casadi.Function(
"translational_error",
[self.cq, self.cTf_l, self.cTf_r],
[
casadi.vertcat(
self.cdata.oMf[self.L_hand_id].translation - self.cTf_l[:3, 3],
self.cdata.oMf[self.R_hand_id].translation - self.cTf_r[:3, 3],
)
],
)
self.rotational_error = casadi.Function(
"rotational_error",
[self.cq, self.cTf_l, self.cTf_r],
[
casadi.vertcat(
cpin.log3(self.cdata.oMf[self.L_hand_id].rotation @ self.cTf_l[:3, :3].T),
cpin.log3(self.cdata.oMf[self.R_hand_id].rotation @ self.cTf_r[:3, :3].T),
)
],
)
# Defining the optimization problem
self.opti = casadi.Opti()
self.var_q = self.opti.variable(self.reduced_robot.model.nq)
self.var_q_last = self.opti.parameter(self.reduced_robot.model.nq) # for smooth
self.param_tf_l = self.opti.parameter(4, 4)
self.param_tf_r = self.opti.parameter(4, 4)
self.translational_cost = casadi.sumsqr(
self.translational_error(self.var_q, self.param_tf_l, self.param_tf_r)
)
self.rotation_cost = casadi.sumsqr(
self.rotational_error(self.var_q, self.param_tf_l, self.param_tf_r)
)
self.regularization_cost = casadi.sumsqr(self.var_q)
self.smooth_cost = casadi.sumsqr(self.var_q - self.var_q_last)
# Setting optimization constraints and goals
self.opti.subject_to(
self.opti.bounded(
self.reduced_robot.model.lowerPositionLimit,
self.var_q,
self.reduced_robot.model.upperPositionLimit,
)
)
self.opti.minimize(
50 * self.translational_cost
+ self.rotation_cost
+ 0.02 * self.regularization_cost
+ 0.1 * self.smooth_cost
)
opts = {
"ipopt": {"print_level": 0, "max_iter": 50, "tol": 1e-6},
"print_time": False, # print or not
"calc_lam_p": False, # https://github.com/casadi/casadi/wiki/FAQ:-Why-am-I-getting-%22NaN-detected%22in-my-optimization%3F
}
self.opti.solver("ipopt", opts)
self.init_data = np.zeros(self.reduced_robot.model.nq)
self.smooth_filter = WeightedMovingFilter(np.array([0.4, 0.3, 0.2, 0.1]), 14)
def solve_ik(self, left_wrist, right_wrist, current_lr_arm_motor_q=None, current_lr_arm_motor_dq=None):
if current_lr_arm_motor_q is not None:
self.init_data = current_lr_arm_motor_q
self.opti.set_initial(self.var_q, self.init_data)
self.opti.set_value(self.param_tf_l, left_wrist)
self.opti.set_value(self.param_tf_r, right_wrist)
self.opti.set_value(self.var_q_last, self.init_data) # for smooth
try:
self.opti.solve()
sol_q = self.opti.value(self.var_q)
self.smooth_filter.add_data(sol_q)
sol_q = self.smooth_filter.filtered_data
if current_lr_arm_motor_dq is not None:
v = current_lr_arm_motor_dq * 0.0
else:
v = (sol_q - self.init_data) * 0.0
self.init_data = sol_q
sol_tauff = self._pin.rnea(
self.reduced_robot.model,
self.reduced_robot.data,
sol_q,
v,
np.zeros(self.reduced_robot.model.nv),
)
return sol_q, sol_tauff
except Exception as e:
logger.error(f"ERROR in convergence, plotting debug info.{e}")
sol_q = self.opti.debug.value(self.var_q)
self.smooth_filter.add_data(sol_q)
sol_q = self.smooth_filter.filtered_data
if current_lr_arm_motor_dq is not None:
v = current_lr_arm_motor_dq * 0.0
else:
v = (sol_q - self.init_data) * 0.0
self.init_data = sol_q
logger.error(
f"sol_q:{sol_q} \nmotorstate: \n{current_lr_arm_motor_q} \nleft_pose: \n{left_wrist} \nright_pose: \n{right_wrist}"
)
return current_lr_arm_motor_q, np.zeros(self.reduced_robot.model.nv)
def solve_tau(self, current_lr_arm_motor_q=None, current_lr_arm_motor_dq=None):
try:
q_g1 = np.array(current_lr_arm_motor_q, dtype=float)
if q_g1.shape[0] != len(self._arm_joint_names_g1):
raise ValueError(f"Expected {len(self._arm_joint_names_g1)} arm joints, got {q_g1.shape[0]}")
q_pin = q_g1[self._arm_reorder_g1_to_pin]
sol_tauff = self._pin.rnea(
self.reduced_robot.model,
self.reduced_robot.data,
q_pin,
np.zeros(self.reduced_robot.model.nv),
np.zeros(self.reduced_robot.model.nv),
)
return sol_tauff[self._arm_reorder_pin_to_g1]
except Exception as e:
logger.error(f"ERROR in convergence, plotting debug info.{e}")
return np.zeros(self.reduced_robot.model.nv)
+18 -1
View File
@@ -27,7 +27,8 @@ import numpy as np
from lerobot.cameras.utils import make_cameras_from_configs
from lerobot.envs.factory import make_env
from lerobot.processor import RobotAction, RobotObservation
from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex
from lerobot.robots.unitree_g1.g1_utils import G1_29_JointArmIndex, G1_29_JointIndex
from lerobot.robots.unitree_g1.robot_kinematic_processor import G1_29_ArmIK
from ..robot import Robot
from .config_unitree_g1 import UnitreeG1Config
@@ -127,6 +128,8 @@ class UnitreeG1(Robot):
self.subscribe_thread = None
self.remote_controller = self.RemoteController()
self.arm_ik = G1_29_ArmIK()
def _subscribe_motor_state(self): # polls robot state @ 250Hz
while not self._shutdown_event.is_set():
start_time = time.time()
@@ -361,6 +364,20 @@ class UnitreeG1(Robot):
self.msg.motor_cmd[motor.value].kd = self.kd[motor.value]
self.msg.motor_cmd[motor.value].tau = 0
if self.config.gravity_compensation:
# Build action_np from motor commands (arm joints are indices 15-28, local indices 0-13)
action_np = np.zeros(14)
arm_start_idx = G1_29_JointArmIndex.kLeftShoulderPitch.value # 15
for joint in G1_29_JointArmIndex:
local_idx = joint.value - arm_start_idx
action_np[local_idx] = self.msg.motor_cmd[joint.value].q
tau = self.arm_ik.solve_tau(action_np)
# Apply tau back to motor commands
for joint in G1_29_JointArmIndex:
local_idx = joint.value - arm_start_idx
self.msg.motor_cmd[joint.value].tau = tau[local_idx]
self.msg.crc = self.crc.Crc(self.msg)
self.lowcmd_publisher.Write(self.msg)
return action
+8
View File
@@ -60,6 +60,14 @@ def make_robot_from_config(config: RobotConfig) -> Robot:
from .reachy2 import Reachy2Robot
return Reachy2Robot(config)
elif config.type == "openarm_follower":
from .openarm_follower import OpenArmFollower
return OpenArmFollower(config)
elif config.type == "bi_openarm_follower":
from .bi_openarm_follower import BiOpenArmFollower
return BiOpenArmFollower(config)
elif config.type == "mock_robot":
from tests.mocks.mock_robot import MockRobot
+5
View File
@@ -36,23 +36,28 @@ from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraCon
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
bi_openarm_follower,
bi_so_follower,
hope_jr,
koch_follower,
lekiwi,
make_robot_from_config,
omx_follower,
openarm_follower,
so_follower,
)
from lerobot.teleoperators import ( # noqa: F401
Teleoperator,
TeleoperatorConfig,
bi_openarm_leader,
bi_so_leader,
homunculus,
koch_leader,
make_teleoperator_from_config,
omx_leader,
openarm_leader,
so_leader,
unitree_g1,
)
from lerobot.utils.import_utils import register_third_party_plugins
from lerobot.utils.utils import init_logging
@@ -44,19 +44,23 @@ import numpy as np
from lerobot.model.kinematics import RobotKinematics
from lerobot.robots import ( # noqa: F401
RobotConfig,
bi_openarm_follower,
bi_so_follower,
koch_follower,
make_robot_from_config,
omx_follower,
openarm_follower,
so_follower,
)
from lerobot.teleoperators import ( # noqa: F401
TeleoperatorConfig,
bi_openarm_leader,
bi_so_leader,
gamepad,
koch_leader,
make_teleoperator_from_config,
omx_leader,
openarm_leader,
so_leader,
)
from lerobot.utils.robot_utils import precise_sleep
+6 -1
View File
@@ -98,26 +98,31 @@ from lerobot.processor.rename_processor import rename_stats
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
bi_openarm_follower,
bi_so_follower,
earthrover_mini_plus,
hope_jr,
koch_follower,
make_robot_from_config,
omx_follower,
openarm_follower,
reachy2,
so_follower,
unitree_g1,
unitree_g1 as unitree_g1_robot,
)
from lerobot.teleoperators import ( # noqa: F401
Teleoperator,
TeleoperatorConfig,
bi_openarm_leader,
bi_so_leader,
homunculus,
koch_leader,
make_teleoperator_from_config,
omx_leader,
openarm_leader,
reachy2_teleoperator,
so_leader,
unitree_g1,
)
from lerobot.teleoperators.keyboard.teleop_keyboard import KeyboardTeleop
from lerobot.utils.constants import ACTION, OBS_STR
+2
View File
@@ -53,12 +53,14 @@ from lerobot.processor import (
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
bi_openarm_follower,
bi_so_follower,
earthrover_mini_plus,
hope_jr,
koch_follower,
make_robot_from_config,
omx_follower,
openarm_follower,
reachy2,
so_follower,
unitree_g1,
@@ -70,18 +70,22 @@ from lerobot.processor import (
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
bi_openarm_follower,
bi_so_follower,
earthrover_mini_plus,
hope_jr,
koch_follower,
make_robot_from_config,
omx_follower,
openarm_follower,
reachy2,
so_follower,
unitree_g1 as unitree_g1_robot,
)
from lerobot.teleoperators import ( # noqa: F401
Teleoperator,
TeleoperatorConfig,
bi_openarm_leader,
bi_so_leader,
gamepad,
homunculus,
@@ -89,8 +93,10 @@ from lerobot.teleoperators import ( # noqa: F401
koch_leader,
make_teleoperator_from_config,
omx_leader,
openarm_leader,
reachy2_teleoperator,
so_leader,
unitree_g1,
)
from lerobot.utils.import_utils import register_third_party_plugins
from lerobot.utils.robot_utils import precise_sleep
@@ -0,0 +1,20 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .bi_openarm_leader import BiOpenArmLeader
from .config_bi_openarm_leader import BiOpenArmLeaderConfig
__all__ = ["BiOpenArmLeader", "BiOpenArmLeaderConfig"]
@@ -0,0 +1,131 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from functools import cached_property
from lerobot.processor import RobotAction
from lerobot.teleoperators.openarm_leader import OpenArmLeaderConfig
from ..openarm_leader import OpenArmLeader
from ..teleoperator import Teleoperator
from .config_bi_openarm_leader import BiOpenArmLeaderConfig
logger = logging.getLogger(__name__)
class BiOpenArmLeader(Teleoperator):
"""
Bimanual OpenArm Leader Arms
"""
config_class = BiOpenArmLeaderConfig
name = "bi_openarm_leader"
def __init__(self, config: BiOpenArmLeaderConfig):
super().__init__(config)
self.config = config
left_arm_config = OpenArmLeaderConfig(
id=f"{config.id}_left" if config.id else None,
calibration_dir=config.calibration_dir,
port=config.left_arm_config.port,
can_interface=config.left_arm_config.can_interface,
use_can_fd=config.left_arm_config.use_can_fd,
can_bitrate=config.left_arm_config.can_bitrate,
can_data_bitrate=config.left_arm_config.can_data_bitrate,
motor_config=config.left_arm_config.motor_config,
manual_control=config.left_arm_config.manual_control,
position_kd=config.left_arm_config.position_kd,
position_kp=config.left_arm_config.position_kp,
)
right_arm_config = OpenArmLeaderConfig(
id=f"{config.id}_right" if config.id else None,
calibration_dir=config.calibration_dir,
port=config.right_arm_config.port,
can_interface=config.right_arm_config.can_interface,
use_can_fd=config.right_arm_config.use_can_fd,
can_bitrate=config.right_arm_config.can_bitrate,
can_data_bitrate=config.right_arm_config.can_data_bitrate,
motor_config=config.right_arm_config.motor_config,
manual_control=config.right_arm_config.manual_control,
position_kd=config.right_arm_config.position_kd,
position_kp=config.right_arm_config.position_kp,
)
self.left_arm = OpenArmLeader(left_arm_config)
self.right_arm = OpenArmLeader(right_arm_config)
@cached_property
def action_features(self) -> dict[str, type]:
left_arm_features = self.left_arm.action_features
right_arm_features = self.right_arm.action_features
return {
**{f"left_{k}": v for k, v in left_arm_features.items()},
**{f"right_{k}": v for k, v in right_arm_features.items()},
}
@cached_property
def feedback_features(self) -> dict[str, type]:
return {}
@property
def is_connected(self) -> bool:
return self.left_arm.is_connected and self.right_arm.is_connected
def connect(self, calibrate: bool = True) -> None:
self.left_arm.connect(calibrate)
self.right_arm.connect(calibrate)
@property
def is_calibrated(self) -> bool:
return self.left_arm.is_calibrated and self.right_arm.is_calibrated
def calibrate(self) -> None:
self.left_arm.calibrate()
self.right_arm.calibrate()
def configure(self) -> None:
self.left_arm.configure()
self.right_arm.configure()
def setup_motors(self) -> None:
raise NotImplementedError(
"Motor ID configuration is typically done via manufacturer tools for CAN motors."
)
def get_action(self) -> RobotAction:
action_dict = {}
# Add "left_" prefix
left_action = self.left_arm.get_action()
action_dict.update({f"left_{key}": value for key, value in left_action.items()})
# Add "right_" prefix
right_action = self.right_arm.get_action()
action_dict.update({f"right_{key}": value for key, value in right_action.items()})
return action_dict
def send_feedback(self, feedback: dict[str, float]) -> None:
# TODO: Implement force feedback
raise NotImplementedError
def disconnect(self) -> None:
self.left_arm.disconnect()
self.right_arm.disconnect()
@@ -0,0 +1,30 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from lerobot.teleoperators.openarm_leader import OpenArmLeaderConfigBase
from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("bi_openarm_leader")
@dataclass
class BiOpenArmLeaderConfig(TeleoperatorConfig):
"""Configuration class for Bi OpenArm Follower robots."""
left_arm_config: OpenArmLeaderConfigBase
right_arm_config: OpenArmLeaderConfigBase
@@ -0,0 +1,20 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .config_openarm_leader import OpenArmLeaderConfig, OpenArmLeaderConfigBase
from .openarm_leader import OpenArmLeader
__all__ = ["OpenArmLeader", "OpenArmLeaderConfig", "OpenArmLeaderConfigBase"]
@@ -0,0 +1,75 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from ..config import TeleoperatorConfig
@dataclass
class OpenArmLeaderConfigBase:
"""Base configuration for the OpenArms leader/teleoperator with Damiao motors."""
# CAN interfaces - one per arm
# Arm CAN interface (e.g., "can3")
# Linux: "can0", "can1", etc.
port: str
# CAN interface type: "socketcan" (Linux), "slcan" (serial), or "auto" (auto-detect)
can_interface: str = "socketcan"
# CAN FD settings (OpenArms uses CAN FD by default)
use_can_fd: bool = True
can_bitrate: int = 1000000 # Nominal bitrate (1 Mbps)
can_data_bitrate: int = 5000000 # Data bitrate for CAN FD (5 Mbps)
# Motor configuration for OpenArms (7 DOF per arm)
# Maps motor names to (send_can_id, recv_can_id, motor_type)
# Based on: https://docs.openarm.dev/software/setup/configure-test
# OpenArms uses 4 types of motors:
# - DM8009 (DM-J8009P-2EC) for shoulders (high torque)
# - DM4340P and DM4340 for shoulder rotation and elbow
# - DM4310 (DM-J4310-2EC V1.1) for wrist and gripper
motor_config: dict[str, tuple[int, int, str]] = field(
default_factory=lambda: {
"joint_1": (0x01, 0x11, "dm8009"), # J1 - Shoulder pan (DM8009)
"joint_2": (0x02, 0x12, "dm8009"), # J2 - Shoulder lift (DM8009)
"joint_3": (0x03, 0x13, "dm4340"), # J3 - Shoulder rotation (DM4340)
"joint_4": (0x04, 0x14, "dm4340"), # J4 - Elbow flex (DM4340)
"joint_5": (0x05, 0x15, "dm4310"), # J5 - Wrist roll (DM4310)
"joint_6": (0x06, 0x16, "dm4310"), # J6 - Wrist pitch (DM4310)
"joint_7": (0x07, 0x17, "dm4310"), # J7 - Wrist rotation (DM4310)
"gripper": (0x08, 0x18, "dm4310"), # J8 - Gripper (DM4310)
}
)
# Torque mode settings for manual control
# When enabled, motors have torque disabled for manual movement
manual_control: bool = True
# TODO(Steven, Pepijn): Not used ... ?
# MIT control parameters (used when manual_control=False for torque control)
# List of 8 values: [joint_1, joint_2, joint_3, joint_4, joint_5, joint_6, joint_7, gripper]
position_kp: list[float] = field(
default_factory=lambda: [240.0, 240.0, 240.0, 240.0, 24.0, 31.0, 25.0, 16.0]
)
position_kd: list[float] = field(default_factory=lambda: [3.0, 3.0, 3.0, 3.0, 0.2, 0.2, 0.2, 0.2])
@TeleoperatorConfig.register_subclass("openarm_leader")
@dataclass
class OpenArmLeaderConfig(TeleoperatorConfig, OpenArmLeaderConfigBase):
pass
@@ -0,0 +1,225 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from typing import Any
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.motors.damiao import DamiaoMotorsBus
from lerobot.processor import RobotAction
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..teleoperator import Teleoperator
from .config_openarm_leader import OpenArmLeaderConfig
logger = logging.getLogger(__name__)
class OpenArmLeader(Teleoperator):
"""
OpenArm Leader/Teleoperator Arm with Damiao motors.
This teleoperator uses CAN bus communication to read positions from
Damiao motors that are manually moved (torque disabled).
"""
config_class = OpenArmLeaderConfig
name = "openarm_leader"
def __init__(self, config: OpenArmLeaderConfig):
super().__init__(config)
self.config = config
# Arm motors
motors: dict[str, Motor] = {}
for motor_name, (send_id, recv_id, motor_type_str) in config.motor_config.items():
motor = Motor(
send_id, motor_type_str, MotorNormMode.DEGREES
) # Always use degrees for Damiao motors
motor.recv_id = recv_id
motor.motor_type_str = motor_type_str
motors[motor_name] = motor
self.bus = DamiaoMotorsBus(
port=self.config.port,
motors=motors,
calibration=self.calibration,
can_interface=self.config.can_interface,
use_can_fd=self.config.use_can_fd,
bitrate=self.config.can_bitrate,
data_bitrate=self.config.can_data_bitrate if self.config.use_can_fd else None,
)
@property
def action_features(self) -> dict[str, type]:
"""Features produced by this teleoperator."""
features: dict[str, type] = {}
for motor in self.bus.motors:
features[f"{motor}.pos"] = float
features[f"{motor}.vel"] = float
features[f"{motor}.torque"] = float
return features
@property
def feedback_features(self) -> dict[str, type]:
"""Feedback features (not implemented for OpenArms)."""
return {}
@property
def is_connected(self) -> bool:
"""Check if teleoperator is connected."""
return self.bus.is_connected
def connect(self, calibrate: bool = True) -> None:
"""
Connect to the teleoperator.
For manual control, we disable torque after connecting so the
arm can be moved by hand.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
# Connect to CAN bus
logger.info(f"Connecting arm on {self.config.port}...")
self.bus.connect()
# Run calibration if needed
if not self.is_calibrated and calibrate:
logger.info(
"Mismatch between calibration values in the motor and the calibration file or no calibration file found"
)
self.calibrate()
self.configure()
if self.is_calibrated:
self.bus.set_zero_position()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
"""Check if teleoperator is calibrated."""
return self.bus.is_calibrated
def calibrate(self) -> None:
"""
Run calibration procedure for OpenArms leader.
The calibration procedure:
1. Disable torque (if not already disabled)
2. Ask user to position arm in zero position (hanging with gripper closed)
3. Set this as zero position
4. Record range of motion for each joint
5. Save calibration
"""
if self.calibration:
# Calibration file exists, ask user whether to use it or run new calibration
user_input = input(
f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: "
)
if user_input.strip().lower() != "c":
logger.info(f"Writing calibration file associated with the id {self.id} to the motors")
self.bus.write_calibration(self.calibration)
return
logger.info(f"\nRunning calibration for {self}")
self.bus.disable_torque()
# Step 1: Set zero position
input(
"\nCalibration: Set Zero Position)\n"
"Position the arm in the following configuration:\n"
" - Arm hanging straight down\n"
" - Gripper closed\n"
"Press ENTER when ready..."
)
# Set current position as zero for all motors
self.bus.set_zero_position()
logger.info("Arm zero position set.")
logger.info("Setting range: -90° to +90° by default for all joints")
# TODO(Steven, Pepijn): Check if MotorCalibration is actually needed here given that we only use Degrees
for motor_name, motor in self.bus.motors.items():
self.calibration[motor_name] = MotorCalibration(
id=motor.id,
drive_mode=0,
homing_offset=0,
range_min=-90,
range_max=90,
)
self.bus.write_calibration(self.calibration)
self._save_calibration()
print(f"Calibration saved to {self.calibration_fpath}")
def configure(self) -> None:
"""
Configure motors for manual teleoperation.
For manual control, we disable torque so the arm can be moved by hand.
"""
return self.bus.disable_torque() if self.config.manual_control else self.bus.configure_motors()
def setup_motors(self) -> None:
raise NotImplementedError(
"Motor ID configuration is typically done via manufacturer tools for CAN motors."
)
def get_action(self) -> RobotAction:
"""
Get current action from the leader arm.
This is the main method for teleoperators - it reads the current state
of the leader arm and returns it as an action that can be sent to a follower.
Reads all motor states (pos/vel/torque) in one CAN refresh cycle.
"""
start = time.perf_counter()
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
action_dict: dict[str, Any] = {}
# Use sync_read_all_states to get pos/vel/torque in one go
states = self.bus.sync_read_all_states()
for motor in self.bus.motors:
state = states.get(motor, {})
action_dict[f"{motor}.pos"] = state.get("position")
action_dict[f"{motor}.vel"] = state.get("velocity")
action_dict[f"{motor}.torque"] = state.get("torque")
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
return action_dict
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError("Feedback is not yet implemented for OpenArm leader.")
def disconnect(self) -> None:
"""Disconnect from teleoperator."""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Disconnect CAN bus
# For manual control, ensure torque is disabled before disconnecting
self.bus.disconnect(disable_torque=self.config.manual_control)
logger.info(f"{self} disconnected.")
@@ -0,0 +1,21 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .config_unitree_g1 import ExoskeletonArmPortConfig, UnitreeG1TeleoperatorConfig
from .exo_calib import ExoskeletonCalibration, ExoskeletonJointCalibration
from .exo_ik import ExoskeletonIKHelper
from .exo_serial import ExoskeletonArm
from .unitree_g1 import UnitreeG1Teleoperator
@@ -0,0 +1,37 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from ..config import TeleoperatorConfig
@dataclass
class ExoskeletonArmPortConfig:
"""Serial port configuration for individual exoskeleton arm."""
port: str = ""
baud_rate: int = 115200
@TeleoperatorConfig.register_subclass("unitree_g1")
@dataclass
class UnitreeG1TeleoperatorConfig(TeleoperatorConfig):
left_arm_config: ExoskeletonArmPortConfig = field(default_factory=ExoskeletonArmPortConfig)
right_arm_config: ExoskeletonArmPortConfig = field(default_factory=ExoskeletonArmPortConfig)
# Frozen joints (comma-separated joint names that won't be moved by IK)
frozen_joints: str = ""
@@ -0,0 +1,446 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module handles calibration of hall effect sensors used in the exoskeleton.
Each joint has a pair of ADC channels outputting sin and cos values that trace an ellipse
as the joint rotates due to imprecision in magnet/sensor placement. We fit this ellipse to a unit circle,
and calculate arctan2 of the unit circle to get the joint angle.
We then store the ellipse parameters and the zero offset for each joint to be used at runtime.
"""
import json
import logging
import time
from collections import deque
from dataclasses import dataclass, field
from pathlib import Path
import numpy as np
import serial
logger = logging.getLogger(__name__)
# exoskeleton joint names -> ADC channel pairs. TODO: add wrist pitch and wrist yaw
JOINTS = {
"shoulder_pitch": (0, 1),
"shoulder_yaw": (2, 3),
"shoulder_roll": (4, 5),
"elbow_flex": (6, 7),
"wrist_roll": (14, 15),
}
@dataclass
class ExoskeletonJointCalibration:
name: str # joint name
center_fit: list[float] # center of the ellipse
T: list[list[float]] # 2x2 transformation matrix
zero_offset: float = 0.0 # angle at neutral pose
@dataclass
class ExoskeletonCalibration:
"""Full calibration data for an exoskeleton arm."""
version: int = 2
side: str = ""
adc_max: int = 2**12 - 1
joints: list[ExoskeletonJointCalibration] = field(default_factory=list)
def to_dict(self) -> dict:
return {
"version": self.version,
"side": self.side,
"adc_max": self.adc_max,
"joints": [
{
"name": j.name,
"center_fit": j.center_fit,
"T": j.T,
"zero_offset": j.zero_offset,
}
for j in self.joints
],
}
@classmethod
def from_dict(cls, data: dict) -> "ExoskeletonCalibration":
joints = [
ExoskeletonJointCalibration(
name=j["name"],
center_fit=j["center_fit"],
T=j["T"],
zero_offset=j.get("zero_offset", 0.0),
)
for j in data.get("joints", [])
]
return cls(
version=data.get("version", 2),
side=data.get("side", ""),
adc_max=data.get("adc_max", 2**12 - 1),
joints=joints,
)
@dataclass(frozen=True)
class CalibParams:
fit_every: float = 0.15
min_fit_points: int = 60
fit_window: int = 900
max_fit_points: int = 300
trim_low: float = 0.05
trim_high: float = 0.95
median_window: int = 5
history: int = 3500
draw_hz: float = 120.0
sample_count: int = 50
def normalize_angle(angle: float) -> float:
while angle > np.pi:
angle -= 2 * np.pi
while angle < -np.pi:
angle += 2 * np.pi
return angle
def joint_z_and_angle(raw16: list[int], j: ExoskeletonJointCalibration) -> tuple[np.ndarray, float]:
"""
Applies calibration to each joint: raw centered ellipse-to-circle angle.
"""
pair = JOINTS[j.name]
s, c = raw16[pair[0]], raw16[pair[1]] # get sin and cos
p = np.array([float(c) - (2**12 - 1) / 2, float(s) - (2**12 - 1) / 2]) # center the raw values
z = np.asarray(j.T) @ (
p - np.asarray(j.center_fit)
) # center the ellipse and invert the transformation matrix to get unit circle coords
ang = float(np.arctan2(z[1], z[0])) - j.zero_offset # calculate the anvgle and apply the zero offset
return z, normalize_angle(-ang) # ensure range is [-pi, pi]
def exo_raw_to_angles(raw16: list[int], calib: ExoskeletonCalibration) -> dict[str, float]:
"""Convert raw sensor readings to joint angles using calibration."""
return {j.name: joint_z_and_angle(raw16, j)[1] for j in calib.joints}
def run_exo_calibration(
ser: serial.Serial,
side: str,
save_path: Path,
params: CalibParams | None = None,
) -> ExoskeletonCalibration:
"""
Run interactive calibration for an exoskeleton arm.
"""
try:
import cv2
import matplotlib.pyplot as plt
except ImportError as e:
raise ImportError(
"Calibration requires matplotlib and opencv-python. "
"Install with: pip install matplotlib opencv-python"
) from e
from .exo_serial import read_raw_from_serial
params = params or CalibParams()
joint_list = list(JOINTS.items()) # Convert dict to list for indexing
logger.info(f"Starting calibration for {side} exoskeleton arm")
def running_median(win: deque) -> float:
return float(np.median(np.fromiter(win, dtype=float)))
def read_joint_point(raw16: list[int], pair: tuple[int, int]):
s, c = raw16[pair[0]], raw16[pair[1]]
return float(c) - (2**12 - 1) / 2, float(s) - (2**12 - 1) / 2, float(s), float(c)
def select_fit_subset(xs, ys):
"""Select and filter points for ellipse fitting. Trims outliers by radius and downsamples."""
n = min(params.fit_window, len(xs))
if n <= 0:
return None, None
x = np.asarray(list(xs)[-n:], dtype=float) # most recent n samples
y = np.asarray(list(ys)[-n:], dtype=float)
r = np.sqrt(x * x + y * y) # radius from origin
if len(r) >= 20:
lo, hi = np.quantile(r, params.trim_low), np.quantile(r, params.trim_high) # outlier bounds
keep = (r >= lo) & (r <= hi)
x, y = x[keep], y[keep] # remove outliers
if len(x) > params.max_fit_points:
idx = np.linspace(0, len(x) - 1, params.max_fit_points).astype(int) # downsample evenly
x, y = x[idx], y[idx]
return x, y
def fit_ellipse_opencv(x, y):
"""Fit ellipse to (x,y) points using OpenCV. Returns center, axes, rotation matrix, and outline."""
x, y = np.asarray(x, dtype=float), np.asarray(y, dtype=float)
if len(x) < 5:
return None
pts = np.stack([x, y], axis=1).astype(np.float32).reshape(-1, 1, 2)
try:
(xc, yc), (w, h), angle_deg = cv2.fitEllipse(pts) # returns center, axes, rotation in degrees
except cv2.error:
return None
a, b = float(w) * 0.5, float(h) * 0.5 # get ellipse major and minor semi-axes
phi = np.deg2rad(float(angle_deg)) # to rad
if b > a: # ensure major axis is a
a, b = b, a
phi += np.pi / 2.0
if not np.isfinite(a) or not np.isfinite(b) or a <= 1e-6 or b <= 1e-6:
return None
cp, sp = float(np.cos(phi)), float(np.sin(phi)) #
rot = np.array([[cp, -sp], [sp, cp]], dtype=float) # 2x2 rotation matrix
center = np.array([float(xc), float(yc)], dtype=float) # offset vector
tt = np.linspace(0, 2 * np.pi, 360)
outline = (rot @ np.stack([a * np.cos(tt), b * np.sin(tt)])).T + center # for viz
return {"center": center, "a": a, "b": b, "R": rot, "ex": outline[:, 0], "ey": outline[:, 1]}
# Setup matplotlib
plt.ion()
fig, (ax0, ax1) = plt.subplots(1, 2, figsize=(12, 6))
ax0.set_xlabel("cos - center")
ax0.set_ylabel("sin - center")
ax0.grid(True, alpha=0.25)
ax0.set_aspect("equal", adjustable="box")
ax1.set_title("Unit circle + angle")
ax1.set_xlabel("x")
ax1.set_ylabel("y")
ax1.grid(True, alpha=0.25)
ax1.set_aspect("equal", adjustable="box")
tt = np.linspace(0, 2 * np.pi, 360)
ax1.plot(np.cos(tt), np.sin(tt), "k-", linewidth=1)
ax0.set_xlim(-2200, 2200)
ax0.set_ylim(-2200, 2200)
ax1.set_xlim(-1.4, 1.4)
ax1.set_ylim(-1.4, 1.4)
sc0 = ax0.scatter([], [], s=6, animated=True)
(ell_line,) = ax0.plot([], [], "r-", linewidth=2, animated=True)
sc1 = ax1.scatter([], [], s=6, animated=True)
(radius_line,) = ax1.plot([], [], "g-", linewidth=2, animated=True)
angle_text = ax1.text(
0.02, 0.98, "", transform=ax1.transAxes, va="top", ha="left", fontsize=12, animated=True
)
fig.canvas.draw()
bg0 = fig.canvas.copy_from_bbox(ax0.bbox)
bg1 = fig.canvas.copy_from_bbox(ax1.bbox)
# State
joints_out = []
joint_idx = 0
phase = "ellipse"
advance_requested = False
zero_samples = []
def on_key(event):
nonlocal advance_requested
if event.key in ("n", "N", "enter", " "):
advance_requested = True
fig.canvas.mpl_connect("key_press_event", on_key)
def reset_state():
return {
"xs": deque(maxlen=params.history),
"ys": deque(maxlen=params.history),
"xu": deque(maxlen=params.history),
"yu": deque(maxlen=params.history),
"win_s": deque(maxlen=params.median_window),
"win_c": deque(maxlen=params.median_window),
"ellipse_cache": None,
"T": None,
"center_fit": None,
"have_transform": False,
"latest_z": None,
"last_fit": 0.0,
}
state = reset_state()
last_draw = 0.0
name, pair = joint_list[joint_idx]
fig.canvas.manager.set_window_title(f"[{joint_idx + 1}/{len(joint_list)}] {name} - ELLIPSE")
ax0.set_title(f"{name} raw (filtered)")
logger.info(f"[{joint_idx + 1}/{len(joint_list)}] Calibrating {name}")
logger.info("Step 1: Move joint around to map ellipse, then press 'n'")
try:
while plt.fignum_exists(fig.number):
name, pair = joint_list[joint_idx]
# Handles calibration GUI state: ellipse → zero_pose → next joint -> ellipse -> ...
if phase == "ellipse" and advance_requested and state["have_transform"]:
joints_out.append(
{
"name": name,
"center_fit": state["center_fit"].tolist(),
"T": state["T"].tolist(),
}
)
logger.info(f" -> Ellipse saved for {name}")
phase, zero_samples, advance_requested = "zero_pose", [], False
fig.canvas.manager.set_window_title(f"[{joint_idx + 1}/{len(joint_list)}] {name} - ZERO POSE")
ax0.set_title(f"{name} - hold zero pose")
fig.canvas.draw()
bg0, bg1 = fig.canvas.copy_from_bbox(ax0.bbox), fig.canvas.copy_from_bbox(ax1.bbox)
logger.info(f"Step 2: Hold {name} in zero position, then press 'n'")
elif phase == "ellipse" and advance_requested and not state["have_transform"]:
logger.info(" (Need valid fit first - keep moving the joint)")
advance_requested = False
elif phase == "zero_pose" and advance_requested:
if len(zero_samples) >= params.sample_count:
zero_offset = float(np.mean(zero_samples[-params.sample_count :]))
joints_out[-1]["zero_offset"] = zero_offset
logger.info(f" -> {name} zero: {zero_offset:+.3f} rad ({np.degrees(zero_offset):+.1f}°)")
joint_idx += 1
advance_requested = False
if joint_idx >= len(joint_list):
# All joints done
calib = ExoskeletonCalibration(
version=2,
side=side,
adc_max=2**12 - 1,
joints=[
ExoskeletonJointCalibration(
name=j["name"],
center_fit=j["center_fit"],
T=j["T"],
zero_offset=j.get("zero_offset", 0.0),
)
for j in joints_out
],
)
save_path.parent.mkdir(parents=True, exist_ok=True)
with open(save_path, "w") as f:
json.dump(calib.to_dict(), f, indent=2)
logger.info(f"Saved calibration to {save_path}")
logger.info("Calibration complete!")
plt.close(fig)
return calib
# Next joint
phase, state = "ellipse", reset_state()
name, pair = joint_list[joint_idx]
fig.canvas.manager.set_window_title(
f"[{joint_idx + 1}/{len(joint_list)}] {name} - ELLIPSE"
)
ax0.set_title(f"{name} raw (filtered)")
fig.canvas.draw()
bg0, bg1 = fig.canvas.copy_from_bbox(ax0.bbox), fig.canvas.copy_from_bbox(ax1.bbox)
logger.info(f"[{joint_idx + 1}/{len(joint_list)}] Calibrating {name}")
logger.info("Step 1: Move joint around to map ellipse, then press 'n'")
else:
logger.info(
f" (Collecting samples: {len(zero_samples)}/{params.sample_count} - hold still)"
)
advance_requested = False
# Read sensor
raw16 = read_raw_from_serial(ser)
if raw16 is not None:
x_raw, y_raw, s_raw, c_raw = read_joint_point(raw16, pair)
if phase == "ellipse":
if state["have_transform"]:
z = state["T"] @ (np.array([x_raw, y_raw]) - state["center_fit"])
state["xu"].append(float(z[0]))
state["yu"].append(float(z[1]))
state["latest_z"] = (float(z[0]), float(z[1]))
state["win_s"].append(s_raw)
state["win_c"].append(c_raw)
if len(state["win_s"]) >= max(3, params.median_window):
state["ys"].append(running_median(state["win_s"]) - (2**12 - 1) / 2)
state["xs"].append(running_median(state["win_c"]) - (2**12 - 1) / 2)
else:
jdata = joints_out[-1]
z = np.array(jdata["T"]) @ (np.array([x_raw, y_raw]) - np.array(jdata["center_fit"]))
zero_samples.append(float(np.arctan2(z[1], z[0])))
state["latest_z"] = (float(z[0]), float(z[1]))
# Ellipse fitting
t = time.time()
if (
phase == "ellipse"
and (t - state["last_fit"]) >= params.fit_every
and len(state["xs"]) >= params.min_fit_points
):
xfit, yfit = select_fit_subset(state["xs"], state["ys"])
if xfit is not None and len(xfit) >= params.min_fit_points:
fit = fit_ellipse_opencv(xfit, yfit)
if fit is not None:
state["center_fit"] = fit["center"]
state["T"] = np.diag([1.0 / fit["a"], 1.0 / fit["b"]]) @ fit["R"].T
state["ellipse_cache"] = (fit["ex"], fit["ey"])
state["have_transform"] = True
state["last_fit"] = t
# Drawing
if (t - last_draw) >= 1.0 / params.draw_hz:
fig.canvas.restore_region(bg0)
fig.canvas.restore_region(bg1)
if phase == "ellipse":
sc0.set_offsets(np.c_[state["xs"], state["ys"]] if state["xs"] else np.empty((0, 2)))
ax0.draw_artist(sc0)
ell_line.set_data(*state["ellipse_cache"] if state["ellipse_cache"] else ([], []))
ax0.draw_artist(ell_line)
sc1.set_offsets(np.c_[state["xu"], state["yu"]] if state["xu"] else np.empty((0, 2)))
ax1.draw_artist(sc1)
if state["latest_z"]:
zx, zy = state["latest_z"]
radius_line.set_data([0.0, zx], [0.0, zy])
ang = float(np.arctan2(zy, zx))
angle_text.set_text(
f"angle: {ang:+.3f} rad ({np.degrees(ang):+.1f}°)\nmove {name}, press 'n' to advance"
)
else:
radius_line.set_data([], [])
angle_text.set_text("(waiting for fit)")
else:
sc0.set_offsets(np.empty((0, 2)))
ax0.draw_artist(sc0)
ell_line.set_data([], [])
ax0.draw_artist(ell_line)
if state["latest_z"]:
zx, zy = state["latest_z"]
sc1.set_offsets([[zx, zy]])
radius_line.set_data([0.0, zx], [0.0, zy])
ang = float(np.arctan2(zy, zx))
angle_text.set_text(
f"Zero pose for {name}\nangle: {ang:+.3f} rad\nsamples: {len(zero_samples)}/{params.sample_count}\nhold still, press 'n'"
)
else:
sc1.set_offsets(np.empty((0, 2)))
radius_line.set_data([], [])
angle_text.set_text("(waiting for data)")
ax1.draw_artist(sc1)
ax1.draw_artist(radius_line)
ax1.draw_artist(angle_text)
fig.canvas.blit(ax0.bbox)
fig.canvas.blit(ax1.bbox)
fig.canvas.flush_events()
last_draw = t
plt.pause(0.001)
finally:
plt.close(fig)
@@ -0,0 +1,353 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
IK helper for exoskeleton-to-G1 teleoperation. We map Exoskeleton joint angles to end-effector pose in world frame,
visualizing the result in meshcat after calibration.
"""
import logging
import os
from dataclasses import dataclass
import numpy as np
from lerobot.robots.unitree_g1.g1_utils import G1_29_JointArmIndex
from lerobot.robots.unitree_g1.robot_kinematic_processor import G1_29_ArmIK
from .exo_calib import JOINTS
logger = logging.getLogger(__name__)
def _frame_id(model, name: str) -> int | None:
try:
fid = model.getFrameId(name)
return fid if 0 <= fid < model.nframes else None
except Exception:
return None
@dataclass
class ArmCfg:
side: str # "left" | "right"
urdf: str # exo_left.urdf / exo_right.urdf
root: str # "exo_left" / "exo_right"
g1_ee: str # "l_ee" / "r_ee"
offset: np.ndarray # world offset for viz + target
marker_prefix: str # "left" / "right"
class Markers:
"""Creates meshcat visualization primitives, showing end-effector frames of exoskeleton and G1"""
def __init__(self, viewer):
self.v = viewer
def sphere(self, path: str, r: float, rgba: tuple[float, float, float, float]):
import meshcat.geometry as mg
c = (int(rgba[0] * 255) << 16) | (int(rgba[1] * 255) << 8) | int(rgba[2] * 255)
self.v[path].set_object(
mg.Sphere(r),
mg.MeshPhongMaterial(color=c, opacity=rgba[3], transparent=rgba[3] < 1.0),
)
def axes(self, path: str, axis_len: float = 0.1, axis_w: int = 6):
import meshcat.geometry as mg
pts = np.array(
[[0, 0, 0], [axis_len, 0, 0], [0, 0, 0], [0, axis_len, 0], [0, 0, 0], [0, 0, axis_len]],
dtype=np.float32,
).T
cols = np.array(
[[1, 0, 0], [1, 0, 0], [0, 1, 0], [0, 1, 0], [0, 0, 1], [0, 0, 1]],
dtype=np.float32,
).T
self.v[path].set_object(
mg.LineSegments(
mg.PointsGeometry(position=pts, color=cols),
mg.LineBasicMaterial(linewidth=axis_w, vertexColors=True),
)
)
def tf(self, path: str, mat: np.ndarray):
self.v[path].set_transform(mat)
class ExoskeletonIKHelper:
"""
- Loads G1 robot and exoskeleton URDF models via Pinocchio
- Computes forward kinematics on exoskeleton to get end-effector poses
- Solves inverse kinematics on G1 to match those poses
- Provides meshcat visualization showing both robots and targets
Args:
frozen_joints: List of G1 joint names to exclude from IK (kept at neutral).
"""
def __init__(self, frozen_joints: list[str] | None = None):
try:
import pinocchio as pin
except ImportError as e:
raise ImportError("ik mode needs pinocchio: pip install pin") from e
self.pin = pin
self.frozen_joints = frozen_joints or []
self.g1_ik = G1_29_ArmIK()
self.robot_g1 = self.g1_ik.reduced_robot
self.robot_g1.data = self.robot_g1.model.createData()
self.q_g1 = pin.neutral(self.robot_g1.model)
assets_dir = os.path.join(self.g1_ik.repo_path, "assets")
self.frozen_idx = self._frozen_joint_indices()
self.arms = [
ArmCfg(
side="left",
urdf=os.path.join(assets_dir, "exo_left.urdf"),
root="exo_left",
g1_ee="L_ee",
offset=np.array([0.6, 0.3, 0.0]),
marker_prefix="left",
),
ArmCfg(
side="right",
urdf=os.path.join(assets_dir, "exo_right.urdf"),
root="exo_right",
g1_ee="R_ee",
offset=np.array([0.6, -0.3, 0.0]),
marker_prefix="right",
),
]
self.exo = {} # side -> pin.RobotWrapper
self.q_exo = {} # side -> q
self.ee_id_exo = {} # side -> frame id
self.qmap = {} # side -> {joint_name: q_idx}
self.ee_id_g1 = {} # side -> frame id
self._load_exo_models(assets_dir)
for a in self.arms:
self.ee_id_g1[a.side] = _frame_id(self.robot_g1.model, a.g1_ee)
self.viewer = None
self.markers: Markers | None = None
self.viz_g1 = None
self.viz_exo = {} # side -> viz
def _frozen_joint_indices(self) -> dict[str, int]:
out = {}
m = self.robot_g1.model
for name in self.frozen_joints:
if name in m.names:
jid = m.getJointId(name)
out[name] = m.idx_qs[jid]
logger.info(f"freezing joint: {name} (q_idx={out[name]})")
return out
def _find_exo_ee(self, model, ee_name: str = "ee") -> int:
ee = _frame_id(model, ee_name)
if ee is not None:
return ee
for fid in reversed(range(model.nframes)):
if model.frames[fid].type == self.pin.FrameType.BODY:
return fid
return 0
def _build_joint_map(self, robot) -> dict[str, int]:
m = robot.model
return {n: m.idx_qs[m.getJointId(n)] for n in JOINTS if n in m.names}
def _load_exo_models(self, assets_dir: str):
pin = self.pin
for a in self.arms:
if not os.path.exists(a.urdf):
logger.warning(f"{a.side} exo urdf not found: {a.urdf}")
continue
r = pin.RobotWrapper.BuildFromURDF(a.urdf, assets_dir)
self.exo[a.side] = r
self.q_exo[a.side] = pin.neutral(r.model)
self.ee_id_exo[a.side] = self._find_exo_ee(r.model)
self.qmap[a.side] = self._build_joint_map(r)
logger.info(f"loaded {a.side} exo urdf: {a.urdf}")
def init_visualization(self):
"""
Creates a browser-based visualization of exoskeleton and G1 robot,
highlighting end-effector frames and target positions.
"""
try:
from pinocchio.visualize import MeshcatVisualizer
except ImportError as e:
logger.warning(f"meshcat viz unavailable: {e}")
return
# g1
self.viz_g1 = MeshcatVisualizer(
self.robot_g1.model, self.robot_g1.collision_model, self.robot_g1.visual_model
)
self.viz_g1.initViewer(open=True)
self.viz_g1.loadViewerModel("g1")
self.viz_g1.display(self.q_g1)
self.viewer = self.viz_g1.viewer
self.markers = Markers(self.viewer)
# exos
for a in self.arms:
if a.side not in self.exo:
continue
r = self.exo[a.side]
v = MeshcatVisualizer(r.model, r.collision_model, r.visual_model)
v.initViewer(open=False)
v.viewer = self.viewer
v.loadViewerModel(a.root)
offset_tf = np.eye(4)
offset_tf[:3, 3] = a.offset
self.viewer[a.root].set_transform(offset_tf)
v.display(self.q_exo[a.side])
self.viz_exo[a.side] = v
# markers
for a in self.arms:
p = a.marker_prefix
self.markers.sphere(f"markers/{p}_exo_ee", 0.012, (0.2, 1.0, 0.2, 0.9))
self.markers.sphere(f"markers/{p}_g1_ee", 0.015, (1.0, 0.2, 0.2, 0.9))
self.markers.sphere(f"markers/{p}_ik_target", 0.015, (0.1, 0.3, 1.0, 0.9))
self.markers.axes(f"markers/{p}_exo_axes", 0.06)
self.markers.axes(f"markers/{p}_g1_axes", 0.08)
logger.info(f"meshcat viz initialized: {self.viewer.url()}")
print(f"\nmeshcat url: {self.viewer.url()}\n")
def _fk_target_world(self, side: str, angles: dict[str, float]) -> np.ndarray | None:
"""returns wrist frame target to be used for G1 IK in 4x4 homogeneous transform. Takes offset into account."""
if side not in self.exo or not angles:
return None
pin = self.pin
q = self.q_exo[side]
qmap = self.qmap[side]
for name, ang in angles.items():
idx = qmap.get(name)
if idx is not None:
q[idx] = float(ang)
r = self.exo[side]
pin.forwardKinematics(r.model, r.data, q)
pin.updateFramePlacements(r.model, r.data)
ee = r.data.oMf[self.ee_id_exo[side]]
target = np.eye(4)
target[:3, :3] = ee.rotation
# offset gets applied in world space
cfg = next(a for a in self.arms if a.side == side)
target[:3, 3] = cfg.offset + ee.translation
return target
def update_visualization(self):
if self.viewer is None or self.markers is None:
return
pin = self.pin
# g1
if self.viz_g1 is not None:
self.viz_g1.display(self.q_g1)
pin.forwardKinematics(self.robot_g1.model, self.robot_g1.data, self.q_g1)
pin.updateFramePlacements(self.robot_g1.model, self.robot_g1.data)
for a in self.arms:
fid = self.ee_id_g1.get(a.side)
if fid is None:
continue
ee_tf = self.robot_g1.data.oMf[fid].homogeneous
p = a.marker_prefix
self.markers.tf(f"markers/{p}_g1_ee", ee_tf)
self.markers.tf(f"markers/{p}_g1_axes", ee_tf)
# exos
for a in self.arms:
side = a.side
v = self.viz_exo.get(side)
if v is None:
continue
v.display(self.q_exo[side])
r = self.exo[side]
pin.forwardKinematics(r.model, r.data, self.q_exo[side])
pin.updateFramePlacements(r.model, r.data)
ee = r.data.oMf[self.ee_id_exo[side]]
world_tf = (pin.SE3(np.eye(3), a.offset) * ee).homogeneous
p = a.marker_prefix
self.markers.tf(f"markers/{p}_exo_ee", world_tf)
self.markers.tf(f"markers/{p}_exo_axes", world_tf)
target_tf = np.eye(4)
target_tf[:3, :3] = ee.rotation
target_tf[:3, 3] = a.offset + ee.translation
self.markers.tf(f"markers/{p}_ik_target", target_tf)
def compute_g1_joints_from_exo(
self,
left_angles: dict[str, float],
right_angles: dict[str, float],
) -> dict[str, float]:
"""
Performs FK on exoskeleton to get end-effector poses in world frame,
after which it solves IK on G1 to return joint angles matching those poses in G1 motor order.
"""
pin = self.pin
targets = {
"left": self._fk_target_world("left", left_angles),
"right": self._fk_target_world("right", right_angles),
}
# fallback to current g1 ee pose if missing target
pin.forwardKinematics(self.robot_g1.model, self.robot_g1.data, self.q_g1)
pin.updateFramePlacements(self.robot_g1.model, self.robot_g1.data)
for a in self.arms:
if targets[a.side] is not None:
continue
fid = self.ee_id_g1.get(a.side)
if fid is not None:
targets[a.side] = self.robot_g1.data.oMf[fid].homogeneous
if targets["left"] is None or targets["right"] is None:
logger.warning("missing ik targets, returning current pose")
return {}
frozen_vals = {n: self.q_g1[i] for n, i in self.frozen_idx.items()}
self.q_g1, _ = self.g1_ik.solve_ik(
targets["left"], targets["right"], current_lr_arm_motor_q=self.q_g1
)
for n, i in self.frozen_idx.items():
self.q_g1[i] = frozen_vals[n]
return {
f"{j.name}.q": float(self.q_g1[i])
for i, j in enumerate(G1_29_JointArmIndex)
if i < len(self.q_g1)
}
@@ -0,0 +1,119 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
from dataclasses import dataclass
from pathlib import Path
import serial
from .exo_calib import ExoskeletonCalibration, exo_raw_to_angles, run_exo_calibration
logger = logging.getLogger(__name__)
def parse_raw16(line: bytes) -> list[int] | None:
try:
parts = line.decode("utf-8", errors="ignore").split()
if len(parts) < 16:
return None
return [int(x) for x in parts[:16]]
except Exception:
return None
def read_raw_from_serial(ser) -> list[int] | None:
"""Read latest sample from serial; if buffer is backed up, keep only the newest."""
last = None
while ser.in_waiting > 0:
b = ser.readline()
if not b:
break
raw16 = parse_raw16(b)
if raw16 is not None:
last = raw16
if last is None:
b = ser.readline()
if b:
last = parse_raw16(b)
return last
@dataclass
class ExoskeletonArm:
port: str
calibration_fpath: Path
side: str
baud_rate: int = 115200
_ser: serial.Serial | None = None
calibration: ExoskeletonCalibration | None = None
def __post_init__(self):
if self.calibration_fpath.is_file():
self._load_calibration()
@property
def is_connected(self) -> bool:
return self._ser is not None and getattr(self._ser, "is_open", False)
@property
def is_calibrated(self) -> bool:
return self.calibration is not None
def connect(self, calibrate: bool = True) -> None:
if self.is_connected:
return
try:
self._ser = serial.Serial(self.port, self.baud_rate, timeout=0.02)
self._ser.reset_input_buffer()
logger.info(f"connected: {self.port}")
except serial.SerialException as e:
raise ConnectionError(f"failed to connect to {self.port}: {e}") from e
if calibrate and not self.is_calibrated:
self.calibrate()
def disconnect(self) -> None:
if self._ser:
try:
self._ser.close()
finally:
self._ser = None
def _load_calibration(self) -> None:
try:
data = json.loads(self.calibration_fpath.read_text())
self.calibration = ExoskeletonCalibration.from_dict(data)
logger.info(f"loaded calibration: {self.calibration_fpath}")
except Exception as e:
logger.warning(f"failed to load calibration: {e}")
def read_raw(self) -> list[int] | None:
if not self._ser:
return None
return read_raw_from_serial(self._ser)
def get_angles(self) -> dict[str, float]:
if not self.calibration:
raise RuntimeError("exoskeleton not calibrated")
raw = self.read_raw()
return {} if raw is None else exo_raw_to_angles(raw, self.calibration)
def calibrate(self) -> None:
ser = self._ser
self.calibration = run_exo_calibration(ser, self.side, self.calibration_fpath)
@@ -0,0 +1,157 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from functools import cached_property
from lerobot.robots.unitree_g1.g1_utils import G1_29_JointIndex
from lerobot.utils.constants import HF_LEROBOT_CALIBRATION, TELEOPERATORS
from ..teleoperator import Teleoperator
from .config_unitree_g1 import UnitreeG1TeleoperatorConfig
from .exo_ik import ExoskeletonIKHelper
from .exo_serial import ExoskeletonArm
logger = logging.getLogger(__name__)
class UnitreeG1Teleoperator(Teleoperator):
"""
Bimanual exoskeleton arms teleoperator for Unitree G1 arms.
Uses inverse kinematics: exoskeleton FK computes end-effector pose,
G1 IK solves for joint angles.
"""
config_class = UnitreeG1TeleoperatorConfig
name = "unitree_g1"
def __init__(self, config: UnitreeG1TeleoperatorConfig):
super().__init__(config)
self.config = config
# Setup calibration directory
self.calibration_dir = (
config.calibration_dir
if config.calibration_dir
else HF_LEROBOT_CALIBRATION / TELEOPERATORS / self.name
)
self.calibration_dir.mkdir(parents=True, exist_ok=True)
left_id = f"{config.id}_left" if config.id else "left"
right_id = f"{config.id}_right" if config.id else "right"
# Create exoskeleton arm instances
self.left_arm = ExoskeletonArm(
port=config.left_arm_config.port,
baud_rate=config.left_arm_config.baud_rate,
calibration_fpath=self.calibration_dir / f"{left_id}.json",
side="left",
)
self.right_arm = ExoskeletonArm(
port=config.right_arm_config.port,
baud_rate=config.right_arm_config.baud_rate,
calibration_fpath=self.calibration_dir / f"{right_id}.json",
side="right",
)
self.ik_helper: ExoskeletonIKHelper | None = None
@cached_property
def action_features(self) -> dict[str, type]:
return {f"{name}.q": float for name in self._g1_joint_names}
@cached_property
def feedback_features(self) -> dict[str, type]:
return {}
@property
def is_connected(self) -> bool:
return self.left_arm.is_connected and self.right_arm.is_connected
@property
def is_calibrated(self) -> bool:
return self.left_arm.is_calibrated and self.right_arm.is_calibrated
def connect(self, calibrate: bool = True) -> None:
self.left_arm.connect(calibrate)
self.right_arm.connect(calibrate)
frozen_joints = [j.strip() for j in self.config.frozen_joints.split(",") if j.strip()]
self.ik_helper = ExoskeletonIKHelper(frozen_joints=frozen_joints)
logger.info("IK helper initialized")
def calibrate(self) -> None:
if not self.left_arm.is_calibrated:
logger.info("Starting calibration for left arm...")
self.left_arm.calibrate()
else:
logger.info("Left arm already calibrated. Skipping.")
if not self.right_arm.is_calibrated:
logger.info("Starting calibration for right arm...")
self.right_arm.calibrate()
else:
logger.info("Right arm already calibrated. Skipping.")
logger.info("Starting visualization to verify calibration...")
self.run_visualization_loop()
def configure(self) -> None:
pass
def get_action(self) -> dict[str, float]:
left_angles = self.left_arm.get_angles()
right_angles = self.right_arm.get_angles()
return self.ik_helper.compute_g1_joints_from_exo(left_angles, right_angles)
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError("Exoskeleton arms do not support feedback")
def disconnect(self) -> None:
self.left_arm.disconnect()
self.right_arm.disconnect()
def run_visualization_loop(self):
"""Run interactive Meshcat visualization loop to verify tracking."""
if self.ik_helper is None:
frozen_joints = [j.strip() for j in self.config.frozen_joints.split(",") if j.strip()]
self.ik_helper = ExoskeletonIKHelper(frozen_joints=frozen_joints)
self.ik_helper.init_visualization()
print("\n" + "=" * 60)
print("Visualization running! Move the exoskeletons to test tracking.")
print("Press Ctrl+C to exit.")
print("=" * 60 + "\n")
try:
while True:
left_angles = self.left_arm.get_angles()
right_angles = self.right_arm.get_angles()
self.ik_helper.compute_g1_joints_from_exo(left_angles, right_angles)
self.ik_helper.update_visualization()
time.sleep(0.01)
except KeyboardInterrupt:
print("\n\nVisualization stopped.")
@cached_property
def _g1_joint_names(self) -> list[str]:
return [joint.name for joint in G1_29_JointIndex]
+17 -3
View File
@@ -13,11 +13,13 @@
# limitations under the License.
from enum import Enum
from typing import cast
from typing import TYPE_CHECKING, cast
from lerobot.utils.import_utils import make_device_from_device_class
from .config import TeleoperatorConfig
if TYPE_CHECKING:
from .teleoperator import Teleoperator
@@ -31,7 +33,7 @@ class TeleopEvents(Enum):
TERMINATE_EPISODE = "terminate_episode"
def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator:
def make_teleoperator_from_config(config: TeleoperatorConfig) -> "Teleoperator":
# TODO(Steven): Consider just using the make_device_from_device_class for all types
if config.type == "keyboard":
from .keyboard import KeyboardTeleop
@@ -73,6 +75,10 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator:
from .homunculus import HomunculusArm
return HomunculusArm(config)
elif config.type == "unitree_g1":
from .unitree_g1 import UnitreeG1Teleoperator
return UnitreeG1Teleoperator(config)
elif config.type == "bi_so_leader":
from .bi_so_leader import BiSOLeader
@@ -81,8 +87,16 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator:
from .reachy2_teleoperator import Reachy2Teleoperator
return Reachy2Teleoperator(config)
elif config.type == "openarm_leader":
from .openarm_leader import OpenArmLeader
return OpenArmLeader(config)
elif config.type == "bi_openarm_leader":
from .bi_openarm_leader import BiOpenArmLeader
return BiOpenArmLeader(config)
else:
try:
return cast(Teleoperator, make_device_from_device_class(config))
return cast("Teleoperator", make_device_from_device_class(config))
except Exception as e:
raise ValueError(f"Error creating robot with config {config}: {e}") from e
+89
View File
@@ -525,3 +525,92 @@ def test_aggregate_image_datasets(tmp_path, lerobot_dataset_factory):
assert img.shape[0] == 3, f"Image {image_key} should have 3 channels"
assert_dataset_iteration_works(aggr_ds)
def test_aggregate_already_merged_dataset(tmp_path, lerobot_dataset_factory):
"""Regression test for aggregating a dataset that is itself a result of a previous merge.
This test reproduces the bug where merging datasets with multiple parquet files
(e.g., from a previous merge with file rotation) would cause FileNotFoundError
because metadata file indices were incorrectly preserved instead of being mapped
to their actual destination files.
The fix adds src_to_dst tracking in aggregate_data() to correctly map source
file indices to destination file indices.
"""
# Step 1: Create datasets A and B
ds_a = lerobot_dataset_factory(
root=tmp_path / "ds_a",
repo_id=f"{DUMMY_REPO_ID}_a",
total_episodes=4,
total_frames=200,
)
ds_b = lerobot_dataset_factory(
root=tmp_path / "ds_b",
repo_id=f"{DUMMY_REPO_ID}_b",
total_episodes=4,
total_frames=200,
)
# Step 2: Merge A+B into AB with small file size to force multiple files
aggregate_datasets(
repo_ids=[ds_a.repo_id, ds_b.repo_id],
roots=[ds_a.root, ds_b.root],
aggr_repo_id=f"{DUMMY_REPO_ID}_ab",
aggr_root=tmp_path / "ds_ab",
data_files_size_in_mb=0.01, # Force file rotation
)
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(tmp_path / "ds_ab")
ds_ab = LeRobotDataset(f"{DUMMY_REPO_ID}_ab", root=tmp_path / "ds_ab")
# Verify AB has multiple data files (file rotation occurred)
ab_data_files = list((tmp_path / "ds_ab" / "data").rglob("*.parquet"))
assert len(ab_data_files) > 1, "First merge should create multiple parquet files"
# Step 3: Create dataset C
ds_c = lerobot_dataset_factory(
root=tmp_path / "ds_c",
repo_id=f"{DUMMY_REPO_ID}_c",
total_episodes=2,
total_frames=100,
)
# Step 4: Merge AB+C into final - THIS IS WHERE THE BUG OCCURRED
aggregate_datasets(
repo_ids=[ds_ab.repo_id, ds_c.repo_id],
roots=[ds_ab.root, ds_c.root],
aggr_repo_id=f"{DUMMY_REPO_ID}_abc",
aggr_root=tmp_path / "ds_abc",
)
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(tmp_path / "ds_abc")
ds_abc = LeRobotDataset(f"{DUMMY_REPO_ID}_abc", root=tmp_path / "ds_abc")
# Step 5: Verify all data files referenced in metadata actually exist
for ep_idx in range(ds_abc.num_episodes):
data_file_path = ds_abc.root / ds_abc.meta.get_data_file_path(ep_idx)
assert data_file_path.exists(), (
f"Episode {ep_idx} references non-existent file: {data_file_path}\n"
"This indicates the src_to_dst mapping fix is not working correctly."
)
# Step 6: Verify we can iterate through the entire dataset without FileNotFoundError
expected_episodes = ds_a.num_episodes + ds_b.num_episodes + ds_c.num_episodes
expected_frames = ds_a.num_frames + ds_b.num_frames + ds_c.num_frames
assert ds_abc.num_episodes == expected_episodes
assert ds_abc.num_frames == expected_frames
# This would raise FileNotFoundError before the fix
assert_dataset_iteration_works(ds_abc)