diff --git a/examples/so100_to_so100_EE/record.py b/examples/so100_to_so100_EE/record.py new file mode 100644 index 000000000..f46b1c7f0 --- /dev/null +++ b/examples/so100_to_so100_EE/record.py @@ -0,0 +1,191 @@ +# !/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +from typing import Any + +from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig +from lerobot.datasets.lerobot_dataset import LeRobotDataset +from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features +from lerobot.datasets.utils import combine_feature_dicts +from lerobot.model.kinematics import RobotKinematics +from lerobot.processor import RobotAction, RobotProcessorPipeline +from lerobot.processor.converters import ( + observation_to_transition, + robot_action_to_transition, + transition_to_observation, + transition_to_robot_action, +) +from lerobot.record import record_loop +from lerobot.robots.so100_follower.config_so100_follower import SO100FollowerConfig +from lerobot.robots.so100_follower.robot_kinematic_processor import ( + AddRobotObservationAsComplimentaryData, + EEBoundsAndSafety, + ForwardKinematicsJointsToEE, + InverseKinematicsEEToJoints, +) +from lerobot.robots.so100_follower.so100_follower import SO100Follower +from lerobot.teleoperators.so100_leader.config_so100_leader import SO100LeaderConfig +from lerobot.teleoperators.so100_leader.so100_leader import SO100Leader +from lerobot.utils.control_utils import init_keyboard_listener +from lerobot.utils.utils import log_say +from lerobot.utils.visualization_utils import _init_rerun + +NUM_EPISODES = 10 +FPS = 30 +EPISODE_TIME_SEC = 60 +RESET_TIME_SEC = 30 +TASK_DESCRIPTION = "My task description" +HF_REPO_ID = "imstevenpmwork/kin_chadil" + str(time.time()) + +camera_config = {"front": OpenCVCameraConfig(index_or_path=0, width=640, height=480, fps=FPS)} +follower_config = SO100FollowerConfig( + port="/dev/tty.usbmodem5A460814411", id="my_awesome_follower_arm", cameras=camera_config, use_degrees=True +) + +leader_config = SO100LeaderConfig(port="/dev/tty.usbmodem5A460819811", id="my_awesome_leader_arm") + +follower = SO100Follower(follower_config) +leader = SO100Leader(leader_config) + +follower_kinematics_solver = RobotKinematics( + urdf_path="./examples/phone_to_so100/SO101/so101_new_calib.urdf", + target_frame_name="gripper_frame_link", + joint_names=list(follower.bus.motors.keys()), +) + +leader_kinematics_solver = RobotKinematics( + urdf_path="./examples/phone_to_so100/SO101/so101_new_calib.urdf", + target_frame_name="gripper_frame_link", + joint_names=list(leader.bus.motors.keys()), +) + +follower_joints_to_ee = RobotProcessorPipeline[dict[str, Any], dict[str, Any]]( + steps=[ + ForwardKinematicsJointsToEE( + kinematics=follower_kinematics_solver, motor_names=list(follower.bus.motors.keys()) + ), + ], + to_transition=observation_to_transition, + to_output=transition_to_observation, +) + +leader_joints_to_ee = RobotProcessorPipeline[RobotAction, RobotAction]( + steps=[ + ForwardKinematicsJointsToEE( + kinematics=leader_kinematics_solver, motor_names=list(leader.bus.motors.keys()) + ), + ], + to_transition=robot_action_to_transition, + to_output=transition_to_robot_action, +) + + +ee_to_follower_joints = RobotProcessorPipeline[RobotAction, RobotAction]( + [ + AddRobotObservationAsComplimentaryData(robot=follower), + EEBoundsAndSafety( + end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]}, + max_ee_step_m=0.10, + max_ee_twist_step_rad=0.50, + ), + InverseKinematicsEEToJoints( + kinematics=follower_kinematics_solver, + motor_names=list(follower.bus.motors.keys()), + ), + ], + to_transition=robot_action_to_transition, + to_output=transition_to_robot_action, +) + +# Create the dataset +dataset = LeRobotDataset.create( + repo_id=HF_REPO_ID, + fps=FPS, + features=combine_feature_dicts( + aggregate_pipeline_dataset_features( + pipeline=leader_joints_to_ee, + initial_features=create_initial_features(action=leader.action_features), + use_videos=True, + ), + aggregate_pipeline_dataset_features( + pipeline=follower_joints_to_ee, + initial_features=create_initial_features(observation=follower.observation_features), + use_videos=True, + ), + ), + robot_type=follower.name, + use_videos=True, + image_writer_threads=4, +) + +# Initialize the keyboard listener and rerun visualization +_, events = init_keyboard_listener() +_init_rerun(session_name="recording_phone") + +# Connect the robot and teleoperator +leader.connect() +follower.connect() + +episode_idx = 0 +while episode_idx < NUM_EPISODES and not events["stop_recording"]: + log_say(f"Recording episode {episode_idx + 1} of {NUM_EPISODES}") + + record_loop( + robot=follower, + events=events, + fps=FPS, + teleop=leader, + dataset=dataset, + control_time_s=EPISODE_TIME_SEC, + single_task=TASK_DESCRIPTION, + display_data=True, + teleop_action_processor=leader_joints_to_ee, + robot_action_processor=ee_to_follower_joints, + robot_observation_processor=follower_joints_to_ee, + ) + + # Reset the environment if not stopping or re-recording + if not events["stop_recording"] and (episode_idx < NUM_EPISODES - 1 or events["rerecord_episode"]): + log_say("Reset the environment") + record_loop( + robot=follower, + events=events, + fps=FPS, + teleop=leader, + control_time_s=RESET_TIME_SEC, + single_task=TASK_DESCRIPTION, + display_data=True, + teleop_action_processor=leader_joints_to_ee, + robot_action_processor=ee_to_follower_joints, + robot_observation_processor=follower_joints_to_ee, + ) + + if events["rerecord_episode"]: + log_say("Re-recording episode") + events["rerecord_episode"] = False + events["exit_early"] = False + dataset.clear_episode_buffer() + continue + + dataset.save_episode() + episode_idx += 1 + +# Clean up +log_say("Stop recording") +leader.disconnect() +follower.disconnect() +dataset.push_to_hub() diff --git a/examples/so100_to_so100_EE/teleoperate.py b/examples/so100_to_so100_EE/teleoperate.py new file mode 100644 index 000000000..db6da7444 --- /dev/null +++ b/examples/so100_to_so100_EE/teleoperate.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specif + +import time + +from lerobot.model.kinematics import RobotKinematics +from lerobot.processor import RobotAction, RobotProcessorPipeline +from lerobot.processor.converters import ( + robot_action_to_transition, + transition_to_robot_action, +) +from lerobot.robots.so100_follower.config_so100_follower import SO100FollowerConfig +from lerobot.robots.so100_follower.robot_kinematic_processor import ( + AddRobotObservationAsComplimentaryData, + EEBoundsAndSafety, + ForwardKinematicsJointsToEE, + InverseKinematicsEEToJoints, +) +from lerobot.robots.so100_follower.so100_follower import SO100Follower +from lerobot.teleoperators.so100_leader.config_so100_leader import SO100LeaderConfig +from lerobot.teleoperators.so100_leader.so100_leader import SO100Leader + +follower_config = SO100FollowerConfig( + port="/dev/tty.usbmodem5A460814411", id="my_awesome_follower_arm", use_degrees=True +) + +leader_config = SO100LeaderConfig(port="/dev/tty.usbmodem5A460819811", id="my_awesome_leader_arm") + +follower = SO100Follower(follower_config) +leader = SO100Leader(leader_config) + +follower_kinematics_solver = RobotKinematics( + urdf_path="./examples/phone_to_so100/SO101/so101_new_calib.urdf", + target_frame_name="gripper_frame_link", + joint_names=list(follower.bus.motors.keys()), +) + +leader_kinematics_solver = RobotKinematics( + urdf_path="./examples/phone_to_so100/SO101/so101_new_calib.urdf", + target_frame_name="gripper_frame_link", + joint_names=list(leader.bus.motors.keys()), +) + + +leader_to_ee = RobotProcessorPipeline[RobotAction, RobotAction]( + steps=[ + ForwardKinematicsJointsToEE( + kinematics=leader_kinematics_solver, motor_names=list(leader.bus.motors.keys()) + ), + ], + to_transition=robot_action_to_transition, + to_output=transition_to_robot_action, +) + +ee_to_follower_joints = RobotProcessorPipeline[RobotAction, RobotAction]( + [ + AddRobotObservationAsComplimentaryData(robot=follower), + EEBoundsAndSafety( + end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]}, + max_ee_step_m=0.10, + max_ee_twist_step_rad=0.50, + ), + InverseKinematicsEEToJoints( + kinematics=follower_kinematics_solver, + motor_names=list(follower.bus.motors.keys()), + initial_guess_current_joints=False, + ), + ], + to_transition=robot_action_to_transition, + to_output=transition_to_robot_action, +) + +follower.connect() +leader.connect() + +print("Starting teleop loop. Move your phone to teleoperate the robot.") +while True: + # Get leader joints observations + leader_joints_obs = leader.get_action() + + # Convert them to EE + leader_ee_act = leader_to_ee(leader_joints_obs) + + # Convert EE to follower joints actions + follower_joints_act = ee_to_follower_joints(leader_ee_act) + + if follower_joints_act: + follower.send_action(follower_joints_act) + + time.sleep(0.01) diff --git a/src/lerobot/processor/converters.py b/src/lerobot/processor/converters.py index 89256fe94..0a558f8e5 100644 --- a/src/lerobot/processor/converters.py +++ b/src/lerobot/processor/converters.py @@ -262,7 +262,7 @@ def robot_action_to_transition(action: RobotAction) -> EnvTransition: An `EnvTransition` containing the formatted action. """ - return create_transition(observation={}, action=action) + return create_transition(action=action) def observation_to_transition(observation: RobotObservation) -> EnvTransition: @@ -283,7 +283,7 @@ def observation_to_transition(observation: RobotObservation) -> EnvTransition: image_observations = {f"{OBS_IMAGES}.{cam}": img for cam, img in images.items()} - return create_transition(observation={**state, **image_observations}, action={}) + return create_transition(observation={**state, **image_observations}) def transition_to_robot_action(transition: EnvTransition) -> RobotAction: diff --git a/src/lerobot/robots/so100_follower/robot_kinematic_processor.py b/src/lerobot/robots/so100_follower/robot_kinematic_processor.py index 523881151..a302a7b1f 100644 --- a/src/lerobot/robots/so100_follower/robot_kinematic_processor.py +++ b/src/lerobot/robots/so100_follower/robot_kinematic_processor.py @@ -406,9 +406,31 @@ class GripperVelocityToJoint(RobotActionProcessorStep): return features -@ProcessorStepRegistry.register("forward_kinematics_joints_to_ee") +def compute_forward_kinematics_joints_to_ee( + joints: dict[str, Any], kinematics: RobotKinematics, motor_names: list[str] +) -> dict[str, Any]: + motor_joint_values = [joints[f"{n}.pos"] for n in motor_names] + + q = np.array(motor_joint_values, dtype=float) + t = kinematics.forward_kinematics(q) + pos = t[:3, 3] + tw = Rotation.from_matrix(t[:3, :3]).as_rotvec() + gripper_pos = joints["gripper.pos"] + for n in motor_names: + joints.pop(f"{n}.pos") + joints["ee.x"] = float(pos[0]) + joints["ee.y"] = float(pos[1]) + joints["ee.z"] = float(pos[2]) + joints["ee.wx"] = float(tw[0]) + joints["ee.wy"] = float(tw[1]) + joints["ee.wz"] = float(tw[2]) + joints["ee.gripper_pos"] = float(gripper_pos) + return joints + + +@ProcessorStepRegistry.register("forward_kinematics_joints_to_ee_observation") @dataclass -class ForwardKinematicsJointsToEE(ObservationProcessorStep): +class ForwardKinematicsJointsToEEObservation(ObservationProcessorStep): """ Computes the end-effector pose from joint positions using forward kinematics (FK). @@ -423,26 +445,7 @@ class ForwardKinematicsJointsToEE(ObservationProcessorStep): motor_names: list[str] def observation(self, observation: dict[str, Any]) -> dict[str, Any]: - motor_joint_values = [observation[f"{n}.pos"] for n in self.motor_names] - - q = np.array(motor_joint_values, dtype=float) - t = self.kinematics.forward_kinematics(q) - pos = t[:3, 3] - tw = Rotation.from_matrix(t[:3, :3]).as_rotvec() - - gripper_pos = observation["gripper.pos"] - - for n in self.motor_names: - observation.pop(f"{n}.pos") - - observation["ee.x"] = float(pos[0]) - observation["ee.y"] = float(pos[1]) - observation["ee.z"] = float(pos[2]) - observation["ee.wx"] = float(tw[0]) - observation["ee.wy"] = float(tw[1]) - observation["ee.wz"] = float(tw[2]) - observation["ee.gripper_pos"] = float(gripper_pos) - return observation + return compute_forward_kinematics_joints_to_ee(observation, self.kinematics, self.motor_names) def transform_features( self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] @@ -458,6 +461,39 @@ class ForwardKinematicsJointsToEE(ObservationProcessorStep): return features +@ProcessorStepRegistry.register("forward_kinematics_joints_to_ee_action") +@dataclass +class ForwardKinematicsJointsToEEAction(RobotActionProcessorStep): + """ + Computes the end-effector pose from joint positions using forward kinematics (FK). + + This step is typically used to add the robot's Cartesian pose to the observation space, + which can be useful for visualization or as an input to a policy. + + Attributes: + kinematics: The robot's kinematic model. + """ + + kinematics: RobotKinematics + motor_names: list[str] + + def action(self, action: RobotAction) -> RobotAction: + return compute_forward_kinematics_joints_to_ee(action, self.kinematics, self.motor_names) + + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: + # We only use the ee pose in the dataset, so we don't need the joint positions + for n in self.motor_names: + features[PipelineFeatureType.ACTION].pop(f"{n}.pos", None) + # We specify the dataset features of this step that we want to be stored in the dataset + for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]: + features[PipelineFeatureType.ACTION][f"ee.{k}"] = PolicyFeature( + type=FeatureType.STATE, shape=(1,) + ) + return features + + @ProcessorStepRegistry.register("add_robot_observation") @dataclass class AddRobotObservationAsComplimentaryData(ComplementaryDataProcessorStep): @@ -491,3 +527,34 @@ class AddRobotObservationAsComplimentaryData(ComplementaryDataProcessorStep): self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: return features + + +@ProcessorStepRegistry.register(name="forward_kinematics_joints_to_ee") +@dataclass +class ForwardKinematicsJointsToEE(ProcessorStep): + kinematics: RobotKinematics + motor_names: list[str] + + def __post_init__(self): + self.joints_to_ee_action_processor = ForwardKinematicsJointsToEEAction( + kinematics=self.kinematics, motor_names=self.motor_names + ) + self.joints_to_ee_observation_processor = ForwardKinematicsJointsToEEObservation( + kinematics=self.kinematics, motor_names=self.motor_names + ) + + def __call__(self, transition: EnvTransition) -> EnvTransition: + if transition.get(TransitionKey.ACTION) is not None: + transition = self.joints_to_ee_action_processor(transition) + if transition.get(TransitionKey.OBSERVATION) is not None: + transition = self.joints_to_ee_observation_processor(transition) + return transition + + def transform_features( + self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] + ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: + if features[PipelineFeatureType.ACTION] is not None: + features = self.joints_to_ee_action_processor.transform_features(features) + if features[PipelineFeatureType.OBSERVATION] is not None: + features = self.joints_to_ee_observation_processor.transform_features(features) + return features