diff --git a/examples/backward_compatibility/replay.py b/examples/backward_compatibility/replay.py index ed78d016f..8de5ba197 100644 --- a/examples/backward_compatibility/replay.py +++ b/examples/backward_compatibility/replay.py @@ -81,24 +81,25 @@ def replay(cfg: ReplayConfig): actions = dataset.hf_dataset.select_columns(ACTION) robot.connect() - log_say("Replaying episode", cfg.play_sounds, blocking=True) - for idx in range(dataset.num_frames): - start_episode_t = time.perf_counter() + try: + log_say("Replaying episode", cfg.play_sounds, blocking=True) + for idx in range(dataset.num_frames): + start_episode_t = time.perf_counter() - action_array = actions[idx][ACTION] - action = {} - for i, name in enumerate(dataset.features[ACTION]["names"]): - key = f"{name.removeprefix('main_')}.pos" - action[key] = action_array[i].item() + action_array = actions[idx][ACTION] + action = {} + for i, name in enumerate(dataset.features[ACTION]["names"]): + key = f"{name.removeprefix('main_')}.pos" + action[key] = action_array[i].item() - action["shoulder_lift.pos"] = -(action["shoulder_lift.pos"] - 90) - action["elbow_flex.pos"] -= 90 - robot.send_action(action) + action["shoulder_lift.pos"] = -(action["shoulder_lift.pos"] - 90) + action["elbow_flex.pos"] -= 90 + robot.send_action(action) - dt_s = time.perf_counter() - start_episode_t - precise_sleep(max(1 / dataset.fps - dt_s, 0.0)) - - robot.disconnect() + dt_s = time.perf_counter() - start_episode_t + precise_sleep(max(1 / dataset.fps - dt_s, 0.0)) + finally: + robot.disconnect() if __name__ == "__main__": diff --git a/examples/lekiwi/evaluate.py b/examples/lekiwi/evaluate.py index 2f7f9f95f..a3144a442 100644 --- a/examples/lekiwi/evaluate.py +++ b/examples/lekiwi/evaluate.py @@ -78,40 +78,24 @@ def main(): listener, events = init_keyboard_listener() init_rerun(session_name="lekiwi_evaluate") - if not robot.is_connected: - raise ValueError("Robot is not connected!") + try: + if not robot.is_connected: + raise ValueError("Robot is not connected!") - print("Starting evaluate loop...") - recorded_episodes = 0 - while recorded_episodes < NUM_EPISODES and not events["stop_recording"]: - log_say(f"Running inference, recording eval episode {recorded_episodes} of {NUM_EPISODES}") + print("Starting evaluate loop...") + recorded_episodes = 0 + while recorded_episodes < NUM_EPISODES and not events["stop_recording"]: + log_say(f"Running inference, recording eval episode {recorded_episodes} of {NUM_EPISODES}") - # Main record loop - record_loop( - robot=robot, - events=events, - fps=FPS, - policy=policy, - preprocessor=preprocessor, # Pass the pre and post policy processors - postprocessor=postprocessor, - dataset=dataset, - control_time_s=EPISODE_TIME_SEC, - single_task=TASK_DESCRIPTION, - display_data=True, - teleop_action_processor=teleop_action_processor, - robot_action_processor=robot_action_processor, - robot_observation_processor=robot_observation_processor, - ) - - # Reset the environment if not stopping or re-recording - if not events["stop_recording"] and ( - (recorded_episodes < NUM_EPISODES - 1) or events["rerecord_episode"] - ): - log_say("Reset the environment") + # Main record loop record_loop( robot=robot, events=events, fps=FPS, + policy=policy, + preprocessor=preprocessor, # Pass the pre and post policy processors + postprocessor=postprocessor, + dataset=dataset, control_time_s=EPISODE_TIME_SEC, single_task=TASK_DESCRIPTION, display_data=True, @@ -120,24 +104,42 @@ def main(): robot_observation_processor=robot_observation_processor, ) - if events["rerecord_episode"]: - log_say("Re-record episode") - events["rerecord_episode"] = False - events["exit_early"] = False - dataset.clear_episode_buffer() - continue + # Reset the environment if not stopping or re-recording + if not events["stop_recording"] and ( + (recorded_episodes < NUM_EPISODES - 1) or events["rerecord_episode"] + ): + log_say("Reset the environment") + record_loop( + robot=robot, + events=events, + fps=FPS, + control_time_s=EPISODE_TIME_SEC, + single_task=TASK_DESCRIPTION, + display_data=True, + teleop_action_processor=teleop_action_processor, + robot_action_processor=robot_action_processor, + robot_observation_processor=robot_observation_processor, + ) - # Save episode - dataset.save_episode() - recorded_episodes += 1 + if events["rerecord_episode"]: + log_say("Re-record episode") + events["rerecord_episode"] = False + events["exit_early"] = False + dataset.clear_episode_buffer() + continue - # Clean up - log_say("Stop recording") - robot.disconnect() - listener.stop() + # Save episode + dataset.save_episode() + recorded_episodes += 1 - dataset.finalize() - dataset.push_to_hub() + finally: + # Clean up + log_say("Stop recording") + robot.disconnect() + listener.stop() + + dataset.finalize() + dataset.push_to_hub() if __name__ == "__main__": diff --git a/examples/lekiwi/record.py b/examples/lekiwi/record.py index 18b9f857e..9292157f7 100644 --- a/examples/lekiwi/record.py +++ b/examples/lekiwi/record.py @@ -74,40 +74,23 @@ def main(): listener, events = init_keyboard_listener() init_rerun(session_name="lekiwi_record") - if not robot.is_connected or not leader_arm.is_connected or not keyboard.is_connected: - raise ValueError("Robot or teleop is not connected!") + try: + if not robot.is_connected or not leader_arm.is_connected or not keyboard.is_connected: + raise ValueError("Robot or teleop is not connected!") - print("Starting record loop...") - recorded_episodes = 0 - while recorded_episodes < NUM_EPISODES and not events["stop_recording"]: - log_say(f"Recording episode {recorded_episodes}") + print("Starting record loop...") + recorded_episodes = 0 + while recorded_episodes < NUM_EPISODES and not events["stop_recording"]: + log_say(f"Recording episode {recorded_episodes}") - # Main record loop - record_loop( - robot=robot, - events=events, - fps=FPS, - dataset=dataset, - teleop=[leader_arm, keyboard], - control_time_s=EPISODE_TIME_SEC, - single_task=TASK_DESCRIPTION, - display_data=True, - teleop_action_processor=teleop_action_processor, - robot_action_processor=robot_action_processor, - robot_observation_processor=robot_observation_processor, - ) - - # Reset the environment if not stopping or re-recording - if not events["stop_recording"] and ( - (recorded_episodes < NUM_EPISODES - 1) or events["rerecord_episode"] - ): - log_say("Reset the environment") + # Main record loop record_loop( robot=robot, events=events, fps=FPS, + dataset=dataset, teleop=[leader_arm, keyboard], - control_time_s=RESET_TIME_SEC, + control_time_s=EPISODE_TIME_SEC, single_task=TASK_DESCRIPTION, display_data=True, teleop_action_processor=teleop_action_processor, @@ -115,26 +98,44 @@ def main(): robot_observation_processor=robot_observation_processor, ) - if events["rerecord_episode"]: - log_say("Re-record episode") - events["rerecord_episode"] = False - events["exit_early"] = False - dataset.clear_episode_buffer() - continue + # Reset the environment if not stopping or re-recording + if not events["stop_recording"] and ( + (recorded_episodes < NUM_EPISODES - 1) or events["rerecord_episode"] + ): + log_say("Reset the environment") + record_loop( + robot=robot, + events=events, + fps=FPS, + teleop=[leader_arm, keyboard], + control_time_s=RESET_TIME_SEC, + single_task=TASK_DESCRIPTION, + display_data=True, + teleop_action_processor=teleop_action_processor, + robot_action_processor=robot_action_processor, + robot_observation_processor=robot_observation_processor, + ) - # Save episode - dataset.save_episode() - recorded_episodes += 1 + if events["rerecord_episode"]: + log_say("Re-record episode") + events["rerecord_episode"] = False + events["exit_early"] = False + dataset.clear_episode_buffer() + continue - # Clean up - log_say("Stop recording") - robot.disconnect() - leader_arm.disconnect() - keyboard.disconnect() - listener.stop() + # Save episode + dataset.save_episode() + recorded_episodes += 1 + finally: + # Clean up + log_say("Stop recording") + robot.disconnect() + leader_arm.disconnect() + keyboard.disconnect() + listener.stop() - dataset.finalize() - dataset.push_to_hub() + dataset.finalize() + dataset.push_to_hub() if __name__ == "__main__": diff --git a/examples/lekiwi/replay.py b/examples/lekiwi/replay.py index 872dacf27..cf89aea16 100644 --- a/examples/lekiwi/replay.py +++ b/examples/lekiwi/replay.py @@ -42,25 +42,27 @@ def main(): # Connect to the robot robot.connect() - if not robot.is_connected: - raise ValueError("Robot is not connected!") + try: + if not robot.is_connected: + raise ValueError("Robot is not connected!") - print("Starting replay loop...") - log_say(f"Replaying episode {EPISODE_IDX}") - for idx in range(len(episode_frames)): - t0 = time.perf_counter() + print("Starting replay loop...") + log_say(f"Replaying episode {EPISODE_IDX}") + for idx in range(len(episode_frames)): + t0 = time.perf_counter() - # Get recorded action from dataset - action = { - name: float(actions[idx][ACTION][i]) for i, name in enumerate(dataset.features[ACTION]["names"]) - } + # Get recorded action from dataset + action = { + name: float(actions[idx][ACTION][i]) + for i, name in enumerate(dataset.features[ACTION]["names"]) + } - # Send action to robot - _ = robot.send_action(action) + # Send action to robot + _ = robot.send_action(action) - precise_sleep(max(1.0 / dataset.fps - (time.perf_counter() - t0), 0.0)) - - robot.disconnect() + precise_sleep(max(1.0 / dataset.fps - (time.perf_counter() - t0), 0.0)) + finally: + robot.disconnect() if __name__ == "__main__": diff --git a/examples/phone_to_so100/evaluate.py b/examples/phone_to_so100/evaluate.py index 246c923aa..837217eda 100644 --- a/examples/phone_to_so100/evaluate.py +++ b/examples/phone_to_so100/evaluate.py @@ -142,38 +142,24 @@ def main(): listener, events = init_keyboard_listener() init_rerun(session_name="phone_so100_evaluate") - if not robot.is_connected: - raise ValueError("Robot is not connected!") + try: + if not robot.is_connected: + raise ValueError("Robot is not connected!") - print("Starting evaluate loop...") - episode_idx = 0 - for episode_idx in range(NUM_EPISODES): - log_say(f"Running inference, recording eval episode {episode_idx + 1} of {NUM_EPISODES}") + print("Starting evaluate loop...") + episode_idx = 0 + for episode_idx in range(NUM_EPISODES): + log_say(f"Running inference, recording eval episode {episode_idx + 1} of {NUM_EPISODES}") - # Main record loop - record_loop( - robot=robot, - events=events, - fps=FPS, - policy=policy, - preprocessor=preprocessor, # Pass the pre and post policy processors - postprocessor=postprocessor, - dataset=dataset, - control_time_s=EPISODE_TIME_SEC, - single_task=TASK_DESCRIPTION, - display_data=True, - teleop_action_processor=make_default_teleop_action_processor(), - robot_action_processor=robot_ee_to_joints_processor, - robot_observation_processor=robot_joints_to_ee_pose_processor, - ) - - # Reset the environment if not stopping or re-recording - if not events["stop_recording"] and ((episode_idx < NUM_EPISODES - 1) or events["rerecord_episode"]): - log_say("Reset the environment") + # Main record loop record_loop( robot=robot, events=events, fps=FPS, + policy=policy, + preprocessor=preprocessor, # Pass the pre and post policy processors + postprocessor=postprocessor, + dataset=dataset, control_time_s=EPISODE_TIME_SEC, single_task=TASK_DESCRIPTION, display_data=True, @@ -182,24 +168,41 @@ def main(): robot_observation_processor=robot_joints_to_ee_pose_processor, ) - if events["rerecord_episode"]: - log_say("Re-record episode") - events["rerecord_episode"] = False - events["exit_early"] = False - dataset.clear_episode_buffer() - continue + # Reset the environment if not stopping or re-recording + if not events["stop_recording"] and ( + (episode_idx < NUM_EPISODES - 1) or events["rerecord_episode"] + ): + log_say("Reset the environment") + record_loop( + robot=robot, + events=events, + fps=FPS, + control_time_s=EPISODE_TIME_SEC, + single_task=TASK_DESCRIPTION, + display_data=True, + teleop_action_processor=make_default_teleop_action_processor(), + robot_action_processor=robot_ee_to_joints_processor, + robot_observation_processor=robot_joints_to_ee_pose_processor, + ) - # Save episode - dataset.save_episode() - episode_idx += 1 + if events["rerecord_episode"]: + log_say("Re-record episode") + events["rerecord_episode"] = False + events["exit_early"] = False + dataset.clear_episode_buffer() + continue - # Clean up - log_say("Stop recording") - robot.disconnect() - listener.stop() + # Save episode + dataset.save_episode() + episode_idx += 1 + finally: + # Clean up + log_say("Stop recording") + robot.disconnect() + listener.stop() - dataset.finalize() - dataset.push_to_hub() + dataset.finalize() + dataset.push_to_hub() if __name__ == "__main__": diff --git a/examples/phone_to_so100/record.py b/examples/phone_to_so100/record.py index 7b5b704e2..1f5005db9 100644 --- a/examples/phone_to_so100/record.py +++ b/examples/phone_to_so100/record.py @@ -149,38 +149,23 @@ def main(): listener, events = init_keyboard_listener() init_rerun(session_name="phone_so100_record") - if not robot.is_connected or not phone.is_connected: - raise ValueError("Robot or teleop is not connected!") + try: + if not robot.is_connected or not phone.is_connected: + raise ValueError("Robot or teleop is not connected!") - print("Starting record loop. Move your phone to teleoperate the robot...") - episode_idx = 0 - while episode_idx < NUM_EPISODES and not events["stop_recording"]: - log_say(f"Recording episode {episode_idx + 1} of {NUM_EPISODES}") + print("Starting record loop. Move your phone to teleoperate the robot...") + episode_idx = 0 + while episode_idx < NUM_EPISODES and not events["stop_recording"]: + log_say(f"Recording episode {episode_idx + 1} of {NUM_EPISODES}") - # Main record loop - record_loop( - robot=robot, - events=events, - fps=FPS, - teleop=phone, - dataset=dataset, - control_time_s=EPISODE_TIME_SEC, - single_task=TASK_DESCRIPTION, - display_data=True, - teleop_action_processor=phone_to_robot_ee_pose_processor, - robot_action_processor=robot_ee_to_joints_processor, - robot_observation_processor=robot_joints_to_ee_pose, - ) - - # Reset the environment if not stopping or re-recording - if not events["stop_recording"] and (episode_idx < NUM_EPISODES - 1 or events["rerecord_episode"]): - log_say("Reset the environment") + # Main record loop record_loop( robot=robot, events=events, fps=FPS, teleop=phone, - control_time_s=RESET_TIME_SEC, + dataset=dataset, + control_time_s=EPISODE_TIME_SEC, single_task=TASK_DESCRIPTION, display_data=True, teleop_action_processor=phone_to_robot_ee_pose_processor, @@ -188,25 +173,43 @@ def main(): robot_observation_processor=robot_joints_to_ee_pose, ) - if events["rerecord_episode"]: - log_say("Re-recording episode") - events["rerecord_episode"] = False - events["exit_early"] = False - dataset.clear_episode_buffer() - continue + # Reset the environment if not stopping or re-recording + if not events["stop_recording"] and ( + episode_idx < NUM_EPISODES - 1 or events["rerecord_episode"] + ): + log_say("Reset the environment") + record_loop( + robot=robot, + events=events, + fps=FPS, + teleop=phone, + control_time_s=RESET_TIME_SEC, + single_task=TASK_DESCRIPTION, + display_data=True, + teleop_action_processor=phone_to_robot_ee_pose_processor, + robot_action_processor=robot_ee_to_joints_processor, + robot_observation_processor=robot_joints_to_ee_pose, + ) - # Save episode - dataset.save_episode() - episode_idx += 1 + if events["rerecord_episode"]: + log_say("Re-recording episode") + events["rerecord_episode"] = False + events["exit_early"] = False + dataset.clear_episode_buffer() + continue - # Clean up - log_say("Stop recording") - robot.disconnect() - phone.disconnect() - listener.stop() + # Save episode + dataset.save_episode() + episode_idx += 1 + finally: + # Clean up + log_say("Stop recording") + robot.disconnect() + phone.disconnect() + listener.stop() - dataset.finalize() - dataset.push_to_hub() + dataset.finalize() + dataset.push_to_hub() if __name__ == "__main__": diff --git a/examples/phone_to_so100/replay.py b/examples/phone_to_so100/replay.py index 875025dfc..9d7806cf4 100644 --- a/examples/phone_to_so100/replay.py +++ b/examples/phone_to_so100/replay.py @@ -73,32 +73,34 @@ def main(): # Connect to the robot robot.connect() - if not robot.is_connected: - raise ValueError("Robot is not connected!") + try: + if not robot.is_connected: + raise ValueError("Robot is not connected!") - print("Starting replay loop...") - log_say(f"Replaying episode {EPISODE_IDX}") - for idx in range(len(episode_frames)): - t0 = time.perf_counter() + print("Starting replay loop...") + log_say(f"Replaying episode {EPISODE_IDX}") + for idx in range(len(episode_frames)): + t0 = time.perf_counter() - # Get recorded action from dataset - ee_action = { - name: float(actions[idx][ACTION][i]) for i, name in enumerate(dataset.features[ACTION]["names"]) - } + # Get recorded action from dataset + ee_action = { + name: float(actions[idx][ACTION][i]) + for i, name in enumerate(dataset.features[ACTION]["names"]) + } - # Get robot observation - robot_obs = robot.get_observation() + # Get robot observation + robot_obs = robot.get_observation() - # Dataset EE -> robot joints - joint_action = robot_ee_to_joints_processor((ee_action, robot_obs)) + # Dataset EE -> robot joints + joint_action = robot_ee_to_joints_processor((ee_action, robot_obs)) - # Send action to robot - _ = robot.send_action(joint_action) + # Send action to robot + _ = robot.send_action(joint_action) - precise_sleep(max(1.0 / dataset.fps - (time.perf_counter() - t0), 0.0)) - - # Clean up - robot.disconnect() + precise_sleep(max(1.0 / dataset.fps - (time.perf_counter() - t0), 0.0)) + finally: + # Clean up + robot.disconnect() if __name__ == "__main__": diff --git a/examples/so100_to_so100_EE/evaluate.py b/examples/so100_to_so100_EE/evaluate.py index 87d188f99..b614b89f2 100644 --- a/examples/so100_to_so100_EE/evaluate.py +++ b/examples/so100_to_so100_EE/evaluate.py @@ -142,38 +142,24 @@ def main(): listener, events = init_keyboard_listener() init_rerun(session_name="so100_so100_evaluate") - if not robot.is_connected: - raise ValueError("Robot is not connected!") + try: + if not robot.is_connected: + raise ValueError("Robot is not connected!") - print("Starting evaluate loop...") - episode_idx = 0 - for episode_idx in range(NUM_EPISODES): - log_say(f"Running inference, recording eval episode {episode_idx + 1} of {NUM_EPISODES}") + print("Starting evaluate loop...") + episode_idx = 0 + for episode_idx in range(NUM_EPISODES): + log_say(f"Running inference, recording eval episode {episode_idx + 1} of {NUM_EPISODES}") - # Main record loop - record_loop( - robot=robot, - events=events, - fps=FPS, - policy=policy, - preprocessor=preprocessor, # Pass the pre and post policy processors - postprocessor=postprocessor, - dataset=dataset, - control_time_s=EPISODE_TIME_SEC, - single_task=TASK_DESCRIPTION, - display_data=True, - teleop_action_processor=make_default_teleop_action_processor(), - robot_action_processor=robot_ee_to_joints_processor, - robot_observation_processor=robot_joints_to_ee_pose_processor, - ) - - # Reset the environment if not stopping or re-recording - if not events["stop_recording"] and ((episode_idx < NUM_EPISODES - 1) or events["rerecord_episode"]): - log_say("Reset the environment") + # Main record loop record_loop( robot=robot, events=events, fps=FPS, + policy=policy, + preprocessor=preprocessor, # Pass the pre and post policy processors + postprocessor=postprocessor, + dataset=dataset, control_time_s=EPISODE_TIME_SEC, single_task=TASK_DESCRIPTION, display_data=True, @@ -182,24 +168,41 @@ def main(): robot_observation_processor=robot_joints_to_ee_pose_processor, ) - if events["rerecord_episode"]: - log_say("Re-record episode") - events["rerecord_episode"] = False - events["exit_early"] = False - dataset.clear_episode_buffer() - continue + # Reset the environment if not stopping or re-recording + if not events["stop_recording"] and ( + (episode_idx < NUM_EPISODES - 1) or events["rerecord_episode"] + ): + log_say("Reset the environment") + record_loop( + robot=robot, + events=events, + fps=FPS, + control_time_s=EPISODE_TIME_SEC, + single_task=TASK_DESCRIPTION, + display_data=True, + teleop_action_processor=make_default_teleop_action_processor(), + robot_action_processor=robot_ee_to_joints_processor, + robot_observation_processor=robot_joints_to_ee_pose_processor, + ) - # Save episode - dataset.save_episode() - episode_idx += 1 + if events["rerecord_episode"]: + log_say("Re-record episode") + events["rerecord_episode"] = False + events["exit_early"] = False + dataset.clear_episode_buffer() + continue - # Clean up - log_say("Stop recording") - robot.disconnect() - listener.stop() + # Save episode + dataset.save_episode() + episode_idx += 1 + finally: + # Clean up + log_say("Stop recording") + robot.disconnect() + listener.stop() - dataset.finalize() - dataset.push_to_hub() + dataset.finalize() + dataset.push_to_hub() if __name__ == "__main__": diff --git a/examples/so100_to_so100_EE/record.py b/examples/so100_to_so100_EE/record.py index eead7a9a8..d85a1c5cc 100644 --- a/examples/so100_to_so100_EE/record.py +++ b/examples/so100_to_so100_EE/record.py @@ -146,38 +146,23 @@ def main(): listener, events = init_keyboard_listener() init_rerun(session_name="recording_phone") - if not leader.is_connected or not follower.is_connected: - raise ValueError("Robot or teleop is not connected!") + try: + if not leader.is_connected or not follower.is_connected: + raise ValueError("Robot or teleop is not connected!") - print("Starting record loop...") - episode_idx = 0 - while episode_idx < NUM_EPISODES and not events["stop_recording"]: - log_say(f"Recording episode {episode_idx + 1} of {NUM_EPISODES}") + print("Starting record loop...") + episode_idx = 0 + while episode_idx < NUM_EPISODES and not events["stop_recording"]: + log_say(f"Recording episode {episode_idx + 1} of {NUM_EPISODES}") - # Main record loop - record_loop( - robot=follower, - events=events, - fps=FPS, - teleop=leader, - dataset=dataset, - control_time_s=EPISODE_TIME_SEC, - single_task=TASK_DESCRIPTION, - display_data=True, - teleop_action_processor=leader_joints_to_ee, - robot_action_processor=ee_to_follower_joints, - robot_observation_processor=follower_joints_to_ee, - ) - - # Reset the environment if not stopping or re-recording - if not events["stop_recording"] and (episode_idx < NUM_EPISODES - 1 or events["rerecord_episode"]): - log_say("Reset the environment") + # Main record loop record_loop( robot=follower, events=events, fps=FPS, teleop=leader, - control_time_s=RESET_TIME_SEC, + dataset=dataset, + control_time_s=EPISODE_TIME_SEC, single_task=TASK_DESCRIPTION, display_data=True, teleop_action_processor=leader_joints_to_ee, @@ -185,25 +170,44 @@ def main(): robot_observation_processor=follower_joints_to_ee, ) - if events["rerecord_episode"]: - log_say("Re-recording episode") - events["rerecord_episode"] = False - events["exit_early"] = False - dataset.clear_episode_buffer() - continue + # Reset the environment if not stopping or re-recording + if not events["stop_recording"] and ( + episode_idx < NUM_EPISODES - 1 or events["rerecord_episode"] + ): + log_say("Reset the environment") + record_loop( + robot=follower, + events=events, + fps=FPS, + teleop=leader, + control_time_s=RESET_TIME_SEC, + single_task=TASK_DESCRIPTION, + display_data=True, + teleop_action_processor=leader_joints_to_ee, + robot_action_processor=ee_to_follower_joints, + robot_observation_processor=follower_joints_to_ee, + ) - # Save episode - dataset.save_episode() - episode_idx += 1 + if events["rerecord_episode"]: + log_say("Re-recording episode") + events["rerecord_episode"] = False + events["exit_early"] = False + dataset.clear_episode_buffer() + continue - # Clean up - log_say("Stop recording") - leader.disconnect() - follower.disconnect() - listener.stop() + # Save episode + dataset.save_episode() + episode_idx += 1 - dataset.finalize() - dataset.push_to_hub() + finally: + # Clean up + log_say("Stop recording") + leader.disconnect() + follower.disconnect() + listener.stop() + + dataset.finalize() + dataset.push_to_hub() if __name__ == "__main__": diff --git a/examples/so100_to_so100_EE/replay.py b/examples/so100_to_so100_EE/replay.py index 7d35a7b44..47a2f6635 100644 --- a/examples/so100_to_so100_EE/replay.py +++ b/examples/so100_to_so100_EE/replay.py @@ -74,32 +74,35 @@ def main(): # Connect to the robot robot.connect() - if not robot.is_connected: - raise ValueError("Robot is not connected!") + try: + if not robot.is_connected: + raise ValueError("Robot is not connected!") - print("Starting replay loop...") - log_say(f"Replaying episode {EPISODE_IDX}") - for idx in range(len(episode_frames)): - t0 = time.perf_counter() + print("Starting replay loop...") + log_say(f"Replaying episode {EPISODE_IDX}") + for idx in range(len(episode_frames)): + t0 = time.perf_counter() - # Get recorded action from dataset - ee_action = { - name: float(actions[idx][ACTION][i]) for i, name in enumerate(dataset.features[ACTION]["names"]) - } + # Get recorded action from dataset + ee_action = { + name: float(actions[idx][ACTION][i]) + for i, name in enumerate(dataset.features[ACTION]["names"]) + } - # Get robot observation - robot_obs = robot.get_observation() + # Get robot observation + robot_obs = robot.get_observation() - # Dataset EE -> robot joints - joint_action = robot_ee_to_joints_processor((ee_action, robot_obs)) + # Dataset EE -> robot joints + joint_action = robot_ee_to_joints_processor((ee_action, robot_obs)) - # Send action to robot - _ = robot.send_action(joint_action) + # Send action to robot + _ = robot.send_action(joint_action) - precise_sleep(max(1.0 / dataset.fps - (time.perf_counter() - t0), 0.0)) + precise_sleep(max(1.0 / dataset.fps - (time.perf_counter() - t0), 0.0)) - # Clean up - robot.disconnect() + finally: + # Clean up + robot.disconnect() if __name__ == "__main__": diff --git a/src/lerobot/cameras/camera.py b/src/lerobot/cameras/camera.py index bfdb571a7..2894e0215 100644 --- a/src/lerobot/cameras/camera.py +++ b/src/lerobot/cameras/camera.py @@ -15,11 +15,12 @@ # limitations under the License. import abc +import warnings from typing import Any from numpy.typing import NDArray # type: ignore # TODO: add type stubs for numpy.typing -from .configs import CameraConfig, ColorMode +from .configs import CameraConfig class Camera(abc.ABC): @@ -30,20 +31,12 @@ class Camera(abc.ABC): Manages basic camera properties (FPS, resolution) and core operations: - Connection/disconnection - - Frame capture (sync/async) + - Frame capture (sync/async/latest) Attributes: fps (int | None): Configured frames per second width (int | None): Frame width in pixels height (int | None): Frame height in pixels - - Example: - class MyCamera(Camera): - def __init__(self, config): ... - @property - def is_connected(self) -> bool: ... - def connect(self, warmup=True): ... - # Plus other required methods """ def __init__(self, config: CameraConfig): @@ -56,6 +49,32 @@ class Camera(abc.ABC): self.width: int | None = config.width self.height: int | None = config.height + def __enter__(self): + """ + Context manager entry. + Automatically connects to the camera. + """ + self.connect() + return self + + def __exit__(self, exc_type, exc_value, traceback) -> None: + """ + Context manager exit. + Automatically disconnects, ensuring resources are released even on error. + """ + self.disconnect() + + def __del__(self) -> None: + """ + Destructor safety net. + Attempts to disconnect if the object is garbage collected without cleanup. + """ + try: + if self.is_connected: + self.disconnect() + except Exception: # nosec B110 + pass + @property @abc.abstractmethod def is_connected(self) -> bool: @@ -89,12 +108,10 @@ class Camera(abc.ABC): pass @abc.abstractmethod - def read(self, color_mode: ColorMode | None = None) -> NDArray[Any]: - """Capture and return a single frame from the camera. + def read(self) -> NDArray[Any]: + """Capture and return a single frame from the camera synchronously. - Args: - color_mode: Desired color mode for the output frame. If None, - uses the camera's default color mode. + This is a blocking call that will wait for the hardware and its SDK. Returns: np.ndarray: Captured frame as a numpy array. @@ -103,17 +120,64 @@ class Camera(abc.ABC): @abc.abstractmethod def async_read(self, timeout_ms: float = ...) -> NDArray[Any]: - """Asynchronously capture and return a single frame from the camera. + """Return the most recent new frame. + + This method retrieves the latest frame captured by the background thread. + If a new frame is already available in the buffer (captured since the last call), + it returns it immediately. + + It blocks up to `timeout_ms` only if the buffer is empty or if the latest frame + was already consumed by a previous `async_read` call. + + Essentially, this method return the latest unconsumed frame, waiting if necessary + for a new one to arrive within the specified timeout. + + Usage: + - Ideal for control loops where you want to ensure every processed frame + is fresh, effectively synchronizing your loop to the camera's FPS. + - Causes of a timeout usually include: very low camera FPS, heavy processing load, + or if the camera is disconnected. Args: - timeout_ms: Maximum time to wait for a frame in milliseconds. - Defaults to implementation-specific timeout. + timeout_ms: Maximum time to wait for a new frame in milliseconds. + Defaults to 200ms (0.2s). Returns: np.ndarray: Captured frame as a numpy array. + + Raises: + TimeoutError: If no new frame arrives within `timeout_ms`. """ pass + def read_latest(self, max_age_ms: int = 1000) -> NDArray[Any]: + """Return the most recent frame captured immediately (Peeking). + + This method is non-blocking and returns whatever is currently in the + memory buffer. The frame may be stale, + meaning it could have been captured a while ago (hanging camera scenario e.g.). + + Usage: + Ideal for scenarios requiring zero latency or decoupled frequencies & when + we want a guaranteed frame, such as UI visualization, logging, or + non-critical monitoring. + + Returns: + NDArray[Any]: The frame image (numpy array). + + Raises: + TimeoutError: If the latest frame is older than `max_age_ms`. + NotConnectedError: If the camera is not connected. + RuntimeError: If the camera is connected but has not captured any frames yet. + """ + warnings.warn( + f"{self.__class__.__name__}.read_latest() is not implemented. " + "Please override read_latest(); it will be required in future releases.", + FutureWarning, + stacklevel=2, + ) + return self.async_read() + @abc.abstractmethod def disconnect(self) -> None: """Disconnect from the camera and release resources.""" diff --git a/src/lerobot/cameras/opencv/camera_opencv.py b/src/lerobot/cameras/opencv/camera_opencv.py index b1043ba64..d581e1425 100644 --- a/src/lerobot/cameras/opencv/camera_opencv.py +++ b/src/lerobot/cameras/opencv/camera_opencv.py @@ -70,34 +70,24 @@ class OpenCVCamera(Camera): Example: ```python from lerobot.cameras.opencv import OpenCVCamera - from lerobot.cameras.configuration_opencv import OpenCVCameraConfig, ColorMode, Cv2Rotation + from lerobot.cameras.configuration_opencv import OpenCVCameraConfig # Basic usage with camera index 0 config = OpenCVCameraConfig(index_or_path=0) camera = OpenCVCamera(config) camera.connect() - # Read 1 frame synchronously + # Read 1 frame synchronously (blocking) color_image = camera.read() - print(color_image.shape) - # Read 1 frame asynchronously + # Read 1 frame asynchronously (waits for new frame with a timeout) async_image = camera.async_read() + # Get the latest frame immediately (no wait, returns timestamp) + latest_image, timestamp = camera.read_latest() + # When done, properly disconnect the camera using camera.disconnect() - - # Example with custom settings - custom_config = OpenCVCameraConfig( - index_or_path='/dev/video0', # Or use an index - fps=30, - width=1280, - height=720, - color_mode=ColorMode.RGB, - rotation=Cv2Rotation.ROTATE_90 - ) - custom_camera = OpenCVCamera(custom_config) - # ... connect, read, disconnect ... ``` """ @@ -123,6 +113,7 @@ class OpenCVCamera(Camera): self.stop_event: Event | None = None self.frame_lock: Lock = Lock() self.latest_frame: NDArray[Any] | None = None + self.latest_timestamp: float | None = None self.new_frame_event: Event = Event() self.rotation: int | None = get_cv2_rotation(config.rotation) @@ -146,12 +137,16 @@ class OpenCVCamera(Camera): Connects to the OpenCV camera specified in the configuration. Initializes the OpenCV VideoCapture object, sets desired camera properties - (FPS, width, height), and performs initial checks. + (FPS, width, height), starts the background reading thread and performs initial checks. + + Args: + warmup (bool): If True, waits at connect() time until at least one valid frame + has been captured by the background thread. Defaults to True. Raises: DeviceAlreadyConnectedError: If the camera is already connected. - ConnectionError: If the specified camera index/path is not found or the camera is found but fails to open. - RuntimeError: If the camera opens but fails to apply requested FPS/resolution settings. + ConnectionError: If the specified camera index/path is not found or fails to open. + RuntimeError: If the camera opens but fails to apply requested settings. """ if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} is already connected.") @@ -170,12 +165,16 @@ class OpenCVCamera(Camera): ) self._configure_capture_settings() + self._start_read_thread() - if warmup: + if warmup and self.warmup_s > 0: start_time = time.time() while time.time() - start_time < self.warmup_s: - self.read() + self.async_read(timeout_ms=self.warmup_s * 1000) time.sleep(0.1) + with self.frame_lock: + if self.latest_frame is None: + raise ConnectionError(f"{self} failed to capture frames during warmup.") logger.info(f"{self} connected.") @@ -196,8 +195,7 @@ class OpenCVCamera(Camera): Raises: RuntimeError: If the camera fails to set any of the specified properties to the requested value. - DeviceNotConnectedError: If the camera is not connected when attempting - to configure settings. + DeviceNotConnectedError: If the camera is not connected. """ if not self.is_connected: raise DeviceNotConnectedError(f"Cannot configure settings for {self} as it is not connected.") @@ -339,6 +337,17 @@ class OpenCVCamera(Camera): return found_cameras_info + def _read_from_hardware(self) -> NDArray[Any]: + if self.videocapture is None: + raise DeviceNotConnectedError(f"{self} videocapture is not initialized") + + ret, frame = self.videocapture.read() + + if not ret: + raise RuntimeError(f"{self} read failed (status={ret}).") + + return frame + def read(self, color_mode: ColorMode | None = None) -> NDArray[Any]: """ Reads a single frame synchronously from the camera. @@ -346,11 +355,6 @@ class OpenCVCamera(Camera): This is a blocking call. It waits for the next available frame from the camera hardware via OpenCV. - Args: - color_mode (Optional[ColorMode]): If specified, overrides the default - color mode (`self.color_mode`) for this read operation (e.g., - request RGB even if default is BGR). - Returns: np.ndarray: The captured frame as a NumPy array in the format (height, width, channels), using the specified or default @@ -362,34 +366,34 @@ class OpenCVCamera(Camera): received frame dimensions don't match expectations before rotation. ValueError: If an invalid `color_mode` is requested. """ - if not self.is_connected: - raise DeviceNotConnectedError(f"{self} is not connected.") start_time = time.perf_counter() - if self.videocapture is None: - raise DeviceNotConnectedError(f"{self} videocapture is not initialized") + if color_mode is not None: + logger.warning( + f"{self} read() color_mode parameter is deprecated and will be removed in future versions." + ) - ret, frame = self.videocapture.read() + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") - if not ret or frame is None: - raise RuntimeError(f"{self} read failed (status={ret}).") + if self.thread is None or not self.thread.is_alive(): + raise RuntimeError(f"{self} read thread is not running.") - processed_frame = self._postprocess_image(frame, color_mode) + self.new_frame_event.clear() + frame = self.async_read(timeout_ms=10000) read_duration_ms = (time.perf_counter() - start_time) * 1e3 logger.debug(f"{self} read took: {read_duration_ms:.1f}ms") - return processed_frame + return frame - def _postprocess_image(self, image: NDArray[Any], color_mode: ColorMode | None = None) -> NDArray[Any]: + def _postprocess_image(self, image: NDArray[Any]) -> NDArray[Any]: """ Applies color conversion, dimension validation, and rotation to a raw frame. Args: image (np.ndarray): The raw image frame (expected BGR format from OpenCV). - color_mode (Optional[ColorMode]): The target color mode (RGB or BGR). If None, - uses the instance's default `self.color_mode`. Returns: np.ndarray: The processed image frame. @@ -399,11 +403,10 @@ class OpenCVCamera(Camera): RuntimeError: If the raw frame dimensions do not match the configured `width` and `height`. """ - requested_color_mode = self.color_mode if color_mode is None else color_mode - if requested_color_mode not in (ColorMode.RGB, ColorMode.BGR): + if self.color_mode not in (ColorMode.RGB, ColorMode.BGR): raise ValueError( - f"Invalid color mode '{requested_color_mode}'. Expected {ColorMode.RGB} or {ColorMode.BGR}." + f"Invalid color mode '{self.color_mode}'. Expected {ColorMode.RGB} or {ColorMode.BGR}." ) h, w, c = image.shape @@ -417,7 +420,7 @@ class OpenCVCamera(Camera): raise RuntimeError(f"{self} frame channels={c} do not match expected 3 channels (RGB/BGR).") processed_image = image - if requested_color_mode == ColorMode.RGB: + if self.color_mode == ColorMode.RGB: processed_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) if self.rotation in [cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_90_COUNTERCLOCKWISE, cv2.ROTATE_180]: @@ -431,7 +434,7 @@ class OpenCVCamera(Camera): On each iteration: 1. Reads a color frame - 2. Stores result in latest_frame (thread-safe) + 2. Stores result in latest_frame and updates timestamp (thread-safe) 3. Sets new_frame_event to notify listeners Stops on DeviceNotConnectedError, logs other errors and continues. @@ -439,30 +442,37 @@ class OpenCVCamera(Camera): if self.stop_event is None: raise RuntimeError(f"{self}: stop_event is not initialized before starting read loop.") + failure_count = 0 while not self.stop_event.is_set(): try: - color_image = self.read() + raw_frame = self._read_from_hardware() + processed_frame = self._postprocess_image(raw_frame) + capture_time = time.perf_counter() with self.frame_lock: - self.latest_frame = color_image + self.latest_frame = processed_frame + self.latest_timestamp = capture_time self.new_frame_event.set() + failure_count = 0 except DeviceNotConnectedError: break except Exception as e: - logger.warning(f"Error reading frame in background thread for {self}: {e}") + if failure_count <= 10: + failure_count += 1 + logger.warning(f"Error reading frame in background thread for {self}: {e}") + else: + raise RuntimeError(f"{self} exceeded maximum consecutive read failures.") from e def _start_read_thread(self) -> None: """Starts or restarts the background read thread if it's not running.""" - if self.thread is not None and self.thread.is_alive(): - self.thread.join(timeout=0.1) - if self.stop_event is not None: - self.stop_event.set() + self._stop_read_thread() self.stop_event = Event() self.thread = Thread(target=self._read_loop, args=(), name=f"{self}_read_loop") self.thread.daemon = True self.thread.start() + time.sleep(0.1) def _stop_read_thread(self) -> None: """Signals the background read thread to stop and waits for it to join.""" @@ -475,6 +485,11 @@ class OpenCVCamera(Camera): self.thread = None self.stop_event = None + with self.frame_lock: + self.latest_frame = None + self.latest_timestamp = None + self.new_frame_event.clear() + def async_read(self, timeout_ms: float = 200) -> NDArray[Any]: """ Reads the latest available frame asynchronously. @@ -482,6 +497,7 @@ class OpenCVCamera(Camera): This method retrieves the most recent frame captured by the background read thread. It does not block waiting for the camera hardware directly, but may wait up to timeout_ms for the background thread to provide a frame. + It is “best effort” under high FPS. Args: timeout_ms (float): Maximum time in milliseconds to wait for a frame @@ -500,13 +516,12 @@ class OpenCVCamera(Camera): raise DeviceNotConnectedError(f"{self} is not connected.") if self.thread is None or not self.thread.is_alive(): - self._start_read_thread() + raise RuntimeError(f"{self} read thread is not running.") if not self.new_frame_event.wait(timeout=timeout_ms / 1000.0): - thread_alive = self.thread is not None and self.thread.is_alive() raise TimeoutError( f"Timed out waiting for frame from camera {self} after {timeout_ms} ms. " - f"Read thread alive: {thread_alive}." + f"Read thread alive: {self.thread.is_alive()}." ) with self.frame_lock: @@ -518,6 +533,42 @@ class OpenCVCamera(Camera): return frame + def read_latest(self, max_age_ms: int = 1000) -> NDArray[Any]: + """Return the most recent frame captured immediately (Peeking). + + This method is non-blocking and returns whatever is currently in the + memory buffer. The frame may be stale, + meaning it could have been captured a while ago (hanging camera scenario e.g.). + + Returns: + NDArray[Any]: The frame image (numpy array). + + Raises: + TimeoutError: If the latest frame is older than `max_age_ms`. + DeviceNotConnectedError: If the camera is not connected. + RuntimeError: If the camera is connected but has not captured any frames yet. + """ + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + if self.thread is None or not self.thread.is_alive(): + raise RuntimeError(f"{self} read thread is not running.") + + with self.frame_lock: + frame = self.latest_frame + timestamp = self.latest_timestamp + + if frame is None or timestamp is None: + raise RuntimeError(f"{self} has not captured any frames yet.") + + age_ms = (time.perf_counter() - timestamp) * 1e3 + if age_ms > max_age_ms: + raise TimeoutError( + f"{self} latest frame is too old: {age_ms:.1f} ms (max allowed: {max_age_ms} ms)." + ) + + return frame + def disconnect(self) -> None: """ Disconnects from the camera and cleans up resources. @@ -538,4 +589,9 @@ class OpenCVCamera(Camera): self.videocapture.release() self.videocapture = None + with self.frame_lock: + self.latest_frame = None + self.latest_timestamp = None + self.new_frame_event.clear() + logger.info(f"{self} disconnected.") diff --git a/src/lerobot/cameras/reachy2_camera/reachy2_camera.py b/src/lerobot/cameras/reachy2_camera/reachy2_camera.py index c8916c5ee..5cede466d 100644 --- a/src/lerobot/cameras/reachy2_camera/reachy2_camera.py +++ b/src/lerobot/cameras/reachy2_camera/reachy2_camera.py @@ -80,6 +80,8 @@ class Reachy2Camera(Camera): self.config = config self.color_mode = config.color_mode + self.latest_frame: NDArray[Any] | None = None + self.latest_timestamp: float | None = None self.cam_manager: CameraManager | None = None @@ -125,12 +127,7 @@ class Reachy2Camera(Camera): """ Reads a single frame synchronously from the camera. - This is a blocking call. - - Args: - color_mode (Optional[ColorMode]): If specified, overrides the default - color mode (`self.color_mode`) for this read operation (e.g., - request RGB even if default is BGR). + This method retrieves the most recent frame available in Reachy 2's low-level software. Returns: np.ndarray: The captured frame as a NumPy array in the format @@ -145,6 +142,11 @@ class Reachy2Camera(Camera): if self.cam_manager is None: raise DeviceNotConnectedError(f"{self} is not connected.") + if color_mode is not None: + logger.warning( + f"{self} read() color_mode parameter is deprecated and will be removed in future versions." + ) + frame: NDArray[Any] = np.empty((0, 0, 3), dtype=np.uint8) if self.config.name == "teleop" and hasattr(self.cam_manager, "teleop"): @@ -165,11 +167,18 @@ class Reachy2Camera(Camera): raise ValueError(f"Invalid camera name '{self.config.name}'. Expected 'teleop' or 'depth'.") if frame is None: - return np.empty((0, 0, 3), dtype=np.uint8) + raise RuntimeError(f"Internal error: No frame available for {self}.") - if self.config.color_mode == "rgb": + if self.color_mode not in (ColorMode.RGB, ColorMode.BGR): + raise ValueError( + f"Invalid color mode '{self.color_mode}'. Expected {ColorMode.RGB} or {ColorMode.BGR}." + ) + if self.color_mode == ColorMode.RGB: frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + self.latest_frame = frame + self.latest_timestamp = time.perf_counter() + read_duration_ms = (time.perf_counter() - start_time) * 1e3 logger.debug(f"{self} read took: {read_duration_ms:.1f}ms") @@ -177,13 +186,7 @@ class Reachy2Camera(Camera): def async_read(self, timeout_ms: float = 200) -> NDArray[Any]: """ - Reads the latest available frame. - - This method retrieves the most recent frame available in Reachy 2's low-level software. - - Args: - timeout_ms (float): Maximum time in milliseconds to wait for a frame - to become available. Defaults to 200ms (0.2 seconds). + Same as read() Returns: np.ndarray: The latest captured frame as a NumPy array in the format @@ -197,12 +200,38 @@ class Reachy2Camera(Camera): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - frame = self.read() + return self.read() - if frame is None: - raise RuntimeError(f"Internal error: No frame available for {self}.") + def read_latest(self, max_age_ms: int = 1000) -> NDArray[Any]: + """Return the most recent frame captured immediately (Peeking). - return frame + This method is non-blocking and returns whatever is currently in the + memory buffer. The frame may be stale, + meaning it could have been captured a while ago (hanging camera scenario e.g.). + + Returns: + tuple[NDArray, float]: + - The frame image (numpy array). + - The timestamp (time.perf_counter) when this frame was captured. + + Raises: + TimeoutError: If the latest frame is older than `max_age_ms`. + DeviceNotConnectedError: If the camera is not connected. + RuntimeError: If the camera is connected but has not captured any frames yet. + """ + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + if self.latest_frame is None or self.latest_timestamp is None: + raise RuntimeError(f"{self} has not captured any frames yet.") + + age_ms = (time.perf_counter() - self.latest_timestamp) * 1e3 + if age_ms > max_age_ms: + raise TimeoutError( + f"{self} latest frame is too old: {age_ms:.1f} ms (max allowed: {max_age_ms} ms)." + ) + + return self.latest_frame def disconnect(self) -> None: """ diff --git a/src/lerobot/cameras/realsense/camera_realsense.py b/src/lerobot/cameras/realsense/camera_realsense.py index f2906fdd8..e47f25381 100644 --- a/src/lerobot/cameras/realsense/camera_realsense.py +++ b/src/lerobot/cameras/realsense/camera_realsense.py @@ -72,15 +72,14 @@ class RealSenseCamera(Camera): camera = RealSenseCamera(config) camera.connect() - # Read 1 frame synchronously + # Read 1 frame synchronously (blocking) color_image = camera.read() - print(color_image.shape) - # Read 1 frame asynchronously + # Read 1 frame asynchronously (waits for new frame with a timeout) async_image = camera.async_read() - # When done, properly disconnect the camera using - camera.disconnect() + # Get the latest frame immediately (no wait, returns timestamp) + latest_image, timestamp = camera.read_latest() # Example with depth capture and custom settings custom_config = RealSenseCameraConfig( @@ -133,7 +132,9 @@ class RealSenseCamera(Camera): self.thread: Thread | None = None self.stop_event: Event | None = None self.frame_lock: Lock = Lock() - self.latest_frame: NDArray[Any] | None = None + self.latest_color_frame: NDArray[Any] | None = None + self.latest_depth_frame: NDArray[Any] | None = None + self.latest_timestamp: float | None = None self.new_frame_event: Event = Event() self.rotation: int | None = get_cv2_rotation(config.rotation) @@ -158,6 +159,10 @@ class RealSenseCamera(Camera): Initializes the RealSense pipeline, configures the required streams (color and optionally depth), starts the pipeline, and validates the actual stream settings. + Args: + warmup (bool): If True, waits at connect() time until at least one valid frame + has been captured by the background thread. Defaults to True. + Raises: DeviceAlreadyConnectedError: If the camera is already connected. ValueError: If the configuration is invalid (e.g., missing serial/name, name not unique). @@ -181,15 +186,18 @@ class RealSenseCamera(Camera): ) from e self._configure_capture_settings() + self._start_read_thread() - if warmup: - time.sleep( - 1 - ) # NOTE(Steven): RS cameras need a bit of time to warm up before the first read. If we don't wait, the first read from the warmup will raise. - start_time = time.time() - while time.time() - start_time < self.warmup_s: - self.read() - time.sleep(0.1) + # NOTE(Steven/Caroline): Enforcing at least one second of warmup as RS cameras need a bit of time before the first read. If we don't wait, the first read from the warmup will raise. + self.warmup_s = max(self.warmup_s, 1) + + start_time = time.time() + while time.time() - start_time < self.warmup_s: + self.async_read(timeout_ms=self.warmup_s * 1000) + time.sleep(0.1) + with self.frame_lock: + if self.latest_color_frame is None or self.use_depth and self.latest_depth_frame is None: + raise ConnectionError(f"{self} failed to capture frames during warmup.") logger.info(f"{self} connected.") @@ -319,9 +327,6 @@ class RealSenseCamera(Camera): This is a blocking call. It waits for a coherent set of frames (depth) from the camera hardware via the RealSense pipeline. - Args: - timeout_ms (int): Maximum time in milliseconds to wait for a frame. Defaults to 200ms. - Returns: np.ndarray: The depth map as a NumPy array (height, width) of type `np.uint16` (raw depth values in millimeters) and rotation. @@ -330,44 +335,52 @@ class RealSenseCamera(Camera): DeviceNotConnectedError: If the camera is not connected. RuntimeError: If reading frames from the pipeline fails or frames are invalid. """ + if timeout_ms: + logger.warning( + f"{self} read() timeout_ms parameter is deprecated and will be removed in future versions." + ) - if not self.is_connected: - raise DeviceNotConnectedError(f"{self} is not connected.") if not self.use_depth: raise RuntimeError( f"Failed to capture depth frame '.read_depth()'. Depth stream is not enabled for {self}." ) - start_time = time.perf_counter() + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + if self.thread is None or not self.thread.is_alive(): + raise RuntimeError(f"{self} read thread is not running.") + + self.new_frame_event.clear() + + _ = self.async_read(timeout_ms=10000) + + with self.frame_lock: + depth_map = self.latest_depth_frame + + if depth_map is None: + raise RuntimeError("No depth frame available. Ensure camera is streaming.") + + return depth_map + + def _read_from_hardware(self): if self.rs_pipeline is None: raise RuntimeError(f"{self}: rs_pipeline must be initialized before use.") - ret, frame = self.rs_pipeline.try_wait_for_frames(timeout_ms=timeout_ms) + ret, frame = self.rs_pipeline.try_wait_for_frames(timeout_ms=10000) if not ret or frame is None: - raise RuntimeError(f"{self} read_depth failed (status={ret}).") + raise RuntimeError(f"{self} read failed (status={ret}).") - depth_frame = frame.get_depth_frame() - depth_map = np.asanyarray(depth_frame.get_data()) + return frame - depth_map_processed = self._postprocess_image(depth_map, depth_frame=True) - - read_duration_ms = (time.perf_counter() - start_time) * 1e3 - logger.debug(f"{self} read took: {read_duration_ms:.1f}ms") - - return depth_map_processed - - def read(self, color_mode: ColorMode | None = None, timeout_ms: int = 200) -> NDArray[Any]: + def read(self, color_mode: ColorMode | None = None, timeout_ms: int = 0) -> NDArray[Any]: """ Reads a single frame (color) synchronously from the camera. This is a blocking call. It waits for a coherent set of frames (color) from the camera hardware via the RealSense pipeline. - Args: - timeout_ms (int): Maximum time in milliseconds to wait for a frame. Defaults to 200ms. - Returns: np.ndarray: The captured color frame as a NumPy array (height, width, channels), processed according to `color_mode` and rotation. @@ -378,39 +391,39 @@ class RealSenseCamera(Camera): ValueError: If an invalid `color_mode` is requested. """ + start_time = time.perf_counter() + + if color_mode is not None: + logger.warning( + f"{self} read() color_mode parameter is deprecated and will be removed in future versions." + ) + + if timeout_ms: + logger.warning( + f"{self} read() timeout_ms parameter is deprecated and will be removed in future versions." + ) + if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - start_time = time.perf_counter() + if self.thread is None or not self.thread.is_alive(): + raise RuntimeError(f"{self} read thread is not running.") - if self.rs_pipeline is None: - raise RuntimeError(f"{self}: rs_pipeline must be initialized before use.") + self.new_frame_event.clear() - ret, frame = self.rs_pipeline.try_wait_for_frames(timeout_ms=timeout_ms) - - if not ret or frame is None: - raise RuntimeError(f"{self} read failed (status={ret}).") - - color_frame = frame.get_color_frame() - color_image_raw = np.asanyarray(color_frame.get_data()) - - color_image_processed = self._postprocess_image(color_image_raw, color_mode) + frame = self.async_read(timeout_ms=10000) read_duration_ms = (time.perf_counter() - start_time) * 1e3 logger.debug(f"{self} read took: {read_duration_ms:.1f}ms") - return color_image_processed + return frame - def _postprocess_image( - self, image: NDArray[Any], color_mode: ColorMode | None = None, depth_frame: bool = False - ) -> NDArray[Any]: + def _postprocess_image(self, image: NDArray[Any], depth_frame: bool = False) -> NDArray[Any]: """ Applies color conversion, dimension validation, and rotation to a raw color frame. Args: image (np.ndarray): The raw image frame (expected RGB format from RealSense). - color_mode (Optional[ColorMode]): The target color mode (RGB or BGR). If None, - uses the instance's default `self.color_mode`. Returns: np.ndarray: The processed image frame according to `self.color_mode` and `self.rotation`. @@ -421,9 +434,9 @@ class RealSenseCamera(Camera): `width` and `height`. """ - if color_mode and color_mode not in (ColorMode.RGB, ColorMode.BGR): + if self.color_mode and self.color_mode not in (ColorMode.RGB, ColorMode.BGR): raise ValueError( - f"Invalid requested color mode '{color_mode}'. Expected {ColorMode.RGB} or {ColorMode.BGR}." + f"Invalid requested color mode '{self.color_mode}'. Expected {ColorMode.RGB} or {ColorMode.BGR}." ) if depth_frame: @@ -454,7 +467,7 @@ class RealSenseCamera(Camera): On each iteration: 1. Reads a color frame with 500ms timeout - 2. Stores result in latest_frame (thread-safe) + 2. Stores result in latest_frame and updates timestamp (thread-safe) 3. Sets new_frame_event to notify listeners Stops on DeviceNotConnectedError, logs other errors and continues. @@ -462,25 +475,41 @@ class RealSenseCamera(Camera): if self.stop_event is None: raise RuntimeError(f"{self}: stop_event is not initialized before starting read loop.") + failure_count = 0 while not self.stop_event.is_set(): try: - color_image = self.read(timeout_ms=500) + frame = self._read_from_hardware() + color_frame_raw = frame.get_color_frame() + color_frame = np.asanyarray(color_frame_raw.get_data()) + processed_color_frame = self._postprocess_image(color_frame) + + if self.use_depth: + depth_frame_raw = frame.get_depth_frame() + depth_frame = np.asanyarray(depth_frame_raw.get_data()) + processed_depth_frame = self._postprocess_image(depth_frame, depth_frame=True) + + capture_time = time.perf_counter() with self.frame_lock: - self.latest_frame = color_image + self.latest_color_frame = processed_color_frame + if self.use_depth: + self.latest_depth_frame = processed_depth_frame + self.latest_timestamp = capture_time self.new_frame_event.set() + failure_count = 0 except DeviceNotConnectedError: break except Exception as e: - logger.warning(f"Error reading frame in background thread for {self}: {e}") + if failure_count <= 10: + failure_count += 1 + logger.warning(f"Error reading frame in background thread for {self}: {e}") + else: + raise RuntimeError(f"{self} exceeded maximum consecutive read failures.") from e def _start_read_thread(self) -> None: """Starts or restarts the background read thread if it's not running.""" - if self.thread is not None and self.thread.is_alive(): - self.thread.join(timeout=0.1) - if self.stop_event is not None: - self.stop_event.set() + self._stop_read_thread() self.stop_event = Event() self.thread = Thread(target=self._read_loop, args=(), name=f"{self}_read_loop") @@ -498,6 +527,12 @@ class RealSenseCamera(Camera): self.thread = None self.stop_event = None + with self.frame_lock: + self.latest_color_frame = None + self.latest_depth_frame = None + self.latest_timestamp = None + self.new_frame_event.clear() + # NOTE(Steven): Missing implementation for depth for now def async_read(self, timeout_ms: float = 200) -> NDArray[Any]: """ @@ -506,6 +541,7 @@ class RealSenseCamera(Camera): This method retrieves the most recent color frame captured by the background read thread. It does not block waiting for the camera hardware directly, but may wait up to timeout_ms for the background thread to provide a frame. + It is “best effort” under high FPS. Args: timeout_ms (float): Maximum time in milliseconds to wait for a frame @@ -524,17 +560,16 @@ class RealSenseCamera(Camera): raise DeviceNotConnectedError(f"{self} is not connected.") if self.thread is None or not self.thread.is_alive(): - self._start_read_thread() + raise RuntimeError(f"{self} read thread is not running.") if not self.new_frame_event.wait(timeout=timeout_ms / 1000.0): - thread_alive = self.thread is not None and self.thread.is_alive() raise TimeoutError( f"Timed out waiting for frame from camera {self} after {timeout_ms} ms. " - f"Read thread alive: {thread_alive}." + f"Read thread alive: {self.thread.is_alive()}." ) with self.frame_lock: - frame = self.latest_frame + frame = self.latest_color_frame self.new_frame_event.clear() if frame is None: @@ -542,6 +577,43 @@ class RealSenseCamera(Camera): return frame + # NOTE(Steven): Missing implementation for depth for now + def read_latest(self, max_age_ms: int = 1000) -> NDArray[Any]: + """Return the most recent (color) frame captured immediately (Peeking). + + This method is non-blocking and returns whatever is currently in the + memory buffer. The frame may be stale, + meaning it could have been captured a while ago (hanging camera scenario e.g.). + + Returns: + NDArray[Any]: The frame image (numpy array). + + Raises: + TimeoutError: If the latest frame is older than `max_age_ms`. + DeviceNotConnectedError: If the camera is not connected. + RuntimeError: If the camera is connected but has not captured any frames yet. + """ + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + if self.thread is None or not self.thread.is_alive(): + raise RuntimeError(f"{self} read thread is not running.") + + with self.frame_lock: + frame = self.latest_color_frame + timestamp = self.latest_timestamp + + if frame is None or timestamp is None: + raise RuntimeError(f"{self} has not captured any frames yet.") + + age_ms = (time.perf_counter() - timestamp) * 1e3 + if age_ms > max_age_ms: + raise TimeoutError( + f"{self} latest frame is too old: {age_ms:.1f} ms (max allowed: {max_age_ms} ms)." + ) + + return frame + def disconnect(self) -> None: """ Disconnects from the camera, stops the pipeline, and cleans up resources. @@ -565,4 +637,10 @@ class RealSenseCamera(Camera): self.rs_pipeline = None self.rs_profile = None + with self.frame_lock: + self.latest_color_frame = None + self.latest_depth_frame = None + self.latest_timestamp = None + self.new_frame_event.clear() + logger.info(f"{self} disconnected.") diff --git a/src/lerobot/cameras/zmq/camera_zmq.py b/src/lerobot/cameras/zmq/camera_zmq.py index 1a4155f4b..a231a582a 100644 --- a/src/lerobot/cameras/zmq/camera_zmq.py +++ b/src/lerobot/cameras/zmq/camera_zmq.py @@ -45,6 +45,12 @@ logger = logging.getLogger(__name__) class ZMQCamera(Camera): """ + Manages camera interactions via ZeroMQ for receiving frames from a remote server. + + This class connects to a ZMQ Publisher, subscribes to frame topics, and decodes + incoming JSON messages containing Base64 encoded images. It supports both + synchronous and asynchronous frame reading patterns. + Example usage: ```python from lerobot.cameras.zmq import ZMQCamera, ZMQCameraConfig @@ -52,7 +58,16 @@ class ZMQCamera(Camera): config = ZMQCameraConfig(server_address="192.168.123.164", port=5555, camera_name="head_camera") camera = ZMQCamera(config) camera.connect() - frame = camera.read() + + # Read 1 frame synchronously (blocking) + color_image = camera.read() + + # Read 1 frame asynchronously (waits for new frame with a timeout) + async_image = camera.async_read() + + # Get the latest frame immediately (no wait, returns timestamp) + latest_image, timestamp = camera.read_latest() + camera.disconnect() ``` """ @@ -68,14 +83,17 @@ class ZMQCamera(Camera): self.color_mode = config.color_mode self.timeout_ms = config.timeout_ms + # ZMQ Context and Socket self.context: zmq.Context | None = None self.socket: zmq.Socket | None = None self._connected = False + # Threading resources self.thread: Thread | None = None self.stop_event: Event | None = None self.frame_lock: Lock = Lock() self.latest_frame: NDArray[Any] | None = None + self.latest_timestamp: float | None = None self.new_frame_event: Event = Event() def __str__(self) -> str: @@ -83,10 +101,16 @@ class ZMQCamera(Camera): @property def is_connected(self) -> bool: + """Checks if the ZMQ socket is initialized and connected.""" return self._connected and self.context is not None and self.socket is not None def connect(self, warmup: bool = True) -> None: - """Connect to ZMQ camera server.""" + """Connect to ZMQ camera server. + + Args: + warmup (bool): If True, waits for the camera to provide at least one + valid frame before returning. Defaults to True. + """ if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} is already connected.") @@ -103,17 +127,28 @@ class ZMQCamera(Camera): self.socket.connect(f"tcp://{self.server_address}:{self.port}") self._connected = True - # Auto-detect resolution + # Auto-detect resolution if not provided if self.width is None or self.height is None: - h, w = self.read().shape[:2] + # Read directly from hardware because the thread isn't running yet + temp_frame = self._read_from_hardware() + h, w = temp_frame.shape[:2] self.height = h self.width = w - logger.info(f"{self} resolution: {w}x{h}") + logger.info(f"{self} resolution detected: {w}x{h}") + self._start_read_thread() logger.info(f"{self} connected.") if warmup: - time.sleep(0.1) + # Ensure we have captured at least one frame via the thread + start_time = time.time() + while time.time() - start_time < (self.config.warmup_s): # Wait a bit more than timeout + self.async_read(timeout_ms=self.config.warmup_s * 1000) + time.sleep(0.1) + + with self.frame_lock: + if self.latest_frame is None: + raise ConnectionError(f"{self} failed to capture frames during warmup.") except Exception as e: self._cleanup() @@ -134,12 +169,9 @@ class ZMQCamera(Camera): """ZMQ cameras require manual configuration (server address/port).""" return [] - def read(self, color_mode: ColorMode | None = None) -> NDArray[Any]: + def _read_from_hardware(self) -> NDArray[Any]: """ - Read a single frame from the ZMQ camera. - - Returns: - np.ndarray: Decoded frame (height, width, 3) + Reads a single frame directly from the ZMQ socket. """ if not self.is_connected or self.socket is None: raise DeviceNotConnectedError(f"{self} is not connected.") @@ -147,6 +179,7 @@ class ZMQCamera(Camera): try: message = self.socket.recv_string() except Exception as e: + # Check for ZMQ timeout (EAGAIN/Again) without requiring global zmq import if type(e).__name__ == "Again": raise TimeoutError(f"{self} timeout after {self.timeout_ms}ms") from e raise @@ -176,42 +209,117 @@ class ZMQCamera(Camera): return frame - def _read_loop(self) -> None: - while self.stop_event and not self.stop_event.is_set(): - try: - frame = self.read() - with self.frame_lock: - self.latest_frame = frame - self.new_frame_event.set() - except DeviceNotConnectedError: - break - except TimeoutError: - pass - except Exception as e: - logger.warning(f"Read error: {e}") + def read(self, color_mode: ColorMode | None = None) -> NDArray[Any]: + """ + Reads a single frame synchronously from the camera. - def _start_read_thread(self) -> None: - if self.thread and self.thread.is_alive(): - return - self.stop_event = Event() - self.thread = Thread(target=self._read_loop, daemon=True) - self.thread.start() + This is a blocking call. It waits for the next available frame from the + camera background thread. - def _stop_read_thread(self) -> None: - if self.stop_event: - self.stop_event.set() - if self.thread and self.thread.is_alive(): - self.thread.join(timeout=2.0) - self.thread = None - self.stop_event = None + Returns: + np.ndarray: Decoded frame (height, width, 3) + """ + start_time = time.perf_counter() + + if color_mode is not None: + logger.warning( + f"{self} read() color_mode parameter is deprecated and will be removed in future versions." + ) - def async_read(self, timeout_ms: float = 10000) -> NDArray[Any]: - """Read latest frame asynchronously (non-blocking).""" if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - if not self.thread or not self.thread.is_alive(): - self._start_read_thread() + if self.thread is None or not self.thread.is_alive(): + raise RuntimeError(f"{self} read thread is not running.") + + self.new_frame_event.clear() + frame = self.async_read(timeout_ms=10000) + + read_duration_ms = (time.perf_counter() - start_time) * 1e3 + logger.debug(f"{self} read took: {read_duration_ms:.1f}ms") + + return frame + + def _read_loop(self) -> None: + """ + Internal loop run by the background thread for asynchronous reading. + """ + if self.stop_event is None: + raise RuntimeError(f"{self}: stop_event is not initialized.") + + failure_count = 0 + while not self.stop_event.is_set(): + try: + frame = self._read_from_hardware() + capture_time = time.perf_counter() + + with self.frame_lock: + self.latest_frame = frame + self.latest_timestamp = capture_time + self.new_frame_event.set() + failure_count = 0 + + except DeviceNotConnectedError: + break + except (TimeoutError, Exception) as e: + if failure_count <= 10: + failure_count += 1 + logger.warning(f"Read error: {e}") + else: + raise RuntimeError(f"{self} exceeded maximum consecutive read failures.") from e + + def _start_read_thread(self) -> None: + if self.stop_event is not None: + self.stop_event.set() + if self.thread is not None and self.thread.is_alive(): + self.thread.join(timeout=2.0) + + with self.frame_lock: + self.latest_frame = None + self.latest_timestamp = None + self.new_frame_event.clear() + + self.stop_event = Event() + self.thread = Thread(target=self._read_loop, daemon=True, name=f"{self}_read_loop") + self.thread.start() + time.sleep(0.1) + + def _stop_read_thread(self) -> None: + if self.stop_event is not None: + self.stop_event.set() + + if self.thread is not None and self.thread.is_alive(): + self.thread.join(timeout=2.0) + + self.thread = None + self.stop_event = None + + with self.frame_lock: + self.latest_frame = None + self.latest_timestamp = None + self.new_frame_event.clear() + + def async_read(self, timeout_ms: float = 200) -> NDArray[Any]: + """ + Reads the latest available frame asynchronously. + + Args: + timeout_ms (float): Maximum time in milliseconds to wait for a frame + to become available. Defaults to 200ms. + + Returns: + np.ndarray: The latest captured frame. + + Raises: + DeviceNotConnectedError: If the camera is not connected. + TimeoutError: If no frame data becomes available within the specified timeout. + RuntimeError: If the background thread is not running. + """ + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + if self.thread is None or not self.thread.is_alive(): + raise RuntimeError(f"{self} read thread is not running.") if not self.new_frame_event.wait(timeout=timeout_ms / 1000.0): raise TimeoutError(f"{self} async_read timeout after {timeout_ms}ms") @@ -225,11 +333,55 @@ class ZMQCamera(Camera): return frame + def read_latest(self, max_age_ms: int = 1000) -> NDArray[Any]: + """Return the most recent frame captured immediately (Peeking). + + This method is non-blocking and returns whatever is currently in the + memory buffer. The frame may be stale, + meaning it could have been captured a while ago (hanging camera scenario e.g.). + + Returns: + NDArray[Any]: The frame image (numpy array). + + Raises: + TimeoutError: If the latest frame is older than `max_age_ms`. + DeviceNotConnectedError: If the camera is not connected. + RuntimeError: If the camera is connected but has not captured any frames yet. + """ + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + if self.thread is None or not self.thread.is_alive(): + raise RuntimeError(f"{self} read thread is not running.") + + with self.frame_lock: + frame = self.latest_frame + timestamp = self.latest_timestamp + + if frame is None or timestamp is None: + raise RuntimeError(f"{self} has not captured any frames yet.") + + age_ms = (time.perf_counter() - timestamp) * 1e3 + if age_ms > max_age_ms: + raise TimeoutError( + f"{self} latest frame is too old: {age_ms:.1f} ms (max allowed: {max_age_ms} ms)." + ) + + return frame + def disconnect(self) -> None: """Disconnect from ZMQ camera.""" - if not self.is_connected and not self.thread: + if not self.is_connected and self.thread is None: raise DeviceNotConnectedError(f"{self} not connected.") - self._stop_read_thread() + if self.thread is not None: + self._stop_read_thread() + self._cleanup() + + with self.frame_lock: + self.latest_frame = None + self.latest_timestamp = None + self.new_frame_event.clear() + logger.info(f"{self} disconnected.") diff --git a/src/lerobot/cameras/zmq/configuration_zmq.py b/src/lerobot/cameras/zmq/configuration_zmq.py index 027ae12b5..4e7732cfc 100644 --- a/src/lerobot/cameras/zmq/configuration_zmq.py +++ b/src/lerobot/cameras/zmq/configuration_zmq.py @@ -29,6 +29,7 @@ class ZMQCameraConfig(CameraConfig): camera_name: str = "zmq_camera" color_mode: ColorMode = ColorMode.RGB timeout_ms: int = 5000 + warmup_s: int = 1 def __post_init__(self) -> None: if self.color_mode not in (ColorMode.RGB, ColorMode.BGR): diff --git a/src/lerobot/scripts/lerobot_calibrate.py b/src/lerobot/scripts/lerobot_calibrate.py index eb3df6872..1b30021dd 100644 --- a/src/lerobot/scripts/lerobot_calibrate.py +++ b/src/lerobot/scripts/lerobot_calibrate.py @@ -86,8 +86,11 @@ def calibrate(cfg: CalibrateConfig): device = make_teleoperator_from_config(cfg.device) device.connect(calibrate=False) - device.calibrate() - device.disconnect() + + try: + device.calibrate() + finally: + device.disconnect() def main(): diff --git a/src/lerobot/scripts/lerobot_replay.py b/src/lerobot/scripts/lerobot_replay.py index 5717dffb6..c9a559d07 100644 --- a/src/lerobot/scripts/lerobot_replay.py +++ b/src/lerobot/scripts/lerobot_replay.py @@ -110,25 +110,26 @@ def replay(cfg: ReplayConfig): robot.connect() - log_say("Replaying episode", cfg.play_sounds, blocking=True) - for idx in range(len(episode_frames)): - start_episode_t = time.perf_counter() + try: + log_say("Replaying episode", cfg.play_sounds, blocking=True) + for idx in range(len(episode_frames)): + start_episode_t = time.perf_counter() - action_array = actions[idx][ACTION] - action = {} - for i, name in enumerate(dataset.features[ACTION]["names"]): - action[name] = action_array[i] + action_array = actions[idx][ACTION] + action = {} + for i, name in enumerate(dataset.features[ACTION]["names"]): + action[name] = action_array[i] - robot_obs = robot.get_observation() + robot_obs = robot.get_observation() - processed_action = robot_action_processor((action, robot_obs)) + processed_action = robot_action_processor((action, robot_obs)) - _ = robot.send_action(processed_action) + _ = robot.send_action(processed_action) - dt_s = time.perf_counter() - start_episode_t - precise_sleep(max(1 / dataset.fps - dt_s, 0.0)) - - robot.disconnect() + dt_s = time.perf_counter() - start_episode_t + precise_sleep(max(1 / dataset.fps - dt_s, 0.0)) + finally: + robot.disconnect() def main(): diff --git a/tests/cameras/test_opencv.py b/tests/cameras/test_opencv.py index 3cf3793b6..feb700631 100644 --- a/tests/cameras/test_opencv.py +++ b/tests/cameras/test_opencv.py @@ -20,7 +20,9 @@ # ``` from pathlib import Path +from unittest.mock import patch +import cv2 import numpy as np import pytest @@ -28,6 +30,50 @@ from lerobot.cameras.configs import Cv2Rotation from lerobot.cameras.opencv import OpenCVCamera, OpenCVCameraConfig from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +RealVideoCapture = cv2.VideoCapture + + +class MockLoopingVideoCapture: + """ + Wraps the real OpenCV VideoCapture. + Motivation: cv2.VideoCapture(file.png) is only valid for one read. + Strategy: Read the file once & return the cached frame for subsequent reads. + Consequence: No recurrent I/O operations, but we keep the test artifacts simple. + """ + + def __init__(self, *args, **kwargs): + args_clean = [str(a) if isinstance(a, Path) else a for a in args] + self._real_vc = RealVideoCapture(*args_clean, **kwargs) + self._cached_frame = None + + def read(self): + ret, frame = self._real_vc.read() + + if ret: + self._cached_frame = frame + return ret, frame + + if not ret and self._cached_frame is not None: + return True, self._cached_frame.copy() + + return ret, frame + + def __getattr__(self, name): + return getattr(self._real_vc, name) + + +@pytest.fixture(autouse=True) +def patch_opencv_videocapture(): + """ + Automatically patches cv2.VideoCapture for all tests. + """ + module_path = OpenCVCamera.__module__ + target = f"{module_path}.cv2.VideoCapture" + + with patch(target, new=MockLoopingVideoCapture): + yield + + # NOTE(Steven): more tests + assertions? TEST_ARTIFACTS_DIR = Path(__file__).parent.parent / "artifacts" / "cameras" DEFAULT_PNG_FILE_PATH = TEST_ARTIFACTS_DIR / "image_160x120.png" @@ -43,25 +89,22 @@ def test_abc_implementation(): def test_connect(): - config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH) - camera = OpenCVCamera(config) + config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, warmup_s=0) - camera.connect(warmup=False) - - assert camera.is_connected + with OpenCVCamera(config) as camera: + assert camera.is_connected def test_connect_already_connected(): - config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH) - camera = OpenCVCamera(config) - camera.connect(warmup=False) + config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, warmup_s=0) - with pytest.raises(DeviceAlreadyConnectedError): - camera.connect(warmup=False) + with OpenCVCamera(config) as camera, pytest.raises(DeviceAlreadyConnectedError): + camera.connect() def test_connect_invalid_camera_path(): config = OpenCVCameraConfig(index_or_path="nonexistent/camera.png") + camera = OpenCVCamera(config) with pytest.raises(ConnectionError): @@ -74,27 +117,25 @@ def test_invalid_width_connect(): width=99999, # Invalid width to trigger error height=480, ) - camera = OpenCVCamera(config) + camera = OpenCVCamera(config) with pytest.raises(RuntimeError): camera.connect(warmup=False) @pytest.mark.parametrize("index_or_path", TEST_IMAGE_PATHS, ids=TEST_IMAGE_SIZES) def test_read(index_or_path): - config = OpenCVCameraConfig(index_or_path=index_or_path) - camera = OpenCVCamera(config) - camera.connect(warmup=False) + config = OpenCVCameraConfig(index_or_path=index_or_path, warmup_s=0) - img = camera.read() - - assert isinstance(img, np.ndarray) + with OpenCVCamera(config) as camera: + img = camera.read() + assert isinstance(img, np.ndarray) def test_read_before_connect(): config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH) - camera = OpenCVCamera(config) + camera = OpenCVCamera(config) with pytest.raises(DeviceNotConnectedError): _ = camera.read() @@ -119,32 +160,22 @@ def test_disconnect_before_connect(): @pytest.mark.parametrize("index_or_path", TEST_IMAGE_PATHS, ids=TEST_IMAGE_SIZES) def test_async_read(index_or_path): - config = OpenCVCameraConfig(index_or_path=index_or_path) - camera = OpenCVCamera(config) - camera.connect(warmup=False) + config = OpenCVCameraConfig(index_or_path=index_or_path, warmup_s=0) - try: + with OpenCVCamera(config) as camera: img = camera.async_read() assert camera.thread is not None assert camera.thread.is_alive() assert isinstance(img, np.ndarray) - finally: - if camera.is_connected: - camera.disconnect() # To stop/join the thread. Otherwise get warnings when the test ends def test_async_read_timeout(): - config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH) - camera = OpenCVCamera(config) - camera.connect(warmup=False) + config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, warmup_s=0) - try: - with pytest.raises(TimeoutError): - camera.async_read(timeout_ms=0) - finally: - if camera.is_connected: - camera.disconnect() + with OpenCVCamera(config) as camera, pytest.raises(TimeoutError): + camera.async_read(timeout_ms=0) # consumes any available frame by then + camera.async_read(timeout_ms=0) # request immediately another one def test_async_read_before_connect(): @@ -155,6 +186,50 @@ def test_async_read_before_connect(): _ = camera.async_read() +def test_read_latest(): + config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, warmup_s=0) + + with OpenCVCamera(config) as camera: + # ensure at least one fresh frame is captured + frame = camera.read() + latest = camera.read_latest() + + assert isinstance(latest, np.ndarray) + assert latest.shape == frame.shape + + +def test_read_latest_before_connect(): + config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH) + + camera = OpenCVCamera(config) + with pytest.raises(DeviceNotConnectedError): + _ = camera.read_latest() + + +def test_read_latest_high_frequency(): + config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, warmup_s=0) + + with OpenCVCamera(config) as camera: + # prime to ensure frames are available + ref = camera.read() + + for _ in range(20): + latest = camera.read_latest() + assert isinstance(latest, np.ndarray) + assert latest.shape == ref.shape + + +def test_read_latest_too_old(): + config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, warmup_s=0) + + with OpenCVCamera(config) as camera: + # prime to ensure frames are available + _ = camera.read() + + with pytest.raises(TimeoutError): + _ = camera.read_latest(max_age_ms=0) # immediately too old + + def test_fourcc_configuration(): """Test FourCC configuration validation and application.""" @@ -181,18 +256,15 @@ def test_fourcc_configuration(): def test_fourcc_with_camera(): """Test FourCC functionality with actual camera connection.""" - config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, fourcc="MJPG") - camera = OpenCVCamera(config) + config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, fourcc="MJPG", warmup_s=0) # Connect should work with MJPG specified - camera.connect(warmup=False) - assert camera.is_connected + with OpenCVCamera(config) as camera: + assert camera.is_connected - # Read should work normally - img = camera.read() - assert isinstance(img, np.ndarray) - - camera.disconnect() + # Read should work normally + img = camera.read() + assert isinstance(img, np.ndarray) @pytest.mark.parametrize("index_or_path", TEST_IMAGE_PATHS, ids=TEST_IMAGE_SIZES) @@ -211,18 +283,16 @@ def test_rotation(rotation, index_or_path): dimensions = filename.split("_")[-1].split(".")[0] # Assumes filenames format (_wxh.png) original_width, original_height = map(int, dimensions.split("x")) - config = OpenCVCameraConfig(index_or_path=index_or_path, rotation=rotation) - camera = OpenCVCamera(config) - camera.connect(warmup=False) + config = OpenCVCameraConfig(index_or_path=index_or_path, rotation=rotation, warmup_s=0) + with OpenCVCamera(config) as camera: + img = camera.read() + assert isinstance(img, np.ndarray) - img = camera.read() - assert isinstance(img, np.ndarray) - - if rotation in (Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_270): - assert camera.width == original_height - assert camera.height == original_width - assert img.shape[:2] == (original_width, original_height) - else: - assert camera.width == original_width - assert camera.height == original_height - assert img.shape[:2] == (original_height, original_width) + if rotation in (Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_270): + assert camera.width == original_height + assert camera.height == original_width + assert img.shape[:2] == (original_width, original_height) + else: + assert camera.width == original_width + assert camera.height == original_height + assert img.shape[:2] == (original_height, original_width) diff --git a/tests/cameras/test_reachy2_camera.py b/tests/cameras/test_reachy2_camera.py index 14774bf38..2aebfdf0a 100644 --- a/tests/cameras/test_reachy2_camera.py +++ b/tests/cameras/test_reachy2_camera.py @@ -150,6 +150,44 @@ def test_async_read_before_connect(camera): _ = camera.async_read() +def test_read_latest(camera): + camera.connect() + + frame = camera.read() + latest = camera.read_latest() + + assert isinstance(latest, np.ndarray) + assert latest.shape == frame.shape + + +def test_read_latest_before_connect(camera): + # camera fixture yields an unconnected camera instance + with pytest.raises(DeviceNotConnectedError): + _ = camera.read_latest() + + +def test_read_latest_high_frequency(camera): + camera.connect() + + # prime to ensure frames are available + ref = camera.read() + + for _ in range(20): + latest = camera.read_latest() + assert isinstance(latest, np.ndarray) + assert latest.shape == ref.shape + + +def test_read_latest_too_old(camera): + camera.connect() + + # prime to ensure frames are available + _ = camera.read() + + with pytest.raises(TimeoutError): + _ = camera.read_latest(max_age_ms=0) # immediately too old + + def test_wrong_camera_name(): with pytest.raises(ValueError): _ = Reachy2CameraConfig(name="wrong-name", image_type="left") diff --git a/tests/cameras/test_realsense.py b/tests/cameras/test_realsense.py index fb9912257..1deb73f05 100644 --- a/tests/cameras/test_realsense.py +++ b/tests/cameras/test_realsense.py @@ -62,19 +62,15 @@ def test_abc_implementation(): def test_connect(): - config = RealSenseCameraConfig(serial_number_or_name="042") - camera = RealSenseCamera(config) + config = RealSenseCameraConfig(serial_number_or_name="042", warmup_s=0) - camera.connect(warmup=False) - assert camera.is_connected + with RealSenseCamera(config) as camera: + assert camera.is_connected def test_connect_already_connected(): - config = RealSenseCameraConfig(serial_number_or_name="042") - camera = RealSenseCamera(config) - camera.connect(warmup=False) - - with pytest.raises(DeviceAlreadyConnectedError): + config = RealSenseCameraConfig(serial_number_or_name="042", warmup_s=0) + with RealSenseCamera(config) as camera, pytest.raises(DeviceAlreadyConnectedError): camera.connect(warmup=False) @@ -96,12 +92,10 @@ def test_invalid_width_connect(): def test_read(): - config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30) - camera = RealSenseCamera(config) - camera.connect(warmup=False) - - img = camera.read() - assert isinstance(img, np.ndarray) + config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, warmup_s=0) + with RealSenseCamera(config) as camera: + img = camera.read() + assert isinstance(img, np.ndarray) # TODO(Steven): Fix this test for the latest version of pyrealsense2. @@ -142,32 +136,21 @@ def test_disconnect_before_connect(): def test_async_read(): - config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30) - camera = RealSenseCamera(config) - camera.connect(warmup=False) + config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, warmup_s=0) - try: + with RealSenseCamera(config) as camera: img = camera.async_read() assert camera.thread is not None assert camera.thread.is_alive() assert isinstance(img, np.ndarray) - finally: - if camera.is_connected: - camera.disconnect() # To stop/join the thread. Otherwise get warnings when the test ends def test_async_read_timeout(): - config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30) - camera = RealSenseCamera(config) - camera.connect(warmup=False) - - try: - with pytest.raises(TimeoutError): - camera.async_read(timeout_ms=0) - finally: - if camera.is_connected: - camera.disconnect() + config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, warmup_s=0) + with RealSenseCamera(config) as camera, pytest.raises(TimeoutError): + camera.async_read(timeout_ms=0) # consumes any available frame by then + camera.async_read(timeout_ms=0) # request immediately another one def test_async_read_before_connect(): @@ -178,6 +161,47 @@ def test_async_read_before_connect(): _ = camera.async_read() +def test_read_latest(): + config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, warmup_s=0) + with RealSenseCamera(config) as camera: + img = camera.read() + latest = camera.read_latest() + + assert isinstance(latest, np.ndarray) + assert latest.shape == img.shape + + +def test_read_latest_high_frequency(): + config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, warmup_s=0) + with RealSenseCamera(config) as camera: + # prime with one read to ensure frames are available + ref = camera.read() + + for _ in range(20): + latest = camera.read_latest() + assert isinstance(latest, np.ndarray) + assert latest.shape == ref.shape + + +def test_read_latest_before_connect(): + config = RealSenseCameraConfig(serial_number_or_name="042") + camera = RealSenseCamera(config) + + with pytest.raises(DeviceNotConnectedError): + _ = camera.read_latest() + + +def test_read_latest_too_old(): + config = RealSenseCameraConfig(serial_number_or_name="042") + + with RealSenseCamera(config) as camera: + # prime to ensure frames are available + _ = camera.read() + + with pytest.raises(TimeoutError): + _ = camera.read_latest(max_age_ms=0) # immediately too old + + @pytest.mark.parametrize( "rotation", [ @@ -189,18 +213,16 @@ def test_async_read_before_connect(): ids=["no_rot", "rot90", "rot180", "rot270"], ) def test_rotation(rotation): - config = RealSenseCameraConfig(serial_number_or_name="042", rotation=rotation) - camera = RealSenseCamera(config) - camera.connect(warmup=False) + config = RealSenseCameraConfig(serial_number_or_name="042", rotation=rotation, warmup_s=0) + with RealSenseCamera(config) as camera: + img = camera.read() + assert isinstance(img, np.ndarray) - img = camera.read() - assert isinstance(img, np.ndarray) - - if rotation in (Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_270): - assert camera.width == 480 - assert camera.height == 640 - assert img.shape[:2] == (640, 480) - else: - assert camera.width == 640 - assert camera.height == 480 - assert img.shape[:2] == (480, 640) + if rotation in (Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_270): + assert camera.width == 480 + assert camera.height == 640 + assert img.shape[:2] == (640, 480) + else: + assert camera.width == 640 + assert camera.height == 480 + assert img.shape[:2] == (480, 640)