mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-11 14:49:43 +00:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0ccec237c0 |
@@ -0,0 +1,53 @@
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from lerobot.common.robots.hope_jr import HopeJrArm, HopeJrArmConfig
|
||||
from lerobot.common.teleoperators.homonculus import HomonculusArm, HomonculusArmConfig
|
||||
from lerobot.common.utils.utils import move_cursor_up
|
||||
|
||||
|
||||
def make_left_arm() -> tuple[HomonculusArm, HopeJrArm]:
|
||||
left_exo_arm_cfg = HomonculusArmConfig("/dev/tty.usbmodem11301", id="left")
|
||||
left_exo_arm = HomonculusArm(left_exo_arm_cfg)
|
||||
left_arm_cfg = HopeJrArmConfig("/dev/tty.usbserial-1140", id="left", side="left")
|
||||
left_arm = HopeJrArm(left_arm_cfg)
|
||||
return left_exo_arm, left_arm
|
||||
|
||||
|
||||
def main():
|
||||
left_exo_arm, left_arm = make_left_arm()
|
||||
display_len = max(len(key) for key in left_exo_arm.action_features)
|
||||
left_exo_arm.connect()
|
||||
left_arm.connect()
|
||||
|
||||
try:
|
||||
while True:
|
||||
start = time.perf_counter()
|
||||
|
||||
left_arm_action = left_exo_arm.get_action()
|
||||
left_arm_action["shoulder_pitch.pos"] = -left_arm_action["shoulder_pitch.pos"]
|
||||
left_arm_action["wrist_yaw.pos"] = -left_arm_action["wrist_yaw.pos"]
|
||||
left_arm.send_action(left_arm_action)
|
||||
|
||||
time.sleep(0.01)
|
||||
loop_s = time.perf_counter() - start
|
||||
|
||||
print("\n" + "-" * (display_len + 10))
|
||||
print(f"{'NAME':<{display_len}} | {'ARM':>7}")
|
||||
for joint, val in left_arm_action.items():
|
||||
print(f"{joint:<{display_len}} | {val:>7.2f}")
|
||||
|
||||
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
|
||||
move_cursor_up(len(left_arm_action) + 5)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
left_exo_arm.disconnect()
|
||||
left_arm.disconnect()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,51 @@
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from lerobot.common.robots.hope_jr import HopeJrHand, HopeJrHandConfig, homonculus_glove_to_hope_jr_hand
|
||||
from lerobot.common.teleoperators.homonculus import HomonculusGlove, HomonculusGloveConfig
|
||||
from lerobot.common.utils.utils import move_cursor_up
|
||||
|
||||
|
||||
def make_left_hand() -> tuple[HomonculusGlove, HopeJrHand]:
|
||||
left_glove_cfg = HomonculusGloveConfig("/dev/tty.usbmodem11301", id="left", side="left")
|
||||
left_glove = HomonculusGlove(left_glove_cfg)
|
||||
left_hand_cfg = HopeJrHandConfig("/dev/cu.usbmodem58760432281", id="left", side="left")
|
||||
left_hand = HopeJrHand(left_hand_cfg)
|
||||
return left_glove, left_hand
|
||||
|
||||
|
||||
def main():
|
||||
left_glove, left_hand = make_left_hand()
|
||||
left_glove.connect()
|
||||
left_hand.connect()
|
||||
display_len = max(len(key) for key in left_hand.action_features)
|
||||
|
||||
try:
|
||||
while True:
|
||||
start = time.perf_counter()
|
||||
|
||||
left_glove_action = left_glove.get_action()
|
||||
left_hand_action = homonculus_glove_to_hope_jr_hand(left_glove_action)
|
||||
left_hand.send_action(left_hand_action)
|
||||
|
||||
time.sleep(0.01)
|
||||
loop_s = time.perf_counter() - start
|
||||
|
||||
print(f"\n{'NAME':<{display_len}} | {'HAND':>7}")
|
||||
for joint, val in left_hand_action.items():
|
||||
print(f"{joint:<{display_len}} | {val:>7.2f}")
|
||||
|
||||
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
|
||||
move_cursor_up(len(left_hand_action) + 4)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
left_glove.disconnect()
|
||||
left_hand.disconnect()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,53 @@
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from lerobot.common.robots.hope_jr import HopeJrArm, HopeJrArmConfig
|
||||
from lerobot.common.teleoperators.homonculus import HomonculusArm, HomonculusArmConfig
|
||||
from lerobot.common.utils.utils import move_cursor_up
|
||||
|
||||
|
||||
def make_right_arm() -> tuple[HomonculusArm, HopeJrArm]:
|
||||
right_exo_arm_cfg = HomonculusArmConfig("/dev/tty.usbmodem2101", id="right")
|
||||
right_exo_arm = HomonculusArm(right_exo_arm_cfg)
|
||||
right_arm_cfg = HopeJrArmConfig("/dev/tty.usbserial-140", id="right")
|
||||
right_arm = HopeJrArm(right_arm_cfg)
|
||||
return right_exo_arm, right_arm
|
||||
|
||||
|
||||
def main():
|
||||
right_exo_arm, right_arm = make_right_arm()
|
||||
display_len = max(len(key) for key in right_exo_arm.action_features)
|
||||
right_exo_arm.connect()
|
||||
right_arm.connect()
|
||||
|
||||
try:
|
||||
while True:
|
||||
start = time.perf_counter()
|
||||
|
||||
right_arm_action = right_exo_arm.get_action()
|
||||
right_arm_action["shoulder_pitch.pos"] = -right_arm_action["shoulder_pitch.pos"]
|
||||
right_arm_action["wrist_yaw.pos"] = -right_arm_action["wrist_yaw.pos"]
|
||||
right_arm.send_action(right_arm_action)
|
||||
|
||||
time.sleep(0.01)
|
||||
loop_s = time.perf_counter() - start
|
||||
|
||||
print("\n" + "-" * (display_len + 10))
|
||||
print(f"{'NAME':<{display_len}} | {'ARM':>7}")
|
||||
for joint, val in right_arm_action.items():
|
||||
print(f"{joint:<{display_len}} | {val:>7.2f}")
|
||||
|
||||
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
|
||||
move_cursor_up(len(right_arm_action) + 5)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
right_exo_arm.disconnect()
|
||||
right_arm.disconnect()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,51 @@
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from lerobot.common.robots.hope_jr import HopeJrHand, HopeJrHandConfig, homonculus_glove_to_hope_jr_hand
|
||||
from lerobot.common.teleoperators.homonculus import HomonculusGlove, HomonculusGloveConfig
|
||||
from lerobot.common.utils.utils import move_cursor_up
|
||||
|
||||
|
||||
def make_right_hand() -> tuple[HomonculusGlove, HopeJrHand]:
|
||||
right_glove_cfg = HomonculusGloveConfig("/dev/tty.usbmodem11301", id="right", side="right")
|
||||
right_glove = HomonculusGlove(right_glove_cfg)
|
||||
right_hand_cfg = HopeJrHandConfig("/dev/tty.usbserial-140", id="right", side="right")
|
||||
right_hand = HopeJrHand(right_hand_cfg)
|
||||
return right_glove, right_hand
|
||||
|
||||
|
||||
def main():
|
||||
right_glove, right_hand = make_right_hand()
|
||||
right_glove.connect()
|
||||
right_hand.connect()
|
||||
display_len = max(len(key) for key in right_hand.action_features)
|
||||
|
||||
try:
|
||||
while True:
|
||||
start = time.perf_counter()
|
||||
|
||||
right_glove_action = right_glove.get_action()
|
||||
right_hand_action = homonculus_glove_to_hope_jr_hand(right_glove_action)
|
||||
right_hand.send_action(right_hand_action)
|
||||
|
||||
time.sleep(0.01)
|
||||
loop_s = time.perf_counter() - start
|
||||
|
||||
print(f"\n{'NAME':<{display_len}} | {'HAND':>7}")
|
||||
for joint, val in right_hand_action.items():
|
||||
print(f"{joint:<{display_len}} | {val:>7.2f}")
|
||||
|
||||
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
|
||||
move_cursor_up(len(right_hand_action) + 4)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
right_glove.disconnect()
|
||||
right_hand.disconnect()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,36 @@
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from lerobot.common.robots.hope_jr import HopeJrArm, HopeJrArmConfig
|
||||
from lerobot.common.utils.utils import move_cursor_up
|
||||
|
||||
# cfg = HopeJrArmConfig("/dev/tty.usbserial-140", id="right")
|
||||
cfg = HopeJrArmConfig("/dev/tty.usbserial-1140", id="left", side="left")
|
||||
arm = HopeJrArm(cfg)
|
||||
display_len = max(len(key) for key in arm.action_features)
|
||||
|
||||
arm.connect()
|
||||
# arm.calibrate()
|
||||
arm.bus.disable_torque()
|
||||
try:
|
||||
while True:
|
||||
start = time.perf_counter()
|
||||
raw_obs = arm.bus.sync_read("Present_Position", normalize=False)
|
||||
norm_obs = {arm.bus.motors[name].id: val for name, val in raw_obs.items()}
|
||||
norm_obs = arm.bus._normalize(norm_obs)
|
||||
norm_obs = {arm.bus._id_to_name(id_): val for id_, val in norm_obs.items()}
|
||||
loop_s = time.perf_counter() - start
|
||||
|
||||
print("\n----------------------------------")
|
||||
print(f"{'NAME':<15} | {'RAW':>7} | {'NORM':>7}")
|
||||
for motor in arm.bus.motors:
|
||||
print(f"{motor:<15} | {raw_obs[motor]:>7} | {norm_obs[motor]:>7.2f}")
|
||||
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
|
||||
move_cursor_up(len(arm.bus.motors) + 5)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except Exception as e:
|
||||
traceback.print_exc(e)
|
||||
finally:
|
||||
arm.disconnect()
|
||||
@@ -0,0 +1,33 @@
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from lerobot.common.teleoperators.homonculus import HomonculusArm, HomonculusArmConfig
|
||||
from lerobot.common.utils.utils import move_cursor_up
|
||||
|
||||
# cfg = HomonculusArmConfig("/dev/tty.usbmodem2101", id="left")
|
||||
cfg = HomonculusArmConfig("/dev/tty.usbmodem11401", id="left")
|
||||
arm = HomonculusArm(cfg)
|
||||
display_len = max(len(key) for key in arm.action_features)
|
||||
|
||||
arm.connect()
|
||||
arm.calibrate()
|
||||
try:
|
||||
while True:
|
||||
start = time.perf_counter()
|
||||
raw_action = arm._read(normalize=False)
|
||||
norm_action = arm._normalize(raw_action)
|
||||
loop_s = time.perf_counter() - start
|
||||
|
||||
print("\n" + "-" * (display_len + 10))
|
||||
print(f"{'NAME':<{display_len}} | {'RAW':>7} | {'NORM':>7}")
|
||||
for joint in arm.joints:
|
||||
print(f"{joint:<{display_len}} | {raw_action[joint]:>7} | {norm_action[joint]:>7.2f}")
|
||||
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
|
||||
move_cursor_up(len(norm_action) + 5)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
arm.disconnect()
|
||||
@@ -0,0 +1,32 @@
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from lerobot.common.teleoperators.homonculus import HomonculusGlove, HomonculusGloveConfig
|
||||
from lerobot.common.utils.utils import move_cursor_up
|
||||
|
||||
config = HomonculusGloveConfig("/dev/tty.usbmodem11401", side="left", id="left")
|
||||
glove = HomonculusGlove(config)
|
||||
glove.connect()
|
||||
|
||||
display_len = max(len(key) for key in glove.action_features)
|
||||
glove.calibrate()
|
||||
try:
|
||||
while True:
|
||||
start = time.perf_counter()
|
||||
raw_action = glove._read(normalize=False)
|
||||
norm_action = glove._normalize(raw_action)
|
||||
loop_s = time.perf_counter() - start
|
||||
|
||||
print("\n" + "-" * (display_len + 10))
|
||||
print(f"{'NAME':<{display_len}} | {'RAW':>7} | {'NORM':>7}")
|
||||
for joint in glove.joints:
|
||||
print(f"{joint:<{display_len}} | {raw_action[joint]:>7} | {norm_action[joint]:>7.2f}")
|
||||
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
|
||||
move_cursor_up(len(norm_action) + 5)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
glove.disconnect()
|
||||
@@ -0,0 +1,8 @@
|
||||
from lerobot.common.robots.hope_jr import HopeJrHand, HopeJrHandConfig
|
||||
|
||||
cfg = HopeJrHandConfig("/dev/cu.usbmodem58760432281", id="left", side="left")
|
||||
hand = HopeJrHand(cfg)
|
||||
|
||||
hand.connect()
|
||||
hand.calibrate()
|
||||
hand.disconnect()
|
||||
@@ -0,0 +1,4 @@
|
||||
from .config_hope_jr import HopeJrArmConfig, HopeJrHandConfig
|
||||
from .hope_jr_arm import HopeJrArm
|
||||
from .hope_jr_hand import HopeJrHand
|
||||
from .joints_translation import homonculus_glove_to_hope_jr_hand
|
||||
@@ -0,0 +1,52 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from lerobot.common.cameras import CameraConfig
|
||||
|
||||
from ..config import RobotConfig
|
||||
|
||||
|
||||
@RobotConfig.register_subclass("hope_jr_hand")
|
||||
@dataclass
|
||||
class HopeJrHandConfig(RobotConfig):
|
||||
port: str # Port to connect to the hand
|
||||
side: str # "left" / "right"
|
||||
|
||||
disable_torque_on_disconnect: bool = True
|
||||
|
||||
cameras: dict[str, CameraConfig] = field(default_factory=dict)
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
if self.side not in ["right", "left"]:
|
||||
raise ValueError(self.side)
|
||||
|
||||
|
||||
@RobotConfig.register_subclass("hope_jr_arm")
|
||||
@dataclass
|
||||
class HopeJrArmConfig(RobotConfig):
|
||||
port: str # Port to connect to the hand
|
||||
side: str # "left" / "right"
|
||||
disable_torque_on_disconnect: bool = True
|
||||
|
||||
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
|
||||
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
|
||||
# the number of motors in your follower arms.
|
||||
max_relative_target: int | None = None
|
||||
|
||||
cameras: dict[str, CameraConfig] = field(default_factory=dict)
|
||||
@@ -0,0 +1,216 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import time
|
||||
from functools import cached_property
|
||||
from typing import Any
|
||||
|
||||
from lerobot.common.cameras.utils import make_cameras_from_configs
|
||||
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
|
||||
from lerobot.common.motors.feetech import (
|
||||
FeetechMotorsBus,
|
||||
OperatingMode,
|
||||
)
|
||||
|
||||
from ..robot import Robot
|
||||
from ..utils import ensure_safe_goal_position
|
||||
from .config_hope_jr import HopeJrArmConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HopeJrArm(Robot):
|
||||
config_class = HopeJrArmConfig
|
||||
name = "hope_jr_arm"
|
||||
|
||||
def __init__(self, config: HopeJrArmConfig):
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
self.bus = FeetechMotorsBus(
|
||||
port=self.config.port,
|
||||
motors={
|
||||
"shoulder_pitch": Motor(2, "sm8512bl", MotorNormMode.RANGE_M100_100),
|
||||
"shoulder_yaw": Motor(3, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
"shoulder_roll": Motor(4, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
"elbow_flex": Motor(5, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
"wrist_roll": Motor(6, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
"wrist_yaw": Motor(7, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
"wrist_pitch": Motor(8, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
},
|
||||
# motors={
|
||||
# "shoulder_pitch": Motor(1, "sm8512bl", MotorNormMode.RANGE_M100_100),
|
||||
# "shoulder_yaw": Motor(2, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
# "shoulder_roll": Motor(3, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
# "elbow_flex": Motor(4, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
# "wrist_roll": Motor(5, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
# "wrist_yaw": Motor(6, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
# "wrist_pitch": Motor(7, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
# },
|
||||
calibration=self.calibration,
|
||||
)
|
||||
self.cameras = make_cameras_from_configs(config.cameras)
|
||||
|
||||
# HACK
|
||||
self.shoulder_pitch = "shoulder_pitch"
|
||||
self.other_motors = [m for m in self.bus.motors if m != "shoulder_pitch"]
|
||||
|
||||
@property
|
||||
def _motors_ft(self) -> dict[str, type]:
|
||||
return {f"{motor}.pos": float for motor in self.bus.motors}
|
||||
|
||||
@property
|
||||
def _cameras_ft(self) -> dict[str, tuple]:
|
||||
return {
|
||||
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
return {**self._motors_ft, **self._cameras_ft}
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
return self._motors_ft
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
# TODO(aliberts): add cam.is_connected for cam in self.cameras
|
||||
return self.bus.is_connected
|
||||
|
||||
def connect(self) -> None:
|
||||
"""
|
||||
We assume that at connection time, arm is in a rest position,
|
||||
and torque can be safely disabled to run calibration.
|
||||
"""
|
||||
if self.is_connected:
|
||||
raise DeviceAlreadyConnectedError(f"{self} already connected")
|
||||
|
||||
self.bus.connect(handshake=False)
|
||||
if not self.is_calibrated:
|
||||
self.calibrate()
|
||||
|
||||
# Connect the cameras
|
||||
for cam in self.cameras.values():
|
||||
cam.connect()
|
||||
|
||||
self.configure()
|
||||
logger.info(f"{self} connected.")
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
return self.bus.is_calibrated
|
||||
|
||||
def calibrate(self) -> None:
|
||||
logger.info(f"\nRunning calibration of {self}")
|
||||
self.bus.disable_torque()
|
||||
for motor in self.bus.motors:
|
||||
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
|
||||
|
||||
unknown_range_motors = self.other_motors
|
||||
|
||||
input("Move arm to the middle of its range of motion and press ENTER....")
|
||||
homing_offsets_full_turn = self.bus.set_half_turn_homings("shoulder_pitch")
|
||||
homing_offsets_unknown_range = self.bus.set_half_turn_homings(unknown_range_motors)
|
||||
homing_offsets = {**homing_offsets_full_turn, **homing_offsets_unknown_range}
|
||||
|
||||
logger.info(
|
||||
"Move all joints except 'shoulder_pitch' sequentially through their "
|
||||
"entire ranges of motion.\nRecording positions. Press ENTER to stop..."
|
||||
)
|
||||
range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors)
|
||||
range_mins["shoulder_pitch"] = 1024
|
||||
range_maxes["shoulder_pitch"] = 3071
|
||||
|
||||
range_mins["wrist_roll"] = 1500
|
||||
range_maxes["wrist_roll"] = 2500
|
||||
|
||||
self.calibration = {}
|
||||
for motor, m in self.bus.motors.items():
|
||||
self.calibration[motor] = MotorCalibration(
|
||||
id=m.id,
|
||||
drive_mode=0,
|
||||
homing_offset=homing_offsets[motor],
|
||||
range_min=range_mins[motor],
|
||||
range_max=range_maxes[motor],
|
||||
)
|
||||
|
||||
self.bus.write_calibration(self.calibration)
|
||||
self._save_calibration()
|
||||
print("Calibration saved to", self.calibration_fpath)
|
||||
|
||||
def configure(self) -> None:
|
||||
with self.bus.torque_disabled():
|
||||
for motor in self.bus.motors:
|
||||
self.bus.write("Return_Delay_Time", motor, 0)
|
||||
self.bus.write("Maximum_Acceleration", motor, 50)
|
||||
self.bus.write("Acceleration", motor, 50)
|
||||
|
||||
def setup_motors(self) -> None:
|
||||
for motor in reversed(self.bus.motors):
|
||||
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
|
||||
self.bus.setup_motor(motor)
|
||||
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
|
||||
|
||||
def get_observation(self) -> dict[str, Any]:
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
obs_dict = {}
|
||||
|
||||
# Read arm position
|
||||
start = time.perf_counter()
|
||||
obs_dict[self.shoulder_pitch] = self.bus.read("Present_Position", self.shoulder_pitch)
|
||||
obs_dict.update(self.bus.sync_read("Present_Position", self.other_motors))
|
||||
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
|
||||
|
||||
# Capture images from cameras
|
||||
for cam_key, cam in self.cameras.items():
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.async_read()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
|
||||
return obs_dict
|
||||
|
||||
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
|
||||
|
||||
# Cap goal position when too far away from present position.
|
||||
# /!\ Slower fps expected due to reading from the follower.
|
||||
if self.config.max_relative_target is not None:
|
||||
present_pos = self.bus.sync_read("Present_Position")
|
||||
goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
|
||||
goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
|
||||
|
||||
self.bus.sync_write("Goal_Position", goal_pos)
|
||||
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
|
||||
|
||||
def disconnect(self):
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
self.bus.disconnect(self.config.disable_torque_on_disconnect)
|
||||
for cam in self.cameras.values():
|
||||
cam.disconnect()
|
||||
|
||||
logger.info(f"{self} disconnected.")
|
||||
@@ -0,0 +1,204 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import time
|
||||
from functools import cached_property
|
||||
from typing import Any
|
||||
|
||||
from lerobot.common.cameras.utils import make_cameras_from_configs
|
||||
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
from lerobot.common.motors import Motor, MotorNormMode
|
||||
from lerobot.common.motors.calibration_gui import RangeFinderGUI
|
||||
from lerobot.common.motors.feetech import (
|
||||
FeetechMotorsBus,
|
||||
)
|
||||
|
||||
from ..robot import Robot
|
||||
from .config_hope_jr import HopeJrHandConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
RIGHT_HAND_INVERSIONS = [
|
||||
"thumb_mcp",
|
||||
"thumb_dip",
|
||||
"index_ulnar_flexor",
|
||||
"middle_ulnar_flexor",
|
||||
"ring_ulnar_flexor",
|
||||
"ring_pip_dip",
|
||||
"pinky_ulnar_flexor",
|
||||
"pinky_pip_dip",
|
||||
]
|
||||
|
||||
LEFT_HAND_INVERSIONS = [
|
||||
"thumb_cmc",
|
||||
"thumb_mcp",
|
||||
"thumb_dip",
|
||||
"index_radial_flexor",
|
||||
"index_pip_dip",
|
||||
"middle_radial_flexor",
|
||||
"middle_pip_dip",
|
||||
"ring_radial_flexor",
|
||||
"ring_pip_dip",
|
||||
"pinky_radial_flexor",
|
||||
# "pinky_pip_dip",
|
||||
]
|
||||
|
||||
|
||||
class HopeJrHand(Robot):
|
||||
config_class = HopeJrHandConfig
|
||||
name = "hope_jr_hand"
|
||||
|
||||
def __init__(self, config: HopeJrHandConfig):
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
self.bus = FeetechMotorsBus(
|
||||
port=self.config.port,
|
||||
motors={
|
||||
# Thumb
|
||||
"thumb_cmc": Motor(1, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"thumb_mcp": Motor(2, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"thumb_pip": Motor(3, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"thumb_dip": Motor(4, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
# Index
|
||||
"index_radial_flexor": Motor(5, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"index_ulnar_flexor": Motor(6, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"index_pip_dip": Motor(7, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
# Middle
|
||||
"middle_radial_flexor": Motor(8, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"middle_ulnar_flexor": Motor(9, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"middle_pip_dip": Motor(10, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
# Ring
|
||||
"ring_radial_flexor": Motor(11, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"ring_ulnar_flexor": Motor(12, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"ring_pip_dip": Motor(13, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
# Pinky
|
||||
"pinky_radial_flexor": Motor(14, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"pinky_ulnar_flexor": Motor(15, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"pinky_pip_dip": Motor(16, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
},
|
||||
calibration=self.calibration,
|
||||
protocol_version=1,
|
||||
)
|
||||
self.cameras = make_cameras_from_configs(config.cameras)
|
||||
self.inverted_motors = RIGHT_HAND_INVERSIONS if config.side == "right" else LEFT_HAND_INVERSIONS
|
||||
|
||||
@property
|
||||
def _motors_ft(self) -> dict[str, type]:
|
||||
return {f"{motor}.pos": float for motor in self.bus.motors}
|
||||
|
||||
@property
|
||||
def _cameras_ft(self) -> dict[str, tuple]:
|
||||
return {
|
||||
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
return {**self._motors_ft, **self._cameras_ft}
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
return self._motors_ft
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
# TODO(aliberts): add cam.is_connected for cam in self.cameras
|
||||
return self.bus.is_connected
|
||||
|
||||
def connect(self) -> None:
|
||||
if self.is_connected:
|
||||
raise DeviceAlreadyConnectedError(f"{self} already connected")
|
||||
|
||||
self.bus.connect()
|
||||
if not self.is_calibrated:
|
||||
self.calibrate()
|
||||
|
||||
# Connect the cameras
|
||||
for cam in self.cameras.values():
|
||||
cam.connect()
|
||||
|
||||
self.configure()
|
||||
logger.info(f"{self} connected.")
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
return self.bus.is_calibrated
|
||||
|
||||
def calibrate(self) -> None:
|
||||
fingers = {}
|
||||
for finger in ["thumb", "index", "middle", "ring", "pinky"]:
|
||||
fingers[finger] = [motor for motor in self.bus.motors if motor.startswith(finger)]
|
||||
|
||||
self.calibration = RangeFinderGUI(self.bus, fingers).run()
|
||||
for motor in self.inverted_motors:
|
||||
self.calibration[motor].drive_mode = 1
|
||||
self._save_calibration()
|
||||
print("Calibration saved to", self.calibration_fpath)
|
||||
|
||||
def configure(self) -> None:
|
||||
with self.bus.torque_disabled():
|
||||
self.bus.configure_motors()
|
||||
# for motor in self.bus.motors:
|
||||
# self.bus.write("Return_Delay_Time", motor, 0)
|
||||
# self.bus.write("Acceleration_2", motor, 50)
|
||||
# self.bus.write("Acceleration", motor, 50)
|
||||
|
||||
def setup_motors(self) -> None:
|
||||
for motor in self.bus.motors:
|
||||
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
|
||||
self.bus.setup_motor(motor)
|
||||
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
|
||||
|
||||
def get_observation(self) -> dict[str, Any]:
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
obs_dict = {}
|
||||
|
||||
# Read hand position
|
||||
start = time.perf_counter()
|
||||
for motor in self.bus.motors:
|
||||
obs_dict[f"{motor}.pos"] = self.bus.read("Present_Position", motor, normalize=False)
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
|
||||
|
||||
# Capture images from cameras
|
||||
for cam_key, cam in self.cameras.items():
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.async_read()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
|
||||
return obs_dict
|
||||
|
||||
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
|
||||
self.bus.sync_write("Goal_Position", goal_pos)
|
||||
return action
|
||||
|
||||
def disconnect(self):
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
self.bus.disconnect(self.config.disable_torque_on_disconnect)
|
||||
for cam in self.cameras.values():
|
||||
cam.disconnect()
|
||||
|
||||
logger.info(f"{self} disconnected.")
|
||||
@@ -0,0 +1,53 @@
|
||||
INDEX_SPLAY = 0.3
|
||||
MIDDLE_SPLAY = 0.3
|
||||
RING_SPLAY = 0.3
|
||||
PINKY_SPLAY = 0.5
|
||||
|
||||
|
||||
def get_ulnar_flexion(flexion: float, abduction: float, splay: float):
|
||||
# ulnar_component = max(-abduction, 0)
|
||||
ulnar_component = -abduction
|
||||
return ulnar_component * splay + flexion * (1 - splay)
|
||||
|
||||
|
||||
def get_radial_flexion(flexion: float, abduction: float, splay: float):
|
||||
radial_component = abduction
|
||||
# radial_component = max(abduction, 0)
|
||||
return radial_component * splay + flexion * (1 - splay)
|
||||
|
||||
|
||||
def homonculus_glove_to_hope_jr_hand(glove_action: dict[str, float]) -> dict[str, float]:
|
||||
return {
|
||||
"thumb_cmc.pos": glove_action["thumb_cmc.pos"],
|
||||
"thumb_mcp.pos": glove_action["thumb_mcp.pos"],
|
||||
"thumb_pip.pos": glove_action["thumb_pip.pos"],
|
||||
"thumb_dip.pos": glove_action["thumb_dip.pos"],
|
||||
"index_radial_flexor.pos": get_radial_flexion(
|
||||
glove_action["index_mcp_flexion.pos"], glove_action["index_mcp_abduction.pos"], INDEX_SPLAY
|
||||
),
|
||||
"index_ulnar_flexor.pos": get_ulnar_flexion(
|
||||
glove_action["index_mcp_flexion.pos"], glove_action["index_mcp_abduction.pos"], INDEX_SPLAY
|
||||
),
|
||||
"index_pip_dip.pos": glove_action["index_dip.pos"],
|
||||
"middle_radial_flexor.pos": get_radial_flexion(
|
||||
glove_action["middle_mcp_flexion.pos"], glove_action["middle_mcp_abduction.pos"], MIDDLE_SPLAY
|
||||
),
|
||||
"middle_ulnar_flexor.pos": get_ulnar_flexion(
|
||||
glove_action["middle_mcp_flexion.pos"], glove_action["middle_mcp_abduction.pos"], MIDDLE_SPLAY
|
||||
),
|
||||
"middle_pip_dip.pos": glove_action["middle_dip.pos"],
|
||||
"ring_radial_flexor.pos": get_radial_flexion(
|
||||
glove_action["ring_mcp_flexion.pos"], glove_action["ring_mcp_abduction.pos"], RING_SPLAY
|
||||
),
|
||||
"ring_ulnar_flexor.pos": get_ulnar_flexion(
|
||||
glove_action["ring_mcp_flexion.pos"], glove_action["ring_mcp_abduction.pos"], RING_SPLAY
|
||||
),
|
||||
"ring_pip_dip.pos": glove_action["ring_dip.pos"],
|
||||
"pinky_radial_flexor.pos": get_radial_flexion(
|
||||
glove_action["pinky_mcp_flexion.pos"], glove_action["pinky_mcp_abduction.pos"], PINKY_SPLAY
|
||||
),
|
||||
"pinky_ulnar_flexor.pos": get_ulnar_flexion(
|
||||
glove_action["pinky_mcp_flexion.pos"], glove_action["pinky_mcp_abduction.pos"], PINKY_SPLAY
|
||||
),
|
||||
"pinky_pip_dip.pos": glove_action["pinky_dip.pos"],
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
from .config_homonculus import HomonculusArmConfig, HomonculusGloveConfig
|
||||
from .homonculus_arm import HomonculusArm
|
||||
from .homonculus_glove import HomonculusGlove
|
||||
@@ -0,0 +1,38 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
from ..config import TeleoperatorConfig
|
||||
|
||||
|
||||
@TeleoperatorConfig.register_subclass("homonculus_glove")
|
||||
@dataclass
|
||||
class HomonculusGloveConfig(TeleoperatorConfig):
|
||||
port: str # Port to connect to the glove
|
||||
side: str # "left" / "right"
|
||||
baud_rate: int = 115_200
|
||||
|
||||
def __post_init__(self):
|
||||
if self.side not in ["right", "left"]:
|
||||
raise ValueError(self.side)
|
||||
|
||||
|
||||
@TeleoperatorConfig.register_subclass("homonculus_arm")
|
||||
@dataclass
|
||||
class HomonculusArmConfig(TeleoperatorConfig):
|
||||
port: str # Port to connect to the arm
|
||||
baud_rate: int = 115_200
|
||||
@@ -0,0 +1,257 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import threading
|
||||
from pprint import pformat
|
||||
|
||||
import serial
|
||||
|
||||
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
from lerobot.common.motors.motors_bus import MotorCalibration, MotorNormMode
|
||||
from lerobot.common.utils.utils import enter_pressed, move_cursor_up
|
||||
|
||||
from ..teleoperator import Teleoperator
|
||||
from .config_homonculus import HomonculusArmConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HomonculusArm(Teleoperator):
|
||||
"""
|
||||
Homonculus Arm designed by Hugging Face.
|
||||
"""
|
||||
|
||||
config_class = HomonculusArmConfig
|
||||
name = "homonculus_arm"
|
||||
|
||||
def __init__(self, config: HomonculusArmConfig):
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
self.serial = serial.Serial(config.port, config.baud_rate, timeout=1)
|
||||
|
||||
self.joints = {
|
||||
"shoulder_pitch": MotorNormMode.RANGE_M100_100,
|
||||
"shoulder_yaw": MotorNormMode.RANGE_M100_100,
|
||||
"shoulder_roll": MotorNormMode.RANGE_M100_100,
|
||||
"elbow_flex": MotorNormMode.RANGE_M100_100,
|
||||
"wrist_roll": MotorNormMode.RANGE_M100_100,
|
||||
"wrist_yaw": MotorNormMode.RANGE_M100_100,
|
||||
"wrist_pitch": MotorNormMode.RANGE_M100_100,
|
||||
}
|
||||
|
||||
self._state: dict[str, float] | None = None
|
||||
self.new_state_event = threading.Event()
|
||||
self.thread = threading.Thread(target=self._read_loop, daemon=True, name=f"{self} _read_loop")
|
||||
self.lock = threading.Lock()
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict:
|
||||
return {f"{joint}.pos": float for joint in self.joints}
|
||||
|
||||
@property
|
||||
def feedback_features(self) -> dict:
|
||||
return {}
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return self.thread.is_alive() and self.serial.is_open
|
||||
|
||||
def connect(self) -> None:
|
||||
if self.is_connected:
|
||||
raise DeviceAlreadyConnectedError(f"{self} already connected")
|
||||
|
||||
if not self.serial.is_open:
|
||||
self.serial.open()
|
||||
self.thread.start()
|
||||
|
||||
# wait for the thread to ramp up & 1st state to be ready
|
||||
if not self.new_state_event.wait(timeout=2):
|
||||
raise TimeoutError(f"{self}: Timed out waiting for state after 2s.")
|
||||
|
||||
if not self.is_calibrated:
|
||||
self.calibrate()
|
||||
|
||||
logger.info(f"{self} connected.")
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
return self.calibration_fpath.is_file()
|
||||
|
||||
def calibrate(self) -> None:
|
||||
print(
|
||||
"\nMove all joints through their entire range of motion."
|
||||
"\nRecording positions. Press ENTER to stop..."
|
||||
)
|
||||
range_mins, range_maxes = self._record_ranges_of_motion()
|
||||
|
||||
self.calibration = {}
|
||||
for id_, joint in enumerate(self.joints):
|
||||
self.calibration[joint] = MotorCalibration(
|
||||
id=id_,
|
||||
drive_mode=0,
|
||||
homing_offset=0,
|
||||
range_min=range_mins[joint],
|
||||
range_max=range_maxes[joint],
|
||||
)
|
||||
|
||||
self._save_calibration()
|
||||
print("Calibration saved to", self.calibration_fpath)
|
||||
|
||||
def _record_ranges_of_motion(
|
||||
self, joints: list[str] | None = None, display_values: bool = True
|
||||
) -> tuple[dict[str, int], dict[str, int]]:
|
||||
"""Interactively record the min/max encoder values of each joint.
|
||||
|
||||
Move the joints while the method streams live positions. Press :kbd:`Enter` to finish.
|
||||
|
||||
Args:
|
||||
joints (list[str] | None, optional): Joints to record. Defaults to every joint (`None`).
|
||||
display_values (bool, optional): When `True` (default) a live table is printed to the console.
|
||||
|
||||
Raises:
|
||||
TypeError: `joints` is not `None` or a list.
|
||||
ValueError: any joint's recorded min and max are the same.
|
||||
|
||||
Returns:
|
||||
tuple[dict[str, int], dict[str, int]]: Two dictionaries *mins* and *maxes* with the extreme values
|
||||
observed for each joint.
|
||||
"""
|
||||
if joints is None:
|
||||
joints = list(self.joints)
|
||||
elif not isinstance(joints, list):
|
||||
raise TypeError(joints)
|
||||
|
||||
start_positions = self._read(joints, normalize=False)
|
||||
mins = start_positions.copy()
|
||||
maxes = start_positions.copy()
|
||||
while True:
|
||||
positions = self._read(joints, normalize=False)
|
||||
mins = {joint: min(positions[joint], min_) for joint, min_ in mins.items()}
|
||||
maxes = {joint: max(positions[joint], max_) for joint, max_ in maxes.items()}
|
||||
|
||||
if display_values:
|
||||
print("\n-------------------------------------------")
|
||||
print(f"{'NAME':<15} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}")
|
||||
for joint in joints:
|
||||
print(f"{joint:<15} | {mins[joint]:>6} | {positions[joint]:>6} | {maxes[joint]:>6}")
|
||||
|
||||
if enter_pressed():
|
||||
break
|
||||
|
||||
if display_values:
|
||||
# Move cursor up to overwrite the previous output
|
||||
move_cursor_up(len(joints) + 3)
|
||||
|
||||
same_min_max = [joint for joint in joints if mins[joint] == maxes[joint]]
|
||||
if same_min_max:
|
||||
raise ValueError(f"Some joints have the same min and max values:\n{pformat(same_min_max)}")
|
||||
|
||||
return mins, maxes
|
||||
|
||||
def configure(self) -> None:
|
||||
pass
|
||||
|
||||
def _normalize(self, values: dict[str, int], joints: list[str] | None = None):
|
||||
if not self.calibration:
|
||||
raise RuntimeError(f"{self} has no calibration registered.")
|
||||
|
||||
normalized_values = {}
|
||||
for joint, val in values.items():
|
||||
min_ = self.calibration[joint].range_min
|
||||
max_ = self.calibration[joint].range_max
|
||||
drive_mode = self.calibration[joint].drive_mode
|
||||
bounded_val = min(max_, max(min_, val))
|
||||
|
||||
if self.joints[joint] is MotorNormMode.RANGE_M100_100:
|
||||
norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100
|
||||
normalized_values[joint] = -norm if drive_mode else norm
|
||||
elif self.joints[joint] is MotorNormMode.RANGE_0_100:
|
||||
norm = ((bounded_val - min_) / (max_ - min_)) * 100
|
||||
normalized_values[joint] = 100 - norm if drive_mode else norm
|
||||
|
||||
return normalized_values
|
||||
|
||||
def _read(
|
||||
self, joints: list[str] | None = None, normalize: bool = True, timeout: float = 1
|
||||
) -> dict[str, int | float]:
|
||||
"""
|
||||
Return the most recent (single) values from self.last_d,
|
||||
optionally applying calibration.
|
||||
"""
|
||||
if not self.new_state_event.wait(timeout=timeout):
|
||||
raise TimeoutError(f"{self}: Timed out waiting for state after {timeout}s.")
|
||||
|
||||
with self.lock:
|
||||
state = self._state
|
||||
|
||||
self.new_state_event.clear()
|
||||
|
||||
if state is None:
|
||||
raise RuntimeError(f"{self} Internal error: Event set but no state available.")
|
||||
|
||||
if joints is not None:
|
||||
state = {k: v for k, v in state.items() if k in joints}
|
||||
|
||||
if normalize:
|
||||
state = self._normalize(state, joints)
|
||||
|
||||
return state
|
||||
|
||||
def _read_loop(self):
|
||||
"""
|
||||
Continuously read from the serial buffer in its own thread and sends values to the main thread through
|
||||
a queue.
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
if self.serial.in_waiting > 0:
|
||||
self.serial.flush()
|
||||
raw_values = self.serial.readline().decode("utf-8").strip().split(" ")
|
||||
if len(raw_values) != 21: # 16 raw + 5 angle values
|
||||
continue
|
||||
|
||||
joint_angles = {
|
||||
"shoulder_pitch": int(raw_values[19]),
|
||||
"shoulder_yaw": int(raw_values[18]),
|
||||
"shoulder_roll": int(raw_values[20]),
|
||||
"elbow_flex": int(raw_values[17]),
|
||||
"wrist_roll": int(raw_values[16]),
|
||||
"wrist_yaw": int(raw_values[1]),
|
||||
"wrist_pitch": int(raw_values[0]),
|
||||
}
|
||||
|
||||
with self.lock:
|
||||
self._state = joint_angles
|
||||
self.new_state_event.set()
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Error reading frame in background thread for {self}: {e}")
|
||||
|
||||
def get_action(self) -> dict[str, float]:
|
||||
joint_positions = self._read()
|
||||
return {f"{joint}.pos": pos for joint, pos in joint_positions.items()}
|
||||
|
||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def disconnect(self) -> None:
|
||||
if not self.is_connected:
|
||||
DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
self.thread.join(timeout=0.5)
|
||||
self.serial.close()
|
||||
logger.info(f"{self} disconnected.")
|
||||
@@ -0,0 +1,293 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import threading
|
||||
from pprint import pformat
|
||||
|
||||
import serial
|
||||
|
||||
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
from lerobot.common.motors import MotorCalibration
|
||||
from lerobot.common.motors.motors_bus import MotorNormMode
|
||||
from lerobot.common.utils.utils import enter_pressed, move_cursor_up
|
||||
|
||||
from ..teleoperator import Teleoperator
|
||||
from .config_homonculus import HomonculusGloveConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
LEFT_HAND_INVERSIONS = [
|
||||
"thumb_cmc",
|
||||
"index_dip",
|
||||
"middle_mcp_abduction",
|
||||
"middle_dip",
|
||||
"pinky_mcp_abduction",
|
||||
"pinky_dip",
|
||||
]
|
||||
|
||||
RIGHT_HAND_INVERSIONS = [
|
||||
"thumb_mcp",
|
||||
"thumb_cmc",
|
||||
"index_mcp_abduction",
|
||||
"index_dip",
|
||||
"middle_mcp_abduction",
|
||||
"middle_dip",
|
||||
"ring_mcp_abduction",
|
||||
"ring_mcp_flexion",
|
||||
"ring_dip",
|
||||
"pinky_mcp_abduction",
|
||||
]
|
||||
|
||||
|
||||
class HomonculusGlove(Teleoperator):
|
||||
"""
|
||||
Homonculus Glove designed by Hugging Face.
|
||||
"""
|
||||
|
||||
config_class = HomonculusGloveConfig
|
||||
name = "homonculus_glove"
|
||||
|
||||
def __init__(self, config: HomonculusGloveConfig):
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
self.serial = serial.Serial(config.port, config.baud_rate, timeout=1)
|
||||
|
||||
self.joints = {
|
||||
"thumb_cmc": MotorNormMode.RANGE_0_100,
|
||||
"thumb_mcp": MotorNormMode.RANGE_0_100,
|
||||
"thumb_pip": MotorNormMode.RANGE_0_100,
|
||||
"thumb_dip": MotorNormMode.RANGE_0_100,
|
||||
"index_mcp_abduction": MotorNormMode.RANGE_M100_100,
|
||||
"index_mcp_flexion": MotorNormMode.RANGE_0_100,
|
||||
"index_dip": MotorNormMode.RANGE_0_100,
|
||||
"middle_mcp_abduction": MotorNormMode.RANGE_M100_100,
|
||||
"middle_mcp_flexion": MotorNormMode.RANGE_0_100,
|
||||
"middle_dip": MotorNormMode.RANGE_0_100,
|
||||
"ring_mcp_abduction": MotorNormMode.RANGE_M100_100,
|
||||
"ring_mcp_flexion": MotorNormMode.RANGE_0_100,
|
||||
"ring_dip": MotorNormMode.RANGE_0_100,
|
||||
"pinky_mcp_abduction": MotorNormMode.RANGE_M100_100,
|
||||
"pinky_mcp_flexion": MotorNormMode.RANGE_0_100,
|
||||
"pinky_dip": MotorNormMode.RANGE_0_100,
|
||||
}
|
||||
self.inverted_joints = RIGHT_HAND_INVERSIONS if config.side == "right" else LEFT_HAND_INVERSIONS
|
||||
|
||||
self._state: dict[str, float] | None = None
|
||||
self.new_state_event = threading.Event()
|
||||
self.thread = threading.Thread(target=self._read_loop, daemon=True, name=f"{self} _read_loop")
|
||||
self.lock = threading.Lock()
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict:
|
||||
return {f"{joint}.pos": float for joint in self.joints}
|
||||
|
||||
@property
|
||||
def feedback_features(self) -> dict:
|
||||
return {}
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return self.thread.is_alive() and self.serial.is_open
|
||||
|
||||
def connect(self) -> None:
|
||||
if self.is_connected:
|
||||
raise DeviceAlreadyConnectedError(f"{self} already connected")
|
||||
|
||||
if not self.serial.is_open:
|
||||
self.serial.open()
|
||||
self.thread.start()
|
||||
|
||||
# wait for the thread to ramp up & 1st state to be ready
|
||||
if not self.new_state_event.wait(timeout=2):
|
||||
raise TimeoutError(f"{self}: Timed out waiting for state after 2s.")
|
||||
|
||||
if not self.is_calibrated:
|
||||
self.calibrate()
|
||||
|
||||
logger.info(f"{self} connected.")
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
return self.calibration_fpath.is_file()
|
||||
|
||||
def calibrate(self) -> None:
|
||||
range_mins, range_maxes = {}, {}
|
||||
for finger in ["thumb", "index", "middle", "ring", "pinky"]:
|
||||
print(
|
||||
f"\nMove {finger} through its entire range of motion."
|
||||
"\nRecording positions. Press ENTER to stop..."
|
||||
)
|
||||
finger_joints = [joint for joint in self.joints if joint.startswith(finger)]
|
||||
finger_mins, finger_maxes = self._record_ranges_of_motion(finger_joints)
|
||||
range_mins.update(finger_mins)
|
||||
range_maxes.update(finger_maxes)
|
||||
|
||||
self.calibration = {}
|
||||
for id_, joint in enumerate(self.joints):
|
||||
self.calibration[joint] = MotorCalibration(
|
||||
id=id_,
|
||||
drive_mode=1 if joint in self.inverted_joints else 0,
|
||||
homing_offset=0,
|
||||
range_min=range_mins[joint],
|
||||
range_max=range_maxes[joint],
|
||||
)
|
||||
|
||||
self._save_calibration()
|
||||
print("Calibration saved to", self.calibration_fpath)
|
||||
|
||||
def _record_ranges_of_motion(
|
||||
self, joints: list[str] | None = None, display_values: bool = True
|
||||
) -> tuple[dict[str, int], dict[str, int]]:
|
||||
"""Interactively record the min/max encoder values of each joint.
|
||||
|
||||
Move the fingers while the method streams live positions. Press :kbd:`Enter` to finish.
|
||||
|
||||
Args:
|
||||
joints (list[str] | None, optional): Joints to record. Defaults to every joint (`None`).
|
||||
display_values (bool, optional): When `True` (default) a live table is printed to the console.
|
||||
|
||||
Raises:
|
||||
TypeError: `joints` is not `None` or a list.
|
||||
ValueError: any joint's recorded min and max are the same.
|
||||
|
||||
Returns:
|
||||
tuple[dict[str, int], dict[str, int]]: Two dictionaries *mins* and *maxes* with the extreme values
|
||||
observed for each joint.
|
||||
"""
|
||||
if joints is None:
|
||||
joints = list(self.joints)
|
||||
elif not isinstance(joints, list):
|
||||
raise TypeError(joints)
|
||||
|
||||
display_len = max(len(key) for key in joints)
|
||||
|
||||
start_positions = self._read(joints, normalize=False)
|
||||
mins = start_positions.copy()
|
||||
maxes = start_positions.copy()
|
||||
while True:
|
||||
positions = self._read(joints, normalize=False)
|
||||
mins = {joint: min(positions[joint], min_) for joint, min_ in mins.items()}
|
||||
maxes = {joint: max(positions[joint], max_) for joint, max_ in maxes.items()}
|
||||
|
||||
if display_values:
|
||||
print("\n-------------------------------------------")
|
||||
print(f"{'NAME':<{display_len}} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}")
|
||||
for joint in joints:
|
||||
print(
|
||||
f"{joint:<{display_len}} | {mins[joint]:>6} | {positions[joint]:>6} | {maxes[joint]:>6}"
|
||||
)
|
||||
|
||||
if enter_pressed():
|
||||
break
|
||||
|
||||
if display_values:
|
||||
# Move cursor up to overwrite the previous output
|
||||
move_cursor_up(len(joints) + 3)
|
||||
|
||||
same_min_max = [joint for joint in joints if mins[joint] == maxes[joint]]
|
||||
if same_min_max:
|
||||
raise ValueError(f"Some joints have the same min and max values:\n{pformat(same_min_max)}")
|
||||
|
||||
return mins, maxes
|
||||
|
||||
def configure(self) -> None:
|
||||
pass
|
||||
|
||||
def _normalize(self, values: dict[str, int], joints: list[str] | None = None):
|
||||
if not self.calibration:
|
||||
raise RuntimeError(f"{self} has no calibration registered.")
|
||||
|
||||
normalized_values = {}
|
||||
for joint, val in values.items():
|
||||
min_ = self.calibration[joint].range_min
|
||||
max_ = self.calibration[joint].range_max
|
||||
drive_mode = self.calibration[joint].drive_mode
|
||||
bounded_val = min(max_, max(min_, val))
|
||||
|
||||
if self.joints[joint] is MotorNormMode.RANGE_M100_100:
|
||||
norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100
|
||||
normalized_values[joint] = -norm if drive_mode else norm
|
||||
elif self.joints[joint] is MotorNormMode.RANGE_0_100:
|
||||
norm = ((bounded_val - min_) / (max_ - min_)) * 100
|
||||
normalized_values[joint] = 100 - norm if drive_mode else norm
|
||||
|
||||
return normalized_values
|
||||
|
||||
def _read(
|
||||
self, joints: list[str] | None = None, normalize: bool = True, timeout: float = 1
|
||||
) -> dict[str, int | float]:
|
||||
"""
|
||||
Return the most recent (single) values from self.last_d,
|
||||
optionally applying calibration.
|
||||
"""
|
||||
if not self.new_state_event.wait(timeout=timeout):
|
||||
raise TimeoutError(f"{self}: Timed out waiting for state after {timeout}s.")
|
||||
|
||||
with self.lock:
|
||||
state = self._state
|
||||
|
||||
self.new_state_event.clear()
|
||||
|
||||
if state is None:
|
||||
raise RuntimeError(f"{self} Internal error: Event set but no state available.")
|
||||
|
||||
if joints is not None:
|
||||
state = {k: v for k, v in state.items() if k in joints}
|
||||
|
||||
if normalize:
|
||||
state = self._normalize(state, joints)
|
||||
|
||||
return state
|
||||
|
||||
def _read_loop(self):
|
||||
"""
|
||||
Continuously read from the serial buffer in its own thread and sends values to the main thread through
|
||||
a queue.
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
if self.serial.in_waiting > 0:
|
||||
self.serial.flush()
|
||||
positions = self.serial.readline().decode("utf-8").strip().split(" ")
|
||||
if len(positions) != len(self.joints):
|
||||
continue
|
||||
|
||||
joint_positions = {
|
||||
joint: int(pos) for joint, pos in zip(self.joints, positions, strict=True)
|
||||
}
|
||||
|
||||
with self.lock:
|
||||
self._state = joint_positions
|
||||
self.new_state_event.set()
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Error reading frame in background thread for {self}: {e}")
|
||||
|
||||
def get_action(self) -> dict[str, float]:
|
||||
joint_positions = self._read()
|
||||
return {f"{joint}.pos": pos for joint, pos in joint_positions.items()}
|
||||
|
||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def disconnect(self) -> None:
|
||||
if not self.is_connected:
|
||||
DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
self.thread.join(timeout=0.5)
|
||||
self.serial.close()
|
||||
logger.info(f"{self} disconnected.")
|
||||
Reference in New Issue
Block a user