# !/usr/bin/env python # Copyright 2025 The HuggingFace Inc. team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from lerobot.cameras.opencv import OpenCVCameraConfig from lerobot.common.control_utils import init_keyboard_listener from lerobot.datasets import LeRobotDataset from lerobot.processor import make_default_processors from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig from lerobot.scripts.lerobot_record import record_loop from lerobot.teleoperators.so_leader import SO100Leader, SO100LeaderConfig from lerobot.utils.constants import ACTION, OBS_STR from lerobot.utils.feature_utils import hw_to_dataset_features from lerobot.utils.utils import log_say from lerobot.utils.visualization_utils import init_rerun NUM_EPISODES = 2 FPS = 30 EPISODE_TIME_SEC = 60 RESET_TIME_SEC = 30 TASK_DESCRIPTION = "My task description" HF_REPO_ID = "/" def main(): # Create the robot and teleoperator configurations camera_config = {"front": OpenCVCameraConfig(index_or_path=0, width=640, height=480, fps=FPS)} follower_config = SO100FollowerConfig( port="/dev/tty.usbmodem5A460814411", id="my_awesome_follower_arm", cameras=camera_config, use_degrees=True, ) leader_config = SO100LeaderConfig(port="/dev/tty.usbmodem5A460819811", id="my_awesome_leader_arm") # Initialize the robot and teleoperator follower = SO100Follower(follower_config) leader = SO100Leader(leader_config) # Configure the dataset features action_features = hw_to_dataset_features(follower.action_features, ACTION) obs_features = hw_to_dataset_features(follower.observation_features, OBS_STR) dataset_features = {**action_features, **obs_features} # Create the dataset dataset = LeRobotDataset.create( repo_id=HF_REPO_ID, fps=FPS, features=dataset_features, robot_type=follower.name, use_videos=True, image_writer_threads=4, ) # Connect the robot and teleoperator leader.connect() follower.connect() # Initialize the keyboard listener and rerun visualization listener, events = init_keyboard_listener() init_rerun(session_name="recording_phone") try: if not leader.is_connected or not follower.is_connected: raise ValueError("Robot or teleop is not connected!") teleop_action_processor, robot_action_processor, robot_observation_processor = ( make_default_processors() ) print("Starting record loop...") episode_idx = 0 while episode_idx < NUM_EPISODES and not events["stop_recording"]: log_say(f"Recording episode {episode_idx + 1} of {NUM_EPISODES}") # Main record loop record_loop( robot=follower, events=events, fps=FPS, teleop_action_processor=teleop_action_processor, robot_action_processor=robot_action_processor, robot_observation_processor=robot_observation_processor, teleop=leader, dataset=dataset, control_time_s=EPISODE_TIME_SEC, single_task=TASK_DESCRIPTION, display_data=True, ) # Reset the environment if not stopping or re-recording if not events["stop_recording"] and ( episode_idx < NUM_EPISODES - 1 or events["rerecord_episode"] ): log_say("Reset the environment") record_loop( robot=follower, events=events, fps=FPS, teleop_action_processor=teleop_action_processor, robot_action_processor=robot_action_processor, robot_observation_processor=robot_observation_processor, teleop=leader, control_time_s=RESET_TIME_SEC, single_task=TASK_DESCRIPTION, display_data=True, ) if events["rerecord_episode"]: log_say("Re-recording episode") events["rerecord_episode"] = False events["exit_early"] = False dataset.clear_episode_buffer() continue # Save episode dataset.save_episode() episode_idx += 1 finally: # Clean up log_say("Stop recording") leader.disconnect() follower.disconnect() listener.stop() dataset.finalize() dataset.push_to_hub() if __name__ == "__main__": main()