Compare commits

..

5 Commits

Author SHA1 Message Date
github-actions[bot] e48f3bc96d chore(dependencies): update uv.lock 2026-06-12 05:01:38 +00:00
Steven Palma 87242cfced chore(dependecies): relax grpc-related bounds (#3777)
Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>
2026-06-11 19:13:14 +02:00
Steven Palma 1edc83a0ef feat(training): bump accelerate + use reduction types for tracked metrics in a multi rank setup (#3773)
* feat(training): bump accelerate + use reduction types for tracked metrics in a multi rank setup

* chore: address feedback
2026-06-11 19:07:28 +02:00
Steven Palma 6fbcf67249 chore: update readme (#3774)
* chore: update readme

* chore: update authors in project readme
2026-06-11 18:17:26 +02:00
Pepijn 41166b39fb fix(train): synchronize EpisodeAwareSampler shuffling across ranks and gate dataset download per node (#3768)
* fix(datasets): expose a generator on EpisodeAwareSampler for distributed shuffle sync

In distributed training, accelerate can only synchronize the shuffle
permutation across ranks when the sampler exposes a generator attribute.
EpisodeAwareSampler shuffled via the global torch RNG, so disjoint batch
shards relied on every rank's global CPU RNG staying in lockstep forever;
any rank-asymmetric RNG consumption (e.g. eval rollouts on the main
process only) silently desynced the permutations and ranks trained on
overlapping/missing samples.

* fix(train): seed sampler generator and gate dataset download per node

- Pass a generator seeded with cfg.seed to EpisodeAwareSampler so
  accelerator.prepare registers it as the synchronized RNG and the
  shuffle order is reproducible.
- Gate the initial make_dataset call on is_local_main_process instead of
  is_main_process: the global main process only exists on node 0, so on
  every other node all local ranks were downloading the dataset and
  building the Arrow cache concurrently.
2026-06-11 11:07:42 +02:00
11 changed files with 1265 additions and 1157 deletions
+10 -7
View File
@@ -58,7 +58,7 @@ action = model.select_action(obs)
robot.send_action(action)
```
**Supported Hardware:** SO100, LeKiwi, Koch, HopeJR, OMX, EarthRover, Reachy2, Gamepads, Keyboards, Phones, OpenARM, Unitree G1.
**Supported Hardware:** SO100, LeKiwi, Koch, HopeJR, OMX, EarthRover, Reachy2, Gamepads, Keyboards, Phones, OpenARM, Unitree G1, reBot B601.
While these devices are natively integrated into the LeRobot codebase, the library is designed to be extensible. You can easily implement the Robot interface to utilize LeRobot's data collection, training, and visualization tools for your own custom robot.
@@ -101,11 +101,13 @@ lerobot-train \
--dataset.repo_id=lerobot/aloha_mobile_cabinet
```
| Category | Models |
| -------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Imitation Learning** | [ACT](./docs/source/policy_act_README.md), [Diffusion](./docs/source/policy_diffusion_README.md), [VQ-BeT](./docs/source/policy_vqbet_README.md), [Multitask DiT Policy](./docs/source/policy_multi_task_dit_README.md) |
| **Reinforcement Learning** | [HIL-SERL](./docs/source/hilserl.mdx), [TDMPC](./docs/source/policy_tdmpc_README.md) & QC-FQL (coming soon) |
| **VLAs Models** | [Pi0Fast](./docs/source/pi0fast.mdx), [Pi0.5](./docs/source/pi05.mdx), [GR00T N1.5](./docs/source/policy_groot_README.md), [SmolVLA](./docs/source/policy_smolvla_README.md), [XVLA](./docs/source/xvla.mdx) |
| Category | Models |
| -------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Imitation Learning** | [ACT](./docs/source/policy_act_README.md), [Diffusion](./docs/source/policy_diffusion_README.md), [VQ-BeT](./docs/source/policy_vqbet_README.md), [Multitask DiT Policy](./docs/source/policy_multi_task_dit_README.md) |
| **Reinforcement Learning** | [HIL-SERL](./docs/source/hilserl.mdx), [TDMPC](./docs/source/policy_tdmpc_README.md) & QC-FQL (coming soon) |
| **VLAs Models** | [Pi0](./docs/source/pi0.mdx), [Pi0Fast](./docs/source/pi0fast.mdx), [Pi0.5](./docs/source/pi05.mdx), [GR00T N1.5](./docs/source/policy_groot_README.md), [SmolVLA](./docs/source/policy_smolvla_README.md), [XVLA](./docs/source/xvla.mdx), [EO-1](./docs/source/eo1.mdx), [MolmoAct2](./docs/source/molmoact2.mdx), [WALL-OSS](./docs/source/walloss.mdx) |
| **World Models** | [VLA-JEPA](./docs/source/vla_jepa.mdx) (more coming soon) |
| **Reward Models** | [SARM](./docs/source/sarm.mdx), [TOPReward](./docs/source/topreward.mdx), [Robometer](./docs/source/robometer.mdx) |
Similarly to the hardware, you can easily implement your own policy & leverage LeRobot's data collection, training, and visualization tools, and share your model to the HF Hub
@@ -133,6 +135,7 @@ Learn how to implement your own simulation environment or benchmark and distribu
- **[Discord](https://discord.gg/q8Dzzpym3f):** Join the `LeRobot` server to discuss with the community.
- **[X](https://x.com/LeRobotHF):** Follow us on X to stay up-to-date with the latest developments.
- **[Robot Learning Tutorial](https://huggingface.co/spaces/lerobot/robot-learning-tutorial):** A free, hands-on course to learn robot learning using LeRobot.
- **[T-Shirt Folding Experiment](https://huggingface.co/spaces/lerobot/robot-folding):** An end-to-end demonstration of folding t-shirts with LeRobot.
## Citation
@@ -140,7 +143,7 @@ If you use LeRobot in your project, please cite the GitHub repository to acknowl
```bibtex
@misc{cadene2024lerobot,
author = {Cadene, Remi and Alibert, Simon and Soare, Alexander and Gallouedec, Quentin and Zouitine, Adil and Palma, Steven and Kooijmans, Pepijn and Aractingi, Michel and Shukor, Mustafa and Aubakirova, Dana and Russi, Martino and Capuano, Francesco and Pascal, Caroline and Choghari, Jade and Moss, Jess and Wolf, Thomas},
author = {Cadene, Remi and Alibert, Simon and Soare, Alexander and Gallouedec, Quentin and Zouitine, Adil and Palma, Steven and Kooijmans, Pepijn and Aractingi, Michel and Shukor, Mustafa and Aubakirova, Dana and Russi, Martino and Capuano, Francesco and Pascal, Caroline and Choghari, Jade and Meftah, Khalil and Ellerbach, Maxime and Moss, Jess and Wolf, Thomas},
title = {LeRobot: State-of-the-art Machine Learning for Real-World Robotics in Pytorch},
howpublished = "\url{https://github.com/huggingface/lerobot}",
year = {2024}
+13 -7
View File
@@ -115,8 +115,8 @@ dataset = [
]
training = [
"lerobot[dataset]",
"accelerate>=1.10.0,<2.0.0",
"wandb>=0.24.0,<0.25.0",
"wandb>=0.24.0,<0.28.0",
"lerobot[accelerate-dep]",
]
hardware = [
"lerobot[pynput-dep]",
@@ -124,7 +124,7 @@ hardware = [
"lerobot[deepdiff-dep]",
]
viz = [
"rerun-sdk>=0.24.0,<0.34.0",
"rerun-sdk>=0.24.0,<0.27.0",
]
# ── User-facing composite extras (map to CLI scripts) ─────
# lerobot-record, lerobot-replay, lerobot-calibrate, lerobot-teleoperate, etc.
@@ -142,7 +142,8 @@ pygame-dep = ["pygame>=2.5.1,<2.7.0"]
# (noble ships urdfdom 3.x). Cap below 0.9.16 until system urdfdom 4.x is broadly available.
placo-dep = ["placo>=0.9.6,<0.9.16"]
transformers-dep = ["transformers>=5.4.0,<5.6.0"]
grpcio-dep = ["grpcio==1.73.1", "protobuf>=6.31.1,<6.32.0"]
grpcio-dep = ["grpcio>=1.73.1,<2.0.0", "protobuf>=6.31.1,<8.0.0"]
accelerate-dep = ["accelerate>=1.14.0,<2.0.0"]
can-dep = ["python-can>=4.2.0,<5.0.0"]
peft-dep = ["peft>=0.18.0,<1.0.0"]
scipy-dep = ["scipy>=1.14.0,<2.0.0"]
@@ -177,7 +178,12 @@ unitree_g1 = [
"lerobot[matplotlib-dep]",
"lerobot[pygame-dep]",
]
reachy2 = ["reachy2_sdk>=1.0.15,<1.1.0"]
# reachy2-sdk caps grpcio<=1.73.1 and protobuf<=6.32.0; quarantined here so downstream users aren't held back. reachy2-sdk is unlikely to release new versions.
reachy2 = [
"reachy2_sdk>=1.0.15,<1.1.0",
"grpcio<=1.73.1",
"protobuf<=6.32.0",
]
# Seeed Studio reBot B601-DM follower (motorbridge / CAN) + StarArm102 / reBot Arm 102
# leader (motorbridge-smart-servo / FashionStar UART servos).
rebot = ["lerobot[motorbridge-dep]", "lerobot[motorbridge-smart-servo-dep]"]
@@ -199,7 +205,7 @@ wallx = [
]
pi = ["lerobot[transformers-dep]", "lerobot[scipy-dep]"]
molmoact2 = ["lerobot[transformers-dep]", "lerobot[peft-dep]", "lerobot[scipy-dep]"]
smolvla = ["lerobot[transformers-dep]", "num2words>=0.5.14,<0.6.0", "accelerate>=1.7.0,<2.0.0"]
smolvla = ["lerobot[transformers-dep]", "num2words>=0.5.14,<0.6.0", "lerobot[accelerate-dep]"]
multi_task_dit = ["lerobot[transformers-dep]", "lerobot[diffusers-dep]"]
groot = [
"lerobot[transformers-dep]",
@@ -224,7 +230,7 @@ async = ["lerobot[grpcio-dep]", "lerobot[matplotlib-dep]"]
peft = ["lerobot[transformers-dep]", "lerobot[peft-dep]"]
# Development
dev = ["pre-commit>=3.7.0,<5.0.0", "debugpy>=1.8.1,<1.9.0", "lerobot[grpcio-dep]", "grpcio-tools==1.73.1", "mypy>=1.19.1", "ruff>=0.14.1", "lerobot[notebook]"]
dev = ["pre-commit>=3.7.0,<5.0.0", "debugpy>=1.8.1,<1.9.0", "lerobot[grpcio-dep]", "grpcio-tools>=1.73.1,<2.0.0", "mypy>=1.19.1", "ruff>=0.14.1", "lerobot[notebook]"]
notebook = ["jupyter>=1.0.0,<2.0.0", "ipykernel>=6.0.0,<7.0.0"]
test = ["pytest>=8.1.0,<9.0.0", "pytest-timeout>=2.4.0,<3.0.0", "pytest-cov>=5.0.0,<8.0.0", "mock-serial>=0.0.1,<0.1.0 ; sys_platform != 'win32'"]
video_benchmark = ["scikit-image>=0.23.2,<0.26.0", "pandas>=2.2.2,<2.4.0"]
+7 -1
View File
@@ -30,6 +30,7 @@ class EpisodeAwareSampler:
drop_n_first_frames: int = 0,
drop_n_last_frames: int = 0,
shuffle: bool = False,
generator: torch.Generator | None = None,
):
"""Sampler that optionally incorporates episode boundary information.
@@ -41,6 +42,10 @@ class EpisodeAwareSampler:
drop_n_first_frames: Number of frames to drop from the start of each episode.
drop_n_last_frames: Number of frames to drop from the end of each episode.
shuffle: Whether to shuffle the indices.
generator: Generator used for shuffling. Exposing this attribute (even when None) lets
`accelerate` register it as the synchronized RNG in distributed training, so
every rank draws the same permutation and batch shards stay disjoint. When
None, shuffling falls back to the global torch RNG.
"""
if drop_n_first_frames < 0:
raise ValueError(f"drop_n_first_frames must be >= 0, got {drop_n_first_frames}")
@@ -73,10 +78,11 @@ class EpisodeAwareSampler:
self.indices = indices
self.shuffle = shuffle
self.generator = generator
def __iter__(self) -> Iterator[int]:
if self.shuffle:
for i in torch.randperm(len(self.indices)):
for i in torch.randperm(len(self.indices), generator=self.generator):
yield self.indices[i]
else:
for i in self.indices:
+13 -49
View File
@@ -77,21 +77,6 @@ from lerobot.utils.constants import ACTION, DONE, OBS_STATE, REWARD
from lerobot.utils.utils import init_logging
def get_feature_names(dataset: LeRobotDataset, key: str) -> list[str]:
"""Return per-dimension names for a feature from the dataset metadata.
Only flat-list ``names`` metadata is used. Dict-style ``names`` and missing names fall back to ``{key}_{i}`` indices.
"""
feature = dataset.features[key]
dim = feature["shape"][-1]
names = feature.get("names")
if isinstance(names, list) and len(names) == dim:
return [str(name) for name in names]
return [f"{key}_{d}" for d in range(dim)]
def to_hwc_uint8_numpy(chw_float32_torch: torch.Tensor) -> np.ndarray:
assert chw_float32_torch.dtype == torch.float32
assert chw_float32_torch.ndim == 3
@@ -101,31 +86,6 @@ def to_hwc_uint8_numpy(chw_float32_torch: torch.Tensor) -> np.ndarray:
return hwc_uint8_numpy
def build_blueprint_from_dataset(dataset: LeRobotDataset):
"""Build a Rerun blueprint laying out camera images and time series for the given dataset.
Camera images and scalar signals (action, state, reward, done, success) are arranged in a grid.
The per-dimension series names for ``action`` and ``state`` are applied directly
via blueprint overrides.
"""
import rerun as rr
import rerun.blueprint as rrb
views = [rrb.Spatial2DView(origin=key, name=key) for key in dataset.meta.camera_keys]
# Style multi-dimensional signals (action, state) with per-dimension names.
for origin, key in ((ACTION, ACTION), ("state", OBS_STATE)):
if key in dataset.features:
names = get_feature_names(dataset, key)
styling = rr.SeriesLines(names=names)
views.append(rrb.TimeSeriesView(origin=origin, name=origin, overrides={origin: styling}))
for key in (DONE, REWARD, "next.success"):
if key in dataset.features:
views.append(rrb.TimeSeriesView(origin=key, name=key))
return rrb.Blueprint(rrb.Grid(*views))
def visualize_dataset(
dataset: LeRobotDataset,
episode_index: int,
@@ -164,8 +124,7 @@ def visualize_dataset(
import rerun as rr
spawn_local_viewer = mode == "local" and not save
blueprint = build_blueprint_from_dataset(dataset)
rr.init(f"{repo_id}/episode_{episode_index}", spawn=spawn_local_viewer, default_blueprint=blueprint)
rr.init(f"{repo_id}/episode_{episode_index}", spawn=spawn_local_viewer)
# Manually call python garbage collector after `rr.init` to avoid hanging in a blocking flush
# when iterating on a dataloader with `num_workers` > 0
@@ -183,21 +142,26 @@ def visualize_dataset(
for batch in tqdm.tqdm(dataloader, total=len(dataloader)):
if first_index is None:
first_index = batch["index"][0].item()
# iterate over the batch
for i in range(len(batch["index"])):
rr.set_time("frame_index", sequence=batch["index"][i].item() - first_index)
rr.set_time("timestamp", timestamp=batch["timestamp"][i].item())
# display each camera image
for key in dataset.meta.camera_keys:
img = to_hwc_uint8_numpy(batch[key][i])
img_entity = rr.Image(img).compress() if display_compressed_images else rr.Image(img)
rr.log(key, entity=img_entity)
# display each dimension of action space (e.g. actuators command)
if ACTION in batch:
rr.log(ACTION, rr.Scalars(batch[ACTION][i].numpy()))
for dim_idx, val in enumerate(batch[ACTION][i]):
rr.log(f"{ACTION}/{dim_idx}", rr.Scalars(val.item()))
# display each dimension of observed state space (e.g. agent position in joint space)
if OBS_STATE in batch:
rr.log("state", rr.Scalars(batch[OBS_STATE][i].numpy()))
for dim_idx, val in enumerate(batch[OBS_STATE][i]):
rr.log(f"state/{dim_idx}", rr.Scalars(val.item()))
if DONE in batch:
rr.log(DONE, rr.Scalars(batch[DONE][i].item()))
@@ -209,6 +173,8 @@ def visualize_dataset(
rr.log("next.success", rr.Scalars(batch["next.success"][i].item()))
if mode == "local" and save:
# save .rrd locally
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
repo_id_str = repo_id.replace("/", "_")
rrd_path = output_dir / f"{repo_id_str}_episode_{episode_index}.rrd"
@@ -216,7 +182,7 @@ def visualize_dataset(
return rrd_path
elif mode == "distant":
# Keep the process alive while it serves the gRPC/web connection.
# stop the process from exiting since it is serving the websocket connection
try:
while True:
time.sleep(1)
@@ -331,14 +297,12 @@ def main():
)
logging.warning("Setting grpc_port to ws_port value.")
kwargs["grpc_port"] = kwargs.pop("ws_port")
else:
kwargs.pop("ws_port") # Always remove ws_port from kwargs
init_logging()
logging.info("Loading dataset")
dataset = LeRobotDataset(repo_id, episodes=[args.episode_index], root=root, tolerance_s=tolerance_s)
visualize_dataset(dataset, **kwargs)
visualize_dataset(dataset, **vars(args))
if __name__ == "__main__":
+52 -19
View File
@@ -99,6 +99,9 @@ def update_policy(
start_time = time.perf_counter()
policy.train()
if torch.cuda.is_available():
torch.cuda.reset_peak_memory_stats()
# Compute sample weights if a weighter is provided
sample_weights = None
weight_stats = None
@@ -158,6 +161,8 @@ def update_policy(
train_metrics.grad_norm = grad_norm.item()
train_metrics.lr = optimizer.param_groups[0]["lr"]
train_metrics.update_s = time.perf_counter() - start_time
if torch.cuda.is_available():
train_metrics.gpu_mem_gb = torch.cuda.max_memory_allocated() / (1024**3)
return train_metrics, output_dict
@@ -232,15 +237,18 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
torch.backends.cudnn.benchmark = True
torch.backends.cuda.matmul.allow_tf32 = True
# Dataset loading synchronization: main process downloads first to avoid race conditions
if is_main_process:
logging.info("Creating dataset")
# Dataset loading synchronization: each node's local main process downloads first to avoid
# race conditions (the global main process only exists on node 0, so gating on it would let
# all ranks of the other nodes download and build the Arrow cache concurrently).
if accelerator.is_local_main_process:
if is_main_process:
logging.info("Creating dataset")
dataset = make_dataset(cfg)
accelerator.wait_for_everyone()
# Now all other processes can safely load the dataset
if not is_main_process:
# Now all other processes can safely load the dataset from the local cache
if not accelerator.is_local_main_process:
dataset = make_dataset(cfg)
# Create environment used for evaluating checkpoints during training on simulation data.
@@ -386,12 +394,19 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
# create dataloader for offline training
if hasattr(active_cfg, "drop_n_last_frames"):
shuffle = False
# A dedicated generator (rather than the global torch RNG) lets accelerator.prepare
# synchronize the shuffle permutation across ranks, keeping batch shards disjoint even
# when ranks consume the global RNG asymmetrically (e.g. eval on the main process only).
sampler_generator = torch.Generator()
if cfg.seed is not None:
sampler_generator.manual_seed(cfg.seed)
sampler = EpisodeAwareSampler(
dataset.meta.episodes["dataset_from_index"],
dataset.meta.episodes["dataset_to_index"],
episode_indices_to_use=dataset.episodes,
drop_n_last_frames=active_cfg.drop_n_last_frames,
shuffle=True,
generator=sampler_generator,
)
else:
shuffle = True
@@ -424,12 +439,22 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
policy.train()
train_metrics = {
"loss": AverageMeter("loss", ":.3f"),
# Per-rank loss reflects only one shard of the global batch; mean recovers the loss DDP
# is actually optimizing. grad_norm and lr are already identical on every rank (post
# gradient sync / deterministic scheduler) so reducing them would be a no-op collective.
"loss": AverageMeter("loss", ":.3f", reduction="mean"),
"grad_norm": AverageMeter("grdn", ":.3f"),
"lr": AverageMeter("lr", ":0.1e"),
"update_s": AverageMeter("updt_s", ":.3f"),
"dataloading_s": AverageMeter("data_s", ":.3f"),
# Report the slowest rank for bottleneck-style timings so multi-GPU runs surface the
# true straggler instead of rank 0's view.
"update_s": AverageMeter("updt_s", ":.3f", reduction="max"),
"dataloading_s": AverageMeter("data_s", ":.3f", reduction="max"),
# Derived from the post-reduce max step time; set once per log window on the main rank.
"samples_per_s": AverageMeter("smp/s", ":.0f"),
}
if torch.cuda.is_available():
# max() because headroom is gated by the worst-case rank.
train_metrics["gpu_mem_gb"] = AverageMeter("mem_gb", ":.2f", reduction="max")
# Keep global batch size for logging; MetricsTracker handles world size internally.
effective_batch_size = cfg.batch_size * accelerator.num_processes
@@ -481,21 +506,29 @@ def train(cfg: TrainPipelineConfig, accelerator: "Accelerator | None" = None):
if is_main_process:
progbar.update(1)
train_tracker.step()
is_log_step = cfg.log_freq > 0 and step % cfg.log_freq == 0 and is_main_process
is_log_step = cfg.log_freq > 0 and step % cfg.log_freq == 0
is_saving_step = step % cfg.save_freq == 0 or step == cfg.steps
is_eval_step = cfg.eval_freq > 0 and step % cfg.eval_freq == 0
if is_log_step:
logging.info(train_tracker)
if wandb_logger:
wandb_log_dict = train_tracker.to_dict()
if output_dict:
wandb_log_dict.update(output_dict)
# Log sample weighting statistics if enabled
if sample_weighter is not None:
weighter_stats = sample_weighter.get_stats()
wandb_log_dict.update({f"sample_weighting/{k}": v for k, v in weighter_stats.items()})
wandb_logger.log_dict(wandb_log_dict, step)
# Collective reduce must run on every rank, before the main-process gate below.
train_tracker.reduce_across_ranks()
if is_main_process:
# Cluster-wide throughput, derived from the already-reduced (max) step time so it
# reflects the slowest rank — which is what actually gates the next iteration.
step_time = train_tracker.update_s.avg + train_tracker.dataloading_s.avg
if step_time > 0:
train_tracker.samples_per_s = effective_batch_size / step_time
logging.info(train_tracker)
if wandb_logger:
wandb_log_dict = train_tracker.to_dict()
if output_dict:
wandb_log_dict.update(output_dict)
# Log sample weighting statistics if enabled
if sample_weighter is not None:
weighter_stats = sample_weighter.get_stats()
wandb_log_dict.update({f"sample_weighting/{k}": v for k, v in weighter_stats.items()})
wandb_logger.log_dict(wandb_log_dict, step)
train_tracker.reset_averages()
if cfg.save_checkpoint and is_saving_step:
+50 -1
View File
@@ -13,21 +13,39 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import defaultdict
from collections.abc import Callable
from typing import Any
import torch
from .utils import format_big_number
_VALID_REDUCTIONS = ("none", "max", "mean", "sum")
class AverageMeter:
"""
Computes and stores the average and current value
Adapted from https://github.com/pytorch/examples/blob/main/imagenet/main.py
Args:
name: Display name of the metric.
fmt: Format string used when rendering the metric.
reduction: Cross-process reduction applied by :meth:`MetricsTracker.reduce_across_ranks`
before logging. One of ``"none"`` (per-rank value, default), ``"max"``, ``"mean"``,
or ``"sum"``. Use ``"max"`` for bottleneck-style metrics (e.g. dataloading or
update wall time) so multi-GPU runs report the slowest rank rather than rank 0.
"""
def __init__(self, name: str, fmt: str = ":f"):
def __init__(self, name: str, fmt: str = ":f", reduction: str = "none"):
if reduction not in _VALID_REDUCTIONS:
raise ValueError(
f"Invalid reduction {reduction!r} for AverageMeter; expected one of {_VALID_REDUCTIONS}."
)
self.name = name
self.fmt = fmt
self.reduction = reduction
self.reset()
def reset(self) -> None:
@@ -138,6 +156,37 @@ class MetricsTracker:
self.episodes = self.samples / self._avg_samples_per_ep
self.epochs = self.samples / self._num_frames
def reduce_across_ranks(self) -> None:
"""
Synchronises the running averages of every metric whose ``reduction`` is not ``"none"``
across all distributed processes (in-place).
This is a collective operation and MUST be invoked on every rank — typically just before
logging. With no accelerator or in single-process runs it is a no-op. Without it, metrics
reported by the main process only reflect rank 0; for bottleneck-style timings
(``dataloading_s``, ``update_s``, ...) that means the slowest worker's stall is invisible.
"""
if self.accelerator is None or self.accelerator.num_processes <= 1:
return
buckets: dict[str, list[str]] = defaultdict(list)
for name, meter in self.metrics.items():
if meter.reduction != "none":
buckets[meter.reduction].append(name)
if not buckets:
return
device = self.accelerator.device
for reduction, names in buckets.items():
tensor = torch.tensor([self.metrics[n].avg for n in names], dtype=torch.float32, device=device)
reduced = self.accelerator.reduce(tensor, reduction=reduction)
for name, value in zip(names, reduced.tolist(), strict=True):
meter = self.metrics[name]
# Preserve avg == sum / count so a later .update() on this meter accumulates
# against the cluster view, not the stale per-rank history.
meter.avg = value
meter.sum = value * meter.count
def __str__(self) -> str:
display_list = [
f"step:{format_big_number(self.steps)}",
+12 -53
View File
@@ -38,8 +38,6 @@ def init_rerun(
require_package("rerun-sdk", extra="viz", import_name="rerun")
import rerun as rr
log_rerun_data.blueprint = None # Reset blueprint cache for new session
batch_size = os.getenv("RERUN_FLUSH_NUM_BYTES", "8000")
os.environ["RERUN_FLUSH_NUM_BYTES"] = batch_size
rr.init(session_name)
@@ -65,38 +63,6 @@ def _is_scalar(x):
)
def _build_blueprint(observation_paths: set[str], action_paths: set[str], image_paths: set[str]):
"""Build a Rerun blueprint laying out camera images, observation and action scalars in separate views.
Camera images, observation and action scalars are arranged in a grid.
"""
# Safe + zero-overhead: `log_rerun_data` already ran the `require_package` guard and imported rerun.
import rerun.blueprint as rrb
views = [rrb.Spatial2DView(origin=path, name=path) for path in sorted(image_paths)]
if observation_paths:
views.append(rrb.TimeSeriesView(name="observation", contents=sorted(observation_paths)))
if action_paths:
views.append(rrb.TimeSeriesView(name="action", contents=sorted(action_paths)))
return rrb.Blueprint(rrb.Grid(*views))
def _ensure_blueprint(observation_paths: set[str], action_paths: set[str], image_paths: set[str]) -> None:
"""Build and send the blueprint once, from the first observation and action data."""
if getattr(log_rerun_data, "blueprint", None) is not None:
return
# Safe + zero-overhead: `log_rerun_data` already ran the `require_package` guard and imported rerun.
import rerun as rr
blueprint = _build_blueprint(observation_paths, action_paths, image_paths)
log_rerun_data.blueprint = blueprint
rr.send_blueprint(blueprint)
def log_rerun_data(
observation: RobotObservation | None = None,
action: RobotAction | None = None,
@@ -110,15 +76,11 @@ def log_rerun_data(
- Scalars values (floats, ints) are logged as `rr.Scalars`.
- 3D NumPy arrays that resemble images (e.g., with 1, 3, or 4 channels first) are transposed
from CHW to HWC format, (optionally) compressed to JPEG and logged as `rr.Image` or `rr.EncodedImage`.
- 1D NumPy arrays are logged as a single `rr.Scalars` batch under one entity path, so that every
dimension shares the same view instead of being split across one view per element.
- Multi-dimensional **action** arrays are flattened and logged as a single `rr.Scalars` batch.
- 1D NumPy arrays are logged as a series of individual scalars, with each element indexed.
- Other multi-dimensional arrays are flattened and logged as individual scalars.
Keys are automatically namespaced with "observation." or "action." if not already present.
On the first call, a blueprint is built and sent so observation and action scalars get separate
time-series views and each image gets its own spatial view.
Args:
observation: An optional dictionary containing observation data to log.
action: An optional dictionary containing action data to log.
@@ -128,10 +90,6 @@ def log_rerun_data(
require_package("rerun-sdk", extra="viz", import_name="rerun")
import rerun as rr
observation_paths: set[str] = set()
action_paths: set[str] = set()
image_paths: set[str] = set()
if observation:
for k, v in observation.items():
if v is None:
@@ -140,19 +98,17 @@ def log_rerun_data(
if _is_scalar(v):
rr.log(key, rr.Scalars(float(v)))
observation_paths.add(key)
elif isinstance(v, np.ndarray):
arr = v
# Convert CHW -> HWC when needed
if arr.ndim == 3 and arr.shape[0] in (1, 3, 4) and arr.shape[-1] not in (1, 3, 4):
arr = np.transpose(arr, (1, 2, 0))
if arr.ndim == 1:
rr.log(key, rr.Scalars(arr.astype(float)))
observation_paths.add(key)
for i, vi in enumerate(arr):
rr.log(f"{key}_{i}", rr.Scalars(float(vi)))
else:
img_entity = rr.Image(arr).compress() if compress_images else rr.Image(arr)
rr.log(key, entity=img_entity, static=True)
image_paths.add(key)
if action:
for k, v in action.items():
@@ -162,9 +118,12 @@ def log_rerun_data(
if _is_scalar(v):
rr.log(key, rr.Scalars(float(v)))
action_paths.add(key)
elif isinstance(v, np.ndarray):
rr.log(key, rr.Scalars(v.reshape(-1).astype(float)))
action_paths.add(key)
_ensure_blueprint(observation_paths, action_paths, image_paths)
if v.ndim == 1:
for i, vi in enumerate(v):
rr.log(f"{key}_{i}", rr.Scalars(float(vi)))
else:
# Fall back to flattening higher-dimensional arrays
flat = v.flatten()
for i, vi in enumerate(flat):
rr.log(f"{key}_{i}", rr.Scalars(float(vi)))
+24
View File
@@ -114,6 +114,30 @@ def test_shuffle():
assert set(sampler) == {0, 1, 2, 3, 4, 5}
def test_shuffle_with_generator_is_deterministic():
# Two samplers shuffling with same-seed generators must yield identical permutations.
# This is what keeps batch shards disjoint across ranks in distributed training, where
# accelerate synchronizes the sampler's generator state instead of the global torch RNG.
sampler_a = EpisodeAwareSampler([0], [6], shuffle=True, generator=torch.Generator().manual_seed(42))
sampler_b = EpisodeAwareSampler([0], [6], shuffle=True, generator=torch.Generator().manual_seed(42))
assert list(sampler_a) == list(sampler_b)
# Desyncing the global RNG must not affect the permutation.
sampler_c = EpisodeAwareSampler([0], [6], shuffle=True, generator=torch.Generator().manual_seed(42))
order_before = list(sampler_c)
sampler_c.generator.manual_seed(42)
torch.randperm(1000) # consume global RNG, as rank-asymmetric code (e.g. eval) would
assert list(sampler_c) == order_before
def test_generator_attribute_defaults_to_none():
# accelerate detects synchronizable samplers via `hasattr(sampler, "generator")`,
# so the attribute must exist even when no generator is passed.
sampler = EpisodeAwareSampler([0], [6], shuffle=True)
assert sampler.generator is None
assert set(sampler) == {0, 1, 2, 3, 4, 5}
def test_negative_drop_first_frames_raises():
with pytest.raises(ValueError, match="drop_n_first_frames must be >= 0"):
EpisodeAwareSampler([0], [10], drop_n_first_frames=-1)
+77 -1
View File
@@ -15,6 +15,7 @@
# limitations under the License.
import pytest
import torch
from lerobot.utils.logging_utils import AverageMeter, MetricsTracker
@@ -25,8 +26,16 @@ def mock_metrics():
class MockAccelerator:
def __init__(self, num_processes: int):
def __init__(self, num_processes: int, reduce_fn=None):
self.num_processes = num_processes
self.device = torch.device("cpu")
self._reduce_fn = reduce_fn
def reduce(self, tensor, reduction="mean"):
# In single-process tests we just want a deterministic stand-in for accelerate's reduce.
if self._reduce_fn is not None:
return self._reduce_fn(tensor, reduction)
return tensor
def test_average_meter_initialization():
@@ -157,3 +166,70 @@ def test_metrics_tracker_reset_averages(mock_metrics):
tracker.reset_averages()
assert tracker.loss.avg == 0.0
assert tracker.accuracy.avg == 0.0
def test_average_meter_invalid_reduction():
with pytest.raises(ValueError):
AverageMeter("loss", reduction="median")
def test_average_meter_reduction_stored():
meter = AverageMeter("updt_s", reduction="max")
assert meter.reduction == "max"
def test_metrics_tracker_reduce_across_ranks_no_accelerator():
metrics = {"update_s": AverageMeter("update_s", reduction="max")}
tracker = MetricsTracker(batch_size=32, num_frames=1000, num_episodes=50, metrics=metrics)
tracker.update_s = 0.5
tracker.reduce_across_ranks() # no-op without accelerator
assert tracker.update_s.avg == 0.5
def test_metrics_tracker_reduce_across_ranks_single_process():
metrics = {"update_s": AverageMeter("update_s", reduction="max")}
tracker = MetricsTracker(
batch_size=32,
num_frames=1000,
num_episodes=50,
metrics=metrics,
accelerator=MockAccelerator(num_processes=1),
)
tracker.update_s = 0.5
tracker.reduce_across_ranks() # no-op when world size is 1
assert tracker.update_s.avg == 0.5
def test_metrics_tracker_reduce_across_ranks_invokes_reduce():
captured = {}
def fake_reduce(tensor, reduction):
captured["reduction"] = reduction
captured["values"] = tensor.clone()
# Pretend the slowest rank reported 0.9 instead of this rank's 0.4.
return torch.tensor([0.9], dtype=tensor.dtype, device=tensor.device)
metrics = {
"loss": AverageMeter("loss"), # reduction="none" -> not touched
"update_s": AverageMeter("update_s", reduction="max"),
}
tracker = MetricsTracker(
batch_size=32,
num_frames=1000,
num_episodes=50,
metrics=metrics,
accelerator=MockAccelerator(num_processes=4, reduce_fn=fake_reduce),
)
tracker.loss = 1.0
tracker.update_s = 0.4
tracker.reduce_across_ranks()
assert captured["reduction"] == "max"
assert torch.allclose(captured["values"], torch.tensor([0.4]))
assert tracker.update_s.avg == pytest.approx(0.9)
# Metrics without a reduction stay untouched.
assert tracker.loss.avg == 1.0
# Invariant: avg == sum / count must hold after reduce, so subsequent .update() calls
# accumulate against the cluster view rather than the stale per-rank sum.
meter = tracker.update_s
assert meter.sum / meter.count == pytest.approx(meter.avg)
+34 -103
View File
@@ -30,46 +30,25 @@ from lerobot.utils.constants import OBS_STATE
@pytest.fixture
def mock_rerun(monkeypatch):
"""
Provide a mock `rerun` module (and `rerun.blueprint` submodule) so tests don't
depend on the real library. Also reload the module-under-test so it binds to
this mock `rr`.
Provide a mock `rerun` module so tests don't depend on the real library.
Also reload the module-under-test so it binds to this mock `rr`.
"""
calls = []
blueprints = []
class DummyScalar:
def __init__(self, value):
# Scalars may be built from a single float or from a 1D array batch.
self.value = value
self.value = float(value)
class DummyImage:
def __init__(self, arr):
self.arr = arr
def compress(self, *a, **k):
return self
def dummy_log(key, obj=None, **kwargs):
# Accept either positional `obj` or keyword `entity` and record remaining kwargs.
if obj is None and "entity" in kwargs:
obj = kwargs.pop("entity")
calls.append((key, obj, kwargs))
def dummy_send_blueprint(blueprint, *a, **k):
blueprints.append(blueprint)
# Mock the `rerun.blueprint` submodule used to build the layout.
dummy_rrb = SimpleNamespace(
Spatial2DView=lambda origin=None, name=None: SimpleNamespace(
kind="Spatial2DView", origin=origin, name=name
),
TimeSeriesView=lambda name=None, contents=None: SimpleNamespace(
kind="TimeSeriesView", name=name, contents=contents
),
Grid=lambda *views: SimpleNamespace(kind="Grid", views=list(views)),
Blueprint=lambda root: SimpleNamespace(kind="Blueprint", root=root),
)
dummy_rr = SimpleNamespace(
__name__="rerun",
__package__="rerun",
@@ -77,23 +56,20 @@ def mock_rerun(monkeypatch):
Scalars=DummyScalar,
Image=DummyImage,
log=dummy_log,
send_blueprint=dummy_send_blueprint,
init=lambda *a, **k: None,
spawn=lambda *a, **k: None,
blueprint=dummy_rrb,
)
# Inject fake modules into sys.modules (both `rerun` and `rerun.blueprint`).
# Inject fake module into sys.modules
monkeypatch.setitem(sys.modules, "rerun", dummy_rr)
monkeypatch.setitem(sys.modules, "rerun.blueprint", dummy_rrb)
# Now import and reload the module under test, to bind to our rerun mock
import lerobot.utils.visualization_utils as vu
importlib.reload(vu)
# Expose the reloaded module, the call recorder and the captured blueprints
yield vu, calls, blueprints
# Expose both the reloaded module and the call recorder
yield vu, calls
def _keys(calls):
@@ -116,13 +92,8 @@ def _kwargs_for(calls, key):
raise KeyError(f"Key {key} not found in calls: {calls}")
def _views_by_kind(blueprint, kind):
"""Return the views of a given kind from the (single) blueprint's grid."""
return [v for v in blueprint.root.views if v.kind == kind]
def test_log_rerun_data_envtransition_scalars_and_image(mock_rerun):
vu, calls, blueprints = mock_rerun
vu, calls = mock_rerun
# Build EnvTransition dict
obs = {
@@ -132,7 +103,7 @@ def test_log_rerun_data_envtransition_scalars_and_image(mock_rerun):
}
act = {
"action.throttle": 0.7,
# 1D array should be logged as a single Scalars batch under one entity path
# 1D array should log individual Scalars with suffix _i
"action.vector": np.array([1.0, 2.0], dtype=np.float32),
}
transition = {
@@ -149,28 +120,31 @@ def test_log_rerun_data_envtransition_scalars_and_image(mock_rerun):
# - observation.state.temperature -> Scalars
# - observation.camera -> Image (HWC) with static=True
# - action.throttle -> Scalars
# - action.vector -> single Scalars batch (no per-element suffix)
# - action.vector_0, action.vector_1 -> Scalars
expected_keys = {
f"{OBS_STATE}.temperature",
"observation.camera",
"action.throttle",
"action.vector",
"action.vector_0",
"action.vector_1",
}
assert set(_keys(calls)) == expected_keys
# Check scalar types and values
temp_obj = _obj_for(calls, f"{OBS_STATE}.temperature")
assert type(temp_obj).__name__ == "DummyScalar"
assert float(temp_obj.value) == pytest.approx(25.0)
assert temp_obj.value == pytest.approx(25.0)
throttle_obj = _obj_for(calls, "action.throttle")
assert type(throttle_obj).__name__ == "DummyScalar"
assert float(throttle_obj.value) == pytest.approx(0.7)
assert throttle_obj.value == pytest.approx(0.7)
# 1D vector logged as a single batched Scalars under one entity path
vec = _obj_for(calls, "action.vector")
assert type(vec).__name__ == "DummyScalar"
np.testing.assert_allclose(np.asarray(vec.value), [1.0, 2.0])
v0 = _obj_for(calls, "action.vector_0")
v1 = _obj_for(calls, "action.vector_1")
assert type(v0).__name__ == "DummyScalar"
assert type(v1).__name__ == "DummyScalar"
assert v0.value == pytest.approx(1.0)
assert v1.value == pytest.approx(2.0)
# Check image handling: CHW -> HWC
img_obj = _obj_for(calls, "observation.camera")
@@ -178,24 +152,9 @@ def test_log_rerun_data_envtransition_scalars_and_image(mock_rerun):
assert img_obj.arr.shape == (10, 20, 3) # transposed
assert _kwargs_for(calls, "observation.camera").get("static", False) is True # static=True for images
# A blueprint should have been built and sent exactly once, and cached on the function.
assert len(blueprints) == 1
assert vu.log_rerun_data.blueprint is blueprints[0]
bp = blueprints[0]
# One spatial view per image path
spatial_views = _views_by_kind(bp, "Spatial2DView")
assert {v.origin for v in spatial_views} == {"observation.camera"}
# One time-series view each for observation and action scalars
ts_views = {v.name: v for v in _views_by_kind(bp, "TimeSeriesView")}
assert set(ts_views) == {"observation", "action"}
assert ts_views["observation"].contents == [f"{OBS_STATE}.temperature"]
assert ts_views["action"].contents == ["action.throttle", "action.vector"]
def test_log_rerun_data_plain_list_ordering_and_prefixes(mock_rerun):
vu, calls, blueprints = mock_rerun
vu, calls = mock_rerun
# First dict without prefixes treated as observation
# Second dict without prefixes treated as action
@@ -214,12 +173,14 @@ def test_log_rerun_data_plain_list_ordering_and_prefixes(mock_rerun):
# First dict was treated as observation, second as action
vu.log_rerun_data(observation=obs_plain, action=act_plain)
# Expected keys with auto-prefixes. The 1D vector is a single batched Scalars.
# Expected keys with auto-prefixes
expected = {
"observation.temp",
"observation.img",
"action.throttle",
"action.vec",
"action.vec_0",
"action.vec_1",
"action.vec_2",
}
logged = set(_keys(calls))
assert logged == expected
@@ -227,11 +188,11 @@ def test_log_rerun_data_plain_list_ordering_and_prefixes(mock_rerun):
# Scalars
t = _obj_for(calls, "observation.temp")
assert type(t).__name__ == "DummyScalar"
assert float(t.value) == pytest.approx(1.5)
assert t.value == pytest.approx(1.5)
throttle = _obj_for(calls, "action.throttle")
assert type(throttle).__name__ == "DummyScalar"
assert float(throttle.value) == pytest.approx(0.3)
assert throttle.value == pytest.approx(0.3)
# Image stays HWC
img = _obj_for(calls, "observation.img")
@@ -239,23 +200,15 @@ def test_log_rerun_data_plain_list_ordering_and_prefixes(mock_rerun):
assert img.arr.shape == (5, 6, 3)
assert _kwargs_for(calls, "observation.img").get("static", False) is True
# Vector logged as a single batched Scalars under one entity path
vec = _obj_for(calls, "action.vec")
assert type(vec).__name__ == "DummyScalar"
np.testing.assert_allclose(np.asarray(vec.value), [9, 8, 7])
# Blueprint sent once with the expected view layout
assert len(blueprints) == 1
bp = blueprints[0]
spatial_views = _views_by_kind(bp, "Spatial2DView")
assert {v.origin for v in spatial_views} == {"observation.img"}
ts_views = {v.name: v for v in _views_by_kind(bp, "TimeSeriesView")}
assert ts_views["observation"].contents == ["observation.temp"]
assert ts_views["action"].contents == ["action.throttle", "action.vec"]
# Vectors
for i, val in enumerate([9, 8, 7]):
o = _obj_for(calls, f"action.vec_{i}")
assert type(o).__name__ == "DummyScalar"
assert o.value == pytest.approx(val)
def test_log_rerun_data_kwargs_only(mock_rerun):
vu, calls, blueprints = mock_rerun
vu, calls = mock_rerun
vu.log_rerun_data(
observation={"observation.temp": 10.0, "observation.gray": np.zeros((8, 8, 1), dtype=np.uint8)},
@@ -269,7 +222,7 @@ def test_log_rerun_data_kwargs_only(mock_rerun):
temp = _obj_for(calls, "observation.temp")
assert type(temp).__name__ == "DummyScalar"
assert float(temp.value) == pytest.approx(10.0)
assert temp.value == pytest.approx(10.0)
img = _obj_for(calls, "observation.gray")
assert type(img).__name__ == "DummyImage"
@@ -278,26 +231,4 @@ def test_log_rerun_data_kwargs_only(mock_rerun):
a = _obj_for(calls, "action.a")
assert type(a).__name__ == "DummyScalar"
assert float(a.value) == pytest.approx(1.0)
# Blueprint sent once, with a spatial view for the image and time-series views for scalars
assert len(blueprints) == 1
bp = blueprints[0]
assert {v.origin for v in _views_by_kind(bp, "Spatial2DView")} == {"observation.gray"}
ts_views = {v.name: v for v in _views_by_kind(bp, "TimeSeriesView")}
assert ts_views["observation"].contents == ["observation.temp"]
assert ts_views["action"].contents == ["action.a"]
def test_log_rerun_data_blueprint_sent_only_once(mock_rerun):
"""The blueprint is built from the first call and not resent on subsequent calls."""
vu, calls, blueprints = mock_rerun
vu.log_rerun_data(observation={"temp": 1.0}, action={"a": 2.0})
assert len(blueprints) == 1
first_blueprint = vu.log_rerun_data.blueprint
vu.log_rerun_data(observation={"temp": 3.0}, action={"a": 4.0})
# Still only one blueprint, and the cached one is unchanged.
assert len(blueprints) == 1
assert vu.log_rerun_data.blueprint is first_blueprint
assert a.value == pytest.approx(1.0)
Generated
+973 -916
View File
File diff suppressed because it is too large Load Diff