mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-20 11:09:59 +00:00
Feat/remote rerunviz encoded images (#2767)
* feat(visualization): allow remote viewer + compress rerun images * fix(tests): allow named argument in mocked rerun * feat(visualization): ip instead or url & cli arg for compressing images --------- Co-authored-by: J4nn1K <jannik@grothusen.de>
This commit is contained in:
@@ -96,6 +96,7 @@ def visualize_dataset(
|
|||||||
ws_port: int = 9087,
|
ws_port: int = 9087,
|
||||||
save: bool = False,
|
save: bool = False,
|
||||||
output_dir: Path | None = None,
|
output_dir: Path | None = None,
|
||||||
|
display_compressed_images: bool = False,
|
||||||
) -> Path | None:
|
) -> Path | None:
|
||||||
if save:
|
if save:
|
||||||
assert output_dir is not None, (
|
assert output_dir is not None, (
|
||||||
@@ -137,8 +138,9 @@ def visualize_dataset(
|
|||||||
|
|
||||||
# display each camera image
|
# display each camera image
|
||||||
for key in dataset.meta.camera_keys:
|
for key in dataset.meta.camera_keys:
|
||||||
# TODO(rcadene): add `.compress()`? is it lossless?
|
img = to_hwc_uint8_numpy(batch[key][i])
|
||||||
rr.log(key, rr.Image(to_hwc_uint8_numpy(batch[key][i])))
|
img_entity = rr.Image(img).compress() if display_compressed_images else rr.Image(img)
|
||||||
|
rr.log(key, entity=img_entity)
|
||||||
|
|
||||||
# display each dimension of action space (e.g. actuators command)
|
# display each dimension of action space (e.g. actuators command)
|
||||||
if ACTION in batch:
|
if ACTION in batch:
|
||||||
@@ -261,6 +263,14 @@ def main():
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
"--display-compressed-images",
|
||||||
|
type=bool,
|
||||||
|
required=True,
|
||||||
|
default=False,
|
||||||
|
help="If set, display compressed images in Rerun instead of uncompressed ones.",
|
||||||
|
)
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
kwargs = vars(args)
|
kwargs = vars(args)
|
||||||
repo_id = kwargs.pop("repo_id")
|
repo_id = kwargs.pop("repo_id")
|
||||||
|
|||||||
@@ -185,6 +185,12 @@ class RecordConfig:
|
|||||||
policy: PreTrainedConfig | None = None
|
policy: PreTrainedConfig | None = None
|
||||||
# Display all cameras on screen
|
# Display all cameras on screen
|
||||||
display_data: bool = False
|
display_data: bool = False
|
||||||
|
# Display data on a remote Rerun server
|
||||||
|
display_ip: str | None = None
|
||||||
|
# Port of the remote Rerun server
|
||||||
|
display_port: int | None = None
|
||||||
|
# Whether to display compressed images in Rerun
|
||||||
|
display_compressed_images: bool = False
|
||||||
# Use vocal synthesis to read events.
|
# Use vocal synthesis to read events.
|
||||||
play_sounds: bool = True
|
play_sounds: bool = True
|
||||||
# Resume recording on an existing dataset.
|
# Resume recording on an existing dataset.
|
||||||
@@ -261,6 +267,7 @@ def record_loop(
|
|||||||
control_time_s: int | None = None,
|
control_time_s: int | None = None,
|
||||||
single_task: str | None = None,
|
single_task: str | None = None,
|
||||||
display_data: bool = False,
|
display_data: bool = False,
|
||||||
|
display_compressed_images: bool = False,
|
||||||
):
|
):
|
||||||
if dataset is not None and dataset.fps != fps:
|
if dataset is not None and dataset.fps != fps:
|
||||||
raise ValueError(f"The dataset fps should be equal to requested fps ({dataset.fps} != {fps}).")
|
raise ValueError(f"The dataset fps should be equal to requested fps ({dataset.fps} != {fps}).")
|
||||||
@@ -371,7 +378,9 @@ def record_loop(
|
|||||||
dataset.add_frame(frame)
|
dataset.add_frame(frame)
|
||||||
|
|
||||||
if display_data:
|
if display_data:
|
||||||
log_rerun_data(observation=obs_processed, action=action_values)
|
log_rerun_data(
|
||||||
|
observation=obs_processed, action=action_values, compress_images=display_compressed_images
|
||||||
|
)
|
||||||
|
|
||||||
dt_s = time.perf_counter() - start_loop_t
|
dt_s = time.perf_counter() - start_loop_t
|
||||||
precise_sleep(max(1 / fps - dt_s, 0.0))
|
precise_sleep(max(1 / fps - dt_s, 0.0))
|
||||||
@@ -384,7 +393,12 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
|
|||||||
init_logging()
|
init_logging()
|
||||||
logging.info(pformat(asdict(cfg)))
|
logging.info(pformat(asdict(cfg)))
|
||||||
if cfg.display_data:
|
if cfg.display_data:
|
||||||
init_rerun(session_name="recording")
|
init_rerun(session_name="recording", ip=cfg.display_ip, port=cfg.display_port)
|
||||||
|
display_compressed_images = (
|
||||||
|
True
|
||||||
|
if (cfg.display_data and cfg.display_ip is not None and cfg.display_port is not None)
|
||||||
|
else cfg.display_compressed_images
|
||||||
|
)
|
||||||
|
|
||||||
robot = make_robot_from_config(cfg.robot)
|
robot = make_robot_from_config(cfg.robot)
|
||||||
teleop = make_teleoperator_from_config(cfg.teleop) if cfg.teleop is not None else None
|
teleop = make_teleoperator_from_config(cfg.teleop) if cfg.teleop is not None else None
|
||||||
@@ -478,6 +492,7 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
|
|||||||
control_time_s=cfg.dataset.episode_time_s,
|
control_time_s=cfg.dataset.episode_time_s,
|
||||||
single_task=cfg.dataset.single_task,
|
single_task=cfg.dataset.single_task,
|
||||||
display_data=cfg.display_data,
|
display_data=cfg.display_data,
|
||||||
|
display_compressed_images=display_compressed_images,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Execute a few seconds without recording to give time to manually reset the environment
|
# Execute a few seconds without recording to give time to manually reset the environment
|
||||||
|
|||||||
@@ -108,6 +108,12 @@ class TeleoperateConfig:
|
|||||||
teleop_time_s: float | None = None
|
teleop_time_s: float | None = None
|
||||||
# Display all cameras on screen
|
# Display all cameras on screen
|
||||||
display_data: bool = False
|
display_data: bool = False
|
||||||
|
# Display data on a remote Rerun server
|
||||||
|
display_ip: str | None = None
|
||||||
|
# Port of the remote Rerun server
|
||||||
|
display_port: int | None = None
|
||||||
|
# Whether to display compressed images in Rerun
|
||||||
|
display_compressed_images: bool = False
|
||||||
|
|
||||||
|
|
||||||
def teleop_loop(
|
def teleop_loop(
|
||||||
@@ -119,6 +125,7 @@ def teleop_loop(
|
|||||||
robot_observation_processor: RobotProcessorPipeline[RobotObservation, RobotObservation],
|
robot_observation_processor: RobotProcessorPipeline[RobotObservation, RobotObservation],
|
||||||
display_data: bool = False,
|
display_data: bool = False,
|
||||||
duration: float | None = None,
|
duration: float | None = None,
|
||||||
|
display_compressed_images: bool = False,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
This function continuously reads actions from a teleoperation device, processes them through optional
|
This function continuously reads actions from a teleoperation device, processes them through optional
|
||||||
@@ -130,6 +137,7 @@ def teleop_loop(
|
|||||||
robot: The robot instance being controlled.
|
robot: The robot instance being controlled.
|
||||||
fps: The target frequency for the control loop in frames per second.
|
fps: The target frequency for the control loop in frames per second.
|
||||||
display_data: If True, fetches robot observations and displays them in the console and Rerun.
|
display_data: If True, fetches robot observations and displays them in the console and Rerun.
|
||||||
|
display_compressed_images: If True, compresses images before sending them to Rerun for display.
|
||||||
duration: The maximum duration of the teleoperation loop in seconds. If None, the loop runs indefinitely.
|
duration: The maximum duration of the teleoperation loop in seconds. If None, the loop runs indefinitely.
|
||||||
teleop_action_processor: An optional pipeline to process raw actions from the teleoperator.
|
teleop_action_processor: An optional pipeline to process raw actions from the teleoperator.
|
||||||
robot_action_processor: An optional pipeline to process actions before they are sent to the robot.
|
robot_action_processor: An optional pipeline to process actions before they are sent to the robot.
|
||||||
@@ -167,6 +175,7 @@ def teleop_loop(
|
|||||||
log_rerun_data(
|
log_rerun_data(
|
||||||
observation=obs_transition,
|
observation=obs_transition,
|
||||||
action=teleop_action,
|
action=teleop_action,
|
||||||
|
compress_images=display_compressed_images,
|
||||||
)
|
)
|
||||||
|
|
||||||
print("\n" + "-" * (display_len + 10))
|
print("\n" + "-" * (display_len + 10))
|
||||||
@@ -191,7 +200,12 @@ def teleoperate(cfg: TeleoperateConfig):
|
|||||||
init_logging()
|
init_logging()
|
||||||
logging.info(pformat(asdict(cfg)))
|
logging.info(pformat(asdict(cfg)))
|
||||||
if cfg.display_data:
|
if cfg.display_data:
|
||||||
init_rerun(session_name="teleoperation")
|
init_rerun(session_name="teleoperation", ip=cfg.display_ip, port=cfg.display_port)
|
||||||
|
display_compressed_images = (
|
||||||
|
True
|
||||||
|
if (cfg.display_data and cfg.display_ip is not None and cfg.display_port is not None)
|
||||||
|
else cfg.display_compressed_images
|
||||||
|
)
|
||||||
|
|
||||||
teleop = make_teleoperator_from_config(cfg.teleop)
|
teleop = make_teleoperator_from_config(cfg.teleop)
|
||||||
robot = make_robot_from_config(cfg.robot)
|
robot = make_robot_from_config(cfg.robot)
|
||||||
@@ -210,6 +224,7 @@ def teleoperate(cfg: TeleoperateConfig):
|
|||||||
teleop_action_processor=teleop_action_processor,
|
teleop_action_processor=teleop_action_processor,
|
||||||
robot_action_processor=robot_action_processor,
|
robot_action_processor=robot_action_processor,
|
||||||
robot_observation_processor=robot_observation_processor,
|
robot_observation_processor=robot_observation_processor,
|
||||||
|
display_compressed_images=display_compressed_images,
|
||||||
)
|
)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
pass
|
pass
|
||||||
|
|||||||
@@ -22,13 +22,25 @@ import rerun as rr
|
|||||||
from .constants import OBS_PREFIX, OBS_STR
|
from .constants import OBS_PREFIX, OBS_STR
|
||||||
|
|
||||||
|
|
||||||
def init_rerun(session_name: str = "lerobot_control_loop") -> None:
|
def init_rerun(
|
||||||
"""Initializes the Rerun SDK for visualizing the control loop."""
|
session_name: str = "lerobot_control_loop", ip: str | None = None, port: int | None = None
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Initializes the Rerun SDK for visualizing the control loop.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
session_name: Name of the Rerun session.
|
||||||
|
ip: Optional IP for connecting to a Rerun server.
|
||||||
|
port: Optional port for connecting to a Rerun server.
|
||||||
|
"""
|
||||||
batch_size = os.getenv("RERUN_FLUSH_NUM_BYTES", "8000")
|
batch_size = os.getenv("RERUN_FLUSH_NUM_BYTES", "8000")
|
||||||
os.environ["RERUN_FLUSH_NUM_BYTES"] = batch_size
|
os.environ["RERUN_FLUSH_NUM_BYTES"] = batch_size
|
||||||
rr.init(session_name)
|
rr.init(session_name)
|
||||||
memory_limit = os.getenv("LEROBOT_RERUN_MEMORY_LIMIT", "10%")
|
memory_limit = os.getenv("LEROBOT_RERUN_MEMORY_LIMIT", "10%")
|
||||||
rr.spawn(memory_limit=memory_limit)
|
if ip and port:
|
||||||
|
rr.connect_grpc(url=f"rerun+http://{ip}:{port}/proxy")
|
||||||
|
else:
|
||||||
|
rr.spawn(memory_limit=memory_limit)
|
||||||
|
|
||||||
|
|
||||||
def _is_scalar(x):
|
def _is_scalar(x):
|
||||||
@@ -40,6 +52,7 @@ def _is_scalar(x):
|
|||||||
def log_rerun_data(
|
def log_rerun_data(
|
||||||
observation: dict[str, Any] | None = None,
|
observation: dict[str, Any] | None = None,
|
||||||
action: dict[str, Any] | None = None,
|
action: dict[str, Any] | None = None,
|
||||||
|
compress_images: bool = False,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Logs observation and action data to Rerun for real-time visualization.
|
Logs observation and action data to Rerun for real-time visualization.
|
||||||
@@ -48,7 +61,7 @@ def log_rerun_data(
|
|||||||
to the Rerun viewer. It handles different data types appropriately:
|
to the Rerun viewer. It handles different data types appropriately:
|
||||||
- Scalars values (floats, ints) are logged as `rr.Scalars`.
|
- Scalars values (floats, ints) are logged as `rr.Scalars`.
|
||||||
- 3D NumPy arrays that resemble images (e.g., with 1, 3, or 4 channels first) are transposed
|
- 3D NumPy arrays that resemble images (e.g., with 1, 3, or 4 channels first) are transposed
|
||||||
from CHW to HWC format and logged as `rr.Image`.
|
from CHW to HWC format, (optionally) compressed to JPEG and logged as `rr.Image` or `rr.EncodedImage`.
|
||||||
- 1D NumPy arrays are logged as a series of individual scalars, with each element indexed.
|
- 1D NumPy arrays are logged as a series of individual scalars, with each element indexed.
|
||||||
- Other multi-dimensional arrays are flattened and logged as individual scalars.
|
- Other multi-dimensional arrays are flattened and logged as individual scalars.
|
||||||
|
|
||||||
@@ -57,6 +70,7 @@ def log_rerun_data(
|
|||||||
Args:
|
Args:
|
||||||
observation: An optional dictionary containing observation data to log.
|
observation: An optional dictionary containing observation data to log.
|
||||||
action: An optional dictionary containing action data to log.
|
action: An optional dictionary containing action data to log.
|
||||||
|
compress_images: Whether to compress images before logging to save bandwidth & memory in exchange for cpu and quality.
|
||||||
"""
|
"""
|
||||||
if observation:
|
if observation:
|
||||||
for k, v in observation.items():
|
for k, v in observation.items():
|
||||||
@@ -75,7 +89,8 @@ def log_rerun_data(
|
|||||||
for i, vi in enumerate(arr):
|
for i, vi in enumerate(arr):
|
||||||
rr.log(f"{key}_{i}", rr.Scalars(float(vi)))
|
rr.log(f"{key}_{i}", rr.Scalars(float(vi)))
|
||||||
else:
|
else:
|
||||||
rr.log(key, rr.Image(arr), static=True)
|
img_entity = rr.Image(arr).compress() if compress_images else rr.Image(arr)
|
||||||
|
rr.log(key, entity=img_entity, static=True)
|
||||||
|
|
||||||
if action:
|
if action:
|
||||||
for k, v in action.items():
|
for k, v in action.items():
|
||||||
|
|||||||
@@ -41,7 +41,10 @@ def mock_rerun(monkeypatch):
|
|||||||
def __init__(self, arr):
|
def __init__(self, arr):
|
||||||
self.arr = arr
|
self.arr = arr
|
||||||
|
|
||||||
def dummy_log(key, obj, **kwargs):
|
def dummy_log(key, obj=None, **kwargs):
|
||||||
|
# Accept either positional `obj` or keyword `entity` and record remaining kwargs.
|
||||||
|
if obj is None and "entity" in kwargs:
|
||||||
|
obj = kwargs.pop("entity")
|
||||||
calls.append((key, obj, kwargs))
|
calls.append((key, obj, kwargs))
|
||||||
|
|
||||||
dummy_rr = SimpleNamespace(
|
dummy_rr = SimpleNamespace(
|
||||||
|
|||||||
Reference in New Issue
Block a user