mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-15 00:29:52 +00:00
finish unitree camera integration
This commit is contained in:
@@ -43,6 +43,11 @@ def make_cameras_from_configs(camera_configs: dict[str, CameraConfig]) -> dict[s
|
||||
|
||||
cameras[key] = Reachy2Camera(cfg)
|
||||
|
||||
elif cfg.type == "zmq":
|
||||
from .zmq import ZMQCamera
|
||||
|
||||
cameras[key] = ZMQCamera(cfg)
|
||||
|
||||
else:
|
||||
try:
|
||||
cameras[key] = cast(Camera, make_device_from_device_class(cfg))
|
||||
|
||||
@@ -14,6 +14,3 @@
|
||||
|
||||
from .camera_zmq import ZMQCamera
|
||||
from .configuration_zmq import ZMQCameraConfig
|
||||
|
||||
__all__ = ["ZMQCamera", "ZMQCameraConfig"]
|
||||
|
||||
|
||||
@@ -48,3 +48,5 @@ class UnitreeG1Config(RobotConfig):
|
||||
freeze_body = False
|
||||
|
||||
gravity_compensation = True
|
||||
|
||||
cameras: dict[str, CameraConfig] = field(default_factory=dict)
|
||||
|
||||
@@ -72,9 +72,17 @@ class DataBuffer:
|
||||
#motor class for unitree?
|
||||
#TODO: camera, sim
|
||||
class UnitreeG1(Robot):
|
||||
def __init__(self, config: UnitreeG1Config):
|
||||
|
||||
config_class = UnitreeG1Config
|
||||
name = "unitree_g1"
|
||||
|
||||
def __init__(self, config: UnitreeG1Config):
|
||||
super().__init__(config)
|
||||
|
||||
logger_mp.info("Initialize UnitreeG1...")
|
||||
|
||||
self.config = config
|
||||
self.cameras = make_cameras_from_configs(config.cameras)
|
||||
self.q_target = np.zeros(14)
|
||||
self.tauff_target = np.zeros(14)
|
||||
self.motion_mode = config.motion_mode
|
||||
@@ -315,14 +323,28 @@ class UnitreeG1(Robot):
|
||||
pass
|
||||
|
||||
def connect(self, calibrate: bool = True) -> None:
|
||||
pass
|
||||
# Connect cameras
|
||||
for cam in self.cameras.values():
|
||||
cam.connect()
|
||||
logger_mp.info(f"{self} connected with {len(self.cameras)} camera(s).")
|
||||
|
||||
def disconnect(self):
|
||||
pass
|
||||
# Disconnect cameras
|
||||
for cam in self.cameras.values():
|
||||
cam.disconnect()
|
||||
logger_mp.info(f"{self} disconnected.")
|
||||
|
||||
def get_observation(self) -> dict[str, Any]:
|
||||
obs_array = self.get_current_dual_arm_q()
|
||||
obs_dict = {f"{G1_29_JointArmIndex(motor).name}.pos": val for motor, val in zip(G1_29_JointArmIndex, obs_array, strict=True)}
|
||||
|
||||
# Capture images from cameras
|
||||
for cam_key, cam in self.cameras.items():
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.async_read()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger_mp.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
|
||||
return obs_dict
|
||||
|
||||
@property
|
||||
@@ -331,11 +353,21 @@ class UnitreeG1(Robot):
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
pass
|
||||
return all(cam.is_connected for cam in self.cameras.values())
|
||||
|
||||
@property
|
||||
def _motors_ft(self) -> dict[str, type]:
|
||||
return {f"{G1_29_JointArmIndex(motor).name}.pos": float for motor in G1_29_JointArmIndex}
|
||||
|
||||
@property
|
||||
def _cameras_ft(self) -> dict[str, tuple]:
|
||||
return {
|
||||
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
return {f"{G1_29_JointArmIndex(motor).name}.pos": float for motor in G1_29_JointArmIndex}
|
||||
return {**self._motors_ft, **self._cameras_ft}
|
||||
|
||||
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
|
||||
#need a any to any teleoperator solution. i wanna teleoperate a horse with a shoe. action
|
||||
@@ -343,16 +375,13 @@ class UnitreeG1(Robot):
|
||||
#also what would be fun is finding all sorts of robots and adding them to lerobot, see if people do the same.
|
||||
#then teleop them wiuth the glove hehe
|
||||
#then we get ALL THE DATA
|
||||
print(self.calibration)
|
||||
print(action)
|
||||
if self.is_calibrated:
|
||||
action = self.invert_calibration(action)
|
||||
#check if action is within bounds
|
||||
for key, value in action.items():
|
||||
if value < self.calibration[key]["range_min"] or value > self.calibration[key]["range_max"]:
|
||||
raise ValueError(f"Action value {value} for {key} is out of bounds, actions are not normalized")
|
||||
print(action)
|
||||
exit()
|
||||
|
||||
if self.freeze_body:
|
||||
arm_joint_indices = set(range(15, 29)) # 15–28 are arms
|
||||
for jid in G1_29_JointIndex:
|
||||
|
||||
@@ -70,6 +70,7 @@ from lerobot.cameras import ( # noqa: F401
|
||||
)
|
||||
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401
|
||||
from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401
|
||||
from lerobot.cameras.zmq.configuration_zmq import ZMQCameraConfig # noqa: F401
|
||||
from lerobot.configs import parser
|
||||
from lerobot.configs.policies import PreTrainedConfig
|
||||
from lerobot.datasets.image_writer import safe_stop_image_writer
|
||||
@@ -98,6 +99,7 @@ from lerobot.robots import ( # noqa: F401
|
||||
make_robot_from_config,
|
||||
so100_follower,
|
||||
so101_follower,
|
||||
unitree_g1,
|
||||
)
|
||||
from lerobot.teleoperators import ( # noqa: F401
|
||||
Teleoperator,
|
||||
@@ -196,7 +198,7 @@ class RecordConfig:
|
||||
self.policy.pretrained_path = policy_path
|
||||
|
||||
if self.teleop is None and self.policy is None:
|
||||
raise ValueError("Choose a policy, a teleoperator or both to control the robot")
|
||||
logging.warning("No teleop or policy provided: running in idle (neutral actions) mode.")
|
||||
|
||||
@classmethod
|
||||
def __get_path_fields__(cls) -> list[str]:
|
||||
@@ -301,6 +303,9 @@ def record_loop(
|
||||
# Applies a pipeline to the raw robot observation, default is IdentityProcessor
|
||||
obs_processed = robot_observation_processor(obs)
|
||||
|
||||
act_processed_policy = None
|
||||
act_processed_teleop = None
|
||||
|
||||
if policy is not None or dataset is not None:
|
||||
observation_frame = build_dataset_frame(dataset.features, obs_processed, prefix=OBS_STR)
|
||||
|
||||
@@ -332,6 +337,11 @@ def record_loop(
|
||||
base_action = robot._from_keyboard_to_base_action(keyboard_action)
|
||||
act = {**arm_action, **base_action} if len(base_action) > 0 else arm_action
|
||||
act_processed_teleop = teleop_action_processor((act, obs))
|
||||
elif policy is None and teleop is None:
|
||||
# ⬅️ Neutral/idle actions: record without policy or teleop
|
||||
# Build a zero action in the robot's action space
|
||||
neutral = {k: 0.0 for k in robot.action_features.keys()}
|
||||
act_processed_teleop = teleop_action_processor((neutral, obs))
|
||||
else:
|
||||
logging.info(
|
||||
"No policy or teleoperator provided, skipping action generation."
|
||||
|
||||
Reference in New Issue
Block a user