mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-17 09:39:47 +00:00
Add tests for modeling_rtc
This commit is contained in:
@@ -390,11 +390,11 @@ def test_denoise_step_without_prev_chunk(rtc_processor_debug_disabled):
|
||||
|
||||
def test_denoise_step_with_prev_chunk(rtc_processor_debug_disabled):
|
||||
"""Test denoise_step with previous chunk applies guidance."""
|
||||
x_t = torch.randn(1, 50, 6)
|
||||
prev_chunk = torch.randn(1, 50, 6)
|
||||
x_t = torch.ones(1, 20, 1)
|
||||
prev_chunk = torch.full((1, 20, 1), 0.1)
|
||||
|
||||
def mock_denoiser(x):
|
||||
return torch.ones_like(x) * 0.5
|
||||
return x * 0.5
|
||||
|
||||
result = rtc_processor_debug_disabled.denoise_step(
|
||||
x_t=x_t,
|
||||
@@ -404,12 +404,34 @@ def test_denoise_step_with_prev_chunk(rtc_processor_debug_disabled):
|
||||
original_denoise_step_partial=mock_denoiser,
|
||||
)
|
||||
|
||||
# Result should be different from base v_t (guidance applied)
|
||||
base_v_t = mock_denoiser(x_t)
|
||||
assert not torch.allclose(result, base_v_t)
|
||||
expected_result = torch.tensor(
|
||||
[
|
||||
[
|
||||
[1.8000],
|
||||
[1.8000],
|
||||
[1.8000],
|
||||
[1.8000],
|
||||
[1.8000],
|
||||
[1.5833],
|
||||
[1.3667],
|
||||
[1.1500],
|
||||
[0.9333],
|
||||
[0.7167],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
]
|
||||
]
|
||||
)
|
||||
|
||||
# Result should have same shape
|
||||
assert result.shape == x_t.shape
|
||||
assert torch.allclose(result, expected_result, atol=1e-4)
|
||||
|
||||
|
||||
def test_denoise_step_adds_batch_dimension():
|
||||
@@ -418,11 +440,11 @@ def test_denoise_step_adds_batch_dimension():
|
||||
processor = RTCProcessor(config)
|
||||
|
||||
# 2D input (no batch dimension)
|
||||
x_t = torch.randn(50, 6)
|
||||
prev_chunk = torch.randn(50, 6)
|
||||
x_t = torch.randn(10, 6)
|
||||
prev_chunk = torch.randn(5, 6)
|
||||
|
||||
def mock_denoiser(x):
|
||||
return torch.ones_like(x) * 0.5
|
||||
return x * 0.5
|
||||
|
||||
result = processor.denoise_step(
|
||||
x_t=x_t,
|
||||
@@ -434,53 +456,7 @@ def test_denoise_step_adds_batch_dimension():
|
||||
|
||||
# Output should be 2D (batch dimension removed)
|
||||
assert result.ndim == 2
|
||||
assert result.shape == (50, 6)
|
||||
|
||||
|
||||
def test_denoise_step_pads_shorter_prev_chunk():
|
||||
"""Test denoise_step pads previous chunk if shorter than x_t."""
|
||||
config = RTCConfig(execution_horizon=10, max_guidance_weight=5.0)
|
||||
processor = RTCProcessor(config)
|
||||
|
||||
x_t = torch.randn(1, 50, 6)
|
||||
prev_chunk = torch.randn(1, 30, 6) # Shorter than x_t
|
||||
|
||||
def mock_denoiser(x):
|
||||
return torch.ones_like(x) * 0.5
|
||||
|
||||
result = processor.denoise_step(
|
||||
x_t=x_t,
|
||||
prev_chunk_left_over=prev_chunk,
|
||||
inference_delay=5,
|
||||
time=torch.tensor(0.5),
|
||||
original_denoise_step_partial=mock_denoiser,
|
||||
)
|
||||
|
||||
# Should complete successfully (padding happens internally)
|
||||
assert result.shape == x_t.shape
|
||||
|
||||
|
||||
def test_denoise_step_pads_fewer_action_dims():
|
||||
"""Test denoise_step pads if prev_chunk has fewer action dimensions."""
|
||||
config = RTCConfig(execution_horizon=10, max_guidance_weight=5.0)
|
||||
processor = RTCProcessor(config)
|
||||
|
||||
x_t = torch.randn(1, 50, 6)
|
||||
prev_chunk = torch.randn(1, 50, 4) # Fewer action dims
|
||||
|
||||
def mock_denoiser(x):
|
||||
return torch.ones_like(x) * 0.5
|
||||
|
||||
result = processor.denoise_step(
|
||||
x_t=x_t,
|
||||
prev_chunk_left_over=prev_chunk,
|
||||
inference_delay=5,
|
||||
time=torch.tensor(0.5),
|
||||
original_denoise_step_partial=mock_denoiser,
|
||||
)
|
||||
|
||||
# Should complete successfully (padding happens internally)
|
||||
assert result.shape == x_t.shape
|
||||
assert result.shape == (10, 6)
|
||||
|
||||
|
||||
def test_denoise_step_uses_custom_execution_horizon():
|
||||
@@ -488,73 +464,49 @@ def test_denoise_step_uses_custom_execution_horizon():
|
||||
config = RTCConfig(execution_horizon=10)
|
||||
processor = RTCProcessor(config)
|
||||
|
||||
x_t = torch.randn(1, 50, 6)
|
||||
prev_chunk = torch.randn(1, 50, 6)
|
||||
x_t = torch.ones(1, 20, 1)
|
||||
prev_chunk = torch.full((1, 15, 1), 0.1)
|
||||
|
||||
def mock_denoiser(x):
|
||||
return torch.ones_like(x) * 0.5
|
||||
return x * 0.5
|
||||
|
||||
# Use custom execution_horizon
|
||||
result = processor.denoise_step(
|
||||
x_t=x_t,
|
||||
prev_chunk_left_over=prev_chunk,
|
||||
inference_delay=5,
|
||||
time=torch.tensor(0.5),
|
||||
original_denoise_step_partial=mock_denoiser,
|
||||
execution_horizon=20, # Override config value
|
||||
execution_horizon=15,
|
||||
)
|
||||
|
||||
assert result.shape == x_t.shape
|
||||
|
||||
|
||||
def test_denoise_step_clamps_execution_horizon_to_prev_chunk_length():
|
||||
"""Test denoise_step clamps execution_horizon if prev_chunk is shorter."""
|
||||
config = RTCConfig(execution_horizon=100) # Very large
|
||||
processor = RTCProcessor(config)
|
||||
|
||||
x_t = torch.randn(1, 50, 6)
|
||||
prev_chunk = torch.randn(1, 20, 6) # Only 20 timesteps
|
||||
|
||||
def mock_denoiser(x):
|
||||
return torch.ones_like(x) * 0.5
|
||||
|
||||
# Should clamp execution_horizon to 20 internally
|
||||
result = processor.denoise_step(
|
||||
x_t=x_t,
|
||||
prev_chunk_left_over=prev_chunk,
|
||||
inference_delay=5,
|
||||
time=torch.tensor(0.5),
|
||||
original_denoise_step_partial=mock_denoiser,
|
||||
expected_result = torch.tensor(
|
||||
[
|
||||
[
|
||||
[1.8000],
|
||||
[1.8000],
|
||||
[1.8000],
|
||||
[1.8000],
|
||||
[1.8000],
|
||||
[1.6818],
|
||||
[1.5636],
|
||||
[1.4455],
|
||||
[1.3273],
|
||||
[1.2091],
|
||||
[1.0909],
|
||||
[0.9727],
|
||||
[0.8545],
|
||||
[0.7364],
|
||||
[0.6182],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
]
|
||||
]
|
||||
)
|
||||
|
||||
assert result.shape == x_t.shape
|
||||
|
||||
|
||||
def test_denoise_step_guidance_weight_calculation():
|
||||
"""Test denoise_step calculates guidance weight correctly."""
|
||||
config = RTCConfig(max_guidance_weight=10.0)
|
||||
processor = RTCProcessor(config)
|
||||
|
||||
x_t = torch.randn(1, 50, 6)
|
||||
prev_chunk = torch.randn(1, 50, 6)
|
||||
|
||||
def mock_denoiser(x):
|
||||
return torch.ones_like(x) * 0.5
|
||||
|
||||
# Time = 0.5 => tau = 1 - 0.5 = 0.5
|
||||
time = 0.5
|
||||
result = processor.denoise_step(
|
||||
x_t=x_t,
|
||||
prev_chunk_left_over=prev_chunk,
|
||||
inference_delay=5,
|
||||
time=time,
|
||||
original_denoise_step_partial=mock_denoiser,
|
||||
)
|
||||
|
||||
# Should produce valid output
|
||||
assert result.shape == x_t.shape
|
||||
assert not torch.any(torch.isnan(result))
|
||||
assert not torch.any(torch.isinf(result))
|
||||
assert torch.allclose(result, expected_result, atol=1e-4)
|
||||
|
||||
|
||||
def test_denoise_step_guidance_weight_at_time_zero():
|
||||
@@ -562,13 +514,12 @@ def test_denoise_step_guidance_weight_at_time_zero():
|
||||
config = RTCConfig(max_guidance_weight=10.0)
|
||||
processor = RTCProcessor(config)
|
||||
|
||||
x_t = torch.randn(1, 50, 6)
|
||||
prev_chunk = torch.randn(1, 50, 6)
|
||||
x_t = torch.ones(1, 20, 1)
|
||||
prev_chunk = torch.full((1, 20, 1), 0.1)
|
||||
|
||||
def mock_denoiser(x):
|
||||
return torch.ones_like(x) * 0.5
|
||||
return x * 0.5
|
||||
|
||||
# Time = 0 => tau = 1, c = (1-tau)/tau = 0/1 = 0
|
||||
result = processor.denoise_step(
|
||||
x_t=x_t,
|
||||
prev_chunk_left_over=prev_chunk,
|
||||
@@ -577,9 +528,68 @@ def test_denoise_step_guidance_weight_at_time_zero():
|
||||
original_denoise_step_partial=mock_denoiser,
|
||||
)
|
||||
|
||||
# Should handle gracefully (no NaN/Inf)
|
||||
assert not torch.any(torch.isnan(result))
|
||||
assert not torch.any(torch.isinf(result))
|
||||
expected_result = torch.tensor(
|
||||
[
|
||||
[
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
[0.5000],
|
||||
]
|
||||
]
|
||||
)
|
||||
|
||||
assert torch.allclose(result, expected_result, atol=1e-4)
|
||||
|
||||
|
||||
def test_denoise_step_with_real_denoise_step_partial():
|
||||
"""Test denoise_step with a real denoiser."""
|
||||
config = RTCConfig(max_guidance_weight=10.0)
|
||||
processor = RTCProcessor(config)
|
||||
|
||||
batch_size = 10
|
||||
action_dim = 6
|
||||
chunk_size = 20
|
||||
|
||||
x_t = torch.ones(batch_size, chunk_size, action_dim)
|
||||
prev_chunk = torch.full((batch_size, chunk_size, action_dim), 0.1)
|
||||
|
||||
velocity_function = torch.nn.Sequential(
|
||||
torch.nn.Linear(action_dim, 1000),
|
||||
torch.nn.ReLU(),
|
||||
torch.nn.Linear(1000, 256),
|
||||
torch.nn.ReLU(),
|
||||
torch.nn.Linear(256, action_dim),
|
||||
)
|
||||
|
||||
def mock_denoiser(x):
|
||||
return velocity_function(x)
|
||||
|
||||
result = processor.denoise_step(
|
||||
x_t=x_t,
|
||||
prev_chunk_left_over=prev_chunk,
|
||||
inference_delay=5,
|
||||
time=torch.tensor(0.5),
|
||||
original_denoise_step_partial=mock_denoiser,
|
||||
)
|
||||
|
||||
assert result.shape == (batch_size, chunk_size, action_dim)
|
||||
|
||||
|
||||
def test_denoise_step_guidance_weight_at_time_one():
|
||||
|
||||
Reference in New Issue
Block a user