mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-11 14:49:43 +00:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ed773219c0 | |||
| a075669184 | |||
| bc953e4b5a | |||
| 6b62113515 |
@@ -0,0 +1,47 @@
|
||||
# ------------------------------------------------------------
|
||||
|
||||
# config_follower_right = ViperXConfig(
|
||||
# port="/dev/tty.usbserial-FT891KBG",
|
||||
# id="viperx_right",
|
||||
# )
|
||||
|
||||
# follower_right = ViperX(config_follower_right)
|
||||
# follower_right.connect(calibrate=False)
|
||||
# follower_right.calibrate()
|
||||
# follower_right.disconnect()
|
||||
|
||||
# ------------------------------------------------------------
|
||||
|
||||
# config_leader_right = WidowXConfig(
|
||||
# port="/dev/tty.usbserial-FT89FM77",
|
||||
# id="widowx_right",
|
||||
# )
|
||||
|
||||
# leader_right = WidowX(config_leader_right)
|
||||
# leader_right.connect(calibrate=False)
|
||||
# leader_right.calibrate()
|
||||
# leader_right.disconnect()
|
||||
|
||||
# ------------------------------------------------------------
|
||||
|
||||
# config_follower_left = ViperXConfig(
|
||||
# port="/dev/tty.usbserial-FT89FM09",
|
||||
# id="viperx_left",
|
||||
# )
|
||||
|
||||
# follower_left = ViperX(config_follower_left)
|
||||
# follower_left.connect(calibrate=False)
|
||||
# follower_left.calibrate()
|
||||
# follower_left.disconnect()
|
||||
|
||||
# ------------------------------------------------------------
|
||||
|
||||
# config_leader_left = WidowXConfig(
|
||||
# port="/dev/tty.usbserial-FT891KPN",
|
||||
# id="widowx_left",
|
||||
# )
|
||||
|
||||
# leader_left = WidowX(config_leader_left)
|
||||
# leader_left.connect(calibrate=False)
|
||||
# leader_left.calibrate()
|
||||
# leader_left.disconnect()
|
||||
@@ -0,0 +1,172 @@
|
||||
"""
|
||||
ALOHA Bimanual Recording Script
|
||||
|
||||
This script records episodes using ALOHA dual-arm system (ViperX followers + WidowX leaders).
|
||||
|
||||
Usage:
|
||||
1. New dataset: Set RESUME = False
|
||||
2. Resume/append: Set RESUME = True (will continue from existing episodes)
|
||||
|
||||
The script will:
|
||||
- Record NUM_EPISODES new episodes
|
||||
- Show progress with total episode count
|
||||
- Push dataset to HuggingFace Hub when complete
|
||||
"""
|
||||
|
||||
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.datasets.utils import hw_to_dataset_features
|
||||
from lerobot.record import record_loop
|
||||
from lerobot.robots.aloha import Aloha, AlohaConfig
|
||||
from lerobot.teleoperators.aloha_teleop import AlohaTeleop, AlohaTeleopConfig
|
||||
from lerobot.utils.control_utils import (
|
||||
init_keyboard_listener,
|
||||
sanity_check_dataset_name,
|
||||
sanity_check_dataset_robot_compatibility,
|
||||
)
|
||||
from lerobot.utils.utils import log_say
|
||||
from lerobot.utils.visualization_utils import _init_rerun
|
||||
|
||||
# Recording configuration
|
||||
NUM_EPISODES = 0
|
||||
FPS = 30
|
||||
EPISODE_TIME_SEC = 200
|
||||
RESET_TIME_SEC = 30
|
||||
TASK_DESCRIPTION = "First put the Hugging Face t shirt with both arms in the box, then place the hat with the right arm in the box."
|
||||
REPO_ID = "pepijn223/aloha_box_2"
|
||||
RESUME = True # Set to True to resume/append to existing dataset
|
||||
|
||||
# Create camera configuration
|
||||
camera_config = {
|
||||
"front": OpenCVCameraConfig(index_or_path=0, width=640, height=480, fps=FPS),
|
||||
"wrist_right": OpenCVCameraConfig(index_or_path=1, width=640, height=480, fps=FPS),
|
||||
"wrist_left": OpenCVCameraConfig(index_or_path=2, width=640, height=480, fps=FPS),
|
||||
}
|
||||
|
||||
# ALOHA Robot Configuration (dual ViperX followers)
|
||||
aloha_robot_config = AlohaConfig(
|
||||
id="aloha",
|
||||
left_arm_port="/dev/tty.usbserial-FT89FM09",
|
||||
right_arm_port="/dev/tty.usbserial-FT891KBG",
|
||||
left_arm_max_relative_target=20.0,
|
||||
right_arm_max_relative_target=20.0,
|
||||
left_arm_use_degrees=True,
|
||||
right_arm_use_degrees=True,
|
||||
cameras=camera_config,
|
||||
)
|
||||
|
||||
# ALOHA Teleoperator Configuration (dual WidowX leaders)
|
||||
aloha_teleop_config = AlohaTeleopConfig(
|
||||
id="aloha_teleop",
|
||||
left_arm_port="/dev/tty.usbserial-FT891KPN",
|
||||
right_arm_port="/dev/tty.usbserial-FT89FM77",
|
||||
left_arm_gripper_motor="xl430-w250",
|
||||
right_arm_gripper_motor="xc430-w150",
|
||||
left_arm_use_degrees=True,
|
||||
right_arm_use_degrees=True,
|
||||
)
|
||||
|
||||
# Initialize the robot and teleoperator
|
||||
robot = Aloha(aloha_robot_config)
|
||||
teleop = AlohaTeleop(aloha_teleop_config)
|
||||
|
||||
# Configure the dataset features
|
||||
action_features = hw_to_dataset_features(robot.action_features, "action")
|
||||
obs_features = hw_to_dataset_features(robot.observation_features, "observation")
|
||||
dataset_features = {**action_features, **obs_features}
|
||||
|
||||
# Create or resume the dataset
|
||||
if RESUME:
|
||||
print(f"Resuming existing dataset: {REPO_ID}")
|
||||
dataset = LeRobotDataset(
|
||||
repo_id=REPO_ID,
|
||||
root=None, # Use default root
|
||||
)
|
||||
|
||||
# Start image writer for cameras
|
||||
if hasattr(robot, "cameras") and len(robot.cameras) > 0:
|
||||
dataset.start_image_writer(
|
||||
num_processes=0, # Use threads only
|
||||
num_threads=4 * len(robot.cameras), # 4 threads per camera
|
||||
)
|
||||
|
||||
# Sanity check compatibility
|
||||
sanity_check_dataset_robot_compatibility(dataset, robot, FPS, dataset_features)
|
||||
print(f"Resumed dataset with {dataset.num_episodes} existing episodes")
|
||||
else:
|
||||
print(f"Creating new dataset: {REPO_ID}")
|
||||
# Sanity check dataset name
|
||||
sanity_check_dataset_name(REPO_ID, None)
|
||||
|
||||
# Create new dataset
|
||||
dataset = LeRobotDataset.create(
|
||||
repo_id=REPO_ID,
|
||||
fps=FPS,
|
||||
features=dataset_features,
|
||||
robot_type=robot.name,
|
||||
use_videos=True,
|
||||
image_writer_threads=4 * len(robot.cameras), # 4 threads per camera
|
||||
)
|
||||
|
||||
# Initialize the keyboard listener and rerun visualization
|
||||
_, events = init_keyboard_listener()
|
||||
_init_rerun(session_name="aloha_recording")
|
||||
|
||||
# Connect the robot and teleoperator
|
||||
robot.connect()
|
||||
teleop.connect()
|
||||
|
||||
episode_idx = 0
|
||||
total_episodes_to_record = NUM_EPISODES
|
||||
existing_episodes = dataset.num_episodes if RESUME else 0
|
||||
|
||||
while episode_idx < NUM_EPISODES and not events["stop_recording"]:
|
||||
current_episode = existing_episodes + episode_idx + 1
|
||||
log_say(f"Recording episode {current_episode} (batch: {episode_idx + 1}/{NUM_EPISODES})")
|
||||
|
||||
record_loop(
|
||||
robot=robot,
|
||||
events=events,
|
||||
fps=FPS,
|
||||
teleop=teleop,
|
||||
dataset=dataset,
|
||||
control_time_s=EPISODE_TIME_SEC,
|
||||
single_task=TASK_DESCRIPTION,
|
||||
display_data=True,
|
||||
)
|
||||
|
||||
# Reset the environment if not stopping or re-recording
|
||||
if not events["stop_recording"] and (episode_idx < NUM_EPISODES - 1 or events["rerecord_episode"]):
|
||||
log_say("Reset the environment")
|
||||
record_loop(
|
||||
robot=robot,
|
||||
events=events,
|
||||
fps=FPS,
|
||||
teleop=teleop,
|
||||
control_time_s=RESET_TIME_SEC,
|
||||
single_task=TASK_DESCRIPTION,
|
||||
display_data=True,
|
||||
)
|
||||
|
||||
if events["rerecord_episode"]:
|
||||
log_say("Re-recording episode")
|
||||
events["rerecord_episode"] = False
|
||||
events["exit_early"] = False
|
||||
dataset.clear_episode_buffer()
|
||||
continue
|
||||
|
||||
dataset.save_episode()
|
||||
episode_idx += 1
|
||||
|
||||
# Clean up
|
||||
log_say("Stop recording")
|
||||
robot.disconnect()
|
||||
teleop.disconnect()
|
||||
|
||||
# Summary
|
||||
final_episodes = dataset.num_episodes
|
||||
log_say(f"Dataset now contains {final_episodes} episodes total")
|
||||
|
||||
# Push to hub
|
||||
dataset.push_to_hub()
|
||||
log_say(f"Dataset '{REPO_ID}' pushed to HuggingFace Hub")
|
||||
@@ -0,0 +1,93 @@
|
||||
import time
|
||||
|
||||
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
|
||||
from lerobot.robots.viperx import ViperX, ViperXConfig
|
||||
from lerobot.teleoperators.widowx import WidowX, WidowXConfig
|
||||
from lerobot.utils.visualization_utils import _init_rerun, log_rerun_data
|
||||
|
||||
camera_config = {
|
||||
"front": OpenCVCameraConfig(index_or_path=0, width=640, height=480, fps=30),
|
||||
"wrist_right": OpenCVCameraConfig(index_or_path=1, width=640, height=480, fps=30),
|
||||
"wrist_left": OpenCVCameraConfig(index_or_path=2, width=640, height=480, fps=30),
|
||||
}
|
||||
|
||||
config_follower_right = ViperXConfig(
|
||||
port="/dev/tty.usbserial-FT891KBG",
|
||||
id="viperx_right",
|
||||
max_relative_target=10.0, # increased from default 5.0 to 10.0
|
||||
use_degrees=True,
|
||||
cameras=camera_config,
|
||||
)
|
||||
|
||||
config_leader_right = WidowXConfig(
|
||||
port="/dev/tty.usbserial-FT89FM77",
|
||||
id="widowx_right",
|
||||
gripper_motor="xc430-w150",
|
||||
use_degrees=True,
|
||||
)
|
||||
|
||||
config_follower_left = ViperXConfig(
|
||||
port="/dev/tty.usbserial-FT89FM09",
|
||||
id="viperx_left",
|
||||
max_relative_target=10.0, # increased from default 5.0 to 10.0
|
||||
use_degrees=True,
|
||||
)
|
||||
|
||||
config_leader_left = WidowXConfig(
|
||||
port="/dev/tty.usbserial-FT891KPN",
|
||||
id="widowx_left",
|
||||
gripper_motor="xl430-w250",
|
||||
use_degrees=True,
|
||||
)
|
||||
|
||||
_init_rerun(session_name="teleop")
|
||||
|
||||
follower_right = ViperX(config_follower_right)
|
||||
follower_right.connect()
|
||||
|
||||
leader_right = WidowX(config_leader_right)
|
||||
leader_right.connect()
|
||||
|
||||
follower_left = ViperX(config_follower_left)
|
||||
follower_left.connect()
|
||||
|
||||
leader_left = WidowX(config_leader_left)
|
||||
leader_left.connect()
|
||||
|
||||
|
||||
while True:
|
||||
act_right = leader_right.get_action()
|
||||
obs_right = follower_right.get_observation()
|
||||
|
||||
act_left = leader_left.get_action()
|
||||
obs_left = follower_left.get_observation()
|
||||
|
||||
print("=" * 60)
|
||||
print("ACTION (Leader Right):")
|
||||
for key, value in act_right.items():
|
||||
if key.endswith(".pos"):
|
||||
print(f" {key:20}: {value:8.3f}")
|
||||
|
||||
print("\nOBSERVATION (Follower Right):")
|
||||
for key, value in obs_right.items():
|
||||
if key.endswith(".pos"):
|
||||
print(f" {key:20}: {value:8.3f}")
|
||||
|
||||
print("=" * 60)
|
||||
print("ACTION (Leader Left):")
|
||||
for key, value in act_left.items():
|
||||
if key.endswith(".pos"):
|
||||
print(f" {key:20}: {value:8.3f}")
|
||||
|
||||
print("\nOBSERVATION (Follower Left):")
|
||||
for key, value in obs_left.items():
|
||||
if key.endswith(".pos"):
|
||||
print(f" {key:20}: {value:8.3f}")
|
||||
print("=" * 60)
|
||||
|
||||
log_rerun_data({**obs_right, **obs_left}, {**act_right, **act_left})
|
||||
|
||||
follower_right.send_action(act_right)
|
||||
follower_left.send_action(act_left)
|
||||
|
||||
time.sleep(0.02)
|
||||
@@ -0,0 +1,4 @@
|
||||
from .aloha import Aloha
|
||||
from .config_aloha import AlohaConfig
|
||||
|
||||
__all__ = ["Aloha", "AlohaConfig"]
|
||||
@@ -0,0 +1,161 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import time
|
||||
from functools import cached_property
|
||||
from typing import Any
|
||||
|
||||
from lerobot.cameras.utils import make_cameras_from_configs
|
||||
from lerobot.robots.viperx import ViperX
|
||||
from lerobot.robots.viperx.config_viperx import ViperXConfig
|
||||
|
||||
from ..robot import Robot
|
||||
from .config_aloha import AlohaConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Aloha(Robot):
|
||||
"""
|
||||
ALOHA Bimanual Robot System using dual ViperX follower arms.
|
||||
Based on the ALOHA (A Low-cost Open-source Hardware System for Bimanual Teleoperation) design.
|
||||
"""
|
||||
|
||||
config_class = AlohaConfig
|
||||
name = "aloha"
|
||||
|
||||
def __init__(self, config: AlohaConfig):
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
|
||||
left_arm_config = ViperXConfig(
|
||||
id=f"{config.id}_left" if config.id else None,
|
||||
calibration_dir=config.calibration_dir,
|
||||
port=config.left_arm_port,
|
||||
max_relative_target=config.left_arm_max_relative_target,
|
||||
use_degrees=config.left_arm_use_degrees,
|
||||
cameras={},
|
||||
)
|
||||
|
||||
right_arm_config = ViperXConfig(
|
||||
id=f"{config.id}_right" if config.id else None,
|
||||
calibration_dir=config.calibration_dir,
|
||||
port=config.right_arm_port,
|
||||
max_relative_target=config.right_arm_max_relative_target,
|
||||
use_degrees=config.right_arm_use_degrees,
|
||||
cameras={},
|
||||
)
|
||||
|
||||
self.left_arm = ViperX(left_arm_config)
|
||||
self.right_arm = ViperX(right_arm_config)
|
||||
self.cameras = make_cameras_from_configs(config.cameras)
|
||||
|
||||
@property
|
||||
def _motors_ft(self) -> dict[str, type]:
|
||||
return {f"left_{motor}.pos": float for motor in self.left_arm.bus.motors} | {
|
||||
f"right_{motor}.pos": float for motor in self.right_arm.bus.motors
|
||||
}
|
||||
|
||||
@property
|
||||
def _cameras_ft(self) -> dict[str, tuple]:
|
||||
return {
|
||||
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
return {**self._motors_ft, **self._cameras_ft}
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
return self._motors_ft
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return (
|
||||
self.left_arm.bus.is_connected
|
||||
and self.right_arm.bus.is_connected
|
||||
and all(cam.is_connected for cam in self.cameras.values())
|
||||
)
|
||||
|
||||
def connect(self, calibrate: bool = True) -> None:
|
||||
self.left_arm.connect(calibrate)
|
||||
self.right_arm.connect(calibrate)
|
||||
|
||||
for cam in self.cameras.values():
|
||||
cam.connect()
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
return self.left_arm.is_calibrated and self.right_arm.is_calibrated
|
||||
|
||||
def calibrate(self) -> None:
|
||||
self.left_arm.calibrate()
|
||||
self.right_arm.calibrate()
|
||||
|
||||
def configure(self) -> None:
|
||||
self.left_arm.configure()
|
||||
self.right_arm.configure()
|
||||
|
||||
def setup_motors(self) -> None:
|
||||
self.left_arm.setup_motors()
|
||||
self.right_arm.setup_motors()
|
||||
|
||||
def get_observation(self) -> dict[str, Any]:
|
||||
obs_dict = {}
|
||||
|
||||
# Add "left_" prefix
|
||||
left_obs = self.left_arm.get_observation()
|
||||
obs_dict.update({f"left_{key}": value for key, value in left_obs.items()})
|
||||
|
||||
# Add "right_" prefix
|
||||
right_obs = self.right_arm.get_observation()
|
||||
obs_dict.update({f"right_{key}": value for key, value in right_obs.items()})
|
||||
|
||||
for cam_key, cam in self.cameras.items():
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.async_read()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
|
||||
return obs_dict
|
||||
|
||||
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
|
||||
# Remove "left_" prefix
|
||||
left_action = {
|
||||
key.removeprefix("left_"): value for key, value in action.items() if key.startswith("left_")
|
||||
}
|
||||
# Remove "right_" prefix
|
||||
right_action = {
|
||||
key.removeprefix("right_"): value for key, value in action.items() if key.startswith("right_")
|
||||
}
|
||||
|
||||
send_action_left = self.left_arm.send_action(left_action)
|
||||
send_action_right = self.right_arm.send_action(right_action)
|
||||
|
||||
# Add prefixes back
|
||||
prefixed_send_action_left = {f"left_{key}": value for key, value in send_action_left.items()}
|
||||
prefixed_send_action_right = {f"right_{key}": value for key, value in send_action_right.items()}
|
||||
|
||||
return {**prefixed_send_action_left, **prefixed_send_action_right}
|
||||
|
||||
def disconnect(self):
|
||||
self.left_arm.disconnect()
|
||||
self.right_arm.disconnect()
|
||||
|
||||
for cam in self.cameras.values():
|
||||
cam.disconnect()
|
||||
@@ -0,0 +1,39 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from lerobot.cameras import CameraConfig
|
||||
|
||||
from ..config import RobotConfig
|
||||
|
||||
|
||||
@RobotConfig.register_subclass("aloha")
|
||||
@dataclass
|
||||
class AlohaConfig(RobotConfig):
|
||||
left_arm_port: str
|
||||
right_arm_port: str
|
||||
|
||||
# Optional parameters for left arm (ViperX)
|
||||
left_arm_max_relative_target: float | dict[str, float] = 20.0
|
||||
left_arm_use_degrees: bool = True
|
||||
|
||||
# Optional parameters for right arm (ViperX)
|
||||
right_arm_max_relative_target: float | dict[str, float] = 20.0
|
||||
right_arm_use_degrees: bool = True
|
||||
|
||||
# cameras (shared between both arms)
|
||||
cameras: dict[str, CameraConfig] = field(default_factory=dict)
|
||||
@@ -43,3 +43,6 @@ class ViperXConfig(RobotConfig):
|
||||
# Troubleshooting: If one of your IntelRealSense cameras freeze during
|
||||
# data recording due to bandwidth limit, you might need to plug the camera
|
||||
# on another USB hub or PCIe card.
|
||||
|
||||
# Set to `True` for backward compatibility with previous policies/dataset
|
||||
use_degrees: bool = False
|
||||
|
||||
@@ -18,7 +18,6 @@ from functools import cached_property
|
||||
from typing import Any
|
||||
|
||||
from lerobot.cameras.utils import make_cameras_from_configs
|
||||
from lerobot.constants import OBS_STATE
|
||||
from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
|
||||
from lerobot.motors.dynamixel import (
|
||||
@@ -45,22 +44,23 @@ class ViperX(Robot):
|
||||
self,
|
||||
config: ViperXConfig,
|
||||
):
|
||||
raise NotImplementedError
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
norm_mode_body = MotorNormMode.DEGREES if config.use_degrees else MotorNormMode.RANGE_M100_100
|
||||
self.bus = DynamixelMotorsBus(
|
||||
port=self.config.port,
|
||||
motors={
|
||||
"waist": Motor(1, "xm540-w270", MotorNormMode.RANGE_M100_100),
|
||||
"shoulder": Motor(2, "xm540-w270", MotorNormMode.RANGE_M100_100),
|
||||
"shoulder_shadow": Motor(3, "xm540-w270", MotorNormMode.RANGE_M100_100),
|
||||
"elbow": Motor(4, "xm540-w270", MotorNormMode.RANGE_M100_100),
|
||||
"elbow_shadow": Motor(5, "xm540-w270", MotorNormMode.RANGE_M100_100),
|
||||
"forearm_roll": Motor(6, "xm540-w270", MotorNormMode.RANGE_M100_100),
|
||||
"wrist_angle": Motor(7, "xm540-w270", MotorNormMode.RANGE_M100_100),
|
||||
"wrist_rotate": Motor(8, "xm430-w350", MotorNormMode.RANGE_M100_100),
|
||||
"waist": Motor(1, "xm540-w270", norm_mode_body),
|
||||
"shoulder": Motor(2, "xm540-w270", norm_mode_body),
|
||||
"shoulder_shadow": Motor(3, "xm540-w270", norm_mode_body),
|
||||
"elbow": Motor(4, "xm540-w270", norm_mode_body),
|
||||
"elbow_shadow": Motor(5, "xm540-w270", norm_mode_body),
|
||||
"forearm_roll": Motor(6, "xm540-w270", norm_mode_body),
|
||||
"wrist_angle": Motor(7, "xm540-w270", norm_mode_body),
|
||||
"wrist_rotate": Motor(8, "xm430-w350", norm_mode_body),
|
||||
"gripper": Motor(9, "xm430-w350", MotorNormMode.RANGE_0_100),
|
||||
},
|
||||
calibration=self.calibration,
|
||||
)
|
||||
self.cameras = make_cameras_from_configs(config.cameras)
|
||||
|
||||
@@ -96,6 +96,9 @@ class ViperX(Robot):
|
||||
|
||||
self.bus.connect()
|
||||
if not self.is_calibrated and calibrate:
|
||||
logger.info(
|
||||
"Mismatch between calibration values in the motor and the calibration file or no calibration file found"
|
||||
)
|
||||
self.calibrate()
|
||||
|
||||
for cam in self.cameras.values():
|
||||
@@ -109,16 +112,24 @@ class ViperX(Robot):
|
||||
return self.bus.is_calibrated
|
||||
|
||||
def calibrate(self) -> None:
|
||||
raise NotImplementedError # TODO(aliberts): adapt code below (copied from koch
|
||||
logger.info(f"\nRunning calibration of {self}")
|
||||
self.bus.disable_torque()
|
||||
if self.calibration:
|
||||
# Calibration file exists, ask user whether to use it or run new calibration
|
||||
user_input = input(
|
||||
f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: "
|
||||
)
|
||||
if user_input.strip().lower() != "c":
|
||||
logger.info(f"Writing calibration file associated with the id {self.id} to the motors")
|
||||
self.bus.write_calibration(self.calibration)
|
||||
return
|
||||
logger.info(f"\nRunning calibration of {self}")
|
||||
for motor in self.bus.motors:
|
||||
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
|
||||
|
||||
input("Move robot to the middle of its range of motion and press ENTER....")
|
||||
input(f"Move {self} to the middle of its range of motion and press ENTER....")
|
||||
homing_offsets = self.bus.set_half_turn_homings()
|
||||
|
||||
full_turn_motors = ["shoulder_pan", "wrist_roll"]
|
||||
full_turn_motors = ["shoulder", "forearm_roll", "wrist_rotate"]
|
||||
unknown_range_motors = [motor for motor in self.bus.motors if motor not in full_turn_motors]
|
||||
print(
|
||||
f"Move all joints except {full_turn_motors} sequentially through their entire "
|
||||
@@ -153,33 +164,23 @@ class ViperX(Robot):
|
||||
self.bus.write("Secondary_ID", "shoulder_shadow", 2)
|
||||
self.bus.write("Secondary_ID", "elbow_shadow", 4)
|
||||
|
||||
# Set a velocity limit of 131 as advised by Trossen Robotics
|
||||
# TODO(aliberts): remove as it's actually useless in position control
|
||||
self.bus.write("Velocity_Limit", 131)
|
||||
|
||||
# Use 'extended position mode' for all motors except gripper, because in joint mode the servos
|
||||
# can't rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling
|
||||
# the arm, you could end up with a servo with a position 0 or 4095 at a crucial point.
|
||||
# See: https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11
|
||||
for motor in self.bus.motors:
|
||||
if motor != "gripper":
|
||||
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
|
||||
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
|
||||
|
||||
# TODO(pepijn): Re enable this
|
||||
# Use 'position control current based' for follower gripper to be limited by the limit of the
|
||||
# current. It can grasp an object without forcing too much even tho, it's goal position is a
|
||||
# complete grasp (both gripper fingers are ordered to join and reach a touch).
|
||||
self.bus.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value)
|
||||
# self.bus.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value)
|
||||
|
||||
def get_observation(self) -> dict[str, Any]:
|
||||
"""The returned observations do not have a batch dimension."""
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
obs_dict = {}
|
||||
|
||||
# Read arm position
|
||||
start = time.perf_counter()
|
||||
obs_dict[OBS_STATE] = self.bus.sync_read("Present_Position")
|
||||
obs_dict = self.bus.sync_read("Present_Position")
|
||||
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
from .aloha_teleop import AlohaTeleop
|
||||
from .config_aloha_teleop import AlohaTeleopConfig
|
||||
|
||||
__all__ = ["AlohaTeleop", "AlohaTeleopConfig"]
|
||||
@@ -0,0 +1,125 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from functools import cached_property
|
||||
|
||||
from lerobot.teleoperators.widowx.config_widowx import WidowXConfig
|
||||
from lerobot.teleoperators.widowx.widowx import WidowX
|
||||
|
||||
from ..teleoperator import Teleoperator
|
||||
from .config_aloha_teleop import AlohaTeleopConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AlohaTeleop(Teleoperator):
|
||||
"""
|
||||
ALOHA Bimanual Teleoperator System using dual WidowX leader arms.
|
||||
Based on the ALOHA (A Low-cost Open-source Hardware System for Bimanual Teleoperation) design.
|
||||
"""
|
||||
|
||||
config_class = AlohaTeleopConfig
|
||||
name = "aloha_teleop"
|
||||
|
||||
def __init__(self, config: AlohaTeleopConfig):
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
|
||||
left_arm_config = WidowXConfig(
|
||||
id=f"{config.id}_left" if config.id else None,
|
||||
calibration_dir=config.calibration_dir,
|
||||
port=config.left_arm_port,
|
||||
gripper_motor=config.left_arm_gripper_motor,
|
||||
use_degrees=config.left_arm_use_degrees,
|
||||
)
|
||||
|
||||
right_arm_config = WidowXConfig(
|
||||
id=f"{config.id}_right" if config.id else None,
|
||||
calibration_dir=config.calibration_dir,
|
||||
port=config.right_arm_port,
|
||||
gripper_motor=config.right_arm_gripper_motor,
|
||||
use_degrees=config.right_arm_use_degrees,
|
||||
)
|
||||
|
||||
self.left_arm = WidowX(left_arm_config)
|
||||
self.right_arm = WidowX(right_arm_config)
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
return {f"left_{motor}.pos": float for motor in self.left_arm.bus.motors} | {
|
||||
f"right_{motor}.pos": float for motor in self.right_arm.bus.motors
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def feedback_features(self) -> dict[str, type]:
|
||||
return {}
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return self.left_arm.is_connected and self.right_arm.is_connected
|
||||
|
||||
def connect(self, calibrate: bool = True) -> None:
|
||||
self.left_arm.connect(calibrate)
|
||||
self.right_arm.connect(calibrate)
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
return self.left_arm.is_calibrated and self.right_arm.is_calibrated
|
||||
|
||||
def calibrate(self) -> None:
|
||||
self.left_arm.calibrate()
|
||||
self.right_arm.calibrate()
|
||||
|
||||
def configure(self) -> None:
|
||||
self.left_arm.configure()
|
||||
self.right_arm.configure()
|
||||
|
||||
def setup_motors(self) -> None:
|
||||
self.left_arm.setup_motors()
|
||||
self.right_arm.setup_motors()
|
||||
|
||||
def get_action(self) -> dict[str, float]:
|
||||
action_dict = {}
|
||||
|
||||
# Add "left_" prefix
|
||||
left_action = self.left_arm.get_action()
|
||||
action_dict.update({f"left_{key}": value for key, value in left_action.items()})
|
||||
|
||||
# Add "right_" prefix
|
||||
right_action = self.right_arm.get_action()
|
||||
action_dict.update({f"right_{key}": value for key, value in right_action.items()})
|
||||
|
||||
return action_dict
|
||||
|
||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
# Remove "left_" prefix
|
||||
left_feedback = {
|
||||
key.removeprefix("left_"): value for key, value in feedback.items() if key.startswith("left_")
|
||||
}
|
||||
# Remove "right_" prefix
|
||||
right_feedback = {
|
||||
key.removeprefix("right_"): value for key, value in feedback.items() if key.startswith("right_")
|
||||
}
|
||||
|
||||
if left_feedback:
|
||||
self.left_arm.send_feedback(left_feedback)
|
||||
if right_feedback:
|
||||
self.right_arm.send_feedback(right_feedback)
|
||||
|
||||
def disconnect(self) -> None:
|
||||
self.left_arm.disconnect()
|
||||
self.right_arm.disconnect()
|
||||
@@ -0,0 +1,34 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
from ..config import TeleoperatorConfig
|
||||
|
||||
|
||||
@TeleoperatorConfig.register_subclass("aloha_teleop")
|
||||
@dataclass
|
||||
class AlohaTeleopConfig(TeleoperatorConfig):
|
||||
left_arm_port: str
|
||||
right_arm_port: str
|
||||
|
||||
# Parameters for left arm (WidowX)
|
||||
left_arm_gripper_motor: str = "xl430-w250"
|
||||
left_arm_use_degrees: bool = True
|
||||
|
||||
# Parameters for right arm (WidowX)
|
||||
right_arm_gripper_motor: str = "xc430-w150"
|
||||
right_arm_use_degrees: bool = True
|
||||
@@ -23,3 +23,7 @@ from ..config import TeleoperatorConfig
|
||||
@dataclass
|
||||
class WidowXConfig(TeleoperatorConfig):
|
||||
port: str # Port to connect to the arm
|
||||
|
||||
gripper_motor: str = "xl430-w250" # This could be xc430-w150 or xl430-w250
|
||||
|
||||
use_degrees: bool = False
|
||||
|
||||
@@ -20,7 +20,6 @@ import time
|
||||
from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
|
||||
from lerobot.motors.dynamixel import (
|
||||
DriveMode,
|
||||
DynamixelMotorsBus,
|
||||
OperatingMode,
|
||||
)
|
||||
@@ -40,22 +39,27 @@ class WidowX(Teleoperator):
|
||||
name = "widowx"
|
||||
|
||||
def __init__(self, config: WidowXConfig):
|
||||
raise NotImplementedError
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
norm_mode_body = MotorNormMode.DEGREES if config.use_degrees else MotorNormMode.RANGE_M100_100
|
||||
self.bus = DynamixelMotorsBus(
|
||||
port=self.config.port,
|
||||
motors={
|
||||
"waist": Motor(1, "xm430-w350", MotorNormMode.RANGE_M100_100),
|
||||
"shoulder": Motor(2, "xm430-w350", MotorNormMode.RANGE_M100_100),
|
||||
"shoulder_shadow": Motor(3, "xm430-w350", MotorNormMode.RANGE_M100_100),
|
||||
"elbow": Motor(4, "xm430-w350", MotorNormMode.RANGE_M100_100),
|
||||
"elbow_shadow": Motor(5, "xm430-w350", MotorNormMode.RANGE_M100_100),
|
||||
"forearm_roll": Motor(6, "xm430-w350", MotorNormMode.RANGE_M100_100),
|
||||
"wrist_angle": Motor(7, "xm430-w350", MotorNormMode.RANGE_M100_100),
|
||||
"wrist_rotate": Motor(8, "xl430-w250", MotorNormMode.RANGE_M100_100),
|
||||
"gripper": Motor(9, "xc430-w150", MotorNormMode.RANGE_0_100),
|
||||
"waist": Motor(1, "xm430-w350", norm_mode_body),
|
||||
"shoulder": Motor(2, "xm430-w350", norm_mode_body),
|
||||
"shoulder_shadow": Motor(3, "xm430-w350", norm_mode_body),
|
||||
"elbow": Motor(4, "xm430-w350", norm_mode_body),
|
||||
"elbow_shadow": Motor(5, "xm430-w350", norm_mode_body),
|
||||
"forearm_roll": Motor(6, "xm430-w350", norm_mode_body),
|
||||
"wrist_angle": Motor(7, "xm430-w350", norm_mode_body),
|
||||
"wrist_rotate": Motor(
|
||||
8, "xm430-w350", norm_mode_body
|
||||
), # This could be xl430-w250 or xm430-w350
|
||||
"gripper": Motor(
|
||||
9, self.config.gripper_motor, MotorNormMode.RANGE_0_100
|
||||
), # This could be xc430-w150 or xl430-w250
|
||||
},
|
||||
calibration=self.calibration,
|
||||
)
|
||||
|
||||
@property
|
||||
@@ -76,6 +80,9 @@ class WidowX(Teleoperator):
|
||||
|
||||
self.bus.connect()
|
||||
if not self.is_calibrated and calibrate:
|
||||
logger.info(
|
||||
"Mismatch between calibration values in the motor and the calibration file or no calibration file found"
|
||||
)
|
||||
self.calibrate()
|
||||
|
||||
self.configure()
|
||||
@@ -86,19 +93,27 @@ class WidowX(Teleoperator):
|
||||
return self.bus.is_calibrated
|
||||
|
||||
def calibrate(self) -> None:
|
||||
raise NotImplementedError # TODO(aliberts): adapt code below (copied from koch)
|
||||
logger.info(f"\nRunning calibration of {self}")
|
||||
self.bus.disable_torque()
|
||||
if self.calibration:
|
||||
# Calibration file exists, ask user whether to use it or run new calibration
|
||||
user_input = input(
|
||||
f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: "
|
||||
)
|
||||
if user_input.strip().lower() != "c":
|
||||
logger.info(f"Writing calibration file associated with the id {self.id} to the motors")
|
||||
self.bus.write_calibration(self.calibration)
|
||||
return
|
||||
logger.info(f"\nRunning calibration of {self}")
|
||||
for motor in self.bus.motors:
|
||||
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
|
||||
|
||||
self.bus.write("Drive_Mode", "elbow_flex", DriveMode.INVERTED.value)
|
||||
drive_modes = {motor: 1 if motor == "elbow_flex" else 0 for motor in self.bus.motors}
|
||||
# self.bus.write("Drive_Mode", "el", DriveMode.INVERTED.value)
|
||||
# drive_modes = {motor: 1 if motor == ["elbow_shadow", "shoulder_shadow"] else 0 for motor in self.bus.motors}
|
||||
|
||||
input("Move robot to the middle of its range of motion and press ENTER....")
|
||||
input(f"Move {self} to the middle of its range of motion and press ENTER....")
|
||||
homing_offsets = self.bus.set_half_turn_homings()
|
||||
|
||||
full_turn_motors = ["shoulder_pan", "wrist_roll"]
|
||||
full_turn_motors = ["shoulder", "forearm_roll", "wrist_rotate"]
|
||||
unknown_range_motors = [motor for motor in self.bus.motors if motor not in full_turn_motors]
|
||||
print(
|
||||
f"Move all joints except {full_turn_motors} sequentially through their "
|
||||
@@ -113,7 +128,7 @@ class WidowX(Teleoperator):
|
||||
for motor, m in self.bus.motors.items():
|
||||
self.calibration[motor] = MotorCalibration(
|
||||
id=m.id,
|
||||
drive_mode=drive_modes[motor],
|
||||
drive_mode=0,
|
||||
homing_offset=homing_offsets[motor],
|
||||
range_min=range_mins[motor],
|
||||
range_max=range_maxes[motor],
|
||||
@@ -133,6 +148,22 @@ class WidowX(Teleoperator):
|
||||
self.bus.write("Secondary_ID", "shoulder_shadow", 2)
|
||||
self.bus.write("Secondary_ID", "elbow_shadow", 4)
|
||||
|
||||
for motor in self.bus.motors:
|
||||
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
|
||||
|
||||
# TODO(pepijn): Re enable this
|
||||
# Use 'position control current based' for gripper to be limited by the limit of the current.
|
||||
# For the follower gripper, it means it can grasp an object without forcing too much even tho,
|
||||
# its goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch).
|
||||
# For the leader gripper, it means we can use it as a physical trigger, since we can force with our finger
|
||||
# to make it move, and it will move back to its original target position when we release the force.
|
||||
# self.bus.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value)
|
||||
# Set gripper's goal pos in current position mode so that we can use it as a trigger.
|
||||
# self.bus.enable_torque("gripper")
|
||||
|
||||
if self.is_calibrated:
|
||||
self.bus.write("Goal_Position", "gripper", self.config.gripper_open_pos)
|
||||
|
||||
def get_action(self) -> dict[str, float]:
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
Reference in New Issue
Block a user