From 066976e07858cfe44787ef6bdc8880c48929d99f Mon Sep 17 00:00:00 2001 From: Pepijn Kooijmans Date: Tue, 24 Mar 2026 22:14:27 +0100 Subject: [PATCH] feat(eval): add multiprocess runtime -- no Docker needed New eval.runtime=multiprocess spawns local lerobot-eval-worker subprocesses instead of Docker containers. Supports eval.policy_servers for parallel inference. Works on SLURM clusters and anywhere Docker is unavailable. Usage: lerobot-eval --eval.runtime=multiprocess \ --eval.instance_count=8 --eval.policy_servers=4 --eval.port=50051 Made-with: Cursor --- src/lerobot/configs/default.py | 15 ++-- src/lerobot/envs/docker_runtime.py | 106 ++++++++++++++++++++++++++++ src/lerobot/scripts/lerobot_eval.py | 11 ++- 3 files changed, 124 insertions(+), 8 deletions(-) diff --git a/src/lerobot/configs/default.py b/src/lerobot/configs/default.py index ead4f799a..ef7c3def3 100644 --- a/src/lerobot/configs/default.py +++ b/src/lerobot/configs/default.py @@ -77,7 +77,8 @@ class EvalConfig: # Use AsyncVectorEnv (multiprocessing). Prefer SyncVectorEnv unless your environment # spends significant time in Python-side stepping and can benefit from process parallelism. use_async_envs: bool = False - # Runtime where evaluation executes: "local" or "docker". + # Runtime where evaluation executes: "local", "docker", or "multiprocess". + # "multiprocess" spawns local worker processes + policy servers. runtime: str = "local" docker: EvalDockerConfig = field(default_factory=EvalDockerConfig) # Number of parallel eval script instances to launch for one run. @@ -86,14 +87,18 @@ class EvalConfig: # 0-indexed shard id for this process. Users usually leave this at 0. # Additional shards are launched automatically by `lerobot-eval` when instance_count > 1. instance_id: int = 0 - # Number of policy inference servers to run in parallel (Docker runtime only). + # Number of policy inference servers to run in parallel (docker/multiprocess runtimes). # Each server loads a copy of the model and listens on consecutive ports - # starting from eval.docker.port. Containers are round-robin assigned. + # starting from eval.docker.port. Workers are round-robin assigned. policy_servers: int = 1 + # Base port for policy servers in multiprocess mode. + port: int = 50051 def __post_init__(self) -> None: - if self.runtime not in {"local", "docker"}: - raise ValueError(f"Unsupported eval.runtime '{self.runtime}'. Expected one of: local, docker.") + if self.runtime not in {"local", "docker", "multiprocess"}: + raise ValueError( + f"Unsupported eval.runtime '{self.runtime}'. Expected one of: local, docker, multiprocess." + ) if self.instance_count < 1: raise ValueError("eval.instance_count must be >= 1.") if self.instance_id < 0 or self.instance_id >= self.instance_count: diff --git a/src/lerobot/envs/docker_runtime.py b/src/lerobot/envs/docker_runtime.py index 882914d14..a95a21762 100644 --- a/src/lerobot/envs/docker_runtime.py +++ b/src/lerobot/envs/docker_runtime.py @@ -320,3 +320,109 @@ def run_eval_in_docker(cfg: EvalPipelineConfig) -> None: json.dump(info, f, indent=2) logger.info("Docker eval complete. Results: %s/eval_info.json", output_dir) + + +def run_eval_multiprocess(cfg: EvalPipelineConfig) -> None: + """Run eval with multiple local worker processes and policy servers (no Docker). + + Same architecture as Docker runtime but spawns `lerobot-eval-worker` as local + subprocesses. Works on SLURM clusters and anywhere Docker is unavailable. + """ + from lerobot.scripts.lerobot_eval import _aggregate_eval_from_per_task + + start_t = time.time() + output_dir = Path(cfg.output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + + device = get_safe_torch_device(cfg.policy.device, log=True) + torch.backends.cudnn.benchmark = True + torch.backends.cuda.matmul.allow_tf32 = True + + policy = make_policy(cfg=cfg.policy, env_cfg=cfg.env, rename_map=cfg.rename_map) + policy.eval() + + preprocessor_overrides: dict = { + "device_processor": {"device": str(device)}, + "rename_observations_processor": {"rename_map": cfg.rename_map}, + } + preprocessor, postprocessor = make_pre_post_processors( + policy_cfg=cfg.policy, + pretrained_path=cfg.policy.pretrained_path, + preprocessor_overrides=preprocessor_overrides, + ) + env_preprocessor, _env_postprocessor = make_env_pre_post_processors( + env_cfg=cfg.env, policy_cfg=cfg.policy, + ) + + n_policy_servers = cfg.eval.policy_servers + base_port = cfg.eval.port + instance_count = cfg.eval.instance_count + env_argv = _env_argv() + + servers: list[_InferenceServer] = [] + for s_idx in range(n_policy_servers): + port = base_port + s_idx + if s_idx > 0: + policy = make_policy(cfg=cfg.policy, env_cfg=cfg.env, rename_map=cfg.rename_map) + policy.eval() + srv = _InferenceServer( + ("0.0.0.0", port), # nosec B104 + policy=policy, + env_preprocessor=env_preprocessor, + preprocessor=preprocessor, + postprocessor=postprocessor, + ) + t = threading.Thread(target=srv.serve_forever, daemon=True) + t.start() + servers.append(srv) + logger.info("Policy server %d/%d on port %d", s_idx + 1, n_policy_servers, port) + + worker_dirs: list[Path] = [] + procs: list[subprocess.Popen] = [] + try: + for i in range(instance_count): + assigned_port = base_port + (i % n_policy_servers) + shard_dir = output_dir / "shards" / str(i) + shard_dir.mkdir(parents=True, exist_ok=True) + worker_dirs.append(shard_dir) + + cmd = [ + sys.executable, "-m", "lerobot.scripts.lerobot_eval_worker", + *env_argv, + f"--server_address=127.0.0.1:{assigned_port}", + f"--n_episodes={cfg.eval.n_episodes}", + f"--seed={cfg.seed}", + f"--instance_id={i}", + f"--instance_count={instance_count}", + f"--output_path={shard_dir / f'worker_{i}.json'}", + ] + logger.info("Spawning worker %d/%d → port %d", i + 1, instance_count, assigned_port) + procs.append(subprocess.Popen(cmd)) # nosec B603 + + failed: list[tuple[int, int]] = [] + for i, proc in enumerate(procs): + rc = proc.wait() + if rc != 0: + failed.append((i, rc)) + logger.error("Worker %d/%d exited with code %d", i + 1, instance_count, rc) + if failed: + raise RuntimeError(f"Multiprocess eval workers failed (id, exit_code): {failed}") + + finally: + for srv in servers: + srv.shutdown() + + per_task: list[dict] = [] + for i, shard_dir in enumerate(worker_dirs): + result_file = shard_dir / f"worker_{i}.json" + with open(result_file) as f: + shard_data: dict = json.load(f) + per_task.extend(shard_data.get("per_task", [])) + + per_task.sort(key=lambda x: (x["task_group"], x["task_id"])) + + info = _aggregate_eval_from_per_task(per_task, total_eval_s=time.time() - start_t) + with open(output_dir / "eval_info.json", "w") as f: + json.dump(info, f, indent=2) + + logger.info("Multiprocess eval complete. Results: %s/eval_info.json", output_dir) diff --git a/src/lerobot/scripts/lerobot_eval.py b/src/lerobot/scripts/lerobot_eval.py index 7c8549688..5ac9bd146 100644 --- a/src/lerobot/scripts/lerobot_eval.py +++ b/src/lerobot/scripts/lerobot_eval.py @@ -74,7 +74,7 @@ from tqdm import trange from lerobot.configs import parser from lerobot.configs.eval import EvalPipelineConfig -from lerobot.envs.docker_runtime import run_eval_in_docker + from lerobot.envs.factory import make_env, make_env_pre_post_processors from lerobot.envs.lazy_vec_env import LazyVectorEnv from lerobot.envs.utils import ( @@ -674,8 +674,13 @@ def _maybe_push_eval_outputs(cfg: EvalPipelineConfig, info: dict) -> None: def _run_eval_worker(cfg: EvalPipelineConfig) -> dict: logging.info(pformat(asdict(cfg))) - if cfg.eval.runtime == "docker": - run_eval_in_docker(cfg) + if cfg.eval.runtime in ("docker", "multiprocess"): + from lerobot.envs.docker_runtime import run_eval_in_docker, run_eval_multiprocess + + if cfg.eval.runtime == "docker": + run_eval_in_docker(cfg) + else: + run_eval_multiprocess(cfg) output_dir = Path(cfg.output_dir) with open(output_dir / "eval_info.json") as f: return json.load(f)