mirror of
https://github.com/huggingface/lerobot.git
synced 2026-06-16 07:49:48 +00:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 2201401c99 | |||
| 64773e7b22 |
@@ -167,9 +167,9 @@ jobs:
|
||||
|
||||
# ── LIBERO TRAIN+EVAL SMOKE ──────────────────────────────────────────────
|
||||
# Train SmolVLA for 1 step (batch_size=1, dataset episode 0 only) then
|
||||
# immediately runs eval inside the training loop (eval_freq=1, 1 episode).
|
||||
# immediately runs eval inside the training loop (env_eval_freq=1, 1 episode).
|
||||
# Tests the full train→eval-within-training pipeline end-to-end.
|
||||
- name: Run Libero train+eval smoke (1 step, eval_freq=1)
|
||||
- name: Run Libero train+eval smoke (1 step, env_eval_freq=1)
|
||||
if: env.HF_USER_TOKEN != ''
|
||||
run: |
|
||||
docker run --name libero-train-smoke --gpus all \
|
||||
@@ -196,7 +196,7 @@ jobs:
|
||||
--output_dir=/tmp/train-smoke \
|
||||
--steps=1 \
|
||||
--batch_size=1 \
|
||||
--eval_freq=1 \
|
||||
--env_eval_freq=1 \
|
||||
--eval.n_episodes=1 \
|
||||
--eval.batch_size=1 \
|
||||
--eval.use_async_envs=false \
|
||||
|
||||
@@ -58,7 +58,7 @@ test-act-ete-train:
|
||||
--dataset.episodes="[0]" \
|
||||
--batch_size=2 \
|
||||
--steps=4 \
|
||||
--eval_freq=2 \
|
||||
--env_eval_freq=2 \
|
||||
--eval.n_episodes=1 \
|
||||
--eval.batch_size=1 \
|
||||
--save_freq=2 \
|
||||
@@ -96,7 +96,7 @@ test-diffusion-ete-train:
|
||||
--dataset.episodes="[0]" \
|
||||
--batch_size=2 \
|
||||
--steps=2 \
|
||||
--eval_freq=2 \
|
||||
--env_eval_freq=2 \
|
||||
--eval.n_episodes=1 \
|
||||
--eval.batch_size=1 \
|
||||
--save_checkpoint=true \
|
||||
@@ -126,7 +126,7 @@ test-tdmpc-ete-train:
|
||||
--dataset.episodes="[0]" \
|
||||
--batch_size=2 \
|
||||
--steps=2 \
|
||||
--eval_freq=2 \
|
||||
--env_eval_freq=2 \
|
||||
--eval.n_episodes=1 \
|
||||
--eval.batch_size=1 \
|
||||
--save_checkpoint=true \
|
||||
@@ -161,7 +161,7 @@ test-smolvla-ete-train:
|
||||
--dataset.episodes="[0]" \
|
||||
--batch_size=2 \
|
||||
--steps=4 \
|
||||
--eval_freq=2 \
|
||||
--env_eval_freq=2 \
|
||||
--eval.n_episodes=1 \
|
||||
--eval.batch_size=1 \
|
||||
--save_freq=2 \
|
||||
|
||||
@@ -719,7 +719,7 @@ Example configuration for training the [reward classifier](https://huggingface.c
|
||||
"num_workers": 4,
|
||||
"steps": 5000,
|
||||
"log_freq": 10,
|
||||
"eval_freq": 1000,
|
||||
"env_eval_freq": 1000,
|
||||
"save_freq": 1000,
|
||||
"save_checkpoint": true,
|
||||
"seed": 2,
|
||||
|
||||
@@ -143,7 +143,7 @@ lerobot-train \
|
||||
--batch_size=4 \
|
||||
--eval.batch_size=1 \
|
||||
--eval.n_episodes=1 \
|
||||
--eval_freq=1000
|
||||
--env_eval_freq=1000
|
||||
```
|
||||
|
||||
## Reproducing published results
|
||||
|
||||
@@ -173,7 +173,7 @@ lerobot-train \
|
||||
--batch_size=4 \
|
||||
--eval.batch_size=1 \
|
||||
--eval.n_episodes=1 \
|
||||
--eval_freq=1000
|
||||
--env_eval_freq=1000
|
||||
```
|
||||
|
||||
## Relationship to LIBERO
|
||||
|
||||
@@ -120,11 +120,11 @@ lerobot-train \
|
||||
--batch_size=4 \
|
||||
--eval.batch_size=1 \
|
||||
--eval.n_episodes=1 \
|
||||
--eval_freq=1000
|
||||
--env_eval_freq=1000
|
||||
```
|
||||
|
||||
## Practical tips
|
||||
|
||||
- Use the one-hot task conditioning for multi-task training (MT10/MT50 conventions) so policies have explicit task context.
|
||||
- Inspect the dataset task descriptions and the `info["is_success"]` keys when writing post-processing or logging so your success metrics line up with the benchmark.
|
||||
- Adjust `batch_size`, `steps`, and `eval_freq` to match your compute budget.
|
||||
- Adjust `batch_size`, `steps`, and `env_eval_freq` to match your compute budget.
|
||||
|
||||
@@ -103,7 +103,7 @@ accelerate launch \
|
||||
--batch_size=32 \
|
||||
--num_workers=4 \
|
||||
--log_freq=20 \
|
||||
--eval_freq=-1 \
|
||||
--env_eval_freq=-1 \
|
||||
--save_checkpoint=true \
|
||||
--save_freq=2000
|
||||
```
|
||||
@@ -142,7 +142,7 @@ accelerate launch \
|
||||
--batch_size=32 \
|
||||
--num_workers=4 \
|
||||
--log_freq=20 \
|
||||
--eval_freq=-1 \
|
||||
--env_eval_freq=-1 \
|
||||
--save_checkpoint=true \
|
||||
--save_freq=2000
|
||||
```
|
||||
|
||||
@@ -314,7 +314,7 @@ lerobot-train \
|
||||
--steps=30000 \
|
||||
--save_freq=1000 \
|
||||
--log_freq=100 \
|
||||
--eval_freq=1000 \
|
||||
--env_eval_freq=1000 \
|
||||
--policy.type=multi_task_dit \
|
||||
--policy.device=cuda \
|
||||
--policy.horizon=32 \
|
||||
|
||||
@@ -166,7 +166,7 @@ lerobot-train \
|
||||
--output_dir=./outputs/smolvla_robocasa_CloseFridge \
|
||||
--steps=100000 \
|
||||
--batch_size=4 \
|
||||
--eval_freq=5000 \
|
||||
--env_eval_freq=5000 \
|
||||
--eval.batch_size=1 \
|
||||
--eval.n_episodes=5 \
|
||||
--save_freq=10000
|
||||
|
||||
@@ -165,7 +165,7 @@ lerobot-train \
|
||||
--output_dir=./outputs/smolvla_vlabench_primitive \
|
||||
--steps=100000 \
|
||||
--batch_size=4 \
|
||||
--eval_freq=5000 \
|
||||
--env_eval_freq=5000 \
|
||||
--eval.batch_size=1 \
|
||||
--eval.n_episodes=1 \
|
||||
--save_freq=10000
|
||||
|
||||
@@ -39,6 +39,8 @@ class DatasetConfig:
|
||||
# This reduces memory and speeds up DataLoader IPC. The training pipeline handles the conversion.
|
||||
return_uint8: bool = False
|
||||
streaming: bool = False
|
||||
# Fraction of episodes held out per task for offline evaluation (0.0 = disabled).
|
||||
eval_split: float = 0.0
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if self.episodes is not None:
|
||||
@@ -73,8 +75,6 @@ class EvalConfig:
|
||||
# `use_async_envs` specifies whether to use asynchronous environments (multiprocessing).
|
||||
# Defaults to True; automatically downgraded to SyncVectorEnv when batch_size=1.
|
||||
use_async_envs: bool = True
|
||||
# Whether to record eval rollouts as a LeRobot v3.0 dataset on disk.
|
||||
recording: bool = False
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if self.batch_size == 0:
|
||||
|
||||
@@ -100,8 +100,13 @@ class TrainPipelineConfig(HubMixin):
|
||||
prefetch_factor: int = 4
|
||||
persistent_workers: bool = True
|
||||
steps: int = 100_000
|
||||
eval_freq: int = 20_000
|
||||
# Run policy in the simulation environment every N steps to measure reward/success (0 = disabled).
|
||||
env_eval_freq: int = 20_000
|
||||
log_freq: int = 200
|
||||
# Compute eval loss on held-out episodes every N steps (0 = disabled). Requires eval_split > 0.
|
||||
eval_steps: int = 0
|
||||
# Cap on total eval samples, split uniformly across tasks (0 = use all held-out data).
|
||||
max_eval_samples: int = 0
|
||||
tolerance_s: float = 1e-4
|
||||
save_checkpoint: bool = True
|
||||
# Checkpoint is saved every `save_freq` training iterations and after the last training step.
|
||||
|
||||
@@ -35,7 +35,7 @@ from .dataset_tools import (
|
||||
remove_feature,
|
||||
split_dataset,
|
||||
)
|
||||
from .factory import make_dataset, resolve_delta_timestamps
|
||||
from .factory import make_dataset, make_train_eval_datasets, resolve_delta_timestamps
|
||||
from .image_writer import safe_stop_image_writer
|
||||
from .io_utils import load_episodes, write_stats
|
||||
from .language import (
|
||||
@@ -89,6 +89,7 @@ __all__ = [
|
||||
"get_feature_stats",
|
||||
"load_episodes",
|
||||
"make_dataset",
|
||||
"make_train_eval_datasets",
|
||||
"merge_datasets",
|
||||
"modify_features",
|
||||
"modify_tasks",
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
import math
|
||||
from pprint import pformat
|
||||
|
||||
import torch
|
||||
@@ -130,3 +131,81 @@ def make_dataset(cfg: TrainPipelineConfig) -> LeRobotDataset | MultiLeRobotDatas
|
||||
dataset.meta.stats[key][stats_type] = torch.tensor(stats, dtype=torch.float32)
|
||||
|
||||
return dataset
|
||||
|
||||
|
||||
def make_train_eval_datasets(
|
||||
cfg: TrainPipelineConfig,
|
||||
) -> tuple[LeRobotDataset | MultiLeRobotDataset, LeRobotDataset | None]:
|
||||
"""Create train and optional eval datasets by splitting episodes based on eval_split.
|
||||
|
||||
The last ceil(n_episodes * eval_split) episodes per task are held out for evaluation.
|
||||
If eval_split == 0.0, returns (full_dataset, None).
|
||||
"""
|
||||
full_dataset = make_dataset(cfg)
|
||||
|
||||
if cfg.dataset.eval_split == 0.0:
|
||||
return full_dataset, None
|
||||
|
||||
base_episodes = (
|
||||
full_dataset.episodes if full_dataset.episodes is not None else list(range(full_dataset.num_episodes))
|
||||
)
|
||||
|
||||
episode_tasks = full_dataset.meta.episodes["tasks"]
|
||||
task_to_episodes: dict[str, list[int]] = {}
|
||||
for ep_idx in base_episodes:
|
||||
task_key = episode_tasks[ep_idx][0] if episode_tasks[ep_idx] else ""
|
||||
task_to_episodes.setdefault(task_key, []).append(ep_idx)
|
||||
|
||||
train_episodes, eval_episodes = [], []
|
||||
for eps in task_to_episodes.values():
|
||||
n_eval = math.ceil(len(eps) * cfg.dataset.eval_split)
|
||||
train_episodes.extend(eps[: len(eps) - n_eval])
|
||||
eval_episodes.extend(eps[len(eps) - n_eval :])
|
||||
|
||||
if not train_episodes:
|
||||
raise ValueError(
|
||||
f"eval_split={cfg.dataset.eval_split} leaves 0 training episodes from {len(base_episodes)} total."
|
||||
)
|
||||
|
||||
logging.info(
|
||||
f"Train/eval split: {len(train_episodes)} train, {len(eval_episodes)} eval "
|
||||
f"(eval_split={cfg.dataset.eval_split}, {len(task_to_episodes)} tasks)"
|
||||
)
|
||||
|
||||
delta_timestamps = resolve_delta_timestamps(cfg.trainable_config, full_dataset.meta)
|
||||
|
||||
train_image_transforms = (
|
||||
ImageTransforms(cfg.dataset.image_transforms) if cfg.dataset.image_transforms.enable else None
|
||||
)
|
||||
|
||||
train_dataset = LeRobotDataset(
|
||||
cfg.dataset.repo_id,
|
||||
root=cfg.dataset.root,
|
||||
episodes=train_episodes,
|
||||
delta_timestamps=delta_timestamps,
|
||||
image_transforms=train_image_transforms,
|
||||
revision=cfg.dataset.revision,
|
||||
video_backend=cfg.dataset.video_backend,
|
||||
return_uint8=True,
|
||||
tolerance_s=cfg.tolerance_s,
|
||||
)
|
||||
|
||||
eval_dataset = LeRobotDataset(
|
||||
cfg.dataset.repo_id,
|
||||
root=cfg.dataset.root,
|
||||
episodes=eval_episodes,
|
||||
delta_timestamps=delta_timestamps,
|
||||
image_transforms=None,
|
||||
revision=cfg.dataset.revision,
|
||||
video_backend=cfg.dataset.video_backend,
|
||||
return_uint8=True,
|
||||
tolerance_s=cfg.tolerance_s,
|
||||
)
|
||||
|
||||
if cfg.dataset.use_imagenet_stats:
|
||||
for ds in (train_dataset, eval_dataset):
|
||||
for key in ds.meta.camera_keys:
|
||||
for stats_type, stats in IMAGENET_STATS.items():
|
||||
ds.meta.stats[key][stats_type] = torch.tensor(stats, dtype=torch.float32)
|
||||
|
||||
return train_dataset, eval_dataset
|
||||
|
||||
@@ -74,7 +74,6 @@ from tqdm import trange
|
||||
|
||||
from lerobot.configs import parser
|
||||
from lerobot.configs.eval import EvalPipelineConfig
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.envs import (
|
||||
check_env_attributes_and_types,
|
||||
close_envs,
|
||||
@@ -85,7 +84,7 @@ from lerobot.envs import (
|
||||
from lerobot.policies import PreTrainedPolicy, make_policy, make_pre_post_processors
|
||||
from lerobot.processor import PolicyProcessorPipeline
|
||||
from lerobot.types import PolicyAction
|
||||
from lerobot.utils.constants import ACTION, DONE, OBS_IMAGE, OBS_IMAGES, OBS_STATE, OBS_STR, REWARD
|
||||
from lerobot.utils.constants import ACTION, DONE, OBS_STR, REWARD
|
||||
from lerobot.utils.device_utils import get_safe_torch_device
|
||||
from lerobot.utils.import_utils import register_third_party_plugins
|
||||
from lerobot.utils.io_utils import write_video
|
||||
@@ -96,56 +95,6 @@ from lerobot.utils.utils import (
|
||||
)
|
||||
|
||||
|
||||
def _env_features_to_dataset_features(env_features: dict) -> dict:
|
||||
"""Convert EnvConfig.features (PolicyFeature objects) to the plain dict format for LeRobotDataset.create()."""
|
||||
features = {}
|
||||
for key, ft in env_features.items():
|
||||
if ft.type.value == "visual":
|
||||
features[key] = {
|
||||
"dtype": "video",
|
||||
"shape": tuple(ft.shape),
|
||||
"names": ["channel", "height", "width"],
|
||||
}
|
||||
else:
|
||||
features[key] = {"dtype": "float32", "shape": tuple(ft.shape), "names": None}
|
||||
features["next.reward"] = {"dtype": "float32", "shape": (1,), "names": None}
|
||||
features["next.success"] = {"dtype": "bool", "shape": (1,), "names": None}
|
||||
features["next.done"] = {"dtype": "bool", "shape": (1,), "names": None}
|
||||
return features
|
||||
|
||||
|
||||
def _build_raw_frame(
|
||||
raw_obs: dict,
|
||||
env_idx: int,
|
||||
action: np.ndarray,
|
||||
reward: float,
|
||||
success: bool,
|
||||
done: bool,
|
||||
task: str,
|
||||
) -> dict:
|
||||
"""Build a dataset frame from raw env observations for one env index."""
|
||||
frame: dict[str, Any] = {}
|
||||
if "pixels" in raw_obs:
|
||||
if isinstance(raw_obs["pixels"], dict):
|
||||
for cam_name, img in raw_obs["pixels"].items():
|
||||
frame[f"{OBS_IMAGES}.{cam_name}"] = img[env_idx]
|
||||
else:
|
||||
frame[OBS_IMAGE] = raw_obs["pixels"][env_idx]
|
||||
if "agent_pos" in raw_obs:
|
||||
frame[OBS_STATE] = raw_obs["agent_pos"][env_idx]
|
||||
for key, val in raw_obs.items():
|
||||
if key in ("pixels", "agent_pos"):
|
||||
continue
|
||||
if isinstance(val, np.ndarray):
|
||||
frame[f"{OBS_STR}.{key}"] = val[env_idx]
|
||||
frame[ACTION] = action
|
||||
frame["next.reward"] = np.float32(reward)
|
||||
frame["next.success"] = success
|
||||
frame["next.done"] = done
|
||||
frame["task"] = task
|
||||
return frame
|
||||
|
||||
|
||||
def rollout(
|
||||
env: gym.vector.VectorEnv,
|
||||
policy: PreTrainedPolicy,
|
||||
@@ -156,7 +105,6 @@ def rollout(
|
||||
seeds: list[int] | None = None,
|
||||
return_observations: bool = False,
|
||||
render_callback: Callable[[gym.vector.VectorEnv], None] | None = None,
|
||||
recording_dataset: Any | None = None,
|
||||
) -> dict:
|
||||
"""Run a batched policy rollout once through a batch of environments.
|
||||
|
||||
@@ -197,14 +145,6 @@ def rollout(
|
||||
if render_callback is not None:
|
||||
render_callback(env)
|
||||
|
||||
raw_observation = deepcopy(observation) if recording_dataset is not None else None
|
||||
task_desc = ""
|
||||
if recording_dataset is not None:
|
||||
try:
|
||||
task_desc = list(env.call("task_description"))[0]
|
||||
except (AttributeError, NotImplementedError):
|
||||
task_desc = ""
|
||||
|
||||
all_observations = []
|
||||
all_actions = []
|
||||
all_rewards = []
|
||||
@@ -277,25 +217,6 @@ def rollout(
|
||||
else:
|
||||
successes = [False] * env.num_envs
|
||||
|
||||
if recording_dataset is not None and raw_observation is not None:
|
||||
prev_done = done.copy()
|
||||
for env_idx in range(env.num_envs):
|
||||
if prev_done[env_idx]:
|
||||
continue
|
||||
frame = _build_raw_frame(
|
||||
raw_observation,
|
||||
env_idx,
|
||||
action_numpy[env_idx],
|
||||
reward[env_idx],
|
||||
successes[env_idx],
|
||||
bool(terminated[env_idx] | truncated[env_idx]),
|
||||
task_desc,
|
||||
)
|
||||
recording_dataset.add_frame(frame)
|
||||
if terminated[env_idx] or truncated[env_idx]:
|
||||
recording_dataset.save_episode()
|
||||
raw_observation = deepcopy(observation)
|
||||
|
||||
# Keep track of which environments are done so far.
|
||||
# Mark the episode as done if we reach the maximum step limit.
|
||||
# This ensures that the rollout always terminates cleanly at `max_steps`,
|
||||
@@ -352,7 +273,6 @@ def eval_policy(
|
||||
videos_dir: Path | None = None,
|
||||
return_episode_data: bool = False,
|
||||
start_seed: int | None = None,
|
||||
recording_dataset: Any | None = None,
|
||||
) -> dict:
|
||||
"""
|
||||
Args:
|
||||
@@ -441,7 +361,6 @@ def eval_policy(
|
||||
seeds=list(seeds) if seeds else None,
|
||||
return_observations=return_episode_data,
|
||||
render_callback=render_frame if max_episodes_rendered > 0 else None,
|
||||
recording_dataset=recording_dataset,
|
||||
)
|
||||
|
||||
# Figure out where in each rollout sequence the first done condition was encountered (results after
|
||||
@@ -644,10 +563,6 @@ def eval_main(cfg: EvalPipelineConfig):
|
||||
# Create environment-specific preprocessor and postprocessor (e.g., for LIBERO environments)
|
||||
env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env, policy_cfg=cfg.policy)
|
||||
|
||||
recording_dir = Path(cfg.output_dir) / "recordings" if cfg.eval.recording else None
|
||||
max_episodes_rendered = 0 if cfg.eval.recording else 10
|
||||
videos_dir = None if cfg.eval.recording else Path(cfg.output_dir) / "videos"
|
||||
|
||||
with torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext():
|
||||
info = eval_policy_all(
|
||||
envs=envs,
|
||||
@@ -657,13 +572,10 @@ def eval_main(cfg: EvalPipelineConfig):
|
||||
preprocessor=preprocessor,
|
||||
postprocessor=postprocessor,
|
||||
n_episodes=cfg.eval.n_episodes,
|
||||
max_episodes_rendered=max_episodes_rendered,
|
||||
videos_dir=videos_dir,
|
||||
return_episode_data=False,
|
||||
max_episodes_rendered=10,
|
||||
videos_dir=Path(cfg.output_dir) / "videos",
|
||||
start_seed=cfg.seed,
|
||||
max_parallel_tasks=cfg.env.max_parallel_tasks,
|
||||
recording_dir=recording_dir,
|
||||
env_features=cfg.env.features if cfg.eval.recording else None,
|
||||
)
|
||||
print("Overall Aggregated Metrics:")
|
||||
print(info["overall"])
|
||||
@@ -706,7 +618,6 @@ def eval_one(
|
||||
videos_dir: Path | None,
|
||||
return_episode_data: bool,
|
||||
start_seed: int | None,
|
||||
recording_dataset: Any | None = None,
|
||||
) -> TaskMetrics:
|
||||
"""Evaluates one task_id of one suite using the provided vec env."""
|
||||
|
||||
@@ -724,7 +635,6 @@ def eval_one(
|
||||
videos_dir=task_videos_dir,
|
||||
return_episode_data=return_episode_data,
|
||||
start_seed=start_seed,
|
||||
recording_dataset=recording_dataset,
|
||||
)
|
||||
|
||||
per_episode = task_result["per_episode"]
|
||||
@@ -751,8 +661,6 @@ def run_one(
|
||||
videos_dir: Path | None,
|
||||
return_episode_data: bool,
|
||||
start_seed: int | None,
|
||||
recording_dir: Path | None = None,
|
||||
env_features: dict | None = None,
|
||||
):
|
||||
"""
|
||||
Run eval_one for a single (task_group, task_id, env).
|
||||
@@ -764,38 +672,21 @@ def run_one(
|
||||
task_videos_dir = videos_dir / f"{task_group}_{task_id}"
|
||||
task_videos_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
recording_dataset = None
|
||||
if recording_dir is not None and env_features is not None:
|
||||
task_recording_dir = recording_dir / f"{task_group}_{task_id}"
|
||||
fps = env.unwrapped.metadata.get("render_fps", 30)
|
||||
features = _env_features_to_dataset_features(env_features)
|
||||
recording_dataset = LeRobotDataset.create(
|
||||
repo_id=f"eval_{task_group}_{task_id}",
|
||||
fps=fps,
|
||||
features=features,
|
||||
root=str(task_recording_dir),
|
||||
use_videos=True,
|
||||
)
|
||||
|
||||
try:
|
||||
metrics = eval_one(
|
||||
env,
|
||||
policy=policy,
|
||||
env_preprocessor=env_preprocessor,
|
||||
env_postprocessor=env_postprocessor,
|
||||
preprocessor=preprocessor,
|
||||
postprocessor=postprocessor,
|
||||
n_episodes=n_episodes,
|
||||
max_episodes_rendered=max_episodes_rendered,
|
||||
videos_dir=task_videos_dir,
|
||||
return_episode_data=return_episode_data,
|
||||
start_seed=start_seed,
|
||||
recording_dataset=recording_dataset,
|
||||
)
|
||||
finally:
|
||||
if recording_dataset is not None:
|
||||
recording_dataset.finalize()
|
||||
|
||||
# Call the existing eval_one (assumed to return TaskMetrics-like dict)
|
||||
metrics = eval_one(
|
||||
env,
|
||||
policy=policy,
|
||||
env_preprocessor=env_preprocessor,
|
||||
env_postprocessor=env_postprocessor,
|
||||
preprocessor=preprocessor,
|
||||
postprocessor=postprocessor,
|
||||
n_episodes=n_episodes,
|
||||
max_episodes_rendered=max_episodes_rendered,
|
||||
videos_dir=task_videos_dir,
|
||||
return_episode_data=return_episode_data,
|
||||
start_seed=start_seed,
|
||||
)
|
||||
# ensure we always provide video_paths key to simplify accumulation
|
||||
if max_episodes_rendered > 0:
|
||||
metrics.setdefault("video_paths", [])
|
||||
return task_group, task_id, metrics
|
||||
@@ -811,8 +702,6 @@ def eval_policy_all(
|
||||
n_episodes: int,
|
||||
*,
|
||||
max_episodes_rendered: int = 0,
|
||||
recording_dir: Path | None = None,
|
||||
env_features: dict | None = None,
|
||||
videos_dir: Path | None = None,
|
||||
return_episode_data: bool = False,
|
||||
start_seed: int | None = None,
|
||||
@@ -872,8 +761,6 @@ def eval_policy_all(
|
||||
videos_dir=videos_dir,
|
||||
return_episode_data=return_episode_data,
|
||||
start_seed=start_seed,
|
||||
recording_dir=recording_dir,
|
||||
env_features=env_features,
|
||||
)
|
||||
|
||||
if max_parallel_tasks <= 1:
|
||||
|
||||
@@ -45,7 +45,8 @@ from lerobot.common.train_utils import (
|
||||
from lerobot.common.wandb_utils import WandBLogger
|
||||
from lerobot.configs import parser
|
||||
from lerobot.configs.train import TrainPipelineConfig
|
||||
from lerobot.datasets import EpisodeAwareSampler, compute_sampler_state, make_dataset
|
||||
from lerobot.datasets import EpisodeAwareSampler, compute_sampler_state
|
||||
from lerobot.datasets.factory import make_train_eval_datasets
|
||||
from lerobot.envs import close_envs, make_env, make_env_pre_post_processors
|
||||
from lerobot.optim.factory import make_optimizer_and_scheduler
|
||||
from lerobot.policies import PreTrainedPolicy, make_policy, make_pre_post_processors
|
||||
@@ -244,19 +245,19 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
|
||||
# LeRobotDataset skips its snapshot_download when try_load() succeeds, so no rank re-downloads.
|
||||
if is_main_process:
|
||||
logging.info("Creating dataset")
|
||||
dataset = make_dataset(cfg)
|
||||
dataset, eval_dataset = make_train_eval_datasets(cfg)
|
||||
|
||||
accelerator.wait_for_everyone()
|
||||
|
||||
# Other ranks read from the shared copy populated by the main process.
|
||||
if not is_main_process:
|
||||
dataset = make_dataset(cfg)
|
||||
dataset, eval_dataset = make_train_eval_datasets(cfg)
|
||||
|
||||
# Create environment used for evaluating checkpoints during training on simulation data.
|
||||
# On real-world data, no need to create an environment as evaluations are done outside train.py,
|
||||
# using the eval.py instead, with gym_dora environment and dora-rs.
|
||||
eval_env = None
|
||||
if cfg.eval_freq > 0 and cfg.env is not None and is_main_process:
|
||||
if cfg.env_eval_freq > 0 and cfg.env is not None and is_main_process:
|
||||
logging.info("Creating env")
|
||||
eval_env = make_env(cfg.env, n_envs=cfg.eval.batch_size, use_async_envs=cfg.eval.use_async_envs)
|
||||
|
||||
@@ -434,6 +435,8 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
|
||||
f"Resuming data order at epoch {sampler_state['epoch']}, "
|
||||
f"sample {sampler_state['start_index']}"
|
||||
)
|
||||
if dataset.reader._absolute_to_relative_idx is not None:
|
||||
sampler.indices = [dataset.reader._absolute_to_relative_idx[i] for i in sampler.indices]
|
||||
else:
|
||||
shuffle = True
|
||||
sampler = None
|
||||
@@ -455,6 +458,31 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
|
||||
persistent_workers=cfg.persistent_workers and cfg.num_workers > 0,
|
||||
)
|
||||
|
||||
# Build eval dataloader if a held-out split exists
|
||||
eval_dataloader = None
|
||||
if eval_dataset is not None:
|
||||
eval_ds = eval_dataset
|
||||
if cfg.max_eval_samples > 0 and hasattr(eval_dataset, "hf_dataset"):
|
||||
task_indices = eval_dataset.hf_dataset["task_index"]
|
||||
unique_tasks = sorted(set(task_indices))
|
||||
per_task = max(1, cfg.max_eval_samples // len(unique_tasks))
|
||||
selected: list[int] = []
|
||||
for t in unique_tasks:
|
||||
frames = [i for i, ti in enumerate(task_indices) if ti == t][:per_task]
|
||||
selected.extend(frames)
|
||||
eval_ds = torch.utils.data.Subset(eval_dataset, selected)
|
||||
|
||||
eval_collate_fn = lerobot_collate_fn if dataset.meta.has_language_columns else None
|
||||
eval_dataloader = torch.utils.data.DataLoader(
|
||||
eval_ds,
|
||||
batch_size=cfg.batch_size,
|
||||
shuffle=False,
|
||||
num_workers=cfg.num_workers,
|
||||
pin_memory=device.type == "cuda",
|
||||
drop_last=False,
|
||||
collate_fn=eval_collate_fn,
|
||||
)
|
||||
|
||||
# Prepare everything with accelerator
|
||||
accelerator.wait_for_everyone()
|
||||
policy, optimizer, dataloader, lr_scheduler = accelerator.prepare(
|
||||
@@ -534,7 +562,8 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
|
||||
train_tracker.step()
|
||||
is_log_step = cfg.log_freq > 0 and step % cfg.log_freq == 0
|
||||
is_saving_step = step % cfg.save_freq == 0 or step == cfg.steps
|
||||
is_eval_step = cfg.eval_freq > 0 and step % cfg.eval_freq == 0
|
||||
is_env_eval_step = cfg.env_eval_freq > 0 and step % cfg.env_eval_freq == 0
|
||||
is_eval_step = cfg.eval_steps > 0 and eval_dataloader is not None and step % cfg.eval_steps == 0
|
||||
|
||||
if is_log_step:
|
||||
# Collective reduce must run on every rank, before the main-process gate below.
|
||||
@@ -557,6 +586,27 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
|
||||
wandb_logger.log_dict(wandb_log_dict, step)
|
||||
train_tracker.reset_averages()
|
||||
|
||||
if is_eval_step:
|
||||
policy.eval()
|
||||
eval_loss_sum = 0.0
|
||||
n_eval_batches = 0
|
||||
with torch.no_grad(), accelerator.autocast():
|
||||
for eval_batch in eval_dataloader:
|
||||
for cam_key in dataset.meta.camera_keys:
|
||||
if cam_key in eval_batch and eval_batch[cam_key].dtype == torch.uint8:
|
||||
eval_batch[cam_key] = eval_batch[cam_key].to(dtype=torch.float32) / 255.0
|
||||
eval_batch = preprocessor(eval_batch)
|
||||
loss, _ = policy.forward(eval_batch)
|
||||
eval_loss_sum += loss.item()
|
||||
n_eval_batches += 1
|
||||
eval_loss = eval_loss_sum / max(n_eval_batches, 1)
|
||||
policy.train()
|
||||
|
||||
if is_main_process:
|
||||
logging.info(f"step {step}: eval_loss={eval_loss:.4f}")
|
||||
if wandb_logger:
|
||||
wandb_logger.log_dict({"eval_loss": eval_loss}, step=step, mode="eval")
|
||||
|
||||
if cfg.save_checkpoint and is_saving_step:
|
||||
if is_main_process:
|
||||
logging.info(f"Checkpoint policy after step {step}")
|
||||
@@ -579,7 +629,7 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
|
||||
|
||||
accelerator.wait_for_everyone()
|
||||
|
||||
if cfg.env and is_eval_step:
|
||||
if cfg.env and is_env_eval_step:
|
||||
if is_main_process:
|
||||
step_id = get_step_identifier(step, cfg.steps)
|
||||
logging.info(f"Eval policy at step {step}")
|
||||
|
||||
@@ -134,7 +134,7 @@ class TestMultiGPUTraining:
|
||||
f"--output_dir={output_dir}",
|
||||
"--batch_size=4",
|
||||
"--steps=10",
|
||||
"--eval_freq=-1",
|
||||
"--env_eval_freq=-1",
|
||||
"--log_freq=5",
|
||||
"--save_freq=10",
|
||||
"--seed=42",
|
||||
@@ -177,7 +177,7 @@ class TestMultiGPUTraining:
|
||||
f"--output_dir={output_dir}",
|
||||
"--batch_size=4",
|
||||
"--steps=20",
|
||||
"--eval_freq=-1",
|
||||
"--env_eval_freq=-1",
|
||||
"--log_freq=5",
|
||||
"--save_freq=10",
|
||||
"--seed=42",
|
||||
|
||||
Reference in New Issue
Block a user