Added Robot action to tensor processor

Added new processor script for dealing with gym specific action processing
This commit is contained in:
Michel Aractingi
2025-08-11 18:27:05 +02:00
parent f58796a112
commit 53ace28c42
4 changed files with 87 additions and 52 deletions
+2 -2
View File
@@ -23,11 +23,10 @@ from .hil_processor import (
GripperPenaltyProcessor,
ImageCropResizeProcessor,
InterventionActionProcessor,
Numpy2TorchActionProcessor,
RewardClassifierProcessor,
TimeLimitProcessor,
Torch2NumpyActionProcessor,
)
from .gym_action_processor import RobotAction2TensorProcessor, Torch2NumpyActionProcessor, Numpy2TorchActionProcessor
from .joint_observations_processor import JointVelocityProcessor, MotorCurrentProcessor
from .normalize_processor import NormalizerProcessor, UnnormalizerProcessor, hotswap_stats
from .observation_processor import VanillaObservationProcessor
@@ -56,6 +55,7 @@ __all__ = [
"DoneProcessor",
"MapDeltaActionToRobotAction",
"MapTensorToDeltaActionDict",
"RobotAction2TensorProcessor",
"EnvTransition",
"GripperPenaltyProcessor",
"IdentityProcessor",
@@ -0,0 +1,81 @@
#! /usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
from dataclasses import dataclass
import torch
import numpy as np
from lerobot.processor.pipeline import ActionProcessor, ProcessorStepRegistry
@ProcessorStepRegistry.register("robot_action_to_tensor_processor")
@dataclass
class RobotAction2TensorProcessor(ActionProcessor):
"""Convert robot action to tensor."""
motor_names: list[str]
def action(self, action: dict | None) -> torch.Tensor | None:
if action is None:
return None
action_tensor = torch.tensor([action[f"action.{motor_name}.pos"] for motor_name in self.motor_names])
return action_tensor
@ProcessorStepRegistry.register("torch2numpy_action_processor")
@dataclass
class Torch2NumpyActionProcessor(ActionProcessor):
"""Convert PyTorch tensor actions to NumPy arrays."""
squeeze_batch_dim: bool = True
def action(self, action: torch.Tensor | None) -> np.ndarray | None:
if action is None:
return None
if not isinstance(action, torch.Tensor):
raise TypeError(
f"Expected torch.Tensor or None, got {type(action).__name__}. "
"Use appropriate processor for non-tensor actions."
)
numpy_action = action.detach().cpu().numpy()
# Remove batch dimensions but preserve action dimensions
# Only squeeze if there's a batch dimension (first dim == 1)
if (
self.squeeze_batch_dim
and numpy_action.shape
and len(numpy_action.shape) > 1
and numpy_action.shape[0] == 1
):
numpy_action = numpy_action.squeeze(0)
return numpy_action
@ProcessorStepRegistry.register("numpy2torch_action_processor")
@dataclass
class Numpy2TorchActionProcessor(ActionProcessor):
"""Convert NumPy array action to PyTorch tensor."""
def action(self, action: np.ndarray | None) -> torch.Tensor | None:
if action is None:
return None
if not isinstance(action, np.ndarray):
raise TypeError(
f"Expected np.ndarray or None, got {type(action).__name__}. "
"Use appropriate processor for non-tensor actions."
)
torch_action = torch.from_numpy(action)
return torch_action
-47
View File
@@ -49,53 +49,6 @@ class AddTeleopEventsAsInfo(InfoProcessor):
return info
@ProcessorStepRegistry.register("torch2numpy_action_processor")
@dataclass
class Torch2NumpyActionProcessor(ActionProcessor):
"""Convert PyTorch tensor actions to NumPy arrays."""
squeeze_batch_dim: bool = True
def action(self, action: torch.Tensor | None) -> np.ndarray | None:
if action is None:
return None
if not isinstance(action, torch.Tensor):
raise TypeError(
f"Expected torch.Tensor or None, got {type(action).__name__}. "
"Use appropriate processor for non-tensor actions."
)
numpy_action = action.detach().cpu().numpy()
# Remove batch dimensions but preserve action dimensions
# Only squeeze if there's a batch dimension (first dim == 1)
if (
self.squeeze_batch_dim
and numpy_action.shape
and len(numpy_action.shape) > 1
and numpy_action.shape[0] == 1
):
numpy_action = numpy_action.squeeze(0)
return numpy_action
@ProcessorStepRegistry.register("numpy2torch_action_processor")
@dataclass
class Numpy2TorchActionProcessor(ActionProcessor):
"""Convert NumPy array action to PyTorch tensor."""
def action(self, action: np.ndarray | None) -> torch.Tensor | None:
if action is None:
return None
if not isinstance(action, np.ndarray):
raise TypeError(
f"Expected np.ndarray or None, got {type(action).__name__}. "
"Use appropriate processor for non-tensor actions."
)
torch_action = torch.from_numpy(action)
return torch_action
@ProcessorStepRegistry.register("image_crop_resize_processor")
+4 -3
View File
@@ -41,6 +41,7 @@ from lerobot.processor import (
MotorCurrentProcessor,
Numpy2TorchActionProcessor,
RewardClassifierProcessor,
RobotAction2TensorProcessor,
RobotProcessor,
TimeLimitProcessor,
ToBatchProcessor,
@@ -402,9 +403,7 @@ def make_processors(
joint_names=motor_names,
)
env_pipeline_steps = [
VanillaObservationProcessor(),
]
env_pipeline_steps = [VanillaObservationProcessor()]
if cfg.processor.observation is not None:
if cfg.processor.observation.add_joint_velocity_to_observation:
@@ -457,6 +456,7 @@ def make_processors(
)
)
env_pipeline_steps.append(RobotAction2TensorProcessor(motor_names=motor_names))
env_pipeline_steps.append(ToBatchProcessor())
env_pipeline_steps.append(DeviceProcessor(device=device))
@@ -654,6 +654,7 @@ def control_loop(
env_processor=env_processor,
action_processor=action_processor,
)
print(transition[TransitionKey.ACTION])
terminated = transition.get(TransitionKey.DONE, False)
truncated = transition.get(TransitionKey.TRUNCATED, False)