From aba42c805fe474e302f9b5dba3668d1c8f33f17d Mon Sep 17 00:00:00 2001 From: Pepijn Date: Wed, 31 Dec 2025 15:16:23 +0100 Subject: [PATCH] some changes to smooth --- examples/rac/rac_data_collection_openarms.py | 16 ++- .../openarms_mini/openarms_mini.py | 113 ++++++++++++++++-- 2 files changed, 115 insertions(+), 14 deletions(-) diff --git a/examples/rac/rac_data_collection_openarms.py b/examples/rac/rac_data_collection_openarms.py index 78d8a5f4b..2ee797ea8 100644 --- a/examples/rac/rac_data_collection_openarms.py +++ b/examples/rac/rac_data_collection_openarms.py @@ -253,9 +253,14 @@ def rac_rollout_loop( events["correction_active"] = False break - # Detect transition to paused state (enable torque to track robot) + # Detect transition to paused state - smooth move teleop to robot position if events["policy_paused"] and not was_paused: - teleop.enable_torque() + obs = robot.get_observation() + obs_filtered = {k: v for k, v in obs.items() if k in robot.observation_features} + robot_pos = {k: v for k, v in obs_filtered.items() if k.endswith(".pos")} + print("[RaC] Moving teleop to robot position (1s smooth transition)...") + teleop.smooth_move_to(robot_pos, duration_s=1.0, fps=50) + print("[RaC] Teleop aligned. Press 'c' to take control.") was_paused = True # Detect transition to correction mode (disable torque for human control) @@ -268,9 +273,12 @@ def rac_rollout_loop( obs_frame = build_dataset_frame(dataset.features, obs_filtered, prefix=OBS_STR) if events["correction_active"]: - # Human controlling - record correction data + # Human controlling - record correction data with soft gains robot_action = teleop.get_action() - robot.send_action(robot_action) + # Use lower gains for smoother human control + soft_kp = {k.removesuffix(".pos"): 60.0 for k in robot_action if k.endswith(".pos")} + soft_kd = {k.removesuffix(".pos"): 1.5 for k in robot_action if k.endswith(".pos")} + robot.send_action(robot_action, custom_kp=soft_kp, custom_kd=soft_kd) stats["correction_frames"] += 1 # Record this frame diff --git a/src/lerobot/teleoperators/openarms_mini/openarms_mini.py b/src/lerobot/teleoperators/openarms_mini/openarms_mini.py index e42706cf5..d0eb22740 100644 --- a/src/lerobot/teleoperators/openarms_mini/openarms_mini.py +++ b/src/lerobot/teleoperators/openarms_mini/openarms_mini.py @@ -296,22 +296,27 @@ class OpenArmsMini(Teleoperator): right_motors_to_flip = ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5"] left_motors_to_flip = ["joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"] + # Joint mapping: leader joint 6 -> follower joint 7, leader joint 7 -> follower joint 6 + joint_remap = {"joint_6": "joint_7", "joint_7": "joint_6"} + # Read positions from both arms right_positions = self.bus_right.sync_read("Present_Position") left_positions = self.bus_left.sync_read("Present_Position") - # Combine into single action dict with arm prefixes and flip specified motors + # Combine into single action dict with arm prefixes, flip and remap joints action = {} for motor, val in right_positions.items(): + target_motor = joint_remap.get(motor, motor) if motor in right_motors_to_flip: - action[f"right_{motor}.pos"] = -val + action[f"right_{target_motor}.pos"] = -val else: - action[f"right_{motor}.pos"] = val + action[f"right_{target_motor}.pos"] = val for motor, val in left_positions.items(): + target_motor = joint_remap.get(motor, motor) if motor in left_motors_to_flip: - action[f"left_{motor}.pos"] = -val + action[f"left_{target_motor}.pos"] = -val else: - action[f"left_{motor}.pos"] = val + action[f"left_{target_motor}.pos"] = val dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read action: {dt_ms:.1f}ms") @@ -328,7 +333,10 @@ class OpenArmsMini(Teleoperator): right_motors_to_flip = ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5"] left_motors_to_flip = ["joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"] - # Split feedback by arm and flip motors as needed + # Reverse joint mapping: follower joint 7 -> leader joint 6, follower joint 6 -> leader joint 7 + joint_remap = {"joint_7": "joint_6", "joint_6": "joint_7"} + + # Split feedback by arm and flip/remap motors as needed right_positions = {} left_positions = {} @@ -337,16 +345,18 @@ class OpenArmsMini(Teleoperator): motor_name = key.removesuffix(".pos") if motor_name.startswith("right_"): base_name = motor_name.removeprefix("right_") + target_motor = joint_remap.get(base_name, base_name) if base_name in right_motors_to_flip: - right_positions[base_name] = -val + right_positions[target_motor] = -val else: - right_positions[base_name] = val + right_positions[target_motor] = val elif motor_name.startswith("left_"): base_name = motor_name.removeprefix("left_") + target_motor = joint_remap.get(base_name, base_name) if base_name in left_motors_to_flip: - left_positions[base_name] = -val + left_positions[target_motor] = -val else: - left_positions[base_name] = val + left_positions[target_motor] = val # Write positions to both arms if right_positions: @@ -363,6 +373,89 @@ class OpenArmsMini(Teleoperator): """Disable torque on both arms (for free movement).""" self.bus_right.disable_torque() self.bus_left.disable_torque() + + def smooth_move_to(self, target: dict[str, float], duration_s: float = 1.0, fps: int = 50) -> None: + """ + Smoothly move teleoperator to target position over specified duration. + Uses linear interpolation to avoid sudden jerky movements. + + Args: + target: Target positions dict (e.g., {"right_joint_1.pos": 30.0, ...}) + duration_s: Duration of movement in seconds (default 1.0) + fps: Control frequency (default 50 Hz) + """ + # Read current positions + current_right = self.bus_right.sync_read("Present_Position") + current_left = self.bus_left.sync_read("Present_Position") + + # Build current position dict with full names + current = {} + for motor, val in current_right.items(): + current[f"right_{motor}"] = val + for motor, val in current_left.items(): + current[f"left_{motor}"] = val + + # Reverse joint mapping for target (follower joint 7 -> leader joint 6) + joint_remap = {"joint_7": "joint_6", "joint_6": "joint_7"} + right_motors_to_flip = ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5"] + left_motors_to_flip = ["joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"] + + # Parse target and apply remapping/flipping + target_parsed = {} + for key, val in target.items(): + if key.endswith(".pos"): + motor_name = key.removesuffix(".pos") + if motor_name.startswith("right_"): + base_name = motor_name.removeprefix("right_") + target_motor = joint_remap.get(base_name, base_name) + if base_name in right_motors_to_flip: + target_parsed[f"right_{target_motor}"] = -val + else: + target_parsed[f"right_{target_motor}"] = val + elif motor_name.startswith("left_"): + base_name = motor_name.removeprefix("left_") + target_motor = joint_remap.get(base_name, base_name) + if base_name in left_motors_to_flip: + target_parsed[f"left_{target_motor}"] = -val + else: + target_parsed[f"left_{target_motor}"] = val + + # Enable torque for movement + self.enable_torque() + + # Interpolate over duration + num_steps = int(duration_s * fps) + step_duration = 1.0 / fps + + for step in range(num_steps + 1): + t = step / num_steps # 0.0 to 1.0 + + # Smooth easing (ease-in-out) + t_smooth = t * t * (3 - 2 * t) + + # Interpolate positions + right_positions = {} + left_positions = {} + + for motor_full, target_val in target_parsed.items(): + current_val = current.get(motor_full, target_val) + interp_val = current_val + t_smooth * (target_val - current_val) + + if motor_full.startswith("right_"): + motor = motor_full.removeprefix("right_") + right_positions[motor] = interp_val + elif motor_full.startswith("left_"): + motor = motor_full.removeprefix("left_") + left_positions[motor] = interp_val + + # Send interpolated positions + if right_positions: + self.bus_right.sync_write("Goal_Position", right_positions) + if left_positions: + self.bus_left.sync_write("Goal_Position", left_positions) + + if step < num_steps: + time.sleep(step_duration) def disconnect(self) -> None: """Disconnect from both arms."""