feat(dagger): Add HIL/Dagger/HG-Dagger/RaC style data collection (#2833)

* feat: HIL data collection, RTC interpolator, and action queue improvements

- Add Human-in-the-Loop (HIL) data collection examples (sync + RTC)
- Add HIL data collection documentation
- Add ActionInterpolator for smoother policy control at higher rates
- Integrate interpolator into lerobot-record and eval_with_real_robot
- Add action queue clear() and get_processed_left_over() methods
- Add rtc/__init__.py for cleaner imports

* docs: expand Related Work section with paper summaries

* fix: only record dataset frames at original fps, not at interpolated rate

The interpolator speeds up robot control (e.g. 2x) but dataset frames
should still be recorded at the original fps. Interpolated-only
iterations now only send actions to the robot without writing to the
dataset.
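The rule can be sketched as a toy model (illustrative names only, not the actual record_loop code): every control tick sends an action to the robot, but only the ticks where the policy produced a fresh action write a dataset frame.

```python
# Toy model of the fix, assuming a 2x interpolation multiplier:
# the robot receives an action every tick, while dataset frames are
# written only on base-fps ticks where real inference happened.
base_fps = 30
multiplier = 2

sent_actions = 0
recorded_frames = 0
for tick in range(base_fps * multiplier):  # one simulated second
    ran_inference = tick % multiplier == 0  # fresh policy action on these ticks
    sent_actions += 1                       # robot moves every tick
    if ran_inference:
        recorded_frames += 1                # dataset stays at base fps

assert sent_actions == 60
assert recorded_frames == 30
```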

* refactor: merge HIL sync and RTC scripts into single file with --rtc.enabled toggle

Combines hil_data_collection.py and hil_data_collection_rtc.py into one
script. RTC is toggled via --rtc.enabled=true (defaults to off for sync
inference). Deletes the separate hil_data_collection_rtc.py and updates
docs to reflect the single-script usage.

* test: add ActionInterpolator test suite (29 tests)

Covers constructor validation, passthrough (multiplier=1), 2x and 3x
interpolation with exact value checks, reset/episode boundaries,
control interval calculation, multi-dim actions, and simulated
control loop integration.
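For reference, the formula behind those exact value checks is prev + (i/N) * (curr - prev) for i = 1..N. A plain-float sketch of it (not the test code itself):

```python
def interpolate(prev: float, curr: float, n: int) -> list[float]:
    """Values produced for one new action with multiplier n; the last equals curr."""
    return [prev + (i / n) * (curr - prev) for i in range(1, n + 1)]

# 2x interpolation: one midpoint, then the current action.
assert interpolate(0.0, 10.0, 2) == [5.0, 10.0]
# 3x interpolation: thirds of the way (compared with a tolerance for float error).
assert all(abs(a - b) < 1e-9 for a, b in zip(interpolate(0.0, 9.0, 3), [3.0, 6.0, 9.0]))
```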

* test: add ActionQueue + ActionInterpolator integration tests

Verifies the interpolator doesn't interfere with RTC's leftover chunk
tracking: queue consumption rate matches base fps regardless of
multiplier, get_left_over/get_processed_left_over only change on
queue.get(), merge preserves smooth interpolation across chunks,
and interpolator reset is independent of queue state.
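The consumption-rate invariant these tests pin down can be modeled in isolation: with multiplier N the queue is polled once every N control ticks, so per-second queue consumption equals the base fps regardless of N. A toy simulation (hypothetical, not the test code):

```python
def simulate(base_fps: int, multiplier: int, seconds: int) -> int:
    """Count queue.get() calls in a control loop running at base_fps * multiplier."""
    gets = 0
    buffer_remaining = 0  # interpolated actions left in the interpolator's buffer
    for _ in range(base_fps * multiplier * seconds):
        if buffer_remaining == 0:   # needs_new_action()
            gets += 1               # queue.get() + interpolator.add()
            buffer_remaining = multiplier
        buffer_remaining -= 1       # interpolator.get() consumes one buffered action
    return gets

# Queue consumption matches base fps for any multiplier.
assert simulate(30, 1, 2) == simulate(30, 3, 2) == 60
```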

* feat: register SO follower/leader configs in HIL script

Adds SOFollowerRobotConfig and SOLeaderTeleopConfig imports so
SO100/SO101 robots can be used via --robot.type=so_follower
and --teleop.type=so_leader. Updates docs accordingly.

Made-with: Cursor

* docs: remove em dashes from HIL documentation

Made-with: Cursor

* refactor: rename examples/rac to examples/hil

Updates directory name and all references in docs and script docstrings.

Made-with: Cursor

* fix: incorporate PR feedback comments

* refactor(tests): enhance ActionInterpolator test structure and add detailed docstrings

* fix: address PR feedback and fix tests

* fix(test): pass correct real_delay in interpolator delay test

The test was passing real_delay=0 and relying on _check_delays to
silently override it with the index-based diff. Now passes real_delay=3
to match the 3 actions consumed during the simulated inference period.
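The consistency check at stake compares the latency-derived delay against an index-derived one; a minimal sketch of that comparison (hypothetical helper, not the ActionQueue code):

```python
def delays_match(last_index: int, index_before_inference: int, real_delay: int) -> bool:
    """The index-based delay must equal the latency-based real_delay."""
    return (last_index - index_before_inference) == real_delay

# Three actions were consumed while inference ran, so the index-based
# delay is 3; real_delay=0 would flag a mismatch, real_delay=3 passes.
assert not delays_match(last_index=3, index_before_inference=0, real_delay=0)
assert delays_match(last_index=3, index_before_inference=0, real_delay=3)
```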


* fix: address PR feedback

* ordering

* update hil script

* fix

* default name

* fix(bi_openarm): use kw_only=True to fix dataclass field ordering

BiOpenArmFollowerConfig overrides `id` with a default, making it
positional in the child — non-default `left_arm_config` then follows a
default field, which Python dataclasses forbid. Adding kw_only=True
(matching the parent RobotConfig) removes positional constraints.

Made-with: Cursor

* style: format long line in hil_data_collection.py

Made-with: Cursor

* pr feedback

---------

Co-authored-by: Khalil Meftah <khalil.meftah@huggingface.co>
Pepijn
2026-04-02 19:53:59 +02:00
committed by GitHub
parent 66fef25ded
commit 818892a38b
13 changed files with 2605 additions and 61 deletions
@@ -0,0 +1,29 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Real-Time Chunking (RTC) utilities for action-chunking policies."""
from lerobot.policies.rtc.action_interpolator import ActionInterpolator
from lerobot.policies.rtc.action_queue import ActionQueue
from lerobot.policies.rtc.configuration_rtc import RTCConfig
from lerobot.policies.rtc.latency_tracker import LatencyTracker
from lerobot.policies.rtc.modeling_rtc import RTCProcessor
__all__ = [
"ActionInterpolator",
"ActionQueue",
"LatencyTracker",
"RTCConfig",
"RTCProcessor",
]
@@ -0,0 +1,116 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Action interpolation for smoother robot control.
Provides configurable Nx control rate by interpolating between consecutive actions.
Useful with RTC and action-chunking policies to reduce jerkiness.
"""
from torch import Tensor
class ActionInterpolator:
"""Interpolates between consecutive actions for smoother control.
When enabled with multiplier N, produces N actions per policy action
by linearly interpolating between the previous and current action.
Example with multiplier=3:
prev_action -> [1/3 interpolated, 2/3 interpolated, current_action]
This effectively multiplies the control rate for smoother motion.
Usage:
interpolator = ActionInterpolator(multiplier=2) # 2x control rate
# In control loop:
if interpolator.needs_new_action():
new_action = queue.get()
if new_action is not None:
interpolator.add(new_action.cpu())
action = interpolator.get()
if action is not None:
robot.send_action(action)
"""
def __init__(self, multiplier: int = 1):
"""Initialize the interpolator.
Args:
multiplier: Control rate multiplier (1 = no interpolation, 2 = 2x, 3 = 3x, etc.)
"""
if multiplier < 1:
raise ValueError(f"multiplier must be >= 1, got {multiplier}")
self.multiplier = multiplier
self._prev: Tensor | None = None
self._buffer: list[Tensor] = []
self._idx = 0
@property
def enabled(self) -> bool:
"""Whether interpolation is active (multiplier > 1)."""
return self.multiplier > 1
def reset(self):
"""Reset interpolation state (call between episodes)."""
self._prev = None
self._buffer = []
self._idx = 0
def needs_new_action(self) -> bool:
"""Check if a new action is needed from the queue."""
return self._idx >= len(self._buffer)
def add(self, action: Tensor) -> None:
"""Add a new action and compute interpolated sequence.
Args:
action: New action tensor from policy/queue (already on CPU).
"""
if self.multiplier > 1 and self._prev is not None:
self._buffer = []
for i in range(1, self.multiplier + 1):
t = i / self.multiplier
interp = self._prev + t * (action - self._prev)
self._buffer.append(interp)
else:
# First step: no previous action yet, so run at base FPS without interpolation.
self._buffer = [action.clone()]
self._prev = action.clone()
self._idx = 0
def get(self) -> Tensor | None:
"""Get the next interpolated action.
Returns:
Next action tensor, or None if buffer is exhausted.
"""
if self._idx >= len(self._buffer):
return None
action = self._buffer[self._idx]
self._idx += 1
return action
def get_control_interval(self, fps: float) -> float:
"""Get the control interval based on interpolation multiplier.
Args:
fps: Base frames per second.
Returns:
Control interval in seconds, i.e. 1 / (fps * multiplier).
"""
return 1.0 / (fps * self.multiplier)
@@ -79,6 +79,13 @@ class ActionQueue:
self.last_index += 1
return action.clone()
def clear(self) -> None:
"""Clear queued actions and reset consumption index."""
with self.lock:
self.queue = None
self.original_queue = None
self.last_index = 0
def qsize(self) -> int:
"""Get the number of remaining actions in the queue.
@@ -123,14 +130,26 @@ class ActionQueue:
with self.lock:
if self.original_queue is None:
return None
- return self.original_queue[self.last_index :]
+ return self.original_queue[self.last_index :].clone()
def get_processed_left_over(self) -> Tensor | None:
"""Get leftover processed actions (the actions currently executed by the robot).
Returns:
Tensor | None: Remaining processed actions (remaining_steps, action_dim),
or None if no processed queue exists.
"""
with self.lock:
if self.queue is None:
return None
return self.queue[self.last_index :].clone()
def merge(
self,
original_actions: Tensor,
processed_actions: Tensor,
real_delay: int,
- action_index_before_inference: int | None = 0,
+ action_index_before_inference: int | None = None,
):
"""Merge new actions into the queue.
@@ -145,10 +164,10 @@ class ActionQueue:
action_index_before_inference: Index before inference started, for validation.
"""
with self.lock:
- self._check_delays(real_delay, action_index_before_inference)
+ delay = self._check_and_resolve_delays(real_delay, action_index_before_inference)
if self.cfg.enabled:
- self._replace_actions_queue(original_actions, processed_actions, real_delay)
+ self._replace_actions_queue(original_actions, processed_actions, delay)
return
self._append_actions_queue(original_actions, processed_actions)
@@ -164,12 +183,13 @@ class ActionQueue:
processed_actions: Post-processed actions for robot.
real_delay: Number of time steps to skip due to inference delay.
"""
- self.original_queue = original_actions[real_delay:].clone()
- self.queue = processed_actions[real_delay:].clone()
+ clamped_delay = max(0, min(real_delay, len(original_actions), len(processed_actions)))
+ self.original_queue = original_actions[clamped_delay:].clone()
+ self.queue = processed_actions[clamped_delay:].clone()
logger.debug(f"original_actions shape: {self.original_queue.shape}")
logger.debug(f"processed_actions shape: {self.queue.shape}")
- logger.debug(f"real_delay: {real_delay}")
+ logger.debug(f"real_delay: {real_delay}, clamped_delay: {clamped_delay}")
self.last_index = 0
@@ -196,7 +216,9 @@ class ActionQueue:
self.last_index = 0
- def _check_delays(self, real_delay: int, action_index_before_inference: int | None = None):
+ def _check_and_resolve_delays(
+ self, real_delay: int, action_index_before_inference: int | None = None
+ ) -> int:
"""Validate that computed delays match expectations.
Compares the delay computed from inference latency with the actual
@@ -205,15 +227,20 @@ class ActionQueue:
Args:
real_delay: Delay computed from inference latency.
action_index_before_inference: Action index when inference started.
"""
if action_index_before_inference is None:
return
indexes_diff = self.last_index - action_index_before_inference
if indexes_diff != real_delay:
# Let's check that action index difference (real delay calculated based on action queue)
# is the same as delay calculated based on inference latency
logger.warning(
f"[ACTION_QUEUE] Indexes diff is not equal to real delay. "
f"Indexes diff: {indexes_diff}, real delay: {real_delay}"
)
Returns:
int: Delay to use.
"""
effective_delay = max(0, real_delay)
if action_index_before_inference is not None:
indexes_diff = max(0, self.last_index - action_index_before_inference)
if indexes_diff != real_delay:
logger.warning(
"Indexes diff is not equal to real delay. indexes_diff=%d, real_delay=%d",
indexes_diff,
real_delay,
)
return real_delay
return effective_delay
@@ -96,9 +96,11 @@ class BiOpenArmFollower(Robot):
left_arm_motors_ft = self.left_arm._motors_ft
right_arm_motors_ft = self.right_arm._motors_ft
+ # Right first, then left — matches the teleoperator (OpenArmMini) ordering
+ # and the dataset feature names recorded during data collection.
return {
- **{f"left_{k}": v for k, v in left_arm_motors_ft.items()},
**{f"right_{k}": v for k, v in right_arm_motors_ft.items()},
+ **{f"left_{k}": v for k, v in left_arm_motors_ft.items()},
}
@property
@@ -150,14 +152,16 @@ class BiOpenArmFollower(Robot):
left_cam_keys = set(self.left_arm.cameras.keys())
right_cam_keys = set(self.right_arm.cameras.keys())
- left_obs = self.left_arm.get_observation()
- for key, value in left_obs.items():
- obs_dict[key if key in left_cam_keys else f"left_{key}"] = value
+ # Right first, then left — matches the teleoperator (OpenArmMini) ordering
+ # and the dataset feature names recorded during data collection.
right_obs = self.right_arm.get_observation()
for key, value in right_obs.items():
obs_dict[key if key in right_cam_keys else f"right_{key}"] = value
+ left_obs = self.left_arm.get_observation()
+ for key, value in left_obs.items():
+ obs_dict[key if key in left_cam_keys else f"left_{key}"] = value
return obs_dict
@check_if_not_connected
@@ -183,7 +187,7 @@ class BiOpenArmFollower(Robot):
prefixed_sent_action_left = {f"left_{key}": value for key, value in sent_action_left.items()}
prefixed_sent_action_right = {f"right_{key}": value for key, value in sent_action_right.items()}
- return {**prefixed_sent_action_left, **prefixed_sent_action_right}
+ return {**prefixed_sent_action_right, **prefixed_sent_action_left}
@check_if_not_connected
def disconnect(self):
@@ -23,10 +23,12 @@ from ..config import RobotConfig
@RobotConfig.register_subclass("bi_openarm_follower")
- @dataclass
+ @dataclass(kw_only=True)
class BiOpenArmFollowerConfig(RobotConfig):
"""Configuration class for Bi OpenArm Follower robots."""
id: str | None = "bi_openarm_follower"
left_arm_config: OpenArmFollowerConfigBase
right_arm_config: OpenArmFollowerConfigBase
@@ -74,6 +74,8 @@ from pathlib import Path
from pprint import pformat
from typing import Any
import torch
from lerobot.cameras import ( # noqa: F401
CameraConfig, # noqa: F401
)
@@ -90,6 +92,7 @@ from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_featur
from lerobot.datasets.video_utils import VideoEncodingManager
from lerobot.policies.factory import make_policy, make_pre_post_processors
from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.policies.rtc import ActionInterpolator
from lerobot.policies.utils import make_robot_action
from lerobot.processor import (
PolicyAction,
@@ -226,6 +229,9 @@ class RecordConfig:
play_sounds: bool = True
# Resume recording on an existing dataset.
resume: bool = False
# Action interpolation multiplier for smoother policy control (1=off, 2=2x, 3=3x)
# Only applies when using a policy (not teleop)
interpolation_multiplier: int = 1
def __post_init__(self):
# HACK: We parse again the cli args here to get the pretrained path if there was one.
@@ -298,6 +304,7 @@ def record_loop(
control_time_s: int | None = None,
single_task: str | None = None,
display_data: bool = False,
interpolator: ActionInterpolator | None = None,
display_compressed_images: bool = False,
):
if dataset is not None and dataset.fps != fps:
@@ -334,6 +341,16 @@ def record_loop(
preprocessor.reset()
postprocessor.reset()
# Reset interpolator if provided
if interpolator is not None:
interpolator.reset()
# Calculate control interval based on interpolation
use_interpolation = interpolator is not None and interpolator.enabled and policy is not None
control_interval = interpolator.get_control_interval(fps) if interpolator else 1 / fps
# Pre-compute action key order outside the hot loop — it won't change mid-episode.
action_keys = sorted(robot.action_features) if use_interpolation else []
no_action_count = 0
timestamp = 0
start_episode_t = time.perf_counter()
@@ -353,28 +370,67 @@ def record_loop(
if policy is not None or dataset is not None:
observation_frame = build_dataset_frame(dataset.features, obs_processed, prefix=OBS_STR)
+ # Track whether this iteration should be recorded to the dataset.
+ # Interpolated-only iterations send actions to the robot but don't record frames,
+ # keeping the dataset at the original fps while the robot moves at the higher rate.
+ is_record_frame = True
# Get action from either policy or teleop
if policy is not None and preprocessor is not None and postprocessor is not None:
- action_values = predict_action(
- observation=observation_frame,
- policy=policy,
- device=get_safe_torch_device(policy.config.device),
- preprocessor=preprocessor,
- postprocessor=postprocessor,
- use_amp=policy.config.use_amp,
- task=single_task,
- robot_type=robot.robot_type,
- )
- act_processed_policy: RobotAction = make_robot_action(action_values, dataset.features)
+ # With interpolation: only call policy when interpolator needs new action
+ if use_interpolation:
+ ran_inference = False
+ if interpolator.needs_new_action():
+ action_values = predict_action(
+ observation=observation_frame,
+ policy=policy,
+ device=get_safe_torch_device(policy.config.device),
+ preprocessor=preprocessor,
+ postprocessor=postprocessor,
+ use_amp=policy.config.use_amp,
+ task=single_task,
+ robot_type=robot.robot_type,
+ )
+ act_processed_policy = make_robot_action(action_values, dataset.features)
+ robot_action_to_send = robot_action_processor((act_processed_policy, obs))
+ action_tensor = torch.tensor([robot_action_to_send[k] for k in action_keys])
+ interpolator.add(action_tensor)
+ ran_inference = True
+ interp_action = interpolator.get()
+ if interp_action is not None:
+ robot_action_to_send = {k: interp_action[i].item() for i, k in enumerate(action_keys)}
+ action_values = robot_action_to_send
+ else:
+ continue
+ is_record_frame = ran_inference
+ else:
+ action_values = predict_action(
+ observation=observation_frame,
+ policy=policy,
+ device=get_safe_torch_device(policy.config.device),
+ preprocessor=preprocessor,
+ postprocessor=postprocessor,
+ use_amp=policy.config.use_amp,
+ task=single_task,
+ robot_type=robot.robot_type,
+ )
+ act_processed_policy: RobotAction = make_robot_action(action_values, dataset.features)
+ # Applies a pipeline to the action, default is IdentityProcessor
+ robot_action_to_send = robot_action_processor((act_processed_policy, obs))
elif policy is None and isinstance(teleop, Teleoperator):
- act = teleop.get_action()
if robot.name == "unitree_g1":
teleop.send_feedback(obs)
+ act = teleop.get_action()
# Applies a pipeline to the raw teleop action, default is IdentityProcessor
act_processed_teleop = teleop_action_processor((act, obs))
action_values = act_processed_teleop
robot_action_to_send = robot_action_processor((act_processed_teleop, obs))
elif policy is None and isinstance(teleop, list):
arm_action = teleop_arm.get_action()
@@ -383,6 +439,8 @@ def record_loop(
base_action = robot._from_keyboard_to_base_action(keyboard_action)
act = {**arm_action, **base_action} if len(base_action) > 0 else arm_action
act_processed_teleop = teleop_action_processor((act, obs))
action_values = act_processed_teleop
robot_action_to_send = robot_action_processor((act_processed_teleop, obs))
else:
no_action_count += 1
if no_action_count == 1 or no_action_count % 10 == 0:
@@ -393,22 +451,14 @@ def record_loop(
)
continue
- # Applies a pipeline to the action, default is IdentityProcessor
- if policy is not None and act_processed_policy is not None:
- action_values = act_processed_policy
- robot_action_to_send = robot_action_processor((act_processed_policy, obs))
- else:
- action_values = act_processed_teleop
- robot_action_to_send = robot_action_processor((act_processed_teleop, obs))
# Send action to robot
# Action can eventually be clipped using `max_relative_target`,
# so action actually sent is saved in the dataset. action = postprocessor.process(action)
# TODO(steven, pepijn, adil): we should use a pipeline step to clip the action, so the sent action is the action that we input to the robot.
_sent_action = robot.send_action(robot_action_to_send)
- # Write to dataset
- if dataset is not None:
+ # Write to dataset (only on real policy frames, not interpolated-only iterations)
+ if dataset is not None and is_record_frame:
action_frame = build_dataset_frame(dataset.features, action_values, prefix=ACTION)
frame = {**observation_frame, **action_frame, "task": single_task}
dataset.add_frame(frame)
@@ -420,7 +470,7 @@ def record_loop(
dt_s = time.perf_counter() - start_loop_t
- sleep_time_s: float = 1 / fps - dt_s
+ sleep_time_s: float = control_interval - dt_s
if sleep_time_s < 0:
logging.warning(
f"Record loop is running slower ({1 / dt_s:.1f} Hz) than the target FPS ({fps} Hz). Dataset frames might be dropped and robot control might be unstable. Common causes are: 1) Camera FPS not keeping up 2) Policy inference taking too long 3) CPU starvation"
@@ -506,6 +556,7 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
policy = None if cfg.policy is None else make_policy(cfg.policy, ds_meta=dataset.meta)
preprocessor = None
postprocessor = None
interpolator = None
if cfg.policy is not None:
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=cfg.policy,
@@ -516,6 +567,10 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
"rename_observations_processor": {"rename_map": cfg.dataset.rename_map},
},
)
# Create interpolator for smoother policy control
if cfg.interpolation_multiplier > 1:
interpolator = ActionInterpolator(multiplier=cfg.interpolation_multiplier)
logging.info(f"Action interpolation enabled: {cfg.interpolation_multiplier}x control rate")
robot.connect()
if teleop is not None:
@@ -547,6 +602,7 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
control_time_s=cfg.dataset.episode_time_s,
single_task=cfg.dataset.single_task,
display_data=cfg.display_data,
interpolator=interpolator,
display_compressed_images=display_compressed_images,
)
@@ -32,9 +32,15 @@ from .config_openarm_mini import OpenArmMiniConfig
logger = logging.getLogger(__name__)
# Motors whose direction is inverted during readout
- RIGHT_MOTORS_TO_FLIP = ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5"]
+ RIGHT_MOTORS_TO_FLIP = ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_7"]
LEFT_MOTORS_TO_FLIP = ["joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"]
# Leader joint 6 maps to follower joint 7 and vice versa
JOINT_REMAP = {"joint_6": "joint_7", "joint_7": "joint_6"}
JOINT_REMAP_REVERSE = {"joint_7": "joint_6", "joint_6": "joint_7"}
GRIPPER_TELEOP_TO_DEGREES = -0.65
class OpenArmMini(Teleoperator):
"""
@@ -95,6 +101,8 @@ class OpenArmMini(Teleoperator):
@property
def action_features(self) -> dict[str, type]:
# Right first, then left — matches the robot (BiOpenArmFollower) ordering
# and the dataset feature names recorded during data collection.
features: dict[str, type] = {}
for motor in self.bus_right.motors:
features[f"right_{motor}.pos"] = float
@@ -276,16 +284,70 @@ class OpenArmMini(Teleoperator):
right_positions = self.bus_right.sync_read("Present_Position")
left_positions = self.bus_left.sync_read("Present_Position")
# Right first, then left — matches the robot (BiOpenArmFollower) ordering
# and the dataset feature names recorded during data collection.
# Joint 6↔7 remap: leader joint_6 → follower joint_7 and vice versa.
action: dict[str, Any] = {}
for motor, val in right_positions.items():
- action[f"right_{motor}.pos"] = -val if motor in RIGHT_MOTORS_TO_FLIP else val
+ target = JOINT_REMAP.get(motor, motor)
+ if motor == "gripper":
+ # Convert gripper from teleop 0-100 to openarms degrees: 0→0°, 100→-65°
+ action[f"right_{target}.pos"] = val * GRIPPER_TELEOP_TO_DEGREES
+ else:
+ action[f"right_{target}.pos"] = -val if motor in RIGHT_MOTORS_TO_FLIP else val
for motor, val in left_positions.items():
- action[f"left_{motor}.pos"] = -val if motor in LEFT_MOTORS_TO_FLIP else val
+ target = JOINT_REMAP.get(motor, motor)
+ if motor == "gripper":
+ action[f"left_{target}.pos"] = val * GRIPPER_TELEOP_TO_DEGREES
+ else:
+ action[f"left_{target}.pos"] = -val if motor in LEFT_MOTORS_TO_FLIP else val
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return action
def enable_torque(self) -> None:
"""Enable torque on both arms for position control."""
self.bus_right.enable_torque()
self.bus_left.enable_torque()
def disable_torque(self) -> None:
"""Disable torque on both arms for free movement."""
self.bus_right.disable_torque()
self.bus_left.disable_torque()
def write_goal_positions(self, positions: dict[str, float]) -> None:
"""Write goal positions to motors (inverse of get_action flip/gripper/remap logic)."""
right_goals: dict[str, float] = {}
left_goals: dict[str, float] = {}
for key, val in positions.items():
if not key.endswith(".pos"):
continue
motor_name = key.removesuffix(".pos")
if motor_name.startswith("right_"):
base = motor_name.removeprefix("right_")
# Reverse remap: follower joint_7 → leader joint_6 and vice versa
target = JOINT_REMAP_REVERSE.get(base, base)
if base == "gripper":
# Convert robot degrees to teleop 0-100: 0°→0, -65°→100
right_goals[target] = val / GRIPPER_TELEOP_TO_DEGREES
else:
# Un-flip using the ORIGINAL motor name (target = leader motor)
right_goals[target] = -val if target in RIGHT_MOTORS_TO_FLIP else val
elif motor_name.startswith("left_"):
base = motor_name.removeprefix("left_")
target = JOINT_REMAP_REVERSE.get(base, base)
if base == "gripper":
left_goals[target] = val / GRIPPER_TELEOP_TO_DEGREES
else:
left_goals[target] = -val if target in LEFT_MOTORS_TO_FLIP else val
if right_goals:
self.bus_right.sync_write("Goal_Position", right_goals)
if left_goals:
self.bus_left.sync_write("Goal_Position", left_goals)
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError("Feedback is not yet implemented for OpenArm Mini.")