mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-16 17:20:05 +00:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 46e9e22b05 | |||
| b43f9ab048 |
@@ -67,7 +67,8 @@ class EvalConfig:
|
|||||||
# `batch_size` specifies the number of environments to use in a gym.vector.VectorEnv.
|
# `batch_size` specifies the number of environments to use in a gym.vector.VectorEnv.
|
||||||
batch_size: int = 50
|
batch_size: int = 50
|
||||||
# `use_async_envs` specifies whether to use asynchronous environments (multiprocessing).
|
# `use_async_envs` specifies whether to use asynchronous environments (multiprocessing).
|
||||||
use_async_envs: bool = False
|
# Defaults to True; automatically downgraded to SyncVectorEnv when batch_size=1.
|
||||||
|
use_async_envs: bool = True
|
||||||
|
|
||||||
def __post_init__(self) -> None:
|
def __post_init__(self) -> None:
|
||||||
if self.batch_size > self.n_episodes:
|
if self.batch_size > self.n_episodes:
|
||||||
|
|||||||
@@ -75,13 +75,14 @@ class EnvConfig(draccus.ChoiceRegistry, abc.ABC):
|
|||||||
def create_envs(
|
def create_envs(
|
||||||
self,
|
self,
|
||||||
n_envs: int,
|
n_envs: int,
|
||||||
use_async_envs: bool = False,
|
use_async_envs: bool = True,
|
||||||
) -> dict[str, dict[int, gym.vector.VectorEnv]]:
|
) -> dict[str, dict[int, gym.vector.VectorEnv]]:
|
||||||
"""Create {suite: {task_id: VectorEnv}}.
|
"""Create {suite: {task_id: VectorEnv}}.
|
||||||
|
|
||||||
Default: single-task env via gym.make(). Multi-task benchmarks override.
|
Default: single-task env via gym.make(). Multi-task benchmarks override.
|
||||||
|
AsyncVectorEnv is the default for n_envs > 1; auto-downgraded to Sync for n_envs=1.
|
||||||
"""
|
"""
|
||||||
env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv
|
env_cls = gym.vector.AsyncVectorEnv if (use_async_envs and n_envs > 1) else gym.vector.SyncVectorEnv
|
||||||
|
|
||||||
if self.gym_id not in gym_registry:
|
if self.gym_id not in gym_registry:
|
||||||
print(f"gym id '{self.gym_id}' not found, attempting to import '{self.package_name}'...")
|
print(f"gym id '{self.gym_id}' not found, attempting to import '{self.package_name}'...")
|
||||||
@@ -388,12 +389,12 @@ class LiberoEnv(EnvConfig):
|
|||||||
kwargs["task_ids"] = self.task_ids
|
kwargs["task_ids"] = self.task_ids
|
||||||
return kwargs
|
return kwargs
|
||||||
|
|
||||||
def create_envs(self, n_envs: int, use_async_envs: bool = False):
|
def create_envs(self, n_envs: int, use_async_envs: bool = True):
|
||||||
from lerobot.envs.libero import create_libero_envs
|
from lerobot.envs.libero import create_libero_envs
|
||||||
|
|
||||||
if self.task is None:
|
if self.task is None:
|
||||||
raise ValueError("LiberoEnv requires a task to be specified")
|
raise ValueError("LiberoEnv requires a task to be specified")
|
||||||
env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv
|
env_cls = gym.vector.AsyncVectorEnv if (use_async_envs and n_envs > 1) else gym.vector.SyncVectorEnv
|
||||||
return create_libero_envs(
|
return create_libero_envs(
|
||||||
task=self.task,
|
task=self.task,
|
||||||
n_envs=n_envs,
|
n_envs=n_envs,
|
||||||
@@ -456,12 +457,12 @@ class MetaworldEnv(EnvConfig):
|
|||||||
"render_mode": self.render_mode,
|
"render_mode": self.render_mode,
|
||||||
}
|
}
|
||||||
|
|
||||||
def create_envs(self, n_envs: int, use_async_envs: bool = False):
|
def create_envs(self, n_envs: int, use_async_envs: bool = True):
|
||||||
from lerobot.envs.metaworld import create_metaworld_envs
|
from lerobot.envs.metaworld import create_metaworld_envs
|
||||||
|
|
||||||
if self.task is None:
|
if self.task is None:
|
||||||
raise ValueError("MetaWorld requires a task to be specified")
|
raise ValueError("MetaWorld requires a task to be specified")
|
||||||
env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv
|
env_cls = gym.vector.AsyncVectorEnv if (use_async_envs and n_envs > 1) else gym.vector.SyncVectorEnv
|
||||||
return create_metaworld_envs(
|
return create_metaworld_envs(
|
||||||
task=self.task,
|
task=self.task,
|
||||||
n_envs=n_envs,
|
n_envs=n_envs,
|
||||||
|
|||||||
@@ -58,7 +58,7 @@ def make_env_pre_post_processors(
|
|||||||
def make_env(
|
def make_env(
|
||||||
cfg: EnvConfig | str,
|
cfg: EnvConfig | str,
|
||||||
n_envs: int = 1,
|
n_envs: int = 1,
|
||||||
use_async_envs: bool = False,
|
use_async_envs: bool = True,
|
||||||
hub_cache_dir: str | None = None,
|
hub_cache_dir: str | None = None,
|
||||||
trust_remote_code: bool = False,
|
trust_remote_code: bool = False,
|
||||||
) -> dict[str, dict[int, gym.vector.VectorEnv]]:
|
) -> dict[str, dict[int, gym.vector.VectorEnv]]:
|
||||||
|
|||||||
+35
-17
@@ -150,7 +150,17 @@ class LiberoEnv(gym.Env):
|
|||||||
|
|
||||||
self.init_state_id = self.episode_index # tie each sub-env to a fixed init state
|
self.init_state_id = self.episode_index # tie each sub-env to a fixed init state
|
||||||
|
|
||||||
self._env = self._make_envs_task(task_suite, self.task_id)
|
# Extract task metadata without allocating GPU resources (safe before fork).
|
||||||
|
task = task_suite.get_task(task_id)
|
||||||
|
self.task = task.name
|
||||||
|
self.task_description = task.language
|
||||||
|
self._task_bddl_file = os.path.join(
|
||||||
|
get_libero_path("bddl_files"), task.problem_folder, task.bddl_file
|
||||||
|
)
|
||||||
|
self._env: OffScreenRenderEnv | None = (
|
||||||
|
None # deferred — created on first reset() inside the worker subprocess
|
||||||
|
)
|
||||||
|
|
||||||
default_steps = 500
|
default_steps = 500
|
||||||
self._max_episode_steps = (
|
self._max_episode_steps = (
|
||||||
TASK_SUITE_MAX_STEPS.get(task_suite_name, default_steps)
|
TASK_SUITE_MAX_STEPS.get(task_suite_name, default_steps)
|
||||||
@@ -221,28 +231,32 @@ class LiberoEnv(gym.Env):
|
|||||||
low=ACTION_LOW, high=ACTION_HIGH, shape=(ACTION_DIM,), dtype=np.float32
|
low=ACTION_LOW, high=ACTION_HIGH, shape=(ACTION_DIM,), dtype=np.float32
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _ensure_env(self) -> None:
|
||||||
|
"""Create the underlying OffScreenRenderEnv on first use.
|
||||||
|
|
||||||
|
Called inside the worker subprocess after fork(), so each worker gets
|
||||||
|
its own clean EGL context rather than inheriting a stale one from the
|
||||||
|
parent process (which causes EGL_BAD_CONTEXT crashes with AsyncVectorEnv).
|
||||||
|
"""
|
||||||
|
if self._env is not None:
|
||||||
|
return
|
||||||
|
env = OffScreenRenderEnv(
|
||||||
|
bddl_file_name=self._task_bddl_file,
|
||||||
|
camera_heights=self.observation_height,
|
||||||
|
camera_widths=self.observation_width,
|
||||||
|
)
|
||||||
|
env.reset()
|
||||||
|
self._env = env
|
||||||
|
|
||||||
def render(self):
|
def render(self):
|
||||||
|
self._ensure_env()
|
||||||
raw_obs = self._env.env._get_observations()
|
raw_obs = self._env.env._get_observations()
|
||||||
image = self._format_raw_obs(raw_obs)["pixels"]["image"]
|
image = self._format_raw_obs(raw_obs)["pixels"]["image"]
|
||||||
image = image[::-1, ::-1] # flip both H and W for visualization
|
image = image[::-1, ::-1] # flip both H and W for visualization
|
||||||
return image
|
return image
|
||||||
|
|
||||||
def _make_envs_task(self, task_suite: Any, task_id: int = 0):
|
|
||||||
task = task_suite.get_task(task_id)
|
|
||||||
self.task = task.name
|
|
||||||
self.task_description = task.language
|
|
||||||
task_bddl_file = os.path.join(get_libero_path("bddl_files"), task.problem_folder, task.bddl_file)
|
|
||||||
|
|
||||||
env_args = {
|
|
||||||
"bddl_file_name": task_bddl_file,
|
|
||||||
"camera_heights": self.observation_height,
|
|
||||||
"camera_widths": self.observation_width,
|
|
||||||
}
|
|
||||||
env = OffScreenRenderEnv(**env_args)
|
|
||||||
env.reset()
|
|
||||||
return env
|
|
||||||
|
|
||||||
def _format_raw_obs(self, raw_obs: RobotObservation) -> RobotObservation:
|
def _format_raw_obs(self, raw_obs: RobotObservation) -> RobotObservation:
|
||||||
|
assert self._env is not None, "_format_raw_obs called before _ensure_env()"
|
||||||
images = {}
|
images = {}
|
||||||
for camera_name in self.camera_name:
|
for camera_name in self.camera_name:
|
||||||
image = raw_obs[camera_name]
|
image = raw_obs[camera_name]
|
||||||
@@ -294,6 +308,7 @@ class LiberoEnv(gym.Env):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def reset(self, seed=None, **kwargs):
|
def reset(self, seed=None, **kwargs):
|
||||||
|
self._ensure_env()
|
||||||
super().reset(seed=seed)
|
super().reset(seed=seed)
|
||||||
self._env.seed(seed)
|
self._env.seed(seed)
|
||||||
raw_obs = self._env.reset()
|
raw_obs = self._env.reset()
|
||||||
@@ -320,6 +335,8 @@ class LiberoEnv(gym.Env):
|
|||||||
return observation, info
|
return observation, info
|
||||||
|
|
||||||
def step(self, action: np.ndarray) -> tuple[RobotObservation, float, bool, bool, dict[str, Any]]:
|
def step(self, action: np.ndarray) -> tuple[RobotObservation, float, bool, bool, dict[str, Any]]:
|
||||||
|
self._ensure_env()
|
||||||
|
assert self._env is not None
|
||||||
if action.ndim != 1:
|
if action.ndim != 1:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"Expected action to be 1-D (shape (action_dim,)), "
|
f"Expected action to be 1-D (shape (action_dim,)), "
|
||||||
@@ -350,7 +367,8 @@ class LiberoEnv(gym.Env):
|
|||||||
return observation, reward, terminated, truncated, info
|
return observation, reward, terminated, truncated, info
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self._env.close()
|
if self._env is not None:
|
||||||
|
self._env.close()
|
||||||
|
|
||||||
|
|
||||||
def _make_env_fns(
|
def _make_env_fns(
|
||||||
|
|||||||
@@ -97,8 +97,9 @@ class MetaworldEnv(gym.Env):
|
|||||||
self.visualization_height = visualization_height
|
self.visualization_height = visualization_height
|
||||||
self.camera_name = camera_name
|
self.camera_name = camera_name
|
||||||
|
|
||||||
self._env = self._make_envs_task(self.task)
|
self._env_name = self.task # already stripped of "metaworld-" prefix above
|
||||||
self._max_episode_steps = self._env.max_path_length
|
self._env = None # deferred — created on first reset() inside the worker subprocess
|
||||||
|
self._max_episode_steps = 500 # MT1 environments always have max_path_length=500
|
||||||
self.task_description = TASK_DESCRIPTIONS[self.task]
|
self.task_description = TASK_DESCRIPTIONS[self.task]
|
||||||
|
|
||||||
self.expert_policy = TASK_POLICY_MAPPING[self.task]()
|
self.expert_policy = TASK_POLICY_MAPPING[self.task]()
|
||||||
@@ -136,6 +137,24 @@ class MetaworldEnv(gym.Env):
|
|||||||
|
|
||||||
self.action_space = spaces.Box(low=-1, high=1, shape=(ACTION_DIM,), dtype=np.float32)
|
self.action_space = spaces.Box(low=-1, high=1, shape=(ACTION_DIM,), dtype=np.float32)
|
||||||
|
|
||||||
|
def _ensure_env(self) -> None:
|
||||||
|
"""Create the underlying MetaWorld env on first use.
|
||||||
|
|
||||||
|
Called inside the worker subprocess after fork(), so each worker gets
|
||||||
|
its own clean rendering context rather than inheriting a stale one from
|
||||||
|
the parent process (which causes crashes with AsyncVectorEnv).
|
||||||
|
"""
|
||||||
|
if self._env is not None:
|
||||||
|
return
|
||||||
|
mt1 = metaworld.MT1(self._env_name, seed=42)
|
||||||
|
env = mt1.train_classes[self._env_name](render_mode="rgb_array", camera_name=self.camera_name)
|
||||||
|
env.set_task(mt1.train_tasks[0])
|
||||||
|
if self.camera_name == "corner2":
|
||||||
|
env.model.cam_pos[2] = [0.75, 0.075, 0.7]
|
||||||
|
env.reset()
|
||||||
|
env._freeze_rand_vec = False # otherwise no randomization
|
||||||
|
self._env = env
|
||||||
|
|
||||||
def render(self) -> np.ndarray:
|
def render(self) -> np.ndarray:
|
||||||
"""
|
"""
|
||||||
Render the current environment frame.
|
Render the current environment frame.
|
||||||
@@ -143,26 +162,13 @@ class MetaworldEnv(gym.Env):
|
|||||||
Returns:
|
Returns:
|
||||||
np.ndarray: The rendered RGB image from the environment.
|
np.ndarray: The rendered RGB image from the environment.
|
||||||
"""
|
"""
|
||||||
|
self._ensure_env()
|
||||||
image = self._env.render()
|
image = self._env.render()
|
||||||
if self.camera_name == "corner2":
|
if self.camera_name == "corner2":
|
||||||
# Images from this camera are flipped — correct them
|
# Images from this camera are flipped — correct them
|
||||||
image = np.flip(image, (0, 1))
|
image = np.flip(image, (0, 1))
|
||||||
return image
|
return image
|
||||||
|
|
||||||
def _make_envs_task(self, env_name: str):
|
|
||||||
mt1 = metaworld.MT1(env_name, seed=42)
|
|
||||||
env = mt1.train_classes[env_name](render_mode="rgb_array", camera_name=self.camera_name)
|
|
||||||
env.set_task(mt1.train_tasks[0])
|
|
||||||
if self.camera_name == "corner2":
|
|
||||||
env.model.cam_pos[2] = [
|
|
||||||
0.75,
|
|
||||||
0.075,
|
|
||||||
0.7,
|
|
||||||
] # corner2 position, similar to https://arxiv.org/pdf/2206.14244
|
|
||||||
env.reset()
|
|
||||||
env._freeze_rand_vec = False # otherwise no randomization
|
|
||||||
return env
|
|
||||||
|
|
||||||
def _format_raw_obs(self, raw_obs: np.ndarray) -> RobotObservation:
|
def _format_raw_obs(self, raw_obs: np.ndarray) -> RobotObservation:
|
||||||
image = None
|
image = None
|
||||||
if self._env is not None:
|
if self._env is not None:
|
||||||
@@ -209,6 +215,7 @@ class MetaworldEnv(gym.Env):
|
|||||||
observation (RobotObservation): The initial formatted observation.
|
observation (RobotObservation): The initial formatted observation.
|
||||||
info (Dict[str, Any]): Additional info about the reset state.
|
info (Dict[str, Any]): Additional info about the reset state.
|
||||||
"""
|
"""
|
||||||
|
self._ensure_env()
|
||||||
super().reset(seed=seed)
|
super().reset(seed=seed)
|
||||||
|
|
||||||
raw_obs, info = self._env.reset(seed=seed)
|
raw_obs, info = self._env.reset(seed=seed)
|
||||||
@@ -232,6 +239,7 @@ class MetaworldEnv(gym.Env):
|
|||||||
truncated (bool): Whether the episode was truncated due to a time limit.
|
truncated (bool): Whether the episode was truncated due to a time limit.
|
||||||
info (Dict[str, Any]): Additional environment info.
|
info (Dict[str, Any]): Additional environment info.
|
||||||
"""
|
"""
|
||||||
|
self._ensure_env()
|
||||||
if action.ndim != 1:
|
if action.ndim != 1:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"Expected action to be 1-D (shape (action_dim,)), "
|
f"Expected action to be 1-D (shape (action_dim,)), "
|
||||||
@@ -263,7 +271,8 @@ class MetaworldEnv(gym.Env):
|
|||||||
return observation, reward, terminated, truncated, info
|
return observation, reward, terminated, truncated, info
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self._env.close()
|
if self._env is not None:
|
||||||
|
self._env.close()
|
||||||
|
|
||||||
|
|
||||||
# ---- Main API ----------------------------------------------------------------
|
# ---- Main API ----------------------------------------------------------------
|
||||||
|
|||||||
@@ -47,6 +47,7 @@ You can learn about the CLI options for this script in the `EvalPipelineConfig`
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import concurrent.futures as cf
|
import concurrent.futures as cf
|
||||||
|
import copy
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import threading
|
import threading
|
||||||
@@ -56,7 +57,6 @@ from collections.abc import Callable
|
|||||||
from contextlib import nullcontext
|
from contextlib import nullcontext
|
||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
from dataclasses import asdict
|
from dataclasses import asdict
|
||||||
from functools import partial
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
from typing import Any, TypedDict
|
from typing import Any, TypedDict
|
||||||
@@ -73,7 +73,6 @@ from lerobot.configs import parser
|
|||||||
from lerobot.configs.eval import EvalPipelineConfig
|
from lerobot.configs.eval import EvalPipelineConfig
|
||||||
from lerobot.envs.factory import make_env, make_env_pre_post_processors
|
from lerobot.envs.factory import make_env, make_env_pre_post_processors
|
||||||
from lerobot.envs.utils import (
|
from lerobot.envs.utils import (
|
||||||
add_envs_task,
|
|
||||||
check_env_attributes_and_types,
|
check_env_attributes_and_types,
|
||||||
close_envs,
|
close_envs,
|
||||||
preprocess_observation,
|
preprocess_observation,
|
||||||
@@ -166,9 +165,9 @@ def rollout(
|
|||||||
if return_observations:
|
if return_observations:
|
||||||
all_observations.append(deepcopy(observation))
|
all_observations.append(deepcopy(observation))
|
||||||
|
|
||||||
# Infer "task" from attributes of environments.
|
# Infer "task" from sub-environments.
|
||||||
# TODO: works with SyncVectorEnv but not AsyncVectorEnv
|
# env.call() works with both SyncVectorEnv and AsyncVectorEnv.
|
||||||
observation = add_envs_task(env, observation)
|
observation["task"] = env.call("task")
|
||||||
|
|
||||||
# Apply environment-specific preprocessing (e.g., LiberoProcessorStep for LIBERO)
|
# Apply environment-specific preprocessing (e.g., LiberoProcessorStep for LIBERO)
|
||||||
observation = env_preprocessor(observation)
|
observation = env_preprocessor(observation)
|
||||||
@@ -734,34 +733,48 @@ def eval_policy_all(
|
|||||||
group_acc[group]["video_paths"].extend(paths)
|
group_acc[group]["video_paths"].extend(paths)
|
||||||
overall["video_paths"].extend(paths)
|
overall["video_paths"].extend(paths)
|
||||||
|
|
||||||
|
def _make_thread_policy(p: PreTrainedPolicy) -> PreTrainedPolicy:
|
||||||
|
"""Shallow copy sharing weight tensors, with independent per-thread state.
|
||||||
|
|
||||||
|
copy.copy() gives a new Python object whose _parameters dict is a shared
|
||||||
|
reference (same tensor storage, zero extra VRAM). reset() then rebinds
|
||||||
|
mutable state (action queues etc.) to fresh per-thread objects.
|
||||||
|
|
||||||
|
Note: does NOT work for ACT with temporal_ensemble_coeff — that policy's
|
||||||
|
reset() mutates a shared sub-object. Use max_parallel_tasks=1 for that config.
|
||||||
|
"""
|
||||||
|
thread_p = copy.copy(p)
|
||||||
|
thread_p.reset()
|
||||||
|
return thread_p
|
||||||
|
|
||||||
# Choose runner (sequential vs threaded)
|
# Choose runner (sequential vs threaded)
|
||||||
task_runner = partial(
|
_runner_kwargs = {
|
||||||
run_one,
|
"env_preprocessor": env_preprocessor,
|
||||||
policy=policy,
|
"env_postprocessor": env_postprocessor,
|
||||||
env_preprocessor=env_preprocessor,
|
"preprocessor": preprocessor,
|
||||||
env_postprocessor=env_postprocessor,
|
"postprocessor": postprocessor,
|
||||||
preprocessor=preprocessor,
|
"n_episodes": n_episodes,
|
||||||
postprocessor=postprocessor,
|
"max_episodes_rendered": max_episodes_rendered,
|
||||||
n_episodes=n_episodes,
|
"videos_dir": videos_dir,
|
||||||
max_episodes_rendered=max_episodes_rendered,
|
"return_episode_data": return_episode_data,
|
||||||
videos_dir=videos_dir,
|
"start_seed": start_seed,
|
||||||
return_episode_data=return_episode_data,
|
}
|
||||||
start_seed=start_seed,
|
|
||||||
)
|
|
||||||
|
|
||||||
if max_parallel_tasks <= 1:
|
if max_parallel_tasks <= 1:
|
||||||
# sequential path (single accumulator path on the main thread)
|
# sequential path (single accumulator path on the main thread)
|
||||||
# NOTE: keeping a single-threaded accumulator avoids concurrent list appends or locks
|
# NOTE: keeping a single-threaded accumulator avoids concurrent list appends or locks
|
||||||
for task_group, task_id, env in tasks:
|
for task_group, task_id, env in tasks:
|
||||||
tg, tid, metrics = task_runner(task_group, task_id, env)
|
tg, tid, metrics = run_one(task_group, task_id, env, policy=policy, **_runner_kwargs)
|
||||||
_accumulate_to(tg, metrics)
|
_accumulate_to(tg, metrics)
|
||||||
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
|
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
|
||||||
else:
|
else:
|
||||||
# threaded path: submit all tasks, consume completions on main thread and accumulate there
|
# threaded path: each thread gets a shallow policy copy (shared weights, independent state)
|
||||||
with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor:
|
with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor:
|
||||||
fut2meta = {}
|
fut2meta = {}
|
||||||
for task_group, task_id, env in tasks:
|
for task_group, task_id, env in tasks:
|
||||||
fut = executor.submit(task_runner, task_group, task_id, env)
|
fut = executor.submit(
|
||||||
|
run_one, task_group, task_id, env, policy=_make_thread_policy(policy), **_runner_kwargs
|
||||||
|
)
|
||||||
fut2meta[fut] = (task_group, task_id)
|
fut2meta[fut] = (task_group, task_id)
|
||||||
for fut in cf.as_completed(fut2meta):
|
for fut in cf.as_completed(fut2meta):
|
||||||
tg, tid, metrics = fut.result()
|
tg, tid, metrics = fut.result()
|
||||||
|
|||||||
Reference in New Issue
Block a user