add timing debugging, foot pedal and eval script

This commit is contained in:
croissant
2025-11-05 09:06:14 +01:00
committed by Michel Aractingi
parent 01c1735739
commit 4744f99990
7 changed files with 950 additions and 330 deletions
+357
View File
@@ -0,0 +1,357 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
OpenArms Policy Evaluation
Evaluates a trained policy on the OpenArms robot by running inference and recording
the evaluation episodes to a dataset. Supports optional leader arm for manual resets.
Example usage:
python examples/openarms/evaluate.py
"""
import time
from pathlib import Path
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from lerobot.datasets.utils import combine_feature_dicts
from lerobot.policies.factory import make_policy, make_pre_post_processors
from lerobot.processor import make_default_processors
from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig
from lerobot.robots.openarms.openarms_follower import OpenArmsFollower
from lerobot.scripts.lerobot_record import record_loop
from lerobot.teleoperators.openarms.config_openarms_leader import OpenArmsLeaderConfig
from lerobot.teleoperators.openarms.openarms_leader import OpenArmsLeader
from lerobot.utils.control_utils import init_keyboard_listener
from lerobot.utils.utils import log_say
from lerobot.utils.visualization_utils import init_rerun
HF_MODEL_ID = "<hf_username>/<model_repo_id>" # TODO: Replace with your trained model
HF_EVAL_DATASET_ID = "<hf_username>/<eval_dataset_repo_id>" # TODO: Replace with your eval dataset name
TASK_DESCRIPTION = "Your task description here" # TODO: Replace with your task, this should match!!
NUM_EPISODES = 5
FPS = 30
EPISODE_TIME_SEC = 120
RESET_TIME_SEC = 60
# Robot CAN interfaces
FOLLOWER_LEFT_PORT = "can2"
FOLLOWER_RIGHT_PORT = "can3"
# If enabled, you can manually reset the environment between evaluation episodes
USE_LEADER_FOR_RESETS = True # Set to False if you don't want to use leader
LEADER_LEFT_PORT = "can0"
LEADER_RIGHT_PORT = "can1"
# Camera configuration
CAMERA_CONFIG = {
"left_wrist": OpenCVCameraConfig(index_or_path="/dev/video0", width=640, height=480, fps=FPS),
"right_wrist": OpenCVCameraConfig(index_or_path="/dev/video1", width=640, height=480, fps=FPS),
"base": OpenCVCameraConfig(index_or_path="/dev/video4", width=640, height=480, fps=FPS),
}
def main():
"""Main evaluation function."""
print("OpenArms Policy Evaluation")
print(f"\nModel: {HF_MODEL_ID}")
print(f"Evaluation Dataset: {HF_EVAL_DATASET_ID}")
print(f"Task: {TASK_DESCRIPTION}")
print(f"Episodes: {NUM_EPISODES}")
print(f"Episode Duration: {EPISODE_TIME_SEC}s")
print(f"Reset Duration: {RESET_TIME_SEC}s")
print(f"Use Leader for Resets: {USE_LEADER_FOR_RESETS}")
follower_config = OpenArmsFollowerConfig(
port_left=FOLLOWER_LEFT_PORT,
port_right=FOLLOWER_RIGHT_PORT,
can_interface="socketcan",
id="openarms_follower",
disable_torque_on_disconnect=True,
max_relative_target=10.0,
cameras=CAMERA_CONFIG,
)
follower = OpenArmsFollower(follower_config)
follower.connect(calibrate=False)
if not follower.is_connected:
raise RuntimeError("Follower robot failed to connect!")
leader = None
if USE_LEADER_FOR_RESETS:
leader_config = OpenArmsLeaderConfig(
port_left=LEADER_LEFT_PORT,
port_right=LEADER_RIGHT_PORT,
can_interface="socketcan",
id="openarms_leader",
manual_control=False, # Enable torque control for gravity compensation
)
leader = OpenArmsLeader(leader_config)
leader.connect(calibrate=False)
if not leader.is_connected:
raise RuntimeError("Leader robot failed to connect!")
# Enable gravity compensation
if leader.pin_robot is not None:
leader.bus_right.enable_torque()
leader.bus_left.enable_torque()
time.sleep(0.1)
print(f"Leader connected with gravity compensation ({LEADER_LEFT_PORT}, {LEADER_RIGHT_PORT})")
else:
print(f"Leader connected but gravity compensation unavailable (no URDF)")
policy = make_policy.from_pretrained(HF_MODEL_ID)
# Build default processors for action and observation
teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
# Build dataset features from robot features and processors
# For actions, only include positions (no velocity or torque)
action_features_hw = {}
for key, value in follower.action_features.items():
if key.endswith(".pos"):
action_features_hw[key] = value
dataset_features = combine_feature_dicts(
aggregate_pipeline_dataset_features(
pipeline=teleop_action_processor,
initial_features=create_initial_features(action=action_features_hw),
use_videos=True,
),
aggregate_pipeline_dataset_features(
pipeline=robot_observation_processor,
initial_features=create_initial_features(observation=follower.observation_features),
use_videos=True,
),
)
# Check if dataset already exists
dataset_path = Path.home() / ".cache" / "huggingface" / "lerobot" / HF_EVAL_DATASET_ID
if dataset_path.exists():
print(f"Evaluation dataset already exists at: {dataset_path}")
print("This will append new episodes to the existing dataset.")
choice = input(" Continue? (y/n): ").strip().lower()
if choice != 'y':
print(" Aborting evaluation.")
follower.disconnect()
if leader:
leader.disconnect()
return
# Create dataset
dataset = LeRobotDataset.create(
repo_id=HF_EVAL_DATASET_ID,
fps=FPS,
features=dataset_features,
robot_type=follower.name,
use_videos=True,
image_writer_processes=0,
image_writer_threads=12,
)
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=policy.config,
pretrained_path=HF_MODEL_ID,
dataset_stats=dataset.meta.stats,
preprocessor_overrides={
"device_processor": {"device": str(policy.config.device)}
},
)
print(f"\nRunning evaluation...")
# Initialize keyboard listener and visualization
listener, events = init_keyboard_listener()
init_rerun(session_name="openarms_evaluation")
episode_idx = 0
try:
while episode_idx < NUM_EPISODES and not events["stop_recording"]:
log_say(f"Evaluating episode {episode_idx + 1} of {NUM_EPISODES}")
print(f"\nRunning inference for episode {episode_idx + 1}...")
# Run inference with policy
record_loop(
robot=follower,
events=events,
fps=FPS,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
policy=policy,
preprocessor=preprocessor,
postprocessor=postprocessor,
dataset=dataset,
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
)
# Handle re-recording
if events["rerecord_episode"]:
log_say("Re-recording episode")
events["rerecord_episode"] = False
events["exit_early"] = False
dataset.clear_episode_buffer()
continue
# Save episode
if dataset.episode_buffer is not None and dataset.episode_buffer.get("size", 0) > 0:
print(f"Saving episode {episode_idx + 1} ({dataset.episode_buffer['size']} frames)...")
dataset.save_episode()
episode_idx += 1
# Reset environment between episodes (if not last episode)
if not events["stop_recording"] and episode_idx < NUM_EPISODES:
if USE_LEADER_FOR_RESETS and leader:
log_say("Reset the environment using leader arms")
print(f"\nManual reset period ({RESET_TIME_SEC}s)...")
# Use leader for manual reset with gravity compensation
import numpy as np
dt = 1 / FPS
reset_start_time = time.perf_counter()
while time.perf_counter() - reset_start_time < RESET_TIME_SEC:
if events["exit_early"] or events["stop_recording"]:
break
loop_start = time.perf_counter()
# Get leader state
leader_action = leader.get_action()
# Extract positions and velocities
leader_positions_deg = {}
leader_velocities_deg_per_sec = {}
for motor in leader.bus_right.motors:
pos_key = f"right_{motor}.pos"
vel_key = f"right_{motor}.vel"
if pos_key in leader_action:
leader_positions_deg[f"right_{motor}"] = leader_action[pos_key]
if vel_key in leader_action:
leader_velocities_deg_per_sec[f"right_{motor}"] = leader_action[vel_key]
for motor in leader.bus_left.motors:
pos_key = f"left_{motor}.pos"
vel_key = f"left_{motor}.vel"
if pos_key in leader_action:
leader_positions_deg[f"left_{motor}"] = leader_action[pos_key]
if vel_key in leader_action:
leader_velocities_deg_per_sec[f"left_{motor}"] = leader_action[vel_key]
# Calculate gravity and friction torques
leader_positions_rad = {k: np.deg2rad(v) for k, v in leader_positions_deg.items()}
leader_gravity_torques_nm = leader._gravity_from_q(leader_positions_rad)
leader_velocities_rad_per_sec = {k: np.deg2rad(v) for k, v in leader_velocities_deg_per_sec.items()}
leader_friction_torques_nm = leader._friction_from_velocity(
leader_velocities_rad_per_sec,
friction_scale=1.0
)
# Combine torques
leader_total_torques_nm = {}
for motor_name in leader_gravity_torques_nm:
gravity = leader_gravity_torques_nm.get(motor_name, 0.0)
friction = leader_friction_torques_nm.get(motor_name, 0.0)
leader_total_torques_nm[motor_name] = gravity + friction
# Apply compensation
for motor in leader.bus_right.motors:
full_name = f"right_{motor}"
position = leader_positions_deg.get(full_name, 0.0)
torque = leader_total_torques_nm.get(full_name, 0.0)
kd = leader.get_damping_kd(motor)
leader.bus_right._mit_control(
motor=motor, kp=0.0, kd=kd,
position_degrees=position,
velocity_deg_per_sec=0.0,
torque=torque,
)
for motor in leader.bus_left.motors:
full_name = f"left_{motor}"
position = leader_positions_deg.get(full_name, 0.0)
torque = leader_total_torques_nm.get(full_name, 0.0)
kd = leader.get_damping_kd(motor)
leader.bus_left._mit_control(
motor=motor, kp=0.0, kd=kd,
position_degrees=position,
velocity_deg_per_sec=0.0,
torque=torque,
)
# Send leader positions to follower
follower_action = {}
for joint in leader_positions_deg.keys():
pos_key = f"{joint}.pos"
if pos_key in leader_action:
follower_action[pos_key] = leader_action[pos_key]
if follower_action:
follower.send_action(follower_action)
# Maintain loop rate
loop_duration = time.perf_counter() - loop_start
sleep_time = dt - loop_duration
if sleep_time > 0:
time.sleep(sleep_time)
print("Reset complete")
else:
log_say("Waiting for manual reset")
print(f"Manually reset the environment and press ENTER to continue")
input("Press ENTER when ready...")
print(f"Evaluation complete! {episode_idx} episodes recorded")
log_say("Evaluation complete", blocking=True)
except KeyboardInterrupt:
print("\n\nEvaluation interrupted by user")
finally:
if leader:
leader.bus_right.disable_torque()
leader.bus_left.disable_torque()
time.sleep(0.1)
leader.disconnect()
follower.disconnect()
if listener is not None:
listener.stop()
dataset.finalize()
print("\nUploading to Hugging Face Hub...")
dataset.push_to_hub(private=True)
if __name__ == "__main__":
main()
+15
View File
@@ -352,6 +352,21 @@ button {
transition: all 0.2s;
}
.btn-set-task {
background: #3b82f6;
color: white;
min-width: 120px;
}
.btn-set-task:hover:not(:disabled) {
background: #2563eb;
}
.btn-set-task:disabled {
background: #d1d5db;
cursor: not-allowed;
}
.btn-start {
background: #10b981;
color: white;
+49
View File
@@ -177,6 +177,42 @@ function App() {
}
};
// Set task only (for pedal use)
const setTaskOnly = async () => {
if (!task.trim()) {
setError('Please enter a task description');
return;
}
setError(null);
try {
const response = await fetch(`${API_BASE}/recording/set-task`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ task, ...config })
});
if (!response.ok) {
const data = await response.json();
throw new Error(data.detail || 'Failed to set task');
}
const result = await response.json();
setStatusMessage(result.message || `Task set: ${task}`);
saveConfig(config);
// Clear success message after 3 seconds
setTimeout(() => {
if (!isRecording && !isInitializing) {
setStatusMessage('Ready');
}
}, 3000);
} catch (e) {
setError(e.message);
}
};
// Start recording
const startRecording = async () => {
if (!task.trim()) {
@@ -497,7 +533,20 @@ function App() {
onChange={(e) => setTask(e.target.value)}
placeholder="Task description (e.g., 'pick and place')"
disabled={isRecording || isInitializing || isEncoding || isUploading}
onKeyPress={(e) => {
if (e.key === 'Enter' && robotsReady) {
setTaskOnly();
}
}}
/>
<button
onClick={setTaskOnly}
disabled={isRecording || isInitializing || isEncoding || isUploading || !robotsReady}
className="btn-set-task"
title={!robotsReady ? 'Please setup robots first' : 'Store task for pedal use (Enter key)'}
>
💾 Set Task
</button>
<button
onClick={startRecording}
disabled={isRecording || isInitializing || isEncoding || isUploading || !robotsReady}
@@ -22,6 +22,13 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
try:
from evdev import InputDevice, categorize, ecodes
PEDAL_AVAILABLE = True
except ImportError:
PEDAL_AVAILABLE = False
print("[Pedal] evdev not installed. Pedal support disabled. Install with: pip install evdev")
# LeRobot imports
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.datasets.lerobot_dataset import LeRobotDataset
@@ -80,8 +87,8 @@ robot_instances = {
}
# Camera frames from recording loop (for streaming during recording)
# Store pre-encoded JPEG bytes with timestamps to track freshness and discard stale frames
camera_frames_jpeg = {
# Store raw RGB frames directly from robot observation (no pre-encoding)
camera_frames_raw = {
"left_wrist": None,
"right_wrist": None,
"base": None,
@@ -93,7 +100,7 @@ camera_frames_timestamp = {
}
camera_frames_lock = threading.Lock()
# Maximum age for frames (in seconds) - discard frames older than this
MAX_FRAME_AGE = 0.2
MAX_FRAME_AGE = 0.15 # 150ms = allows for ~6-7 FPS freshness
# Recording control
recording_thread = None
stop_recording_flag = threading.Event()
@@ -101,6 +108,13 @@ FPS = 30
FRICTION_SCALE = 1.0
RAMP_UP_DURATION = 3.0 # Seconds to ramp up PID/torque from 0 to full
# Pedal configuration
PEDAL_DEVICE = "/dev/input/by-id/usb-PCsensor_FootSwitch-event-kbd"
PEDAL_ENABLED = True # Set to False to disable pedal
pedal_thread = None
stop_pedal_flag = threading.Event()
pedal_action_lock = threading.Lock() # Prevent concurrent pedal actions
class RecordingConfig(BaseModel):
task: str
@@ -329,10 +343,9 @@ def initialize_robot_systems(config: RecordingConfig):
features=dataset_features,
robot_type=follower.name,
use_videos=True,
image_writer_processes=0, # Use threads only (faster for single process)
image_writer_threads=8, # More threads for 3 cameras (2-3 per camera)
image_writer_processes=0, # Use threads only (no multiprocessing)
image_writer_threads=12, # 4 threads per camera for 3 cameras
)
print(f"[Dataset] Created: {repo_id}")
except Exception as e:
print(f"[Dataset] Error: {e}")
raise RuntimeError(f"Failed to create dataset: {e}")
@@ -365,6 +378,21 @@ def record_loop_with_compensation():
dataset_features = robot_instances.get("dataset_features")
task = recording_state.get("task", "")
# Profiling: Track time spent in each operation
timing_stats = {
"get_action": [],
"gravity_calc": [],
"friction_calc": [],
"leader_control": [],
"follower_action": [],
"get_observation": [],
"frame_store": [],
"dataset_add": [],
"sleep": [],
}
profile_counter = 0
PROFILE_INTERVAL = 150 # Log every 150 frames (5 seconds @ 30 Hz)
if follower is None or leader is None or dataset is None:
recording_state["error"] = "Robot or dataset not initialized"
print(f"[Recording] Error: Missing components")
@@ -423,13 +451,6 @@ def record_loop_with_compensation():
if actual_loop_fps < 29.0:
print(f"[Recording] WARNING: Loop FPS low: {actual_loop_fps:.1f} Hz")
# More aggressive skip logic: skip encoding if loop is taking >90% of frame time
# OR if previous loop was slow
if hasattr(record_loop_with_compensation, '_last_loop_duration'):
skip_encoding = record_loop_with_compensation._last_loop_duration > dt * 0.9
else:
skip_encoding = False
# Calculate frame recording FPS every second (only after ramp-up)
if is_ramp_up_complete:
elapsed = loop_start - episode_start_time
@@ -441,7 +462,9 @@ def record_loop_with_compensation():
last_fps_update = loop_start
# Get leader state
t0 = time.perf_counter()
leader_action = leader.get_action()
timing_stats["get_action"].append(time.perf_counter() - t0)
# Extract positions and velocities
leader_positions_deg = {}
@@ -465,13 +488,19 @@ def record_loop_with_compensation():
# Calculate gravity and friction torques
leader_positions_rad = {k: np.deg2rad(v) for k, v in leader_positions_deg.items()}
t0 = time.perf_counter()
leader_gravity_torques_nm = leader._gravity_from_q(leader_positions_rad)
timing_stats["gravity_calc"].append(time.perf_counter() - t0)
leader_velocities_rad_per_sec = {k: np.deg2rad(v) for k, v in leader_velocities_deg_per_sec.items()}
t0 = time.perf_counter()
leader_friction_torques_nm = leader._friction_from_velocity(
leader_velocities_rad_per_sec,
friction_scale=FRICTION_SCALE
)
timing_stats["friction_calc"].append(time.perf_counter() - t0)
# Combine torques
leader_total_torques_nm = {}
@@ -481,6 +510,7 @@ def record_loop_with_compensation():
leader_total_torques_nm[motor_name] = gravity + friction
# Apply compensation to RIGHT arm with ramp-up scaling
t0 = time.perf_counter()
for motor in leader.bus_right.motors:
full_name = f"right_{motor}"
position = leader_positions_deg.get(full_name, 0.0)
@@ -511,6 +541,7 @@ def record_loop_with_compensation():
velocity_deg_per_sec=0.0,
torque=torque,
)
timing_stats["leader_control"].append(time.perf_counter() - t0)
# Send positions to follower with ramped PID gains using send_action
follower_action = {}
@@ -547,73 +578,104 @@ def record_loop_with_compensation():
custom_kd[full_name] = kd_base * ramp_factor
# Send action with custom ramped gains (includes safety checks)
t0 = time.perf_counter()
follower.send_action(follower_action, custom_kp=custom_kp, custom_kd=custom_kd)
timing_stats["follower_action"].append(time.perf_counter() - t0)
# Get observation
# Get observation (with detailed breakdown)
t0 = time.perf_counter()
observation = follower.get_observation()
obs_time = time.perf_counter() - t0
timing_stats["get_observation"].append(obs_time)
# Get current timestamp for frame freshness tracking
# Extract detailed timing breakdown from observation
if "_timing_breakdown" in observation:
for key, value_ms in observation["_timing_breakdown"].items():
if key not in timing_stats:
timing_stats[key] = []
timing_stats[key].append(value_ms / 1000) # Convert ms to seconds
# Remove timing metadata before storing in dataset
del observation["_timing_breakdown"]
# Store raw camera frames directly for streaming (no encoding in control loop!)
# Streaming endpoint will encode at 10 FPS as needed
# PRIORITY: 30 Hz control loop must not be slowed by encoding
frame_timestamp = time.perf_counter()
# Pre-encode camera frames as JPEG for streaming (minimizes lock time and encoding overhead)
# This avoids encoding the same frame multiple times for multiple stream endpoints
# OpenCV cameras configured with ColorMode.RGB return RGB frames (default config)
# Skip encoding if we're falling behind to maintain 30 FPS
encoded_frames = {}
# Only encode if we're not falling behind
# PRIORITY: 30 Hz control loop > streaming quality
if not skip_encoding:
for cam_name in ["left_wrist", "right_wrist", "base"]:
if cam_name in observation:
frame = observation[cam_name]
if frame is not None and len(frame.shape) == 3:
# Frame is already RGB from OpenCVCamera (ColorMode.RGB by default)
# Encode once, use many times - eliminates per-stream encoding overhead
# Use low quality (40) for fast streaming - control loop is priority
encode_start = time.perf_counter()
success, jpeg_bytes = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 40])
encode_duration = time.perf_counter() - encode_start
# Only use encoded frame if encoding was very fast (< 5ms = 15% of frame time)
if success and encode_duration < 0.005:
encoded_frames[cam_name] = jpeg_bytes.tobytes()
# If encoding took too long, skip this camera for this frame
# Store encoded frames with timestamps with minimal lock time
t0 = time.perf_counter()
with camera_frames_lock:
for cam_name, jpeg_bytes in encoded_frames.items():
camera_frames_jpeg[cam_name] = jpeg_bytes
camera_frames_timestamp[cam_name] = frame_timestamp
for cam_name in ["left_wrist", "right_wrist", "base"]:
if cam_name in observation and observation[cam_name] is not None:
# Store raw RGB frame directly - no encoding overhead
camera_frames_raw[cam_name] = observation[cam_name]
camera_frames_timestamp[cam_name] = frame_timestamp
timing_stats["frame_store"].append(time.perf_counter() - t0)
# Add to dataset ONLY after ramp-up completes
if is_ramp_up_complete:
try:
t0 = time.perf_counter()
obs_frame = build_dataset_frame(dataset_features, observation, prefix="observation")
action_frame = build_dataset_frame(dataset_features, follower_action, prefix="action")
frame = {**obs_frame, **action_frame}
frame["task"] = task
dataset.add_frame(frame)
timing_stats["dataset_add"].append(time.perf_counter() - t0)
frame_count += 1
profile_counter += 1
# Log progress every 5 seconds (150 frames @ 30 FPS)
if frame_count % 150 == 0:
# Log progress and profiling every 5 seconds (150 frames @ 30 FPS)
if profile_counter >= PROFILE_INTERVAL:
profile_counter = 0
# Calculate averages
avg_times = {k: (sum(v) / len(v) * 1000) if v else 0 for k, v in timing_stats.items()}
# Core operations for total time
core_ops = ['get_action', 'gravity_calc', 'friction_calc', 'leader_control',
'follower_action', 'get_observation', 'frame_store', 'dataset_add']
total_time = sum(avg_times.get(k, 0) for k in core_ops)
print(f"[Recording] {frame_count} frames | Loop: {recording_state['loop_fps']} Hz | Recording: {recording_state['current_fps']} FPS")
print(f"[Profiling] Time per operation (ms):")
print(f" get_action: {avg_times.get('get_action', 0):6.2f} ms ({avg_times.get('get_action', 0)/total_time*100:5.1f}%)")
print(f" gravity_calc: {avg_times.get('gravity_calc', 0):6.2f} ms ({avg_times.get('gravity_calc', 0)/total_time*100:5.1f}%)")
print(f" friction_calc: {avg_times.get('friction_calc', 0):6.2f} ms ({avg_times.get('friction_calc', 0)/total_time*100:5.1f}%)")
print(f" leader_control: {avg_times.get('leader_control', 0):6.2f} ms ({avg_times.get('leader_control', 0)/total_time*100:5.1f}%)")
print(f" follower_action: {avg_times.get('follower_action', 0):6.2f} ms ({avg_times.get('follower_action', 0)/total_time*100:5.1f}%)")
print(f" get_observation: {avg_times.get('get_observation', 0):6.2f} ms ({avg_times.get('get_observation', 0)/total_time*100:5.1f}%)")
# Show detailed breakdown of get_observation
if any(k.startswith('cam_') or k in ['right_motors', 'left_motors'] for k in avg_times):
print(f" └─ Breakdown:")
if 'right_motors' in avg_times:
print(f" right_motors: {avg_times['right_motors']:6.2f} ms")
if 'left_motors' in avg_times:
print(f" left_motors: {avg_times['left_motors']:6.2f} ms")
for cam in ['left_wrist', 'right_wrist', 'base']:
cam_key = f'cam_{cam}'
if cam_key in avg_times:
print(f" {cam:14s}: {avg_times[cam_key]:6.2f} ms")
print(f" frame_store: {avg_times.get('frame_store', 0):6.2f} ms ({avg_times.get('frame_store', 0)/total_time*100:5.1f}%)")
print(f" dataset_add: {avg_times.get('dataset_add', 0):6.2f} ms ({avg_times.get('dataset_add', 0)/total_time*100:5.1f}%)")
print(f" TOTAL: {total_time:6.2f} ms (target: 33.33 ms @ 30 Hz)")
# Clear stats for next interval
for k in timing_stats:
timing_stats[k].clear()
except Exception as frame_error:
print(f"[Recording] Frame error: {frame_error}")
# Continue recording even if one frame fails
# Maintain loop rate - don't wait if behind, just continue
loop_duration = time.perf_counter() - loop_start
# Track loop duration for next iteration's encoding skip decision
record_loop_with_compensation._last_loop_duration = loop_duration
sleep_time = dt - loop_duration
# Only sleep if we're ahead of schedule (more than 2ms remaining)
if sleep_time > 0.002:
t0 = time.perf_counter()
time.sleep(min(sleep_time, dt * 0.5)) # Never sleep more than half a frame time
timing_stats["sleep"].append(time.perf_counter() - t0)
# If behind, just continue without sleeping - maintain real-time rate
except Exception as e:
@@ -636,62 +698,33 @@ def record_loop_with_compensation():
print(f"[Recording] Local dataset variable: {dataset}")
@app.post("/api/recording/set-task")
async def set_task(config: RecordingConfig):
"""Store task and config without starting recording (for pedal use).
This allows you to set the task in advance, so the pedal can start recording
without needing to interact with the UI.
"""
if recording_state["is_recording"] or recording_state["is_initializing"]:
raise HTTPException(status_code=400, detail="Cannot change task while recording")
# Store the config and task for later use by pedal or start button
recording_state["config"] = config.model_dump()
recording_state["task"] = config.task
print(f"[SetTask] Task set to: {config.task}")
return {
"status": "success",
"task": config.task,
"message": "Task stored. You can now start recording with pedal or button."
}
@app.post("/api/recording/start")
async def start_recording(config: RecordingConfig):
"""Start recording an episode."""
global recording_state, recording_thread, stop_recording_flag
if recording_state["is_recording"] or recording_state["is_initializing"]:
raise HTTPException(status_code=400, detail="Already recording or initializing")
# Check if robots are available (either from setup or previous recording)
if not robot_instances.get("follower") or not robot_instances.get("leader"):
raise HTTPException(
status_code=400,
detail="Robots not initialized. Please click 'Setup Robots' first."
)
try:
# Update config
recording_state["config"] = config.model_dump()
recording_state["task"] = config.task
recording_state["error"] = None
recording_state["is_initializing"] = True
recording_state["status_message"] = "Initializing recording..."
# Initialize robot systems (will reuse pre-initialized robots if available)
initialize_robot_systems(config)
if robot_instances.get("dataset") is None:
raise RuntimeError("Dataset not created!")
# Start recording
recording_state["is_initializing"] = False
recording_state["is_recording"] = True
recording_state["start_time"] = time.time()
recording_state["recording_started_time"] = None # Will be set after ramp-up
recording_state["ramp_up_remaining"] = RAMP_UP_DURATION # Initialize ramp-up
stop_recording_flag.clear() # Clear the stop flag
# Start recording in background thread
recording_thread = threading.Thread(target=record_loop_with_compensation, daemon=True)
recording_thread.start()
time.sleep(0.1)
if not recording_thread.is_alive():
raise RuntimeError("Recording thread failed to start")
print(f"[Start] Recording started for: {config.task}")
return {"status": "started", "task": config.task}
except Exception as e:
recording_state["is_recording"] = False
recording_state["is_initializing"] = False
recording_state["error"] = str(e)
recording_state["status_message"] = f"Error: {str(e)}"
# Clean up dataset if initialization failed, but keep robots
cleanup_robot_systems(keep_robots=True)
raise HTTPException(status_code=500, detail=str(e))
return do_start_recording(config, source="API")
@@ -699,91 +732,7 @@ async def start_recording(config: RecordingConfig):
@app.post("/api/recording/stop")
async def stop_recording():
"""Stop recording, encode, and upload episode."""
global recording_state, recording_thread, stop_recording_flag
if not recording_state["is_recording"]:
raise HTTPException(status_code=400, detail="Not recording")
recording_state["is_recording"] = False
recording_state["status_message"] = "Stopping recording..."
recording_state["ramp_up_remaining"] = 0.0 # Reset ramp-up
recording_state["recording_started_time"] = None # Reset recording start time
# Stop the recording thread
stop_recording_flag.set()
if recording_thread:
recording_thread.join(timeout=5)
dataset = robot_instances.get("dataset")
dataset_name = robot_instances.get("repo_id", "").split("/")[-1] if robot_instances.get("repo_id") else ""
print(f"[Stop] Recording stopped")
print(f"[Stop] Dataset is None: {dataset is None}")
if dataset:
print(f"[Stop] Dataset type: {type(dataset)}")
print(f"[Stop] Episode buffer is None: {dataset.episode_buffer is None}")
print(f"[Stop] Episode buffer value: {dataset.episode_buffer}")
if dataset.episode_buffer:
print(f"[Stop] Episode buffer keys: {list(dataset.episode_buffer.keys())}")
print(f"[Stop] Episode buffer type: {type(dataset.episode_buffer)}")
print(f"[Stop] Episode buffer size: {dataset.episode_buffer.get('size', 'NO SIZE KEY')}")
# Process episode immediately (blocking)
try:
# Check buffer the same way as reference implementation
if dataset is not None and dataset.episode_buffer is not None and dataset.episode_buffer.get("size", 0) > 0:
buffer_size = dataset.episode_buffer.get("size", 0)
print(f"[Stop] Buffer size: {buffer_size}")
# Encode videos
recording_state["is_encoding"] = True
recording_state["is_uploading"] = False # Ensure upload flag is clear
recording_state["status_message"] = f"Encoding videos ({buffer_size} frames)..."
recording_state["upload_status"] = None # Clear upload status
print(f"[Stop] Saving episode...")
dataset.save_episode()
dataset.finalize()
recording_state["is_encoding"] = False
recording_state["status_message"] = "Encoding complete, uploading..."
print(f"[Stop] Episode saved")
# Upload to hub
recording_state["is_uploading"] = True
recording_state["status_message"] = "Uploading to HuggingFace..."
recording_state["upload_status"] = "Uploading..."
print(f"[Stop] Uploading to hub...")
dataset.push_to_hub(private=True)
recording_state["is_uploading"] = False
recording_state["upload_status"] = "✓ Upload successful!"
recording_state["status_message"] = "Ready"
recording_state["episode_count"] += 1
print(f"[Stop] Upload complete. Episode count: {recording_state['episode_count']}")
else:
recording_state["status_message"] = "No data"
recording_state["upload_status"] = "No data"
print(f"[Stop] No dataset or buffer (dataset={dataset is not None}, buffer={dataset.episode_buffer is not None if dataset else 'N/A'}, size={dataset.episode_buffer.get('size', 0) if dataset and dataset.episode_buffer else 'N/A'})")
# Clean up dataset, keep robots
cleanup_robot_systems(keep_robots=True)
except Exception as e:
recording_state["is_encoding"] = False
recording_state["is_uploading"] = False
recording_state["error"] = f"Upload failed: {str(e)}"
recording_state["status_message"] = f"Error: {str(e)}"
recording_state["upload_status"] = f"✗ Upload failed"
cleanup_robot_systems(keep_robots=True)
print(f"[Stop] Error: {e}")
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
return {
"status": "stopped",
"dataset_name": dataset_name,
"episode_count": recording_state["episode_count"]
}
return do_stop_recording(source="API")
def cleanup_robot_systems(keep_robots=False):
@@ -815,6 +764,301 @@ def cleanup_robot_systems(keep_robots=False):
print(f"[Cleanup] Error: {e}")
def do_start_recording(config: RecordingConfig, source: str = "API"):
"""Core logic for starting recording (used by both API and pedal)."""
global recording_thread
if recording_state["is_recording"] or recording_state["is_initializing"]:
raise HTTPException(status_code=400, detail="Already recording or initializing")
if not robot_instances.get("follower") or not robot_instances.get("leader"):
raise HTTPException(
status_code=400,
detail="Robots not initialized. Please click 'Setup Robots' first."
)
try:
recording_state["config"] = config.model_dump()
recording_state["task"] = config.task
recording_state["error"] = None
recording_state["is_initializing"] = True
recording_state["status_message"] = "Initializing recording..."
initialize_robot_systems(config)
if robot_instances.get("dataset") is None:
raise RuntimeError("Dataset not created!")
recording_state["is_initializing"] = False
recording_state["is_recording"] = True
recording_state["start_time"] = time.time()
recording_state["recording_started_time"] = None
recording_state["ramp_up_remaining"] = RAMP_UP_DURATION
stop_recording_flag.clear()
recording_thread = threading.Thread(target=record_loop_with_compensation, daemon=True)
recording_thread.start()
time.sleep(0.1)
if not recording_thread.is_alive():
raise RuntimeError("Recording thread failed to start")
print(f"[{source}] Recording started for: {config.task}")
return {"status": "started", "task": config.task}
except Exception as e:
recording_state["is_recording"] = False
recording_state["is_initializing"] = False
recording_state["error"] = str(e)
recording_state["status_message"] = f"Error: {str(e)}"
cleanup_robot_systems(keep_robots=True)
raise HTTPException(status_code=500, detail=str(e))
def do_stop_recording(source: str = "API"):
"""Core logic for stopping recording (used by both API and pedal)."""
global recording_thread, stop_recording_flag
if not recording_state["is_recording"]:
raise HTTPException(status_code=400, detail="Not recording")
recording_state["is_recording"] = False
recording_state["status_message"] = "Stopping recording..."
recording_state["ramp_up_remaining"] = 0.0
recording_state["recording_started_time"] = None
stop_recording_flag.set()
if recording_thread:
recording_thread.join(timeout=5)
dataset = robot_instances.get("dataset")
dataset_name = robot_instances.get("repo_id", "").split("/")[-1] if robot_instances.get("repo_id") else ""
print(f"[{source}] Recording stopped")
try:
if dataset is not None and dataset.episode_buffer is not None and dataset.episode_buffer.get("size", 0) > 0:
buffer_size = dataset.episode_buffer.get("size", 0)
print(f"[{source}] Buffer size: {buffer_size}")
recording_state["is_encoding"] = True
recording_state["is_uploading"] = False
recording_state["status_message"] = f"Encoding videos ({buffer_size} frames)..."
recording_state["upload_status"] = None
print(f"[{source}] Saving episode...")
dataset.save_episode()
dataset.finalize()
recording_state["is_encoding"] = False
recording_state["status_message"] = "Encoding complete, uploading..."
print(f"[{source}] Episode saved")
recording_state["is_uploading"] = True
recording_state["status_message"] = "Uploading to HuggingFace..."
recording_state["upload_status"] = "Uploading..."
print(f"[{source}] Uploading to hub...")
dataset.push_to_hub(private=True)
recording_state["is_uploading"] = False
recording_state["upload_status"] = "✓ Upload successful!"
recording_state["status_message"] = "Ready"
recording_state["episode_count"] += 1
print(f"[{source}] Upload complete. Episode count: {recording_state['episode_count']}")
else:
recording_state["status_message"] = "No data"
recording_state["upload_status"] = "No data"
print(f"[{source}] No data to save")
cleanup_robot_systems(keep_robots=True)
except Exception as e:
recording_state["is_encoding"] = False
recording_state["is_uploading"] = False
recording_state["error"] = f"Upload failed: {str(e)}"
recording_state["status_message"] = f"Error: {str(e)}"
recording_state["upload_status"] = "✗ Upload failed"
cleanup_robot_systems(keep_robots=True)
print(f"[{source}] Error: {e}")
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
return {
"status": "stopped",
"dataset_name": dataset_name,
"episode_count": recording_state["episode_count"]
}
def do_move_to_zero(source: str = "API"):
"""Core logic for moving to zero (used by both API and pedal)."""
if recording_state["is_recording"] or recording_state["is_initializing"]:
raise HTTPException(status_code=400, detail="Cannot move to zero while recording")
if not robot_instances.get("follower"):
raise HTTPException(status_code=400, detail="Robots not initialized. Please setup robots first.")
if recording_state["moving_to_zero"]:
raise HTTPException(status_code=400, detail="Already moving to zero position")
try:
recording_state["moving_to_zero"] = True
recording_state["status_message"] = "Moving to zero position..."
follower = robot_instances["follower"]
leader = robot_instances.get("leader")
if leader:
print(f"[{source}] Disabling leader torque...")
leader.bus_right.disable_torque()
leader.bus_left.disable_torque()
time.sleep(0.1)
motor_index = {
"joint_1": 0, "joint_2": 1, "joint_3": 2, "joint_4": 3,
"joint_5": 4, "joint_6": 5, "joint_7": 6, "gripper": 7,
}
zero_action = {}
custom_kp = {}
custom_kd = {}
for motor in follower.bus_right.motors:
full_name = f"right_{motor}"
zero_action[f"{full_name}.pos"] = 0.0
idx = motor_index.get(motor, 0)
kp_base = follower.config.position_kp[idx] if isinstance(follower.config.position_kp, list) else follower.config.position_kp
kd_base = follower.config.position_kd[idx] if isinstance(follower.config.position_kd, list) else follower.config.position_kd
custom_kp[full_name] = kp_base * 0.05
custom_kd[full_name] = kd_base
for motor in follower.bus_left.motors:
full_name = f"left_{motor}"
zero_action[f"{full_name}.pos"] = 0.0
idx = motor_index.get(motor, 0)
kp_base = follower.config.position_kp[idx] if isinstance(follower.config.position_kp, list) else follower.config.position_kp
kd_base = follower.config.position_kd[idx] if isinstance(follower.config.position_kd, list) else follower.config.position_kd
custom_kp[full_name] = kp_base * 0.05
custom_kd[full_name] = kd_base
duration = 2.0
fps = 30
dt = 1 / fps
start_time = time.perf_counter()
while time.perf_counter() - start_time < duration:
loop_start = time.perf_counter()
follower.send_action(zero_action, custom_kp=custom_kp, custom_kd=custom_kd)
loop_duration = time.perf_counter() - loop_start
sleep_time = dt - loop_duration
if sleep_time > 0:
time.sleep(sleep_time)
if leader:
print(f"[{source}] Re-enabling leader torque...")
leader.bus_right.enable_torque()
leader.bus_left.enable_torque()
time.sleep(0.1)
recording_state["moving_to_zero"] = False
recording_state["status_message"] = "Moved to zero position" if recording_state["robots_ready"] else "Ready"
print(f"[{source}] Move to zero complete")
return {"status": "success", "message": "Robot moved to zero position"}
except Exception as e:
recording_state["moving_to_zero"] = False
recording_state["status_message"] = f"Error: {str(e)}"
leader = robot_instances.get("leader")
if leader:
try:
leader.bus_right.enable_torque()
leader.bus_left.enable_torque()
except Exception as torque_error:
print(f"[{source}] Failed to re-enable leader torque: {torque_error}")
raise HTTPException(status_code=500, detail=str(e))
def pedal_reader():
"""Read foot pedal events and trigger actions.
Key mappings (configurable):
- KEY_B (left pedal) -> Start recording
- KEY_N (middle pedal) -> Move to zero
- KEY_M (right pedal) -> Stop recording
"""
if not PEDAL_AVAILABLE:
print(f"[Pedal] evdev not available, pedal support disabled")
return
# Key mapping - adjust these if your pedal sends different keys
KEY_START_RECORDING = "KEY_A" # Left pedal
KEY_MOVE_TO_ZERO = "KEY_B" # Middle pedal
KEY_STOP_RECORDING = "KEY_C" # Right pedal
try:
dev = InputDevice(PEDAL_DEVICE)
print(f"[Pedal] Using {dev.path} ({dev.name})")
print(f"[Pedal] Key mapping: {KEY_START_RECORDING}=Start | {KEY_MOVE_TO_ZERO}=Zero | {KEY_STOP_RECORDING}=Stop")
for ev in dev.read_loop():
if stop_pedal_flag.is_set():
break
if ev.type != ecodes.EV_KEY:
continue
key = categorize(ev)
code = key.keycode
if isinstance(code, (list, tuple)):
code = code[0]
# Only trigger on key down (not up or hold)
if key.keystate != 1: # 1 = DOWN
continue
print(f"[Pedal] Key pressed: {code}")
# Use lock to prevent concurrent pedal actions
with pedal_action_lock:
try:
if code == KEY_START_RECORDING:
# Check if task is set before starting
if not recording_state.get("task"):
print(f"[Pedal] Cannot start: no task set")
recording_state["error"] = "No task set. Please enter task in web UI."
continue
config = RecordingConfig(**recording_state["config"])
do_start_recording(config, source="Pedal")
elif code == KEY_MOVE_TO_ZERO:
do_move_to_zero(source="Pedal")
elif code == KEY_STOP_RECORDING:
do_stop_recording(source="Pedal")
else:
print(f"[Pedal] Unmapped key: {code}")
except HTTPException as e:
# Handle validation errors gracefully (don't crash pedal thread)
print(f"[Pedal] Action blocked: {e.detail}")
except Exception as e:
print(f"[Pedal] Action error: {e}")
except FileNotFoundError:
print(f"[Pedal] Device not found: {PEDAL_DEVICE}")
print(f"[Pedal] Pedal support disabled. Connect pedal and restart server.")
except PermissionError:
print(f"[Pedal] Permission denied: {PEDAL_DEVICE}")
print(f"[Pedal] Run: sudo setfacl -m u:$USER:rw {PEDAL_DEVICE}")
except Exception as e:
print(f"[Pedal] Error: {e}")
import traceback
traceback.print_exc()
@app.post("/api/robots/setup")
async def setup_robots(config: RobotSetupConfig):
"""Pre-initialize robots for faster recording start."""
@@ -877,111 +1121,7 @@ async def move_to_zero():
Also disables leader torque during the movement for safety and ease of repositioning.
"""
global recording_state
if recording_state["is_recording"] or recording_state["is_initializing"]:
raise HTTPException(status_code=400, detail="Cannot move to zero while recording")
if not robot_instances.get("follower"):
raise HTTPException(status_code=400, detail="Robots not initialized. Please setup robots first.")
if recording_state["moving_to_zero"]:
raise HTTPException(status_code=400, detail="Already moving to zero position")
try:
recording_state["moving_to_zero"] = True
recording_state["status_message"] = "Moving to zero position..."
follower = robot_instances["follower"]
leader = robot_instances.get("leader")
# Disable leader torque for safety and ease of repositioning
if leader:
print(f"[MoveToZero] Disabling leader torque...")
leader.bus_right.disable_torque()
leader.bus_left.disable_torque()
time.sleep(0.1)
# Motor name to index mapping
motor_index = {
"joint_1": 0, "joint_2": 1, "joint_3": 2, "joint_4": 3,
"joint_5": 4, "joint_6": 5, "joint_7": 6, "gripper": 7,
}
# Build zero position action with reduced gains (20% of original)
zero_action = {}
custom_kp = {}
custom_kd = {}
# Right arm - all motors to zero
for motor in follower.bus_right.motors:
full_name = f"right_{motor}"
zero_action[f"{full_name}.pos"] = 0.0
idx = motor_index.get(motor, 0)
kp_base = follower.config.position_kp[idx] if isinstance(follower.config.position_kp, list) else follower.config.position_kp
kd_base = follower.config.position_kd[idx] if isinstance(follower.config.position_kd, list) else follower.config.position_kd
custom_kp[full_name] = kp_base * 0.05
custom_kd[full_name] = kd_base
# Left arm - all motors to zero
for motor in follower.bus_left.motors:
full_name = f"left_{motor}"
zero_action[f"{full_name}.pos"] = 0.0
idx = motor_index.get(motor, 0)
kp_base = follower.config.position_kp[idx] if isinstance(follower.config.position_kp, list) else follower.config.position_kp
kd_base = follower.config.position_kd[idx] if isinstance(follower.config.position_kd, list) else follower.config.position_kd
custom_kp[full_name] = kp_base * 0.05
custom_kd[full_name] = kd_base
# Send zero position commands for 2 seconds at 30 FPS
duration = 2.0
fps = 30
dt = 1 / fps
start_time = time.perf_counter()
while time.perf_counter() - start_time < duration:
loop_start = time.perf_counter()
# Send action with reduced gains
follower.send_action(zero_action, custom_kp=custom_kp, custom_kd=custom_kd)
# Maintain loop rate
loop_duration = time.perf_counter() - loop_start
sleep_time = dt - loop_duration
if sleep_time > 0:
time.sleep(sleep_time)
# Re-enable leader torque for gravity compensation
if leader:
print(f"[MoveToZero] Re-enabling leader torque...")
leader.bus_right.enable_torque()
leader.bus_left.enable_torque()
time.sleep(0.1)
recording_state["moving_to_zero"] = False
recording_state["status_message"] = "Moved to zero position" if recording_state["robots_ready"] else "Ready"
print(f"[MoveToZero] Complete - leader torque re-enabled")
return {"status": "success", "message": "Robot moved to zero position"}
except Exception as e:
recording_state["moving_to_zero"] = False
recording_state["status_message"] = f"Error: {str(e)}"
# Try to re-enable leader torque even on error
leader = robot_instances.get("leader")
if leader:
try:
leader.bus_right.enable_torque()
leader.bus_left.enable_torque()
except Exception as torque_error:
print(f"[MoveToZero] Failed to re-enable leader torque: {torque_error}")
raise HTTPException(status_code=500, detail=str(e))
return do_move_to_zero(source="API")
@app.get("/api/status")
@@ -1028,44 +1168,47 @@ async def set_counter(update: CounterUpdate):
@app.get("/api/camera/stream/{camera_name}")
async def stream_camera(camera_name: str):
"""Stream camera feed from robot.
"""Stream camera feed from robot at 10 FPS.
During recording: streams frames from the recording loop (no extra camera reads = no contention!)
During recording: streams raw frames from the recording loop (no extra camera reads!)
When not recording: streams directly from cameras.
Encoding happens here at 10 FPS to avoid slowing down 30 Hz control loop.
"""
def generate():
try:
while True:
frame = None
frame_rgb = None
# If recording, use pre-encoded JPEG frames from recording loop (zero encoding, minimal lock time!)
# If recording, get raw frames from recording loop
if recording_state["is_recording"]:
# Acquire lock only to read the reference and timestamp, not to encode
jpeg_bytes = None
frame_timestamp = 0.0
current_time = time.perf_counter()
# Quick read of raw frame with minimal lock time
with camera_frames_lock:
jpeg_bytes = camera_frames_jpeg.get(camera_name)
frame_rgb = camera_frames_raw.get(camera_name)
frame_timestamp = camera_frames_timestamp.get(camera_name, 0.0)
# Only serve frames that are fresh (not stale)
# Check frame freshness
frame_age = current_time - frame_timestamp
if jpeg_bytes is not None and frame_age <= MAX_FRAME_AGE:
# Frame is fresh and available - yield it
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + jpeg_bytes + b'\r\n')
time.sleep(0.1) # ~10 FPS for streaming (independent of recording FPS)
elif jpeg_bytes is not None and frame_age > MAX_FRAME_AGE:
# Frame is stale - skip it, wait a bit and try next frame
time.sleep(0.01)
continue
if frame_rgb is not None and frame_age <= MAX_FRAME_AGE:
# Frame is fresh - encode it now (only when sending to client)
try:
_, buffer = cv2.imencode('.jpg', frame_rgb, [cv2.IMWRITE_JPEG_QUALITY, 50])
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
time.sleep(0.1) # 10 FPS streaming
except Exception as encode_error:
print(f"[Camera] Encode error: {encode_error}")
time.sleep(0.01)
else:
# No frame available yet - wait a bit
# Frame is stale or not available - wait for next frame
time.sleep(0.01)
continue
else:
# Not recording: stream directly from camera
# Not recording: stream directly from camera at 10 FPS
follower = robot_instances.get("follower")
if not follower or not follower.cameras:
break
@@ -1080,14 +1223,14 @@ async def stream_camera(camera_name: str):
time.sleep(0.01)
continue
# Convert BGR to RGB
# Convert BGR to RGB and encode
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
_, buffer = cv2.imencode('.jpg', frame_rgb, [cv2.IMWRITE_JPEG_QUALITY, 60])
_, buffer = cv2.imencode('.jpg', frame_rgb, [cv2.IMWRITE_JPEG_QUALITY, 50])
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
time.sleep(0.05) # ~20 FPS when not recording
time.sleep(0.1) # 10 FPS when not recording
except Exception as e:
print(f"[Camera] Stream error: {e}")
@@ -1102,13 +1245,40 @@ async def stream_camera(camera_name: str):
)
@app.on_event("startup")
async def startup_event():
"""Initialize pedal monitoring on startup."""
global pedal_thread
if PEDAL_ENABLED and PEDAL_AVAILABLE:
print(f"[Startup] Starting pedal monitor...")
stop_pedal_flag.clear()
pedal_thread = threading.Thread(target=pedal_reader, daemon=True)
pedal_thread.start()
print(f"[Startup] Pedal monitor started")
else:
if not PEDAL_ENABLED:
print(f"[Startup] Pedal disabled in configuration")
elif not PEDAL_AVAILABLE:
print(f"[Startup] Pedal unavailable (evdev not installed)")
@app.on_event("shutdown")
async def shutdown_event():
"""Clean up resources on shutdown."""
global stop_recording_flag
global stop_recording_flag, stop_pedal_flag
print(f"[Shutdown] Stopping recording...")
stop_recording_flag.set()
print(f"[Shutdown] Stopping pedal monitor...")
stop_pedal_flag.set()
if pedal_thread and pedal_thread.is_alive():
pedal_thread.join(timeout=2)
print(f"[Shutdown] Cleaning up robots...")
cleanup_robot_systems()
print(f"[Shutdown] Complete")
if __name__ == "__main__":