[pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
This commit is contained in:
pre-commit-ci[bot]
2025-03-24 13:16:38 +00:00
committed by AdilZouitine
parent 761a2dbcb3
commit 8e6d5f504c
97 changed files with 1596 additions and 492 deletions
@@ -57,7 +57,9 @@ class OpenCVCameraConfig(CameraConfig):
self.channels = 3
if self.rotation not in [-90, None, 90, 180]:
raise ValueError(f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})")
raise ValueError(
f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})"
)
@CameraConfig.register_subclass("intelrealsense")
@@ -102,8 +104,12 @@ class IntelRealSenseCameraConfig(CameraConfig):
self.channels = 3
at_least_one_is_not_none = self.fps is not None or self.width is not None or self.height is not None
at_least_one_is_none = self.fps is None or self.width is None or self.height is None
at_least_one_is_not_none = (
self.fps is not None or self.width is not None or self.height is not None
)
at_least_one_is_none = (
self.fps is None or self.width is None or self.height is None
)
if at_least_one_is_not_none and at_least_one_is_none:
raise ValueError(
"For `fps`, `width` and `height`, either all of them need to be set, or none of them, "
@@ -111,4 +117,6 @@ class IntelRealSenseCameraConfig(CameraConfig):
)
if self.rotation not in [-90, None, 90, 180]:
raise ValueError(f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})")
raise ValueError(
f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})"
)
@@ -303,7 +303,11 @@ class IntelRealSenseCamera:
if self.fps and self.capture_width and self.capture_height:
# TODO(rcadene): can we set rgb8 directly?
config.enable_stream(
rs.stream.color, self.capture_width, self.capture_height, rs.format.rgb8, self.fps
rs.stream.color,
self.capture_width,
self.capture_height,
rs.format.rgb8,
self.fps,
)
else:
config.enable_stream(rs.stream.color)
@@ -311,7 +315,11 @@ class IntelRealSenseCamera:
if self.use_depth:
if self.fps and self.capture_width and self.capture_height:
config.enable_stream(
rs.stream.depth, self.capture_width, self.capture_height, rs.format.z16, self.fps
rs.stream.depth,
self.capture_width,
self.capture_height,
rs.format.z16,
self.fps,
)
else:
config.enable_stream(rs.stream.depth)
+11 -3
View File
@@ -144,7 +144,9 @@ def save_images_from_cameras(
print("Connecting cameras")
cameras = []
for cam_idx in camera_ids:
config = OpenCVCameraConfig(camera_index=cam_idx, fps=fps, width=width, height=height, mock=mock)
config = OpenCVCameraConfig(
camera_index=cam_idx, fps=fps, width=width, height=height, mock=mock
)
camera = OpenCVCamera(config)
camera.connect()
print(
@@ -250,7 +252,9 @@ class OpenCVCamera:
# Retrieve the camera index from a potentially symlinked path
self.camera_index = get_camera_index_from_unix_port(self.port)
else:
raise ValueError(f"Please check the provided camera_index: {self.camera_index}")
raise ValueError(
f"Please check the provided camera_index: {self.camera_index}"
)
# Store the raw (capture) resolution from the config.
self.capture_width = config.width
@@ -314,7 +318,11 @@ class OpenCVCamera:
else cv2.CAP_ANY
)
camera_idx = f"/dev/video{self.camera_index}" if platform.system() == "Linux" else self.camera_index
camera_idx = (
f"/dev/video{self.camera_index}"
if platform.system() == "Linux"
else self.camera_index
)
# First create a temporary camera trying to access `camera_index`,
# and verify it is a valid camera by calling `isOpened`.
tmp_camera = cv2.VideoCapture(camera_idx, backend)
@@ -41,7 +41,9 @@ def make_cameras_from_configs(camera_configs: dict[str, CameraConfig]) -> list[C
cameras[key] = OpenCVCamera(cfg)
elif cfg.type == "intelrealsense":
from lerobot.common.robot_devices.cameras.intelrealsense import IntelRealSenseCamera
from lerobot.common.robot_devices.cameras.intelrealsense import (
IntelRealSenseCamera,
)
cameras[key] = IntelRealSenseCamera(cfg)
else:
@@ -58,7 +60,9 @@ def make_camera(camera_type, **kwargs) -> Camera:
return OpenCVCamera(config)
elif camera_type == "intelrealsense":
from lerobot.common.robot_devices.cameras.intelrealsense import IntelRealSenseCamera
from lerobot.common.robot_devices.cameras.intelrealsense import (
IntelRealSenseCamera,
)
config = IntelRealSenseCameraConfig(**kwargs)
return IntelRealSenseCamera(config)
@@ -93,7 +93,9 @@ class RecordControlConfig(ControlConfig):
policy_path = parser.get_path_arg("control.policy")
if policy_path:
cli_overrides = parser.get_cli_overrides("control.policy")
self.policy = PreTrainedConfig.from_pretrained(policy_path, cli_overrides=cli_overrides)
self.policy = PreTrainedConfig.from_pretrained(
policy_path, cli_overrides=cli_overrides
)
self.policy.pretrained_path = policy_path
@@ -282,7 +282,10 @@ def control_loop(
if policy is not None:
pred_action = predict_action(
observation, policy, get_safe_torch_device(policy.config.device), policy.config.use_amp
observation,
policy,
get_safe_torch_device(policy.config.device),
policy.config.use_amp,
)
# Action can eventually be clipped using `max_relative_target`,
# so action actually sent is saved in the dataset.
@@ -23,7 +23,10 @@ import numpy as np
import tqdm
from lerobot.common.robot_devices.motors.configs import DynamixelMotorsBusConfig
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from lerobot.common.robot_devices.utils import (
RobotDeviceAlreadyConnectedError,
RobotDeviceNotConnectedError,
)
from lerobot.common.utils.utils import capture_timestamp_utc
PROTOCOL_VERSION = 2.0
@@ -23,7 +23,10 @@ import numpy as np
import tqdm
from lerobot.common.robot_devices.motors.configs import FeetechMotorsBusConfig
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from lerobot.common.robot_devices.utils import (
RobotDeviceAlreadyConnectedError,
RobotDeviceNotConnectedError,
)
from lerobot.common.utils.utils import capture_timestamp_utc
PROTOCOL_VERSION = 0
+3 -1
View File
@@ -30,7 +30,9 @@ class MotorsBus(Protocol):
def write(self): ...
def make_motors_buses_from_configs(motors_bus_configs: dict[str, MotorsBusConfig]) -> list[MotorsBus]:
def make_motors_buses_from_configs(
motors_bus_configs: dict[str, MotorsBusConfig],
) -> list[MotorsBus]:
motors_buses = {}
for key, cfg in motors_bus_configs.items():
@@ -69,9 +69,13 @@ class ManipulatorRobotConfig(RobotConfig):
if not cam.mock:
cam.mock = True
if self.max_relative_target is not None and isinstance(self.max_relative_target, Sequence):
if self.max_relative_target is not None and isinstance(
self.max_relative_target, Sequence
):
for name in self.follower_arms:
if len(self.follower_arms[name].motors) != len(self.max_relative_target):
if len(self.follower_arms[name].motors) != len(
self.max_relative_target
):
raise ValueError(
f"len(max_relative_target)={len(self.max_relative_target)} but the follower arm with name {name} has "
f"{len(self.follower_arms[name].motors)} motors. Please make sure that the "
@@ -42,7 +42,9 @@ def run_camera_capture(cameras, images_lock, latest_images_dict, stop_event):
local_dict = {}
for name, cam in cameras.items():
frame = cam.async_read()
ret, buffer = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 90])
ret, buffer = cv2.imencode(
".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 90]
)
if ret:
local_dict[name] = base64.b64encode(buffer).decode("utf-8")
else:
@@ -61,7 +63,9 @@ def calibrate_follower_arm(motors_bus, calib_dir_str):
calib_dir.mkdir(parents=True, exist_ok=True)
calib_file = calib_dir / "main_follower.json"
try:
from lerobot.common.robot_devices.robots.feetech_calibration import run_arm_manual_calibration
from lerobot.common.robot_devices.robots.feetech_calibration import (
run_arm_manual_calibration,
)
except ImportError:
print("[WARNING] Calibration function not available. Skipping calibration.")
return
@@ -72,7 +76,9 @@ def calibrate_follower_arm(motors_bus, calib_dir_str):
print(f"[INFO] Loaded calibration from {calib_file}")
else:
print("[INFO] Calibration file not found. Running manual calibration...")
calibration = run_arm_manual_calibration(motors_bus, "lekiwi", "follower_arm", "follower")
calibration = run_arm_manual_calibration(
motors_bus, "lekiwi", "follower_arm", "follower"
)
print(f"[INFO] Calibration complete. Saving to {calib_file}")
with open(calib_file, "w") as f:
json.dump(calibration, f)
@@ -116,7 +122,14 @@ def run_lekiwi(robot_config):
robot = LeKiwi(motors_bus)
# Define the expected arm motor IDs.
arm_motor_ids = ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll", "gripper"]
arm_motor_ids = [
"shoulder_pan",
"shoulder_lift",
"elbow_flex",
"wrist_flex",
"wrist_roll",
"gripper",
]
# Disable torque for each arm motor.
for motor in arm_motor_ids:
@@ -130,7 +143,9 @@ def run_lekiwi(robot_config):
images_lock = threading.Lock()
stop_event = threading.Event()
cam_thread = threading.Thread(
target=run_camera_capture, args=(cameras, images_lock, latest_images_dict, stop_event), daemon=True
target=run_camera_capture,
args=(cameras, images_lock, latest_images_dict, stop_event),
daemon=True,
)
cam_thread.start()
@@ -159,7 +174,9 @@ def run_lekiwi(robot_config):
f"[WARNING] Received {len(arm_positions)} arm positions, expected {len(arm_motor_ids)}"
)
else:
for motor, pos in zip(arm_motor_ids, arm_positions, strict=False):
for motor, pos in zip(
arm_motor_ids, arm_positions, strict=False
):
motors_bus.write("Goal_Position", pos, motor)
# Process wheel (base) commands.
if "raw_velocity" in data:
@@ -190,7 +207,9 @@ def run_lekiwi(robot_config):
try:
pos = motors_bus.read("Present_Position", motor)
# Convert the position to a float (or use as is if already numeric).
follower_arm_state.append(float(pos) if not isinstance(pos, (int, float)) else pos)
follower_arm_state.append(
float(pos) if not isinstance(pos, (int, float)) else pos
)
except Exception as e:
print(f"[ERROR] Reading motor {motor} failed: {e}")
@@ -28,7 +28,10 @@ import numpy as np
import torch
from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs
from lerobot.common.robot_devices.motors.utils import MotorsBus, make_motors_buses_from_configs
from lerobot.common.robot_devices.motors.utils import (
MotorsBus,
make_motors_buses_from_configs,
)
from lerobot.common.robot_devices.robots.configs import ManipulatorRobotConfig
from lerobot.common.robot_devices.robots.utils import get_arm_id
from lerobot.common.robot_devices.utils import (
@@ -25,9 +25,14 @@ import zmq
from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs
from lerobot.common.robot_devices.motors.feetech import TorqueMode
from lerobot.common.robot_devices.motors.utils import MotorsBus, make_motors_buses_from_configs
from lerobot.common.robot_devices.motors.utils import (
MotorsBus,
make_motors_buses_from_configs,
)
from lerobot.common.robot_devices.robots.configs import LeKiwiRobotConfig
from lerobot.common.robot_devices.robots.feetech_calibration import run_arm_manual_calibration
from lerobot.common.robot_devices.robots.feetech_calibration import (
run_arm_manual_calibration,
)
from lerobot.common.robot_devices.robots.utils import get_arm_id
from lerobot.common.robot_devices.utils import RobotDeviceNotConnectedError
@@ -266,7 +271,9 @@ class MobileManipulator:
calibration = json.load(f)
else:
print(f"Missing calibration file '{arm_calib_path}'")
calibration = run_arm_manual_calibration(arm, self.robot_type, name, arm_type)
calibration = run_arm_manual_calibration(
arm, self.robot_type, name, arm_type
)
print(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(arm_calib_path, "w") as f:
@@ -296,7 +303,9 @@ class MobileManipulator:
bus.write("Torque_Enable", 0, motor_id)
# Then filter out wheels
arm_only_dict = {k: v for k, v in bus.motors.items() if not k.startswith("wheel_")}
arm_only_dict = {
k: v for k, v in bus.motors.items() if not k.startswith("wheel_")
}
if not arm_only_dict:
continue
@@ -324,7 +333,11 @@ class MobileManipulator:
socks = dict(poller.poll(15))
if self.video_socket not in socks or socks[self.video_socket] != zmq.POLLIN:
# No new data arrived → reuse ALL old data
return (self.last_frames, self.last_present_speed, self.last_remote_arm_state)
return (
self.last_frames,
self.last_present_speed,
self.last_remote_arm_state,
)
# Drain all messages, keep only the last
last_msg = None
@@ -337,7 +350,11 @@ class MobileManipulator:
if not last_msg:
# No new message → also reuse old
return (self.last_frames, self.last_present_speed, self.last_remote_arm_state)
return (
self.last_frames,
self.last_present_speed,
self.last_remote_arm_state,
)
# Decode only the final message
try:
@@ -360,7 +377,9 @@ class MobileManipulator:
if new_arm_state is not None and frames is not None:
self.last_frames = frames
remote_arm_state_tensor = torch.tensor(new_arm_state, dtype=torch.float32)
remote_arm_state_tensor = torch.tensor(
new_arm_state, dtype=torch.float32
)
self.last_remote_arm_state = remote_arm_state_tensor
present_speed = new_speed
@@ -375,14 +394,21 @@ class MobileManipulator:
except Exception as e:
print(f"[DEBUG] Error decoding video message: {e}")
# If decode fails, fall back to old data
return (self.last_frames, self.last_present_speed, self.last_remote_arm_state)
return (
self.last_frames,
self.last_present_speed,
self.last_remote_arm_state,
)
return frames, present_speed, remote_arm_state_tensor
def _process_present_speed(self, present_speed: dict) -> torch.Tensor:
state_tensor = torch.zeros(3, dtype=torch.int32)
if present_speed:
decoded = {key: MobileManipulator.raw_to_degps(value) for key, value in present_speed.items()}
decoded = {
key: MobileManipulator.raw_to_degps(value)
for key, value in present_speed.items()
}
if "1" in decoded:
state_tensor[0] = decoded["1"]
if "2" in decoded:
@@ -395,7 +421,9 @@ class MobileManipulator:
self, record_data: bool = False
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
if not self.is_connected:
raise RobotDeviceNotConnectedError("MobileManipulator is not connected. Run `connect()` first.")
raise RobotDeviceNotConnectedError(
"MobileManipulator is not connected. Run `connect()` first."
)
speed_setting = self.speed_levels[self.speed_index]
xy_speed = speed_setting["xy"] # e.g. 0.1, 0.25, or 0.4
@@ -461,9 +489,15 @@ class MobileManipulator:
body_state = self.wheel_raw_to_body(present_speed)
body_state_mm = (body_state[0] * 1000.0, body_state[1] * 1000.0, body_state[2]) # Convert x,y to mm/s
body_state_mm = (
body_state[0] * 1000.0,
body_state[1] * 1000.0,
body_state[2],
) # Convert x,y to mm/s
wheel_state_tensor = torch.tensor(body_state_mm, dtype=torch.float32)
combined_state_tensor = torch.cat((remote_arm_state_tensor, wheel_state_tensor), dim=0)
combined_state_tensor = torch.cat(
(remote_arm_state_tensor, wheel_state_tensor), dim=0
)
obs_dict = {"observation.state": combined_state_tensor}
@@ -620,7 +654,11 @@ class MobileManipulator:
# Convert each wheels angular speed (deg/s) to a raw integer.
wheel_raw = [MobileManipulator.degps_to_raw(deg) for deg in wheel_degps]
return {"left_wheel": wheel_raw[0], "back_wheel": wheel_raw[1], "right_wheel": wheel_raw[2]}
return {
"left_wheel": wheel_raw[0],
"back_wheel": wheel_raw[1],
"right_wheel": wheel_raw[2],
}
def wheel_raw_to_body(
self, wheel_raw: dict, wheel_radius: float = 0.05, base_radius: float = 0.125
+3 -1
View File
@@ -72,7 +72,9 @@ def make_robot_from_config(config: RobotConfig):
return ManipulatorRobot(config)
elif isinstance(config, LeKiwiRobotConfig):
from lerobot.common.robot_devices.robots.mobile_manipulator import MobileManipulator
from lerobot.common.robot_devices.robots.mobile_manipulator import (
MobileManipulator,
)
return MobileManipulator(config)
else: