diff --git a/src/lerobot/scripts/lerobot_record.py b/src/lerobot/scripts/lerobot_record.py index 58aa6e16d..948e92bb8 100644 --- a/src/lerobot/scripts/lerobot_record.py +++ b/src/lerobot/scripts/lerobot_record.py @@ -404,82 +404,63 @@ def record(cfg: RecordConfig) -> LeRobotDataset: ), ) - if cfg.resume: - dataset = LeRobotDataset( - cfg.dataset.repo_id, - root=cfg.dataset.root, - batch_encoding_size=cfg.dataset.video_encoding_batch_size, - ) + dataset = None + listener = None - if hasattr(robot, "cameras") and len(robot.cameras) > 0: - dataset.start_image_writer( - num_processes=cfg.dataset.num_image_writer_processes, - num_threads=cfg.dataset.num_image_writer_threads_per_camera * len(robot.cameras), - ) - sanity_check_dataset_robot_compatibility(dataset, robot, cfg.dataset.fps, dataset_features) - else: - # Create empty dataset or load existing saved episodes - sanity_check_dataset_name(cfg.dataset.repo_id, cfg.policy) - dataset = LeRobotDataset.create( - cfg.dataset.repo_id, - cfg.dataset.fps, - root=cfg.dataset.root, - robot_type=robot.name, - features=dataset_features, - use_videos=cfg.dataset.video, - image_writer_processes=cfg.dataset.num_image_writer_processes, - image_writer_threads=cfg.dataset.num_image_writer_threads_per_camera * len(robot.cameras), - batch_encoding_size=cfg.dataset.video_encoding_batch_size, - ) - - # Load pretrained policy - policy = None if cfg.policy is None else make_policy(cfg.policy, ds_meta=dataset.meta) - preprocessor = None - postprocessor = None - if cfg.policy is not None: - preprocessor, postprocessor = make_pre_post_processors( - policy_cfg=cfg.policy, - pretrained_path=cfg.policy.pretrained_path, - dataset_stats=rename_stats(dataset.meta.stats, cfg.dataset.rename_map), - preprocessor_overrides={ - "device_processor": {"device": cfg.policy.device}, - "rename_observations_processor": {"rename_map": cfg.dataset.rename_map}, - }, - ) - - robot.connect() - if teleop is not None: - teleop.connect() - - listener, events = init_keyboard_listener() - - with VideoEncodingManager(dataset): - recorded_episodes = 0 - while recorded_episodes < cfg.dataset.num_episodes and not events["stop_recording"]: - log_say(f"Recording episode {dataset.num_episodes}", cfg.play_sounds) - record_loop( - robot=robot, - events=events, - fps=cfg.dataset.fps, - teleop_action_processor=teleop_action_processor, - robot_action_processor=robot_action_processor, - robot_observation_processor=robot_observation_processor, - teleop=teleop, - policy=policy, - preprocessor=preprocessor, - postprocessor=postprocessor, - dataset=dataset, - control_time_s=cfg.dataset.episode_time_s, - single_task=cfg.dataset.single_task, - display_data=cfg.display_data, + try: + if cfg.resume: + dataset = LeRobotDataset( + cfg.dataset.repo_id, + root=cfg.dataset.root, + batch_encoding_size=cfg.dataset.video_encoding_batch_size, ) - # Execute a few seconds without recording to give time to manually reset the environment - # Skip reset for the last episode to be recorded - if not events["stop_recording"] and ( - (recorded_episodes < cfg.dataset.num_episodes - 1) or events["rerecord_episode"] - ): - log_say("Reset the environment", cfg.play_sounds) + if hasattr(robot, "cameras") and len(robot.cameras) > 0: + dataset.start_image_writer( + num_processes=cfg.dataset.num_image_writer_processes, + num_threads=cfg.dataset.num_image_writer_threads_per_camera * len(robot.cameras), + ) + sanity_check_dataset_robot_compatibility(dataset, robot, cfg.dataset.fps, dataset_features) + else: + # Create empty dataset or load existing saved episodes + sanity_check_dataset_name(cfg.dataset.repo_id, cfg.policy) + dataset = LeRobotDataset.create( + cfg.dataset.repo_id, + cfg.dataset.fps, + root=cfg.dataset.root, + robot_type=robot.name, + features=dataset_features, + use_videos=cfg.dataset.video, + image_writer_processes=cfg.dataset.num_image_writer_processes, + image_writer_threads=cfg.dataset.num_image_writer_threads_per_camera * len(robot.cameras), + batch_encoding_size=cfg.dataset.video_encoding_batch_size, + ) + + # Load pretrained policy + policy = None if cfg.policy is None else make_policy(cfg.policy, ds_meta=dataset.meta) + preprocessor = None + postprocessor = None + if cfg.policy is not None: + preprocessor, postprocessor = make_pre_post_processors( + policy_cfg=cfg.policy, + pretrained_path=cfg.policy.pretrained_path, + dataset_stats=rename_stats(dataset.meta.stats, cfg.dataset.rename_map), + preprocessor_overrides={ + "device_processor": {"device": cfg.policy.device}, + "rename_observations_processor": {"rename_map": cfg.dataset.rename_map}, + }, + ) + + robot.connect() + if teleop is not None: + teleop.connect() + + listener, events = init_keyboard_listener() + + with VideoEncodingManager(dataset): + recorded_episodes = 0 + while recorded_episodes < cfg.dataset.num_episodes and not events["stop_recording"]: + log_say(f"Recording episode {dataset.num_episodes}", cfg.play_sounds) record_loop( robot=robot, events=events, @@ -488,34 +469,61 @@ def record(cfg: RecordConfig) -> LeRobotDataset: robot_action_processor=robot_action_processor, robot_observation_processor=robot_observation_processor, teleop=teleop, - control_time_s=cfg.dataset.reset_time_s, + policy=policy, + preprocessor=preprocessor, + postprocessor=postprocessor, + dataset=dataset, + control_time_s=cfg.dataset.episode_time_s, single_task=cfg.dataset.single_task, display_data=cfg.display_data, ) - if events["rerecord_episode"]: - log_say("Re-record episode", cfg.play_sounds) - events["rerecord_episode"] = False - events["exit_early"] = False - dataset.clear_episode_buffer() - continue + # Execute a few seconds without recording to give time to manually reset the environment + # Skip reset for the last episode to be recorded + if not events["stop_recording"] and ( + (recorded_episodes < cfg.dataset.num_episodes - 1) or events["rerecord_episode"] + ): + log_say("Reset the environment", cfg.play_sounds) + record_loop( + robot=robot, + events=events, + fps=cfg.dataset.fps, + teleop_action_processor=teleop_action_processor, + robot_action_processor=robot_action_processor, + robot_observation_processor=robot_observation_processor, + teleop=teleop, + control_time_s=cfg.dataset.reset_time_s, + single_task=cfg.dataset.single_task, + display_data=cfg.display_data, + ) - dataset.save_episode() - recorded_episodes += 1 + if events["rerecord_episode"]: + log_say("Re-record episode", cfg.play_sounds) + events["rerecord_episode"] = False + events["exit_early"] = False + dataset.clear_episode_buffer() + continue - log_say("Stop recording", cfg.play_sounds, blocking=True) + dataset.save_episode() + recorded_episodes += 1 + finally: + log_say("Stop recording", cfg.play_sounds, blocking=True) - robot.disconnect() - if teleop is not None: - teleop.disconnect() + if dataset: + dataset.finalize() - if not is_headless() and listener is not None: - listener.stop() + if robot.is_connected: + robot.disconnect() + if teleop and teleop.is_connected: + teleop.disconnect() - if cfg.dataset.push_to_hub: - dataset.push_to_hub(tags=cfg.dataset.tags, private=cfg.dataset.private) + if not is_headless() and listener: + listener.stop() - log_say("Exiting", cfg.play_sounds) + if cfg.dataset.push_to_hub: + dataset.push_to_hub(tags=cfg.dataset.tags, private=cfg.dataset.private) + + log_say("Exiting", cfg.play_sounds) return dataset