#!/usr/bin/env python # Copyright 2025 The HuggingFace Inc. team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Tests for RTC LatencyTracker module.""" import pytest from lerobot.policies.rtc.latency_tracker import LatencyTracker # ====================== Fixtures ====================== @pytest.fixture def tracker(): """Create a LatencyTracker with default maxlen.""" return LatencyTracker(maxlen=100) @pytest.fixture def small_tracker(): """Create a LatencyTracker with small maxlen for overflow testing.""" return LatencyTracker(maxlen=5) # ====================== Initialization Tests ====================== def test_latency_tracker_initialization(): """Test LatencyTracker initializes correctly.""" tracker = LatencyTracker(maxlen=50) assert len(tracker) == 0 assert tracker.max_latency == 0.0 assert tracker.max() == 0.0 def test_latency_tracker_default_maxlen(): """Test LatencyTracker uses default maxlen.""" tracker = LatencyTracker() # Should accept default maxlen=100 assert len(tracker) == 0 # ====================== add() Tests ====================== def test_add_single_latency(tracker): """Test adding a single latency value.""" tracker.add(0.5) assert len(tracker) == 1 assert tracker.max() == 0.5 def test_add_multiple_latencies(tracker): """Test adding multiple latency values.""" latencies = [0.1, 0.5, 0.3, 0.8, 0.2] for lat in latencies: tracker.add(lat) assert len(tracker) == 5 assert tracker.max() == 0.8 def test_add_negative_latency_ignored(tracker): """Test that negative latencies are ignored.""" tracker.add(0.5) tracker.add(-0.1) tracker.add(0.3) # Should only have 2 valid latencies assert len(tracker) == 2 assert tracker.max() == 0.5 def test_add_zero_latency(tracker): """Test adding zero latency.""" tracker.add(0.0) assert len(tracker) == 1 assert tracker.max() == 0.0 def test_add_converts_to_float(tracker): """Test add() converts input to float.""" tracker.add(5) # Integer tracker.add("3.5") # String assert len(tracker) == 2 assert tracker.max() == 5.0 def test_add_updates_max_latency(tracker): """Test that max_latency is updated correctly.""" tracker.add(0.5) assert tracker.max_latency == 0.5 tracker.add(0.3) assert tracker.max_latency == 0.5 # Should not decrease tracker.add(0.9) assert tracker.max_latency == 0.9 # Should increase # ====================== reset() Tests ====================== def test_reset_clears_values(tracker): """Test reset() clears all values.""" tracker.add(0.5) tracker.add(0.8) tracker.add(0.3) assert len(tracker) == 3 tracker.reset() assert len(tracker) == 0 assert tracker.max_latency == 0.0 def test_reset_clears_max_latency(tracker): """Test reset() resets max_latency.""" tracker.add(1.5) assert tracker.max_latency == 1.5 tracker.reset() assert tracker.max_latency == 0.0 def test_reset_allows_new_values(tracker): """Test that tracker works correctly after reset.""" tracker.add(0.5) tracker.reset() tracker.add(0.3) assert len(tracker) == 1 assert tracker.max() == 0.3 # ====================== max() Tests ====================== def test_max_returns_zero_when_empty(tracker): """Test max() returns 0.0 when tracker is empty.""" assert tracker.max() == 0.0 def test_max_returns_maximum_value(tracker): """Test max() returns the maximum latency.""" latencies = [0.2, 0.8, 0.3, 0.5, 0.1] for lat in latencies: tracker.add(lat) assert tracker.max() == 0.8 def test_max_persists_after_sliding_window(small_tracker): """Test max() persists even after values slide out of window.""" # Add values that will exceed maxlen=5 small_tracker.add(0.1) small_tracker.add(0.9) # This is max small_tracker.add(0.2) small_tracker.add(0.3) small_tracker.add(0.4) small_tracker.add(0.5) # This pushes out 0.1 # Max should still be 0.9 even though only last 5 values kept assert small_tracker.max() == 0.9 def test_max_after_reset(tracker): """Test max() returns 0.0 after reset.""" tracker.add(1.5) tracker.reset() assert tracker.max() == 0.0 # ====================== percentile() Tests ====================== def test_percentile_returns_zero_when_empty(tracker): """Test percentile() returns 0.0 when tracker is empty.""" assert tracker.percentile(0.5) == 0.0 assert tracker.percentile(0.95) == 0.0 def test_percentile_median(tracker): """Test percentile(0.5) returns median.""" # Add sorted values for easier verification values = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9] for v in values: tracker.add(v) # Median should be around 0.5 median = tracker.percentile(0.5) assert 0.45 <= median <= 0.55 def test_percentile_minimum_with_zero(tracker): """Test percentile(0.0) returns minimum.""" tracker.add(0.5) tracker.add(0.2) tracker.add(0.8) assert tracker.percentile(0.0) == 0.2 def test_percentile_maximum_with_one(tracker): """Test percentile(1.0) returns maximum.""" tracker.add(0.5) tracker.add(0.2) tracker.add(0.8) assert tracker.percentile(1.0) == 0.8 def test_percentile_95(tracker): """Test percentile(0.95) returns 95th percentile.""" # Add 100 values from 0.0 to 0.99 for i in range(100): tracker.add(i / 100.0) p95 = tracker.percentile(0.95) # 95th percentile should be around 0.95 assert 0.93 <= p95 <= 0.96 def test_percentile_negative_value_returns_min(tracker): """Test percentile with negative q returns minimum.""" tracker.add(0.5) tracker.add(0.2) tracker.add(0.8) assert tracker.percentile(-0.5) == 0.2 def test_percentile_value_greater_than_one_returns_max(tracker): """Test percentile with q > 1.0 returns maximum.""" tracker.add(0.5) tracker.add(0.2) tracker.add(0.8) assert tracker.percentile(1.5) == 0.8 # ====================== p95() Tests ====================== def test_p95_returns_zero_when_empty(tracker): """Test p95() returns 0.0 when tracker is empty.""" assert tracker.p95() == 0.0 def test_p95_returns_95th_percentile(tracker): """Test p95() returns the 95th percentile.""" # Add 100 values for i in range(100): tracker.add(i / 100.0) p95 = tracker.p95() assert 0.93 <= p95 <= 0.96 def test_p95_equals_percentile_95(tracker): """Test p95() equals percentile(0.95).""" for i in range(50): tracker.add(i / 50.0) assert tracker.p95() == tracker.percentile(0.95) # ====================== __len__() Tests ====================== def test_len_returns_zero_initially(tracker): """Test __len__ returns 0 for new tracker.""" assert len(tracker) == 0 def test_len_increments_with_add(tracker): """Test __len__ increments as values are added.""" assert len(tracker) == 0 tracker.add(0.1) assert len(tracker) == 1 tracker.add(0.2) assert len(tracker) == 2 tracker.add(0.3) assert len(tracker) == 3 def test_len_respects_maxlen(small_tracker): """Test __len__ respects maxlen limit.""" # Add more than maxlen values for i in range(10): small_tracker.add(i / 10.0) # Should only keep last 5 assert len(small_tracker) == 5 def test_len_after_reset(tracker): """Test __len__ returns 0 after reset.""" tracker.add(0.5) tracker.add(0.3) assert len(tracker) == 2 tracker.reset() assert len(tracker) == 0 # ====================== Sliding Window Tests ====================== def test_sliding_window_removes_oldest(small_tracker): """Test sliding window removes oldest values.""" # Add 7 values to tracker with maxlen=5 values = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7] for v in values: small_tracker.add(v) # Should only have last 5: [0.3, 0.4, 0.5, 0.6, 0.7] assert len(small_tracker) == 5 # Median should reflect last 5 values median = small_tracker.percentile(0.5) assert 0.45 <= median <= 0.55 def test_sliding_window_maintains_max(small_tracker): """Test sliding window maintains correct max even after overflow.""" small_tracker.add(0.1) small_tracker.add(0.9) small_tracker.add(0.2) small_tracker.add(0.3) small_tracker.add(0.4) small_tracker.add(0.5) # Pushes out 0.1 # Max should still be 0.9 assert small_tracker.max() == 0.9 # ====================== Edge Cases Tests ====================== def test_single_value(tracker): """Test tracker behavior with single value.""" tracker.add(0.75) assert len(tracker) == 1 assert tracker.max() == 0.75 assert tracker.percentile(0.0) == 0.75 assert tracker.percentile(0.5) == 0.75 assert tracker.percentile(1.0) == 0.75 def test_all_same_values(tracker): """Test tracker with all identical values.""" for _ in range(10): tracker.add(0.5) assert len(tracker) == 10 assert tracker.max() == 0.5 assert tracker.percentile(0.0) == 0.5 assert tracker.percentile(0.5) == 0.5 assert tracker.percentile(1.0) == 0.5 def test_very_small_values(tracker): """Test tracker with very small float values.""" tracker.add(1e-10) tracker.add(2e-10) tracker.add(3e-10) assert len(tracker) == 3 assert tracker.max() == pytest.approx(3e-10) def test_very_large_values(tracker): """Test tracker with very large float values.""" tracker.add(1e10) tracker.add(2e10) tracker.add(3e10) assert len(tracker) == 3 assert tracker.max() == pytest.approx(3e10) # ====================== Integration Tests ====================== def test_typical_usage_pattern(tracker): """Test a typical usage pattern of the tracker.""" # Simulate adding latencies over time latencies = [0.05, 0.08, 0.12, 0.07, 0.15, 0.09, 0.11, 0.06, 0.14, 0.10] for lat in latencies: tracker.add(lat) # Check statistics assert len(tracker) == 10 assert tracker.max() == 0.15 # p95 should be close to max since we have only 10 values p95 = tracker.p95() assert p95 >= tracker.percentile(0.5) # p95 should be >= median assert p95 <= tracker.max() # p95 should be <= max def test_reset_and_reuse(tracker): """Test resetting and reusing tracker.""" # First batch tracker.add(1.0) tracker.add(2.0) assert tracker.max() == 2.0 # Reset tracker.reset() # Second batch tracker.add(0.5) tracker.add(0.8) assert len(tracker) == 2 assert tracker.max() == 0.8 assert tracker.percentile(0.5) <= 0.8 def test_continuous_monitoring(small_tracker): """Test continuous monitoring with sliding window.""" # Simulate continuous latency monitoring # First 5 latencies for i in range(5): small_tracker.add(0.1 * (i + 1)) max_before = small_tracker.max() # Add 5 more (window slides) for i in range(5, 10): small_tracker.add(0.1 * (i + 1)) # Max should have increased assert small_tracker.max() > max_before assert len(small_tracker) == 5 # Window size maintained # ====================== Type Conversion Tests ====================== def test_add_with_integer(tracker): """Test adding integer values.""" tracker.add(5) assert len(tracker) == 1 assert tracker.max() == 5.0 def test_add_with_string_number(tracker): """Test adding string representation of number.""" tracker.add("3.14") assert len(tracker) == 1 assert tracker.max() == pytest.approx(3.14) def test_percentile_converts_q_to_float(tracker): """Test percentile converts q parameter to float.""" tracker.add(0.5) tracker.add(0.8) # Pass integer q result = tracker.percentile(1) assert result == 0.8