add feedback to openarms mini

This commit is contained in:
Pepijn
2025-12-30 10:48:55 +01:00
parent 48a963793b
commit 38b814f3d4
2 changed files with 63 additions and 2 deletions
@@ -209,6 +209,9 @@ def rac_rollout_loop(
The human intervention (recovery + correction) is recorded as training data.
This teaches the policy how to recover from errors.
During autonomous execution, the teleoperator mirrors the robot's position so
intervention is seamless - the teleoperator is already in the same configuration.
"""
policy.reset()
preprocessor.reset()
@@ -224,6 +227,10 @@ def rac_rollout_loop(
"intervention_occurred": False,
}
# Enable torque on teleoperator so it can track robot position
teleop.enable_torque()
was_intervention_active = False
timestamp = 0
start_t = time.perf_counter()
@@ -235,6 +242,11 @@ def rac_rollout_loop(
events["intervention_active"] = False
break
# Detect transition to intervention mode
if events["intervention_active"] and not was_intervention_active:
teleop.disable_torque()
was_intervention_active = True
obs = robot.get_observation()
# Filter out internal timing data before building frame
obs_filtered = {k: v for k, v in obs.items() if not k.startswith("_")}
@@ -253,6 +265,9 @@ def rac_rollout_loop(
)
robot_action: RobotAction = make_robot_action(action_values, dataset.features)
stats["autonomous_frames"] += 1
# Mirror robot position to teleoperator for seamless intervention
teleop.send_feedback(robot_action)
else:
stats["intervention_occurred"] = True
robot_action = teleop.get_action()
@@ -273,6 +288,9 @@ def rac_rollout_loop(
precise_sleep(1 / fps - dt)
timestamp = time.perf_counter() - start_t
# Ensure teleoperator torque is disabled at end
teleop.disable_torque()
for frame in frame_buffer:
dataset.add_frame(frame)
@@ -318,8 +318,51 @@ class OpenArmsMini(Teleoperator):
return action
def send_feedback(self, feedback: dict[str, float]) -> None:
"""Send feedback to teleoperator (not implemented)."""
raise NotImplementedError
"""
Send position feedback to teleoperator motors.
Args:
feedback: Dictionary with motor positions (e.g., "right_joint_1.pos", "left_joint_2.pos")
"""
# Motors to flip (invert direction) -> matches get_action()
right_motors_to_flip = ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5"]
left_motors_to_flip = ["joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"]
# Split feedback by arm and flip motors as needed
right_positions = {}
left_positions = {}
for key, val in feedback.items():
if key.endswith(".pos"):
motor_name = key.removesuffix(".pos")
if motor_name.startswith("right_"):
base_name = motor_name.removeprefix("right_")
if base_name in right_motors_to_flip:
right_positions[base_name] = -val
else:
right_positions[base_name] = val
elif motor_name.startswith("left_"):
base_name = motor_name.removeprefix("left_")
if base_name in left_motors_to_flip:
left_positions[base_name] = -val
else:
left_positions[base_name] = val
# Write positions to both arms
if right_positions:
self.bus_right.sync_write("Goal_Position", right_positions)
if left_positions:
self.bus_left.sync_write("Goal_Position", left_positions)
def enable_torque(self) -> None:
"""Enable torque on both arms (for position tracking)."""
self.bus_right.enable_torque()
self.bus_left.enable_torque()
def disable_torque(self) -> None:
"""Disable torque on both arms (for free movement)."""
self.bus_right.disable_torque()
self.bus_left.disable_torque()
def disconnect(self) -> None:
"""Disconnect from both arms."""