#!/usr/bin/env python

# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Tests for SARM utility functions.

Tests the implementation of SARM paper formulas:
- Formula (1): compute_temporal_proportions - dataset-level temporal proportions
- Formula (2): compute_tau, compute_cumulative_progress_batch - progress labels
"""

import numpy as np
import pytest
import torch

from lerobot.policies.sarm.sarm_utils import (
    Subtask,
    SubtaskAnnotation,
    Timestamp,
    compute_cumulative_progress_batch,
    compute_tau,
    compute_temporal_proportions,
)


def _to_mmss(seconds: int) -> str:
    """Format a whole number of seconds as a zero-padded ``MM:SS`` string."""
    return f"{seconds // 60:02d}:{seconds % 60:02d}"


def make_annotation(subtasks: list[tuple[str, int, int]]) -> SubtaskAnnotation:
    """Helper to create a SubtaskAnnotation from a list of (name, start_sec, end_sec).

    Args:
        subtasks: One ``(name, start_sec, end_sec)`` tuple per subtask. The start and end
            values are whole seconds and are rendered as ``MM:SS`` timestamp strings.

    Returns:
        A SubtaskAnnotation holding one Subtask per input tuple, in input order.
    """
    return SubtaskAnnotation(
        subtasks=[
            Subtask(
                name=name,
                timestamps=Timestamp(start=_to_mmss(start), end=_to_mmss(end)),
            )
            for name, start, end in subtasks
        ]
    )


class TestComputeTemporalProportions:
    """Tests for compute_temporal_proportions (SARM Paper Formula 1).

    Formula: ᾱ_k = (1/M) × Σ_i (L_{i,k} / T_i)

    Key insight: This averages the PROPORTION of each subtask within each trajectory,
    giving equal weight to all trajectories regardless of absolute length.
    """

    def test_basic_two_trajectories_equal_proportions(self):
        """Test with two trajectories that have equal proportions."""
        # Both trajectories: subtask1 = 50%, subtask2 = 50%
        # Traj 1: T=100s, subtask1=50s, subtask2=50s
        # Traj 2: T=200s, subtask1=100s, subtask2=100s
        annotations = {
            0: make_annotation([("subtask1", 0, 50), ("subtask2", 50, 100)]),
            1: make_annotation([("subtask1", 0, 100), ("subtask2", 100, 200)]),
        }

        result = compute_temporal_proportions(annotations)

        # Both should be 0.5
        assert abs(result["subtask1"] - 0.5) < 1e-6
        assert abs(result["subtask2"] - 0.5) < 1e-6

    def test_paper_example_different_from_avg_durations(self):
        """Test that compute_temporal_proportions differs from naive average duration approach.

        This is the key test showing the difference between:
        - Paper formula: average of (L_i,k / T_i)
        - Naive approach: mean(L_i,k) / sum(mean(L_i,j))
        """
        # Episode 1: T=100s, subtask1=80s, subtask2=20s (proportions: 0.8, 0.2)
        # Episode 2: T=200s, subtask1=40s, subtask2=160s (proportions: 0.2, 0.8)
        annotations = {
            0: make_annotation([("subtask1", 0, 80), ("subtask2", 80, 100)]),
            1: make_annotation([("subtask1", 0, 40), ("subtask2", 40, 200)]),
        }

        result = compute_temporal_proportions(annotations)

        # Paper formula:
        # ᾱ_1 = (1/2) × (80/100 + 40/200) = (1/2) × (0.8 + 0.2) = 0.5
        # ᾱ_2 = (1/2) × (20/100 + 160/200) = (1/2) × (0.2 + 0.8) = 0.5
        assert abs(result["subtask1"] - 0.5) < 1e-6
        assert abs(result["subtask2"] - 0.5) < 1e-6

        # Naive approach on the same data:
        # mean(L_1) = (80 + 40) / 2 = 60, mean(L_2) = (20 + 160) / 2 = 90
        # -> 60 / 150 = 0.4 and 90 / 150 = 0.6
        # Make the "differs from naive" claim explicit rather than implied.
        assert abs(result["subtask1"] - 0.4) > 1e-6
        assert abs(result["subtask2"] - 0.6) > 1e-6

    def test_single_trajectory(self):
        """Test with a single trajectory."""
        # T=100s, reach=30s, grasp=20s, lift=50s
        annotations = {
            0: make_annotation([("reach", 0, 30), ("grasp", 30, 50), ("lift", 50, 100)]),
        }

        result = compute_temporal_proportions(annotations)

        assert abs(result["reach"] - 0.3) < 1e-6
        assert abs(result["grasp"] - 0.2) < 1e-6
        assert abs(result["lift"] - 0.5) < 1e-6

    def test_sum_to_one(self):
        """Test that proportions always sum to 1."""
        # Three episodes with varying proportions
        annotations = {
            0: make_annotation([("a", 0, 10), ("b", 10, 50), ("c", 50, 100)]),  # 0.1, 0.4, 0.5
            1: make_annotation([("a", 0, 20), ("b", 20, 70), ("c", 70, 100)]),  # 0.2, 0.5, 0.3
            2: make_annotation([("a", 0, 30), ("b", 30, 90), ("c", 90, 100)]),  # 0.3, 0.6, 0.1
        }

        result = compute_temporal_proportions(annotations)

        total = sum(result.values())
        assert abs(total - 1.0) < 1e-6

    def test_empty_annotations_returns_empty(self):
        """Test that empty annotations returns empty dict."""
        result = compute_temporal_proportions({})
        assert result == {}

    def test_uniform_proportions(self):
        """Test with uniform proportions across subtasks."""
        # Each subtask takes 25% of each episode
        annotations = {
            0: make_annotation([("a", 0, 25), ("b", 25, 50), ("c", 50, 75), ("d", 75, 100)]),
            1: make_annotation([("a", 0, 50), ("b", 50, 100), ("c", 100, 150), ("d", 150, 200)]),
        }

        result = compute_temporal_proportions(annotations)

        for name in ["a", "b", "c", "d"]:
            assert abs(result[name] - 0.25) < 1e-6


class TestComputeTau:
    """Tests for compute_tau (within-subtask progress).

    Formula: τ_t = (t - s_k) / (e_k - s_k) ∈ [0, 1]
    """

    def test_at_start(self):
        """τ should be 0 at subtask start."""
        tau = compute_tau(current_frame=10, subtask_start=10, subtask_end=50)
        assert tau == 0.0

    def test_at_end(self):
        """τ should be 1 at subtask end."""
        tau = compute_tau(current_frame=50, subtask_start=10, subtask_end=50)
        assert tau == 1.0

    def test_at_middle(self):
        """τ should be 0.5 at subtask midpoint."""
        tau = compute_tau(current_frame=30, subtask_start=10, subtask_end=50)
        assert abs(tau - 0.5) < 1e-6

    def test_quarter_progress(self):
        """Test τ at 25% through subtask."""
        tau = compute_tau(current_frame=20, subtask_start=0, subtask_end=80)
        assert abs(tau - 0.25) < 1e-6

    def test_zero_duration_subtask(self):
        """τ should be 1.0 for zero-duration subtask."""
        tau = compute_tau(current_frame=10, subtask_start=10, subtask_end=10)
        assert tau == 1.0

    def test_clamps_below_zero(self):
        """τ should be clamped to 0 if frame is before subtask."""
        tau = compute_tau(current_frame=5, subtask_start=10, subtask_end=50)
        assert tau == 0.0

    def test_clamps_above_one(self):
        """τ should be clamped to 1 if frame is after subtask."""
        tau = compute_tau(current_frame=60, subtask_start=10, subtask_end=50)
        assert tau == 1.0

    def test_float_inputs(self):
        """Test with float frame indices (from interpolation)."""
        tau = compute_tau(current_frame=25.5, subtask_start=10.0, subtask_end=50.0)
        expected = (25.5 - 10.0) / (50.0 - 10.0)
        assert abs(tau - expected) < 1e-6


class TestComputeCumulativeProgressBatchScalar:
    """Tests for compute_cumulative_progress_batch with scalar inputs (normalized progress y_t).

    Formula: y_t = P_{k-1} + ᾱ_k × τ_t ∈ [0, 1]
    """

    def test_first_subtask_start(self):
        """y should be 0 at start of first subtask."""
        proportions = [0.3, 0.5, 0.2]
        y = compute_cumulative_progress_batch(tau=0.0, stage_indices=0, alpha=proportions)
        assert y == 0.0

    def test_first_subtask_end(self):
        """y should equal ᾱ_1 at end of first subtask."""
        proportions = [0.3, 0.5, 0.2]
        y = compute_cumulative_progress_batch(tau=1.0, stage_indices=0, alpha=proportions)
        assert abs(y - 0.3) < 1e-6

    def test_second_subtask_start(self):
        """y should equal P_1 at start of second subtask."""
        proportions = [0.3, 0.5, 0.2]
        y = compute_cumulative_progress_batch(tau=0.0, stage_indices=1, alpha=proportions)
        assert abs(y - 0.3) < 1e-6

    def test_second_subtask_end(self):
        """y should equal P_2 at end of second subtask."""
        proportions = [0.3, 0.5, 0.2]
        y = compute_cumulative_progress_batch(tau=1.0, stage_indices=1, alpha=proportions)
        assert abs(y - 0.8) < 1e-6  # 0.3 + 0.5

    def test_third_subtask_end(self):
        """y should be 1.0 at end of last subtask."""
        proportions = [0.3, 0.5, 0.2]
        y = compute_cumulative_progress_batch(tau=1.0, stage_indices=2, alpha=proportions)
        assert abs(y - 1.0) < 1e-6

    def test_midpoint_of_subtask(self):
        """Test progress at midpoint of a subtask."""
        proportions = [0.4, 0.6]

        # At τ=0.5 in subtask 1: y = P_0 + ᾱ_1 × 0.5 = 0 + 0.4 × 0.5 = 0.2
        y = compute_cumulative_progress_batch(tau=0.5, stage_indices=0, alpha=proportions)
        assert abs(y - 0.2) < 1e-6

        # At τ=0.5 in subtask 2: y = P_1 + ᾱ_2 × 0.5 = 0.4 + 0.6 × 0.5 = 0.7
        y = compute_cumulative_progress_batch(tau=0.5, stage_indices=1, alpha=proportions)
        assert abs(y - 0.7) < 1e-6

    def test_uniform_proportions(self):
        """Test with uniform proportions."""
        proportions = [0.25, 0.25, 0.25, 0.25]

        # At end of each subtask, progress should be 0.25, 0.5, 0.75, 1.0
        for i in range(4):
            y = compute_cumulative_progress_batch(tau=1.0, stage_indices=i, alpha=proportions)
            expected = (i + 1) * 0.25
            assert abs(y - expected) < 1e-6


class TestComputeCumulativeProgressBatchTensor:
    """Tests for compute_cumulative_progress_batch with tensor inputs (GPU batch version)."""

    def test_tensor_matches_scalar_version(self):
        """Test that tensor version matches scalar version."""
        proportions = [0.3, 0.5, 0.2]
        alpha = torch.tensor(proportions, dtype=torch.float32)
        # cumulative[k] holds the sum of the first k proportions; cumulative[0] stays 0.
        cumulative = torch.zeros(len(proportions) + 1, dtype=torch.float32)
        cumulative[1:] = torch.cumsum(alpha, dim=0)

        test_cases = [
            (0.0, 0),  # start of subtask 0
            (1.0, 0),  # end of subtask 0
            (0.0, 1),  # start of subtask 1
            (0.5, 1),  # middle of subtask 1
            (1.0, 2),  # end of subtask 2
        ]

        for tau_val, stage_idx in test_cases:
            # Scalar version
            expected = compute_cumulative_progress_batch(tau_val, stage_idx, proportions)

            # Tensor version (single element)
            tau = torch.tensor([[[tau_val]]])  # (1, 1, 1)
            stages = torch.tensor([[stage_idx]])  # (1, 1)
            result = compute_cumulative_progress_batch(tau, stages, alpha, cumulative)

            assert abs(result[0, 0, 0].item() - expected) < 1e-6

    def test_batch_processing(self):
        """Test batch processing with multiple samples."""
        proportions = [0.4, 0.6]
        alpha = torch.tensor(proportions, dtype=torch.float32)
        cumulative = torch.zeros(3, dtype=torch.float32)
        cumulative[1:] = torch.cumsum(alpha, dim=0)

        # Batch of 2 samples, sequence length 3
        tau = torch.tensor(
            [
                [[0.0], [0.5], [1.0]],  # sample 1
                [[0.0], [0.5], [1.0]],  # sample 2
            ]
        )
        stages = torch.tensor(
            [
                [0, 0, 0],  # sample 1: all in subtask 0
                [1, 1, 1],  # sample 2: all in subtask 1
            ]
        )

        result = compute_cumulative_progress_batch(tau, stages, alpha, cumulative)

        # Sample 1: subtask 0 with tau 0, 0.5, 1.0 -> y = 0, 0.2, 0.4
        assert abs(result[0, 0, 0].item() - 0.0) < 1e-6
        assert abs(result[0, 1, 0].item() - 0.2) < 1e-6
        assert abs(result[0, 2, 0].item() - 0.4) < 1e-6

        # Sample 2: subtask 1 with tau 0, 0.5, 1.0 -> y = 0.4, 0.7, 1.0
        assert abs(result[1, 0, 0].item() - 0.4) < 1e-6
        assert abs(result[1, 1, 0].item() - 0.7) < 1e-6
        assert abs(result[1, 2, 0].item() - 1.0) < 1e-6

    def test_auto_compute_cumulative_prior(self):
        """Test that cumulative_prior is auto-computed when not provided."""
        proportions = [0.3, 0.5, 0.2]
        alpha = torch.tensor(proportions, dtype=torch.float32)

        tau = torch.tensor([[[0.5]]])
        stages = torch.tensor([[1]])

        # Without cumulative_prior (should auto-compute)
        result = compute_cumulative_progress_batch(tau, stages, alpha)

        # Stage index 1 is the second subtask (k=2 in the paper's 1-based notation):
        # Expected: P_1 + ᾱ_2 × 0.5 = 0.3 + 0.5 × 0.5 = 0.55
        assert abs(result[0, 0, 0].item() - 0.55) < 1e-6


class TestEndToEndProgressLabeling:
    """End-to-end tests for progress label computation."""

    def test_consistent_semantic_meaning(self):
        """Test that same subtask completion maps to same progress across trajectories.

        This is the key semantic property: "end of subtask 1" should always mean
        the same progress value regardless of trajectory speed.
        """
        proportions = [0.3, 0.5, 0.2]

        # Fast trajectory: subtask 1 ends at frame 30 (of 100)
        tau_fast = compute_tau(30, 0, 30)  # = 1.0
        y_fast = compute_cumulative_progress_batch(tau_fast, 0, proportions)

        # Slow trajectory: subtask 1 ends at frame 90 (of 300)
        tau_slow = compute_tau(90, 0, 90)  # = 1.0
        y_slow = compute_cumulative_progress_batch(tau_slow, 0, proportions)

        # Both should map to same progress (0.3 = end of subtask 1)
        assert abs(y_fast - y_slow) < 1e-6
        assert abs(y_fast - 0.3) < 1e-6

    def test_monotonic_within_subtask(self):
        """Test that progress is strictly increasing within a subtask."""
        proportions = [0.4, 0.6]

        # Sentinel below any progress value being compared, so the first sample
        # (tau=0) passes the strict comparison without a special case.
        prev_y = -1
        for tau in np.linspace(0, 1, 11):
            y = compute_cumulative_progress_batch(tau, 0, proportions)
            assert y > prev_y
            prev_y = y

    def test_continuous_across_subtasks(self):
        """Test that progress is continuous at subtask boundaries."""
        proportions = [0.3, 0.5, 0.2]

        # End of subtask 0 (tau=1.0)
        y_end_0 = compute_cumulative_progress_batch(1.0, 0, proportions)

        # Start of subtask 1 (tau=0.0)
        y_start_1 = compute_cumulative_progress_batch(0.0, 1, proportions)

        # Should be equal (P_1 = 0.3)
        assert abs(y_end_0 - y_start_1) < 1e-6

        # End of subtask 1
        y_end_1 = compute_cumulative_progress_batch(1.0, 1, proportions)

        # Start of subtask 2
        y_start_2 = compute_cumulative_progress_batch(0.0, 2, proportions)

        # Should be equal (P_2 = 0.8)
        assert abs(y_end_1 - y_start_2) < 1e-6