mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-24 13:09:43 +00:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 05764da0f1 | |||
| f40a36fe82 | |||
| 9b538d6cbf | |||
| a59900a339 | |||
| 5cea61708d |
@@ -0,0 +1,191 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""SO100 leader / follower teleop with HIL-SERL-style intervention toggle.
|
||||
|
||||
Position-only standalone demo of the leader-arm intervention pattern used by
|
||||
PR #2596's HIL-SERL training stack (see
|
||||
``lerobot.processor.LeaderFollowerProcessor`` and
|
||||
``lerobot.teleoperators.so_leader.SO101LeaderFollower``). Compared to the
|
||||
verbatim PR #2596 example (which builds the full ``EEReferenceAndDelta`` ->
|
||||
``EEBoundsAndSafety`` -> ``GripperVelocityToJoint`` -> ``InverseKinematicsRLStep``
|
||||
pipeline), this version computes the EE delta and the IK target inline against
|
||||
the follower's *measured* pose every tick. That removes the latched-reference
|
||||
feedback loop and produces noticeably smoother haptic teleop.
|
||||
|
||||
Behaviour:
|
||||
* **Following mode** (default): the follower is idle, the leader is
|
||||
torque-enabled with low PID gains and haptically tracks the follower.
|
||||
The user can grab the leader at any time without fighting the position
|
||||
loop.
|
||||
* **Intervention mode** (toggled by pressing SPACE): the leader's torque
|
||||
is released, the user moves the leader freely, and the follower mirrors
|
||||
the leader's end-effector position via ``[delta_x, delta_y, delta_z]``
|
||||
deltas, plus a direct gripper passthrough. This matches the action
|
||||
space recorded by ``LeaderFollowerProcessor`` during HIL-SERL recording.
|
||||
|
||||
Keyboard:
|
||||
* ``SPACE`` -- toggle intervention on/off.
|
||||
* ``ESC`` -- terminate (treated as failure event).
|
||||
* ``s`` -- mark current intervention as success.
|
||||
* ``r`` -- request re-record of current episode.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
|
||||
from lerobot.model.kinematics import RobotKinematics
|
||||
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
|
||||
from lerobot.teleoperators.so_leader import SO101LeaderConfig, SO101LeaderFollower
|
||||
from lerobot.teleoperators.utils import TeleopEvents
|
||||
from lerobot.utils.robot_utils import precise_sleep
|
||||
|
||||
FPS = 30
|
||||
|
||||
# Per-axis EE-delta normalisation (metres). The clamped delta is
|
||||
# ``clip((p_leader - p_follower) / step, -1, 1) * step``, so a single tick is
|
||||
# bounded by ``step`` in metres. Keep small for safe motion.
|
||||
EE_STEP_SIZES = {"x": 0.010, "y": 0.010, "z": 0.010}
|
||||
|
||||
# Workspace bounds (metres) - tight box around the rest pose to keep the
|
||||
# follower from running into joint limits during the demo. Adjust to your
|
||||
# workspace.
|
||||
EE_BOUNDS = {
|
||||
"min": np.array([-0.20, -0.30, 0.02]),
|
||||
"max": np.array([0.30, 0.30, 0.40]),
|
||||
}
|
||||
|
||||
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo:
|
||||
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
|
||||
URDF_PATH = "./SO101/so101_new_calib.urdf"
|
||||
TARGET_FRAME = "gripper_frame_link"
|
||||
|
||||
# Set these to the actual ports on your machine.
|
||||
FOLLOWER_PORT = "/dev/usb_follower_arm_a"
|
||||
LEADER_PORT = "/dev/usb_leader_arm_a"
|
||||
|
||||
|
||||
def _joints_dict_to_array(joints: dict[str, float], motor_names: list[str]) -> np.ndarray:
|
||||
return np.array([joints[f"{m}.pos"] for m in motor_names], dtype=float)
|
||||
|
||||
|
||||
def _array_to_joints_dict(arr: np.ndarray, motor_names: list[str]) -> dict[str, float]:
|
||||
return {f"{m}.pos": float(v) for m, v in zip(motor_names, arr, strict=True)}
|
||||
|
||||
|
||||
def main() -> None:
|
||||
follower_config = SO100FollowerConfig(port=FOLLOWER_PORT, id="my_follower_arm", use_degrees=True)
|
||||
leader_config = SO101LeaderConfig(
|
||||
port=LEADER_PORT,
|
||||
id="my_leader_arm",
|
||||
use_degrees=True,
|
||||
leader_follower_mode=True,
|
||||
use_gripper=True,
|
||||
)
|
||||
|
||||
follower = SO100Follower(follower_config)
|
||||
leader = SO101LeaderFollower(leader_config)
|
||||
|
||||
follower_motor_names = list(follower.bus.motors.keys())
|
||||
leader_motor_names = list(leader.bus.motors.keys())
|
||||
|
||||
follower_kinematics = RobotKinematics(
|
||||
urdf_path=URDF_PATH, target_frame_name=TARGET_FRAME, joint_names=follower_motor_names
|
||||
)
|
||||
leader_kinematics = RobotKinematics(
|
||||
urdf_path=URDF_PATH, target_frame_name=TARGET_FRAME, joint_names=leader_motor_names
|
||||
)
|
||||
|
||||
follower.connect()
|
||||
leader.connect()
|
||||
|
||||
print("Starting leader-follower intervention demo...")
|
||||
print(" - Press SPACE to toggle intervention.")
|
||||
print(" - Press ESC to terminate, 's' for success, 'r' to re-record.")
|
||||
|
||||
try:
|
||||
while True:
|
||||
t0 = time.perf_counter()
|
||||
|
||||
# 1. Read both arms.
|
||||
follower_obs = follower.get_observation()
|
||||
follower_joints_dict = {f"{m}.pos": float(follower_obs[f"{m}.pos"]) for m in follower_motor_names}
|
||||
leader_joints_dict = leader.get_action()
|
||||
|
||||
# 2. Haptic follow: push follower joints back to the leader. The
|
||||
# leader's ``send_action`` gates motor writes on its intervention
|
||||
# state internally (torque on while following, off while
|
||||
# intervening), so this call is safe in both modes.
|
||||
leader.send_action(follower_joints_dict)
|
||||
|
||||
# 3. Pull teleop events from the leader's keyboard listener.
|
||||
events = leader.get_teleop_events()
|
||||
if events.get(TeleopEvents.TERMINATE_EPISODE):
|
||||
print("Termination requested -- exiting.")
|
||||
break
|
||||
|
||||
is_intervention = events.get(TeleopEvents.IS_INTERVENTION, False)
|
||||
|
||||
if is_intervention:
|
||||
# 4a. INTERVENTION: take normalised position-only delta against
|
||||
# the follower's *measured* pose every tick (no latched
|
||||
# reference, no compounding lag), integrate onto the follower's
|
||||
# current EE pose, clip to the workspace, then IK.
|
||||
leader_arr = _joints_dict_to_array(leader_joints_dict, leader_motor_names)
|
||||
follower_arr = _joints_dict_to_array(follower_joints_dict, follower_motor_names)
|
||||
|
||||
p_leader = leader_kinematics.forward_kinematics(leader_arr)[:3, 3]
|
||||
p_follower_mat = follower_kinematics.forward_kinematics(follower_arr)
|
||||
p_follower = p_follower_mat[:3, 3]
|
||||
|
||||
step_vec = np.array([EE_STEP_SIZES["x"], EE_STEP_SIZES["y"], EE_STEP_SIZES["z"]], dtype=float)
|
||||
raw_delta = p_leader - p_follower
|
||||
delta_norm = np.clip(raw_delta / step_vec, -1.0, 1.0)
|
||||
delta_m = delta_norm * step_vec
|
||||
|
||||
target_pose = p_follower_mat.copy()
|
||||
target_pose[:3, 3] = np.clip(p_follower + delta_m, EE_BOUNDS["min"], EE_BOUNDS["max"])
|
||||
|
||||
# IK -> joint-space goal for the follower's arm chain. Position
|
||||
# only (orientation_weight=0.0) keeps it stable under the
|
||||
# rotation-noise that would otherwise come from leader FK.
|
||||
target_joints = follower_kinematics.inverse_kinematics(
|
||||
current_joint_pos=follower_arr,
|
||||
desired_ee_pose=target_pose,
|
||||
orientation_weight=0.0,
|
||||
)
|
||||
follower_action = _array_to_joints_dict(target_joints, follower_motor_names)
|
||||
# Gripper passthrough: leader gripper position drives follower
|
||||
# gripper directly (no IK).
|
||||
follower_action["gripper.pos"] = float(leader_joints_dict.get("gripper.pos", 50.0))
|
||||
follower.send_action(follower_action)
|
||||
|
||||
# 4b. FOLLOWING: leave the follower alone -- the leader haptically
|
||||
# tracks it via the ``leader.send_action`` call above. In real
|
||||
# HIL-SERL training this is where the policy would step the
|
||||
# follower forward.
|
||||
|
||||
precise_sleep(max(1.0 / FPS - (time.perf_counter() - t0), 0.0))
|
||||
finally:
|
||||
leader.disconnect()
|
||||
follower.disconnect()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -299,6 +299,14 @@ class HILSerlProcessorConfig:
|
||||
inverse_kinematics: InverseKinematicsConfig | None = None
|
||||
reward_classifier: RewardClassifierConfig | None = None
|
||||
max_gripper_pos: float | None = 100.0
|
||||
# Only used when ``control_mode == "leader"``. ``LeaderFollowerProcessor``
|
||||
# always builds the PR #2596 **7-D** vector ``[dx,dy,dz,wx,wy,wz,gripper]``.
|
||||
# When ``False`` (default), rotation is **disabled** (components 3–5 are
|
||||
# zeroed); when ``True``, full rotation deltas are used (requires
|
||||
# ``wx/wy/wz`` in ``inverse_kinematics.end_effector_step_sizes``). The
|
||||
# intervention step then turns that into a 4-D policy tensor when rotation
|
||||
# is disabled, matching the gamepad pipeline.
|
||||
use_rotation: bool = False
|
||||
|
||||
|
||||
@EnvConfig.register_subclass(name="gym_manipulator")
|
||||
|
||||
@@ -61,6 +61,7 @@ from .hil_processor import (
|
||||
RewardClassifierProcessorStep,
|
||||
TimeLimitProcessorStep,
|
||||
)
|
||||
from .leader_follower_processor import LeaderFollowerProcessor
|
||||
from .newline_task_processor import NewLineTaskProcessorStep
|
||||
from .normalize_processor import NormalizerProcessorStep, UnnormalizerProcessorStep, hotswap_stats
|
||||
from .observation_processor import VanillaObservationProcessorStep
|
||||
@@ -122,6 +123,7 @@ __all__ = [
|
||||
"ImageCropResizeProcessorStep",
|
||||
"InfoProcessorStep",
|
||||
"InterventionActionProcessorStep",
|
||||
"LeaderFollowerProcessor",
|
||||
"make_default_processors",
|
||||
"make_default_teleop_action_processor",
|
||||
"make_default_robot_action_processor",
|
||||
|
||||
@@ -443,6 +443,39 @@ class GripperPenaltyProcessorStep(ProcessorStep):
|
||||
return features
|
||||
|
||||
|
||||
def _ndarray_intervention_to_action_list(
|
||||
flat: np.ndarray, use_rotation: bool, use_gripper: bool
|
||||
) -> list[float]:
|
||||
"""Flatten ``LeaderFollowerProcessor`` / policy outputs into a policy action list.
|
||||
|
||||
PR #2596 leader mode always produces 7 elements ``[dx,dy,dz,wx,wy,wz,g]``. When
|
||||
``use_rotation`` is False, rotation is disabled (zeroed in the 7-D vector) and
|
||||
we still emit a 4-D tensor ``[dx,dy,dz,g]`` for the rest of the pipeline.
|
||||
"""
|
||||
n = int(flat.size)
|
||||
if not use_rotation and use_gripper and n == 7:
|
||||
return [float(flat[0]), float(flat[1]), float(flat[2]), float(flat[6])]
|
||||
if not use_rotation and not use_gripper and n == 6:
|
||||
return [float(flat[0]), float(flat[1]), float(flat[2])]
|
||||
return flat.tolist()
|
||||
|
||||
|
||||
def _tensor_intervention_to_action_list(
|
||||
flat: torch.Tensor, use_rotation: bool, use_gripper: bool
|
||||
) -> list[float]:
|
||||
n = int(flat.numel())
|
||||
if not use_rotation and use_gripper and n == 7:
|
||||
return [
|
||||
float(flat[0].item()),
|
||||
float(flat[1].item()),
|
||||
float(flat[2].item()),
|
||||
float(flat[6].item()),
|
||||
]
|
||||
if not use_rotation and not use_gripper and n == 6:
|
||||
return [float(flat[0].item()), float(flat[1].item()), float(flat[2].item())]
|
||||
return [float(x.item()) for x in flat]
|
||||
|
||||
|
||||
@dataclass
|
||||
@ProcessorStepRegistry.register("intervention_action_processor")
|
||||
class InterventionActionProcessorStep(ProcessorStep):
|
||||
@@ -455,11 +488,16 @@ class InterventionActionProcessorStep(ProcessorStep):
|
||||
|
||||
Attributes:
|
||||
use_gripper: Whether to include the gripper in the teleoperated action.
|
||||
use_rotation: For dict-based teleop actions, whether to include delta_wx/y/z.
|
||||
For 7-D ndarray/tensors from ``LeaderFollowerProcessor``, when
|
||||
``False`` the policy action is sliced to ``[dx,dy,dz,gripper]``.
|
||||
terminate_on_success: If True, automatically sets the `done` flag when a
|
||||
`success` event is received.
|
||||
"""
|
||||
|
||||
use_gripper: bool = False
|
||||
use_rotation: bool = False
|
||||
gripper_neutral_action: float = 1.0
|
||||
terminate_on_success: bool = True
|
||||
|
||||
def __call__(self, transition: EnvTransition) -> EnvTransition:
|
||||
@@ -497,10 +535,22 @@ class InterventionActionProcessorStep(ProcessorStep):
|
||||
teleop_action.get("delta_y", 0.0),
|
||||
teleop_action.get("delta_z", 0.0),
|
||||
]
|
||||
if self.use_rotation:
|
||||
action_list.extend(
|
||||
[
|
||||
teleop_action.get("delta_wx", 0.0),
|
||||
teleop_action.get("delta_wy", 0.0),
|
||||
teleop_action.get("delta_wz", 0.0),
|
||||
]
|
||||
)
|
||||
if self.use_gripper:
|
||||
action_list.append(teleop_action.get(GRIPPER_KEY, 1.0))
|
||||
action_list.append(teleop_action.get(GRIPPER_KEY, self.gripper_neutral_action))
|
||||
elif isinstance(teleop_action, torch.Tensor):
|
||||
flat = teleop_action.detach().flatten()
|
||||
action_list = _tensor_intervention_to_action_list(flat, self.use_rotation, self.use_gripper)
|
||||
elif isinstance(teleop_action, np.ndarray):
|
||||
action_list = teleop_action.tolist()
|
||||
flat = np.asarray(teleop_action).reshape(-1)
|
||||
action_list = _ndarray_intervention_to_action_list(flat, self.use_rotation, self.use_gripper)
|
||||
else:
|
||||
action_list = teleop_action
|
||||
|
||||
@@ -536,6 +586,8 @@ class InterventionActionProcessorStep(ProcessorStep):
|
||||
"""
|
||||
return {
|
||||
"use_gripper": self.use_gripper,
|
||||
"use_rotation": self.use_rotation,
|
||||
"gripper_neutral_action": self.gripper_neutral_action,
|
||||
"terminate_on_success": self.terminate_on_success,
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,255 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
|
||||
from lerobot.configs.types import PipelineFeatureType, PolicyFeature
|
||||
from lerobot.model.kinematics import RobotKinematics
|
||||
from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionKey
|
||||
from lerobot.robots import Robot
|
||||
from lerobot.teleoperators import Teleoperator
|
||||
from lerobot.teleoperators.utils import TeleopEvents
|
||||
from lerobot.utils.rotation import Rotation
|
||||
|
||||
from .pipeline import ProcessorStep
|
||||
|
||||
|
||||
@ProcessorStepRegistry.register("leader_follower_processor")
|
||||
@dataclass
|
||||
class LeaderFollowerProcessor(ProcessorStep):
|
||||
"""
|
||||
Processor for leader-follower teleoperation mode.
|
||||
|
||||
This processor:
|
||||
1. Sends follower positions to leader arm when not intervening
|
||||
2. Computes EE delta actions from leader when intervening
|
||||
3. Handles teleop events from the leader device
|
||||
"""
|
||||
|
||||
leader_device: Teleoperator
|
||||
motor_names: list[str]
|
||||
robot: Robot
|
||||
kinematics: RobotKinematics
|
||||
end_effector_step_sizes: np.ndarray | None = None
|
||||
use_gripper: bool = True
|
||||
# PR #2596 always produces a **7-D** intervention vector ``[dx, dy, dz, wx,
|
||||
# wy, wz, gripper]`` (normalised to ~[-1, 1] per axis). When ``use_rotation``
|
||||
# is False, the middle three components are **zeroed** (rotation disabled,
|
||||
# not removed): the same tensor layout and code path as PR #2596, but the
|
||||
# downstream policy / IK only sees position + gripper. ``wx/wy/wz`` step
|
||||
# sizes are only read when ``use_rotation`` is True.
|
||||
use_rotation: bool = True
|
||||
# prev_leader_gripper: float | None = None
|
||||
max_gripper_pos: float = 100.0
|
||||
use_ik_solution: bool = False
|
||||
|
||||
def __call__(self, transition: EnvTransition) -> EnvTransition:
|
||||
"""Process transition with leader-follower logic."""
|
||||
# Get current follower position from complementary data
|
||||
# raw_joint_pos = transition.get(TransitionKey.COMPLEMENTARY_DATA, {}).get("raw_joint_positions")
|
||||
raw_joint_pos = transition.get(TransitionKey.OBSERVATION)
|
||||
if raw_joint_pos is not None:
|
||||
# Send follower position to leader (for follow mode)
|
||||
# follower_action = {
|
||||
# f"{motor}.pos": float(raw_joint_pos[motor])
|
||||
# for motor in self.motor_names
|
||||
# }
|
||||
self.leader_device.send_action(raw_joint_pos)
|
||||
|
||||
# Only compute EE action if intervention is active
|
||||
# (AddTeleopEventsAsInfo already added IS_INTERVENTION to info)
|
||||
info = transition.get(TransitionKey.INFO, {})
|
||||
if info.get(TeleopEvents.IS_INTERVENTION, False):
|
||||
# Get leader joint positions from teleop_action
|
||||
# (AddTeleopActionAsComplimentaryData already got the action)
|
||||
complementary = transition.get(TransitionKey.COMPLEMENTARY_DATA, {})
|
||||
teleop_action = complementary.get("teleop_action", {})
|
||||
|
||||
if isinstance(teleop_action, dict) and raw_joint_pos is not None:
|
||||
leader_pos = np.array([teleop_action[f"{motor}.pos"] for motor in self.motor_names])
|
||||
|
||||
leader_ee = self.kinematics.forward_kinematics(leader_pos)
|
||||
|
||||
if self.use_ik_solution and "IK_solution" in transition.get(TransitionKey.COMPLEMENTARY_DATA):
|
||||
follower_pos = transition.get(TransitionKey.COMPLEMENTARY_DATA)["IK_solution"]
|
||||
else:
|
||||
follower_pos = np.array([raw_joint_pos[f"{motor}.pos"] for motor in self.motor_names])
|
||||
|
||||
follower_ee = self.kinematics.forward_kinematics(follower_pos)
|
||||
|
||||
# follower_gripper_pos = raw_joint_pos["gripper.pos"]
|
||||
follower_gripper_pos = follower_pos[-1] # assuming gripper is the last motor
|
||||
|
||||
leader_ee_pos = leader_ee[:3, 3]
|
||||
leader_ee_rvec = Rotation.from_matrix(leader_ee[:3, :3]).as_rotvec()
|
||||
leader_gripper_pos = np.clip(
|
||||
teleop_action["gripper.pos"], -self.max_gripper_pos, self.max_gripper_pos
|
||||
)
|
||||
|
||||
follower_ee_pos = follower_ee[:3, 3]
|
||||
# follower_ee_rvec = Rotation.from_matrix(follower_ee[:3, :3]).as_rotvec()
|
||||
|
||||
delta_pos = leader_ee_pos - follower_ee_pos
|
||||
delta_gripper = leader_gripper_pos - follower_gripper_pos
|
||||
|
||||
# Normalise position to ~[-1, 1] per axis (PR #2596).
|
||||
step_xyz = np.array(
|
||||
[
|
||||
self.end_effector_step_sizes["x"],
|
||||
self.end_effector_step_sizes["y"],
|
||||
self.end_effector_step_sizes["z"],
|
||||
]
|
||||
)
|
||||
delta_pos = delta_pos / step_xyz
|
||||
max_normalized_pos = max(
|
||||
abs(delta_pos[0]),
|
||||
abs(delta_pos[1]),
|
||||
abs(delta_pos[2]),
|
||||
)
|
||||
|
||||
# Relative rotation follower -> leader (same construction as PR #2596).
|
||||
r_delta = follower_ee[:3, :3].T @ leader_ee[:3, :3]
|
||||
delta_rvec = Rotation.from_matrix(r_delta).as_rotvec()
|
||||
|
||||
if self.use_rotation:
|
||||
desired = np.eye(4, dtype=float)
|
||||
desired[:3, :3] = follower_ee[:3, :3] @ r_delta
|
||||
desired[:3, 3] = follower_ee[:3, 3] + delta_pos * step_xyz
|
||||
|
||||
pos = desired[:3, 3]
|
||||
tw = Rotation.from_matrix(desired[:3, :3]).as_rotvec()
|
||||
|
||||
assert np.allclose(pos, leader_ee_pos), "Position delta computation error"
|
||||
assert np.allclose(tw, leader_ee_rvec), "Orientation delta computation error"
|
||||
assert np.isclose(follower_gripper_pos + delta_gripper, leader_gripper_pos), (
|
||||
"Gripper delta computation error"
|
||||
)
|
||||
|
||||
delta_rvec = delta_rvec / np.array(
|
||||
[
|
||||
self.end_effector_step_sizes["wx"],
|
||||
self.end_effector_step_sizes["wy"],
|
||||
self.end_effector_step_sizes["wz"],
|
||||
]
|
||||
)
|
||||
normalized_rot = max(abs(delta_rvec[0]), abs(delta_rvec[1]), abs(delta_rvec[2]))
|
||||
max_normalized = max(max_normalized_pos, normalized_rot)
|
||||
if max_normalized > 1.0:
|
||||
delta_pos = delta_pos / max_normalized
|
||||
delta_rvec = delta_rvec / max_normalized
|
||||
else:
|
||||
# Rotation **disabled**: keep PR #2596 joint scaling on position only.
|
||||
if max_normalized_pos > 1.0:
|
||||
delta_pos = delta_pos / max_normalized_pos
|
||||
delta_rvec = np.zeros(3, dtype=float)
|
||||
|
||||
grip_norm = (
|
||||
np.clip(delta_gripper, -self.max_gripper_pos, self.max_gripper_pos) / self.max_gripper_pos
|
||||
)
|
||||
|
||||
intervention_action = np.array(
|
||||
[
|
||||
delta_pos[0],
|
||||
delta_pos[1],
|
||||
delta_pos[2],
|
||||
delta_rvec[0],
|
||||
delta_rvec[1],
|
||||
delta_rvec[2],
|
||||
grip_norm,
|
||||
],
|
||||
dtype=float,
|
||||
)
|
||||
|
||||
# # Extract leader positions from teleop action dict
|
||||
# # leader_pos = np.array([teleop_action.get(f"{motor}.pos", 0) for motor in self.motor_names])
|
||||
# # follower_pos = np.array([raw_joint_pos[f"{motor}.pos"] for motor in self.motor_names])
|
||||
|
||||
# teleop_action = self.leader_device.bus.sync_read("Present_Position")
|
||||
# raw_joint_pos = self.robot.bus.sync_read("Present_Position")
|
||||
# leader_pos = np.array([teleop_action.get(f"{motor}", 0) for motor in self.motor_names])
|
||||
# follower_pos = np.array([raw_joint_pos[f"{motor}"] for motor in self.motor_names])
|
||||
|
||||
# # Compute EE positions
|
||||
# leader_ee_fi = self.kinematics.forward_kinematics(leader_pos)
|
||||
# leader_ee_pos = leader_ee_fi[:3, 3]
|
||||
# # leader_ee_rot = Rotation.from_matrix(leader_ee_fi[:3, :3]).as_rotvec()
|
||||
# leader_ee = np.concat([leader_ee_pos, [0,0,0]])
|
||||
|
||||
# if "IK_solution" in transition.get(TransitionKey.COMPLEMENTARY_DATA):
|
||||
# follower_ee = transition.get(TransitionKey.COMPLEMENTARY_DATA)["IK_solution"]
|
||||
# else:
|
||||
# follower_pos = np.array([raw_joint_pos[f"{motor}.pos"] for motor in self.motor_names])
|
||||
# follower_ee_fi = self.kinematics.forward_kinematics(follower_pos)
|
||||
# follower_ee_pos = follower_ee_fi[:3, 3]
|
||||
# # follower_ee_rot = Rotation.from_matrix(follower_ee_fi[:3, :3]).as_rotvec()
|
||||
# follower_ee = np.concat([follower_ee_pos, [0,0,0]])
|
||||
|
||||
# # Compute normalized EE delta
|
||||
# if self.end_effector_step_sizes is not None:
|
||||
# ee_delta = np.clip(
|
||||
# leader_ee - follower_ee,
|
||||
# -self.end_effector_step_sizes,
|
||||
# self.end_effector_step_sizes
|
||||
# )
|
||||
# ee_delta_normalized = ee_delta / self.end_effector_step_sizes
|
||||
# else:
|
||||
# ee_delta_normalized = leader_ee - follower_ee
|
||||
|
||||
# # Handle gripper
|
||||
# if self.use_gripper and len(leader_pos) > 3:
|
||||
# if self.prev_leader_gripper is None:
|
||||
# self.prev_leader_gripper = np.clip(
|
||||
# leader_pos[-1], 0, self.max_gripper_pos
|
||||
# )
|
||||
|
||||
# leader_gripper = leader_pos[-1]
|
||||
# gripper_delta = leader_gripper - self.prev_leader_gripper
|
||||
# normalized_delta = gripper_delta / self.max_gripper_pos
|
||||
|
||||
# # Quantize gripper action
|
||||
# if normalized_delta >= 0.3:
|
||||
# gripper_action = 2
|
||||
# elif normalized_delta <= -0.1:
|
||||
# gripper_action = 0
|
||||
# else:
|
||||
# gripper_action = 1
|
||||
|
||||
# self.prev_leader_gripper = leader_gripper
|
||||
|
||||
# # Create intervention action
|
||||
# intervention_action = np.append(ee_delta_normalized, gripper_action)
|
||||
# else:
|
||||
# intervention_action = ee_delta_normalized
|
||||
|
||||
# # Override teleop_action with computed EE action
|
||||
complementary["teleop_action"] = torch.from_numpy(intervention_action).float()
|
||||
transition[TransitionKey.COMPLEMENTARY_DATA] = complementary # type: ignore[misc]
|
||||
|
||||
return transition
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset leader-follower state."""
|
||||
# self.prev_leader_gripper = None
|
||||
if hasattr(self.leader_device, "reset"):
|
||||
self.leader_device.reset()
|
||||
|
||||
def transform_features(
|
||||
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
|
||||
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
|
||||
return features
|
||||
@@ -39,6 +39,7 @@ from lerobot.processor import (
|
||||
GymHILAdapterProcessorStep,
|
||||
ImageCropResizeProcessorStep,
|
||||
InterventionActionProcessorStep,
|
||||
LeaderFollowerProcessor,
|
||||
MapDeltaActionToRobotActionStep,
|
||||
MapTensorToDeltaActionDictStep,
|
||||
Numpy2TorchActionProcessorStep,
|
||||
@@ -71,6 +72,7 @@ from lerobot.teleoperators import (
|
||||
make_teleoperator_from_config,
|
||||
so_leader, # noqa: F401
|
||||
)
|
||||
from lerobot.teleoperators.so_leader import SO101LeaderFollower
|
||||
from lerobot.teleoperators.teleoperator import Teleoperator
|
||||
from lerobot.teleoperators.utils import TeleopEvents
|
||||
from lerobot.utils.constants import ACTION, DONE, OBS_IMAGES, OBS_STATE, REWARD
|
||||
@@ -481,14 +483,53 @@ def make_processors(
|
||||
env_pipeline_steps.append(AddBatchDimensionProcessorStep())
|
||||
env_pipeline_steps.append(DeviceProcessorStep(device=device))
|
||||
|
||||
action_pipeline_steps = [
|
||||
# Get control mode (gamepad / keyboard / leader -- see PR #2596)
|
||||
control_mode = cfg.processor.control_mode if cfg.processor is not None else "gamepad"
|
||||
|
||||
action_pipeline_steps: list = [
|
||||
AddTeleopActionAsComplimentaryDataStep(teleop_device=teleop_device),
|
||||
AddTeleopEventsAsInfoStep(teleop_device=teleop_device),
|
||||
]
|
||||
|
||||
# Leader-follower control mode: leader haptically tracks follower until the
|
||||
# human toggles intervention with SPACE, at which point ``LeaderFollowerProcessor``
|
||||
# builds the PR #2596 **7-D** EE delta tensor. Rotation can be toggled via
|
||||
# ``processor.use_rotation``: when False the ``wx/wy/wz`` channels are zeroed.
|
||||
# ``InterventionActionProcessorStep`` then maps that to either a full 7-D
|
||||
# or 4-D policy action tensor (see helpers in ``hil_processor``).
|
||||
leader_use_rotation = bool(getattr(cfg.processor, "use_rotation", False))
|
||||
if control_mode == "leader":
|
||||
if not isinstance(teleop_device, SO101LeaderFollower):
|
||||
raise ValueError(
|
||||
"Leader control mode requires SO101LeaderFollower teleop device. "
|
||||
"Set `--teleop.type=so101_leader --teleop.leader_follower_mode=true`."
|
||||
)
|
||||
if cfg.processor.inverse_kinematics is None or kinematics_solver is None:
|
||||
raise ValueError(
|
||||
"Leader control mode requires `cfg.processor.inverse_kinematics` and a kinematics solver."
|
||||
)
|
||||
action_pipeline_steps.append(
|
||||
LeaderFollowerProcessor(
|
||||
leader_device=teleop_device,
|
||||
motor_names=motor_names,
|
||||
robot=env.robot,
|
||||
kinematics=kinematics_solver,
|
||||
end_effector_step_sizes=cfg.processor.inverse_kinematics.end_effector_step_sizes,
|
||||
use_gripper=cfg.processor.gripper.use_gripper if cfg.processor.gripper is not None else False,
|
||||
use_rotation=leader_use_rotation,
|
||||
max_gripper_pos=cfg.processor.max_gripper_pos
|
||||
if cfg.processor.max_gripper_pos is not None
|
||||
else 100.0,
|
||||
)
|
||||
)
|
||||
|
||||
action_pipeline_steps.append(
|
||||
InterventionActionProcessorStep(
|
||||
use_gripper=cfg.processor.gripper.use_gripper if cfg.processor.gripper is not None else False,
|
||||
use_rotation=(control_mode == "leader" and leader_use_rotation),
|
||||
terminate_on_success=terminate_on_success,
|
||||
),
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
# Replace InverseKinematicsProcessor with new kinematic processors
|
||||
if cfg.processor.inverse_kinematics is not None and kinematics_solver is not None:
|
||||
|
||||
@@ -355,12 +355,21 @@ class GripperVelocityToJoint(RobotActionProcessorStep):
|
||||
clip_max: The maximum allowed gripper joint position.
|
||||
discrete_gripper: If True, interpret the input as a discrete class index
|
||||
{0 = close, 1 = stay, 2 = open}, matching `GamepadTeleop.GripperAction`.
|
||||
scale_velocity: If True, scale the continuous gripper velocity by ``clip_max``
|
||||
so a normalized [-1, 1] command produces a meaningful position delta
|
||||
(PR #2596).
|
||||
use_ik_solution: If True, integrate the gripper position on top of the
|
||||
previous IK solution stored in ``complementary_data['IK_solution']``
|
||||
instead of the raw joint observation (PR #2596). Useful for
|
||||
leader-follower haptic teleop where the IK solution is more stable.
|
||||
"""
|
||||
|
||||
speed_factor: float = 20.0
|
||||
clip_min: float = 0.0
|
||||
clip_max: float = 100.0
|
||||
discrete_gripper: bool = False
|
||||
scale_velocity: bool = False
|
||||
use_ik_solution: bool = False
|
||||
|
||||
def action(self, action: RobotAction) -> RobotAction:
|
||||
observation = self.transition.get(TransitionKey.OBSERVATION).copy()
|
||||
@@ -370,10 +379,15 @@ class GripperVelocityToJoint(RobotActionProcessorStep):
|
||||
if observation is None:
|
||||
raise ValueError("Joints observation is require for computing robot kinematics")
|
||||
|
||||
q_raw = np.array(
|
||||
[float(v) for k, v in observation.items() if isinstance(k, str) and k.endswith(".pos")],
|
||||
dtype=float,
|
||||
)
|
||||
if self.use_ik_solution and "IK_solution" in self.transition.get(
|
||||
TransitionKey.COMPLEMENTARY_DATA, {}
|
||||
):
|
||||
q_raw = self.transition.get(TransitionKey.COMPLEMENTARY_DATA)["IK_solution"]
|
||||
else:
|
||||
q_raw = np.array(
|
||||
[float(v) for k, v in observation.items() if isinstance(k, str) and k.endswith(".pos")],
|
||||
dtype=float,
|
||||
)
|
||||
if q_raw is None:
|
||||
raise ValueError("Joints observation is require for computing robot kinematics")
|
||||
|
||||
@@ -382,6 +396,9 @@ class GripperVelocityToJoint(RobotActionProcessorStep):
|
||||
# Negation accounts for SO100 sign (joint position increases on close).
|
||||
# 0 -> +clip_max (close), 1 -> 0 (stay), 2 -> -clip_max (open)
|
||||
gripper_vel = -(gripper_vel - 1) * self.clip_max
|
||||
elif self.scale_velocity:
|
||||
# Scale a continuous [-1, 1] velocity command into joint-position units.
|
||||
gripper_vel = gripper_vel * self.clip_max
|
||||
|
||||
# Compute desired gripper position
|
||||
delta = gripper_vel * float(self.speed_factor)
|
||||
|
||||
@@ -20,6 +20,7 @@ from .config_so_leader import (
|
||||
SOLeaderConfig,
|
||||
SOLeaderTeleopConfig,
|
||||
)
|
||||
from .so101_leader_follower import SO101LeaderFollower
|
||||
from .so_leader import SO100Leader, SO101Leader, SOLeader
|
||||
|
||||
__all__ = [
|
||||
@@ -27,6 +28,7 @@ __all__ = [
|
||||
"SO100LeaderConfig",
|
||||
"SO101Leader",
|
||||
"SO101LeaderConfig",
|
||||
"SO101LeaderFollower",
|
||||
"SOLeader",
|
||||
"SOLeaderConfig",
|
||||
"SOLeaderTeleopConfig",
|
||||
|
||||
@@ -29,6 +29,14 @@ class SOLeaderConfig:
|
||||
# Whether to use degrees for angles
|
||||
use_degrees: bool = True
|
||||
|
||||
# Enable leader-follower mode where leader can both lead and follow.
|
||||
# When True, ``make_teleoperator_from_config`` returns ``SO101LeaderFollower``
|
||||
# instead of the bare ``SOLeader`` -- see PR #2596.
|
||||
leader_follower_mode: bool = False
|
||||
|
||||
# Whether to include the gripper in the leader-follower action vector.
|
||||
use_gripper: bool = True
|
||||
|
||||
|
||||
@TeleoperatorConfig.register_subclass("so101_leader")
|
||||
@TeleoperatorConfig.register_subclass("so100_leader")
|
||||
|
||||
@@ -0,0 +1,259 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from collections import deque
|
||||
from threading import Event, Thread
|
||||
|
||||
import numpy as np
|
||||
|
||||
from lerobot.teleoperators.so_leader.so_leader import SO101Leader
|
||||
from lerobot.teleoperators.utils import TeleopEvents
|
||||
|
||||
PYNPUT_AVAILABLE = True
|
||||
try:
|
||||
if ("DISPLAY" not in os.environ) and ("linux" in sys.platform):
|
||||
logging.info("No DISPLAY set. Skipping pynput import.")
|
||||
raise ImportError("pynput blocked intentionally due to no display.")
|
||||
|
||||
from pynput import keyboard
|
||||
except ImportError:
|
||||
keyboard = None
|
||||
PYNPUT_AVAILABLE = False
|
||||
except Exception as e:
|
||||
keyboard = None
|
||||
PYNPUT_AVAILABLE = False
|
||||
logging.info(f"Could not import pynput: {e}")
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SO101LeaderFollower(SO101Leader):
|
||||
"""
|
||||
Extended SO101 Leader that can both lead (human control) and follow (mimic follower).
|
||||
|
||||
This class adds leader-follower functionality where:
|
||||
- In follow mode: The leader arm mimics the follower's position (torque enabled)
|
||||
- In lead mode: Human controls the leader (torque disabled) and provides actions
|
||||
"""
|
||||
|
||||
def __init__(self, config):
|
||||
super().__init__(config)
|
||||
|
||||
# Leader-follower state
|
||||
self.is_intervening = False
|
||||
self.leader_torque_enabled = True
|
||||
|
||||
# Tracking error for automatic intervention detection
|
||||
self.leader_tracking_error_queue = deque(maxlen=4)
|
||||
|
||||
# Keyboard event handling
|
||||
self.keyboard_events = {
|
||||
"intervention": False,
|
||||
"success": False,
|
||||
"failure": False,
|
||||
"rerecord": False,
|
||||
}
|
||||
self.keyboard_thread = None
|
||||
self.stop_event = Event()
|
||||
|
||||
# Store last follower position for action computation
|
||||
self.last_follower_pos = None
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict:
|
||||
if self.config.use_gripper:
|
||||
return {
|
||||
"dtype": "float32",
|
||||
"shape": (7,),
|
||||
"names": {
|
||||
"delta_x": 0,
|
||||
"delta_y": 1,
|
||||
"delta_z": 2,
|
||||
"delta_wx": 3,
|
||||
"delta_wy": 4,
|
||||
"delta_wz": 5,
|
||||
"gripper": 6,
|
||||
},
|
||||
}
|
||||
else:
|
||||
return {
|
||||
"dtype": "float32",
|
||||
"shape": (6,),
|
||||
"names": {
|
||||
"delta_x": 0,
|
||||
"delta_y": 1,
|
||||
"delta_z": 2,
|
||||
"delta_wx": 3,
|
||||
"delta_wy": 4,
|
||||
"delta_wz": 5,
|
||||
},
|
||||
}
|
||||
|
||||
def connect(self, calibrate: bool = True) -> None:
|
||||
"""Connect and configure for leader-follower mode."""
|
||||
super().connect(calibrate)
|
||||
|
||||
# Configure for leader-follower mode with lower gains
|
||||
# Lower gains allow manual intervention without injury risk
|
||||
# self.bus.sync_write("Torque_Enable", 1)
|
||||
for motor in self.bus.motors:
|
||||
self.bus.write("P_Coefficient", motor, 16)
|
||||
self.bus.write("I_Coefficient", motor, 0)
|
||||
self.bus.write("D_Coefficient", motor, 16)
|
||||
|
||||
# Start keyboard listener
|
||||
self._start_keyboard_listener()
|
||||
|
||||
print("- Leader-Follower Mode:")
|
||||
print(" - Press SPACE to toggle intervention (leader control)")
|
||||
print(" - When not intervening, leader follows follower position")
|
||||
print(" - When intervening, follower follows leader in end-effector space")
|
||||
print(" - Press 's' to mark episode as success")
|
||||
print(" - Press ESC to end episode as failure")
|
||||
print(" - Press 'r' to re-record episode")
|
||||
|
||||
def _start_keyboard_listener(self):
|
||||
"""Start keyboard listener thread for intervention control."""
|
||||
|
||||
def on_press(key):
|
||||
try:
|
||||
if key == keyboard.Key.space:
|
||||
self.keyboard_events["intervention"] = not self.keyboard_events["intervention"]
|
||||
self.is_intervening = self.keyboard_events["intervention"]
|
||||
state = "INTERVENTION MODE" if self.is_intervening else "FOLLOWING MODE"
|
||||
logger.info(f"Toggled to {state}")
|
||||
elif key == keyboard.Key.esc:
|
||||
self.keyboard_events["failure"] = True
|
||||
elif hasattr(key, "char"):
|
||||
if key.char == "s":
|
||||
self.keyboard_events["success"] = True
|
||||
elif key.char == "r":
|
||||
self.keyboard_events["rerecord"] = True
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling key press: {e}")
|
||||
|
||||
def listen():
|
||||
with keyboard.Listener(on_press=on_press) as listener:
|
||||
while not self.stop_event.is_set():
|
||||
time.sleep(0.1)
|
||||
listener.stop()
|
||||
|
||||
self.keyboard_thread = Thread(target=listen, daemon=True)
|
||||
self.keyboard_thread.start()
|
||||
|
||||
def send_action(self, action: dict[str, float]) -> None:
|
||||
"""
|
||||
Send position commands to leader arm (follow mode).
|
||||
|
||||
Args:
|
||||
action: Dictionary of motor positions to command
|
||||
"""
|
||||
# Store follower position for later use
|
||||
self.last_follower_pos = np.array([action.get(f"{motor}.pos", 0) for motor in self.bus.motors])
|
||||
|
||||
if not self.is_intervening:
|
||||
# Follow mode: enable torque and track follower
|
||||
if not self.leader_torque_enabled:
|
||||
self.bus.sync_write("Torque_Enable", 1)
|
||||
self.leader_torque_enabled = True
|
||||
|
||||
# Send follower positions to leader
|
||||
goal_pos = {motor: action[f"{motor}.pos"] for motor in self.bus.motors}
|
||||
self.bus.sync_write("Goal_Position", goal_pos)
|
||||
|
||||
# Track error for automatic intervention detection
|
||||
current_pos = self.bus.sync_read("Present_Position")
|
||||
current_array = np.array([current_pos[motor] for motor in self.bus.motors])
|
||||
error = np.linalg.norm(self.last_follower_pos[:-1] - current_array[:-1])
|
||||
self.leader_tracking_error_queue.append(error)
|
||||
|
||||
def get_action(self) -> dict[str, float]:
|
||||
"""
|
||||
Get action from leader arm.
|
||||
|
||||
In follow mode: Returns neutral/current positions
|
||||
In lead mode: Returns actual leader positions for follower to track
|
||||
"""
|
||||
start = time.perf_counter()
|
||||
|
||||
if self.is_intervening:
|
||||
# Lead mode: disable torque if needed and return leader positions
|
||||
if self.leader_torque_enabled:
|
||||
self.bus.sync_write("Torque_Enable", 0)
|
||||
self.leader_torque_enabled = False
|
||||
|
||||
# Get current leader position
|
||||
action = self.bus.sync_read("Present_Position")
|
||||
action = {f"{motor}.pos": val for motor, val in action.items()}
|
||||
|
||||
# Track error
|
||||
if self.last_follower_pos is not None:
|
||||
current_array = np.array([action[f"{motor}.pos"] for motor in self.bus.motors])
|
||||
error = np.linalg.norm(self.last_follower_pos[:-1] - current_array[:-1])
|
||||
self.leader_tracking_error_queue.append(error)
|
||||
else:
|
||||
# Follow mode: return current/neutral positions
|
||||
action = self.bus.sync_read("Present_Position")
|
||||
action = {f"{motor}.pos": val for motor, val in action.items()}
|
||||
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
|
||||
return action
|
||||
|
||||
def get_teleop_events(self) -> dict[TeleopEvents, bool]:
|
||||
"""Get current keyboard events."""
|
||||
events = {}
|
||||
|
||||
# Map keyboard events to TeleopEvents
|
||||
if self.keyboard_events["success"]:
|
||||
events[TeleopEvents.SUCCESS] = True
|
||||
self.keyboard_events["success"] = False
|
||||
if self.keyboard_events["failure"]:
|
||||
events[TeleopEvents.FAILURE] = True
|
||||
events[TeleopEvents.TERMINATE_EPISODE] = True
|
||||
self.keyboard_events["failure"] = False
|
||||
if self.keyboard_events["rerecord"]:
|
||||
events[TeleopEvents.RERECORD_EPISODE] = True
|
||||
events[TeleopEvents.TERMINATE_EPISODE] = True
|
||||
self.keyboard_events["rerecord"] = False
|
||||
|
||||
# Always report intervention state
|
||||
events[TeleopEvents.IS_INTERVENTION] = self.is_intervening
|
||||
|
||||
return events
|
||||
|
||||
def disconnect(self) -> None:
|
||||
"""Disconnect and cleanup."""
|
||||
self.stop_event.set()
|
||||
if self.keyboard_thread:
|
||||
self.keyboard_thread.join(timeout=1.0)
|
||||
super().disconnect()
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset leader-follower state."""
|
||||
self.is_intervening = False
|
||||
self.leader_torque_enabled = True
|
||||
self.leader_tracking_error_queue.clear()
|
||||
self.keyboard_events = {
|
||||
"intervention": False,
|
||||
"success": False,
|
||||
"failure": False,
|
||||
"rerecord": False,
|
||||
}
|
||||
@@ -52,7 +52,10 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> "Teleoperator":
|
||||
|
||||
return SO100Leader(config)
|
||||
elif config.type == "so101_leader":
|
||||
from .so_leader import SO101Leader
|
||||
from .so_leader import SO101Leader, SO101LeaderFollower
|
||||
|
||||
if getattr(config, "leader_follower_mode", False):
|
||||
return SO101LeaderFollower(config)
|
||||
|
||||
return SO101Leader(config)
|
||||
elif config.type == "mock_teleop":
|
||||
|
||||
Reference in New Issue
Block a user