From 8e430f323f67d8efa02526d7a206a3da40fc7873 Mon Sep 17 00:00:00 2001 From: Pepijn Date: Fri, 9 Jan 2026 09:41:04 +0100 Subject: [PATCH] revert --- examples/openarms/evaluate_interpolation.py | 383 +++++++------------- 1 file changed, 137 insertions(+), 246 deletions(-) diff --git a/examples/openarms/evaluate_interpolation.py b/examples/openarms/evaluate_interpolation.py index 08613658f..c7a99a9fa 100644 --- a/examples/openarms/evaluate_interpolation.py +++ b/examples/openarms/evaluate_interpolation.py @@ -15,22 +15,20 @@ # limitations under the License. """ -OpenArms Policy Evaluation with Async Inference + Interpolation +OpenArms Policy Evaluation with Interpolation -Key features: -- ASYNC INFERENCE: Policy runs in background thread, never blocks robot loop -- Robot control loop runs at true ROBOT_FPS (50Hz+) -- Interpolation between policy outputs for smooth motion -- Speed multiplier to execute faster than training +Evaluates a trained policy with smooth action interpolation: +- Decoupled camera capture (CAMERA_FPS) from robot control (ROBOT_FPS) +- Speed multiplier to execute actions faster than training +- Velocity feedforward for smoother tracking +- Adjustable PID gains Example usage: python examples/openarms/evaluate_interpolation.py """ -import threading import time from collections import deque -from dataclasses import dataclass from pathlib import Path import numpy as np @@ -57,21 +55,23 @@ HF_EVAL_DATASET_ID = "lerobot-data-collection/three-folds-pi0_eval_interp" # TO TASK_DESCRIPTION = "three-folds-dataset" # TODO: Replace with your task # ======================== TIMING CONFIG ======================== +CAMERA_FPS = 30 # Camera hardware limit (fixed) POLICY_FPS = 30 # What the policy was trained with -SPEED_MULTIPLIER = 1.0 # Execute actions faster (1.0 = normal, 1.2 = 20% faster) +SPEED_MULTIPLIER = 1.2 # Execute actions faster (1.0 = normal, 1.2 = 20% faster) ROBOT_FPS = 50 # Robot command rate (higher = smoother interpolation) # Derived values -EFFECTIVE_POLICY_FPS = int(POLICY_FPS * SPEED_MULTIPLIER) +EFFECTIVE_POLICY_FPS = int(POLICY_FPS * SPEED_MULTIPLIER) # How fast we consume actions (36Hz at 1.2x) NUM_EPISODES = 1 EPISODE_TIME_SEC = 300 RESET_TIME_SEC = 60 # ======================== PID TUNING ======================== -CUSTOM_KP_SCALE = 0.7 -CUSTOM_KD_SCALE = 1.3 -USE_VELOCITY_FEEDFORWARD = True +# Set to None to use robot config defaults +CUSTOM_KP_SCALE = 0.7 # Scale factor for position gain (0.5-1.0, lower = smoother) +CUSTOM_KD_SCALE = 1.3 # Scale factor for damping gain (1.0-2.0, higher = less overshoot) +USE_VELOCITY_FEEDFORWARD = True # Enable velocity feedforward for smoother tracking # ======================== ROBOT CONFIG ======================== FOLLOWER_LEFT_PORT = "can0" @@ -81,7 +81,7 @@ USE_LEADER_FOR_RESETS = True LEADER_LEFT_PORT = "can2" LEADER_RIGHT_PORT = "can3" -CAMERA_FPS = 30 +# Camera config uses CAMERA_FPS (hardware limit) CAMERA_CONFIG = { "left_wrist": OpenCVCameraConfig(index_or_path="/dev/video5", width=640, height=480, fps=CAMERA_FPS), "right_wrist": OpenCVCameraConfig(index_or_path="/dev/video1", width=640, height=480, fps=CAMERA_FPS), @@ -89,170 +89,47 @@ CAMERA_CONFIG = { } -@dataclass -class InferenceResult: - """Result from async inference thread.""" - robot_action: dict - observation_frame: dict - obs_processed: dict - act_processed: dict - timestamp: float - inference_time_ms: float - - -class AsyncInferenceThread(threading.Thread): - """Background thread for camera capture + policy inference.""" - - def __init__( - self, - robot, - policy, - preprocessor, - postprocessor, - robot_observation_processor, - robot_action_processor, - dataset, - task: str, - ): - super().__init__(daemon=True) - self.robot = robot - self.policy = policy - self.preprocessor = preprocessor - self.postprocessor = postprocessor - self.robot_observation_processor = robot_observation_processor - self.robot_action_processor = robot_action_processor - self.dataset = dataset - self.task = task - - self._lock = threading.Lock() - self._latest_result: InferenceResult | None = None - self._result_consumed = True - self._running = False - self._inference_hz_tracker = HzTracker(name="Inference", print_interval=5.0) - - def get_latest_result(self) -> InferenceResult | None: - """Get latest inference result (thread-safe). Returns None if no new result.""" - with self._lock: - if self._result_consumed: - return None - result = self._latest_result - self._result_consumed = True - return result - - def peek_latest_result(self) -> InferenceResult | None: - """Peek at latest result without marking as consumed.""" - with self._lock: - return self._latest_result - - def stop(self): - self._running = False - - def run(self): - from lerobot.scripts.lerobot_record import build_dataset_frame, make_robot_action - - self._running = True - self.policy.reset() - - while self._running: - try: - start = time.perf_counter() - - # Capture observation - obs = self.robot.get_observation() - obs_processed = self.robot_observation_processor(obs) - observation_frame = build_dataset_frame( - self.dataset.features, obs_processed, prefix="observation" - ) - - # Run inference - action_values = predict_action( - observation=observation_frame, - policy=self.policy, - device=get_safe_torch_device(self.policy.config.device), - preprocessor=self.preprocessor, - postprocessor=self.postprocessor, - use_amp=self.policy.config.use_amp, - task=self.task, - robot_type=self.robot.robot_type, - ) - - act_processed = make_robot_action(action_values, self.dataset.features) - robot_action = self.robot_action_processor((act_processed, obs)) - - inference_time_ms = (time.perf_counter() - start) * 1000 - - # Store result - result = InferenceResult( - robot_action=robot_action, - observation_frame=observation_frame, - obs_processed=obs_processed, - act_processed=act_processed, - timestamp=time.perf_counter(), - inference_time_ms=inference_time_ms, - ) - - with self._lock: - self._latest_result = result - self._result_consumed = False - - self._inference_hz_tracker.tick() - - except Exception as e: - print(f"Inference thread error: {e}") - time.sleep(0.01) - - # Print final inference stats - hz = self._inference_hz_tracker.get_avg_hz() - if hz: - print(f"Final inference Hz: {hz:.1f}") - - class ActionInterpolator: - """Interpolate between policy actions for smoother robot control.""" + """Interpolate between policy actions for smoother robot control with velocity estimation.""" - def __init__(self, robot_fps: int): + def __init__(self, effective_policy_fps: int, robot_fps: int): + self.effective_policy_fps = effective_policy_fps self.robot_fps = robot_fps + self.substeps_per_policy_step = robot_fps / effective_policy_fps self.prev_action: dict | None = None self.curr_action: dict | None = None - self.prev_time: float = 0 - self.curr_time: float = 0 + self.substep = 0 self.last_interpolated: dict | None = None - def update(self, new_action: dict, timestamp: float) -> None: + def update(self, new_action: dict) -> None: self.prev_action = self.curr_action - self.prev_time = self.curr_time self.curr_action = new_action - self.curr_time = timestamp + self.substep = 0 - def get_interpolated_action(self, current_time: float) -> tuple[dict | None, dict | None]: + def get_interpolated_action(self) -> tuple[dict | None, dict | None]: """Returns (interpolated_position, estimated_velocity_deg_per_sec)""" if self.curr_action is None: return None, None if self.prev_action is None: self.last_interpolated = self.curr_action.copy() return self.curr_action, {k: 0.0 for k in self.curr_action} - - # Time-based interpolation - dt_actions = self.curr_time - self.prev_time - if dt_actions <= 0: - dt_actions = 1.0 / 30 # Fallback - t = (current_time - self.prev_time) / dt_actions - t = max(0.0, min(t, 1.5)) # Allow slight extrapolation + t = min(self.substep / self.substeps_per_policy_step, 1.0) + self.substep += 1 interpolated = {} velocity = {} - dt_robot = 1.0 / self.robot_fps + dt = 1.0 / self.robot_fps for key in self.curr_action: prev = self.prev_action.get(key, self.curr_action[key]) curr = self.curr_action[key] - interpolated[key] = prev + t * (curr - prev) + interpolated[key] = prev * (1 - t) + curr * t if self.last_interpolated is not None and key in self.last_interpolated: - velocity[key] = (interpolated[key] - self.last_interpolated[key]) / dt_robot + velocity[key] = (interpolated[key] - self.last_interpolated[key]) / dt else: - velocity[key] = (curr - prev) / dt_actions + velocity[key] = (curr - prev) * self.effective_policy_fps self.last_interpolated = interpolated.copy() return interpolated, velocity @@ -260,15 +137,14 @@ class ActionInterpolator: def reset(self): self.prev_action = None self.curr_action = None - self.prev_time = 0 - self.curr_time = 0 + self.substep = 0 self.last_interpolated = None class HzTracker: """Track and display actual loop frequency.""" - def __init__(self, name: str = "Loop", window_size: int = 100, print_interval: float = 2.0): + def __init__(self, name: str = "Robot", window_size: int = 100, print_interval: float = 2.0): self.name = name self.timestamps = deque(maxlen=window_size) self.last_print_time = 0 @@ -299,44 +175,72 @@ class HzTracker: self.last_print_time = 0 -def async_eval_loop( +def interpolated_eval_loop( robot, - inference_thread: AsyncInferenceThread, - interpolator: ActionInterpolator, - robot_hz_tracker: HzTracker, + policy, + preprocessor, + postprocessor, + robot_observation_processor, + robot_action_processor, dataset, events, - robot_fps: int, + interpolator: ActionInterpolator, + robot_hz_tracker: HzTracker, + camera_fps: int, effective_policy_fps: int, + robot_fps: int, control_time_s: float, task: str, - custom_kp: dict | None = None, - custom_kd: dict | None = None, + kp_scale: float | None = None, + kd_scale: float | None = None, use_velocity_ff: bool = False, ): """ - Main robot control loop with async inference. - - - Inference runs in background thread (as fast as it can) - - This loop runs at ROBOT_FPS, never blocked by inference - - Interpolates between inference results for smooth motion + Run evaluation with decoupled camera and robot control: + - Camera captures at camera_fps (hardware limit) + - Policy inference runs when new camera frame is available + - Actions are consumed at effective_policy_fps (sped up by SPEED_MULTIPLIER) + - Robot receives interpolated commands at robot_fps (smoothest) """ - from lerobot.scripts.lerobot_record import build_dataset_frame + from lerobot.scripts.lerobot_record import build_dataset_frame, make_robot_action from lerobot.utils.visualization_utils import log_rerun_data - robot_dt = 1.0 / robot_fps + camera_dt = 1.0 / camera_fps policy_dt = 1.0 / effective_policy_fps + robot_dt = 1.0 / robot_fps interpolator.reset() robot_hz_tracker.reset() + policy.reset() + + # Build custom gains if scaling is enabled + custom_kp = None + custom_kd = None + if kp_scale is not None or kd_scale is not None: + custom_kp = {} + custom_kd = {} + for arm in ["right", "left"]: + bus = robot.bus_right if arm == "right" else robot.bus_left + for i, motor_name in enumerate(bus.motors): + full_name = f"{arm}_{motor_name}" + default_kp = robot.config.position_kp[i] if isinstance(robot.config.position_kp, list) else robot.config.position_kp + default_kd = robot.config.position_kd[i] if isinstance(robot.config.position_kd, list) else robot.config.position_kd + custom_kp[full_name] = default_kp * (kp_scale or 1.0) + custom_kd[full_name] = default_kd * (kd_scale or 1.0) + print(f"Custom gains: kp_scale={kp_scale}, kd_scale={kd_scale}") + + if use_velocity_ff: + print("Velocity feedforward: enabled") + + last_camera_time = -camera_dt + last_policy_action_time = -policy_dt + cached_observation = None + cached_robot_action = None - last_action_consume_time = 0 start_time = time.perf_counter() - print(f"\nAsync eval loop started:") - print(f" Robot control: {robot_fps}Hz (main thread, never blocked)") - print(f" Inference: background thread (as fast as possible)") - print(f" Action consume rate: {effective_policy_fps}Hz") + print(f"\nStarting interpolated eval loop:") + print(f" Camera: {camera_fps}Hz | Policy actions consumed: {effective_policy_fps}Hz | Robot: {robot_fps}Hz") while time.perf_counter() - start_time < control_time_s: if events["exit_early"] or events["stop_recording"]: @@ -345,26 +249,45 @@ def async_eval_loop( loop_start = time.perf_counter() elapsed = loop_start - start_time - # Check for new inference result (non-blocking) - result = inference_thread.get_latest_result() - if result is not None: - # Consume action at effective_policy_fps rate - if elapsed - last_action_consume_time >= policy_dt: - interpolator.update(result.robot_action, result.timestamp) - last_action_consume_time = elapsed - - # Save to dataset - if dataset is not None: - action_frame = build_dataset_frame( - dataset.features, result.act_processed, prefix="action" - ) - frame = {**result.observation_frame, **action_frame, "task": task} - dataset.add_frame(frame) - log_rerun_data(observation=result.obs_processed, action=result.act_processed) + # === CAMERA CAPTURE (at camera_fps, decoupled from robot) === + if elapsed - last_camera_time >= camera_dt: + obs = robot.get_observation() + obs_processed = robot_observation_processor(obs) + observation_frame = build_dataset_frame(dataset.features, obs_processed, prefix="observation") + + # Run policy inference with fresh observation + action_values = predict_action( + observation=observation_frame, + policy=policy, + device=get_safe_torch_device(policy.config.device), + preprocessor=preprocessor, + postprocessor=postprocessor, + use_amp=policy.config.use_amp, + task=task, + robot_type=robot.robot_type, + ) + + act_processed = make_robot_action(action_values, dataset.features) + cached_robot_action = robot_action_processor((act_processed, obs)) + cached_observation = (obs_processed, observation_frame, act_processed) + + last_camera_time = elapsed - # Get interpolated action and send to robot (always runs at robot_fps) - current_time = time.perf_counter() - smooth_action, velocity = interpolator.get_interpolated_action(current_time) + # === ACTION UPDATE (at effective_policy_fps, faster than camera if speed > 1) === + if elapsed - last_policy_action_time >= policy_dt and cached_robot_action is not None: + interpolator.update(cached_robot_action) + last_policy_action_time = elapsed + + # Save to dataset at effective policy rate + if dataset is not None and cached_observation is not None: + obs_processed, observation_frame, act_processed = cached_observation + action_frame = build_dataset_frame(dataset.features, act_processed, prefix="action") + frame = {**observation_frame, **action_frame, "task": task} + dataset.add_frame(frame) + log_rerun_data(observation=obs_processed, action=act_processed) + + # === ROBOT COMMAND (at robot_fps, highest rate for smoothness) === + smooth_action, velocity = interpolator.get_interpolated_action() if smooth_action is not None: vel_ff = velocity if use_velocity_ff else None robot.send_action(smooth_action, custom_kp=custom_kp, custom_kd=custom_kd, velocity_feedforward=vel_ff) @@ -379,44 +302,32 @@ def async_eval_loop( # Print final stats robot_hz = robot_hz_tracker.get_avg_hz() if robot_hz: - print(f"\nFinal robot Hz: {robot_hz:.1f}") - - -def build_custom_gains(robot, kp_scale: float | None, kd_scale: float | None): - """Build custom PID gains dict.""" - if kp_scale is None and kd_scale is None: - return None, None - - custom_kp = {} - custom_kd = {} - for arm in ["right", "left"]: - bus = robot.bus_right if arm == "right" else robot.bus_left - for i, motor_name in enumerate(bus.motors): - full_name = f"{arm}_{motor_name}" - default_kp = robot.config.position_kp[i] if isinstance(robot.config.position_kp, list) else robot.config.position_kp - default_kd = robot.config.position_kd[i] if isinstance(robot.config.position_kd, list) else robot.config.position_kd - custom_kp[full_name] = default_kp * (kp_scale or 1.0) - custom_kd[full_name] = default_kd * (kd_scale or 1.0) - return custom_kp, custom_kd + print(f"\nFinal average robot Hz: {robot_hz:.1f}") def main(): """Main evaluation function.""" print("=" * 60) - print("OpenArms Async Inference + Interpolation Evaluation") + print("OpenArms Policy Evaluation with Interpolation") print("=" * 60) print(f"\nModel: {HF_MODEL_ID}") print(f"Dataset: {HF_EVAL_DATASET_ID}") print(f"Task: {TASK_DESCRIPTION}") print(f"\n--- Timing ---") + print(f"Camera FPS: {CAMERA_FPS} (hardware limit)") print(f"Policy trained at: {POLICY_FPS}Hz") print(f"Speed multiplier: {SPEED_MULTIPLIER}x") - print(f"Effective policy FPS: {EFFECTIVE_POLICY_FPS}Hz") - print(f"Robot FPS: {ROBOT_FPS}Hz (interpolated, non-blocking)") + print(f"Effective policy FPS: {EFFECTIVE_POLICY_FPS}Hz (actions consumed)") + print(f"Robot FPS: {ROBOT_FPS}Hz (interpolated commands)") print(f"\n--- PID Tuning ---") print(f"KP scale: {CUSTOM_KP_SCALE}") print(f"KD scale: {CUSTOM_KD_SCALE}") print(f"Velocity feedforward: {USE_VELOCITY_FEEDFORWARD}") + print(f"\n--- Episodes ---") + print(f"Episodes: {NUM_EPISODES}") + print(f"Duration: {EPISODE_TIME_SEC}s per episode") + print(f"Reset time: {RESET_TIME_SEC}s") + print(f"Leader for resets: {USE_LEADER_FOR_RESETS}") print("=" * 60) follower_config = OpenArmsFollowerConfig( @@ -456,6 +367,8 @@ def main(): leader.bus_left.enable_torque() time.sleep(0.1) print(f"Leader connected with gravity compensation") + else: + print(f"Leader connected (no gravity compensation)") teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors() @@ -488,6 +401,7 @@ def main(): leader.disconnect() return + # Dataset uses effective policy FPS (sped up rate) dataset = LeRobotDataset.create( repo_id=HF_EVAL_DATASET_ID, fps=EFFECTIVE_POLICY_FPS, @@ -513,13 +427,9 @@ def main(): print(f"\nRunning evaluation...") listener, events = init_keyboard_listener() - init_rerun(session_name="openarms_async_eval") + init_rerun(session_name="openarms_evaluation_interp") - custom_kp, custom_kd = build_custom_gains(follower, CUSTOM_KP_SCALE, CUSTOM_KD_SCALE) - if custom_kp: - print(f"Custom gains: kp_scale={CUSTOM_KP_SCALE}, kd_scale={CUSTOM_KD_SCALE}") - - interpolator = ActionInterpolator(robot_fps=ROBOT_FPS) + interpolator = ActionInterpolator(effective_policy_fps=EFFECTIVE_POLICY_FPS, robot_fps=ROBOT_FPS) robot_hz_tracker = HzTracker(name="Robot", window_size=100, print_interval=2.0) episode_idx = 0 @@ -529,8 +439,7 @@ def main(): log_say(f"Evaluating episode {episode_idx + 1} of {NUM_EPISODES}") print(f"\n--- Episode {episode_idx + 1}/{NUM_EPISODES} ---") - # Start async inference thread - inference_thread = AsyncInferenceThread( + interpolated_eval_loop( robot=follower, policy=policy, preprocessor=preprocessor, @@ -538,37 +447,19 @@ def main(): robot_observation_processor=robot_observation_processor, robot_action_processor=robot_action_processor, dataset=dataset, - task=TASK_DESCRIPTION, - ) - inference_thread.start() - - # Wait for first inference result - print("Waiting for first inference...") - while inference_thread.peek_latest_result() is None: - time.sleep(0.01) - print("First inference complete, starting control loop") - - # Run the async evaluation loop - async_eval_loop( - robot=follower, - inference_thread=inference_thread, + events=events, interpolator=interpolator, robot_hz_tracker=robot_hz_tracker, - dataset=dataset, - events=events, - robot_fps=ROBOT_FPS, + camera_fps=CAMERA_FPS, effective_policy_fps=EFFECTIVE_POLICY_FPS, + robot_fps=ROBOT_FPS, control_time_s=EPISODE_TIME_SEC, task=TASK_DESCRIPTION, - custom_kp=custom_kp, - custom_kd=custom_kd, + kp_scale=CUSTOM_KP_SCALE, + kd_scale=CUSTOM_KD_SCALE, use_velocity_ff=USE_VELOCITY_FEEDFORWARD, ) - # Stop inference thread - inference_thread.stop() - inference_thread.join(timeout=2.0) - if events["rerecord_episode"]: log_say("Re-recording episode") events["rerecord_episode"] = False @@ -581,7 +472,6 @@ def main(): dataset.save_episode() episode_idx += 1 - # Reset phase if not events["stop_recording"] and episode_idx < NUM_EPISODES: if USE_LEADER_FOR_RESETS and leader: log_say("Reset the environment using leader arms") @@ -694,3 +584,4 @@ def main(): if __name__ == "__main__": main() +