some changes to smooth

This commit is contained in:
Pepijn
2025-12-31 15:16:23 +01:00
parent 8b6b41f8dc
commit aba42c805f
2 changed files with 115 additions and 14 deletions
+12 -4
View File
@@ -253,9 +253,14 @@ def rac_rollout_loop(
events["correction_active"] = False events["correction_active"] = False
break break
# Detect transition to paused state (enable torque to track robot) # Detect transition to paused state - smooth move teleop to robot position
if events["policy_paused"] and not was_paused: if events["policy_paused"] and not was_paused:
teleop.enable_torque() obs = robot.get_observation()
obs_filtered = {k: v for k, v in obs.items() if k in robot.observation_features}
robot_pos = {k: v for k, v in obs_filtered.items() if k.endswith(".pos")}
print("[RaC] Moving teleop to robot position (1s smooth transition)...")
teleop.smooth_move_to(robot_pos, duration_s=1.0, fps=50)
print("[RaC] Teleop aligned. Press 'c' to take control.")
was_paused = True was_paused = True
# Detect transition to correction mode (disable torque for human control) # Detect transition to correction mode (disable torque for human control)
@@ -268,9 +273,12 @@ def rac_rollout_loop(
obs_frame = build_dataset_frame(dataset.features, obs_filtered, prefix=OBS_STR) obs_frame = build_dataset_frame(dataset.features, obs_filtered, prefix=OBS_STR)
if events["correction_active"]: if events["correction_active"]:
# Human controlling - record correction data # Human controlling - record correction data with soft gains
robot_action = teleop.get_action() robot_action = teleop.get_action()
robot.send_action(robot_action) # Use lower gains for smoother human control
soft_kp = {k.removesuffix(".pos"): 60.0 for k in robot_action if k.endswith(".pos")}
soft_kd = {k.removesuffix(".pos"): 1.5 for k in robot_action if k.endswith(".pos")}
robot.send_action(robot_action, custom_kp=soft_kp, custom_kd=soft_kd)
stats["correction_frames"] += 1 stats["correction_frames"] += 1
# Record this frame # Record this frame
@@ -296,22 +296,27 @@ class OpenArmsMini(Teleoperator):
right_motors_to_flip = ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5"] right_motors_to_flip = ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5"]
left_motors_to_flip = ["joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"] left_motors_to_flip = ["joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"]
# Joint mapping: leader joint 6 -> follower joint 7, leader joint 7 -> follower joint 6
joint_remap = {"joint_6": "joint_7", "joint_7": "joint_6"}
# Read positions from both arms # Read positions from both arms
right_positions = self.bus_right.sync_read("Present_Position") right_positions = self.bus_right.sync_read("Present_Position")
left_positions = self.bus_left.sync_read("Present_Position") left_positions = self.bus_left.sync_read("Present_Position")
# Combine into single action dict with arm prefixes and flip specified motors # Combine into single action dict with arm prefixes, flip and remap joints
action = {} action = {}
for motor, val in right_positions.items(): for motor, val in right_positions.items():
target_motor = joint_remap.get(motor, motor)
if motor in right_motors_to_flip: if motor in right_motors_to_flip:
action[f"right_{motor}.pos"] = -val action[f"right_{target_motor}.pos"] = -val
else: else:
action[f"right_{motor}.pos"] = val action[f"right_{target_motor}.pos"] = val
for motor, val in left_positions.items(): for motor, val in left_positions.items():
target_motor = joint_remap.get(motor, motor)
if motor in left_motors_to_flip: if motor in left_motors_to_flip:
action[f"left_{motor}.pos"] = -val action[f"left_{target_motor}.pos"] = -val
else: else:
action[f"left_{motor}.pos"] = val action[f"left_{target_motor}.pos"] = val
dt_ms = (time.perf_counter() - start) * 1e3 dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms") logger.debug(f"{self} read action: {dt_ms:.1f}ms")
@@ -328,7 +333,10 @@ class OpenArmsMini(Teleoperator):
right_motors_to_flip = ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5"] right_motors_to_flip = ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5"]
left_motors_to_flip = ["joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"] left_motors_to_flip = ["joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"]
# Split feedback by arm and flip motors as needed # Reverse joint mapping: follower joint 7 -> leader joint 6, follower joint 6 -> leader joint 7
joint_remap = {"joint_7": "joint_6", "joint_6": "joint_7"}
# Split feedback by arm and flip/remap motors as needed
right_positions = {} right_positions = {}
left_positions = {} left_positions = {}
@@ -337,16 +345,18 @@ class OpenArmsMini(Teleoperator):
motor_name = key.removesuffix(".pos") motor_name = key.removesuffix(".pos")
if motor_name.startswith("right_"): if motor_name.startswith("right_"):
base_name = motor_name.removeprefix("right_") base_name = motor_name.removeprefix("right_")
target_motor = joint_remap.get(base_name, base_name)
if base_name in right_motors_to_flip: if base_name in right_motors_to_flip:
right_positions[base_name] = -val right_positions[target_motor] = -val
else: else:
right_positions[base_name] = val right_positions[target_motor] = val
elif motor_name.startswith("left_"): elif motor_name.startswith("left_"):
base_name = motor_name.removeprefix("left_") base_name = motor_name.removeprefix("left_")
target_motor = joint_remap.get(base_name, base_name)
if base_name in left_motors_to_flip: if base_name in left_motors_to_flip:
left_positions[base_name] = -val left_positions[target_motor] = -val
else: else:
left_positions[base_name] = val left_positions[target_motor] = val
# Write positions to both arms # Write positions to both arms
if right_positions: if right_positions:
@@ -363,6 +373,89 @@ class OpenArmsMini(Teleoperator):
"""Disable torque on both arms (for free movement).""" """Disable torque on both arms (for free movement)."""
self.bus_right.disable_torque() self.bus_right.disable_torque()
self.bus_left.disable_torque() self.bus_left.disable_torque()
def smooth_move_to(self, target: dict[str, float], duration_s: float = 1.0, fps: int = 50) -> None:
"""
Smoothly move teleoperator to target position over specified duration.
Uses linear interpolation to avoid sudden jerky movements.
Args:
target: Target positions dict (e.g., {"right_joint_1.pos": 30.0, ...})
duration_s: Duration of movement in seconds (default 1.0)
fps: Control frequency (default 50 Hz)
"""
# Read current positions
current_right = self.bus_right.sync_read("Present_Position")
current_left = self.bus_left.sync_read("Present_Position")
# Build current position dict with full names
current = {}
for motor, val in current_right.items():
current[f"right_{motor}"] = val
for motor, val in current_left.items():
current[f"left_{motor}"] = val
# Reverse joint mapping for target (follower joint 7 -> leader joint 6)
joint_remap = {"joint_7": "joint_6", "joint_6": "joint_7"}
right_motors_to_flip = ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5"]
left_motors_to_flip = ["joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"]
# Parse target and apply remapping/flipping
target_parsed = {}
for key, val in target.items():
if key.endswith(".pos"):
motor_name = key.removesuffix(".pos")
if motor_name.startswith("right_"):
base_name = motor_name.removeprefix("right_")
target_motor = joint_remap.get(base_name, base_name)
if base_name in right_motors_to_flip:
target_parsed[f"right_{target_motor}"] = -val
else:
target_parsed[f"right_{target_motor}"] = val
elif motor_name.startswith("left_"):
base_name = motor_name.removeprefix("left_")
target_motor = joint_remap.get(base_name, base_name)
if base_name in left_motors_to_flip:
target_parsed[f"left_{target_motor}"] = -val
else:
target_parsed[f"left_{target_motor}"] = val
# Enable torque for movement
self.enable_torque()
# Interpolate over duration
num_steps = int(duration_s * fps)
step_duration = 1.0 / fps
for step in range(num_steps + 1):
t = step / num_steps # 0.0 to 1.0
# Smooth easing (ease-in-out)
t_smooth = t * t * (3 - 2 * t)
# Interpolate positions
right_positions = {}
left_positions = {}
for motor_full, target_val in target_parsed.items():
current_val = current.get(motor_full, target_val)
interp_val = current_val + t_smooth * (target_val - current_val)
if motor_full.startswith("right_"):
motor = motor_full.removeprefix("right_")
right_positions[motor] = interp_val
elif motor_full.startswith("left_"):
motor = motor_full.removeprefix("left_")
left_positions[motor] = interp_val
# Send interpolated positions
if right_positions:
self.bus_right.sync_write("Goal_Position", right_positions)
if left_positions:
self.bus_left.sync_write("Goal_Position", left_positions)
if step < num_steps:
time.sleep(step_duration)
def disconnect(self) -> None: def disconnect(self) -> None:
"""Disconnect from both arms.""" """Disconnect from both arms."""