From 0ccec237c0bd1f663d5d5ab5bbe9f7dd64147f63 Mon Sep 17 00:00:00 2001 From: nepyope Date: Thu, 26 Jun 2025 14:30:10 +0200 Subject: [PATCH] Add Hope Jr robot and homonculus teleoperator support --- examples/hope_jr/teleop/left_arm_teleop.py | 53 ++++ examples/hope_jr/teleop/left_hand_teleop.py | 51 +++ examples/hope_jr/teleop/right_arm_teleop.py | 53 ++++ examples/hope_jr/teleop/right_hand_teleop.py | 51 +++ examples/hope_jr/vis/arm.py | 36 +++ examples/hope_jr/vis/exo.py | 33 ++ examples/hope_jr/vis/glove.py | 32 ++ examples/hope_jr/vis/hand.py | 8 + lerobot/common/robots/hope_jr/__init__.py | 4 + .../common/robots/hope_jr/config_hope_jr.py | 52 ++++ lerobot/common/robots/hope_jr/hope_jr_arm.py | 216 +++++++++++++ lerobot/common/robots/hope_jr/hope_jr_hand.py | 204 ++++++++++++ .../robots/hope_jr/joints_translation.py | 53 ++++ .../teleoperators/homonculus/__init__.py | 3 + .../homonculus/config_homonculus.py | 38 +++ .../homonculus/homonculus_arm.py | 257 +++++++++++++++ .../homonculus/homonculus_glove.py | 293 ++++++++++++++++++ 17 files changed, 1437 insertions(+) create mode 100644 examples/hope_jr/teleop/left_arm_teleop.py create mode 100644 examples/hope_jr/teleop/left_hand_teleop.py create mode 100644 examples/hope_jr/teleop/right_arm_teleop.py create mode 100644 examples/hope_jr/teleop/right_hand_teleop.py create mode 100644 examples/hope_jr/vis/arm.py create mode 100644 examples/hope_jr/vis/exo.py create mode 100644 examples/hope_jr/vis/glove.py create mode 100644 examples/hope_jr/vis/hand.py create mode 100644 lerobot/common/robots/hope_jr/__init__.py create mode 100644 lerobot/common/robots/hope_jr/config_hope_jr.py create mode 100644 lerobot/common/robots/hope_jr/hope_jr_arm.py create mode 100644 lerobot/common/robots/hope_jr/hope_jr_hand.py create mode 100644 lerobot/common/robots/hope_jr/joints_translation.py create mode 100644 lerobot/common/teleoperators/homonculus/__init__.py create mode 100644 lerobot/common/teleoperators/homonculus/config_homonculus.py create mode 100644 lerobot/common/teleoperators/homonculus/homonculus_arm.py create mode 100644 lerobot/common/teleoperators/homonculus/homonculus_glove.py diff --git a/examples/hope_jr/teleop/left_arm_teleop.py b/examples/hope_jr/teleop/left_arm_teleop.py new file mode 100644 index 000000000..369e99a80 --- /dev/null +++ b/examples/hope_jr/teleop/left_arm_teleop.py @@ -0,0 +1,53 @@ +import time +import traceback + +from lerobot.common.robots.hope_jr import HopeJrArm, HopeJrArmConfig +from lerobot.common.teleoperators.homonculus import HomonculusArm, HomonculusArmConfig +from lerobot.common.utils.utils import move_cursor_up + + +def make_left_arm() -> tuple[HomonculusArm, HopeJrArm]: + left_exo_arm_cfg = HomonculusArmConfig("/dev/tty.usbmodem11301", id="left") + left_exo_arm = HomonculusArm(left_exo_arm_cfg) + left_arm_cfg = HopeJrArmConfig("/dev/tty.usbserial-1140", id="left", side="left") + left_arm = HopeJrArm(left_arm_cfg) + return left_exo_arm, left_arm + + +def main(): + left_exo_arm, left_arm = make_left_arm() + display_len = max(len(key) for key in left_exo_arm.action_features) + left_exo_arm.connect() + left_arm.connect() + + try: + while True: + start = time.perf_counter() + + left_arm_action = left_exo_arm.get_action() + left_arm_action["shoulder_pitch.pos"] = -left_arm_action["shoulder_pitch.pos"] + left_arm_action["wrist_yaw.pos"] = -left_arm_action["wrist_yaw.pos"] + left_arm.send_action(left_arm_action) + + time.sleep(0.01) + loop_s = time.perf_counter() - start + + print("\n" + "-" * (display_len + 10)) + print(f"{'NAME':<{display_len}} | {'ARM':>7}") + for joint, val in left_arm_action.items(): + print(f"{joint:<{display_len}} | {val:>7.2f}") + + print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)") + move_cursor_up(len(left_arm_action) + 5) + + except KeyboardInterrupt: + pass + except Exception: + traceback.print_exc() + finally: + left_exo_arm.disconnect() + left_arm.disconnect() + + +if __name__ == "__main__": + main() diff --git a/examples/hope_jr/teleop/left_hand_teleop.py b/examples/hope_jr/teleop/left_hand_teleop.py new file mode 100644 index 000000000..6c4d11b2c --- /dev/null +++ b/examples/hope_jr/teleop/left_hand_teleop.py @@ -0,0 +1,51 @@ +import time +import traceback + +from lerobot.common.robots.hope_jr import HopeJrHand, HopeJrHandConfig, homonculus_glove_to_hope_jr_hand +from lerobot.common.teleoperators.homonculus import HomonculusGlove, HomonculusGloveConfig +from lerobot.common.utils.utils import move_cursor_up + + +def make_left_hand() -> tuple[HomonculusGlove, HopeJrHand]: + left_glove_cfg = HomonculusGloveConfig("/dev/tty.usbmodem11301", id="left", side="left") + left_glove = HomonculusGlove(left_glove_cfg) + left_hand_cfg = HopeJrHandConfig("/dev/cu.usbmodem58760432281", id="left", side="left") + left_hand = HopeJrHand(left_hand_cfg) + return left_glove, left_hand + + +def main(): + left_glove, left_hand = make_left_hand() + left_glove.connect() + left_hand.connect() + display_len = max(len(key) for key in left_hand.action_features) + + try: + while True: + start = time.perf_counter() + + left_glove_action = left_glove.get_action() + left_hand_action = homonculus_glove_to_hope_jr_hand(left_glove_action) + left_hand.send_action(left_hand_action) + + time.sleep(0.01) + loop_s = time.perf_counter() - start + + print(f"\n{'NAME':<{display_len}} | {'HAND':>7}") + for joint, val in left_hand_action.items(): + print(f"{joint:<{display_len}} | {val:>7.2f}") + + print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)") + move_cursor_up(len(left_hand_action) + 4) + + except KeyboardInterrupt: + pass + except Exception: + traceback.print_exc() + finally: + left_glove.disconnect() + left_hand.disconnect() + + +if __name__ == "__main__": + main() diff --git a/examples/hope_jr/teleop/right_arm_teleop.py b/examples/hope_jr/teleop/right_arm_teleop.py new file mode 100644 index 000000000..143e241ee --- /dev/null +++ b/examples/hope_jr/teleop/right_arm_teleop.py @@ -0,0 +1,53 @@ +import time +import traceback + +from lerobot.common.robots.hope_jr import HopeJrArm, HopeJrArmConfig +from lerobot.common.teleoperators.homonculus import HomonculusArm, HomonculusArmConfig +from lerobot.common.utils.utils import move_cursor_up + + +def make_right_arm() -> tuple[HomonculusArm, HopeJrArm]: + right_exo_arm_cfg = HomonculusArmConfig("/dev/tty.usbmodem2101", id="right") + right_exo_arm = HomonculusArm(right_exo_arm_cfg) + right_arm_cfg = HopeJrArmConfig("/dev/tty.usbserial-140", id="right") + right_arm = HopeJrArm(right_arm_cfg) + return right_exo_arm, right_arm + + +def main(): + right_exo_arm, right_arm = make_right_arm() + display_len = max(len(key) for key in right_exo_arm.action_features) + right_exo_arm.connect() + right_arm.connect() + + try: + while True: + start = time.perf_counter() + + right_arm_action = right_exo_arm.get_action() + right_arm_action["shoulder_pitch.pos"] = -right_arm_action["shoulder_pitch.pos"] + right_arm_action["wrist_yaw.pos"] = -right_arm_action["wrist_yaw.pos"] + right_arm.send_action(right_arm_action) + + time.sleep(0.01) + loop_s = time.perf_counter() - start + + print("\n" + "-" * (display_len + 10)) + print(f"{'NAME':<{display_len}} | {'ARM':>7}") + for joint, val in right_arm_action.items(): + print(f"{joint:<{display_len}} | {val:>7.2f}") + + print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)") + move_cursor_up(len(right_arm_action) + 5) + + except KeyboardInterrupt: + pass + except Exception: + traceback.print_exc() + finally: + right_exo_arm.disconnect() + right_arm.disconnect() + + +if __name__ == "__main__": + main() diff --git a/examples/hope_jr/teleop/right_hand_teleop.py b/examples/hope_jr/teleop/right_hand_teleop.py new file mode 100644 index 000000000..204f26f01 --- /dev/null +++ b/examples/hope_jr/teleop/right_hand_teleop.py @@ -0,0 +1,51 @@ +import time +import traceback + +from lerobot.common.robots.hope_jr import HopeJrHand, HopeJrHandConfig, homonculus_glove_to_hope_jr_hand +from lerobot.common.teleoperators.homonculus import HomonculusGlove, HomonculusGloveConfig +from lerobot.common.utils.utils import move_cursor_up + + +def make_right_hand() -> tuple[HomonculusGlove, HopeJrHand]: + right_glove_cfg = HomonculusGloveConfig("/dev/tty.usbmodem11301", id="right", side="right") + right_glove = HomonculusGlove(right_glove_cfg) + right_hand_cfg = HopeJrHandConfig("/dev/tty.usbserial-140", id="right", side="right") + right_hand = HopeJrHand(right_hand_cfg) + return right_glove, right_hand + + +def main(): + right_glove, right_hand = make_right_hand() + right_glove.connect() + right_hand.connect() + display_len = max(len(key) for key in right_hand.action_features) + + try: + while True: + start = time.perf_counter() + + right_glove_action = right_glove.get_action() + right_hand_action = homonculus_glove_to_hope_jr_hand(right_glove_action) + right_hand.send_action(right_hand_action) + + time.sleep(0.01) + loop_s = time.perf_counter() - start + + print(f"\n{'NAME':<{display_len}} | {'HAND':>7}") + for joint, val in right_hand_action.items(): + print(f"{joint:<{display_len}} | {val:>7.2f}") + + print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)") + move_cursor_up(len(right_hand_action) + 4) + + except KeyboardInterrupt: + pass + except Exception: + traceback.print_exc() + finally: + right_glove.disconnect() + right_hand.disconnect() + + +if __name__ == "__main__": + main() diff --git a/examples/hope_jr/vis/arm.py b/examples/hope_jr/vis/arm.py new file mode 100644 index 000000000..f517e3203 --- /dev/null +++ b/examples/hope_jr/vis/arm.py @@ -0,0 +1,36 @@ +import time +import traceback + +from lerobot.common.robots.hope_jr import HopeJrArm, HopeJrArmConfig +from lerobot.common.utils.utils import move_cursor_up + +# cfg = HopeJrArmConfig("/dev/tty.usbserial-140", id="right") +cfg = HopeJrArmConfig("/dev/tty.usbserial-1140", id="left", side="left") +arm = HopeJrArm(cfg) +display_len = max(len(key) for key in arm.action_features) + +arm.connect() +# arm.calibrate() +arm.bus.disable_torque() +try: + while True: + start = time.perf_counter() + raw_obs = arm.bus.sync_read("Present_Position", normalize=False) + norm_obs = {arm.bus.motors[name].id: val for name, val in raw_obs.items()} + norm_obs = arm.bus._normalize(norm_obs) + norm_obs = {arm.bus._id_to_name(id_): val for id_, val in norm_obs.items()} + loop_s = time.perf_counter() - start + + print("\n----------------------------------") + print(f"{'NAME':<15} | {'RAW':>7} | {'NORM':>7}") + for motor in arm.bus.motors: + print(f"{motor:<15} | {raw_obs[motor]:>7} | {norm_obs[motor]:>7.2f}") + print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)") + move_cursor_up(len(arm.bus.motors) + 5) + +except KeyboardInterrupt: + pass +except Exception as e: + traceback.print_exc(e) +finally: + arm.disconnect() diff --git a/examples/hope_jr/vis/exo.py b/examples/hope_jr/vis/exo.py new file mode 100644 index 000000000..d362266dd --- /dev/null +++ b/examples/hope_jr/vis/exo.py @@ -0,0 +1,33 @@ +import time +import traceback + +from lerobot.common.teleoperators.homonculus import HomonculusArm, HomonculusArmConfig +from lerobot.common.utils.utils import move_cursor_up + +# cfg = HomonculusArmConfig("/dev/tty.usbmodem2101", id="left") +cfg = HomonculusArmConfig("/dev/tty.usbmodem11401", id="left") +arm = HomonculusArm(cfg) +display_len = max(len(key) for key in arm.action_features) + +arm.connect() +arm.calibrate() +try: + while True: + start = time.perf_counter() + raw_action = arm._read(normalize=False) + norm_action = arm._normalize(raw_action) + loop_s = time.perf_counter() - start + + print("\n" + "-" * (display_len + 10)) + print(f"{'NAME':<{display_len}} | {'RAW':>7} | {'NORM':>7}") + for joint in arm.joints: + print(f"{joint:<{display_len}} | {raw_action[joint]:>7} | {norm_action[joint]:>7.2f}") + print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)") + move_cursor_up(len(norm_action) + 5) + +except KeyboardInterrupt: + pass +except Exception: + traceback.print_exc() +finally: + arm.disconnect() diff --git a/examples/hope_jr/vis/glove.py b/examples/hope_jr/vis/glove.py new file mode 100644 index 000000000..be497aec6 --- /dev/null +++ b/examples/hope_jr/vis/glove.py @@ -0,0 +1,32 @@ +import time +import traceback + +from lerobot.common.teleoperators.homonculus import HomonculusGlove, HomonculusGloveConfig +from lerobot.common.utils.utils import move_cursor_up + +config = HomonculusGloveConfig("/dev/tty.usbmodem11401", side="left", id="left") +glove = HomonculusGlove(config) +glove.connect() + +display_len = max(len(key) for key in glove.action_features) +glove.calibrate() +try: + while True: + start = time.perf_counter() + raw_action = glove._read(normalize=False) + norm_action = glove._normalize(raw_action) + loop_s = time.perf_counter() - start + + print("\n" + "-" * (display_len + 10)) + print(f"{'NAME':<{display_len}} | {'RAW':>7} | {'NORM':>7}") + for joint in glove.joints: + print(f"{joint:<{display_len}} | {raw_action[joint]:>7} | {norm_action[joint]:>7.2f}") + print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)") + move_cursor_up(len(norm_action) + 5) + +except KeyboardInterrupt: + pass +except Exception: + traceback.print_exc() +finally: + glove.disconnect() diff --git a/examples/hope_jr/vis/hand.py b/examples/hope_jr/vis/hand.py new file mode 100644 index 000000000..5d941fb66 --- /dev/null +++ b/examples/hope_jr/vis/hand.py @@ -0,0 +1,8 @@ +from lerobot.common.robots.hope_jr import HopeJrHand, HopeJrHandConfig + +cfg = HopeJrHandConfig("/dev/cu.usbmodem58760432281", id="left", side="left") +hand = HopeJrHand(cfg) + +hand.connect() +hand.calibrate() +hand.disconnect() diff --git a/lerobot/common/robots/hope_jr/__init__.py b/lerobot/common/robots/hope_jr/__init__.py new file mode 100644 index 000000000..101cd6331 --- /dev/null +++ b/lerobot/common/robots/hope_jr/__init__.py @@ -0,0 +1,4 @@ +from .config_hope_jr import HopeJrArmConfig, HopeJrHandConfig +from .hope_jr_arm import HopeJrArm +from .hope_jr_hand import HopeJrHand +from .joints_translation import homonculus_glove_to_hope_jr_hand diff --git a/lerobot/common/robots/hope_jr/config_hope_jr.py b/lerobot/common/robots/hope_jr/config_hope_jr.py new file mode 100644 index 000000000..413a77598 --- /dev/null +++ b/lerobot/common/robots/hope_jr/config_hope_jr.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass, field + +from lerobot.common.cameras import CameraConfig + +from ..config import RobotConfig + + +@RobotConfig.register_subclass("hope_jr_hand") +@dataclass +class HopeJrHandConfig(RobotConfig): + port: str # Port to connect to the hand + side: str # "left" / "right" + + disable_torque_on_disconnect: bool = True + + cameras: dict[str, CameraConfig] = field(default_factory=dict) + + def __post_init__(self): + super().__post_init__() + if self.side not in ["right", "left"]: + raise ValueError(self.side) + + +@RobotConfig.register_subclass("hope_jr_arm") +@dataclass +class HopeJrArmConfig(RobotConfig): + port: str # Port to connect to the hand + side: str # "left" / "right" + disable_torque_on_disconnect: bool = True + + # `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes. + # Set this to a positive scalar to have the same value for all motors, or a list that is the same length as + # the number of motors in your follower arms. + max_relative_target: int | None = None + + cameras: dict[str, CameraConfig] = field(default_factory=dict) diff --git a/lerobot/common/robots/hope_jr/hope_jr_arm.py b/lerobot/common/robots/hope_jr/hope_jr_arm.py new file mode 100644 index 000000000..66e7f2821 --- /dev/null +++ b/lerobot/common/robots/hope_jr/hope_jr_arm.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from functools import cached_property +from typing import Any + +from lerobot.common.cameras.utils import make_cameras_from_configs +from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode +from lerobot.common.motors.feetech import ( + FeetechMotorsBus, + OperatingMode, +) + +from ..robot import Robot +from ..utils import ensure_safe_goal_position +from .config_hope_jr import HopeJrArmConfig + +logger = logging.getLogger(__name__) + + +class HopeJrArm(Robot): + config_class = HopeJrArmConfig + name = "hope_jr_arm" + + def __init__(self, config: HopeJrArmConfig): + super().__init__(config) + self.config = config + self.bus = FeetechMotorsBus( + port=self.config.port, + motors={ + "shoulder_pitch": Motor(2, "sm8512bl", MotorNormMode.RANGE_M100_100), + "shoulder_yaw": Motor(3, "sts3250", MotorNormMode.RANGE_M100_100), + "shoulder_roll": Motor(4, "sts3250", MotorNormMode.RANGE_M100_100), + "elbow_flex": Motor(5, "sts3250", MotorNormMode.RANGE_M100_100), + "wrist_roll": Motor(6, "sts3250", MotorNormMode.RANGE_M100_100), + "wrist_yaw": Motor(7, "sts3250", MotorNormMode.RANGE_M100_100), + "wrist_pitch": Motor(8, "sts3250", MotorNormMode.RANGE_M100_100), + }, + # motors={ + # "shoulder_pitch": Motor(1, "sm8512bl", MotorNormMode.RANGE_M100_100), + # "shoulder_yaw": Motor(2, "sts3250", MotorNormMode.RANGE_M100_100), + # "shoulder_roll": Motor(3, "sts3250", MotorNormMode.RANGE_M100_100), + # "elbow_flex": Motor(4, "sts3250", MotorNormMode.RANGE_M100_100), + # "wrist_roll": Motor(5, "sts3250", MotorNormMode.RANGE_M100_100), + # "wrist_yaw": Motor(6, "sts3250", MotorNormMode.RANGE_M100_100), + # "wrist_pitch": Motor(7, "sts3250", MotorNormMode.RANGE_M100_100), + # }, + calibration=self.calibration, + ) + self.cameras = make_cameras_from_configs(config.cameras) + + # HACK + self.shoulder_pitch = "shoulder_pitch" + self.other_motors = [m for m in self.bus.motors if m != "shoulder_pitch"] + + @property + def _motors_ft(self) -> dict[str, type]: + return {f"{motor}.pos": float for motor in self.bus.motors} + + @property + def _cameras_ft(self) -> dict[str, tuple]: + return { + cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras + } + + @cached_property + def observation_features(self) -> dict[str, type | tuple]: + return {**self._motors_ft, **self._cameras_ft} + + @cached_property + def action_features(self) -> dict[str, type]: + return self._motors_ft + + @property + def is_connected(self) -> bool: + # TODO(aliberts): add cam.is_connected for cam in self.cameras + return self.bus.is_connected + + def connect(self) -> None: + """ + We assume that at connection time, arm is in a rest position, + and torque can be safely disabled to run calibration. + """ + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + self.bus.connect(handshake=False) + if not self.is_calibrated: + self.calibrate() + + # Connect the cameras + for cam in self.cameras.values(): + cam.connect() + + self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.bus.is_calibrated + + def calibrate(self) -> None: + logger.info(f"\nRunning calibration of {self}") + self.bus.disable_torque() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) + + unknown_range_motors = self.other_motors + + input("Move arm to the middle of its range of motion and press ENTER....") + homing_offsets_full_turn = self.bus.set_half_turn_homings("shoulder_pitch") + homing_offsets_unknown_range = self.bus.set_half_turn_homings(unknown_range_motors) + homing_offsets = {**homing_offsets_full_turn, **homing_offsets_unknown_range} + + logger.info( + "Move all joints except 'shoulder_pitch' sequentially through their " + "entire ranges of motion.\nRecording positions. Press ENTER to stop..." + ) + range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors) + range_mins["shoulder_pitch"] = 1024 + range_maxes["shoulder_pitch"] = 3071 + + range_mins["wrist_roll"] = 1500 + range_maxes["wrist_roll"] = 2500 + + self.calibration = {} + for motor, m in self.bus.motors.items(): + self.calibration[motor] = MotorCalibration( + id=m.id, + drive_mode=0, + homing_offset=homing_offsets[motor], + range_min=range_mins[motor], + range_max=range_maxes[motor], + ) + + self.bus.write_calibration(self.calibration) + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) + + def configure(self) -> None: + with self.bus.torque_disabled(): + for motor in self.bus.motors: + self.bus.write("Return_Delay_Time", motor, 0) + self.bus.write("Maximum_Acceleration", motor, 50) + self.bus.write("Acceleration", motor, 50) + + def setup_motors(self) -> None: + for motor in reversed(self.bus.motors): + input(f"Connect the controller board to the '{motor}' motor only and press enter.") + self.bus.setup_motor(motor) + print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") + + def get_observation(self) -> dict[str, Any]: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + obs_dict = {} + + # Read arm position + start = time.perf_counter() + obs_dict[self.shoulder_pitch] = self.bus.read("Present_Position", self.shoulder_pitch) + obs_dict.update(self.bus.sync_read("Present_Position", self.other_motors)) + obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()} + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read state: {dt_ms:.1f}ms") + + # Capture images from cameras + for cam_key, cam in self.cameras.items(): + start = time.perf_counter() + obs_dict[cam_key] = cam.async_read() + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") + + return obs_dict + + def send_action(self, action: dict[str, Any]) -> dict[str, Any]: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")} + + # Cap goal position when too far away from present position. + # /!\ Slower fps expected due to reading from the follower. + if self.config.max_relative_target is not None: + present_pos = self.bus.sync_read("Present_Position") + goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()} + goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) + + self.bus.sync_write("Goal_Position", goal_pos) + return {f"{motor}.pos": val for motor, val in goal_pos.items()} + + def disconnect(self): + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + self.bus.disconnect(self.config.disable_torque_on_disconnect) + for cam in self.cameras.values(): + cam.disconnect() + + logger.info(f"{self} disconnected.") diff --git a/lerobot/common/robots/hope_jr/hope_jr_hand.py b/lerobot/common/robots/hope_jr/hope_jr_hand.py new file mode 100644 index 000000000..534c3caf9 --- /dev/null +++ b/lerobot/common/robots/hope_jr/hope_jr_hand.py @@ -0,0 +1,204 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from functools import cached_property +from typing import Any + +from lerobot.common.cameras.utils import make_cameras_from_configs +from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.common.motors import Motor, MotorNormMode +from lerobot.common.motors.calibration_gui import RangeFinderGUI +from lerobot.common.motors.feetech import ( + FeetechMotorsBus, +) + +from ..robot import Robot +from .config_hope_jr import HopeJrHandConfig + +logger = logging.getLogger(__name__) + +RIGHT_HAND_INVERSIONS = [ + "thumb_mcp", + "thumb_dip", + "index_ulnar_flexor", + "middle_ulnar_flexor", + "ring_ulnar_flexor", + "ring_pip_dip", + "pinky_ulnar_flexor", + "pinky_pip_dip", +] + +LEFT_HAND_INVERSIONS = [ + "thumb_cmc", + "thumb_mcp", + "thumb_dip", + "index_radial_flexor", + "index_pip_dip", + "middle_radial_flexor", + "middle_pip_dip", + "ring_radial_flexor", + "ring_pip_dip", + "pinky_radial_flexor", + # "pinky_pip_dip", +] + + +class HopeJrHand(Robot): + config_class = HopeJrHandConfig + name = "hope_jr_hand" + + def __init__(self, config: HopeJrHandConfig): + super().__init__(config) + self.config = config + self.bus = FeetechMotorsBus( + port=self.config.port, + motors={ + # Thumb + "thumb_cmc": Motor(1, "scs0009", MotorNormMode.RANGE_0_100), + "thumb_mcp": Motor(2, "scs0009", MotorNormMode.RANGE_0_100), + "thumb_pip": Motor(3, "scs0009", MotorNormMode.RANGE_0_100), + "thumb_dip": Motor(4, "scs0009", MotorNormMode.RANGE_0_100), + # Index + "index_radial_flexor": Motor(5, "scs0009", MotorNormMode.RANGE_0_100), + "index_ulnar_flexor": Motor(6, "scs0009", MotorNormMode.RANGE_0_100), + "index_pip_dip": Motor(7, "scs0009", MotorNormMode.RANGE_0_100), + # Middle + "middle_radial_flexor": Motor(8, "scs0009", MotorNormMode.RANGE_0_100), + "middle_ulnar_flexor": Motor(9, "scs0009", MotorNormMode.RANGE_0_100), + "middle_pip_dip": Motor(10, "scs0009", MotorNormMode.RANGE_0_100), + # Ring + "ring_radial_flexor": Motor(11, "scs0009", MotorNormMode.RANGE_0_100), + "ring_ulnar_flexor": Motor(12, "scs0009", MotorNormMode.RANGE_0_100), + "ring_pip_dip": Motor(13, "scs0009", MotorNormMode.RANGE_0_100), + # Pinky + "pinky_radial_flexor": Motor(14, "scs0009", MotorNormMode.RANGE_0_100), + "pinky_ulnar_flexor": Motor(15, "scs0009", MotorNormMode.RANGE_0_100), + "pinky_pip_dip": Motor(16, "scs0009", MotorNormMode.RANGE_0_100), + }, + calibration=self.calibration, + protocol_version=1, + ) + self.cameras = make_cameras_from_configs(config.cameras) + self.inverted_motors = RIGHT_HAND_INVERSIONS if config.side == "right" else LEFT_HAND_INVERSIONS + + @property + def _motors_ft(self) -> dict[str, type]: + return {f"{motor}.pos": float for motor in self.bus.motors} + + @property + def _cameras_ft(self) -> dict[str, tuple]: + return { + cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras + } + + @cached_property + def observation_features(self) -> dict[str, type | tuple]: + return {**self._motors_ft, **self._cameras_ft} + + @cached_property + def action_features(self) -> dict[str, type]: + return self._motors_ft + + @property + def is_connected(self) -> bool: + # TODO(aliberts): add cam.is_connected for cam in self.cameras + return self.bus.is_connected + + def connect(self) -> None: + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + self.bus.connect() + if not self.is_calibrated: + self.calibrate() + + # Connect the cameras + for cam in self.cameras.values(): + cam.connect() + + self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.bus.is_calibrated + + def calibrate(self) -> None: + fingers = {} + for finger in ["thumb", "index", "middle", "ring", "pinky"]: + fingers[finger] = [motor for motor in self.bus.motors if motor.startswith(finger)] + + self.calibration = RangeFinderGUI(self.bus, fingers).run() + for motor in self.inverted_motors: + self.calibration[motor].drive_mode = 1 + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) + + def configure(self) -> None: + with self.bus.torque_disabled(): + self.bus.configure_motors() + # for motor in self.bus.motors: + # self.bus.write("Return_Delay_Time", motor, 0) + # self.bus.write("Acceleration_2", motor, 50) + # self.bus.write("Acceleration", motor, 50) + + def setup_motors(self) -> None: + for motor in self.bus.motors: + input(f"Connect the controller board to the '{motor}' motor only and press enter.") + self.bus.setup_motor(motor) + print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") + + def get_observation(self) -> dict[str, Any]: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + obs_dict = {} + + # Read hand position + start = time.perf_counter() + for motor in self.bus.motors: + obs_dict[f"{motor}.pos"] = self.bus.read("Present_Position", motor, normalize=False) + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read state: {dt_ms:.1f}ms") + + # Capture images from cameras + for cam_key, cam in self.cameras.items(): + start = time.perf_counter() + obs_dict[cam_key] = cam.async_read() + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") + + return obs_dict + + def send_action(self, action: dict[str, Any]) -> dict[str, Any]: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")} + self.bus.sync_write("Goal_Position", goal_pos) + return action + + def disconnect(self): + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + self.bus.disconnect(self.config.disable_torque_on_disconnect) + for cam in self.cameras.values(): + cam.disconnect() + + logger.info(f"{self} disconnected.") diff --git a/lerobot/common/robots/hope_jr/joints_translation.py b/lerobot/common/robots/hope_jr/joints_translation.py new file mode 100644 index 000000000..68c00bffc --- /dev/null +++ b/lerobot/common/robots/hope_jr/joints_translation.py @@ -0,0 +1,53 @@ +INDEX_SPLAY = 0.3 +MIDDLE_SPLAY = 0.3 +RING_SPLAY = 0.3 +PINKY_SPLAY = 0.5 + + +def get_ulnar_flexion(flexion: float, abduction: float, splay: float): + # ulnar_component = max(-abduction, 0) + ulnar_component = -abduction + return ulnar_component * splay + flexion * (1 - splay) + + +def get_radial_flexion(flexion: float, abduction: float, splay: float): + radial_component = abduction + # radial_component = max(abduction, 0) + return radial_component * splay + flexion * (1 - splay) + + +def homonculus_glove_to_hope_jr_hand(glove_action: dict[str, float]) -> dict[str, float]: + return { + "thumb_cmc.pos": glove_action["thumb_cmc.pos"], + "thumb_mcp.pos": glove_action["thumb_mcp.pos"], + "thumb_pip.pos": glove_action["thumb_pip.pos"], + "thumb_dip.pos": glove_action["thumb_dip.pos"], + "index_radial_flexor.pos": get_radial_flexion( + glove_action["index_mcp_flexion.pos"], glove_action["index_mcp_abduction.pos"], INDEX_SPLAY + ), + "index_ulnar_flexor.pos": get_ulnar_flexion( + glove_action["index_mcp_flexion.pos"], glove_action["index_mcp_abduction.pos"], INDEX_SPLAY + ), + "index_pip_dip.pos": glove_action["index_dip.pos"], + "middle_radial_flexor.pos": get_radial_flexion( + glove_action["middle_mcp_flexion.pos"], glove_action["middle_mcp_abduction.pos"], MIDDLE_SPLAY + ), + "middle_ulnar_flexor.pos": get_ulnar_flexion( + glove_action["middle_mcp_flexion.pos"], glove_action["middle_mcp_abduction.pos"], MIDDLE_SPLAY + ), + "middle_pip_dip.pos": glove_action["middle_dip.pos"], + "ring_radial_flexor.pos": get_radial_flexion( + glove_action["ring_mcp_flexion.pos"], glove_action["ring_mcp_abduction.pos"], RING_SPLAY + ), + "ring_ulnar_flexor.pos": get_ulnar_flexion( + glove_action["ring_mcp_flexion.pos"], glove_action["ring_mcp_abduction.pos"], RING_SPLAY + ), + "ring_pip_dip.pos": glove_action["ring_dip.pos"], + "pinky_radial_flexor.pos": get_radial_flexion( + glove_action["pinky_mcp_flexion.pos"], glove_action["pinky_mcp_abduction.pos"], PINKY_SPLAY + ), + "pinky_ulnar_flexor.pos": get_ulnar_flexion( + glove_action["pinky_mcp_flexion.pos"], glove_action["pinky_mcp_abduction.pos"], PINKY_SPLAY + ), + "pinky_pip_dip.pos": glove_action["pinky_dip.pos"], + } diff --git a/lerobot/common/teleoperators/homonculus/__init__.py b/lerobot/common/teleoperators/homonculus/__init__.py new file mode 100644 index 000000000..2749104db --- /dev/null +++ b/lerobot/common/teleoperators/homonculus/__init__.py @@ -0,0 +1,3 @@ +from .config_homonculus import HomonculusArmConfig, HomonculusGloveConfig +from .homonculus_arm import HomonculusArm +from .homonculus_glove import HomonculusGlove diff --git a/lerobot/common/teleoperators/homonculus/config_homonculus.py b/lerobot/common/teleoperators/homonculus/config_homonculus.py new file mode 100644 index 000000000..88303d968 --- /dev/null +++ b/lerobot/common/teleoperators/homonculus/config_homonculus.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass + +from ..config import TeleoperatorConfig + + +@TeleoperatorConfig.register_subclass("homonculus_glove") +@dataclass +class HomonculusGloveConfig(TeleoperatorConfig): + port: str # Port to connect to the glove + side: str # "left" / "right" + baud_rate: int = 115_200 + + def __post_init__(self): + if self.side not in ["right", "left"]: + raise ValueError(self.side) + + +@TeleoperatorConfig.register_subclass("homonculus_arm") +@dataclass +class HomonculusArmConfig(TeleoperatorConfig): + port: str # Port to connect to the arm + baud_rate: int = 115_200 diff --git a/lerobot/common/teleoperators/homonculus/homonculus_arm.py b/lerobot/common/teleoperators/homonculus/homonculus_arm.py new file mode 100644 index 000000000..0a75787a6 --- /dev/null +++ b/lerobot/common/teleoperators/homonculus/homonculus_arm.py @@ -0,0 +1,257 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import threading +from pprint import pformat + +import serial + +from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.common.motors.motors_bus import MotorCalibration, MotorNormMode +from lerobot.common.utils.utils import enter_pressed, move_cursor_up + +from ..teleoperator import Teleoperator +from .config_homonculus import HomonculusArmConfig + +logger = logging.getLogger(__name__) + + +class HomonculusArm(Teleoperator): + """ + Homonculus Arm designed by Hugging Face. + """ + + config_class = HomonculusArmConfig + name = "homonculus_arm" + + def __init__(self, config: HomonculusArmConfig): + super().__init__(config) + self.config = config + self.serial = serial.Serial(config.port, config.baud_rate, timeout=1) + + self.joints = { + "shoulder_pitch": MotorNormMode.RANGE_M100_100, + "shoulder_yaw": MotorNormMode.RANGE_M100_100, + "shoulder_roll": MotorNormMode.RANGE_M100_100, + "elbow_flex": MotorNormMode.RANGE_M100_100, + "wrist_roll": MotorNormMode.RANGE_M100_100, + "wrist_yaw": MotorNormMode.RANGE_M100_100, + "wrist_pitch": MotorNormMode.RANGE_M100_100, + } + + self._state: dict[str, float] | None = None + self.new_state_event = threading.Event() + self.thread = threading.Thread(target=self._read_loop, daemon=True, name=f"{self} _read_loop") + self.lock = threading.Lock() + + @property + def action_features(self) -> dict: + return {f"{joint}.pos": float for joint in self.joints} + + @property + def feedback_features(self) -> dict: + return {} + + @property + def is_connected(self) -> bool: + return self.thread.is_alive() and self.serial.is_open + + def connect(self) -> None: + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + if not self.serial.is_open: + self.serial.open() + self.thread.start() + + # wait for the thread to ramp up & 1st state to be ready + if not self.new_state_event.wait(timeout=2): + raise TimeoutError(f"{self}: Timed out waiting for state after 2s.") + + if not self.is_calibrated: + self.calibrate() + + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.calibration_fpath.is_file() + + def calibrate(self) -> None: + print( + "\nMove all joints through their entire range of motion." + "\nRecording positions. Press ENTER to stop..." + ) + range_mins, range_maxes = self._record_ranges_of_motion() + + self.calibration = {} + for id_, joint in enumerate(self.joints): + self.calibration[joint] = MotorCalibration( + id=id_, + drive_mode=0, + homing_offset=0, + range_min=range_mins[joint], + range_max=range_maxes[joint], + ) + + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) + + def _record_ranges_of_motion( + self, joints: list[str] | None = None, display_values: bool = True + ) -> tuple[dict[str, int], dict[str, int]]: + """Interactively record the min/max encoder values of each joint. + + Move the joints while the method streams live positions. Press :kbd:`Enter` to finish. + + Args: + joints (list[str] | None, optional): Joints to record. Defaults to every joint (`None`). + display_values (bool, optional): When `True` (default) a live table is printed to the console. + + Raises: + TypeError: `joints` is not `None` or a list. + ValueError: any joint's recorded min and max are the same. + + Returns: + tuple[dict[str, int], dict[str, int]]: Two dictionaries *mins* and *maxes* with the extreme values + observed for each joint. + """ + if joints is None: + joints = list(self.joints) + elif not isinstance(joints, list): + raise TypeError(joints) + + start_positions = self._read(joints, normalize=False) + mins = start_positions.copy() + maxes = start_positions.copy() + while True: + positions = self._read(joints, normalize=False) + mins = {joint: min(positions[joint], min_) for joint, min_ in mins.items()} + maxes = {joint: max(positions[joint], max_) for joint, max_ in maxes.items()} + + if display_values: + print("\n-------------------------------------------") + print(f"{'NAME':<15} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}") + for joint in joints: + print(f"{joint:<15} | {mins[joint]:>6} | {positions[joint]:>6} | {maxes[joint]:>6}") + + if enter_pressed(): + break + + if display_values: + # Move cursor up to overwrite the previous output + move_cursor_up(len(joints) + 3) + + same_min_max = [joint for joint in joints if mins[joint] == maxes[joint]] + if same_min_max: + raise ValueError(f"Some joints have the same min and max values:\n{pformat(same_min_max)}") + + return mins, maxes + + def configure(self) -> None: + pass + + def _normalize(self, values: dict[str, int], joints: list[str] | None = None): + if not self.calibration: + raise RuntimeError(f"{self} has no calibration registered.") + + normalized_values = {} + for joint, val in values.items(): + min_ = self.calibration[joint].range_min + max_ = self.calibration[joint].range_max + drive_mode = self.calibration[joint].drive_mode + bounded_val = min(max_, max(min_, val)) + + if self.joints[joint] is MotorNormMode.RANGE_M100_100: + norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100 + normalized_values[joint] = -norm if drive_mode else norm + elif self.joints[joint] is MotorNormMode.RANGE_0_100: + norm = ((bounded_val - min_) / (max_ - min_)) * 100 + normalized_values[joint] = 100 - norm if drive_mode else norm + + return normalized_values + + def _read( + self, joints: list[str] | None = None, normalize: bool = True, timeout: float = 1 + ) -> dict[str, int | float]: + """ + Return the most recent (single) values from self.last_d, + optionally applying calibration. + """ + if not self.new_state_event.wait(timeout=timeout): + raise TimeoutError(f"{self}: Timed out waiting for state after {timeout}s.") + + with self.lock: + state = self._state + + self.new_state_event.clear() + + if state is None: + raise RuntimeError(f"{self} Internal error: Event set but no state available.") + + if joints is not None: + state = {k: v for k, v in state.items() if k in joints} + + if normalize: + state = self._normalize(state, joints) + + return state + + def _read_loop(self): + """ + Continuously read from the serial buffer in its own thread and sends values to the main thread through + a queue. + """ + while True: + try: + if self.serial.in_waiting > 0: + self.serial.flush() + raw_values = self.serial.readline().decode("utf-8").strip().split(" ") + if len(raw_values) != 21: # 16 raw + 5 angle values + continue + + joint_angles = { + "shoulder_pitch": int(raw_values[19]), + "shoulder_yaw": int(raw_values[18]), + "shoulder_roll": int(raw_values[20]), + "elbow_flex": int(raw_values[17]), + "wrist_roll": int(raw_values[16]), + "wrist_yaw": int(raw_values[1]), + "wrist_pitch": int(raw_values[0]), + } + + with self.lock: + self._state = joint_angles + self.new_state_event.set() + + except Exception as e: + logger.debug(f"Error reading frame in background thread for {self}: {e}") + + def get_action(self) -> dict[str, float]: + joint_positions = self._read() + return {f"{joint}.pos": pos for joint, pos in joint_positions.items()} + + def send_feedback(self, feedback: dict[str, float]) -> None: + raise NotImplementedError + + def disconnect(self) -> None: + if not self.is_connected: + DeviceNotConnectedError(f"{self} is not connected.") + + self.thread.join(timeout=0.5) + self.serial.close() + logger.info(f"{self} disconnected.") diff --git a/lerobot/common/teleoperators/homonculus/homonculus_glove.py b/lerobot/common/teleoperators/homonculus/homonculus_glove.py new file mode 100644 index 000000000..aebe6057c --- /dev/null +++ b/lerobot/common/teleoperators/homonculus/homonculus_glove.py @@ -0,0 +1,293 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import threading +from pprint import pformat + +import serial + +from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.common.motors import MotorCalibration +from lerobot.common.motors.motors_bus import MotorNormMode +from lerobot.common.utils.utils import enter_pressed, move_cursor_up + +from ..teleoperator import Teleoperator +from .config_homonculus import HomonculusGloveConfig + +logger = logging.getLogger(__name__) + +LEFT_HAND_INVERSIONS = [ + "thumb_cmc", + "index_dip", + "middle_mcp_abduction", + "middle_dip", + "pinky_mcp_abduction", + "pinky_dip", +] + +RIGHT_HAND_INVERSIONS = [ + "thumb_mcp", + "thumb_cmc", + "index_mcp_abduction", + "index_dip", + "middle_mcp_abduction", + "middle_dip", + "ring_mcp_abduction", + "ring_mcp_flexion", + "ring_dip", + "pinky_mcp_abduction", +] + + +class HomonculusGlove(Teleoperator): + """ + Homonculus Glove designed by Hugging Face. + """ + + config_class = HomonculusGloveConfig + name = "homonculus_glove" + + def __init__(self, config: HomonculusGloveConfig): + super().__init__(config) + self.config = config + self.serial = serial.Serial(config.port, config.baud_rate, timeout=1) + + self.joints = { + "thumb_cmc": MotorNormMode.RANGE_0_100, + "thumb_mcp": MotorNormMode.RANGE_0_100, + "thumb_pip": MotorNormMode.RANGE_0_100, + "thumb_dip": MotorNormMode.RANGE_0_100, + "index_mcp_abduction": MotorNormMode.RANGE_M100_100, + "index_mcp_flexion": MotorNormMode.RANGE_0_100, + "index_dip": MotorNormMode.RANGE_0_100, + "middle_mcp_abduction": MotorNormMode.RANGE_M100_100, + "middle_mcp_flexion": MotorNormMode.RANGE_0_100, + "middle_dip": MotorNormMode.RANGE_0_100, + "ring_mcp_abduction": MotorNormMode.RANGE_M100_100, + "ring_mcp_flexion": MotorNormMode.RANGE_0_100, + "ring_dip": MotorNormMode.RANGE_0_100, + "pinky_mcp_abduction": MotorNormMode.RANGE_M100_100, + "pinky_mcp_flexion": MotorNormMode.RANGE_0_100, + "pinky_dip": MotorNormMode.RANGE_0_100, + } + self.inverted_joints = RIGHT_HAND_INVERSIONS if config.side == "right" else LEFT_HAND_INVERSIONS + + self._state: dict[str, float] | None = None + self.new_state_event = threading.Event() + self.thread = threading.Thread(target=self._read_loop, daemon=True, name=f"{self} _read_loop") + self.lock = threading.Lock() + + @property + def action_features(self) -> dict: + return {f"{joint}.pos": float for joint in self.joints} + + @property + def feedback_features(self) -> dict: + return {} + + @property + def is_connected(self) -> bool: + return self.thread.is_alive() and self.serial.is_open + + def connect(self) -> None: + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + if not self.serial.is_open: + self.serial.open() + self.thread.start() + + # wait for the thread to ramp up & 1st state to be ready + if not self.new_state_event.wait(timeout=2): + raise TimeoutError(f"{self}: Timed out waiting for state after 2s.") + + if not self.is_calibrated: + self.calibrate() + + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.calibration_fpath.is_file() + + def calibrate(self) -> None: + range_mins, range_maxes = {}, {} + for finger in ["thumb", "index", "middle", "ring", "pinky"]: + print( + f"\nMove {finger} through its entire range of motion." + "\nRecording positions. Press ENTER to stop..." + ) + finger_joints = [joint for joint in self.joints if joint.startswith(finger)] + finger_mins, finger_maxes = self._record_ranges_of_motion(finger_joints) + range_mins.update(finger_mins) + range_maxes.update(finger_maxes) + + self.calibration = {} + for id_, joint in enumerate(self.joints): + self.calibration[joint] = MotorCalibration( + id=id_, + drive_mode=1 if joint in self.inverted_joints else 0, + homing_offset=0, + range_min=range_mins[joint], + range_max=range_maxes[joint], + ) + + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) + + def _record_ranges_of_motion( + self, joints: list[str] | None = None, display_values: bool = True + ) -> tuple[dict[str, int], dict[str, int]]: + """Interactively record the min/max encoder values of each joint. + + Move the fingers while the method streams live positions. Press :kbd:`Enter` to finish. + + Args: + joints (list[str] | None, optional): Joints to record. Defaults to every joint (`None`). + display_values (bool, optional): When `True` (default) a live table is printed to the console. + + Raises: + TypeError: `joints` is not `None` or a list. + ValueError: any joint's recorded min and max are the same. + + Returns: + tuple[dict[str, int], dict[str, int]]: Two dictionaries *mins* and *maxes* with the extreme values + observed for each joint. + """ + if joints is None: + joints = list(self.joints) + elif not isinstance(joints, list): + raise TypeError(joints) + + display_len = max(len(key) for key in joints) + + start_positions = self._read(joints, normalize=False) + mins = start_positions.copy() + maxes = start_positions.copy() + while True: + positions = self._read(joints, normalize=False) + mins = {joint: min(positions[joint], min_) for joint, min_ in mins.items()} + maxes = {joint: max(positions[joint], max_) for joint, max_ in maxes.items()} + + if display_values: + print("\n-------------------------------------------") + print(f"{'NAME':<{display_len}} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}") + for joint in joints: + print( + f"{joint:<{display_len}} | {mins[joint]:>6} | {positions[joint]:>6} | {maxes[joint]:>6}" + ) + + if enter_pressed(): + break + + if display_values: + # Move cursor up to overwrite the previous output + move_cursor_up(len(joints) + 3) + + same_min_max = [joint for joint in joints if mins[joint] == maxes[joint]] + if same_min_max: + raise ValueError(f"Some joints have the same min and max values:\n{pformat(same_min_max)}") + + return mins, maxes + + def configure(self) -> None: + pass + + def _normalize(self, values: dict[str, int], joints: list[str] | None = None): + if not self.calibration: + raise RuntimeError(f"{self} has no calibration registered.") + + normalized_values = {} + for joint, val in values.items(): + min_ = self.calibration[joint].range_min + max_ = self.calibration[joint].range_max + drive_mode = self.calibration[joint].drive_mode + bounded_val = min(max_, max(min_, val)) + + if self.joints[joint] is MotorNormMode.RANGE_M100_100: + norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100 + normalized_values[joint] = -norm if drive_mode else norm + elif self.joints[joint] is MotorNormMode.RANGE_0_100: + norm = ((bounded_val - min_) / (max_ - min_)) * 100 + normalized_values[joint] = 100 - norm if drive_mode else norm + + return normalized_values + + def _read( + self, joints: list[str] | None = None, normalize: bool = True, timeout: float = 1 + ) -> dict[str, int | float]: + """ + Return the most recent (single) values from self.last_d, + optionally applying calibration. + """ + if not self.new_state_event.wait(timeout=timeout): + raise TimeoutError(f"{self}: Timed out waiting for state after {timeout}s.") + + with self.lock: + state = self._state + + self.new_state_event.clear() + + if state is None: + raise RuntimeError(f"{self} Internal error: Event set but no state available.") + + if joints is not None: + state = {k: v for k, v in state.items() if k in joints} + + if normalize: + state = self._normalize(state, joints) + + return state + + def _read_loop(self): + """ + Continuously read from the serial buffer in its own thread and sends values to the main thread through + a queue. + """ + while True: + try: + if self.serial.in_waiting > 0: + self.serial.flush() + positions = self.serial.readline().decode("utf-8").strip().split(" ") + if len(positions) != len(self.joints): + continue + + joint_positions = { + joint: int(pos) for joint, pos in zip(self.joints, positions, strict=True) + } + + with self.lock: + self._state = joint_positions + self.new_state_event.set() + + except Exception as e: + logger.debug(f"Error reading frame in background thread for {self}: {e}") + + def get_action(self) -> dict[str, float]: + joint_positions = self._read() + return {f"{joint}.pos": pos for joint, pos in joint_positions.items()} + + def send_feedback(self, feedback: dict[str, float]) -> None: + raise NotImplementedError + + def disconnect(self) -> None: + if not self.is_connected: + DeviceNotConnectedError(f"{self} is not connected.") + + self.thread.join(timeout=0.5) + self.serial.close() + logger.info(f"{self} disconnected.")