mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-22 20:19:43 +00:00
feat(envs): add envs pre-post processor (#2474)
* more changes * working changes * more changes * more fixes * fix style * more * clean * put axis-1 * more fixes * more styling fixes: * iterate on review: * more changes * add env processor * style * more changes * add docs * fix imports * fix test, add to train * Update src/lerobot/envs/factory.py Co-authored-by: Michel Aractingi <michel.aractingi@huggingface.co> Signed-off-by: Jade Choghari <chogharijade@gmail.com> * iterate on review --------- Signed-off-by: Jade Choghari <chogharijade@gmail.com> Co-authored-by: jade.choghari@huggingface.co <“chogharijade@gmail.com”> Co-authored-by: Michel Aractingi <michel.aractingi@huggingface.co>
This commit is contained in:
+69
-21
@@ -28,7 +28,6 @@ import torch
|
||||
from gymnasium import spaces
|
||||
from libero.libero import benchmark, get_libero_path
|
||||
from libero.libero.envs import OffScreenRenderEnv
|
||||
from robosuite.utils.transform_utils import quat2axisangle
|
||||
|
||||
|
||||
def _parse_camera_names(camera_name: str | Sequence[str]) -> list[str]:
|
||||
@@ -175,11 +174,36 @@ class LiberoEnv(gym.Env):
|
||||
self.observation_space = spaces.Dict(
|
||||
{
|
||||
"pixels": spaces.Dict(images),
|
||||
"agent_pos": spaces.Box(
|
||||
low=AGENT_POS_LOW,
|
||||
high=AGENT_POS_HIGH,
|
||||
shape=(OBS_STATE_DIM,),
|
||||
dtype=np.float64,
|
||||
"robot_state": spaces.Dict(
|
||||
{
|
||||
"eef": spaces.Dict(
|
||||
{
|
||||
"pos": spaces.Box(low=-np.inf, high=np.inf, shape=(3,), dtype=np.float64),
|
||||
"quat": spaces.Box(
|
||||
low=-np.inf, high=np.inf, shape=(4,), dtype=np.float64
|
||||
),
|
||||
"mat": spaces.Box(
|
||||
low=-np.inf, high=np.inf, shape=(3, 3), dtype=np.float64
|
||||
),
|
||||
}
|
||||
),
|
||||
"gripper": spaces.Dict(
|
||||
{
|
||||
"qpos": spaces.Box(
|
||||
low=-np.inf, high=np.inf, shape=(2,), dtype=np.float64
|
||||
),
|
||||
"qvel": spaces.Box(
|
||||
low=-np.inf, high=np.inf, shape=(2,), dtype=np.float64
|
||||
),
|
||||
}
|
||||
),
|
||||
"joints": spaces.Dict(
|
||||
{
|
||||
"pos": spaces.Box(low=-np.inf, high=np.inf, shape=(7,), dtype=np.float64),
|
||||
"vel": spaces.Box(low=-np.inf, high=np.inf, shape=(7,), dtype=np.float64),
|
||||
}
|
||||
),
|
||||
}
|
||||
),
|
||||
}
|
||||
)
|
||||
@@ -191,6 +215,7 @@ class LiberoEnv(gym.Env):
|
||||
def render(self):
|
||||
raw_obs = self._env.env._get_observations()
|
||||
image = self._format_raw_obs(raw_obs)["pixels"]["image"]
|
||||
image = image[::-1, ::-1] # flip both H and W for visualization
|
||||
return image
|
||||
|
||||
def _make_envs_task(self, task_suite: Any, task_id: int = 0):
|
||||
@@ -212,23 +237,48 @@ class LiberoEnv(gym.Env):
|
||||
images = {}
|
||||
for camera_name in self.camera_name:
|
||||
image = raw_obs[camera_name]
|
||||
image = image[::-1, ::-1] # rotate 180 degrees
|
||||
images[self.camera_name_mapping[camera_name]] = image
|
||||
state = np.concatenate(
|
||||
(
|
||||
raw_obs["robot0_eef_pos"],
|
||||
quat2axisangle(raw_obs["robot0_eef_quat"]),
|
||||
raw_obs["robot0_gripper_qpos"],
|
||||
)
|
||||
)
|
||||
agent_pos = state
|
||||
|
||||
eef_pos = raw_obs.get("robot0_eef_pos")
|
||||
eef_quat = raw_obs.get("robot0_eef_quat")
|
||||
|
||||
# rotation matrix from controller
|
||||
eef_mat = self._env.robots[0].controller.ee_ori_mat if eef_pos is not None else None
|
||||
gripper_qpos = raw_obs.get("robot0_gripper_qpos")
|
||||
gripper_qvel = raw_obs.get("robot0_gripper_qvel")
|
||||
joint_pos = raw_obs.get("robot0_joint_pos")
|
||||
joint_vel = raw_obs.get("robot0_joint_vel")
|
||||
obs = {
|
||||
"pixels": images,
|
||||
"robot_state": {
|
||||
"eef": {
|
||||
"pos": eef_pos, # (3,)
|
||||
"quat": eef_quat, # (4,)
|
||||
"mat": eef_mat, # (3, 3)
|
||||
},
|
||||
"gripper": {
|
||||
"qpos": gripper_qpos, # (2,)
|
||||
"qvel": gripper_qvel, # (2,)
|
||||
},
|
||||
"joints": {
|
||||
"pos": joint_pos, # (7,)
|
||||
"vel": joint_vel, # (7,)
|
||||
},
|
||||
},
|
||||
}
|
||||
if self.obs_type == "pixels":
|
||||
return {"pixels": images.copy()}
|
||||
|
||||
if self.obs_type == "pixels_agent_pos":
|
||||
return {
|
||||
"pixels": images.copy(),
|
||||
"agent_pos": agent_pos,
|
||||
}
|
||||
# Validate required fields are present
|
||||
if eef_pos is None or eef_quat is None or gripper_qpos is None:
|
||||
raise ValueError(
|
||||
f"Missing required robot state fields in raw observation. "
|
||||
f"Got eef_pos={eef_pos is not None}, eef_quat={eef_quat is not None}, "
|
||||
f"gripper_qpos={gripper_qpos is not None}"
|
||||
)
|
||||
return obs
|
||||
|
||||
raise NotImplementedError(
|
||||
f"The observation type '{self.obs_type}' is not supported in LiberoEnv. "
|
||||
"Please switch to an image-based obs_type (e.g. 'pixels', 'pixels_agent_pos')."
|
||||
@@ -355,12 +405,10 @@ def create_libero_envs(
|
||||
print(f"Restricting to task_ids={task_ids_filter}")
|
||||
|
||||
out: dict[str, dict[int, Any]] = defaultdict(dict)
|
||||
|
||||
for suite_name in suite_names:
|
||||
suite = _get_suite(suite_name)
|
||||
total = len(suite.tasks)
|
||||
selected = _select_task_ids(total, task_ids_filter)
|
||||
|
||||
if not selected:
|
||||
raise ValueError(f"No tasks selected for suite '{suite_name}' (available: {total}).")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user