mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-15 08:39:49 +00:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ed773219c0 | |||
| a075669184 | |||
| bc953e4b5a | |||
| 6b62113515 |
@@ -50,7 +50,3 @@
|
||||
- local: backwardcomp
|
||||
title: Backward compatibility
|
||||
title: "About"
|
||||
- sections:
|
||||
- local: datasets
|
||||
title: "The LeRobotDataset Format"
|
||||
-title: "Datasets"
|
||||
|
||||
@@ -1,140 +0,0 @@
|
||||
# The LeRobotDataset Format
|
||||
|
||||
`LeRobotDataset` is a standardized dataset format designed to address the specific needs of robot learning research.
|
||||
In this, it provides a unified and convenient access to robotics data across modalities, including sensorimotor readings, multiple camera feeds and teleoperation status.
|
||||
`LeRobotDataset` also stores general information regarding the data collected, like the task being performed by the teleoperator, the kind of robot used and measurement details like the frames per second at which the recording of both image and robot state's streams are proceeding.
|
||||
|
||||
Therefore, `LeRobotDataset` provides a unified interface for handling multi-modal, time-series data, and it integrates seamlessly with the PyTorch and Hugging Face ecosystems.
|
||||
`LeRobotDataset` is designed to be easily extensible and customizable by users, and it already supports openly available data coming from a variety of embodiments, ranging from manipulator platforms like the SO-100 and ALOHA-2, to real-world humanoid data, simulation datasets and self-driving car datasets.
|
||||
This dataset format is built to be both efficient for training and flexible enough to accommodate the diverse data types encountered in robotics, while promoting reproducibility and ease of use for users.
|
||||
|
||||
## The Format's Design
|
||||
|
||||
A core design choice behind `LeRobotDataset` is separating the underlying data storage from the user-facing API.
|
||||
This allows for efficient serialization and storage while presenting the data in an intuitive, ready-to-use format.
|
||||
A dataset is always organized into three main components:
|
||||
|
||||
1. **Tabular Data**: Low-dimensional, high-frequency data such as joint states, and actions are stored in efficient [Apache Parquet](https://parquet.apache.org/) files, and typically offloaded to the more mature `datasets` library, providing fast, memory-mapped access.
|
||||
2. **Visual Data**: To handle large volumes of camera data, frames are concatenated and encoded into MP4 files. Frames from the same episode are always grouped together into the same video, and multiple videos are grouped together by camera. To reduce stress on the file system, groups of videos for the same camera view are also broke into multiple sub-directories, after a given threshold number.
|
||||
3. **Metadata**: A collection of JSON files which describes the dataset's structure in terms of its metadata, serving as the relational counterpart to both the tabular and visual dimensions of data. Metadata include the different feature schemas, frame rates, normalization statistics, and episode boundaries.
|
||||
|
||||
For scalability, and to support datasets with potentially millions of trajectories resulting in hundreads of millions or billions of individual camera frames, we merge data from different episodes into the same high-level structure.
|
||||
Concretely, this means that any given tabular collection and video will not typically contain information about one episode only, but rather a concatenation of the information available in multiple episodes.
|
||||
This keeps the pressure on the file system, both locally and on remote storage providers like Hugging Face, manageable, at the expense of leveraging more heavily the metadata part of the data, e.g. used to reconstruct information relative to at which position a given episode starts or ends.
|
||||
An example structure for a given `LeRobotDataset` would appear as follows:
|
||||
|
||||
```bash
|
||||
lerobot/svla_so101_pickplace
|
||||
├── data/
|
||||
│ └── chunk-000/
|
||||
│ ├── file_000000.parquet
|
||||
│ └── ...
|
||||
├── meta/
|
||||
│ ├── episodes/
|
||||
│ │ ├── chunk-000/
|
||||
│ │ │ └── file_000000.parquet
|
||||
│ │ └── ...
|
||||
│ ├── info.json
|
||||
│ ├── stats.json
|
||||
│ └── tasks.jsonl
|
||||
└── videos/
|
||||
└── chunk-000/
|
||||
├── observation.images.wrist_camera/
|
||||
│ ├── file_000000.mp4
|
||||
│ └── ...
|
||||
└── ...
|
||||
```
|
||||
|
||||
- **`meta/info.json`**: This is the central metadata file. It contains the complete dataset schema, defining all features (e.g., `observation.state`, `action`), their shapes, and data types. It also stores crucial information like the dataset's frames-per-second (`fps`), codebase version, and the path templates used to locate data and video files.
|
||||
- **`meta/stats.json`**: This file stores aggregated statistics (mean, std, min, max) for each feature across the entire dataset. These are used for data normalization and are accessible via `dataset.meta.stats`.
|
||||
- **`meta/tasks.jsonl`**: Contains the mapping from natural language task descriptions to integer task indices, which are used for task-conditioned policy training.
|
||||
- **`meta/episodes/`**: This directory contains metadata about each individual episode, such as its length, corresponding task, and pointers to where its data is stored. For scalability, this information is stored in chunked Parquet files rather than a single large JSON file.
|
||||
- **`data/`**: Contains the core frame-by-frame tabular data in Parquet files. To improve performance and handle large datasets, data from **multiple episodes are concatenated into larger files**. These files are organized into chunked subdirectories to keep file sizes manageable. Therefore, a single file typically contains data for more than one episode.
|
||||
- **`videos/`**: Contains the MP4 video files for all visual observation streams. Similar to the `data/` directory, video footage from **multiple episodes is concatenated into single MP4 files**. This strategy significantly reduces the number of files in the dataset, which is more efficient for modern filesystems. The path structure (`/videos/<camera_key>/<chunk>/file_...mp4`) allows the data loader to locate the correct video file and then seek to the precise timestamp for a given frame.
|
||||
|
||||
## Code Example: Using `LeRobotDataset` with `torch.utils.data.DataLoader`
|
||||
|
||||
This section provides an overview of how to access datasets hosted on Hugging Face using the `LeRobotDataset` class.
|
||||
Every dataset on the Hugging Face Hub containing the three main pillars presented above (Tabular and Visual Data, as well as relational Metadata) can be assessed with a single line.
|
||||
Most reinforcement learning (RL) and behavioral cloning (BC) algorithms tend to operate on stack of observation and actions.
|
||||
For instance, RL algorithms typically use a history of previous observations `[o_{t-H}, ..., o_{t}]` to mitigate partial observability.
|
||||
BC cloning algorithms are instead typically trained to regress chunks of multiple actions rather than single controls.
|
||||
To accommodate for the specifics of robot learning training, `LeRobotDataset` provides a native windowing operation, whereby we can use the _seconds_ before and after any given observation using `delta_timestamps`.
|
||||
Non available frames is opportuninely padded, with a padding mask released to provide support in this.
|
||||
Notably, this all happens within the `LeRobotDataset` and is entitrely transparent to higher level wrappers such as `torch.utils.data.DataLoader`.
|
||||
|
||||
Conveniently, by using `LeRobotDataset` with a Pytorch `DataLoader` one can automatically collate the individual sample dictionaries from the dataset into a single dictionary of batched tensors.
|
||||
|
||||
```python
|
||||
from lerobot.datasets import LeRobotDataset
|
||||
|
||||
# Load from the Hugging Face Hub (will be cached locally)
|
||||
dataset = LeRobotDataset("lerobot/svla_so101_pickplace")
|
||||
|
||||
# Get the 100th frame in the dataset by
|
||||
sample = dataset[100]
|
||||
print(sample)
|
||||
# The sample is a dictionary of tensors
|
||||
# {
|
||||
# 'observation.state': tensor([...]),
|
||||
# 'action': tensor([...]),
|
||||
# 'observation.images.wrist_camera': tensor([C, H, W]),
|
||||
# 'timestamp': tensor(1.234),
|
||||
# ...
|
||||
# }
|
||||
delta_timestamps = {
|
||||
"observation.images.wrist_camera": [-0.2, -0.1, 0.0] # 0.2, and 0.1 seconds *before* any observation
|
||||
}
|
||||
dataset = LeRobotDataset(
|
||||
"lerobot/svla_so101_pickplace",
|
||||
delta_timestamps=delta_timestamps
|
||||
)
|
||||
|
||||
# Accessing an index now returns a stack of frames for the specified key
|
||||
sample = dataset[100]
|
||||
|
||||
# The image tensor will now have a time dimension
|
||||
# 'observation.images.wrist_camera' has shape [T, C, H, W], where T=3
|
||||
print(sample['observation.images.wrist_camera'].shape)
|
||||
|
||||
batch_size=16
|
||||
# wrap the dataset in a DataLoader to use process it batches for training purposes
|
||||
data_loader = torch.utils.data.DataLoader(
|
||||
dataset,
|
||||
batch_size=batch_size
|
||||
)
|
||||
|
||||
# 3. Iterate over the DataLoader in a training loop
|
||||
num_epochs = 1
|
||||
device = "cuda" if torch.cuda.is_available() else "cpu"
|
||||
|
||||
for epoch in range(num_epochs):
|
||||
for batch in data_loader:
|
||||
# 'batch' is a dictionary where each value is a batch of tensors.
|
||||
# For example, batch['action'] will have a shape of [32, action_dim].
|
||||
|
||||
# If using delta_timestamps, a batched image tensor might have a
|
||||
# shape of [32, T, C, H, W].
|
||||
|
||||
# Move data to the appropriate device (e.g., GPU)
|
||||
observations = batch['observation.state'].to(device)
|
||||
actions = batch['action'].to(device)
|
||||
images = batch['observation.images.wrist_camera'].to(device)
|
||||
|
||||
# Next do amazing_model.forward(batch)
|
||||
...
|
||||
```
|
||||
|
||||
## Streaming
|
||||
|
||||
`LeRobotDataset` now also supports streaming mode.
|
||||
You can stream of data from a large dataset hosted on the Hugging Face Hub by just replacing the dataset definition with:
|
||||
|
||||
```python
|
||||
from lerobot.datasets.streaming_dataset import StreamingLeRobotDataset
|
||||
|
||||
# Streams frames from the Hugging Face Hub
|
||||
dataset = StreamingLeRobotDataset("lerobot/svla_so101_pickplace")
|
||||
```
|
||||
|
||||
Streaming datasets supports high-performance batch processing (ca. 80-100 it/s, varying on connectivity) and high levels of frames randomization: a key feature for behavioral cloning algorithms otherwise operating on highly non-i.i.d. data.
|
||||
@@ -0,0 +1,47 @@
|
||||
# ------------------------------------------------------------
|
||||
|
||||
# config_follower_right = ViperXConfig(
|
||||
# port="/dev/tty.usbserial-FT891KBG",
|
||||
# id="viperx_right",
|
||||
# )
|
||||
|
||||
# follower_right = ViperX(config_follower_right)
|
||||
# follower_right.connect(calibrate=False)
|
||||
# follower_right.calibrate()
|
||||
# follower_right.disconnect()
|
||||
|
||||
# ------------------------------------------------------------
|
||||
|
||||
# config_leader_right = WidowXConfig(
|
||||
# port="/dev/tty.usbserial-FT89FM77",
|
||||
# id="widowx_right",
|
||||
# )
|
||||
|
||||
# leader_right = WidowX(config_leader_right)
|
||||
# leader_right.connect(calibrate=False)
|
||||
# leader_right.calibrate()
|
||||
# leader_right.disconnect()
|
||||
|
||||
# ------------------------------------------------------------
|
||||
|
||||
# config_follower_left = ViperXConfig(
|
||||
# port="/dev/tty.usbserial-FT89FM09",
|
||||
# id="viperx_left",
|
||||
# )
|
||||
|
||||
# follower_left = ViperX(config_follower_left)
|
||||
# follower_left.connect(calibrate=False)
|
||||
# follower_left.calibrate()
|
||||
# follower_left.disconnect()
|
||||
|
||||
# ------------------------------------------------------------
|
||||
|
||||
# config_leader_left = WidowXConfig(
|
||||
# port="/dev/tty.usbserial-FT891KPN",
|
||||
# id="widowx_left",
|
||||
# )
|
||||
|
||||
# leader_left = WidowX(config_leader_left)
|
||||
# leader_left.connect(calibrate=False)
|
||||
# leader_left.calibrate()
|
||||
# leader_left.disconnect()
|
||||
@@ -0,0 +1,172 @@
|
||||
"""
|
||||
ALOHA Bimanual Recording Script
|
||||
|
||||
This script records episodes using ALOHA dual-arm system (ViperX followers + WidowX leaders).
|
||||
|
||||
Usage:
|
||||
1. New dataset: Set RESUME = False
|
||||
2. Resume/append: Set RESUME = True (will continue from existing episodes)
|
||||
|
||||
The script will:
|
||||
- Record NUM_EPISODES new episodes
|
||||
- Show progress with total episode count
|
||||
- Push dataset to HuggingFace Hub when complete
|
||||
"""
|
||||
|
||||
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.datasets.utils import hw_to_dataset_features
|
||||
from lerobot.record import record_loop
|
||||
from lerobot.robots.aloha import Aloha, AlohaConfig
|
||||
from lerobot.teleoperators.aloha_teleop import AlohaTeleop, AlohaTeleopConfig
|
||||
from lerobot.utils.control_utils import (
|
||||
init_keyboard_listener,
|
||||
sanity_check_dataset_name,
|
||||
sanity_check_dataset_robot_compatibility,
|
||||
)
|
||||
from lerobot.utils.utils import log_say
|
||||
from lerobot.utils.visualization_utils import _init_rerun
|
||||
|
||||
# Recording configuration
|
||||
NUM_EPISODES = 0
|
||||
FPS = 30
|
||||
EPISODE_TIME_SEC = 200
|
||||
RESET_TIME_SEC = 30
|
||||
TASK_DESCRIPTION = "First put the Hugging Face t shirt with both arms in the box, then place the hat with the right arm in the box."
|
||||
REPO_ID = "pepijn223/aloha_box_2"
|
||||
RESUME = True # Set to True to resume/append to existing dataset
|
||||
|
||||
# Create camera configuration
|
||||
camera_config = {
|
||||
"front": OpenCVCameraConfig(index_or_path=0, width=640, height=480, fps=FPS),
|
||||
"wrist_right": OpenCVCameraConfig(index_or_path=1, width=640, height=480, fps=FPS),
|
||||
"wrist_left": OpenCVCameraConfig(index_or_path=2, width=640, height=480, fps=FPS),
|
||||
}
|
||||
|
||||
# ALOHA Robot Configuration (dual ViperX followers)
|
||||
aloha_robot_config = AlohaConfig(
|
||||
id="aloha",
|
||||
left_arm_port="/dev/tty.usbserial-FT89FM09",
|
||||
right_arm_port="/dev/tty.usbserial-FT891KBG",
|
||||
left_arm_max_relative_target=20.0,
|
||||
right_arm_max_relative_target=20.0,
|
||||
left_arm_use_degrees=True,
|
||||
right_arm_use_degrees=True,
|
||||
cameras=camera_config,
|
||||
)
|
||||
|
||||
# ALOHA Teleoperator Configuration (dual WidowX leaders)
|
||||
aloha_teleop_config = AlohaTeleopConfig(
|
||||
id="aloha_teleop",
|
||||
left_arm_port="/dev/tty.usbserial-FT891KPN",
|
||||
right_arm_port="/dev/tty.usbserial-FT89FM77",
|
||||
left_arm_gripper_motor="xl430-w250",
|
||||
right_arm_gripper_motor="xc430-w150",
|
||||
left_arm_use_degrees=True,
|
||||
right_arm_use_degrees=True,
|
||||
)
|
||||
|
||||
# Initialize the robot and teleoperator
|
||||
robot = Aloha(aloha_robot_config)
|
||||
teleop = AlohaTeleop(aloha_teleop_config)
|
||||
|
||||
# Configure the dataset features
|
||||
action_features = hw_to_dataset_features(robot.action_features, "action")
|
||||
obs_features = hw_to_dataset_features(robot.observation_features, "observation")
|
||||
dataset_features = {**action_features, **obs_features}
|
||||
|
||||
# Create or resume the dataset
|
||||
if RESUME:
|
||||
print(f"Resuming existing dataset: {REPO_ID}")
|
||||
dataset = LeRobotDataset(
|
||||
repo_id=REPO_ID,
|
||||
root=None, # Use default root
|
||||
)
|
||||
|
||||
# Start image writer for cameras
|
||||
if hasattr(robot, "cameras") and len(robot.cameras) > 0:
|
||||
dataset.start_image_writer(
|
||||
num_processes=0, # Use threads only
|
||||
num_threads=4 * len(robot.cameras), # 4 threads per camera
|
||||
)
|
||||
|
||||
# Sanity check compatibility
|
||||
sanity_check_dataset_robot_compatibility(dataset, robot, FPS, dataset_features)
|
||||
print(f"Resumed dataset with {dataset.num_episodes} existing episodes")
|
||||
else:
|
||||
print(f"Creating new dataset: {REPO_ID}")
|
||||
# Sanity check dataset name
|
||||
sanity_check_dataset_name(REPO_ID, None)
|
||||
|
||||
# Create new dataset
|
||||
dataset = LeRobotDataset.create(
|
||||
repo_id=REPO_ID,
|
||||
fps=FPS,
|
||||
features=dataset_features,
|
||||
robot_type=robot.name,
|
||||
use_videos=True,
|
||||
image_writer_threads=4 * len(robot.cameras), # 4 threads per camera
|
||||
)
|
||||
|
||||
# Initialize the keyboard listener and rerun visualization
|
||||
_, events = init_keyboard_listener()
|
||||
_init_rerun(session_name="aloha_recording")
|
||||
|
||||
# Connect the robot and teleoperator
|
||||
robot.connect()
|
||||
teleop.connect()
|
||||
|
||||
episode_idx = 0
|
||||
total_episodes_to_record = NUM_EPISODES
|
||||
existing_episodes = dataset.num_episodes if RESUME else 0
|
||||
|
||||
while episode_idx < NUM_EPISODES and not events["stop_recording"]:
|
||||
current_episode = existing_episodes + episode_idx + 1
|
||||
log_say(f"Recording episode {current_episode} (batch: {episode_idx + 1}/{NUM_EPISODES})")
|
||||
|
||||
record_loop(
|
||||
robot=robot,
|
||||
events=events,
|
||||
fps=FPS,
|
||||
teleop=teleop,
|
||||
dataset=dataset,
|
||||
control_time_s=EPISODE_TIME_SEC,
|
||||
single_task=TASK_DESCRIPTION,
|
||||
display_data=True,
|
||||
)
|
||||
|
||||
# Reset the environment if not stopping or re-recording
|
||||
if not events["stop_recording"] and (episode_idx < NUM_EPISODES - 1 or events["rerecord_episode"]):
|
||||
log_say("Reset the environment")
|
||||
record_loop(
|
||||
robot=robot,
|
||||
events=events,
|
||||
fps=FPS,
|
||||
teleop=teleop,
|
||||
control_time_s=RESET_TIME_SEC,
|
||||
single_task=TASK_DESCRIPTION,
|
||||
display_data=True,
|
||||
)
|
||||
|
||||
if events["rerecord_episode"]:
|
||||
log_say("Re-recording episode")
|
||||
events["rerecord_episode"] = False
|
||||
events["exit_early"] = False
|
||||
dataset.clear_episode_buffer()
|
||||
continue
|
||||
|
||||
dataset.save_episode()
|
||||
episode_idx += 1
|
||||
|
||||
# Clean up
|
||||
log_say("Stop recording")
|
||||
robot.disconnect()
|
||||
teleop.disconnect()
|
||||
|
||||
# Summary
|
||||
final_episodes = dataset.num_episodes
|
||||
log_say(f"Dataset now contains {final_episodes} episodes total")
|
||||
|
||||
# Push to hub
|
||||
dataset.push_to_hub()
|
||||
log_say(f"Dataset '{REPO_ID}' pushed to HuggingFace Hub")
|
||||
@@ -0,0 +1,93 @@
|
||||
import time
|
||||
|
||||
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
|
||||
from lerobot.robots.viperx import ViperX, ViperXConfig
|
||||
from lerobot.teleoperators.widowx import WidowX, WidowXConfig
|
||||
from lerobot.utils.visualization_utils import _init_rerun, log_rerun_data
|
||||
|
||||
camera_config = {
|
||||
"front": OpenCVCameraConfig(index_or_path=0, width=640, height=480, fps=30),
|
||||
"wrist_right": OpenCVCameraConfig(index_or_path=1, width=640, height=480, fps=30),
|
||||
"wrist_left": OpenCVCameraConfig(index_or_path=2, width=640, height=480, fps=30),
|
||||
}
|
||||
|
||||
config_follower_right = ViperXConfig(
|
||||
port="/dev/tty.usbserial-FT891KBG",
|
||||
id="viperx_right",
|
||||
max_relative_target=10.0, # increased from default 5.0 to 10.0
|
||||
use_degrees=True,
|
||||
cameras=camera_config,
|
||||
)
|
||||
|
||||
config_leader_right = WidowXConfig(
|
||||
port="/dev/tty.usbserial-FT89FM77",
|
||||
id="widowx_right",
|
||||
gripper_motor="xc430-w150",
|
||||
use_degrees=True,
|
||||
)
|
||||
|
||||
config_follower_left = ViperXConfig(
|
||||
port="/dev/tty.usbserial-FT89FM09",
|
||||
id="viperx_left",
|
||||
max_relative_target=10.0, # increased from default 5.0 to 10.0
|
||||
use_degrees=True,
|
||||
)
|
||||
|
||||
config_leader_left = WidowXConfig(
|
||||
port="/dev/tty.usbserial-FT891KPN",
|
||||
id="widowx_left",
|
||||
gripper_motor="xl430-w250",
|
||||
use_degrees=True,
|
||||
)
|
||||
|
||||
_init_rerun(session_name="teleop")
|
||||
|
||||
follower_right = ViperX(config_follower_right)
|
||||
follower_right.connect()
|
||||
|
||||
leader_right = WidowX(config_leader_right)
|
||||
leader_right.connect()
|
||||
|
||||
follower_left = ViperX(config_follower_left)
|
||||
follower_left.connect()
|
||||
|
||||
leader_left = WidowX(config_leader_left)
|
||||
leader_left.connect()
|
||||
|
||||
|
||||
while True:
|
||||
act_right = leader_right.get_action()
|
||||
obs_right = follower_right.get_observation()
|
||||
|
||||
act_left = leader_left.get_action()
|
||||
obs_left = follower_left.get_observation()
|
||||
|
||||
print("=" * 60)
|
||||
print("ACTION (Leader Right):")
|
||||
for key, value in act_right.items():
|
||||
if key.endswith(".pos"):
|
||||
print(f" {key:20}: {value:8.3f}")
|
||||
|
||||
print("\nOBSERVATION (Follower Right):")
|
||||
for key, value in obs_right.items():
|
||||
if key.endswith(".pos"):
|
||||
print(f" {key:20}: {value:8.3f}")
|
||||
|
||||
print("=" * 60)
|
||||
print("ACTION (Leader Left):")
|
||||
for key, value in act_left.items():
|
||||
if key.endswith(".pos"):
|
||||
print(f" {key:20}: {value:8.3f}")
|
||||
|
||||
print("\nOBSERVATION (Follower Left):")
|
||||
for key, value in obs_left.items():
|
||||
if key.endswith(".pos"):
|
||||
print(f" {key:20}: {value:8.3f}")
|
||||
print("=" * 60)
|
||||
|
||||
log_rerun_data({**obs_right, **obs_left}, {**act_right, **act_left})
|
||||
|
||||
follower_right.send_action(act_right)
|
||||
follower_left.send_action(act_left)
|
||||
|
||||
time.sleep(0.02)
|
||||
@@ -0,0 +1,4 @@
|
||||
from .aloha import Aloha
|
||||
from .config_aloha import AlohaConfig
|
||||
|
||||
__all__ = ["Aloha", "AlohaConfig"]
|
||||
@@ -0,0 +1,161 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import time
|
||||
from functools import cached_property
|
||||
from typing import Any
|
||||
|
||||
from lerobot.cameras.utils import make_cameras_from_configs
|
||||
from lerobot.robots.viperx import ViperX
|
||||
from lerobot.robots.viperx.config_viperx import ViperXConfig
|
||||
|
||||
from ..robot import Robot
|
||||
from .config_aloha import AlohaConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Aloha(Robot):
|
||||
"""
|
||||
ALOHA Bimanual Robot System using dual ViperX follower arms.
|
||||
Based on the ALOHA (A Low-cost Open-source Hardware System for Bimanual Teleoperation) design.
|
||||
"""
|
||||
|
||||
config_class = AlohaConfig
|
||||
name = "aloha"
|
||||
|
||||
def __init__(self, config: AlohaConfig):
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
|
||||
left_arm_config = ViperXConfig(
|
||||
id=f"{config.id}_left" if config.id else None,
|
||||
calibration_dir=config.calibration_dir,
|
||||
port=config.left_arm_port,
|
||||
max_relative_target=config.left_arm_max_relative_target,
|
||||
use_degrees=config.left_arm_use_degrees,
|
||||
cameras={},
|
||||
)
|
||||
|
||||
right_arm_config = ViperXConfig(
|
||||
id=f"{config.id}_right" if config.id else None,
|
||||
calibration_dir=config.calibration_dir,
|
||||
port=config.right_arm_port,
|
||||
max_relative_target=config.right_arm_max_relative_target,
|
||||
use_degrees=config.right_arm_use_degrees,
|
||||
cameras={},
|
||||
)
|
||||
|
||||
self.left_arm = ViperX(left_arm_config)
|
||||
self.right_arm = ViperX(right_arm_config)
|
||||
self.cameras = make_cameras_from_configs(config.cameras)
|
||||
|
||||
@property
|
||||
def _motors_ft(self) -> dict[str, type]:
|
||||
return {f"left_{motor}.pos": float for motor in self.left_arm.bus.motors} | {
|
||||
f"right_{motor}.pos": float for motor in self.right_arm.bus.motors
|
||||
}
|
||||
|
||||
@property
|
||||
def _cameras_ft(self) -> dict[str, tuple]:
|
||||
return {
|
||||
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
return {**self._motors_ft, **self._cameras_ft}
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
return self._motors_ft
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return (
|
||||
self.left_arm.bus.is_connected
|
||||
and self.right_arm.bus.is_connected
|
||||
and all(cam.is_connected for cam in self.cameras.values())
|
||||
)
|
||||
|
||||
def connect(self, calibrate: bool = True) -> None:
|
||||
self.left_arm.connect(calibrate)
|
||||
self.right_arm.connect(calibrate)
|
||||
|
||||
for cam in self.cameras.values():
|
||||
cam.connect()
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
return self.left_arm.is_calibrated and self.right_arm.is_calibrated
|
||||
|
||||
def calibrate(self) -> None:
|
||||
self.left_arm.calibrate()
|
||||
self.right_arm.calibrate()
|
||||
|
||||
def configure(self) -> None:
|
||||
self.left_arm.configure()
|
||||
self.right_arm.configure()
|
||||
|
||||
def setup_motors(self) -> None:
|
||||
self.left_arm.setup_motors()
|
||||
self.right_arm.setup_motors()
|
||||
|
||||
def get_observation(self) -> dict[str, Any]:
|
||||
obs_dict = {}
|
||||
|
||||
# Add "left_" prefix
|
||||
left_obs = self.left_arm.get_observation()
|
||||
obs_dict.update({f"left_{key}": value for key, value in left_obs.items()})
|
||||
|
||||
# Add "right_" prefix
|
||||
right_obs = self.right_arm.get_observation()
|
||||
obs_dict.update({f"right_{key}": value for key, value in right_obs.items()})
|
||||
|
||||
for cam_key, cam in self.cameras.items():
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.async_read()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
|
||||
return obs_dict
|
||||
|
||||
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
|
||||
# Remove "left_" prefix
|
||||
left_action = {
|
||||
key.removeprefix("left_"): value for key, value in action.items() if key.startswith("left_")
|
||||
}
|
||||
# Remove "right_" prefix
|
||||
right_action = {
|
||||
key.removeprefix("right_"): value for key, value in action.items() if key.startswith("right_")
|
||||
}
|
||||
|
||||
send_action_left = self.left_arm.send_action(left_action)
|
||||
send_action_right = self.right_arm.send_action(right_action)
|
||||
|
||||
# Add prefixes back
|
||||
prefixed_send_action_left = {f"left_{key}": value for key, value in send_action_left.items()}
|
||||
prefixed_send_action_right = {f"right_{key}": value for key, value in send_action_right.items()}
|
||||
|
||||
return {**prefixed_send_action_left, **prefixed_send_action_right}
|
||||
|
||||
def disconnect(self):
|
||||
self.left_arm.disconnect()
|
||||
self.right_arm.disconnect()
|
||||
|
||||
for cam in self.cameras.values():
|
||||
cam.disconnect()
|
||||
@@ -0,0 +1,39 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from lerobot.cameras import CameraConfig
|
||||
|
||||
from ..config import RobotConfig
|
||||
|
||||
|
||||
@RobotConfig.register_subclass("aloha")
|
||||
@dataclass
|
||||
class AlohaConfig(RobotConfig):
|
||||
left_arm_port: str
|
||||
right_arm_port: str
|
||||
|
||||
# Optional parameters for left arm (ViperX)
|
||||
left_arm_max_relative_target: float | dict[str, float] = 20.0
|
||||
left_arm_use_degrees: bool = True
|
||||
|
||||
# Optional parameters for right arm (ViperX)
|
||||
right_arm_max_relative_target: float | dict[str, float] = 20.0
|
||||
right_arm_use_degrees: bool = True
|
||||
|
||||
# cameras (shared between both arms)
|
||||
cameras: dict[str, CameraConfig] = field(default_factory=dict)
|
||||
@@ -43,3 +43,6 @@ class ViperXConfig(RobotConfig):
|
||||
# Troubleshooting: If one of your IntelRealSense cameras freeze during
|
||||
# data recording due to bandwidth limit, you might need to plug the camera
|
||||
# on another USB hub or PCIe card.
|
||||
|
||||
# Set to `True` for backward compatibility with previous policies/dataset
|
||||
use_degrees: bool = False
|
||||
|
||||
@@ -18,7 +18,6 @@ from functools import cached_property
|
||||
from typing import Any
|
||||
|
||||
from lerobot.cameras.utils import make_cameras_from_configs
|
||||
from lerobot.constants import OBS_STATE
|
||||
from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
|
||||
from lerobot.motors.dynamixel import (
|
||||
@@ -45,22 +44,23 @@ class ViperX(Robot):
|
||||
self,
|
||||
config: ViperXConfig,
|
||||
):
|
||||
raise NotImplementedError
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
norm_mode_body = MotorNormMode.DEGREES if config.use_degrees else MotorNormMode.RANGE_M100_100
|
||||
self.bus = DynamixelMotorsBus(
|
||||
port=self.config.port,
|
||||
motors={
|
||||
"waist": Motor(1, "xm540-w270", MotorNormMode.RANGE_M100_100),
|
||||
"shoulder": Motor(2, "xm540-w270", MotorNormMode.RANGE_M100_100),
|
||||
"shoulder_shadow": Motor(3, "xm540-w270", MotorNormMode.RANGE_M100_100),
|
||||
"elbow": Motor(4, "xm540-w270", MotorNormMode.RANGE_M100_100),
|
||||
"elbow_shadow": Motor(5, "xm540-w270", MotorNormMode.RANGE_M100_100),
|
||||
"forearm_roll": Motor(6, "xm540-w270", MotorNormMode.RANGE_M100_100),
|
||||
"wrist_angle": Motor(7, "xm540-w270", MotorNormMode.RANGE_M100_100),
|
||||
"wrist_rotate": Motor(8, "xm430-w350", MotorNormMode.RANGE_M100_100),
|
||||
"waist": Motor(1, "xm540-w270", norm_mode_body),
|
||||
"shoulder": Motor(2, "xm540-w270", norm_mode_body),
|
||||
"shoulder_shadow": Motor(3, "xm540-w270", norm_mode_body),
|
||||
"elbow": Motor(4, "xm540-w270", norm_mode_body),
|
||||
"elbow_shadow": Motor(5, "xm540-w270", norm_mode_body),
|
||||
"forearm_roll": Motor(6, "xm540-w270", norm_mode_body),
|
||||
"wrist_angle": Motor(7, "xm540-w270", norm_mode_body),
|
||||
"wrist_rotate": Motor(8, "xm430-w350", norm_mode_body),
|
||||
"gripper": Motor(9, "xm430-w350", MotorNormMode.RANGE_0_100),
|
||||
},
|
||||
calibration=self.calibration,
|
||||
)
|
||||
self.cameras = make_cameras_from_configs(config.cameras)
|
||||
|
||||
@@ -96,6 +96,9 @@ class ViperX(Robot):
|
||||
|
||||
self.bus.connect()
|
||||
if not self.is_calibrated and calibrate:
|
||||
logger.info(
|
||||
"Mismatch between calibration values in the motor and the calibration file or no calibration file found"
|
||||
)
|
||||
self.calibrate()
|
||||
|
||||
for cam in self.cameras.values():
|
||||
@@ -109,16 +112,24 @@ class ViperX(Robot):
|
||||
return self.bus.is_calibrated
|
||||
|
||||
def calibrate(self) -> None:
|
||||
raise NotImplementedError # TODO(aliberts): adapt code below (copied from koch
|
||||
logger.info(f"\nRunning calibration of {self}")
|
||||
self.bus.disable_torque()
|
||||
if self.calibration:
|
||||
# Calibration file exists, ask user whether to use it or run new calibration
|
||||
user_input = input(
|
||||
f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: "
|
||||
)
|
||||
if user_input.strip().lower() != "c":
|
||||
logger.info(f"Writing calibration file associated with the id {self.id} to the motors")
|
||||
self.bus.write_calibration(self.calibration)
|
||||
return
|
||||
logger.info(f"\nRunning calibration of {self}")
|
||||
for motor in self.bus.motors:
|
||||
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
|
||||
|
||||
input("Move robot to the middle of its range of motion and press ENTER....")
|
||||
input(f"Move {self} to the middle of its range of motion and press ENTER....")
|
||||
homing_offsets = self.bus.set_half_turn_homings()
|
||||
|
||||
full_turn_motors = ["shoulder_pan", "wrist_roll"]
|
||||
full_turn_motors = ["shoulder", "forearm_roll", "wrist_rotate"]
|
||||
unknown_range_motors = [motor for motor in self.bus.motors if motor not in full_turn_motors]
|
||||
print(
|
||||
f"Move all joints except {full_turn_motors} sequentially through their entire "
|
||||
@@ -153,33 +164,23 @@ class ViperX(Robot):
|
||||
self.bus.write("Secondary_ID", "shoulder_shadow", 2)
|
||||
self.bus.write("Secondary_ID", "elbow_shadow", 4)
|
||||
|
||||
# Set a velocity limit of 131 as advised by Trossen Robotics
|
||||
# TODO(aliberts): remove as it's actually useless in position control
|
||||
self.bus.write("Velocity_Limit", 131)
|
||||
|
||||
# Use 'extended position mode' for all motors except gripper, because in joint mode the servos
|
||||
# can't rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling
|
||||
# the arm, you could end up with a servo with a position 0 or 4095 at a crucial point.
|
||||
# See: https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11
|
||||
for motor in self.bus.motors:
|
||||
if motor != "gripper":
|
||||
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
|
||||
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
|
||||
|
||||
# TODO(pepijn): Re enable this
|
||||
# Use 'position control current based' for follower gripper to be limited by the limit of the
|
||||
# current. It can grasp an object without forcing too much even tho, it's goal position is a
|
||||
# complete grasp (both gripper fingers are ordered to join and reach a touch).
|
||||
self.bus.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value)
|
||||
# self.bus.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value)
|
||||
|
||||
def get_observation(self) -> dict[str, Any]:
|
||||
"""The returned observations do not have a batch dimension."""
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
obs_dict = {}
|
||||
|
||||
# Read arm position
|
||||
start = time.perf_counter()
|
||||
obs_dict[OBS_STATE] = self.bus.sync_read("Present_Position")
|
||||
obs_dict = self.bus.sync_read("Present_Position")
|
||||
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
from .aloha_teleop import AlohaTeleop
|
||||
from .config_aloha_teleop import AlohaTeleopConfig
|
||||
|
||||
__all__ = ["AlohaTeleop", "AlohaTeleopConfig"]
|
||||
@@ -0,0 +1,125 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from functools import cached_property
|
||||
|
||||
from lerobot.teleoperators.widowx.config_widowx import WidowXConfig
|
||||
from lerobot.teleoperators.widowx.widowx import WidowX
|
||||
|
||||
from ..teleoperator import Teleoperator
|
||||
from .config_aloha_teleop import AlohaTeleopConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AlohaTeleop(Teleoperator):
|
||||
"""
|
||||
ALOHA Bimanual Teleoperator System using dual WidowX leader arms.
|
||||
Based on the ALOHA (A Low-cost Open-source Hardware System for Bimanual Teleoperation) design.
|
||||
"""
|
||||
|
||||
config_class = AlohaTeleopConfig
|
||||
name = "aloha_teleop"
|
||||
|
||||
def __init__(self, config: AlohaTeleopConfig):
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
|
||||
left_arm_config = WidowXConfig(
|
||||
id=f"{config.id}_left" if config.id else None,
|
||||
calibration_dir=config.calibration_dir,
|
||||
port=config.left_arm_port,
|
||||
gripper_motor=config.left_arm_gripper_motor,
|
||||
use_degrees=config.left_arm_use_degrees,
|
||||
)
|
||||
|
||||
right_arm_config = WidowXConfig(
|
||||
id=f"{config.id}_right" if config.id else None,
|
||||
calibration_dir=config.calibration_dir,
|
||||
port=config.right_arm_port,
|
||||
gripper_motor=config.right_arm_gripper_motor,
|
||||
use_degrees=config.right_arm_use_degrees,
|
||||
)
|
||||
|
||||
self.left_arm = WidowX(left_arm_config)
|
||||
self.right_arm = WidowX(right_arm_config)
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
return {f"left_{motor}.pos": float for motor in self.left_arm.bus.motors} | {
|
||||
f"right_{motor}.pos": float for motor in self.right_arm.bus.motors
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def feedback_features(self) -> dict[str, type]:
|
||||
return {}
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return self.left_arm.is_connected and self.right_arm.is_connected
|
||||
|
||||
def connect(self, calibrate: bool = True) -> None:
|
||||
self.left_arm.connect(calibrate)
|
||||
self.right_arm.connect(calibrate)
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
return self.left_arm.is_calibrated and self.right_arm.is_calibrated
|
||||
|
||||
def calibrate(self) -> None:
|
||||
self.left_arm.calibrate()
|
||||
self.right_arm.calibrate()
|
||||
|
||||
def configure(self) -> None:
|
||||
self.left_arm.configure()
|
||||
self.right_arm.configure()
|
||||
|
||||
def setup_motors(self) -> None:
|
||||
self.left_arm.setup_motors()
|
||||
self.right_arm.setup_motors()
|
||||
|
||||
def get_action(self) -> dict[str, float]:
|
||||
action_dict = {}
|
||||
|
||||
# Add "left_" prefix
|
||||
left_action = self.left_arm.get_action()
|
||||
action_dict.update({f"left_{key}": value for key, value in left_action.items()})
|
||||
|
||||
# Add "right_" prefix
|
||||
right_action = self.right_arm.get_action()
|
||||
action_dict.update({f"right_{key}": value for key, value in right_action.items()})
|
||||
|
||||
return action_dict
|
||||
|
||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
# Remove "left_" prefix
|
||||
left_feedback = {
|
||||
key.removeprefix("left_"): value for key, value in feedback.items() if key.startswith("left_")
|
||||
}
|
||||
# Remove "right_" prefix
|
||||
right_feedback = {
|
||||
key.removeprefix("right_"): value for key, value in feedback.items() if key.startswith("right_")
|
||||
}
|
||||
|
||||
if left_feedback:
|
||||
self.left_arm.send_feedback(left_feedback)
|
||||
if right_feedback:
|
||||
self.right_arm.send_feedback(right_feedback)
|
||||
|
||||
def disconnect(self) -> None:
|
||||
self.left_arm.disconnect()
|
||||
self.right_arm.disconnect()
|
||||
@@ -0,0 +1,34 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
from ..config import TeleoperatorConfig
|
||||
|
||||
|
||||
@TeleoperatorConfig.register_subclass("aloha_teleop")
|
||||
@dataclass
|
||||
class AlohaTeleopConfig(TeleoperatorConfig):
|
||||
left_arm_port: str
|
||||
right_arm_port: str
|
||||
|
||||
# Parameters for left arm (WidowX)
|
||||
left_arm_gripper_motor: str = "xl430-w250"
|
||||
left_arm_use_degrees: bool = True
|
||||
|
||||
# Parameters for right arm (WidowX)
|
||||
right_arm_gripper_motor: str = "xc430-w150"
|
||||
right_arm_use_degrees: bool = True
|
||||
@@ -23,3 +23,7 @@ from ..config import TeleoperatorConfig
|
||||
@dataclass
|
||||
class WidowXConfig(TeleoperatorConfig):
|
||||
port: str # Port to connect to the arm
|
||||
|
||||
gripper_motor: str = "xl430-w250" # This could be xc430-w150 or xl430-w250
|
||||
|
||||
use_degrees: bool = False
|
||||
|
||||
@@ -20,7 +20,6 @@ import time
|
||||
from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
|
||||
from lerobot.motors.dynamixel import (
|
||||
DriveMode,
|
||||
DynamixelMotorsBus,
|
||||
OperatingMode,
|
||||
)
|
||||
@@ -40,22 +39,27 @@ class WidowX(Teleoperator):
|
||||
name = "widowx"
|
||||
|
||||
def __init__(self, config: WidowXConfig):
|
||||
raise NotImplementedError
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
norm_mode_body = MotorNormMode.DEGREES if config.use_degrees else MotorNormMode.RANGE_M100_100
|
||||
self.bus = DynamixelMotorsBus(
|
||||
port=self.config.port,
|
||||
motors={
|
||||
"waist": Motor(1, "xm430-w350", MotorNormMode.RANGE_M100_100),
|
||||
"shoulder": Motor(2, "xm430-w350", MotorNormMode.RANGE_M100_100),
|
||||
"shoulder_shadow": Motor(3, "xm430-w350", MotorNormMode.RANGE_M100_100),
|
||||
"elbow": Motor(4, "xm430-w350", MotorNormMode.RANGE_M100_100),
|
||||
"elbow_shadow": Motor(5, "xm430-w350", MotorNormMode.RANGE_M100_100),
|
||||
"forearm_roll": Motor(6, "xm430-w350", MotorNormMode.RANGE_M100_100),
|
||||
"wrist_angle": Motor(7, "xm430-w350", MotorNormMode.RANGE_M100_100),
|
||||
"wrist_rotate": Motor(8, "xl430-w250", MotorNormMode.RANGE_M100_100),
|
||||
"gripper": Motor(9, "xc430-w150", MotorNormMode.RANGE_0_100),
|
||||
"waist": Motor(1, "xm430-w350", norm_mode_body),
|
||||
"shoulder": Motor(2, "xm430-w350", norm_mode_body),
|
||||
"shoulder_shadow": Motor(3, "xm430-w350", norm_mode_body),
|
||||
"elbow": Motor(4, "xm430-w350", norm_mode_body),
|
||||
"elbow_shadow": Motor(5, "xm430-w350", norm_mode_body),
|
||||
"forearm_roll": Motor(6, "xm430-w350", norm_mode_body),
|
||||
"wrist_angle": Motor(7, "xm430-w350", norm_mode_body),
|
||||
"wrist_rotate": Motor(
|
||||
8, "xm430-w350", norm_mode_body
|
||||
), # This could be xl430-w250 or xm430-w350
|
||||
"gripper": Motor(
|
||||
9, self.config.gripper_motor, MotorNormMode.RANGE_0_100
|
||||
), # This could be xc430-w150 or xl430-w250
|
||||
},
|
||||
calibration=self.calibration,
|
||||
)
|
||||
|
||||
@property
|
||||
@@ -76,6 +80,9 @@ class WidowX(Teleoperator):
|
||||
|
||||
self.bus.connect()
|
||||
if not self.is_calibrated and calibrate:
|
||||
logger.info(
|
||||
"Mismatch between calibration values in the motor and the calibration file or no calibration file found"
|
||||
)
|
||||
self.calibrate()
|
||||
|
||||
self.configure()
|
||||
@@ -86,19 +93,27 @@ class WidowX(Teleoperator):
|
||||
return self.bus.is_calibrated
|
||||
|
||||
def calibrate(self) -> None:
|
||||
raise NotImplementedError # TODO(aliberts): adapt code below (copied from koch)
|
||||
logger.info(f"\nRunning calibration of {self}")
|
||||
self.bus.disable_torque()
|
||||
if self.calibration:
|
||||
# Calibration file exists, ask user whether to use it or run new calibration
|
||||
user_input = input(
|
||||
f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: "
|
||||
)
|
||||
if user_input.strip().lower() != "c":
|
||||
logger.info(f"Writing calibration file associated with the id {self.id} to the motors")
|
||||
self.bus.write_calibration(self.calibration)
|
||||
return
|
||||
logger.info(f"\nRunning calibration of {self}")
|
||||
for motor in self.bus.motors:
|
||||
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
|
||||
|
||||
self.bus.write("Drive_Mode", "elbow_flex", DriveMode.INVERTED.value)
|
||||
drive_modes = {motor: 1 if motor == "elbow_flex" else 0 for motor in self.bus.motors}
|
||||
# self.bus.write("Drive_Mode", "el", DriveMode.INVERTED.value)
|
||||
# drive_modes = {motor: 1 if motor == ["elbow_shadow", "shoulder_shadow"] else 0 for motor in self.bus.motors}
|
||||
|
||||
input("Move robot to the middle of its range of motion and press ENTER....")
|
||||
input(f"Move {self} to the middle of its range of motion and press ENTER....")
|
||||
homing_offsets = self.bus.set_half_turn_homings()
|
||||
|
||||
full_turn_motors = ["shoulder_pan", "wrist_roll"]
|
||||
full_turn_motors = ["shoulder", "forearm_roll", "wrist_rotate"]
|
||||
unknown_range_motors = [motor for motor in self.bus.motors if motor not in full_turn_motors]
|
||||
print(
|
||||
f"Move all joints except {full_turn_motors} sequentially through their "
|
||||
@@ -113,7 +128,7 @@ class WidowX(Teleoperator):
|
||||
for motor, m in self.bus.motors.items():
|
||||
self.calibration[motor] = MotorCalibration(
|
||||
id=m.id,
|
||||
drive_mode=drive_modes[motor],
|
||||
drive_mode=0,
|
||||
homing_offset=homing_offsets[motor],
|
||||
range_min=range_mins[motor],
|
||||
range_max=range_maxes[motor],
|
||||
@@ -133,6 +148,22 @@ class WidowX(Teleoperator):
|
||||
self.bus.write("Secondary_ID", "shoulder_shadow", 2)
|
||||
self.bus.write("Secondary_ID", "elbow_shadow", 4)
|
||||
|
||||
for motor in self.bus.motors:
|
||||
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
|
||||
|
||||
# TODO(pepijn): Re enable this
|
||||
# Use 'position control current based' for gripper to be limited by the limit of the current.
|
||||
# For the follower gripper, it means it can grasp an object without forcing too much even tho,
|
||||
# its goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch).
|
||||
# For the leader gripper, it means we can use it as a physical trigger, since we can force with our finger
|
||||
# to make it move, and it will move back to its original target position when we release the force.
|
||||
# self.bus.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value)
|
||||
# Set gripper's goal pos in current position mode so that we can use it as a trigger.
|
||||
# self.bus.enable_torque("gripper")
|
||||
|
||||
if self.is_calibrated:
|
||||
self.bus.write("Goal_Position", "gripper", self.config.gripper_open_pos)
|
||||
|
||||
def get_action(self) -> dict[str, float]:
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
Reference in New Issue
Block a user