From 4166eeb7da3d268c43bd3d9f428d2d9947b60226 Mon Sep 17 00:00:00 2001 From: Pepijn Date: Fri, 9 Jan 2026 15:48:49 +0100 Subject: [PATCH] have only rtc thread read obs and expose it --- .../rac/rac_data_collection_openarms_rtc.py | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/examples/rac/rac_data_collection_openarms_rtc.py b/examples/rac/rac_data_collection_openarms_rtc.py index 030db0bc8..47fdf3b70 100644 --- a/examples/rac/rac_data_collection_openarms_rtc.py +++ b/examples/rac/rac_data_collection_openarms_rtc.py @@ -258,6 +258,7 @@ def get_actions_thread( robot: RobotWrapper, robot_observation_processor, action_queue_holder: dict, + obs_holder: dict, shutdown_event: Event, cfg: RaCRTCConfig, policy_active: Event, @@ -299,6 +300,8 @@ def get_actions_thread( inference_delay = math.ceil(inference_latency / time_per_chunk) if inference_latency else 0 obs = robot.get_observation() + obs_filtered = {k: v for k, v in obs.items() if k in robot.observation_features} + obs_holder["obs"] = obs_filtered # Share with main loop for recording obs_processed = robot_observation_processor(obs) obs_with_policy_features = build_dataset_frame( @@ -431,10 +434,11 @@ def main(cfg: RaCRTCConfig): logger.info(f"Policy loaded: {policy.name}") action_queue_holder = {"queue": ActionQueue(cfg.rtc)} + obs_holder = {"obs": None} get_actions_t = Thread( target=get_actions_thread, - args=(policy, robot, obs_proc, action_queue_holder, shutdown_event, cfg, policy_active, fps), + args=(policy, robot, obs_proc, action_queue_holder, obs_holder, shutdown_event, cfg, policy_active, fps), daemon=True, name="GetActions", ) @@ -499,11 +503,12 @@ def main(cfg: RaCRTCConfig): events["start_correction"] = False print("[RaC] Correction mode - you have control") - obs = robot.get_observation() - obs_filtered = {k: v for k, v in obs.items() if k in robot.observation_features} - obs_frame = build_dataset_frame(dataset_features, obs_filtered, prefix="observation") - if events["correction_active"]: + # Human controlling - read obs and record + obs = robot.get_observation() + obs_filtered = {k: v for k, v in obs.items() if k in robot.observation_features} + obs_frame = build_dataset_frame(dataset_features, obs_filtered, prefix="observation") + robot_action = teleop.get_action() for key in robot_action: if "gripper" in key: @@ -515,9 +520,11 @@ def main(cfg: RaCRTCConfig): frame_buffer.append(frame) elif events["policy_paused"]: + # Paused - don't touch robot, RTC thread is also paused pass else: + # Policy running - RTC thread handles observations, we execute actions and record if interp_idx >= len(interpolated_actions): new_action = action_queue.get() if new_action is not None: @@ -546,9 +553,12 @@ def main(cfg: RaCRTCConfig): robot.send_action(action_processed) robot_send_count += 1 - action_frame = build_dataset_frame(dataset_features, action_dict, prefix="action") - frame = {**obs_frame, **action_frame, "task": cfg.dataset.single_task} - frame_buffer.append(frame) + # Record frame using observation from RTC thread + if obs_holder["obs"] is not None: + obs_frame = build_dataset_frame(dataset_features, obs_holder["obs"], prefix="observation") + action_frame = build_dataset_frame(dataset_features, action_dict, prefix="action") + frame = {**obs_frame, **action_frame, "task": cfg.dataset.single_task} + frame_buffer.append(frame) now = time.perf_counter() if now - last_hz_print >= 5.0: