refactor(constants, processor): standardize action and observation keys across multiple files (#1808)

- Added new constants for truncated and done states in constants.py.
- Updated references to action and observation keys in pipeline_features.py, converters.py, hil_processor.py, tokenizer_processor.py, and robot_kinematic_processor.py to use the new constants for improved readability and maintainability.
This commit is contained in:
Adil Zouitine
2025-08-31 22:53:13 +02:00
committed by GitHub
parent 574a708950
commit 08fb310eaa
6 changed files with 123 additions and 114 deletions
@@ -20,6 +20,7 @@ import numpy as np
from scipy.spatial.transform import Rotation
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.constants import ACTION, OBS_STATE
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor.pipeline import (
ActionProcessor,
@@ -81,13 +82,13 @@ class EEReferenceAndDelta(ActionProcessor):
# Current pose from FK on measured joints
t_curr = self.kinematics.forward_kinematics(q)
enabled = bool(new_action.pop("action.enabled", 0))
tx = float(new_action.pop("action.target_x", 0.0))
ty = float(new_action.pop("action.target_y", 0.0))
tz = float(new_action.pop("action.target_z", 0.0))
wx = float(new_action.pop("action.target_wx", 0.0))
wy = float(new_action.pop("action.target_wy", 0.0))
wz = float(new_action.pop("action.target_wz", 0.0))
enabled = bool(new_action.pop(f"{ACTION}.enabled", 0))
tx = float(new_action.pop(f"{ACTION}.target_x", 0.0))
ty = float(new_action.pop(f"{ACTION}.target_y", 0.0))
tz = float(new_action.pop(f"{ACTION}.target_z", 0.0))
wx = float(new_action.pop(f"{ACTION}.target_wx", 0.0))
wy = float(new_action.pop(f"{ACTION}.target_wy", 0.0))
wz = float(new_action.pop(f"{ACTION}.target_wz", 0.0))
desired = None
@@ -123,12 +124,12 @@ class EEReferenceAndDelta(ActionProcessor):
# Write action fields
pos = desired[:3, 3]
tw = Rotation.from_matrix(desired[:3, :3]).as_rotvec()
new_action["action.ee.x"] = float(pos[0])
new_action["action.ee.y"] = float(pos[1])
new_action["action.ee.z"] = float(pos[2])
new_action["action.ee.wx"] = float(tw[0])
new_action["action.ee.wy"] = float(tw[1])
new_action["action.ee.wz"] = float(tw[2])
new_action[f"{ACTION}.ee.x"] = float(pos[0])
new_action[f"{ACTION}.ee.y"] = float(pos[1])
new_action[f"{ACTION}.ee.z"] = float(pos[2])
new_action[f"{ACTION}.ee.wx"] = float(tw[0])
new_action[f"{ACTION}.ee.wy"] = float(tw[1])
new_action[f"{ACTION}.ee.wz"] = float(tw[2])
self._prev_enabled = enabled
return new_action
@@ -139,20 +140,20 @@ class EEReferenceAndDelta(ActionProcessor):
self._command_when_disabled = None
def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
features.pop("action.enabled", None)
features.pop("action.target_x", None)
features.pop("action.target_y", None)
features.pop("action.target_z", None)
features.pop("action.target_wx", None)
features.pop("action.target_wy", None)
features.pop("action.target_wz", None)
features.pop(f"{ACTION}.enabled", None)
features.pop(f"{ACTION}.target_x", None)
features.pop(f"{ACTION}.target_y", None)
features.pop(f"{ACTION}.target_z", None)
features.pop(f"{ACTION}.target_wx", None)
features.pop(f"{ACTION}.target_wy", None)
features.pop(f"{ACTION}.target_wz", None)
features["action.ee.x"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
features["action.ee.y"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
features["action.ee.z"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
features["action.ee.wx"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
features["action.ee.wy"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
features["action.ee.wz"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
features[f"{ACTION}.ee.x"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
features[f"{ACTION}.ee.y"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
features[f"{ACTION}.ee.z"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
features[f"{ACTION}.ee.wx"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
features[f"{ACTION}.ee.wy"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
features[f"{ACTION}.ee.wz"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
return features
@@ -180,12 +181,12 @@ class EEBoundsAndSafety(ActionProcessor):
_last_twist: np.ndarray | None = field(default=None, init=False, repr=False)
def action(self, act: dict) -> dict:
x = act.get("action.ee.x", None)
y = act.get("action.ee.y", None)
z = act.get("action.ee.z", None)
wx = act.get("action.ee.wx", None)
wy = act.get("action.ee.wy", None)
wz = act.get("action.ee.wz", None)
x = act.get(f"{ACTION}.ee.x", None)
y = act.get(f"{ACTION}.ee.y", None)
z = act.get(f"{ACTION}.ee.z", None)
wx = act.get(f"{ACTION}.ee.wx", None)
wy = act.get(f"{ACTION}.ee.wy", None)
wz = act.get(f"{ACTION}.ee.wz", None)
if None in (x, y, z, wx, wy, wz):
return act
@@ -207,12 +208,12 @@ class EEBoundsAndSafety(ActionProcessor):
self._last_pos = pos
self._last_twist = twist
act["action.ee.x"] = float(pos[0])
act["action.ee.y"] = float(pos[1])
act["action.ee.z"] = float(pos[2])
act["action.ee.wx"] = float(twist[0])
act["action.ee.wy"] = float(twist[1])
act["action.ee.wz"] = float(twist[2])
act[f"{ACTION}.ee.x"] = float(pos[0])
act[f"{ACTION}.ee.y"] = float(pos[1])
act[f"{ACTION}.ee.z"] = float(pos[2])
act[f"{ACTION}.ee.wx"] = float(twist[0])
act[f"{ACTION}.ee.wy"] = float(twist[1])
act[f"{ACTION}.ee.wz"] = float(twist[2])
return act
def reset(self):
@@ -250,12 +251,12 @@ class InverseKinematicsEEToJoints(ProcessorStep):
act = transition.get(TransitionKey.ACTION) or {}
comp = transition.get(TransitionKey.COMPLEMENTARY_DATA) or {}
x = act.get("action.ee.x", None)
y = act.get("action.ee.y", None)
z = act.get("action.ee.z", None)
wx = act.get("action.ee.wx", None)
wy = act.get("action.ee.wy", None)
wz = act.get("action.ee.wz", None)
x = act.get(f"{ACTION}.ee.x", None)
y = act.get(f"{ACTION}.ee.y", None)
z = act.get(f"{ACTION}.ee.z", None)
wx = act.get(f"{ACTION}.ee.wx", None)
wy = act.get(f"{ACTION}.ee.wy", None)
wz = act.get(f"{ACTION}.ee.wz", None)
if None in (x, y, z, wx, wy, wz):
return transition
@@ -285,19 +286,19 @@ class InverseKinematicsEEToJoints(ProcessorStep):
new_act = dict(act)
for i, name in enumerate(self.motor_names):
if name == "gripper":
new_act["observation.state.gripper.pos"] = float(raw["gripper"])
new_act[f"{OBS_STATE}.gripper.pos"] = float(raw["gripper"])
else:
new_act[f"action.{name}.pos"] = float(q_target[i])
new_act[f"{ACTION}.{name}.pos"] = float(q_target[i])
transition[TransitionKey.ACTION] = new_act
if not self.initial_guess_current_joints:
transition[TransitionKey.COMPLEMENTARY_DATA]["reference_joint_positions"] = q_target
return transition
def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
features["observation.state.gripper.pos"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
features["action.gripper.pos"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
features[f"{OBS_STATE}.gripper.pos"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
features[f"{ACTION}.gripper.pos"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
for name in self.motor_names:
features[f"action.{name}.pos"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
features[f"{ACTION}.{name}.pos"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
return features
@@ -333,12 +334,12 @@ class GripperVelocityToJoint(ProcessorStep):
act = transition.get(TransitionKey.ACTION) or {}
comp = transition.get(TransitionKey.COMPLEMENTARY_DATA) or {}
if "action.gripper" not in act:
if f"{ACTION}.gripper" not in act:
return transition
if "gripper" not in self.motor_names:
new_act = dict(act)
new_act.pop("action.gripper", None)
new_act.pop(f"{ACTION}.gripper", None)
transition[TransitionKey.ACTION] = new_act
return transition
@@ -346,32 +347,32 @@ class GripperVelocityToJoint(ProcessorStep):
# Discrete gripper actions are in [0, 1, 2]
# 0: open, 1: close, 2: stay
# We need to shift them to [-1, 0, 1] and then scale them to clip_max
gripper_action = act.get("action.gripper", 1.0)
gripper_action = act.get(f"{ACTION}.gripper", 1.0)
gripper_action = gripper_action - 1.0
gripper_action *= self.clip_max
act["action.gripper"] = gripper_action
act[f"{ACTION}.gripper"] = gripper_action
# Get current gripper position from complementary data
raw = comp.get("raw_joint_positions") or {}
curr_pos = float(raw.get("gripper"))
# Compute desired gripper velocity
u = float(act.get("action.gripper", 0.0))
u = float(act.get(f"{ACTION}.gripper", 0.0))
delta = u * float(self.speed_factor)
gripper_pos = float(np.clip(curr_pos + delta, self.clip_min, self.clip_max))
new_act = dict(act)
new_act["action.gripper.pos"] = gripper_pos
new_act.pop("action.gripper", None)
new_act[f"{ACTION}.gripper.pos"] = gripper_pos
new_act.pop(f"{ACTION}.gripper", None)
transition[TransitionKey.ACTION] = new_act
obs["observation.state.gripper.pos"] = curr_pos
obs[f"{OBS_STATE}.gripper.pos"] = curr_pos
transition[TransitionKey.OBSERVATION] = obs
return transition
def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
features.pop("action.gripper", None)
features["action.gripper.pos"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
features.pop(f"{ACTION}.gripper", None)
features[f"{ACTION}.gripper.pos"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
return features
@@ -396,26 +397,26 @@ class ForwardKinematicsJointsToEE(ObservationProcessor):
motor_names: list[str]
def observation(self, obs: dict) -> dict:
if not all(f"observation.state.{n}.pos" in obs for n in self.motor_names):
if not all(f"{OBS_STATE}.{n}.pos" in obs for n in self.motor_names):
return obs
q = np.array([obs[f"observation.state.{n}.pos"] for n in self.motor_names], dtype=float)
q = np.array([obs[f"{OBS_STATE}.{n}.pos"] for n in self.motor_names], dtype=float)
t = self.kinematics.forward_kinematics(q)
pos = t[:3, 3]
tw = Rotation.from_matrix(t[:3, :3]).as_rotvec()
obs["observation.state.ee.x"] = float(pos[0])
obs["observation.state.ee.y"] = float(pos[1])
obs["observation.state.ee.z"] = float(pos[2])
obs["observation.state.ee.wx"] = float(tw[0])
obs["observation.state.ee.wy"] = float(tw[1])
obs["observation.state.ee.wz"] = float(tw[2])
obs[f"{OBS_STATE}.ee.x"] = float(pos[0])
obs[f"{OBS_STATE}.ee.y"] = float(pos[1])
obs[f"{OBS_STATE}.ee.z"] = float(pos[2])
obs[f"{OBS_STATE}.ee.wx"] = float(tw[0])
obs[f"{OBS_STATE}.ee.wy"] = float(tw[1])
obs[f"{OBS_STATE}.ee.wz"] = float(tw[2])
return obs
def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
# We specify the dataset features of this step that we want to be stored in the dataset
for k in ["x", "y", "z", "wx", "wy", "wz"]:
features[f"observation.state.ee.{k}"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
features[f"{OBS_STATE}.ee.{k}"] = (PolicyFeature(type=FeatureType.ACTION, shape=(1,)),)
return features