have only rtc thread read obs and expose it

This commit is contained in:
Pepijn
2026-01-09 15:48:49 +01:00
parent 1f93a74d8c
commit 4166eeb7da
@@ -258,6 +258,7 @@ def get_actions_thread(
robot: RobotWrapper,
robot_observation_processor,
action_queue_holder: dict,
obs_holder: dict,
shutdown_event: Event,
cfg: RaCRTCConfig,
policy_active: Event,
@@ -299,6 +300,8 @@ def get_actions_thread(
inference_delay = math.ceil(inference_latency / time_per_chunk) if inference_latency else 0
obs = robot.get_observation()
obs_filtered = {k: v for k, v in obs.items() if k in robot.observation_features}
obs_holder["obs"] = obs_filtered # Share with main loop for recording
obs_processed = robot_observation_processor(obs)
obs_with_policy_features = build_dataset_frame(
@@ -431,10 +434,11 @@ def main(cfg: RaCRTCConfig):
logger.info(f"Policy loaded: {policy.name}")
action_queue_holder = {"queue": ActionQueue(cfg.rtc)}
obs_holder = {"obs": None}
get_actions_t = Thread(
target=get_actions_thread,
args=(policy, robot, obs_proc, action_queue_holder, shutdown_event, cfg, policy_active, fps),
args=(policy, robot, obs_proc, action_queue_holder, obs_holder, shutdown_event, cfg, policy_active, fps),
daemon=True,
name="GetActions",
)
@@ -499,11 +503,12 @@ def main(cfg: RaCRTCConfig):
events["start_correction"] = False
print("[RaC] Correction mode - you have control")
obs = robot.get_observation()
obs_filtered = {k: v for k, v in obs.items() if k in robot.observation_features}
obs_frame = build_dataset_frame(dataset_features, obs_filtered, prefix="observation")
if events["correction_active"]:
# Human controlling - read obs and record
obs = robot.get_observation()
obs_filtered = {k: v for k, v in obs.items() if k in robot.observation_features}
obs_frame = build_dataset_frame(dataset_features, obs_filtered, prefix="observation")
robot_action = teleop.get_action()
for key in robot_action:
if "gripper" in key:
@@ -515,9 +520,11 @@ def main(cfg: RaCRTCConfig):
frame_buffer.append(frame)
elif events["policy_paused"]:
# Paused - don't touch robot, RTC thread is also paused
pass
else:
# Policy running - RTC thread handles observations, we execute actions and record
if interp_idx >= len(interpolated_actions):
new_action = action_queue.get()
if new_action is not None:
@@ -546,9 +553,12 @@ def main(cfg: RaCRTCConfig):
robot.send_action(action_processed)
robot_send_count += 1
action_frame = build_dataset_frame(dataset_features, action_dict, prefix="action")
frame = {**obs_frame, **action_frame, "task": cfg.dataset.single_task}
frame_buffer.append(frame)
# Record frame using observation from RTC thread
if obs_holder["obs"] is not None:
obs_frame = build_dataset_frame(dataset_features, obs_holder["obs"], prefix="observation")
action_frame = build_dataset_frame(dataset_features, action_dict, prefix="action")
frame = {**obs_frame, **action_frame, "task": cfg.dataset.single_task}
frame_buffer.append(frame)
now = time.perf_counter()
if now - last_hz_print >= 5.0: