#!/usr/bin/env python # Copyright 2025 The HuggingFace Inc. team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging from functools import cached_property from lerobot.processor import RobotAction, RobotObservation from lerobot.robots.so_follower import SOFollower, SOFollowerRobotConfig from ..robot import Robot from .config_bi_so_follower import BiSOFollowerConfig logger = logging.getLogger(__name__) class BiSOFollower(Robot): """ [Bimanual SO Follower Arms](https://github.com/TheRobotStudio/SO-ARM100) designed by TheRobotStudio """ config_class = BiSOFollowerConfig name = "bi_so_follower" def __init__(self, config: BiSOFollowerConfig): super().__init__(config) self.config = config left_arm_config = SOFollowerRobotConfig( id=f"{config.id}_left" if config.id else None, calibration_dir=config.calibration_dir, port=config.left_arm_config.port, disable_torque_on_disconnect=config.left_arm_config.disable_torque_on_disconnect, max_relative_target=config.left_arm_config.max_relative_target, use_degrees=config.left_arm_config.use_degrees, cameras=config.left_arm_config.cameras, ) right_arm_config = SOFollowerRobotConfig( id=f"{config.id}_right" if config.id else None, calibration_dir=config.calibration_dir, port=config.right_arm_config.port, disable_torque_on_disconnect=config.right_arm_config.disable_torque_on_disconnect, max_relative_target=config.right_arm_config.max_relative_target, use_degrees=config.right_arm_config.use_degrees, cameras=config.right_arm_config.cameras, ) self.left_arm = SOFollower(left_arm_config) self.right_arm = SOFollower(right_arm_config) # Only for compatibility with other parts of the codebase that expect a `robot.cameras` attribute self.cameras = {**self.left_arm.cameras, **self.right_arm.cameras} @property def _motors_ft(self) -> dict[str, type]: left_arm_motors_ft = self.left_arm._motors_ft right_arm_motors_ft = self.right_arm._motors_ft return { **{f"left_{k}": v for k, v in left_arm_motors_ft.items()}, **{f"right_{k}": v for k, v in right_arm_motors_ft.items()}, } @property def _cameras_ft(self) -> dict[str, tuple]: left_arm_cameras_ft = self.left_arm._cameras_ft right_arm_cameras_ft = self.right_arm._cameras_ft return { **{f"left_{k}": v for k, v in left_arm_cameras_ft.items()}, **{f"right_{k}": v for k, v in right_arm_cameras_ft.items()}, } @cached_property def observation_features(self) -> dict[str, type | tuple]: return {**self._motors_ft, **self._cameras_ft} @cached_property def action_features(self) -> dict[str, type]: return self._motors_ft @property def is_connected(self) -> bool: return self.left_arm.is_connected and self.right_arm.is_connected def connect(self, calibrate: bool = True) -> None: self.left_arm.connect(calibrate) self.right_arm.connect(calibrate) @property def is_calibrated(self) -> bool: return self.left_arm.is_calibrated and self.right_arm.is_calibrated def calibrate(self) -> None: self.left_arm.calibrate() self.right_arm.calibrate() def configure(self) -> None: self.left_arm.configure() self.right_arm.configure() def setup_motors(self) -> None: self.left_arm.setup_motors() self.right_arm.setup_motors() def get_observation(self) -> RobotObservation: obs_dict = {} # Add "left_" prefix left_obs = self.left_arm.get_observation() obs_dict.update({f"left_{key}": value for key, value in left_obs.items()}) # Add "right_" prefix right_obs = self.right_arm.get_observation() obs_dict.update({f"right_{key}": value for key, value in right_obs.items()}) return obs_dict def send_action(self, action: RobotAction) -> RobotAction: # Remove "left_" prefix left_action = { key.removeprefix("left_"): value for key, value in action.items() if key.startswith("left_") } # Remove "right_" prefix right_action = { key.removeprefix("right_"): value for key, value in action.items() if key.startswith("right_") } sent_action_left = self.left_arm.send_action(left_action) sent_action_right = self.right_arm.send_action(right_action) # Add prefixes back prefixed_sent_action_left = {f"left_{key}": value for key, value in sent_action_left.items()} prefixed_sent_action_right = {f"right_{key}": value for key, value in sent_action_right.items()} return {**prefixed_sent_action_left, **prefixed_sent_action_right} def disconnect(self): self.left_arm.disconnect() self.right_arm.disconnect()