diff --git a/docs/source/_toctree.yml b/docs/source/_toctree.yml index 79ebf289a..e0c1c30ae 100644 --- a/docs/source/_toctree.yml +++ b/docs/source/_toctree.yml @@ -7,8 +7,6 @@ - sections: - local: il_robots title: Imitation Learning for Robots - - local: cameras - title: Cameras - local: bring_your_own_policies title: Bring Your Own Policies - local: integrate_hardware @@ -29,6 +27,8 @@ title: Porting Large Datasets - local: using_dataset_tools title: Using the Dataset Tools + - local: dataset_subtask + title: Using Subtasks in the Dataset title: "Datasets" - sections: - local: act @@ -110,6 +110,10 @@ - local: phone_teleop title: Phone title: "Teleoperators" +- sections: + - local: cameras + title: Cameras + title: "Sensors" - sections: - local: torch_accelerators title: PyTorch accelerators diff --git a/docs/source/cameras.mdx b/docs/source/cameras.mdx index 5c35be0ba..8af0f5ae5 100644 --- a/docs/source/cameras.mdx +++ b/docs/source/cameras.mdx @@ -1,12 +1,22 @@ # Cameras -LeRobot offers multiple options for video capture, including phone cameras, built-in laptop cameras, external webcams, and Intel RealSense cameras. To efficiently record frames from most cameras, you can use either the `OpenCVCamera` or `RealSenseCamera` class. For additional compatibility details on the `OpenCVCamera` class, refer to the [Video I/O with OpenCV Overview](https://docs.opencv.org/4.x/d0/da7/videoio_overview.html). +LeRobot offers multiple options for video capture: -### Finding your camera +| Class | Supported Cameras | +| ----------------- | ----------------------------------- | +| `OpenCVCamera` | Phone, built-in laptop, USB webcams | +| `ZMQCamera` | Network-connected cameras | +| `RealSenseCamera` | Intel RealSense (with depth) | +| `Reachy2Camera` | Reachy 2 robot cameras | -To instantiate a camera, you need a camera identifier. This identifier might change if you reboot your computer or re-plug your camera, a behavior mostly dependant on your operating system. +> [!TIP] +> For `OpenCVCamera` compatibility details, see the [Video I/O with OpenCV Overview](https://docs.opencv.org/4.x/d0/da7/videoio_overview.html). -To find the camera indices of the cameras plugged into your system, run the following script: +### Find your camera + +Every camera requires a unique identifier to be instantiated, allowing you to distinguish between multiple connected devices. + +`OpenCVCamera` and `RealSenseCamera` support auto-discovery. Run the command below to list available devices and their identifiers. Note that these identifiers may change after rebooting your computer or re-plugging the camera, depending on your operating system. ```bash lerobot-find-cameras opencv # or realsense for Intel Realsense cameras @@ -14,7 +24,7 @@ lerobot-find-cameras opencv # or realsense for Intel Realsense cameras The output will look something like this if you have two cameras connected: -``` +```bash --- Detected Cameras --- Camera #0: Name: OpenCV Camera @ 0 @@ -33,13 +43,37 @@ Camera #0: > [!WARNING] > When using Intel RealSense cameras in `macOS`, you could get this [error](https://github.com/IntelRealSense/librealsense/issues/12307): `Error finding RealSense cameras: failed to set power state`, this can be solved by running the same command with `sudo` permissions. Note that using RealSense cameras in `macOS` is unstable. -## Use Cameras +`ZMQCamera` and `Reachy2Camera` do not support auto-discovery. They must be configured manually by providing their network address and port or robot SDK settings. 
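As a concrete illustration of the manual-configuration requirement (a minimal sketch grounded in this PR's `camera_zmq.py` change, where `find_cameras()` now raises instead of returning an empty list):

```python
from lerobot.cameras.zmq.camera_zmq import ZMQCamera

try:
    ZMQCamera.find_cameras()
except NotImplementedError:
    # ZMQ cameras cannot be auto-discovered: supply the server address and port
    # through the camera's config instead.
    print("Configure ZMQCamera manually with its server address and port.")
```
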
-Below are two examples, demonstrating how to work with the API.
+## Use cameras

-- **Asynchronous frame capture** using an OpenCV-based camera
+
+### Frame access modes
+
+All camera classes implement three access modes for capturing frames:
+
+| Method                    | Behavior                                                                                                                                                            | Blocks?        | Best For                                 |
+| ------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -------------- | ---------------------------------------- |
+| `read()`                  | Waits for the camera hardware to return a frame. May block for a long time depending on the camera and SDK.                                                        | Yes            | Simple scripts, sequential capture       |
+| `async_read(timeout_ms)`  | Returns the latest unconsumed frame from the background thread. Blocks only if the buffer is empty, up to `timeout_ms`. Raises `TimeoutError` if no frame arrives. | With a timeout | Control loops synchronized to camera FPS |
+| `read_latest(max_age_ms)` | Peeks at the most recent frame in the buffer (may be stale). Raises `TimeoutError` if the frame is older than `max_age_ms`.                                        | No             | UI visualization, logging, monitoring    |
+
+### Usage examples
+
+The following examples show how to use the camera API to configure and capture frames from different camera types.
+
+- **Blocking and non-blocking frame capture** using an OpenCV-based camera
 - **Color and depth capture** using an Intel RealSense camera

+> [!WARNING]
+> Failing to cleanly disconnect cameras can cause resource leaks. Use the context manager protocol to ensure automatic cleanup:
+>
+> ```python
+> with OpenCVCamera(config) as camera:
+>     ...
+> ```
+>
+> You can also call `connect()` and `disconnect()` manually, but always call `disconnect()` inside a `finally` block.
+

@@ -60,16 +94,30 @@ config = OpenCVCameraConfig(
 )

 # Instantiate and connect an `OpenCVCamera`, performing a warm-up read (default).
-camera = OpenCVCamera(config)
-camera.connect()
+with OpenCVCamera(config) as camera:
+
+    # Read a frame synchronously: blocks until the hardware delivers a new frame
+    frame = camera.read()
+    print("read() call returned frame with shape:", frame.shape)
+
+    # Read a frame asynchronously with a timeout: returns the latest unconsumed frame,
+    # or waits up to timeout_ms for a new one
+    try:
+        for i in range(10):
+            frame = camera.async_read(timeout_ms=200)
+            print(f"async_read call returned frame {i} with shape:", frame.shape)
+    except TimeoutError as e:
+        print(f"No frame received within timeout: {e}")
+
+    # Return a frame instantly: the most recent frame captured by the camera, even if stale
+    try:
+        initial_frame = camera.read_latest(max_age_ms=1000)
+        for i in range(10):
+            frame = camera.read_latest(max_age_ms=1000)
+            print(f"read_latest call returned frame {i} with shape:", frame.shape)
+            print(f"Was a new frame received by the camera? {not (initial_frame == frame).all()}")
+    except TimeoutError as e:
+        print(f"Frame too old: {e}")

-# Read frames asynchronously in a loop via `async_read(timeout_ms)`
-try:
-    for i in range(10):
-        frame = camera.async_read(timeout_ms=200)
-        print(f"Async frame {i} shape:", frame.shape)
-finally:
-    camera.disconnect()
 ```
@@ -111,10 +159,10 @@

-## Use your phone
+## Use your phone's camera

-
+

 To use your iPhone as a camera on macOS, enable the Continuity Camera feature:
@@ -124,83 +172,49 @@ To use your iPhone as a camera on macOS, enable the Continuity Camera feature:

 For more details, visit [Apple support](https://support.apple.com/en-gb/guide/mac-help/mchl77879b8a/mac).
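+Once Continuity Camera is enabled, the iPhone shows up as a regular OpenCV device. A minimal sanity check (a sketch only: the import paths and the `index_or_path` field are assumptions based on this repository's layout; use the index printed by `lerobot-find-cameras opencv`):

```python
from lerobot.cameras.opencv.camera_opencv import OpenCVCamera
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig

# Assumption: camera index 0; replace with the index reported by `lerobot-find-cameras opencv`.
config = OpenCVCameraConfig(index_or_path=0)
with OpenCVCamera(config) as camera:
    frame = camera.read()
    print("iPhone frame shape:", frame.shape)
```
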
-Your iPhone should be detected automatically when running the camera setup script in the next section.
-
-
+
-If you want to use your phone as a camera on Linux, follow these steps to set up a virtual camera
+If you want to use your phone as a camera via OBS, follow these steps to set up a virtual camera.

-1. _Install `v4l2loopback-dkms` and `v4l-utils`_. Those packages are required to create virtual camera devices (`v4l2loopback`) and verify their settings with the `v4l2-ctl` utility from `v4l-utils`. Install them using:
+1. _(Linux only) Install `v4l2loopback-dkms` and `v4l-utils`_. These packages create virtual camera devices and verify their settings. Install with:

-
-```python
+```bash
 sudo apt install v4l2loopback-dkms v4l-utils
 ```
-

-2. _Install [DroidCam](https://droidcam.app) on your phone_. This app is available for both iOS and Android.
-3. _Install [OBS Studio](https://obsproject.com)_. This software will help you manage the camera feed. Install it using [Flatpak](https://flatpak.org):
+2. _Install the [DroidCam app](https://droidcam.app) on your phone_. This app is available for both iOS and Android.
+3. _Download and install [OBS Studio](https://obsproject.com)_.
+4. _Download and install the [DroidCam OBS plugin](https://droidcam.app/obs)_.
+5. _Start OBS Studio_.

-
-```python
-flatpak install flathub com.obsproject.Studio
-```
-
-
-4. _Install the DroidCam OBS plugin_. This plugin integrates DroidCam with OBS Studio. Install it with:
-
-
-```python
-flatpak install flathub com.obsproject.Studio.Plugin.DroidCam
-```
-
-
-5. _Start OBS Studio_. Launch with:
-
-
-```python
-flatpak run com.obsproject.Studio
-```
-

-6. _Add your phone as a source_. Follow the instructions [here](https://droidcam.app/obs/usage). Be sure to set the resolution to `640x480`.
-7. _Adjust resolution settings_. In OBS Studio, go to `File > Settings > Video`. Change the `Base(Canvas) Resolution` and the `Output(Scaled) Resolution` to `640x480` by manually typing it in.
+6. _Add your phone as a source_. Follow the instructions [here](https://droidcam.app/obs/usage). Be sure to set the resolution to `640x480` to avoid watermarks.
+7. _Adjust resolution settings_. In OBS Studio, go to `File > Settings > Video` or `OBS > Preferences... > Video`. Change the `Base(Canvas) Resolution` and the `Output(Scaled) Resolution` to `640x480` by typing the values in manually.
 8. _Start virtual camera_. In OBS Studio, follow the instructions [here](https://obsproject.com/kb/virtual-camera-guide).
-9. _Verify the virtual camera setup_. Use `v4l2-ctl` to list the devices:
+9. _Verify the virtual camera setup and resolution_.
+   - **Linux**: Use `v4l2-ctl` to list devices and check the resolution:
+     ```bash
+     v4l2-ctl --list-devices                  # find VirtualCam and note its /dev/videoX path
+     v4l2-ctl -d /dev/videoX --get-fmt-video  # replace with your VirtualCam path
+     ```
+     You should see `VirtualCam` listed with a resolution of `640x480`.
+   - **macOS**: Open Photo Booth or FaceTime and select "OBS Virtual Camera" as the input.
+   - **Windows**: The native Camera app doesn't support virtual cameras. Use a video conferencing app (Zoom, Teams) or run `lerobot-find-cameras opencv` directly to verify.

-
-```python
-v4l2-ctl --list-devices
-```
-
+<details>
+<summary>Troubleshooting</summary>

-You should see an entry like:
+> The virtual camera resolution is incorrect.

-```
-VirtualCam (platform:v4l2loopback-000):
-/dev/video1
-```
+Delete the virtual camera source and recreate it. The resolution cannot be changed after creation.

-10. _Check the camera resolution_. Use `v4l2-ctl` to ensure that the virtual camera output resolution is `640x480`. Change `/dev/video1` to the port of your virtual camera from the output of `v4l2-ctl --list-devices`.
+> Error reading frame in background thread for OpenCVCamera(X): OpenCVCamera(X) frame width=640 or height=480 do not match configured width=1920 or height=1080.

-
-```python
-v4l2-ctl -d /dev/video1 --get-fmt-video
-```
-
+This error is caused by OBS Virtual Camera advertising a `1920x1080` resolution despite rescaling. The only fix for now is to comment out the width and height check in `_postprocess_image()`.

-You should see an entry like:
-
-```
->>> Format Video Capture:
->>> Width/Height  : 640/480
->>> Pixel Format  : 'YUYV' (YUYV 4:2:2)
-```
-
-Troubleshooting: If the resolution is not correct you will have to delete the Virtual Camera port and try again as it cannot be changed.
-
-If everything is set up correctly, you can proceed with the rest of the tutorial.
+</details>
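Independent of OBS, you can double-check the resolution the virtual camera actually advertises by querying it with OpenCV directly (a minimal sketch; the device index is an assumption, use the one reported by `lerobot-find-cameras opencv`):

```python
import cv2

cap = cv2.VideoCapture(0)  # assumption: index of the OBS virtual camera on your system
ok, frame = cap.read()
if ok:
    # frame.shape is (height, width, channels); expect 480 x 640 for a 640x480 feed
    print("Advertised resolution:", frame.shape[1], "x", frame.shape[0])
else:
    print("Could not read from the virtual camera.")
cap.release()
```
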
+
+If everything is set up correctly, your phone will appear as a standard OpenCV camera and can be used with `OpenCVCamera`.
diff --git a/docs/source/dataset_subtask.mdx b/docs/source/dataset_subtask.mdx
new file mode 100644
index 000000000..beb5d80bd
--- /dev/null
+++ b/docs/source/dataset_subtask.mdx
@@ -0,0 +1,278 @@
+# Using Subtasks in LeRobot Datasets
+
+Subtask support in robotics datasets has proven effective in improving robot reasoning and understanding. Subtasks are particularly useful for:
+
+- **Hierarchical policies**: Building policies that include subtask predictions to visualize robot reasoning in real time
+- **Reward modeling**: Helping reward models understand task progression (e.g., SARM-style stage-aware reward models)
+- **Task decomposition**: Breaking down complex manipulation tasks into atomic, interpretable steps
+
+LeRobotDataset now supports subtasks as part of its dataset structure, alongside tasks.
+
+## What are Subtasks?
+
+While a **task** describes the overall goal (e.g., "Pick up the apple and place it in the basket"), **subtasks** break down the execution into finer-grained steps:
+
+1. "Approach the apple"
+2. "Grasp the apple"
+3. "Lift the apple"
+4. "Move to basket"
+5. "Release the apple"
+
+Each frame in the dataset can be annotated with its corresponding subtask, enabling models to learn and predict these intermediate stages.
+
+_Figure: Overview of subtask annotation, showing how frames are labeled with their intermediate subtask stages._
+ +**Reference:** _Subtask-learning based for robot self-assembly in flexible collaborative assembly in manufacturing_, Original Article, Published: 19 April 2022. + +## Dataset Structure + +Subtask information is stored in the dataset metadata: + +``` +my-dataset/ +├── data/ +│ └── ... +├── meta/ +│ ├── info.json +│ ├── stats.json +│ ├── tasks.parquet +│ ├── subtasks.parquet # Subtask index → subtask string mapping +│ └── episodes/ +│ └── ... +└── videos/ + └── ... +``` + +### Subtasks Parquet File + +The `meta/subtasks.parquet` file maps subtask indices to their natural language descriptions: + +| subtask_index | subtask (index column) | +| ------------- | ---------------------- | +| 0 | "Approach the apple" | +| 1 | "Grasp the apple" | +| 2 | "Lift the apple" | +| ... | ... | + +### Frame-Level Annotations + +Each frame in the dataset can include a `subtask_index` field that references the subtasks parquet file: + +```python +# Example frame data in the parquet file +{ + "index": 42, + "timestamp": 1.4, + "episode_index": 0, + "task_index": 0, + "subtask_index": 2, # References "Lift the apple" + "observation.state": [...], + "action": [...], +} +``` + +## Annotating Datasets with Subtasks + +We provide a HuggingFace Space for easily annotating any LeRobotDataset with subtasks: + +**[https://huggingface.co/spaces/lerobot/annotate](https://huggingface.co/spaces/lerobot/annotate)** + +After completing your annotation: + +1. Click "Push to Hub" to upload your annotated dataset +2. You can also run the annotation space locally by following the instructions at [github.com/huggingface/lerobot-annotate](https://github.com/huggingface/lerobot-annotate) + +## Loading Datasets with Subtasks + +When you load a dataset with subtask annotations, the subtask information is automatically available: + +```python +from lerobot.datasets.lerobot_dataset import LeRobotDataset + +# Load a dataset with subtask annotations +dataset = LeRobotDataset("jadechoghari/collect-fruit-annotated") + +# Access a sample +sample = dataset[100] + +# The sample includes both task and subtask information +print(sample["task"]) # "Collect the fruit" +print(sample["subtask"]) # "Grasp the apple" +print(sample["task_index"]) # tensor(0) +print(sample["subtask_index"]) # tensor(2) +``` + +### Checking for Subtask Support + +You can check if a dataset has subtask annotations: + +```python +# Check if subtasks are available +has_subtasks = ( + "subtask_index" in dataset.features + and dataset.meta.subtasks is not None +) + +if has_subtasks: + print(f"Dataset has {len(dataset.meta.subtasks)} unique subtasks") + print("Subtasks:", list(dataset.meta.subtasks.index)) +``` + +## Using Subtasks for Training + +### With the Tokenizer Processor + +The `TokenizerProcessor` automatically handles subtask tokenization for Vision-Language Action (VLA) models: + +```python +from lerobot.processor.tokenizer_processor import TokenizerProcessor +from lerobot.processor.pipeline import ProcessorPipeline + +# Create a tokenizer processor +tokenizer_processor = TokenizerProcessor( + tokenizer_name_or_path="google/paligemma-3b-pt-224", + padding="max_length", + max_length=64, +) + +# The processor will automatically tokenize subtasks if present in the batch +# and add them to the observation under: +# - "observation.subtask.tokens" +# - "observation.subtask.attention_mask" +``` + +When subtasks are available in the batch, the tokenizer processor adds: + +- `observation.subtask.tokens`: Tokenized subtask text +- `observation.subtask.attention_mask`: 
Attention mask for the subtask tokens
+
+### DataLoader with Subtasks
+
+```python
+import torch
+from lerobot.datasets.lerobot_dataset import LeRobotDataset
+
+dataset = LeRobotDataset("jadechoghari/collect-fruit-annotated")
+
+dataloader = torch.utils.data.DataLoader(
+    dataset,
+    batch_size=16,
+    shuffle=True,
+)
+
+for batch in dataloader:
+    # Access subtask information in the batch
+    subtasks = batch["subtask"]  # List of subtask strings
+    subtask_indices = batch["subtask_index"]  # Tensor of subtask indices
+
+    # Use for training hierarchical policies or reward models
+    print(f"Batch subtasks: {set(subtasks)}")
+```
+
+## Example Datasets with Subtask Annotations
+
+Try loading a dataset with subtask annotations:
+
+```python
+from lerobot.datasets.lerobot_dataset import LeRobotDataset
+
+# Example dataset with subtask annotations
+dataset = LeRobotDataset("jadechoghari/collect-fruit-annotated")
+
+# Explore the subtasks
+print("Available subtasks:")
+for subtask_name in dataset.meta.subtasks.index:
+    print(f"  - {subtask_name}")
+
+# Get subtask distribution (note: this decodes every frame, so it is best suited to small datasets)
+subtask_counts = {}
+for i in range(len(dataset)):
+    sample = dataset[i]
+    subtask = sample["subtask"]
+    subtask_counts[subtask] = subtask_counts.get(subtask, 0) + 1
+
+print("\nSubtask distribution:")
+for subtask, count in sorted(subtask_counts.items(), key=lambda x: -x[1]):
+    print(f"  {subtask}: {count} frames")
+```
+
+## Use Cases
+
+### 1. Hierarchical Policy Training
+
+Train policies that predict both actions and current subtask:
+
+```python
+import torch.nn as nn
+
+class HierarchicalPolicy(nn.Module):
+    def __init__(self, encoder: nn.Module, hidden_dim: int, action_dim: int, num_subtasks: int):
+        super().__init__()
+        self.encoder = encoder  # any backbone mapping observations to feature vectors
+        self.action_head = nn.Linear(hidden_dim, action_dim)
+        self.subtask_head = nn.Linear(hidden_dim, num_subtasks)
+
+    def forward(self, observations):
+        features = self.encoder(observations)
+        actions = self.action_head(features)
+        subtask_logits = self.subtask_head(features)
+        return actions, subtask_logits
+```
+
+### 2. Stage-Aware Reward Modeling (SARM)
+
+Build reward models that understand task progression:
+
+```python
+# SARM predicts:
+# - Stage: Which subtask is being executed (discrete)
+# - Progress: How far along the subtask (continuous 0-1)
+
+class SARMRewardModel(nn.Module):
+    def __init__(self, encoder: nn.Module, hidden_dim: int, num_stages: int):
+        super().__init__()
+        self.encoder = encoder
+        self.stage_classifier = nn.Linear(hidden_dim, num_stages)
+        self.progress_regressor = nn.Linear(hidden_dim, 1)
+
+    def forward(self, observations):
+        features = self.encoder(observations)
+        stage_logits = self.stage_classifier(features)
+        progress = self.progress_regressor(features)
+        return stage_logits, progress
+```
+
+### 3. 
Progress Visualization + +Monitor robot execution by tracking subtask progression: + +```python +def visualize_execution(model, observations): + for t, obs in enumerate(observations): + action, subtask_logits = model(obs) + predicted_subtask = subtask_names[subtask_logits.argmax()] + print(f"t={t}: Executing '{predicted_subtask}'") +``` + +## API Reference + +### LeRobotDataset Properties + +| Property | Type | Description | +| --------------------------- | ---------------------- | ------------------------------------------ | +| `meta.subtasks` | `pd.DataFrame \| None` | DataFrame mapping subtask names to indices | +| `features["subtask_index"]` | `dict` | Feature spec for subtask_index if present | + +### Sample Keys + +When subtasks are available, each sample includes: + +| Key | Type | Description | +| --------------- | -------------- | ------------------------------------ | +| `subtask_index` | `torch.Tensor` | Integer index of the current subtask | +| `subtask` | `str` | Natural language subtask description | + +## Related Resources + +- [SARM Paper](https://arxiv.org/pdf/2509.25358) - Stage-Aware Reward Modeling for Long Horizon Robot Manipulation +- [LeRobot Annotate Space](https://huggingface.co/spaces/lerobot/annotate) - Interactive annotation tool +- [LeRobotDataset v3.0](./lerobot-dataset-v3) - Dataset format documentation diff --git a/src/lerobot/cameras/zmq/camera_zmq.py b/src/lerobot/cameras/zmq/camera_zmq.py index a231a582a..f29e16a28 100644 --- a/src/lerobot/cameras/zmq/camera_zmq.py +++ b/src/lerobot/cameras/zmq/camera_zmq.py @@ -166,8 +166,10 @@ class ZMQCamera(Camera): @staticmethod def find_cameras() -> list[dict[str, Any]]: - """ZMQ cameras require manual configuration (server address/port).""" - return [] + """ + Detection not implemented for ZMQ cameras. These cameras require manual configuration (server address/port). + """ + raise NotImplementedError("Camera detection is not implemented for ZMQ cameras.") def _read_from_hardware(self) -> NDArray[Any]: """ diff --git a/src/lerobot/datasets/dataset_tools.py b/src/lerobot/datasets/dataset_tools.py index e2928e2a6..123d455c6 100644 --- a/src/lerobot/datasets/dataset_tools.py +++ b/src/lerobot/datasets/dataset_tools.py @@ -1396,6 +1396,132 @@ BYTES_PER_KIB = 1024 BYTES_PER_MIB = BYTES_PER_KIB * BYTES_PER_KIB +def modify_tasks( + dataset: LeRobotDataset, + new_task: str | None = None, + episode_tasks: dict[int, str] | None = None, +) -> LeRobotDataset: + """Modify tasks in a LeRobotDataset. + + This function allows you to either: + 1. Set a single task for the entire dataset (using `new_task`) + 2. Set specific tasks for specific episodes (using `episode_tasks`) + + You can combine both: `new_task` sets the default, and `episode_tasks` overrides + specific episodes. + + The dataset is modified in-place, updating only the task-related files: + - meta/tasks.parquet + - data/**/*.parquet (task_index column) + - meta/episodes/**/*.parquet (tasks column) + - meta/info.json (total_tasks) + + Args: + dataset: The source LeRobotDataset to modify. + new_task: A single task string to apply to all episodes. If None and episode_tasks + is also None, raises an error. + episode_tasks: Optional dict mapping episode indices to their task strings. + Overrides `new_task` for specific episodes. 
+ + + Examples: + Set a single task for all episodes: + dataset = modify_tasks(dataset, new_task="Pick up the cube") + + Set different tasks for specific episodes: + dataset = modify_tasks( + dataset, + episode_tasks={0: "Task A", 1: "Task B", 2: "Task A"} + ) + + Set a default task with overrides: + dataset = modify_tasks( + dataset, + new_task="Default task", + episode_tasks={5: "Special task for episode 5"} + ) + """ + if new_task is None and episode_tasks is None: + raise ValueError("Must specify at least one of new_task or episode_tasks") + + if episode_tasks is not None: + valid_indices = set(range(dataset.meta.total_episodes)) + invalid = set(episode_tasks.keys()) - valid_indices + if invalid: + raise ValueError(f"Invalid episode indices: {invalid}") + + # Ensure episodes metadata is loaded + if dataset.meta.episodes is None: + dataset.meta.episodes = load_episodes(dataset.root) + + # Build the mapping from episode index to task string + episode_to_task: dict[int, str] = {} + for ep_idx in range(dataset.meta.total_episodes): + if episode_tasks and ep_idx in episode_tasks: + episode_to_task[ep_idx] = episode_tasks[ep_idx] + elif new_task is not None: + episode_to_task[ep_idx] = new_task + else: + # Keep original task if not overridden and no default provided + original_tasks = dataset.meta.episodes[ep_idx]["tasks"] + if not original_tasks: + raise ValueError(f"Episode {ep_idx} has no tasks and no default task was provided") + episode_to_task[ep_idx] = original_tasks[0] + + # Collect all unique tasks and create new task mapping + unique_tasks = sorted(set(episode_to_task.values())) + new_task_df = pd.DataFrame({"task_index": list(range(len(unique_tasks)))}, index=unique_tasks) + task_to_index = {task: idx for idx, task in enumerate(unique_tasks)} + + logging.info(f"Modifying tasks in {dataset.repo_id}") + logging.info(f"New tasks: {unique_tasks}") + + root = dataset.root + + # Update data files - modify task_index column + logging.info("Updating data files...") + data_dir = root / DATA_DIR + + for parquet_path in tqdm(sorted(data_dir.rglob("*.parquet")), desc="Updating data"): + df = pd.read_parquet(parquet_path) + + # Build a mapping from episode_index to new task_index for rows in this file + episode_indices_in_file = df["episode_index"].unique() + ep_to_new_task_idx = { + ep_idx: task_to_index[episode_to_task[ep_idx]] for ep_idx in episode_indices_in_file + } + + # Update task_index column + df["task_index"] = df["episode_index"].map(ep_to_new_task_idx) + df.to_parquet(parquet_path, index=False) + + # Update episodes metadata - modify tasks column + logging.info("Updating episodes metadata...") + episodes_dir = root / "meta" / "episodes" + + for parquet_path in tqdm(sorted(episodes_dir.rglob("*.parquet")), desc="Updating episodes"): + df = pd.read_parquet(parquet_path) + + # Update tasks column + df["tasks"] = df["episode_index"].apply(lambda ep_idx: [episode_to_task[ep_idx]]) + df.to_parquet(parquet_path, index=False) + + # Write new tasks.parquet + write_tasks(new_task_df, root) + + # Update info.json + dataset.meta.info["total_tasks"] = len(unique_tasks) + write_info(dataset.meta.info, root) + + # Reload metadata to reflect changes + dataset.meta.tasks = new_task_df + dataset.meta.episodes = load_episodes(root) + + logging.info(f"Tasks: {unique_tasks}") + + return dataset + + def convert_image_to_video_dataset( dataset: LeRobotDataset, output_dir: Path, diff --git a/src/lerobot/datasets/lerobot_dataset.py b/src/lerobot/datasets/lerobot_dataset.py index 6798e7fd7..36bffa190 
100644 --- a/src/lerobot/datasets/lerobot_dataset.py +++ b/src/lerobot/datasets/lerobot_dataset.py @@ -57,6 +57,7 @@ from lerobot.datasets.utils import ( load_info, load_nested_dataset, load_stats, + load_subtasks, load_tasks, update_chunk_file_indices, validate_episode_buffer, @@ -162,6 +163,7 @@ class LeRobotDatasetMetadata: self.info = load_info(self.root) check_version_compatibility(self.repo_id, self._version, CODEBASE_VERSION) self.tasks = load_tasks(self.root) + self.subtasks = load_subtasks(self.root) self.episodes = load_episodes(self.root) self.stats = load_stats(self.root) @@ -518,6 +520,7 @@ class LeRobotDatasetMetadata: _validate_feature_names(features) obj.tasks = None + obj.subtasks = None obj.episodes = None obj.stats = None obj.info = create_empty_dataset_info( @@ -1075,6 +1078,12 @@ class LeRobotDataset(torch.utils.data.Dataset): # Add task as a string task_idx = item["task_index"].item() item["task"] = self.meta.tasks.iloc[task_idx].name + + # add subtask information if available + if "subtask_index" in self.features and self.meta.subtasks is not None: + subtask_idx = item["subtask_index"].item() + item["subtask"] = self.meta.subtasks.iloc[subtask_idx].name + return item def __repr__(self): diff --git a/src/lerobot/datasets/utils.py b/src/lerobot/datasets/utils.py index ed678af6e..321ecedd5 100644 --- a/src/lerobot/datasets/utils.py +++ b/src/lerobot/datasets/utils.py @@ -60,6 +60,7 @@ VIDEO_DIR = "videos" CHUNK_FILE_PATTERN = "chunk-{chunk_index:03d}/file-{file_index:03d}" DEFAULT_TASKS_PATH = "meta/tasks.parquet" +DEFAULT_SUBTASKS_PATH = "meta/subtasks.parquet" DEFAULT_EPISODES_PATH = EPISODES_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet" DEFAULT_DATA_PATH = DATA_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet" DEFAULT_VIDEO_PATH = VIDEO_DIR + "/{video_key}/" + CHUNK_FILE_PATTERN + ".mp4" @@ -353,6 +354,14 @@ def load_tasks(local_dir: Path) -> pandas.DataFrame: return tasks +def load_subtasks(local_dir: Path) -> pandas.DataFrame | None: + """Load subtasks from subtasks.parquet if it exists.""" + subtasks_path = local_dir / DEFAULT_SUBTASKS_PATH + if subtasks_path.exists(): + return pd.read_parquet(subtasks_path) + return None + + def write_episodes(episodes: Dataset, local_dir: Path) -> None: """Write episode metadata to a parquet file in the LeRobot v3.0 format. This function writes episode-level metadata to a single parquet file. 
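For reference, here is how the new `load_subtasks` helper behaves on its own (a minimal sketch; the dataset root is a placeholder path, and the helper returns `None` when `meta/subtasks.parquet` is absent, as implemented above):

```python
from pathlib import Path

from lerobot.datasets.utils import load_subtasks

root = Path("path/to/local/dataset")  # placeholder: root directory of a LeRobotDataset
subtasks = load_subtasks(root)
if subtasks is None:
    print("No subtask annotations: meta/subtasks.parquet not found.")
else:
    # The DataFrame is indexed by the subtask strings, mirroring load_tasks.
    print(f"{len(subtasks)} unique subtasks:", list(subtasks.index))
```
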
diff --git a/src/lerobot/policies/sac/modeling_sac.py b/src/lerobot/policies/sac/modeling_sac.py index c7c6798ed..d5dd71a48 100644 --- a/src/lerobot/policies/sac/modeling_sac.py +++ b/src/lerobot/policies/sac/modeling_sac.py @@ -239,8 +239,10 @@ class SACPolicy( + target_param.data * (1.0 - self.config.critic_target_update_weight) ) - def update_temperature(self): - self.temperature = self.log_alpha.exp().item() + @property + def temperature(self) -> float: + """Return the current temperature value, always in sync with log_alpha.""" + return self.log_alpha.exp().item() def compute_loss_critic( self, @@ -457,11 +459,10 @@ class SACPolicy( dim = continuous_action_dim + (1 if self.config.num_discrete_actions is not None else 0) self.target_entropy = -np.prod(dim) / 2 - def _init_temperature(self): - """Set up temperature parameter and initial log_alpha.""" + def _init_temperature(self) -> None: + """Set up temperature parameter (log_alpha).""" temp_init = self.config.temperature_init self.log_alpha = nn.Parameter(torch.tensor([math.log(temp_init)])) - self.temperature = self.log_alpha.exp().item() class SACObservationEncoder(nn.Module): diff --git a/src/lerobot/processor/converters.py b/src/lerobot/processor/converters.py index 4f9485fee..18c7b0220 100644 --- a/src/lerobot/processor/converters.py +++ b/src/lerobot/processor/converters.py @@ -168,11 +168,12 @@ def _extract_complementary_data(batch: dict[str, Any]) -> dict[str, Any]: """ pad_keys = {k: v for k, v in batch.items() if "_is_pad" in k} task_key = {"task": batch["task"]} if "task" in batch else {} + subtask_key = {"subtask": batch["subtask"]} if "subtask" in batch else {} index_key = {"index": batch["index"]} if "index" in batch else {} task_index_key = {"task_index": batch["task_index"]} if "task_index" in batch else {} episode_index_key = {"episode_index": batch["episode_index"]} if "episode_index" in batch else {} - return {**pad_keys, **task_key, **index_key, **task_index_key, **episode_index_key} + return {**pad_keys, **task_key, **subtask_key, **index_key, **task_index_key, **episode_index_key} def create_transition( diff --git a/src/lerobot/processor/tokenizer_processor.py b/src/lerobot/processor/tokenizer_processor.py index 5cd1bebb0..df559555a 100644 --- a/src/lerobot/processor/tokenizer_processor.py +++ b/src/lerobot/processor/tokenizer_processor.py @@ -34,6 +34,8 @@ from lerobot.utils.constants import ( ACTION_TOKEN_MASK, ACTION_TOKENS, OBS_LANGUAGE_ATTENTION_MASK, + OBS_LANGUAGE_SUBTASK_ATTENTION_MASK, + OBS_LANGUAGE_SUBTASK_TOKENS, OBS_LANGUAGE_TOKENS, ) from lerobot.utils.import_utils import _transformers_available @@ -139,6 +141,32 @@ class TokenizerProcessorStep(ObservationProcessorStep): return None + def get_subtask(self, transition: EnvTransition) -> list[str] | None: + """ + Extracts the subtask from the transition's complementary data. + + Args: + transition: The environment transition. + + Returns: + A list of subtask strings, or None if the subtask key is not found or the value is None. 
+ """ + complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA) + if complementary_data is None: + return None + + subtask = complementary_data.get("subtask") + if subtask is None: + return None + + # Standardize to a list of strings for the tokenizer + if isinstance(subtask, str): + return [subtask] + elif isinstance(subtask, list) and all(isinstance(t, str) for t in subtask): + return subtask + + return None + def observation(self, observation: RobotObservation) -> RobotObservation: """ Tokenizes the task description and adds it to the observation dictionary. @@ -176,6 +204,24 @@ class TokenizerProcessorStep(ObservationProcessorStep): new_observation[OBS_LANGUAGE_TOKENS] = tokenized_prompt["input_ids"] new_observation[OBS_LANGUAGE_ATTENTION_MASK] = tokenized_prompt["attention_mask"].to(dtype=torch.bool) + # Tokenize subtask if available + subtask = self.get_subtask(self.transition) + if subtask is not None: + tokenized_subtask = self._tokenize_text(subtask) + + # Move new tokenized tensors to the detected device + if target_device is not None: + tokenized_subtask = { + k: v.to(target_device) if isinstance(v, torch.Tensor) else v + for k, v in tokenized_subtask.items() + } + + # Add tokenized subtask to the observation + new_observation[OBS_LANGUAGE_SUBTASK_TOKENS] = tokenized_subtask["input_ids"] + new_observation[OBS_LANGUAGE_SUBTASK_ATTENTION_MASK] = tokenized_subtask["attention_mask"].to( + dtype=torch.bool + ) + return new_observation def _detect_device(self, transition: EnvTransition) -> torch.device | None: diff --git a/src/lerobot/rl/learner.py b/src/lerobot/rl/learner.py index abc5c9504..ee09ac9ac 100644 --- a/src/lerobot/rl/learner.py +++ b/src/lerobot/rl/learner.py @@ -545,9 +545,6 @@ def add_actor_information_and_train( training_infos["temperature_grad_norm"] = temp_grad_norm training_infos["temperature"] = policy.temperature - # Update temperature - policy.update_temperature() - # Push policy to actors if needed if time.time() - last_time_policy_pushed > policy_parameters_push_frequency: push_actor_policy_to_queue(parameters_queue=parameters_queue, policy=policy) diff --git a/src/lerobot/scripts/lerobot_edit_dataset.py b/src/lerobot/scripts/lerobot_edit_dataset.py index 4ba6ce44f..2ca9c520d 100644 --- a/src/lerobot/scripts/lerobot_edit_dataset.py +++ b/src/lerobot/scripts/lerobot_edit_dataset.py @@ -18,7 +18,7 @@ Edit LeRobot datasets using various transformation tools. This script allows you to delete episodes, split datasets, merge datasets, -remove features, and convert image datasets to video format. +remove features, modify tasks, and convert image datasets to video format. When new_repo_id is specified, creates a new dataset. 
Usage Examples: @@ -66,6 +66,25 @@ Remove camera feature: --operation.type remove_feature \ --operation.feature_names "['observation.images.top']" +Modify tasks - set a single task for all episodes (WARNING: modifies in-place): + python -m lerobot.scripts.lerobot_edit_dataset \ + --repo_id lerobot/pusht \ + --operation.type modify_tasks \ + --operation.new_task "Pick up the cube and place it" + +Modify tasks - set different tasks for specific episodes (WARNING: modifies in-place): + python -m lerobot.scripts.lerobot_edit_dataset \ + --repo_id lerobot/pusht \ + --operation.type modify_tasks \ + --operation.episode_tasks '{"0": "Task A", "1": "Task B", "2": "Task A"}' + +Modify tasks - set default task with overrides for specific episodes (WARNING: modifies in-place): + python -m lerobot.scripts.lerobot_edit_dataset \ + --repo_id lerobot/pusht \ + --operation.type modify_tasks \ + --operation.new_task "Default task" \ + --operation.episode_tasks '{"5": "Special task for episode 5"}' + Convert image dataset to video format and save locally: python -m lerobot.scripts.lerobot_edit_dataset \ --repo_id lerobot/pusht_image \ @@ -100,6 +119,7 @@ from lerobot.datasets.dataset_tools import ( convert_image_to_video_dataset, delete_episodes, merge_datasets, + modify_tasks, remove_feature, split_dataset, ) @@ -132,6 +152,13 @@ class RemoveFeatureConfig: feature_names: list[str] | None = None +@dataclass +class ModifyTasksConfig: + type: str = "modify_tasks" + new_task: str | None = None + episode_tasks: dict[str, str] | None = None + + @dataclass class ConvertImageToVideoConfig: type: str = "convert_image_to_video" @@ -151,7 +178,12 @@ class ConvertImageToVideoConfig: class EditDatasetConfig: repo_id: str operation: ( - DeleteEpisodesConfig | SplitConfig | MergeConfig | RemoveFeatureConfig | ConvertImageToVideoConfig + DeleteEpisodesConfig + | SplitConfig + | MergeConfig + | RemoveFeatureConfig + | ModifyTasksConfig + | ConvertImageToVideoConfig ) root: str | None = None new_repo_id: str | None = None @@ -296,6 +328,48 @@ def handle_remove_feature(cfg: EditDatasetConfig) -> None: LeRobotDataset(output_repo_id, root=output_dir).push_to_hub() +def handle_modify_tasks(cfg: EditDatasetConfig) -> None: + if not isinstance(cfg.operation, ModifyTasksConfig): + raise ValueError("Operation config must be ModifyTasksConfig") + + new_task = cfg.operation.new_task + episode_tasks_raw = cfg.operation.episode_tasks + + if new_task is None and episode_tasks_raw is None: + raise ValueError("Must specify at least one of new_task or episode_tasks for modify_tasks operation") + + # Warn about in-place modification behavior + if cfg.new_repo_id is not None: + logging.warning("modify_tasks modifies datasets in-place. The --new_repo_id parameter is ignored.") + + dataset = LeRobotDataset(cfg.repo_id, root=cfg.root) + logging.warning(f"Modifying dataset in-place at {dataset.root}. 
Original data will be overwritten.") + + # Convert episode_tasks keys from string to int if needed (CLI passes strings) + episode_tasks: dict[int, str] | None = None + if episode_tasks_raw is not None: + episode_tasks = {int(k): v for k, v in episode_tasks_raw.items()} + + logging.info(f"Modifying tasks in {cfg.repo_id}") + if new_task: + logging.info(f" Default task: '{new_task}'") + if episode_tasks: + logging.info(f" Episode-specific tasks: {episode_tasks}") + + modified_dataset = modify_tasks( + dataset, + new_task=new_task, + episode_tasks=episode_tasks, + ) + + logging.info(f"Dataset modified at {dataset.root}") + logging.info(f"Tasks: {list(modified_dataset.meta.tasks.index)}") + + if cfg.push_to_hub: + logging.info(f"Pushing to hub as {cfg.repo_id}") + modified_dataset.push_to_hub() + + def handle_convert_image_to_video(cfg: EditDatasetConfig) -> None: # Note: Parser may create any config type with the right fields, so we access fields directly # instead of checking isinstance() @@ -371,12 +445,14 @@ def edit_dataset(cfg: EditDatasetConfig) -> None: handle_merge(cfg) elif operation_type == "remove_feature": handle_remove_feature(cfg) + elif operation_type == "modify_tasks": + handle_modify_tasks(cfg) elif operation_type == "convert_image_to_video": handle_convert_image_to_video(cfg) else: raise ValueError( f"Unknown operation type: {operation_type}\n" - f"Available operations: delete_episodes, split, merge, remove_feature, convert_to_video" + f"Available operations: delete_episodes, split, merge, remove_feature, modify_tasks, convert_image_to_video" ) diff --git a/src/lerobot/utils/constants.py b/src/lerobot/utils/constants.py index 43a61b4f7..ecd54844c 100644 --- a/src/lerobot/utils/constants.py +++ b/src/lerobot/utils/constants.py @@ -26,6 +26,9 @@ OBS_IMAGES = OBS_IMAGE + "s" OBS_LANGUAGE = OBS_STR + ".language" OBS_LANGUAGE_TOKENS = OBS_LANGUAGE + ".tokens" OBS_LANGUAGE_ATTENTION_MASK = OBS_LANGUAGE + ".attention_mask" +OBS_LANGUAGE_SUBTASK = OBS_STR + ".subtask" +OBS_LANGUAGE_SUBTASK_TOKENS = OBS_LANGUAGE_SUBTASK + ".tokens" +OBS_LANGUAGE_SUBTASK_ATTENTION_MASK = OBS_LANGUAGE_SUBTASK + ".attention_mask" ACTION = "action" ACTION_PREFIX = ACTION + "." 
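The new constants compose with the existing `observation.*` prefixes, so the keys emitted by `TokenizerProcessorStep` match the ones documented in `dataset_subtask.mdx` above (a quick sanity check; the expected strings assume `OBS_STR == "observation"` as used elsewhere in this file):

```python
from lerobot.utils.constants import (
    OBS_LANGUAGE_SUBTASK,
    OBS_LANGUAGE_SUBTASK_ATTENTION_MASK,
    OBS_LANGUAGE_SUBTASK_TOKENS,
)

assert OBS_LANGUAGE_SUBTASK == "observation.subtask"
assert OBS_LANGUAGE_SUBTASK_TOKENS == "observation.subtask.tokens"
assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK == "observation.subtask.attention_mask"
```
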
diff --git a/tests/datasets/test_dataset_tools.py b/tests/datasets/test_dataset_tools.py index 35a369de9..1de199630 100644 --- a/tests/datasets/test_dataset_tools.py +++ b/tests/datasets/test_dataset_tools.py @@ -26,6 +26,7 @@ from lerobot.datasets.dataset_tools import ( delete_episodes, merge_datasets, modify_features, + modify_tasks, remove_feature, split_dataset, ) @@ -1050,6 +1051,174 @@ def test_modify_features_preserves_file_structure(sample_dataset, tmp_path): assert "reward" in modified_dataset.meta.features +def test_modify_tasks_single_task_for_all(sample_dataset): + """Test setting a single task for all episodes.""" + new_task = "Pick up the cube and place it" + + modified_dataset = modify_tasks(sample_dataset, new_task=new_task) + + # Verify all episodes have the new task + assert len(modified_dataset.meta.tasks) == 1 + assert new_task in modified_dataset.meta.tasks.index + + # Verify task_index is 0 for all frames (only one task) + for i in range(len(modified_dataset)): + item = modified_dataset[i] + assert item["task_index"].item() == 0 + assert item["task"] == new_task + + +def test_modify_tasks_episode_specific(sample_dataset): + """Test setting different tasks for specific episodes.""" + episode_tasks = { + 0: "Task A", + 1: "Task B", + 2: "Task A", + 3: "Task C", + 4: "Task B", + } + + modified_dataset = modify_tasks(sample_dataset, episode_tasks=episode_tasks) + + # Verify correct number of unique tasks + unique_tasks = set(episode_tasks.values()) + assert len(modified_dataset.meta.tasks) == len(unique_tasks) + + # Verify each episode has the correct task + for ep_idx, expected_task in episode_tasks.items(): + ep_data = modified_dataset.meta.episodes[ep_idx] + assert ep_data["tasks"][0] == expected_task + + +def test_modify_tasks_default_with_overrides(sample_dataset): + """Test setting a default task with specific overrides.""" + default_task = "Default task" + override_task = "Special task" + episode_tasks = {2: override_task, 4: override_task} + + modified_dataset = modify_tasks( + sample_dataset, + new_task=default_task, + episode_tasks=episode_tasks, + ) + + # Verify correct number of unique tasks + assert len(modified_dataset.meta.tasks) == 2 + assert default_task in modified_dataset.meta.tasks.index + assert override_task in modified_dataset.meta.tasks.index + + # Verify episodes have correct tasks + for ep_idx in range(5): + ep_data = modified_dataset.meta.episodes[ep_idx] + if ep_idx in episode_tasks: + assert ep_data["tasks"][0] == override_task + else: + assert ep_data["tasks"][0] == default_task + + +def test_modify_tasks_no_task_specified(sample_dataset): + """Test error when no task is specified.""" + with pytest.raises(ValueError, match="Must specify at least one of new_task or episode_tasks"): + modify_tasks(sample_dataset) + + +def test_modify_tasks_invalid_episode_indices(sample_dataset): + """Test error with invalid episode indices.""" + with pytest.raises(ValueError, match="Invalid episode indices"): + modify_tasks(sample_dataset, episode_tasks={10: "Task", 20: "Task"}) + + +def test_modify_tasks_updates_info_json(sample_dataset): + """Test that total_tasks is updated in info.json.""" + episode_tasks = {0: "Task A", 1: "Task B", 2: "Task C", 3: "Task A", 4: "Task B"} + + modified_dataset = modify_tasks(sample_dataset, episode_tasks=episode_tasks) + + # Verify total_tasks is updated + assert modified_dataset.meta.total_tasks == 3 + + +def test_modify_tasks_preserves_other_metadata(sample_dataset): + """Test that modifying tasks preserves other 
metadata.""" + original_frames = sample_dataset.meta.total_frames + original_episodes = sample_dataset.meta.total_episodes + original_fps = sample_dataset.meta.fps + + modified_dataset = modify_tasks(sample_dataset, new_task="New task") + + # Verify other metadata is preserved + assert modified_dataset.meta.total_frames == original_frames + assert modified_dataset.meta.total_episodes == original_episodes + assert modified_dataset.meta.fps == original_fps + + +def test_modify_tasks_task_index_correct(sample_dataset): + """Test that task_index values are correct in data files.""" + # Create tasks that will have predictable indices (sorted alphabetically) + episode_tasks = { + 0: "Alpha task", # Will be index 0 + 1: "Beta task", # Will be index 1 + 2: "Alpha task", # Will be index 0 + 3: "Gamma task", # Will be index 2 + 4: "Beta task", # Will be index 1 + } + + modified_dataset = modify_tasks(sample_dataset, episode_tasks=episode_tasks) + + # Verify task indices are correct + task_to_expected_idx = { + "Alpha task": 0, + "Beta task": 1, + "Gamma task": 2, + } + + for i in range(len(modified_dataset)): + item = modified_dataset[i] + ep_idx = item["episode_index"].item() + expected_task = episode_tasks[ep_idx] + expected_idx = task_to_expected_idx[expected_task] + assert item["task_index"].item() == expected_idx + assert item["task"] == expected_task + + +def test_modify_tasks_in_place(sample_dataset): + """Test that modify_tasks modifies the dataset in-place.""" + original_root = sample_dataset.root + + modified_dataset = modify_tasks(sample_dataset, new_task="New task") + + # Verify same instance is returned and root is unchanged + assert modified_dataset is sample_dataset + assert modified_dataset.root == original_root + + +def test_modify_tasks_keeps_original_when_not_overridden(sample_dataset): + """Test that original tasks are kept when using episode_tasks without new_task.""" + from lerobot.datasets.utils import load_episodes + + # Ensure episodes metadata is loaded + if sample_dataset.meta.episodes is None: + sample_dataset.meta.episodes = load_episodes(sample_dataset.meta.root) + + # Get original tasks for episodes not being overridden + original_task_ep0 = sample_dataset.meta.episodes[0]["tasks"][0] + original_task_ep1 = sample_dataset.meta.episodes[1]["tasks"][0] + + # Only override episodes 2, 3, 4 + episode_tasks = {2: "New Task A", 3: "New Task B", 4: "New Task A"} + + modified_dataset = modify_tasks(sample_dataset, episode_tasks=episode_tasks) + + # Verify original tasks are kept for episodes 0 and 1 + assert modified_dataset.meta.episodes[0]["tasks"][0] == original_task_ep0 + assert modified_dataset.meta.episodes[1]["tasks"][0] == original_task_ep1 + + # Verify new tasks for overridden episodes + assert modified_dataset.meta.episodes[2]["tasks"][0] == "New Task A" + assert modified_dataset.meta.episodes[3]["tasks"][0] == "New Task B" + assert modified_dataset.meta.episodes[4]["tasks"][0] == "New Task A" + + def test_convert_image_to_video_dataset(tmp_path): """Test converting lerobot/pusht_image dataset to video format.""" from lerobot.datasets.lerobot_dataset import LeRobotDataset diff --git a/tests/datasets/test_subtask_dataset.py b/tests/datasets/test_subtask_dataset.py new file mode 100644 index 000000000..f80a6c72d --- /dev/null +++ b/tests/datasets/test_subtask_dataset.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tests for subtask functionality in LeRobotDataset. + +These tests verify that: +- Subtask information is correctly loaded from datasets that have subtask data +- The __getitem__ method correctly adds subtask strings to returned items +- Subtask handling gracefully handles missing data +""" + +import pandas as pd +import pytest +import torch + +from lerobot.datasets.lerobot_dataset import LeRobotDataset + + +class TestSubtaskDataset: + """Tests for subtask handling in LeRobotDataset.""" + + @pytest.fixture + def subtask_dataset(self): + """Load the test subtask dataset from the hub.""" + # Use lerobot/pusht-subtask dataset with episode 1 + return LeRobotDataset( + repo_id="lerobot/pusht-subtask", + episodes=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], + ) + + def test_subtask_dataset_loads(self, subtask_dataset): + """Test that the subtask dataset loads successfully.""" + assert subtask_dataset is not None + assert len(subtask_dataset) > 0 + + def test_subtask_metadata_loaded(self, subtask_dataset): + """Test that subtask metadata is loaded when present in dataset.""" + # The dataset should have subtasks metadata loaded + assert subtask_dataset.meta.subtasks is not None + assert isinstance(subtask_dataset.meta.subtasks, pd.DataFrame) + + def test_subtask_index_in_features(self, subtask_dataset): + """Test that subtask_index is a feature when dataset has subtasks.""" + assert "subtask_index" in subtask_dataset.features + + def test_getitem_returns_subtask_string(self, subtask_dataset): + """Test that __getitem__ correctly adds subtask string to returned item.""" + item = subtask_dataset[0] + + # Subtask should be present in the returned item + assert "subtask" in item + assert isinstance(item["subtask"], str) + assert len(item["subtask"]) > 0 # Should not be empty + + def test_getitem_has_subtask_index(self, subtask_dataset): + """Test that __getitem__ includes subtask_index.""" + item = subtask_dataset[0] + + assert "subtask_index" in item + assert isinstance(item["subtask_index"], torch.Tensor) + + def test_subtask_index_maps_to_valid_subtask(self, subtask_dataset): + """Test that subtask_index correctly maps to a subtask in metadata.""" + item = subtask_dataset[0] + + subtask_idx = item["subtask_index"].item() + subtask_from_metadata = subtask_dataset.meta.subtasks.iloc[subtask_idx].name + + assert item["subtask"] == subtask_from_metadata + + def test_all_items_have_subtask(self, subtask_dataset): + """Test that all items in the dataset have subtask information.""" + for i in range(min(len(subtask_dataset), 5)): # Check first 5 items + item = subtask_dataset[i] + assert "subtask" in item + assert isinstance(item["subtask"], str) + + def test_task_and_subtask_coexist(self, subtask_dataset): + """Test that both task and subtask are present in returned items.""" + item = subtask_dataset[0] + + # Both task and subtask should be present + assert "task" in item + assert "subtask" in item + assert isinstance(item["task"], str) + assert 
isinstance(item["subtask"], str) + + +class TestSubtaskDatasetMissing: + """Tests for graceful handling when subtask data is missing.""" + + @pytest.fixture + def dataset_without_subtasks(self, tmp_path, empty_lerobot_dataset_factory): + """Create a dataset without subtask information.""" + features = {"state": {"dtype": "float32", "shape": (2,), "names": None}} + dataset = empty_lerobot_dataset_factory(root=tmp_path / "no_subtask", features=features) + + # Add some frames and save + for _ in range(5): + dataset.add_frame({"state": torch.randn(2), "task": "Test task"}) + dataset.save_episode() + dataset.finalize() + + # Reload the dataset + return LeRobotDataset(dataset.repo_id, root=dataset.root) + + def test_no_subtask_in_features(self, dataset_without_subtasks): + """Test that subtask_index is not in features when not provided.""" + assert "subtask_index" not in dataset_without_subtasks.features + + def test_getitem_without_subtask(self, dataset_without_subtasks): + """Test that __getitem__ works when subtask is not present.""" + item = dataset_without_subtasks[0] + + # Item should still be retrievable + assert item is not None + assert "state" in item + assert "task" in item + + # Subtask should NOT be present + assert "subtask" not in item + + def test_subtasks_metadata_is_none(self, dataset_without_subtasks): + """Test that subtasks metadata is None when not present.""" + assert dataset_without_subtasks.meta.subtasks is None + + +class TestSubtaskEdgeCases: + """Edge case tests for subtask handling.""" + + def test_subtask_with_multiple_episodes(self): + """Test subtask handling with multiple episodes if available.""" + try: + dataset = LeRobotDataset( + repo_id="lerobot/pusht-subtask", + episodes=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], + ) + except Exception: + pytest.skip("Could not load test-subtask dataset") + + # Check first and last items have valid subtasks + first_item = dataset[0] + last_item = dataset[len(dataset) - 1] + + assert "subtask" in first_item + assert "subtask" in last_item + assert isinstance(first_item["subtask"], str) + assert isinstance(last_item["subtask"], str) + + def test_subtask_index_consistency(self): + """Test that same subtask_index returns same subtask string.""" + try: + dataset = LeRobotDataset( + repo_id="lerobot/pusht-subtask", + episodes=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], + ) + except Exception: + pytest.skip("Could not load test-subtask dataset") + + if len(dataset) < 2: + pytest.skip("Dataset too small for this test") + + # Collect subtask_index to subtask mappings + subtask_map = {} + for i in range(min(len(dataset), 10)): + item = dataset[i] + idx = item["subtask_index"].item() + subtask = item["subtask"] + + if idx in subtask_map: + # Same index should always return same subtask + assert subtask_map[idx] == subtask, ( + f"Inconsistent subtask for index {idx}: '{subtask_map[idx]}' vs '{subtask}'" + ) + else: + subtask_map[idx] = subtask diff --git a/tests/policies/test_sac_policy.py b/tests/policies/test_sac_policy.py index 8576883bd..6fad2979e 100644 --- a/tests/policies/test_sac_policy.py +++ b/tests/policies/test_sac_policy.py @@ -441,12 +441,13 @@ def test_sac_policy_with_predefined_entropy(): def test_sac_policy_update_temperature(): + """Test that temperature property is always in sync with log_alpha.""" config = create_default_config(continuous_action_dim=10, state_dim=10) policy = SACPolicy(config=config) assert policy.temperature == pytest.approx(1.0) policy.log_alpha.data = torch.tensor([math.log(0.1)]) - 
policy.update_temperature() + # Temperature property automatically reflects log_alpha changes assert policy.temperature == pytest.approx(0.1) diff --git a/tests/processor/test_tokenizer_processor.py b/tests/processor/test_tokenizer_processor.py index d6f87f567..64cc8aac8 100644 --- a/tests/processor/test_tokenizer_processor.py +++ b/tests/processor/test_tokenizer_processor.py @@ -27,7 +27,14 @@ import torch from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature from lerobot.processor import DataProcessorPipeline, TokenizerProcessorStep, TransitionKey from lerobot.processor.converters import create_transition, identity_transition -from lerobot.utils.constants import ACTION, OBS_IMAGE, OBS_LANGUAGE, OBS_STATE +from lerobot.utils.constants import ( + ACTION, + OBS_IMAGE, + OBS_LANGUAGE, + OBS_LANGUAGE_SUBTASK_ATTENTION_MASK, + OBS_LANGUAGE_SUBTASK_TOKENS, + OBS_STATE, +) from tests.utils import require_package @@ -1038,3 +1045,459 @@ def test_simulated_accelerate_scenario(): # MockTokenizer squeezes single-item batches, so shape is (max_length,) not (1, max_length) assert tokens.shape == (10,) # MockTokenizer behavior for single string in list assert attention_mask.shape == (10,) + + +# ============================================================================= +# Tests for get_subtask method +# ============================================================================= + + +@require_package("transformers") +def test_get_subtask_missing_key(): + """Test get_subtask returns None when subtask key is missing from complementary_data.""" + mock_tokenizer = MockTokenizer(vocab_size=100) + processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) + + transition = create_transition( + observation={"state": torch.tensor([1.0, 2.0])}, + action=torch.tensor([0.1, 0.2]), + complementary_data={"task": "main task"}, # No "subtask" key + ) + + result = processor.get_subtask(transition) + assert result is None + + +@require_package("transformers") +def test_get_subtask_none_value(): + """Test get_subtask returns None when subtask value is None.""" + mock_tokenizer = MockTokenizer(vocab_size=100) + processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) + + transition = create_transition( + observation={"state": torch.tensor([1.0, 2.0])}, + action=torch.tensor([0.1, 0.2]), + complementary_data={"task": "main task", "subtask": None}, + ) + + result = processor.get_subtask(transition) + assert result is None + + +@require_package("transformers") +def test_get_subtask_none_complementary_data(): + """Test get_subtask returns None when complementary_data is None.""" + mock_tokenizer = MockTokenizer(vocab_size=100) + processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) + + transition = create_transition( + observation={"state": torch.tensor([1.0, 2.0])}, + action=torch.tensor([0.1, 0.2]), + complementary_data=None, # No complementary data + ) + + result = processor.get_subtask(transition) + assert result is None + + +@require_package("transformers") +def test_get_subtask_string(): + """Test get_subtask returns list with single string when subtask is a string.""" + mock_tokenizer = MockTokenizer(vocab_size=100) + processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) + + transition = create_transition( + observation={"state": torch.tensor([1.0, 2.0])}, + action=torch.tensor([0.1, 0.2]), + complementary_data={"task": "main task", "subtask": "pick up the cube"}, + ) + + result = 
processor.get_subtask(transition) + assert result == ["pick up the cube"] + assert isinstance(result, list) + assert len(result) == 1 + + +@require_package("transformers") +def test_get_subtask_list_of_strings(): + """Test get_subtask returns the list when subtask is already a list of strings.""" + mock_tokenizer = MockTokenizer(vocab_size=100) + processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) + + subtask_list = ["pick up", "move to target", "place down"] + transition = create_transition( + observation={"state": torch.tensor([1.0, 2.0])}, + action=torch.tensor([0.1, 0.2]), + complementary_data={"task": "main task", "subtask": subtask_list}, + ) + + result = processor.get_subtask(transition) + assert result == subtask_list + assert isinstance(result, list) + assert len(result) == 3 + + +@require_package("transformers") +def test_get_subtask_unsupported_type_integer(): + """Test get_subtask returns None when subtask is an unsupported type (integer).""" + mock_tokenizer = MockTokenizer(vocab_size=100) + processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) + + transition = create_transition( + observation={"state": torch.tensor([1.0, 2.0])}, + action=torch.tensor([0.1, 0.2]), + complementary_data={"task": "main task", "subtask": 123}, + ) + + result = processor.get_subtask(transition) + assert result is None + + +@require_package("transformers") +def test_get_subtask_unsupported_type_mixed_list(): + """Test get_subtask returns None when subtask is a list with mixed types.""" + mock_tokenizer = MockTokenizer(vocab_size=100) + processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) + + transition = create_transition( + observation={"state": torch.tensor([1.0, 2.0])}, + action=torch.tensor([0.1, 0.2]), + complementary_data={"task": "main task", "subtask": ["valid string", 123, "another string"]}, + ) + + result = processor.get_subtask(transition) + assert result is None + + +@require_package("transformers") +def test_get_subtask_unsupported_type_dict(): + """Test get_subtask returns None when subtask is a dictionary.""" + mock_tokenizer = MockTokenizer(vocab_size=100) + processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) + + transition = create_transition( + observation={"state": torch.tensor([1.0, 2.0])}, + action=torch.tensor([0.1, 0.2]), + complementary_data={"task": "main task", "subtask": {"key": "value"}}, + ) + + result = processor.get_subtask(transition) + assert result is None + + +@require_package("transformers") +def test_get_subtask_empty_string(): + """Test get_subtask with empty string returns list with empty string.""" + mock_tokenizer = MockTokenizer(vocab_size=100) + processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) + + transition = create_transition( + observation={"state": torch.tensor([1.0, 2.0])}, + action=torch.tensor([0.1, 0.2]), + complementary_data={"task": "main task", "subtask": ""}, + ) + + result = processor.get_subtask(transition) + assert result == [""] + + +@require_package("transformers") +def test_get_subtask_empty_list(): + """Test get_subtask with empty list returns empty list.""" + mock_tokenizer = MockTokenizer(vocab_size=100) + processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) + + transition = create_transition( + observation={"state": torch.tensor([1.0, 2.0])}, + action=torch.tensor([0.1, 0.2]), + complementary_data={"task": "main task", "subtask": []}, + ) + + result = processor.get_subtask(transition) + assert 
result == [] + + +# ============================================================================= +# Tests for subtask tokenization in observation method +# ============================================================================= + + +@require_package("transformers") +def test_subtask_tokenization_when_present(): + """Test that subtask is tokenized and added to observation when present.""" + mock_tokenizer = MockTokenizer(vocab_size=100) + processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=8) + + transition = create_transition( + observation={"state": torch.tensor([1.0, 2.0])}, + action=torch.tensor([0.1, 0.2]), + complementary_data={"task": "main task", "subtask": "pick up the red cube"}, + ) + + result = processor(transition) + + # Check that subtask tokens were added to observation + observation = result[TransitionKey.OBSERVATION] + assert OBS_LANGUAGE_SUBTASK_TOKENS in observation + assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK in observation + + # Check token structure + subtask_tokens = observation[OBS_LANGUAGE_SUBTASK_TOKENS] + subtask_attention_mask = observation[OBS_LANGUAGE_SUBTASK_ATTENTION_MASK] + assert isinstance(subtask_tokens, torch.Tensor) + assert isinstance(subtask_attention_mask, torch.Tensor) + assert subtask_tokens.shape == (8,) + assert subtask_attention_mask.shape == (8,) + assert subtask_attention_mask.dtype == torch.bool + + +@require_package("transformers") +def test_subtask_tokenization_not_added_when_none(): + """Test that subtask tokens are NOT added to observation when subtask is None.""" + mock_tokenizer = MockTokenizer(vocab_size=100) + processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=8) + + transition = create_transition( + observation={"state": torch.tensor([1.0, 2.0])}, + action=torch.tensor([0.1, 0.2]), + complementary_data={"task": "main task"}, # No subtask + ) + + result = processor(transition) + + # Check that subtask tokens were NOT added to observation + observation = result[TransitionKey.OBSERVATION] + assert OBS_LANGUAGE_SUBTASK_TOKENS not in observation + assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK not in observation + + # But main task tokens should still be present + assert f"{OBS_LANGUAGE}.tokens" in observation + assert f"{OBS_LANGUAGE}.attention_mask" in observation + + +@require_package("transformers") +def test_subtask_tokenization_not_added_when_subtask_value_is_none(): + """Test that subtask tokens are NOT added when subtask value is explicitly None.""" + mock_tokenizer = MockTokenizer(vocab_size=100) + processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=8) + + transition = create_transition( + observation={"state": torch.tensor([1.0, 2.0])}, + action=torch.tensor([0.1, 0.2]), + complementary_data={"task": "main task", "subtask": None}, + ) + + result = processor(transition) + + # Check that subtask tokens were NOT added to observation + observation = result[TransitionKey.OBSERVATION] + assert OBS_LANGUAGE_SUBTASK_TOKENS not in observation + assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK not in observation + + +@require_package("transformers") +def test_subtask_tokenization_list_of_strings(): + """Test subtask tokenization with list of strings.""" + mock_tokenizer = MockTokenizer(vocab_size=100) + processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=8) + + transition = create_transition( + observation={"state": torch.tensor([1.0, 2.0])}, + action=torch.tensor([0.1, 0.2]), + complementary_data={"task": "main task", "subtask": ["pick up", "place down"]}, + ) 
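+
+    # A list of subtasks is expected to tokenize as a batch, one token row per
+    # string; this is what the (2, 8) shape assertions below verify.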
+ + result = processor(transition) + + # Check that subtask tokens were added to observation + observation = result[TransitionKey.OBSERVATION] + assert OBS_LANGUAGE_SUBTASK_TOKENS in observation + assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK in observation + + # Check token structure for batch + subtask_tokens = observation[OBS_LANGUAGE_SUBTASK_TOKENS] + subtask_attention_mask = observation[OBS_LANGUAGE_SUBTASK_ATTENTION_MASK] + assert subtask_tokens.shape == (2, 8) # batch_size=2, seq_len=8 + assert subtask_attention_mask.shape == (2, 8) + + +@require_package("transformers") +def test_subtask_tokenization_device_cpu(): + """Test that subtask tokens are on CPU when other tensors are on CPU.""" + mock_tokenizer = MockTokenizer(vocab_size=100) + processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) + + # Create transition with CPU tensors + observation = {OBS_STATE: torch.randn(10)} # CPU tensor + action = torch.randn(5) # CPU tensor + transition = create_transition( + observation=observation, + action=action, + complementary_data={"task": "main task", "subtask": "pick up cube"}, + ) + + result = processor(transition) + + # Check that subtask tokens are on CPU + subtask_tokens = result[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_TOKENS] + subtask_attention_mask = result[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_ATTENTION_MASK] + + assert subtask_tokens.device.type == "cpu" + assert subtask_attention_mask.device.type == "cpu" + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +@require_package("transformers") +def test_subtask_tokenization_device_cuda(): + """Test that subtask tokens are moved to CUDA when other tensors are on CUDA.""" + mock_tokenizer = MockTokenizer(vocab_size=100) + processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) + + # Create transition with CUDA tensors + observation = {OBS_STATE: torch.randn(10).cuda()} # CUDA tensor + action = torch.randn(5).cuda() # CUDA tensor + transition = create_transition( + observation=observation, + action=action, + complementary_data={"task": "main task", "subtask": "pick up cube"}, + ) + + result = processor(transition) + + # Check that subtask tokens are on CUDA + subtask_tokens = result[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_TOKENS] + subtask_attention_mask = result[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_ATTENTION_MASK] + + assert subtask_tokens.device.type == "cuda" + assert subtask_attention_mask.device.type == "cuda" + + +@require_package("transformers") +def test_subtask_tokenization_preserves_other_observation_data(): + """Test that subtask tokenization preserves other observation data.""" + mock_tokenizer = MockTokenizer(vocab_size=100) + processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) + + original_state = torch.tensor([1.0, 2.0, 3.0]) + transition = create_transition( + observation={"state": original_state.clone()}, + action=torch.tensor([0.1, 0.2]), + complementary_data={"task": "main task", "subtask": "pick up cube"}, + ) + + result = processor(transition) + observation = result[TransitionKey.OBSERVATION] + + # Check that original observation data is preserved + assert torch.equal(observation["state"], original_state) + + # Check that both task and subtask tokens are present + assert f"{OBS_LANGUAGE}.tokens" in observation + assert f"{OBS_LANGUAGE}.attention_mask" in observation + assert OBS_LANGUAGE_SUBTASK_TOKENS in observation + assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK in observation + + 
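+# Usage sketch (illustrative, not part of this test suite): downstream code
+# consuming the processed observation would read the subtask tokens through
+# the same constants exercised above. Only the keys, shapes, dtypes, and
+# device placement asserted in these tests are guaranteed; the snippet below
+# is an assumption about how a consumer would use them.
+#
+#     obs = processor(transition)[TransitionKey.OBSERVATION]
+#     tokens = obs[OBS_LANGUAGE_SUBTASK_TOKENS]        # (seq_len,) or (batch, seq_len)
+#     mask = obs[OBS_LANGUAGE_SUBTASK_ATTENTION_MASK]  # torch.bool, same shape
+
+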
+@require_package("transformers") +def test_subtask_attention_mask_dtype(): + """Test that subtask attention mask has correct dtype (bool).""" + mock_tokenizer = MockTokenizer(vocab_size=100) + processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) + + transition = create_transition( + observation={"state": torch.tensor([1.0, 2.0])}, + action=torch.tensor([0.1, 0.2]), + complementary_data={"task": "main task", "subtask": "pick up cube"}, + ) + + result = processor(transition) + observation = result[TransitionKey.OBSERVATION] + + subtask_attention_mask = observation[OBS_LANGUAGE_SUBTASK_ATTENTION_MASK] + assert subtask_attention_mask.dtype == torch.bool + + +@require_package("transformers") +def test_subtask_tokenization_deterministic(): + """Test that subtask tokenization is deterministic for the same input.""" + mock_tokenizer = MockTokenizer(vocab_size=100) + processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10) + + transition = create_transition( + observation={"state": torch.tensor([1.0, 2.0])}, + action=torch.tensor([0.1, 0.2]), + complementary_data={"task": "main task", "subtask": "consistent subtask"}, + ) + + result1 = processor(transition) + result2 = processor(transition) + + subtask_tokens1 = result1[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_TOKENS] + subtask_tokens2 = result2[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_TOKENS] + subtask_mask1 = result1[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_ATTENTION_MASK] + subtask_mask2 = result2[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_ATTENTION_MASK] + + # Results should be identical + assert torch.equal(subtask_tokens1, subtask_tokens2) + assert torch.equal(subtask_mask1, subtask_mask2) + + +@require_package("transformers") +@patch("lerobot.processor.tokenizer_processor.AutoTokenizer") +def test_subtask_tokenization_integration_with_pipeline(mock_auto_tokenizer): + """Test subtask tokenization works correctly with DataProcessorPipeline.""" + mock_tokenizer = MockTokenizer(vocab_size=100) + mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer + + tokenizer_processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=6) + robot_processor = DataProcessorPipeline( + [tokenizer_processor], to_transition=identity_transition, to_output=identity_transition + ) + + transition = create_transition( + observation={"state": torch.tensor([1.0, 2.0])}, + action=torch.tensor([0.1, 0.2]), + complementary_data={"task": "main task", "subtask": "subtask instruction"}, + ) + + result = robot_processor(transition) + + # Check that observation exists and both tokenizations were applied + assert TransitionKey.OBSERVATION in result + observation = result[TransitionKey.OBSERVATION] + + # Check task tokens + assert f"{OBS_LANGUAGE}.tokens" in observation + assert f"{OBS_LANGUAGE}.attention_mask" in observation + + # Check subtask tokens + assert OBS_LANGUAGE_SUBTASK_TOKENS in observation + assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK in observation + + # Check shapes + assert observation[f"{OBS_LANGUAGE}.tokens"].shape == (6,) + assert observation[OBS_LANGUAGE_SUBTASK_TOKENS].shape == (6,) + + +@require_package("transformers") +def test_subtask_not_added_for_unsupported_types(): + """Test that subtask tokens are not added when subtask has unsupported type.""" + mock_tokenizer = MockTokenizer(vocab_size=100) + processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=8) + + # Test with integer subtask + transition = create_transition( + 
observation={"state": torch.tensor([1.0, 2.0])}, + action=torch.tensor([0.1, 0.2]), + complementary_data={"task": "main task", "subtask": 123}, + ) + + result = processor(transition) + observation = result[TransitionKey.OBSERVATION] + + # Subtask tokens should NOT be added for unsupported types + assert OBS_LANGUAGE_SUBTASK_TOKENS not in observation + assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK not in observation + + # But main task tokens should still be present + assert f"{OBS_LANGUAGE}.tokens" in observation
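+
+    # Mirroring test_subtask_tokenization_not_added_when_none, the main task
+    # attention mask should be present as well.
+    assert f"{OBS_LANGUAGE}.attention_mask" in observation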