chore(processor): add type aliases RobotProcessorPipeline and PolicyProcessorPipeline (#1859)

* feat(processor): introduce PolicyProcessorPipeline and RobotProcessorPipeline as type aliases for DataProcessorPipeline

- Added PolicyProcessorPipeline and RobotProcessorPipeline type aliases to enhance clarity and maintainability in the processor module.
- Updated the __all__ list to include the new pipelines for better module export consistency.

* refactor(processor): replace DataProcessorPipeline with PolicyProcessorPipeline across multiple modules

- Updated all instances of DataProcessorPipeline to PolicyProcessorPipeline in various processor files for consistency and clarity.
- Adjusted function signatures to reflect the new pipeline type, enhancing maintainability and readability.

* refactor(processor): update hotswap_stats function to use PolicyProcessorPipeline

- Changed the parameter name from robot_processor to policy_processor for clarity.
- Ensured consistency with recent updates to the processor module by reflecting the new pipeline type in the function signature.

* refactor(processor): replace DataProcessorPipeline with PolicyProcessorPipeline in migrate_policy_normalization.py

- Updated the preprocessor and postprocessor to use PolicyProcessorPipeline for consistency with recent changes in the processor module.
- Enhanced clarity and maintainability by aligning with the new pipeline structure.

* refactor(processor): update hotswap_stats to use PolicyProcessorPipeline

- Changed the parameter type in hotswap_stats from DataProcessorPipeline to PolicyProcessorPipeline for consistency with recent updates.
- Enhanced clarity by updating the function documentation to reflect the new pipeline type.

* refactor(processor): replace DataProcessorPipeline with RobotProcessorPipeline across multiple files

- Updated instances of DataProcessorPipeline to RobotProcessorPipeline in evaluate.py, record.py, replay.py, teleoperate.py, and other relevant files for consistency and clarity.
- Adjusted function signatures and variable types to reflect the new pipeline structure, enhancing maintainability and readability.
This commit is contained in:
Adil Zouitine
2025-09-03 19:01:28 +02:00
committed by GitHub
parent 6c7169c4af
commit a6dbb65917
23 changed files with 102 additions and 89 deletions
+3 -3
View File
@@ -21,7 +21,7 @@ from lerobot.datasets.utils import combine_feature_dicts
from lerobot.model.kinematics import RobotKinematics
from lerobot.policies.act.modeling_act import ACTPolicy
from lerobot.policies.factory import make_pre_post_processors
from lerobot.processor import DataProcessorPipeline
from lerobot.processor import RobotProcessorPipeline
from lerobot.processor.converters import (
observation_to_transition,
transition_to_robot_action,
@@ -65,7 +65,7 @@ kinematics_solver = RobotKinematics(
)
# Build pipeline to convert ee pose action to joint action
robot_ee_to_joints_processor = DataProcessorPipeline(
robot_ee_to_joints_processor = RobotProcessorPipeline(
steps=[
AddRobotObservationAsComplimentaryData(robot=robot),
InverseKinematicsEEToJoints(
@@ -79,7 +79,7 @@ robot_ee_to_joints_processor = DataProcessorPipeline(
)
# Build pipeline to convert joint observation to ee pose observation
robot_joints_to_ee_pose_processor = DataProcessorPipeline(
robot_joints_to_ee_pose_processor = RobotProcessorPipeline(
steps=[
ForwardKinematicsJointsToEE(kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys()))
],
+4 -4
View File
@@ -20,7 +20,7 @@ from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features
from lerobot.datasets.utils import combine_feature_dicts
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor import DataProcessorPipeline
from lerobot.processor import RobotProcessorPipeline
from lerobot.processor.converters import (
action_to_transition,
observation_to_transition,
@@ -73,7 +73,7 @@ kinematics_solver = RobotKinematics(
)
# Build pipeline to convert phone action to ee pose action
phone_to_robot_ee_pose_processor = DataProcessorPipeline(
phone_to_robot_ee_pose_processor = RobotProcessorPipeline(
steps=[
MapPhoneActionToRobotAction(platform=teleop_config.phone_os),
AddRobotObservationAsComplimentaryData(robot=robot),
@@ -93,7 +93,7 @@ phone_to_robot_ee_pose_processor = DataProcessorPipeline(
)
# Build pipeline to convert ee pose action to joint action
robot_ee_to_joints_processor = DataProcessorPipeline(
robot_ee_to_joints_processor = RobotProcessorPipeline(
steps=[
InverseKinematicsEEToJoints(
kinematics=kinematics_solver,
@@ -110,7 +110,7 @@ robot_ee_to_joints_processor = DataProcessorPipeline(
)
# Build pipeline to convert joint observation to ee pose observation
robot_joints_to_ee_pose = DataProcessorPipeline(
robot_joints_to_ee_pose = RobotProcessorPipeline(
steps=[
ForwardKinematicsJointsToEE(kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys()))
],
+2 -2
View File
@@ -19,7 +19,7 @@ import time
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor import DataProcessorPipeline
from lerobot.processor import RobotProcessorPipeline
from lerobot.processor.converters import action_to_transition, transition_to_robot_action
from lerobot.robots.so100_follower.config_so100_follower import SO100FollowerConfig
from lerobot.robots.so100_follower.robot_kinematic_processor import (
@@ -50,7 +50,7 @@ kinematics_solver = RobotKinematics(
)
# Build pipeline to convert ee pose action to joint action
robot_ee_to_joints_processor = DataProcessorPipeline(
robot_ee_to_joints_processor = RobotProcessorPipeline(
steps=[
AddRobotObservationAsComplimentaryData(robot=robot),
InverseKinematicsEEToJoints(
+2 -2
View File
@@ -16,7 +16,7 @@
import time
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor import DataProcessorPipeline
from lerobot.processor import RobotProcessorPipeline
from lerobot.processor.converters import action_to_transition, transition_to_robot_action
from lerobot.robots.so100_follower.config_so100_follower import SO100FollowerConfig
from lerobot.robots.so100_follower.robot_kinematic_processor import (
@@ -49,7 +49,7 @@ kinematics_solver = RobotKinematics(
)
# Build pipeline to convert phone action to ee pose action to joint action
phone_to_robot_joints_processor = DataProcessorPipeline(
phone_to_robot_joints_processor = RobotProcessorPipeline(
steps=[
MapPhoneActionToRobotAction(platform=teleop_config.phone_os),
AddRobotObservationAsComplimentaryData(robot=robot),
+4 -4
View File
@@ -19,9 +19,9 @@ from lerobot.constants import POSTPROCESSOR_DEFAULT_NAME, PREPROCESSOR_DEFAULT_N
from lerobot.policies.act.configuration_act import ACTConfig
from lerobot.processor import (
AddBatchDimensionProcessorStep,
DataProcessorPipeline,
DeviceProcessorStep,
NormalizerProcessorStep,
PolicyProcessorPipeline,
ProcessorKwargs,
RenameProcessorStep,
UnnormalizerProcessorStep,
@@ -33,7 +33,7 @@ def make_act_pre_post_processors(
dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
preprocessor_kwargs: ProcessorKwargs | None = None,
postprocessor_kwargs: ProcessorKwargs | None = None,
) -> tuple[DataProcessorPipeline, DataProcessorPipeline]:
) -> tuple[PolicyProcessorPipeline, PolicyProcessorPipeline]:
if preprocessor_kwargs is None:
preprocessor_kwargs = {}
if postprocessor_kwargs is None:
@@ -57,12 +57,12 @@ def make_act_pre_post_processors(
]
return (
DataProcessorPipeline(
PolicyProcessorPipeline(
steps=input_steps,
name=PREPROCESSOR_DEFAULT_NAME,
**preprocessor_kwargs,
),
DataProcessorPipeline(
PolicyProcessorPipeline(
steps=output_steps,
name=POSTPROCESSOR_DEFAULT_NAME,
**postprocessor_kwargs,
@@ -20,9 +20,9 @@ from lerobot.constants import POSTPROCESSOR_DEFAULT_NAME, PREPROCESSOR_DEFAULT_N
from lerobot.policies.diffusion.configuration_diffusion import DiffusionConfig
from lerobot.processor import (
AddBatchDimensionProcessorStep,
DataProcessorPipeline,
DeviceProcessorStep,
NormalizerProcessorStep,
PolicyProcessorPipeline,
ProcessorKwargs,
RenameProcessorStep,
UnnormalizerProcessorStep,
@@ -34,7 +34,7 @@ def make_diffusion_pre_post_processors(
dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
preprocessor_kwargs: ProcessorKwargs | None = None,
postprocessor_kwargs: ProcessorKwargs | None = None,
) -> tuple[DataProcessorPipeline, DataProcessorPipeline]:
) -> tuple[PolicyProcessorPipeline, PolicyProcessorPipeline]:
if preprocessor_kwargs is None:
preprocessor_kwargs = {}
if postprocessor_kwargs is None:
@@ -57,12 +57,12 @@ def make_diffusion_pre_post_processors(
),
]
return (
DataProcessorPipeline(
PolicyProcessorPipeline(
steps=input_steps,
name=PREPROCESSOR_DEFAULT_NAME,
**preprocessor_kwargs,
),
DataProcessorPipeline(
PolicyProcessorPipeline(
steps=output_steps,
name=POSTPROCESSOR_DEFAULT_NAME,
**postprocessor_kwargs,
+4 -4
View File
@@ -38,7 +38,7 @@ from lerobot.policies.sac.reward_model.configuration_classifier import RewardCla
from lerobot.policies.smolvla.configuration_smolvla import SmolVLAConfig
from lerobot.policies.tdmpc.configuration_tdmpc import TDMPCConfig
from lerobot.policies.vqbet.configuration_vqbet import VQBeTConfig
from lerobot.processor import DataProcessorPipeline, ProcessorKwargs
from lerobot.processor import PolicyProcessorPipeline, ProcessorKwargs
def get_policy_class(name: str) -> type[PreTrainedPolicy]:
@@ -122,7 +122,7 @@ def make_pre_post_processors(
policy_cfg: PreTrainedConfig,
pretrained_path: str | None = None,
**kwargs: Unpack[ProcessorConfigKwargs],
) -> tuple[DataProcessorPipeline, DataProcessorPipeline]:
) -> tuple[PolicyProcessorPipeline, PolicyProcessorPipeline]:
"""Make a processor instance for a given policy type.
This function creates the appropriate processor configuration based on the policy type.
@@ -146,14 +146,14 @@ def make_pre_post_processors(
postprocessor_kwargs = kwargs.get("postprocessor_kwargs", {})
return (
DataProcessorPipeline.from_pretrained(
PolicyProcessorPipeline.from_pretrained(
pretrained_model_name_or_path=pretrained_path,
config_filename=kwargs.get("preprocessor_config_filename", "robot_preprocessor.json"),
overrides=kwargs.get("preprocessor_overrides", {}),
to_transition=preprocessor_kwargs.get("to_transition"),
to_output=preprocessor_kwargs.get("to_output"),
),
DataProcessorPipeline.from_pretrained(
PolicyProcessorPipeline.from_pretrained(
pretrained_model_name_or_path=pretrained_path,
config_filename=kwargs.get("postprocessor_config_filename", "robot_postprocessor.json"),
overrides=kwargs.get("postprocessor_overrides", {}),
+4 -4
View File
@@ -23,9 +23,9 @@ from lerobot.policies.pi0.configuration_pi0 import PI0Config
from lerobot.processor import (
AddBatchDimensionProcessorStep,
ComplementaryDataProcessorStep,
DataProcessorPipeline,
DeviceProcessorStep,
NormalizerProcessorStep,
PolicyProcessorPipeline,
ProcessorKwargs,
ProcessorStep,
ProcessorStepRegistry,
@@ -72,7 +72,7 @@ def make_pi0_pre_post_processors(
dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
preprocessor_kwargs: ProcessorKwargs | None = None,
postprocessor_kwargs: ProcessorKwargs | None = None,
) -> tuple[DataProcessorPipeline, DataProcessorPipeline]:
) -> tuple[PolicyProcessorPipeline, PolicyProcessorPipeline]:
if preprocessor_kwargs is None:
preprocessor_kwargs = {}
if postprocessor_kwargs is None:
@@ -105,12 +105,12 @@ def make_pi0_pre_post_processors(
]
return (
DataProcessorPipeline(
PolicyProcessorPipeline(
steps=input_steps,
name=PREPROCESSOR_DEFAULT_NAME,
**preprocessor_kwargs,
),
DataProcessorPipeline(
PolicyProcessorPipeline(
steps=output_steps,
name=POSTPROCESSOR_DEFAULT_NAME,
**postprocessor_kwargs,
@@ -20,9 +20,9 @@ from lerobot.constants import POSTPROCESSOR_DEFAULT_NAME, PREPROCESSOR_DEFAULT_N
from lerobot.policies.pi0.configuration_pi0 import PI0Config
from lerobot.processor import (
AddBatchDimensionProcessorStep,
DataProcessorPipeline,
DeviceProcessorStep,
NormalizerProcessorStep,
PolicyProcessorPipeline,
ProcessorKwargs,
RenameProcessorStep,
UnnormalizerProcessorStep,
@@ -34,7 +34,7 @@ def make_pi0fast_pre_post_processors(
dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
preprocessor_kwargs: ProcessorKwargs | None = None,
postprocessor_kwargs: ProcessorKwargs | None = None,
) -> tuple[DataProcessorPipeline, DataProcessorPipeline]:
) -> tuple[PolicyProcessorPipeline, PolicyProcessorPipeline]:
if preprocessor_kwargs is None:
preprocessor_kwargs = {}
if postprocessor_kwargs is None:
@@ -57,12 +57,12 @@ def make_pi0fast_pre_post_processors(
),
]
return (
DataProcessorPipeline(
PolicyProcessorPipeline(
steps=input_steps,
name=PREPROCESSOR_DEFAULT_NAME,
**preprocessor_kwargs,
),
DataProcessorPipeline(
PolicyProcessorPipeline(
steps=output_steps,
name=POSTPROCESSOR_DEFAULT_NAME,
**postprocessor_kwargs,
+4 -4
View File
@@ -21,9 +21,9 @@ from lerobot.constants import POSTPROCESSOR_DEFAULT_NAME, PREPROCESSOR_DEFAULT_N
from lerobot.policies.sac.configuration_sac import SACConfig
from lerobot.processor import (
AddBatchDimensionProcessorStep,
DataProcessorPipeline,
DeviceProcessorStep,
NormalizerProcessorStep,
PolicyProcessorPipeline,
ProcessorKwargs,
RenameProcessorStep,
UnnormalizerProcessorStep,
@@ -35,7 +35,7 @@ def make_sac_pre_post_processors(
dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
preprocessor_kwargs: ProcessorKwargs | None = None,
postprocessor_kwargs: ProcessorKwargs | None = None,
) -> tuple[DataProcessorPipeline, DataProcessorPipeline]:
) -> tuple[PolicyProcessorPipeline, PolicyProcessorPipeline]:
if preprocessor_kwargs is None:
preprocessor_kwargs = {}
if postprocessor_kwargs is None:
@@ -58,12 +58,12 @@ def make_sac_pre_post_processors(
),
]
return (
DataProcessorPipeline(
PolicyProcessorPipeline(
steps=input_steps,
name=PREPROCESSOR_DEFAULT_NAME,
**preprocessor_kwargs,
),
DataProcessorPipeline(
PolicyProcessorPipeline(
steps=output_steps,
name=POSTPROCESSOR_DEFAULT_NAME,
**postprocessor_kwargs,
@@ -17,10 +17,10 @@ import torch
from lerobot.policies.sac.reward_model.configuration_classifier import RewardClassifierConfig
from lerobot.processor import (
DataProcessorPipeline,
DeviceProcessorStep,
IdentityProcessorStep,
NormalizerProcessorStep,
PolicyProcessorPipeline,
ProcessorKwargs,
)
@@ -30,7 +30,7 @@ def make_classifier_processor(
dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
preprocessor_kwargs: ProcessorKwargs | None = None,
postprocessor_kwargs: ProcessorKwargs | None = None,
) -> tuple[DataProcessorPipeline, DataProcessorPipeline]:
) -> tuple[PolicyProcessorPipeline, PolicyProcessorPipeline]:
if preprocessor_kwargs is None:
preprocessor_kwargs = {}
if postprocessor_kwargs is None:
@@ -48,12 +48,12 @@ def make_classifier_processor(
output_steps = [DeviceProcessorStep(device="cpu"), IdentityProcessorStep()]
return (
DataProcessorPipeline(
PolicyProcessorPipeline(
steps=input_steps,
name="classifier_preprocessor",
**preprocessor_kwargs,
),
DataProcessorPipeline(
PolicyProcessorPipeline(
steps=output_steps,
name="classifier_postprocessor",
**postprocessor_kwargs,
@@ -22,9 +22,9 @@ from lerobot.policies.smolvla.configuration_smolvla import SmolVLAConfig
from lerobot.processor import (
AddBatchDimensionProcessorStep,
ComplementaryDataProcessorStep,
DataProcessorPipeline,
DeviceProcessorStep,
NormalizerProcessorStep,
PolicyProcessorPipeline,
ProcessorKwargs,
ProcessorStepRegistry,
RenameProcessorStep,
@@ -38,7 +38,7 @@ def make_smolvla_pre_post_processors(
dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
preprocessor_kwargs: ProcessorKwargs | None = None,
postprocessor_kwargs: ProcessorKwargs | None = None,
) -> tuple[DataProcessorPipeline, DataProcessorPipeline]:
) -> tuple[PolicyProcessorPipeline, PolicyProcessorPipeline]:
if preprocessor_kwargs is None:
preprocessor_kwargs = {}
if postprocessor_kwargs is None:
@@ -68,12 +68,12 @@ def make_smolvla_pre_post_processors(
),
]
return (
DataProcessorPipeline(
PolicyProcessorPipeline(
steps=input_steps,
name=PREPROCESSOR_DEFAULT_NAME,
**preprocessor_kwargs,
),
DataProcessorPipeline(
PolicyProcessorPipeline(
steps=output_steps,
name=POSTPROCESSOR_DEFAULT_NAME,
**postprocessor_kwargs,
@@ -20,9 +20,9 @@ from lerobot.constants import POSTPROCESSOR_DEFAULT_NAME, PREPROCESSOR_DEFAULT_N
from lerobot.policies.tdmpc.configuration_tdmpc import TDMPCConfig
from lerobot.processor import (
AddBatchDimensionProcessorStep,
DataProcessorPipeline,
DeviceProcessorStep,
NormalizerProcessorStep,
PolicyProcessorPipeline,
ProcessorKwargs,
RenameProcessorStep,
UnnormalizerProcessorStep,
@@ -34,7 +34,7 @@ def make_tdmpc_pre_post_processors(
dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
preprocessor_kwargs: ProcessorKwargs | None = None,
postprocessor_kwargs: ProcessorKwargs | None = None,
) -> tuple[DataProcessorPipeline, DataProcessorPipeline]:
) -> tuple[PolicyProcessorPipeline, PolicyProcessorPipeline]:
if preprocessor_kwargs is None:
preprocessor_kwargs = {}
if postprocessor_kwargs is None:
@@ -57,12 +57,12 @@ def make_tdmpc_pre_post_processors(
),
]
return (
DataProcessorPipeline(
PolicyProcessorPipeline(
steps=input_steps,
name=PREPROCESSOR_DEFAULT_NAME,
**preprocessor_kwargs,
),
DataProcessorPipeline(
PolicyProcessorPipeline(
steps=output_steps,
name=POSTPROCESSOR_DEFAULT_NAME,
**postprocessor_kwargs,
@@ -21,9 +21,9 @@ from lerobot.constants import POSTPROCESSOR_DEFAULT_NAME, PREPROCESSOR_DEFAULT_N
from lerobot.policies.vqbet.configuration_vqbet import VQBeTConfig
from lerobot.processor import (
AddBatchDimensionProcessorStep,
DataProcessorPipeline,
DeviceProcessorStep,
NormalizerProcessorStep,
PolicyProcessorPipeline,
ProcessorKwargs,
RenameProcessorStep,
UnnormalizerProcessorStep,
@@ -35,7 +35,7 @@ def make_vqbet_pre_post_processors(
dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
preprocessor_kwargs: ProcessorKwargs | None = None,
postprocessor_kwargs: ProcessorKwargs | None = None,
) -> tuple[DataProcessorPipeline, DataProcessorPipeline]:
) -> tuple[PolicyProcessorPipeline, PolicyProcessorPipeline]:
if preprocessor_kwargs is None:
preprocessor_kwargs = {}
if postprocessor_kwargs is None:
@@ -58,12 +58,12 @@ def make_vqbet_pre_post_processors(
),
]
return (
DataProcessorPipeline(
PolicyProcessorPipeline(
steps=input_steps,
name=PREPROCESSOR_DEFAULT_NAME,
**preprocessor_kwargs,
),
DataProcessorPipeline(
PolicyProcessorPipeline(
steps=output_steps,
name=POSTPROCESSOR_DEFAULT_NAME,
**postprocessor_kwargs,
+4
View File
@@ -46,10 +46,12 @@ from .pipeline import (
IdentityProcessorStep,
InfoProcessorStep,
ObservationProcessorStep,
PolicyProcessorPipeline,
ProcessorKwargs,
ProcessorStep,
ProcessorStepRegistry,
RewardProcessorStep,
RobotProcessorPipeline,
TruncatedProcessorStep,
)
from .rename_processor import RenameProcessorStep
@@ -79,6 +81,7 @@ __all__ = [
"NormalizerProcessorStep",
"Numpy2TorchActionProcessorStep",
"ObservationProcessorStep",
"PolicyProcessorPipeline",
"ProcessorKwargs",
"ProcessorStep",
"ProcessorStepRegistry",
@@ -88,6 +91,7 @@ __all__ = [
"DataProcessorPipeline",
"TimeLimitProcessorStep",
"AddBatchDimensionProcessorStep",
"RobotProcessorPipeline",
"TokenizerProcessorStep",
"Torch2NumpyActionProcessorStep",
"transition_to_batch",
@@ -50,7 +50,7 @@ from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from .batch_processor import AddBatchDimensionProcessorStep
from .device_processor import DeviceProcessorStep
from .normalize_processor import NormalizerProcessorStep, UnnormalizerProcessorStep
from .pipeline import DataProcessorPipeline
from .pipeline import PolicyProcessorPipeline
from .rename_processor import RenameProcessorStep
# Policy type to class mapping
@@ -422,14 +422,14 @@ def main():
AddBatchDimensionProcessorStep(),
DeviceProcessorStep(device=policy_config.device),
]
preprocessor = DataProcessorPipeline(steps=preprocessor_steps, name="robot_preprocessor")
preprocessor = PolicyProcessorPipeline(steps=preprocessor_steps, name="robot_preprocessor")
# Create postprocessor with unnormalizer for outputs only
postprocessor_steps = [
DeviceProcessorStep(device="cpu"),
UnnormalizerProcessorStep(features=output_features, norm_map=norm_map, stats=stats),
]
postprocessor = DataProcessorPipeline(steps=postprocessor_steps, name="robot_postprocessor")
postprocessor = PolicyProcessorPipeline(steps=postprocessor_steps, name="robot_postprocessor")
# Determine hub repo ID if pushing to hub
if args.push_to_hub:
+6 -6
View File
@@ -12,7 +12,7 @@ from lerobot.datasets.lerobot_dataset import LeRobotDataset
from .converters import to_tensor
from .core import EnvTransition, TransitionKey
from .pipeline import DataProcessorPipeline, ProcessorStep, ProcessorStepRegistry
from .pipeline import PolicyProcessorPipeline, ProcessorStep, ProcessorStepRegistry
@dataclass
@@ -253,17 +253,17 @@ class UnnormalizerProcessorStep(_NormalizationMixin, ProcessorStep):
def hotswap_stats(
robot_processor: DataProcessorPipeline, stats: dict[str, dict[str, Any]]
) -> DataProcessorPipeline:
policy_processor: PolicyProcessorPipeline, stats: dict[str, dict[str, Any]]
) -> PolicyProcessorPipeline:
"""
Replaces normalization statistics in a RobotProcessor pipeline.
Replaces normalization statistics in a PolicyProcessor pipeline.
This function creates a deep copy of the provided `RobotProcessor` and updates the
This function creates a deep copy of the provided `PolicyProcessorPipeline` and updates the
statistics of any `NormalizerProcessorStep` or `UnnormalizerProcessorStep` steps within it.
It's useful for adapting a trained policy to a new environment or dataset with
different data distributions.
"""
rp = deepcopy(robot_processor)
rp = deepcopy(policy_processor)
for step in rp.steps:
if isinstance(step, _NormalizationMixin):
step.stats = stats
+5 -1
View File
@@ -23,7 +23,7 @@ from collections.abc import Callable, Iterable, Sequence
from copy import deepcopy
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Generic, TypedDict, TypeVar, cast
from typing import Any, Generic, TypeAlias, TypedDict, TypeVar, cast
import torch
from huggingface_hub import ModelHubMixin, hf_hub_download
@@ -805,6 +805,10 @@ class DataProcessorPipeline(ModelHubMixin, Generic[TOutput]):
return transformed_transition[TransitionKey.COMPLEMENTARY_DATA]
RobotProcessorPipeline: TypeAlias = DataProcessorPipeline
PolicyProcessorPipeline: TypeAlias = DataProcessorPipeline
class ObservationProcessorStep(ProcessorStep, ABC):
"""Base class for processors that modify only the observation component of a transition.
+14 -9
View File
@@ -76,7 +76,12 @@ from lerobot.datasets.utils import hw_to_dataset_features
from lerobot.datasets.video_utils import VideoEncodingManager
from lerobot.policies.factory import make_policy, make_pre_post_processors
from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.processor import DataProcessorPipeline, IdentityProcessorStep, TransitionKey
from lerobot.processor import (
IdentityProcessorStep,
PolicyProcessorPipeline,
RobotProcessorPipeline,
TransitionKey,
)
from lerobot.processor.converters import (
action_to_transition,
observation_to_transition,
@@ -235,22 +240,22 @@ def record_loop(
dataset: LeRobotDataset | None = None,
teleop: Teleoperator | list[Teleoperator] | None = None,
policy: PreTrainedPolicy | None = None,
preprocessor: DataProcessorPipeline | None = None,
postprocessor: DataProcessorPipeline | None = None,
preprocessor: PolicyProcessorPipeline | None = None,
postprocessor: PolicyProcessorPipeline | None = None,
control_time_s: int | None = None,
teleop_action_processor: DataProcessorPipeline | None = None, # runs after teleop
robot_action_processor: DataProcessorPipeline | None = None, # runs before robot
robot_observation_processor: DataProcessorPipeline | None = None, # runs after robot
teleop_action_processor: RobotProcessorPipeline | None = None, # runs after teleop
robot_action_processor: RobotProcessorPipeline | None = None, # runs before robot
robot_observation_processor: RobotProcessorPipeline | None = None, # runs after robot
single_task: str | None = None,
display_data: bool = False,
):
teleop_action_processor = teleop_action_processor or DataProcessorPipeline(
teleop_action_processor = teleop_action_processor or RobotProcessorPipeline(
steps=[IdentityProcessorStep()], to_transition=action_to_transition, to_output=lambda tr: tr
)
robot_action_processor = robot_action_processor or DataProcessorPipeline(
robot_action_processor = robot_action_processor or RobotProcessorPipeline(
steps=[IdentityProcessorStep()], to_transition=lambda tr: tr, to_output=transition_to_robot_action
)
robot_observation_processor = robot_observation_processor or DataProcessorPipeline(
robot_observation_processor = robot_observation_processor or RobotProcessorPipeline(
steps=[IdentityProcessorStep()],
to_transition=observation_to_transition,
to_output=lambda tr: tr,
+3 -3
View File
@@ -47,7 +47,7 @@ from pprint import pformat
from lerobot.configs import parser
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.processor import DataProcessorPipeline, IdentityProcessorStep
from lerobot.processor import IdentityProcessorStep, RobotProcessorPipeline
from lerobot.processor.converters import action_to_transition, transition_to_robot_action
from lerobot.robots import ( # noqa: F401
Robot,
@@ -85,7 +85,7 @@ class ReplayConfig:
# Use vocal synthesis to read events.
play_sounds: bool = True
# Optional processor for actions before sending to robot
robot_action_processor: DataProcessorPipeline | None = None
robot_action_processor: RobotProcessorPipeline | None = None
@parser.wrap()
@@ -94,7 +94,7 @@ def replay(cfg: ReplayConfig):
logging.info(pformat(asdict(cfg)))
# Initialize robot action processor with default if not provided
robot_action_processor = cfg.robot_action_processor or DataProcessorPipeline(
robot_action_processor = cfg.robot_action_processor or RobotProcessorPipeline(
steps=[IdentityProcessorStep()],
to_transition=action_to_transition,
to_output=transition_to_robot_action, # type: ignore[arg-type]
+10 -10
View File
@@ -61,7 +61,7 @@ import rerun as rr
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401
from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401
from lerobot.configs import parser
from lerobot.processor import DataProcessorPipeline, IdentityProcessorStep
from lerobot.processor import IdentityProcessorStep, RobotProcessorPipeline
from lerobot.processor.converters import (
action_to_transition,
observation_to_transition,
@@ -104,9 +104,9 @@ class TeleoperateConfig:
# Display all cameras on screen
display_data: bool = False
# Optional processors for data transformation
teleop_action_processor: DataProcessorPipeline | None = None # runs after teleop
robot_action_processor: DataProcessorPipeline | None = None # runs before robot
robot_observation_processor: DataProcessorPipeline | None = None # runs after robot
teleop_action_processor: RobotProcessorPipeline | None = None # runs after teleop
robot_action_processor: RobotProcessorPipeline | None = None # runs before robot
robot_observation_processor: RobotProcessorPipeline | None = None # runs after robot
def teleop_loop(
@@ -115,20 +115,20 @@ def teleop_loop(
fps: int,
display_data: bool = False,
duration: float | None = None,
teleop_action_processor: DataProcessorPipeline | None = None,
robot_action_processor: DataProcessorPipeline | None = None,
robot_observation_processor: DataProcessorPipeline | None = None,
teleop_action_processor: RobotProcessorPipeline | None = None,
robot_action_processor: RobotProcessorPipeline | None = None,
robot_observation_processor: RobotProcessorPipeline | None = None,
):
# Initialize processors with defaults if not provided
teleop_action_processor = teleop_action_processor or DataProcessorPipeline(
teleop_action_processor = teleop_action_processor or RobotProcessorPipeline(
steps=[IdentityProcessorStep()], to_transition=action_to_transition, to_output=lambda tr: tr
)
robot_action_processor = robot_action_processor or DataProcessorPipeline(
robot_action_processor = robot_action_processor or RobotProcessorPipeline(
steps=[IdentityProcessorStep()],
to_transition=lambda tr: tr,
to_output=transition_to_robot_action, # type: ignore[arg-type]
)
robot_observation_processor = robot_observation_processor or DataProcessorPipeline(
robot_observation_processor = robot_observation_processor or RobotProcessorPipeline(
steps=[IdentityProcessorStep()],
to_transition=observation_to_transition,
to_output=lambda tr: tr,
+3 -3
View File
@@ -31,7 +31,7 @@ from termcolor import colored
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.utils import DEFAULT_FEATURES
from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.processor import DataProcessorPipeline, TransitionKey
from lerobot.processor import PolicyProcessorPipeline, TransitionKey
from lerobot.robots import Robot
@@ -102,8 +102,8 @@ def predict_action(
observation: dict[str, np.ndarray],
policy: PreTrainedPolicy,
device: torch.device,
preprocessor: DataProcessorPipeline,
postprocessor: DataProcessorPipeline,
preprocessor: PolicyProcessorPipeline,
postprocessor: PolicyProcessorPipeline,
use_amp: bool,
task: str | None = None,
robot_type: str | None = None,
+3 -3
View File
@@ -32,7 +32,7 @@ from lerobot.datasets.utils import load_json, write_json
from lerobot.optim.optimizers import load_optimizer_state, save_optimizer_state
from lerobot.optim.schedulers import load_scheduler_state, save_scheduler_state
from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.processor import DataProcessorPipeline
from lerobot.processor import PolicyProcessorPipeline
from lerobot.utils.random_utils import load_rng_state, save_rng_state
@@ -75,8 +75,8 @@ def save_checkpoint(
policy: PreTrainedPolicy,
optimizer: Optimizer,
scheduler: LRScheduler | None = None,
preprocessor: DataProcessorPipeline | None = None,
postprocessor: DataProcessorPipeline | None = None,
preprocessor: PolicyProcessorPipeline | None = None,
postprocessor: PolicyProcessorPipeline | None = None,
) -> None:
"""This function creates the following directory structure: