From 83a4338f8b487a9bf0c8631cbe8b4742bee59c19 Mon Sep 17 00:00:00 2001 From: Adil Zouitine Date: Sun, 6 Jul 2025 22:03:37 +0200 Subject: [PATCH] chore (output format): improves output format --- docs/source/processor_tutorial.mdx | 461 +++++++++++++++++++++-- src/lerobot/processor/pipeline.py | 76 ++-- src/lerobot/scripts/eval.py | 11 +- tests/processor/test_batch_conversion.py | 2 +- 4 files changed, 487 insertions(+), 63 deletions(-) diff --git a/docs/source/processor_tutorial.mdx b/docs/source/processor_tutorial.mdx index a6e143725..3f59666cd 100644 --- a/docs/source/processor_tutorial.mdx +++ b/docs/source/processor_tutorial.mdx @@ -95,9 +95,13 @@ This approach has several problems: RobotProcessor solves these issues by providing a declarative pipeline approach where each transformation is a separate, testable, shareable component. -## Understanding EnvTransition +## Understanding EnvTransition and Batch Format -Before diving into RobotProcessor, let's understand the data structure it operates on. An `EnvTransition` is a 7-tuple that represents a complete transition in the environment: +RobotProcessor works with two data formats: + +### 1. EnvTransition Tuple Format + +An `EnvTransition` is a 7-tuple that represents a complete transition in the environment: ```python from lerobot.processor.pipeline import TransitionIndex @@ -105,24 +109,59 @@ from lerobot.processor.pipeline import TransitionIndex # EnvTransition structure: # (observation, action, reward, done, truncated, info, complementary_data) transition = ( - {"pixels": ..., "agent_pos": ...}, # observation at time t - [0.1, -0.2, 0.3], # action taken at time t - 1.0, # reward received - False, # episode done flag - False, # episode truncated flag - {"success": True}, # additional info from environment - {"step_idx": 42} # complementary_data for inter-step communication + {"observation.image": ..., "observation.state": ...}, # observation at time t + [0.1, -0.2, 0.3], # action taken at time t + 1.0, # reward received + False, # episode done flag + False, # episode truncated flag + {"success": True}, # additional info from environment + {"step_idx": 42} # complementary_data for inter-step communication ) ``` -Each element serves a specific purpose: -1. **observation**: Raw sensor data from the environment (images, states, etc.) -2. **action**: The action command sent to the robot -3. **reward**: Scalar reward signal (for RL tasks) -4. **done**: Boolean indicating natural episode termination -5. **truncated**: Boolean indicating artificial termination (time limit, safety) -6. **info**: Dictionary with environment-specific information -7. **complementary_data**: Dictionary for passing data between processor steps (NEW!) +### 2. Batch Dictionary Format + +This is the format used by LeRobot datasets and replay buffers: + +```python +# Batch dictionary format (used by LeRobotDataset, ReplayBuffer) +batch = { + "observation.image": torch.tensor(...), # Image observations + "observation.state": torch.tensor(...), # State observations + "action": torch.tensor(...), # Actions + "next.reward": torch.tensor(...), # Rewards + "next.done": torch.tensor(...), # Done flags + "next.truncated": torch.tensor(...), # Truncated flags + "info": {...}, # Info dictionary + # Additional keys are preserved but ignored during conversion +} +``` + +### Automatic Format Conversion + +RobotProcessor automatically handles both formats: + +```python +from lerobot.processor.pipeline import RobotProcessor +from lerobot.processor.observation_processor import ImageProcessor + +processor = RobotProcessor([ImageProcessor()]) + +# Works with EnvTransition tuples +transition = ({"pixels": image_array}, None, 0.0, False, False, {}, {}) +processed_transition = processor(transition) # Returns EnvTransition tuple + +# Also works with batch dictionaries +batch = { + "observation.pixels": image_tensor, + "action": action_tensor, + "next.reward": reward_tensor, + "next.done": done_tensor, + "next.truncated": truncated_tensor, + "info": info_dict +} +processed_batch = processor(batch) # Returns batch dictionary +``` ### Using TransitionIndex @@ -145,6 +184,228 @@ info = transition[TransitionIndex.INFO] comp_data = transition[TransitionIndex.COMPLEMENTARY_DATA] ``` +### Default Conversion Functions + +RobotProcessor uses these default conversion functions: + +```python +def _default_batch_to_transition(batch): + """Default conversion from batch dict to EnvTransition tuple.""" + # Extract observation keys (anything starting with "observation.") + observation_keys = {k: v for k, v in batch.items() if k.startswith("observation.")} + + observation = None + if observation_keys: + observation = {} + # Keep observation.* keys as-is (don't remove "observation." prefix) + for key, value in observation_keys.items(): + observation[key] = value + + return ( + observation, + batch.get("action"), + batch.get("next.reward", 0.0), # Note: "next.reward" not "reward" + batch.get("next.done", False), # Note: "next.done" not "done" + batch.get("next.truncated", False), # Note: "next.truncated" not "truncated" + batch.get("info", {}), + {}, # Empty complementary_data + ) + +def _default_transition_to_batch(transition): + """Default conversion from EnvTransition tuple to batch dict.""" + obs, action, reward, done, truncated, info, _ = transition + + batch = { + "action": action, + "next.reward": reward, # Note: "next.reward" not "reward" + "next.done": done, # Note: "next.done" not "done" + "next.truncated": truncated, # Note: "next.truncated" not "truncated" + "info": info, + } + + # Flatten observation dict (keep observation.* keys as-is) + if isinstance(obs, dict): + for key, value in obs.items(): + batch[key] = value + + return batch +``` + +### Custom Conversion Functions + +You can customize how RobotProcessor converts between formats: + +```python +def custom_batch_to_transition(batch): + """Custom conversion from batch dict to EnvTransition tuple.""" + # Extract observation keys (anything starting with "observation.") + observation = {k: v for k, v in batch.items() if k.startswith("observation.")} + + return ( + observation, + batch.get("action"), + batch.get("reward", 0.0), # Use "reward" instead of "next.reward" + batch.get("done", False), # Use "done" instead of "next.done" + batch.get("truncated", False), + batch.get("info", {}), + batch.get("complementary_data", {}) + ) + +def custom_transition_to_batch(transition): + """Custom conversion from EnvTransition tuple to batch dict.""" + obs, action, reward, done, truncated, info, comp_data = transition + + batch = { + "action": action, + "reward": reward, # Use "reward" instead of "next.reward" + "done": done, # Use "done" instead of "next.done" + "truncated": truncated, + "info": info, + } + + # Flatten observation dict + if obs: + batch.update(obs) + + return batch + +# Use custom converters +processor = RobotProcessor( + steps=[ImageProcessor()], + to_transition=custom_batch_to_transition, + to_output=custom_transition_to_batch +) +``` + +### Advanced: Controlling Output Format with `to_output` + +The `to_output` function determines what format is returned when you call the processor with a batch dictionary. Sometimes you want to output `EnvTransition` tuples even when you input batch dictionaries: + +```python +# Identity function to always return EnvTransition tuples +def keep_as_transition(transition): + """Always return EnvTransition tuple regardless of input format.""" + return transition + +# Processor that always outputs EnvTransition tuples +processor = RobotProcessor( + steps=[ImageProcessor(), StateProcessor()], + to_output=keep_as_transition # Always return tuple format +) + +# Even when called with batch dict, returns EnvTransition tuple +batch = { + "observation.image": image_tensor, + "action": action_tensor, + "next.reward": reward_tensor, + "next.done": done_tensor, + "next.truncated": truncated_tensor, + "info": info_dict +} + +result = processor(batch) # Returns EnvTransition tuple, not batch dict! +print(type(result)) # +``` + +### Real-World Example: Environment Interaction + +This is particularly useful for environment interaction where you want consistent tuple output: + +```python +from lerobot.processor.observation_processor import VanillaObservationProcessor + +# Create processor that always outputs EnvTransition for environment interaction +# This avoids format conversion overhead during real-time control +env_processor = RobotProcessor( + [VanillaObservationProcessor()], + to_transition=lambda x: x, # Pass through - no conversion needed + to_output=lambda x: x, # Always return EnvTransition tuple +) + +# Environment interaction loop +env = make_env() +obs, info = env.reset() + +for step in range(1000): + # Create transition - input is already in tuple format + transition = (obs, None, 0.0, False, False, info, {"step": step}) + + # Process - output is guaranteed to be EnvTransition tuple + processed_transition = env_processor(transition) + processed_obs = processed_transition[TransitionIndex.OBSERVATION] + + # Use with policy + action = policy.select_action(processed_obs) + obs, reward, done, truncated, info = env.step(action) + + if done or truncated: + break +``` + +### When to Use Different Output Formats + +**Use EnvTransition tuple output when:** +- Environment interaction and real-time control +- You need to access individual transition components frequently +- Performance is critical (avoids dictionary creation overhead) +- Working with gym environments that expect tuple format + +**Use batch dictionary output when:** +- Training with LeRobot datasets +- Working with DataLoaders and batched processing +- Interfacing with existing LeRobot training code +- You need the standardized "next.*" key format + +```python +# For environment interaction - use tuple output +env_processor = RobotProcessor( + steps=[ImageProcessor(), StateProcessor()], + to_output=lambda x: x # Return EnvTransition tuple +) + +# For training - use batch output (default) +train_processor = RobotProcessor( + steps=[ImageProcessor(), StateProcessor(), NormalizerProcessor(...)], + # to_output defaults to _default_transition_to_batch +) + +# Training loop +for batch in dataloader: + processed_batch = train_processor(batch) # Returns batch dict + loss = model.compute_loss(processed_batch) + +# Environment loop +for step in range(1000): + transition = (obs, None, 0.0, False, False, info, {}) + processed_transition = env_processor(transition) # Returns EnvTransition tuple + obs = processed_transition[TransitionIndex.OBSERVATION] + action = policy.select_action(obs) +``` + +### Why "next.reward", "next.done", "next.truncated"? + +The default conversion uses "next.*" prefixes because this matches the standard format used by LeRobot datasets and follows the convention that rewards, done flags, and truncated flags are the result of taking an action (i.e., they come from the "next" state): + +```python +# Standard RL transition format +# (s_t, a_t, r_{t+1}, done_{t+1}, truncated_{t+1}) +# ^ ^ ^ ^ ^ +# | | | | | +# | | | | +-- Result of action a_t +# | | | +-- Result of action a_t +# | | +-- Result of action a_t +# | +-- Action taken in state s_t +# +-- State at time t + +batch = { + "observation.state": s_t, + "action": a_t, + "next.reward": r_{t+1}, # Reward received after taking action + "next.done": done_{t+1}, # Done flag after taking action + "next.truncated": truncated_{t+1}, # Truncated flag after taking action +} +``` + ## Your First RobotProcessor Let's create a processor that properly handles image and state preprocessing: @@ -184,6 +445,111 @@ print("Image range:", processed_obs["observation.images.camera_front"].min().ite "to", processed_obs["observation.images.camera_front"].max().item()) # 0.0 to 1.0 ``` +## Working with LeRobot Datasets and Replay Buffers + +RobotProcessor seamlessly works with LeRobot's batch dictionary format: + +```python +from lerobot.datasets.lerobot_dataset import LeRobotDataset +from lerobot.processor.pipeline import RobotProcessor +from lerobot.processor.normalize_processor import NormalizerProcessor +from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature + +# Load a dataset +dataset = LeRobotDataset("lerobot/pusht") + +# Define features and normalization +features = { + "observation.image": PolicyFeature(FeatureType.VISUAL, (3, 96, 96)), + "observation.state": PolicyFeature(FeatureType.STATE, (2,)), + "action": PolicyFeature(FeatureType.ACTION, (2,)), +} + +norm_map = { + FeatureType.VISUAL: NormalizationMode.MEAN_STD, + FeatureType.STATE: NormalizationMode.MIN_MAX, + FeatureType.ACTION: NormalizationMode.MEAN_STD, +} + +# Create processor with normalization +processor = RobotProcessor([ + NormalizerProcessor.from_lerobot_dataset(dataset, features, norm_map), +]) + +# Process a batch from the dataset +batch = dataset[0] # Get first batch +print("Original batch keys:", list(batch.keys())) +print("Original image shape:", batch["observation.image"].shape) + +# Process the batch - automatically converts to/from batch format +processed_batch = processor(batch) +print("Processed batch keys:", list(processed_batch.keys())) +print("Processed image range:", processed_batch["observation.image"].min().item(), + "to", processed_batch["observation.image"].max().item()) + +# Use with DataLoader +from torch.utils.data import DataLoader + +dataloader = DataLoader(dataset, batch_size=32, shuffle=True) + +for batch in dataloader: + # Process entire batch at once + processed_batch = processor(batch) + + # Use processed batch for training + # model.train_step(processed_batch) + break +``` + +## Integration with Replay Buffers + +RobotProcessor works great with replay buffers for online learning: + +```python +from lerobot.common.utils.buffer import ReplayBuffer +from lerobot.processor.pipeline import RobotProcessor +from lerobot.processor.device_processor import DeviceProcessor + +# Create replay buffer +buffer = ReplayBuffer(capacity=10000) + +# Create processor for online data +online_processor = RobotProcessor([ + ImageProcessor(), + StateProcessor(), + DeviceProcessor(device="cuda"), +]) + +# During environment interaction +env = make_env() +obs, info = env.reset() + +for step in range(1000): + # Raw environment observation + transition = (obs, None, 0.0, False, False, info, {}) + + # Process for policy input + processed_transition = online_processor(transition) + processed_obs = processed_transition[TransitionIndex.OBSERVATION] + + # Get action from policy + action = policy.select_action(processed_obs) + + # Execute action + next_obs, reward, done, truncated, info = env.step(action) + + # Store in replay buffer (can store either format) + buffer.add(obs, action, reward, next_obs, done, truncated, info) + + obs = next_obs + if done or truncated: + obs, info = env.reset() + +# Sample and process batches for training +batch = buffer.sample(batch_size=32) +processed_batch = online_processor(batch) # Processes entire batch +``` + ## Creating Custom Steps: The ProcessorStep Protocol A processor step must follow certain conventions. Let's create a complete example that shows all required and optional methods: @@ -689,7 +1055,7 @@ def state_dict(self) -> Dict[str, torch.Tensor]: ## Complete Policy Example with Pre and Post Processing -Here's how to use RobotProcessor in a real robot control loop: +Here's how to use RobotProcessor in a real robot control loop, showing both tuple and batch formats: ```python from lerobot.processor.pipeline import RobotProcessor, ProcessorStepRegistry, TransitionIndex @@ -739,7 +1105,7 @@ preprocessor = preprocessor.to("cuda") postprocessor = postprocessor.to("cuda") policy = policy.to("cuda") -# Control loop +# Control loop using EnvTransition format env = make_robot_env() obs, info = env.reset() @@ -750,7 +1116,7 @@ for episode in range(10): # Create transition with raw observation transition = (obs, None, 0.0, False, False, info, {"step": step}) - # Preprocess + # Preprocess - works with tuple format processed_transition = preprocessor(transition) processed_obs = processed_transition[TransitionIndex.OBSERVATION] @@ -784,6 +1150,47 @@ for episode in range(10): # Save preprocessor with learned statistics preprocessor.save_pretrained(f"./checkpoints/preprocessor_ep{episode}") +# Alternative: Using batch dictionary format +# This is useful when integrating with existing LeRobot training code +def control_loop_with_batch_format(): + """Example using batch dictionary format.""" + obs, info = env.reset() + + for step in range(1000): + # Create batch dictionary + batch = { + "observation.image": torch.from_numpy(obs["pixels"]).unsqueeze(0), + "observation.state": torch.from_numpy(obs["agent_pos"]).unsqueeze(0), + "action": torch.zeros(1, 7), # Placeholder + "next.reward": torch.tensor([0.0]), + "next.done": torch.tensor([False]), + "next.truncated": torch.tensor([False]), + "info": info, + } + + # Preprocess - works with batch format + processed_batch = preprocessor(batch) + + # Get action from policy + with torch.no_grad(): + action = policy.select_action({ + k: v for k, v in processed_batch.items() + if k.startswith("observation.") + }) + + # Add action to batch for postprocessing + processed_batch["action"] = action + + # Postprocess + final_batch = postprocessor(processed_batch) + final_action = final_batch["action"] + + # Execute action + obs, reward, terminated, truncated, info = env.step(final_action.cpu().numpy()) + + if terminated or truncated: + break + # Push final version to hub preprocessor.push_to_hub("my-username/act-preprocessor") postprocessor.push_to_hub("my-username/act-postprocessor") @@ -834,6 +1241,9 @@ processor.register_after_step_hook(validate_tensors) RobotProcessor provides a powerful, modular approach to data preprocessing in robotics: +- **Dual format support**: Works seamlessly with both EnvTransition tuples and batch dictionaries +- **Automatic format conversion**: Converts between tuple and batch formats as needed +- **LeRobot integration**: Native support for LeRobotDataset and ReplayBuffer formats - **Clear separation of concerns**: Each transformation is a separate, testable unit - **Proper state management**: Clear distinction between config (JSON) and state (tensors) - **Device-aware**: Seamless GPU/CPU transfers with `.to(device)` @@ -841,7 +1251,16 @@ RobotProcessor provides a powerful, modular approach to data preprocessing in ro - **Easy sharing**: Push to Hugging Face Hub for reproducibility - **Type safety**: Use `TransitionIndex` instead of magic numbers - **Debugging tools**: Step through transformations and add monitoring hooks +- **Flexible conversion**: Customize `to_transition` and `to_output` functions for specific needs -By following these patterns, your preprocessing code becomes more maintainable, shareable, and robust. +Key advantages of the dual format approach: +- **Environment interaction**: Use tuple format for real-time robot control +- **Training/evaluation**: Use batch format for dataset processing and model training +- **Seamless integration**: Same processor works with both formats automatically +- **Backward compatibility**: Existing code using either format continues to work +- **Output format control**: Use `to_output` to control return format regardless of input format +- **Performance optimization**: Avoid unnecessary format conversions during time-critical operations + +By following these patterns, your preprocessing code becomes more maintainable, shareable, and robust while being compatible with the entire LeRobot ecosystem. For the full API reference, see the [RobotProcessor API documentation](/api/processor). diff --git a/src/lerobot/processor/pipeline.py b/src/lerobot/processor/pipeline.py index 7c88f6d3a..a7181dcc2 100644 --- a/src/lerobot/processor/pipeline.py +++ b/src/lerobot/processor/pipeline.py @@ -239,34 +239,26 @@ def _default_transition_to_batch(transition: EnvTransition) -> dict[str, Any]: class RobotProcessor(ModelHubMixin): """ Composable, debuggable post-processing processor for robot transitions. - The class orchestrates an ordered collection of small, functional - transforms—steps—executed left-to-right on each incoming - `EnvTransition`. - Parameters: - steps : Sequence[ProcessorStep], optional - Ordered list executed on every call - name : str, default="RobotProcessor" - Human-readable identifier that is persisted inside the JSON config. - seed : int | None, optional - Global seed forwarded to steps that choose to consume it. - Examples: - Basic usage:: - env = gym.make("CartPole-v1") - proc = RobotProcessor([ - ObservationNormalizer(), - IntrinsicVelocity(), - VelocityBonus(0.02), - ]) - obs, info = env.reset(seed=0) - tr = (obs, None, 0.0, False, False, info, {}) - obs, *_ = proc(tr) # agent sees a normalised observation - Inspecting intermediate results:: - for idx, step_tr in enumerate(proc.step_through(tr)): - print(idx, step_tr) - Serialization to the Hugging Face Hub:: - proc.save_pretrained("chkpt") - proc.push_to_hub("my-org/cartpole_proc") - loaded = RobotProcessor.from_pretrained("my-org/cartpole_proc") + + The class orchestrates an ordered collection of small, functional transforms—steps—executed + left-to-right on each incoming `EnvTransition`. It can process both `EnvTransition` tuples + and batch dictionaries, automatically converting between formats as needed. + + Args: + steps: Ordered list of processing steps executed on every call. Defaults to empty list. + name: Human-readable identifier that is persisted inside the JSON config. + Defaults to "RobotProcessor". + seed: Global seed forwarded to steps that choose to consume it. Defaults to None. + to_transition: Function to convert batch dict to EnvTransition tuple. + Defaults to _default_batch_to_transition. + to_output: Function to convert EnvTransition tuple to the desired output format. + Usually it is a batch dict or EnvTransition tuple. + Defaults to _default_transition_to_batch. + before_step_hooks: List of hooks called before each step. Each hook receives the step + index and transition, and can optionally return a modified transition. + after_step_hooks: List of hooks called after each step. Each hook receives the step + index and transition, and can optionally return a modified transition. + reset_hooks: List of hooks called during processor reset. """ steps: Sequence[ProcessorStep] = field(default_factory=list) @@ -276,7 +268,7 @@ class RobotProcessor(ModelHubMixin): to_transition: Callable[[dict[str, Any]], EnvTransition] = field( default_factory=lambda: _default_batch_to_transition, repr=False ) - to_batch: Callable[[EnvTransition], dict[str, Any]] = field( + to_output: Callable[[EnvTransition], dict[str, Any] | EnvTransition] = field( default_factory=lambda: _default_transition_to_batch, repr=False ) @@ -292,16 +284,22 @@ class RobotProcessor(ModelHubMixin): reset_hooks: list[Callable[[], None]] = field(default_factory=list, repr=False) def __call__(self, data: EnvTransition | dict[str, Any]): - """Process *data* through all steps. + """Process data through all steps. - The method accepts **either** the classic :pydata:`EnvTransition` tuple - **or** a *batch* dictionary (like the ones returned by - :class:`lerobot.utils.buffer.ReplayBuffer` or - :class:`lerobot.datasets.lerobot_dataset.LeRobotDataset`). If a dict is - supplied it is first converted to the internal tuple format using - :pyattr:`to_transition`; after all steps are executed the tuple is - transformed back into a dict with :pyattr:`to_batch` and the result is - returned – thereby preserving the caller's original data type. + The method accepts either the classic EnvTransition tuple or a batch dictionary + (like the ones returned by ReplayBuffer or LeRobotDataset). If a dict is supplied + it is first converted to the internal tuple format using to_transition; after all + steps are executed the tuple is transformed back into a dict with to_batch and the + result is returned – thereby preserving the caller's original data type. + + Args: + data: Either an EnvTransition tuple or a batch dictionary to process. + + Returns: + The processed data in the same format as the input (tuple or dict). + + Raises: + ValueError: If the transition is not a valid 7-tuple format. """ called_with_batch = isinstance(data, dict) @@ -329,7 +327,7 @@ class RobotProcessor(ModelHubMixin): if updated is not None: transition = updated - return self.to_batch(transition) if called_with_batch else transition + return self.to_output(transition) if called_with_batch else transition def step_through(self, transition: EnvTransition) -> Iterable[EnvTransition]: """Yield the intermediate Transition instances after each processor step.""" diff --git a/src/lerobot/scripts/eval.py b/src/lerobot/scripts/eval.py index 8dd2fa6d3..c80da8138 100644 --- a/src/lerobot/scripts/eval.py +++ b/src/lerobot/scripts/eval.py @@ -131,7 +131,14 @@ def rollout( render_callback(env) # Create observation processing processor - obs_processor = RobotProcessor([VanillaObservationProcessor()]) + # NOTE: During environment interaction, we skip batch dictionary conversion + # since that format is only needed for loss computation during training. + # Using identity functions to avoid unnecessary format transformations. + obs_processor = RobotProcessor( + [VanillaObservationProcessor()], + to_transition=lambda x: x, + to_output=lambda x: x, + ) all_observations = [] all_actions = [] @@ -152,7 +159,7 @@ def rollout( check_env_attributes_and_types(env) while not np.all(done): # Numpy array to tensor and changing dictionary keys to LeRobot policy format. - transition = (observation, None, None, None, None, None, None) + transition = (observation, None, None, None, None, info, None) processed_transition = obs_processor(transition) observation = processed_transition[TransitionIndex.OBSERVATION] if return_observations: diff --git a/tests/processor/test_batch_conversion.py b/tests/processor/test_batch_conversion.py index ae453ac3e..de2ca0e7d 100644 --- a/tests/processor/test_batch_conversion.py +++ b/tests/processor/test_batch_conversion.py @@ -273,7 +273,7 @@ def test_custom_converter(): batch["custom_field"] = "custom_value" return batch - proc = RobotProcessor([], to_transition=to_tr, to_batch=to_batch) + proc = RobotProcessor([], to_transition=to_tr, to_output=to_batch) batch = _dummy_batch() out = proc(batch)