mirror of
https://github.com/huggingface/lerobot.git
synced 2026-06-30 22:57:00 +00:00
feat(eval): add multiprocess runtime -- no Docker needed
New eval.runtime=multiprocess spawns local lerobot-eval-worker
subprocesses instead of Docker containers. Supports eval.policy_servers
for parallel inference. Works on SLURM clusters and anywhere Docker
is unavailable.
Usage: lerobot-eval --eval.runtime=multiprocess \
--eval.instance_count=8 --eval.policy_servers=4 --eval.port=50051
Made-with: Cursor
This commit is contained in:
@@ -77,7 +77,8 @@ class EvalConfig:
|
||||
# Use AsyncVectorEnv (multiprocessing). Prefer SyncVectorEnv unless your environment
|
||||
# spends significant time in Python-side stepping and can benefit from process parallelism.
|
||||
use_async_envs: bool = False
|
||||
# Runtime where evaluation executes: "local" or "docker".
|
||||
# Runtime where evaluation executes: "local", "docker", or "multiprocess".
|
||||
# "multiprocess" spawns local worker processes + policy servers.
|
||||
runtime: str = "local"
|
||||
docker: EvalDockerConfig = field(default_factory=EvalDockerConfig)
|
||||
# Number of parallel eval script instances to launch for one run.
|
||||
@@ -86,14 +87,18 @@ class EvalConfig:
|
||||
# 0-indexed shard id for this process. Users usually leave this at 0.
|
||||
# Additional shards are launched automatically by `lerobot-eval` when instance_count > 1.
|
||||
instance_id: int = 0
|
||||
# Number of policy inference servers to run in parallel (Docker runtime only).
|
||||
# Number of policy inference servers to run in parallel (docker/multiprocess runtimes).
|
||||
# Each server loads a copy of the model and listens on consecutive ports
|
||||
# starting from eval.docker.port. Containers are round-robin assigned.
|
||||
# starting from eval.docker.port (docker) or eval.port (multiprocess). Workers are round-robin assigned.
|
||||
policy_servers: int = 1
|
||||
# Base port for policy servers in multiprocess mode.
|
||||
port: int = 50051
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if self.runtime not in {"local", "docker"}:
|
||||
raise ValueError(f"Unsupported eval.runtime '{self.runtime}'. Expected one of: local, docker.")
|
||||
if self.runtime not in {"local", "docker", "multiprocess"}:
|
||||
raise ValueError(
|
||||
f"Unsupported eval.runtime '{self.runtime}'. Expected one of: local, docker, multiprocess."
|
||||
)
|
||||
if self.instance_count < 1:
|
||||
raise ValueError("eval.instance_count must be >= 1.")
|
||||
if self.instance_id < 0 or self.instance_id >= self.instance_count:
|
||||
|
||||
@@ -320,3 +320,109 @@ def run_eval_in_docker(cfg: EvalPipelineConfig) -> None:
|
||||
json.dump(info, f, indent=2)
|
||||
|
||||
logger.info("Docker eval complete. Results: %s/eval_info.json", output_dir)
|
||||
|
||||
|
||||
def run_eval_multiprocess(cfg: EvalPipelineConfig) -> None:
    """Run eval with multiple local worker processes and policy servers (no Docker).

    Same architecture as the Docker runtime, but `lerobot-eval-worker` is spawned
    as local subprocesses. Works on SLURM clusters and anywhere Docker is
    unavailable.

    Steps:
      1. Build the policy and its pre/post processors.
      2. Start ``cfg.eval.policy_servers`` inference servers on consecutive
         ports beginning at ``cfg.eval.port``, each served from a daemon thread.
      3. Spawn ``cfg.eval.instance_count`` worker subprocesses, assigning each
         one to a server port round-robin.
      4. Wait for all workers, then merge every ``worker_<i>.json`` shard into
         ``<output_dir>/eval_info.json``.

    Args:
        cfg: Evaluation pipeline configuration. Reads ``output_dir``, ``policy``,
            ``env``, ``rename_map``, ``seed`` and the ``eval`` sub-config.

    Raises:
        ValueError: If ``cfg.eval.policy_servers`` is smaller than 1.
        RuntimeError: If any worker exits with a non-zero return code.
    """
    from lerobot.scripts.lerobot_eval import _aggregate_eval_from_per_task

    n_policy_servers = cfg.eval.policy_servers
    base_port = cfg.eval.port
    instance_count = cfg.eval.instance_count

    # Without this guard the round-robin `i % n_policy_servers` below would fail
    # with an opaque ZeroDivisionError; fail early, before loading any model.
    if n_policy_servers < 1:
        raise ValueError("eval.policy_servers must be >= 1 for the multiprocess runtime.")

    start_t = time.time()
    output_dir = Path(cfg.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    device = get_safe_torch_device(cfg.policy.device, log=True)
    # NOTE: these are process-wide torch backend switches, not scoped to this call.
    torch.backends.cudnn.benchmark = True
    torch.backends.cuda.matmul.allow_tf32 = True

    policy = make_policy(cfg=cfg.policy, env_cfg=cfg.env, rename_map=cfg.rename_map)
    policy.eval()

    preprocessor_overrides: dict = {
        "device_processor": {"device": str(device)},
        "rename_observations_processor": {"rename_map": cfg.rename_map},
    }
    preprocessor, postprocessor = make_pre_post_processors(
        policy_cfg=cfg.policy,
        pretrained_path=cfg.policy.pretrained_path,
        preprocessor_overrides=preprocessor_overrides,
    )
    env_preprocessor, _env_postprocessor = make_env_pre_post_processors(
        env_cfg=cfg.env,
        policy_cfg=cfg.policy,
    )

    env_argv = _env_argv()

    servers: list[_InferenceServer] = []
    worker_dirs: list[Path] = []
    procs: list[subprocess.Popen] = []
    try:
        # Servers are started inside the `try` so that a failure while bringing
        # up server k still shuts down servers 0..k-1 in the `finally` below.
        for s_idx in range(n_policy_servers):
            port = base_port + s_idx
            if s_idx > 0:
                # The first server reuses the policy built above; every further
                # server gets its own freshly built copy.
                policy = make_policy(cfg=cfg.policy, env_cfg=cfg.env, rename_map=cfg.rename_map)
                policy.eval()
            srv = _InferenceServer(
                ("0.0.0.0", port),  # nosec B104
                policy=policy,
                env_preprocessor=env_preprocessor,
                preprocessor=preprocessor,
                postprocessor=postprocessor,
            )
            # Register before starting the thread so cleanup always sees it.
            servers.append(srv)
            threading.Thread(target=srv.serve_forever, daemon=True).start()
            logger.info("Policy server %d/%d on port %d", s_idx + 1, n_policy_servers, port)

        for i in range(instance_count):
            # Round-robin assignment of workers to policy servers.
            assigned_port = base_port + (i % n_policy_servers)
            shard_dir = output_dir / "shards" / str(i)
            shard_dir.mkdir(parents=True, exist_ok=True)
            worker_dirs.append(shard_dir)

            cmd = [
                sys.executable,
                "-m",
                "lerobot.scripts.lerobot_eval_worker",
                *env_argv,
                f"--server_address=127.0.0.1:{assigned_port}",
                f"--n_episodes={cfg.eval.n_episodes}",
                f"--seed={cfg.seed}",
                f"--instance_id={i}",
                f"--instance_count={instance_count}",
                f"--output_path={shard_dir / f'worker_{i}.json'}",
            ]
            logger.info("Spawning worker %d/%d → port %d", i + 1, instance_count, assigned_port)
            procs.append(subprocess.Popen(cmd))  # nosec B603

        # Wait for every worker (even after a failure) so all exit codes are reported.
        failed: list[tuple[int, int]] = []
        for i, proc in enumerate(procs):
            rc = proc.wait()
            if rc != 0:
                failed.append((i, rc))
                logger.error("Worker %d/%d exited with code %d", i + 1, instance_count, rc)
        if failed:
            raise RuntimeError(f"Multiprocess eval workers failed (id, exit_code): {failed}")

    finally:
        # On the normal path every worker has already exited and this is a no-op.
        # After an interruption (Ctrl-C, spawn failure, ...) some workers may
        # still be running; stop them so they do not outlive this process.
        for proc in procs:
            if proc.poll() is None:
                proc.terminate()
        for proc in procs:
            try:
                proc.wait(timeout=10)
            except subprocess.TimeoutExpired:
                # Worker ignored SIGTERM; force it down and reap it.
                proc.kill()
                proc.wait()
        for srv in servers:
            srv.shutdown()

    # Merge the per-task results written by each worker shard.
    per_task: list[dict] = []
    for i, shard_dir in enumerate(worker_dirs):
        result_file = shard_dir / f"worker_{i}.json"
        with open(result_file, encoding="utf-8") as f:
            shard_data: dict = json.load(f)
        per_task.extend(shard_data.get("per_task", []))

    # Deterministic ordering regardless of which worker produced which task.
    per_task.sort(key=lambda x: (x["task_group"], x["task_id"]))

    info = _aggregate_eval_from_per_task(per_task, total_eval_s=time.time() - start_t)
    with open(output_dir / "eval_info.json", "w", encoding="utf-8") as f:
        json.dump(info, f, indent=2)

    logger.info("Multiprocess eval complete. Results: %s/eval_info.json", output_dir)
|
||||
|
||||
@@ -74,7 +74,7 @@ from tqdm import trange
|
||||
|
||||
from lerobot.configs import parser
|
||||
from lerobot.configs.eval import EvalPipelineConfig
|
||||
from lerobot.envs.docker_runtime import run_eval_in_docker
|
||||
|
||||
from lerobot.envs.factory import make_env, make_env_pre_post_processors
|
||||
from lerobot.envs.lazy_vec_env import LazyVectorEnv
|
||||
from lerobot.envs.utils import (
|
||||
@@ -674,8 +674,13 @@ def _maybe_push_eval_outputs(cfg: EvalPipelineConfig, info: dict) -> None:
|
||||
def _run_eval_worker(cfg: EvalPipelineConfig) -> dict:
|
||||
logging.info(pformat(asdict(cfg)))
|
||||
|
||||
if cfg.eval.runtime == "docker":
|
||||
run_eval_in_docker(cfg)
|
||||
if cfg.eval.runtime in ("docker", "multiprocess"):
|
||||
from lerobot.envs.docker_runtime import run_eval_in_docker, run_eval_multiprocess
|
||||
|
||||
if cfg.eval.runtime == "docker":
|
||||
run_eval_in_docker(cfg)
|
||||
else:
|
||||
run_eval_multiprocess(cfg)
|
||||
output_dir = Path(cfg.output_dir)
|
||||
with open(output_dir / "eval_info.json") as f:
|
||||
return json.load(f)
|
||||
|
||||
Reference in New Issue
Block a user