diff --git a/docs/source/hilserl.mdx b/docs/source/hilserl.mdx index 2f73d0964..7c63c41a8 100644 --- a/docs/source/hilserl.mdx +++ b/docs/source/hilserl.mdx @@ -4,7 +4,13 @@ In this tutorial you will go through the full Human-in-the-Loop Sample-Efficient HIL-SERL is a sample-efficient reinforcement learning algorithm that combines human demonstrations with online learning and human interventions. The approach starts from a small set of human demonstrations, uses them to train a reward classifier, and then employs an actor-learner architecture where humans can intervene during policy execution to guide exploration and correct unsafe behaviors. In this tutorial, you'll use a gamepad to provide interventions and control the robot during the learning process. -It combines three key ingredients: 1. **Offline demonstrations & reward classifier:** a handful of human-teleop episodes plus a vision-based success detector give the policy a shaped starting point. 2. **On-robot actor / learner loop with human interventions:** a distributed Soft Actor Critic (SAC) learner updates the policy while an actor explores on the physical robot; the human can jump in at any time to correct dangerous or unproductive behaviour. 3. **Safety & efficiency tools:** joint/end-effector (EE) bounds, crop region of interest (ROI) preprocessing and WandB monitoring keep the data useful and the hardware safe. +It combines three key ingredients: + +1. **Offline demonstrations & reward classifier:** a handful of human-teleop episodes plus a vision-based success detector give the policy a shaped starting point. + +2. **On-robot actor / learner loop with human interventions:** a distributed Soft Actor Critic (SAC) learner updates the policy while an actor explores on the physical robot; the human can jump in at any time to correct dangerous or unproductive behaviour. + +3. **Safety & efficiency tools:** joint/end-effector (EE) bounds, crop region of interest (ROI) preprocessing and WandB monitoring keep the data useful and the hardware safe. Together these elements let HIL-SERL reach near-perfect task success and faster cycle times than imitation-only baselines. @@ -56,30 +62,243 @@ pip install -e ".[hilserl]" ### Understanding Configuration -The training process begins with proper configuration for the HILSerl environment. The configuration class of interest is `HILSerlRobotEnvConfig` in `lerobot/envs/configs.py`. Which is defined as: +The training process begins with proper configuration for the HILSerl environment. The main configuration class is `GymManipulatorConfig` in `lerobot/scripts/rl/gym_manipulator.py`, which contains nested `HILSerlRobotEnvConfig` and `DatasetConfig`. 
The configuration is organized into focused, nested sub-configs: ```python +class GymManipulatorConfig: + env: HILSerlRobotEnvConfig # Environment configuration (nested) + dataset: DatasetConfig # Dataset recording/replay configuration (nested) + mode: str | None = None # "record", "replay", or None (for training) + device: str = "cpu" # Compute device + class HILSerlRobotEnvConfig(EnvConfig): robot: RobotConfig | None = None # Main robot agent (defined in `lerobot/robots`) - teleop: TeleoperatorConfig | None = None # Teleoperator agent, e.g., gamepad or leader arm, (defined in `lerobot/teleoperators`) - wrapper: EnvTransformConfig | None = None # Environment wrapper settings; check `lerobot/scripts/server/gym_manipulator.py` - fps: int = 10 # Control frequency + teleop: TeleoperatorConfig | None = None # Teleoperator agent, e.g., gamepad or leader arm + processor: HILSerlProcessorConfig # Processing pipeline configuration (nested) name: str = "real_robot" # Environment name - mode: str = None # "record", "replay", or None (for training) - repo_id: str | None = None # LeRobot dataset repository ID - dataset_root: str | None = None # Local dataset root (optional) - task: str = "" # Task identifier - num_episodes: int = 10 # Number of episodes for recording - episode: int = 0 # episode index for replay - device: str = "cuda" # Compute device - push_to_hub: bool = True # Whether to push the recorded datasets to Hub - pretrained_policy_name_or_path: str | None = None # For policy loading - reward_classifier_pretrained_path: str | None = None # For reward model - number_of_steps_after_success: int = 0 # For reward classifier, collect more positive examples after a success to train a classifier + task: str | None = None # Task identifier + fps: int = 10 # Control frequency + +# Nested processor configuration +class HILSerlProcessorConfig: + control_mode: str = "gamepad" # Control mode + observation: ObservationConfig | None = None # Observation processing settings + image_preprocessing: ImagePreprocessingConfig | None = None # Image crop/resize settings + gripper: GripperConfig | None = None # Gripper control and penalty settings + reset: ResetConfig | None = None # Environment reset and timing settings + inverse_kinematics: InverseKinematicsConfig | None = None # IK processing settings + reward_classifier: RewardClassifierConfig | None = None # Reward classifier settings + max_gripper_pos: float | None = 100.0 # Maximum gripper position + +# Sub-configuration classes +class ObservationConfig: + add_joint_velocity_to_observation: bool = False # Add joint velocities to state + add_current_to_observation: bool = False # Add motor currents to state + add_ee_pose_to_observation: bool = False # Add end-effector pose to state + display_cameras: bool = False # Display camera feeds during execution + +class ImagePreprocessingConfig: + crop_params_dict: dict[str, tuple[int, int, int, int]] | None = None # Image cropping parameters + resize_size: tuple[int, int] | None = None # Target image size + +class GripperConfig: + use_gripper: bool = True # Enable gripper control + gripper_penalty: float = 0.0 # Penalty for inappropriate gripper usage + gripper_penalty_in_reward: bool = False # Include gripper penalty in reward + +class ResetConfig: + fixed_reset_joint_positions: Any | None = None # Joint positions for reset + reset_time_s: float = 5.0 # Time to wait during reset + control_time_s: float = 20.0 # Maximum episode duration + terminate_on_success: bool = True # Whether to terminate episodes on success 
detection + +class InverseKinematicsConfig: + urdf_path: str | None = None # Path to robot URDF file + target_frame_name: str | None = None # End-effector frame name + end_effector_bounds: dict[str, list[float]] | None = None # EE workspace bounds + end_effector_step_sizes: dict[str, float] | None = None # EE step sizes per axis + +class RewardClassifierConfig: + pretrained_path: str | None = None # Path to pretrained reward classifier + success_threshold: float = 0.5 # Success detection threshold + success_reward: float = 1.0 # Reward value for successful episodes + +# Dataset configuration +class DatasetConfig: + repo_id: str # LeRobot dataset repository ID + dataset_root: str # Local dataset root directory + task: str # Task identifier + num_episodes: int # Number of episodes for recording + episode: int # Episode index for replay + push_to_hub: bool # Whether to push datasets to Hub ``` +### Processor Pipeline Architecture + +HIL-SERL uses a modular processor pipeline architecture that processes robot observations and actions through a series of composable steps. The pipeline is divided into two main components: + +#### Environment Processor Pipeline + +The environment processor (`env_processor`) handles incoming observations and environment state: + +1. **VanillaObservationProcessor**: Converts raw robot observations into standardized format +2. **JointVelocityProcessor** (optional): Adds joint velocity information to observations +3. **MotorCurrentProcessor** (optional): Adds motor current readings to observations +4. **ForwardKinematicsJointsToEE** (optional): Computes end-effector pose from joint positions +5. **ImageCropResizeProcessor** (optional): Crops and resizes camera images +6. **TimeLimitProcessor** (optional): Enforces episode time limits +7. **GripperPenaltyProcessor** (optional): Applies penalties for inappropriate gripper usage +8. **RewardClassifierProcessor** (optional): Automated reward detection using vision models +9. **ToBatchProcessor**: Converts data to batch format for neural network processing +10. **DeviceProcessor**: Moves data to the specified compute device (CPU/GPU) + +#### Action Processor Pipeline + +The action processor (`action_processor`) handles outgoing actions and human interventions: + +1. **AddTeleopActionAsComplimentaryData**: Captures teleoperator actions for logging +2. **AddTeleopEventsAsInfo**: Records intervention events and episode control signals +3. **AddRobotObservationAsComplimentaryData**: Stores raw robot state for processing +4. **InterventionActionProcessor**: Handles human interventions and episode termination +5. 
**Inverse Kinematics Pipeline** (when enabled): + - **MapDeltaActionToRobotAction**: Converts delta actions to robot action format + - **EEReferenceAndDelta**: Computes end-effector reference and delta movements + - **EEBoundsAndSafety**: Enforces workspace safety bounds + - **InverseKinematicsEEToJoints**: Converts end-effector actions to joint targets + - **GripperVelocityToJoint**: Handles gripper control commands + +#### Configuration Examples + +**Basic Observation Processing**: + +```json +{ + "env": { + "processor": { + "observation": { + "add_joint_velocity_to_observation": true, + "add_current_to_observation": false, + "display_cameras": false + } + } + } +} +``` + +**Image Processing**: + +```json +{ + "env": { + "processor": { + "image_preprocessing": { + "crop_params_dict": { + "observation.images.front": [180, 250, 120, 150], + "observation.images.side": [180, 207, 180, 200] + }, + "resize_size": [128, 128] + } + } + } +} +``` + +**Inverse Kinematics Setup**: + +```json +{ + "env": { + "processor": { + "inverse_kinematics": { + "urdf_path": "path/to/robot.urdf", + "target_frame_name": "end_effector", + "end_effector_bounds": { + "min": [0.16, -0.08, 0.03], + "max": [0.24, 0.2, 0.1] + }, + "end_effector_step_sizes": { + "x": 0.02, + "y": 0.02, + "z": 0.02 + } + } + } + } +} +``` + +### Advanced Observation Processing + +The HIL-SERL framework supports additional observation processing features that can improve policy learning: + +#### Joint Velocity Processing + +Enable joint velocity estimation to provide the policy with motion information: + +```json +{ + "env": { + "processor": { + "observation": { + "add_joint_velocity_to_observation": true + } + } + } +} +``` + +This processor: + +- Estimates joint velocities using finite differences between consecutive joint position readings +- Adds velocity information to the observation state vector +- Useful for policies that need motion awareness for dynamic tasks + +#### Motor Current Processing + +Monitor motor currents to detect contact forces and load conditions: + +```json +{ + "env": { + "processor": { + "observation": { + "add_current_to_observation": true + } + } + } +} +``` + +This processor: + +- Reads motor current values from the robot's control system +- Adds current measurements to the observation state vector +- Helps detect contact events, object weights, and mechanical resistance +- Useful for contact-rich manipulation tasks + +#### Combined Observation Processing + +You can enable multiple observation processing features simultaneously: + +```json +{ + "env": { + "processor": { + "observation": { + "add_joint_velocity_to_observation": true, + "add_current_to_observation": true, + "add_ee_pose_to_observation": false, + "display_cameras": false + } + } + } +} +``` + +**Note**: Enabling additional observation features increases the state space dimensionality, which may require adjusting your policy network architecture and potentially collecting more training data. + ### Finding Robot Workspace Bounds Before collecting demonstrations, you need to determine the appropriate operational bounds for your robot. @@ -130,22 +349,56 @@ With the bounds defined, you can safely collect demonstrations for training. Tra Create a configuration file for recording demonstrations (or edit an existing one like [env_config_so100.json](https://huggingface.co/datasets/aractingi/lerobot-example-config-files/blob/main/env_config_so100.json)): -1. Set `mode` to `"record"` -2. 
Specify a unique `repo_id` for your dataset (e.g., "username/task_name") -3. Set `num_episodes` to the number of demonstrations you want to collect -4. Set `crop_params_dict` to `null` initially (we'll determine crops later) -5. Configure `robot`, `cameras`, and other hardware settings +1. Set `mode` to `"record"` at the root level +2. Specify a unique `repo_id` for your dataset in the `dataset` section (e.g., "username/task_name") +3. Set `num_episodes` in the `dataset` section to the number of demonstrations you want to collect +4. Set `env.processor.image_preprocessing.crop_params_dict` to `{}` initially (we'll determine crops later) +5. Configure `env.robot`, `env.teleop`, and other hardware settings in the `env` section Example configuration section: ```json -"mode": "record", -"repo_id": "username/pick_lift_cube", -"dataset_root": null, -"task": "pick_and_lift", -"num_episodes": 15, -"episode": 0, -"push_to_hub": true +{ + "env": { + "type": "gym_manipulator", + "name": "real_robot", + "fps": 10, + "processor": { + "control_mode": "gamepad", + "observation": { + "display_cameras": false + }, + "image_preprocessing": { + "crop_params_dict": {}, + "resize_size": [128, 128] + }, + "gripper": { + "use_gripper": true, + "gripper_penalty": 0.0 + }, + "reset": { + "reset_time_s": 5.0, + "control_time_s": 20.0 + } + }, + "robot": { + // ... robot configuration ... + }, + "teleop": { + // ... teleoperator configuration ... + } + }, + "dataset": { + "repo_id": "username/pick_lift_cube", + "dataset_root": null, + "task": "pick_and_lift", + "num_episodes": 15, + "episode": 0, + "push_to_hub": true + }, + "mode": "record", + "device": "cpu" +} ``` ### Using a Teleoperation Device @@ -191,10 +444,20 @@ The gamepad provides a very convenient way to control the robot and the episode To setup the gamepad, you need to set the `control_mode` to `"gamepad"` and define the `teleop` section in the configuration file. ```json +{ + "env": { "teleop": { - "type": "gamepad", - "use_gripper": true + "type": "gamepad", + "use_gripper": true }, + "processor": { + "control_mode": "gamepad", + "gripper": { + "use_gripper": true + } + } + } +} ```
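Before recording demonstrations, it can be worth a quick dry run to confirm the gamepad mapping works. Below is a minimal sketch, assuming your assembled config is saved as `env_config_so100.json` (the filename and path are placeholders):

```bash
# Launch the environment entry point with the gamepad config above.
# With "mode" left as null, this runs the teleoperation loop without recording.
python -m lerobot.scripts.rl.gym_manipulator --config_path path/to/env_config_so100.json
```

If the end-effector responds to the sticks and the gripper behaves as expected, the same file can be reused for recording by setting `mode` to `"record"`.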

@@ -216,11 +479,21 @@ The SO101 leader arm has reduced gears that allows it to move and track the foll To setup the SO101 leader, you need to set the `control_mode` to `"leader"` and define the `teleop` section in the configuration file. ```json +{ + "env": { "teleop": { - "type": "so101_leader", - "port": "/dev/tty.usbmodem585A0077921", # check your port number - "use_degrees": true + "type": "so101_leader", + "port": "/dev/tty.usbmodem585A0077921", + "use_degrees": true }, + "processor": { + "control_mode": "leader", + "gripper": { + "use_gripper": true + } + } + } +} ``` In order to annotate the success/failure of the episode, **you will need** to use a keyboard to press `s` for success, `esc` for failure. @@ -251,7 +524,7 @@ python -m lerobot.scripts.rl.gym_manipulator --config_path src/lerobot/configs/e During recording: -1. The robot will reset to the initial position defined in the configuration file `fixed_reset_joint_positions` +1. The robot will reset to the initial position defined in the configuration file `env.processor.reset.fixed_reset_joint_positions` 2. Complete the task successfully 3. The episode ends with a reward of 1 when you press the "success" button 4. If the time limit is reached, or the fail button is pressed, the episode ends with a reward of 0 @@ -310,11 +583,19 @@ observation.images.front: [180, 250, 120, 150] Add these crop parameters to your training configuration: ```json -"crop_params_dict": { - "observation.images.side": [180, 207, 180, 200], - "observation.images.front": [180, 250, 120, 150] -}, -"resize_size": [128, 128] +{ + "env": { + "processor": { + "image_preprocessing": { + "crop_params_dict": { + "observation.images.side": [180, 207, 180, 200], + "observation.images.front": [180, 250, 120, 150] + }, + "resize_size": [128, 128] + } + } + } +} ``` **Recommended image resolution** @@ -343,26 +624,52 @@ python -m lerobot.scripts.rl.gym_manipulator --config_path src/lerobot/configs/r **Key Parameters for Data Collection** -- **mode**: set it to `"record"` to collect a dataset -- **repo_id**: `"hf_username/dataset_name"`, name of the dataset and repo on the hub -- **num_episodes**: Number of episodes to record -- **number_of_steps_after_success**: Number of additional frames to record after a success (reward=1) is detected -- **fps**: Number of frames per second to record -- **push_to_hub**: Whether to push the dataset to the hub +- **mode**: set it to `"record"` to collect a dataset (at root level) +- **dataset.repo_id**: `"hf_username/dataset_name"`, name of the dataset and repo on the hub +- **dataset.num_episodes**: Number of episodes to record +- **env.processor.reset.terminate_on_success**: Whether to automatically terminate episodes when success is detected (default: `true`) +- **env.fps**: Number of frames per second to record +- **dataset.push_to_hub**: Whether to push the dataset to the hub -The `number_of_steps_after_success` parameter is crucial as it allows you to collect more positive examples. When a success is detected, the system will continue recording for the specified number of steps while maintaining the reward=1 label. Otherwise, there won't be enough states in the dataset labeled to 1 to train a good classifier. +The `env.processor.reset.terminate_on_success` parameter allows you to control episode termination behavior. When set to `false`, episodes will continue even after success is detected, allowing you to collect more positive examples with the reward=1 label. 
This is crucial for training reward classifiers as it provides more success state examples in your dataset. When set to `true` (default), episodes terminate immediately upon success detection. + +**Important**: For reward classifier training, set `terminate_on_success: false` to collect sufficient positive examples. For regular HIL-SERL training, keep it as `true` to enable automatic episode termination when the task is completed successfully. Example configuration section for data collection: ```json { + "env": { + "type": "gym_manipulator", + "name": "real_robot", + "fps": 10, + "processor": { + "reset": { + "reset_time_s": 5.0, + "control_time_s": 20.0, + "terminate_on_success": false + }, + "gripper": { + "use_gripper": true + } + }, + "robot": { + // ... robot configuration ... + }, + "teleop": { + // ... teleoperator configuration ... + } + }, + "dataset": { + "repo_id": "hf_username/dataset_name", + "dataset_root": "data/your_dataset", + "task": "reward_classifier_task", + "num_episodes": 20, + "episode": 0, + "push_to_hub": true + }, "mode": "record", - "repo_id": "hf_username/dataset_name", - "dataset_root": "data/your_dataset", - "num_episodes": 20, - "push_to_hub": true, - "fps": 10, - "number_of_steps_after_success": 15 + "device": "cpu" } ``` @@ -421,9 +728,17 @@ To use your trained reward classifier, configure the `HILSerlRobotEnvConfig` to ```python -env_config = HILSerlRobotEnvConfig( - reward_classifier_pretrained_path="path_to_your_pretrained_trained_model", - # Other environment parameters +config = GymManipulatorConfig( + env=HILSerlRobotEnvConfig( + processor=HILSerlProcessorConfig( + reward_classifier=RewardClassifierConfig( + pretrained_path="path_to_your_pretrained_trained_model" + ) + ), + # Other environment parameters + ), + dataset=DatasetConfig(...), + mode=None # For training ) ``` @@ -432,7 +747,18 @@ or set the argument in the json config file. ```json { - "reward_classifier_pretrained_path": "path_to_your_pretrained_model" + "env": { + "processor": { + "reward_classifier": { + "pretrained_path": "path_to_your_pretrained_model", + "success_threshold": 0.7, + "success_reward": 1.0 + }, + "reset": { + "terminate_on_success": true + } + } + } } ``` diff --git a/docs/source/hilserl_sim.mdx b/docs/source/hilserl_sim.mdx index c739be835..bbb0cc6f9 100644 --- a/docs/source/hilserl_sim.mdx +++ b/docs/source/hilserl_sim.mdx @@ -32,9 +32,12 @@ To use `gym_hil` with LeRobot, you need to create a configuration file. 
An examp ```json { - "type": "hil", - "name": "franka_sim", - "task": "PandaPickCubeGamepad-v0", + "env": { + "type": "gym_manipulator", + "name": "gym_hil", + "task": "PandaPickCubeGamepad-v0", + "fps": 10 + }, "device": "cuda" } ``` @@ -45,28 +48,40 @@ Available tasks: - `PandaPickCubeGamepad-v0`: With gamepad control - `PandaPickCubeKeyboard-v0`: With keyboard control -### Gym Wrappers Configuration +### Processor Configuration ```json -"wrapper": { - "gripper_penalty": -0.02, - "control_time_s": 15.0, - "use_gripper": true, - "fixed_reset_joint_positions": [0.0, 0.195, 0.0, -2.43, 0.0, 2.62, 0.785], - "end_effector_step_sizes": { - "x": 0.025, - "y": 0.025, - "z": 0.025 - }, - "control_mode": "gamepad" +{ + "env": { + "processor": { + "control_mode": "gamepad", + "gripper": { + "use_gripper": true, + "gripper_penalty": -0.02 + }, + "reset": { + "control_time_s": 15.0, + "fixed_reset_joint_positions": [ + 0.0, 0.195, 0.0, -2.43, 0.0, 2.62, 0.785 + ] + }, + "inverse_kinematics": { + "end_effector_step_sizes": { + "x": 0.025, + "y": 0.025, + "z": 0.025 + } + } } + } +} ``` Important parameters: -- `gripper_penalty`: Penalty for excessive gripper movement -- `use_gripper`: Whether to enable gripper control -- `end_effector_step_sizes`: Size of the steps in the x,y,z axes of the end-effector +- `gripper.gripper_penalty`: Penalty for excessive gripper movement +- `gripper.use_gripper`: Whether to enable gripper control +- `inverse_kinematics.end_effector_step_sizes`: Size of the steps in the x,y,z axes of the end-effector - `control_mode`: Set to `"gamepad"` to use a gamepad controller ## Running with HIL RL of LeRobot @@ -75,39 +90,50 @@ Important parameters: To run the environment, set mode to null: - -```python +```bash python -m lerobot.scripts.rl.gym_manipulator --config_path path/to/gym_hil_env.json ``` - ### Recording a Dataset To collect a dataset, set the mode to `record` whilst defining the repo_id and number of episodes to record: - -```python +```json +{ + "env": { + "type": "gym_manipulator", + "name": "gym_hil", + "task": "PandaPickCubeGamepad-v0" + }, + "dataset": { + "repo_id": "username/sim_dataset", + "dataset_root": null, + "task": "pick_cube", + "num_episodes": 10, + "episode": 0, + "push_to_hub": true + }, + "mode": "record" +} +``` + +```bash python -m lerobot.scripts.rl.gym_manipulator --config_path path/to/gym_hil_env.json ``` - ### Training a Policy To train a policy, checkout the configuration example available [here](https://huggingface.co/datasets/aractingi/lerobot-example-config-files/blob/main/train_gym_hil_env.json) and run the actor and learner servers: - -```python +```bash python -m lerobot.scripts.rl.actor --config_path path/to/train_gym_hil_env.json ``` - In a different terminal, run the learner server: - -```python +```bash python -m lerobot.scripts.rl.learner --config_path path/to/train_gym_hil_env.json ``` - The simulation environment provides a safe and repeatable way to develop and test your Human-In-the-Loop reinforcement learning components before deploying to real robots. diff --git a/docs/source/il_sim.mdx b/docs/source/il_sim.mdx index 761e24e0f..6047bf884 100644 --- a/docs/source/il_sim.mdx +++ b/docs/source/il_sim.mdx @@ -24,11 +24,36 @@ pip install -e ".[hilserl]" To use `gym_hil` with LeRobot, you need to use a configuration file. An example config file can be found [here](https://huggingface.co/datasets/aractingi/lerobot-example-config-files/blob/main/env_config_gym_hil_il.json). 
-To teleoperate and collect a dataset, we need to modify this config file and you should add your `repo_id` here: `"repo_id": "il_gym",` and `"num_episodes": 30,` and make sure you set `mode` to `record`, "mode": "record". +To teleoperate and collect a dataset, we need to modify this config file. Here's an example configuration for imitation learning data collection: -If you do not have a Nvidia GPU also change `"device": "cuda"` parameter in the config file (for example to `mps` for MacOS). +```json +{ + "env": { + "type": "gym_manipulator", + "name": "gym_hil", + "task": "PandaPickCubeGamepad-v0", + "fps": 10 + }, + "dataset": { + "repo_id": "your_username/il_gym", + "dataset_root": null, + "task": "pick_cube", + "num_episodes": 30, + "episode": 0, + "push_to_hub": true + }, + "mode": "record", + "device": "cuda" +} +``` -By default the config file assumes you use a controller. To use your keyboard please change the envoirment specified at `"task"` in the config file and set it to `"PandaPickCubeKeyboard-v0"`. +Key configuration points: + +- Set your `repo_id` in the `dataset` section: `"repo_id": "your_username/il_gym"` +- Set `num_episodes: 30` to collect 30 demonstration episodes +- Ensure `mode` is set to `"record"` +- If you don't have an NVIDIA GPU, change `"device": "cuda"` to `"mps"` for macOS or `"cpu"` +- To use keyboard instead of gamepad, change `"task"` to `"PandaPickCubeKeyboard-v0"` Then we can run this command to start: @@ -140,9 +165,32 @@ huggingface-cli upload ${HF_USER}/il_sim_test${CKPT} \ ## Evaluate your policy in Sim -To evaluate your policy we have to use the config file that can be found [here](https://huggingface.co/datasets/aractingi/lerobot-example-config-files/blob/main/eval_config_gym_hil.json). +To evaluate your policy we have to use a configuration file. An example can be found [here](https://huggingface.co/datasets/aractingi/lerobot-example-config-files/blob/main/eval_config_gym_hil.json). 
-Make sure to replace the `repo_id` with the dataset you trained on, for example `pepijn223/il_sim_dataset` and replace the `pretrained_policy_name_or_path` with your model id, for example `pepijn223/il_sim_model` +Here's an example evaluation configuration: + +```json +{ + "env": { + "type": "gym_manipulator", + "name": "gym_hil", + "task": "PandaPickCubeGamepad-v0", + "fps": 10 + }, + "dataset": { + "repo_id": "your_username/il_sim_dataset", + "dataset_root": null, + "task": "pick_cube" + }, + "pretrained_policy_name_or_path": "your_username/il_sim_model", + "device": "cuda" +} +``` + +Make sure to replace: + +- `repo_id` with the dataset you trained on (e.g., `your_username/il_sim_dataset`) +- `pretrained_policy_name_or_path` with your model ID (e.g., `your_username/il_sim_model`) Then you can run this command to visualize your trained policy diff --git a/src/lerobot/envs/configs.py b/src/lerobot/envs/configs.py index 35797c6ed..f71aca70d 100644 --- a/src/lerobot/envs/configs.py +++ b/src/lerobot/envs/configs.py @@ -161,35 +161,73 @@ class XarmEnv(EnvConfig): @dataclass -class VideoRecordConfig: - """Configuration for video recording in ManiSkill environments.""" - - enabled: bool = False - record_dir: str = "videos" - trajectory_name: str = "trajectory" +class ImagePreprocessingConfig: + crop_params_dict: dict[str, tuple[int, int, int, int]] | None = None + resize_size: tuple[int, int] | None = None @dataclass -class EnvTransformConfig: - """Configuration for environment wrappers.""" +class RewardClassifierConfig: + """Configuration for reward classification.""" + + pretrained_path: str | None = None + success_threshold: float = 0.5 + success_reward: float = 1.0 + + +@dataclass +class InverseKinematicsConfig: + """Configuration for inverse kinematics processing.""" + + urdf_path: str | None = None + target_frame_name: str | None = None + end_effector_bounds: dict[str, list[float]] | None = None + end_effector_step_sizes: dict[str, float] | None = None + + +@dataclass +class ObservationConfig: + """Configuration for observation processing.""" - # ee_action_space_params: EEActionSpaceConfig = field(default_factory=EEActionSpaceConfig) - control_mode: str = "gamepad" - display_cameras: bool = False add_joint_velocity_to_observation: bool = False add_current_to_observation: bool = False add_ee_pose_to_observation: bool = False - crop_params_dict: dict[str, tuple[int, int, int, int]] | None = None - resize_size: tuple[int, int] | None = None - control_time_s: float = 20.0 - fixed_reset_joint_positions: Any | None = None - reset_time_s: float = 5.0 + display_cameras: bool = False + + +@dataclass +class GripperConfig: + """Configuration for gripper control and penalties.""" + use_gripper: bool = True - gripper_quantization_threshold: float | None = 0.8 gripper_penalty: float = 0.0 gripper_penalty_in_reward: bool = False +@dataclass +class ResetConfig: + """Configuration for environment reset behavior.""" + + fixed_reset_joint_positions: Any | None = None + reset_time_s: float = 5.0 + control_time_s: float = 20.0 + terminate_on_success: bool = True + + +@dataclass +class HILSerlProcessorConfig: + """Configuration for environment processing pipeline.""" + + control_mode: str = "gamepad" + observation: ObservationConfig | None = None + image_preprocessing: ImagePreprocessingConfig | None = None + gripper: GripperConfig | None = None + reset: ResetConfig | None = None + inverse_kinematics: InverseKinematicsConfig | None = None + reward_classifier: RewardClassifierConfig | None = None + 
max_gripper_pos: float | None = 100.0 + + @EnvConfig.register_subclass(name="gym_manipulator") @dataclass class HILSerlRobotEnvConfig(EnvConfig): @@ -197,77 +235,10 @@ class HILSerlRobotEnvConfig(EnvConfig): robot: RobotConfig | None = None teleop: TeleoperatorConfig | None = None - wrapper: EnvTransformConfig | None = None - fps: int = 10 + processor: HILSerlProcessorConfig = field(default_factory=HILSerlProcessorConfig) + name: str = "real_robot" - mode: str | None = None # Either "record", "replay", None - repo_id: str | None = None - dataset_root: str | None = None - task: str | None = "" - num_episodes: int = 10 # only for record mode - episode: int = 0 - device: str = "cuda" - push_to_hub: bool = True - pretrained_policy_name_or_path: str | None = None - reward_classifier_pretrained_path: str | None = None - # For the reward classifier, to record more positive examples after a success - number_of_steps_after_success: int = 0 @property def gym_kwargs(self) -> dict: return {} - - -@EnvConfig.register_subclass("hil") -@dataclass -class HILEnvConfig(EnvConfig): - """Configuration for the HIL environment.""" - - name: str = "PandaPickCube" - task: str | None = "PandaPickCubeKeyboard-v0" - use_viewer: bool = True - gripper_penalty: float = 0.0 - use_gamepad: bool = True - state_dim: int = 18 - action_dim: int = 4 - fps: int = 100 - episode_length: int = 100 - video_record: VideoRecordConfig = field(default_factory=VideoRecordConfig) - features: dict[str, PolicyFeature] = field( - default_factory=lambda: { - "action": PolicyFeature(type=FeatureType.ACTION, shape=(4,)), - "observation.image": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 128, 128)), - "observation.state": PolicyFeature(type=FeatureType.STATE, shape=(18,)), - } - ) - features_map: dict[str, str] = field( - default_factory=lambda: { - "action": ACTION, - "observation.image": OBS_IMAGE, - "observation.state": OBS_STATE, - } - ) - ################# args from hilserlrobotenv - reward_classifier_pretrained_path: str | None = None - robot_config: RobotConfig | None = None - teleop_config: TeleoperatorConfig | None = None - wrapper: EnvTransformConfig | None = None - mode: str | None = None # Either "record", "replay", None - repo_id: str | None = None - dataset_root: str | None = None - num_episodes: int = 10 # only for record mode - episode: int = 0 - device: str = "cuda" - push_to_hub: bool = True - pretrained_policy_name_or_path: str | None = None - # For the reward classifier, to record more positive examples after a success - number_of_steps_after_success: int = 0 - ############################ - - @property - def gym_kwargs(self) -> dict: - return { - "use_viewer": self.use_viewer, - "use_gamepad": self.use_gamepad, - "gripper_penalty": self.gripper_penalty, - } diff --git a/src/lerobot/processor/__init__.py b/src/lerobot/processor/__init__.py index 304229292..979f7ebc4 100644 --- a/src/lerobot/processor/__init__.py +++ b/src/lerobot/processor/__init__.py @@ -15,7 +15,20 @@ # limitations under the License. 
 from .batch_processor import ToBatchProcessor
+from .delta_action_processor import MapDeltaActionToRobotAction
 from .device_processor import DeviceProcessor
+from .hil_processor import (
+    AddTeleopActionAsComplimentaryData,
+    AddTeleopEventsAsInfo,
+    GripperPenaltyProcessor,
+    ImageCropResizeProcessor,
+    InterventionActionProcessor,
+    Numpy2TorchActionProcessor,
+    RewardClassifierProcessor,
+    TimeLimitProcessor,
+    Torch2NumpyActionProcessor,
+)
+from .joint_observations_processor import JointVelocityProcessor, MotorCurrentProcessor
 from .normalize_processor import NormalizerProcessor, UnnormalizerProcessor, hotswap_stats
 from .observation_processor import VanillaObservationProcessor
 from .pipeline import (
@@ -37,11 +50,19 @@ from .tokenizer_processor import TokenizerProcessor
 __all__ = [
     "ActionProcessor",
+    "AddTeleopActionAsComplimentaryData",
+    "AddTeleopEventsAsInfo",
     "DeviceProcessor",
     "DoneProcessor",
     "EnvTransition",
+    "GripperPenaltyProcessor",
     "IdentityProcessor",
+    "ImageCropResizeProcessor",
     "InfoProcessor",
+    "InterventionActionProcessor",
+    "JointVelocityProcessor",
+    "MapDeltaActionToRobotAction",
+    "MotorCurrentProcessor",
     "NormalizerProcessor",
     "UnnormalizerProcessor",
     "hotswap_stats",
@@ -49,10 +70,14 @@ __all__ = [
     "ProcessorStep",
     "ProcessorStepRegistry",
     "RenameProcessor",
+    "RewardClassifierProcessor",
     "RewardProcessor",
     "RobotProcessor",
     "ToBatchProcessor",
     "TokenizerProcessor",
+    "TimeLimitProcessor",
+    "Numpy2TorchActionProcessor",
+    "Torch2NumpyActionProcessor",
     "TransitionKey",
     "TruncatedProcessor",
     "VanillaObservationProcessor",
diff --git a/src/lerobot/processor/delta_action_processor.py b/src/lerobot/processor/delta_action_processor.py
new file mode 100644
index 000000000..a575bb07a
--- /dev/null
+++ b/src/lerobot/processor/delta_action_processor.py
@@ -0,0 +1,125 @@
+#!/usr/bin/env python
+
+# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from dataclasses import dataclass, field
+
+from torch import Tensor
+
+from lerobot.configs.types import FeatureType, PolicyFeature
+from lerobot.processor.pipeline import ActionProcessor, ProcessorStepRegistry
+
+
+@ProcessorStepRegistry.register("map_delta_action_to_robot_action")
+@dataclass
+class MapDeltaActionToRobotAction(ActionProcessor):
+    """
+    Map delta actions from teleoperators (gamepad, keyboard) to robot target actions
+    for use with inverse kinematics processors.
+ + Expected input ACTION keys: + { + "action.delta_x": float, + "action.delta_y": float, + "action.delta_z": float, + "action.gripper": float (optional), + } + + Output ACTION keys: + { + "action.enabled": bool, + "action.target_x": float, + "action.target_y": float, + "action.target_z": float, + "action.target_wx": float, + "action.target_wy": float, + "action.target_wz": float, + "action.gripper": float, + } + """ + + # Scale factors for delta movements + position_scale: float = 1.0 + rotation_scale: float = 0.0 # No rotation deltas for gamepad/keyboard + gripper_deadzone: float = 0.1 # Threshold for gripper activation + _prev_enabled: bool = field(default=False, init=False, repr=False) + + def action(self, action: dict | Tensor | None) -> dict: + if action is None: + return {} + + # NOTE (maractingi): Action can be a dict from the teleop_devices or a tensor from the policy + # TODO (maractingi): changing this target_xyz naming convention from the teleop_devices + if isinstance(action, dict): + delta_x = action.pop("action.delta_x", 0.0) + delta_y = action.pop("action.delta_y", 0.0) + delta_z = action.pop("action.delta_z", 0.0) + gripper = action.pop("action.gripper", 1.0) # Default to "stay" (1.0) + else: + delta_x = action[0].item() + delta_y = action[1].item() + delta_z = action[2].item() + gripper = action[3].item() + + # Determine if the teleoperator is actively providing input + # Consider enabled if any significant movement delta is detected + position_magnitude = abs(delta_x) + abs(delta_y) + abs(delta_z) + enabled = position_magnitude > 1e-6 # Small threshold to avoid noise + + # Scale the deltas appropriately + scaled_delta_x = float(delta_x) * self.position_scale + scaled_delta_y = float(delta_y) * self.position_scale + scaled_delta_z = float(delta_z) * self.position_scale + + # For gamepad/keyboard, we don't have rotation input, so set to 0 + # These could be extended in the future for more sophisticated teleoperators + target_wx = 0.0 + target_wy = 0.0 + target_wz = 0.0 + + # Update action with robot target format + action = { + "action.enabled": enabled, + "action.target_x": scaled_delta_x, + "action.target_y": scaled_delta_y, + "action.target_z": scaled_delta_z, + "action.target_wx": target_wx, + "action.target_wy": target_wy, + "action.target_wz": target_wz, + "action.gripper": float(gripper), + } + + self._prev_enabled = enabled + return action + + def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + """Transform features to match output format.""" + # Update features to reflect the new action format + features.update( + { + "action.enabled": PolicyFeature(type=FeatureType.ACTION, shape=(1,)), + "action.target_x": PolicyFeature(type=FeatureType.ACTION, shape=(1,)), + "action.target_y": PolicyFeature(type=FeatureType.ACTION, shape=(1,)), + "action.target_z": PolicyFeature(type=FeatureType.ACTION, shape=(1,)), + "action.target_wx": PolicyFeature(type=FeatureType.ACTION, shape=(1,)), + "action.target_wy": PolicyFeature(type=FeatureType.ACTION, shape=(1,)), + "action.target_wz": PolicyFeature(type=FeatureType.ACTION, shape=(1,)), + "action.gripper": PolicyFeature(type=FeatureType.ACTION, shape=(1,)), + } + ) + return features + + def reset(self): + self._prev_enabled = False diff --git a/src/lerobot/processor/hil_processor.py b/src/lerobot/processor/hil_processor.py new file mode 100644 index 000000000..9e31548b2 --- /dev/null +++ b/src/lerobot/processor/hil_processor.py @@ -0,0 +1,418 @@ +import time +from dataclasses import 
dataclass
+from typing import Any
+
+import numpy as np
+import torch
+import torchvision.transforms.functional as F  # noqa: N812
+
+from lerobot.configs.types import PolicyFeature
+from lerobot.processor.pipeline import (
+    ActionProcessor,
+    ComplementaryDataProcessor,
+    EnvTransition,
+    InfoProcessor,
+    ObservationProcessor,
+    ProcessorStepRegistry,
+    TransitionKey,
+)
+from lerobot.teleoperators.teleoperator import Teleoperator
+from lerobot.teleoperators.utils import TeleopEvents
+
+GRIPPER_KEY = "gripper"
+
+
+@ProcessorStepRegistry.register("add_teleop_action_as_complementary_data")
+@dataclass
+class AddTeleopActionAsComplimentaryData(ComplementaryDataProcessor):
+    """Add teleoperator action to transition complementary data."""
+
+    teleop_device: Teleoperator
+
+    def complementary_data(self, complementary_data: dict | None) -> dict:
+        complementary_data = {} if complementary_data is None else dict(complementary_data)
+        complementary_data["teleop_action"] = self.teleop_device.get_action()
+        return complementary_data
+
+
+@ProcessorStepRegistry.register("add_teleop_action_as_info")
+@dataclass
+class AddTeleopEventsAsInfo(InfoProcessor):
+    """Add teleoperator control events to transition info."""
+
+    teleop_device: Teleoperator
+
+    def info(self, info: dict | None) -> dict:
+        info = {} if info is None else dict(info)
+        teleop_events = getattr(self.teleop_device, "get_teleop_events", lambda: {})()
+        info.update(teleop_events)
+        return info
+
+
+@ProcessorStepRegistry.register("torch2numpy_action_processor")
+@dataclass
+class Torch2NumpyActionProcessor(ActionProcessor):
+    """Convert PyTorch tensor actions to NumPy arrays."""
+
+    squeeze_batch_dim: bool = True
+
+    def action(self, action: torch.Tensor | None) -> np.ndarray | None:
+        if action is None:
+            return None
+
+        if not isinstance(action, torch.Tensor):
+            raise TypeError(
+                f"Expected torch.Tensor or None, got {type(action).__name__}. "
+                "Use appropriate processor for non-tensor actions."
+            )
+
+        numpy_action = action.detach().cpu().numpy()
+
+        # Remove batch dimensions but preserve action dimensions
+        # Only squeeze if there's a batch dimension (first dim == 1)
+        if (
+            self.squeeze_batch_dim
+            and numpy_action.shape
+            and len(numpy_action.shape) > 1
+            and numpy_action.shape[0] == 1
+        ):
+            numpy_action = numpy_action.squeeze(0)
+
+        return numpy_action
+
+
+@ProcessorStepRegistry.register("numpy2torch_action_processor")
+@dataclass
+class Numpy2TorchActionProcessor(ActionProcessor):
+    """Convert NumPy array action to PyTorch tensor."""
+
+    def action(self, action: np.ndarray | None) -> torch.Tensor | None:
+        if action is None:
+            return None
+        if not isinstance(action, np.ndarray):
+            raise TypeError(
+                f"Expected np.ndarray or None, got {type(action).__name__}. "
+                "Use appropriate processor for non-array actions."
+            )
+        torch_action = torch.from_numpy(action)
+        return torch_action
+
+
+@ProcessorStepRegistry.register("image_crop_resize_processor")
+@dataclass
+class ImageCropResizeProcessor(ObservationProcessor):
+    """Crop and resize image observations."""
+
+    crop_params_dict: dict[str, tuple[int, int, int, int]] | None = None
+    resize_size: tuple[int, int] | None = None
+
+    def observation(self, observation: dict | None) -> dict | None:
+        if observation is None:
+            return None
+
+        if self.resize_size is None and not self.crop_params_dict:
+            return observation
+
+        new_observation = dict(observation)
+
+        # Process all image keys in the observation
+        for key in observation:
+            if "image" not in key:
+                continue
+
+            image = observation[key]
+            device = image.device
+            # NOTE (maractingi): No mps kernel for crop and resize, so we need to move to cpu
+            if device.type == "mps":
+                image = image.cpu()
+            # Crop if crop params are provided for this key
+            if self.crop_params_dict is not None and key in self.crop_params_dict:
+                crop_params = self.crop_params_dict[key]
+                image = F.crop(image, *crop_params)
+            if self.resize_size is not None:
+                image = F.resize(image, self.resize_size)
+                image = image.clamp(0.0, 1.0)
+            new_observation[key] = image.to(device)
+
+        return new_observation
+
+    def get_config(self) -> dict[str, Any]:
+        return {
+            "crop_params_dict": self.crop_params_dict,
+            "resize_size": self.resize_size,
+        }
+
+    def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
+        if self.resize_size is None:
+            return features
+        for key in features:
+            if "image" in key:
+                features[key] = PolicyFeature(type=features[key].type, shape=self.resize_size)
+        return features
+
+
+@dataclass
+@ProcessorStepRegistry.register("time_limit_processor")
+class TimeLimitProcessor:
+    """Track episode steps and enforce time limits."""
+
+    max_episode_steps: int
+    current_step: int = 0
+
+    def __call__(self, transition: EnvTransition) -> EnvTransition:
+        truncated = transition.get(TransitionKey.TRUNCATED)
+        if truncated is None:
+            return transition
+
+        self.current_step += 1
+        if self.current_step >= self.max_episode_steps:
+            truncated = True
+        new_transition = transition.copy()
+        new_transition[TransitionKey.TRUNCATED] = truncated
+        return new_transition
+
+    def get_config(self) -> dict[str, Any]:
+        return {
+            "max_episode_steps": self.max_episode_steps,
+        }
+
+    def state_dict(self) -> dict[str, torch.Tensor]:
+        return {}
+
+    def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
+        pass
+
+    def reset(self) -> None:
+        self.current_step = 0
+
+    def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
+        return features
+
+
+@dataclass
+@ProcessorStepRegistry.register("gripper_penalty_processor")
+class GripperPenaltyProcessor:
+    """Apply penalty for inappropriate gripper usage."""
+
+    penalty: float = -0.01
+    max_gripper_pos: float = 30.0
+
+    def __call__(self, transition: EnvTransition) -> EnvTransition:
+        """Calculate gripper penalty and add to complementary data."""
+        action = transition.get(TransitionKey.ACTION)
+        complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA)
+
+        if complementary_data is None or action is None:
+            return transition
+
+        current_gripper_pos = complementary_data.get("raw_joint_positions", {}).get(GRIPPER_KEY, None)
+        if current_gripper_pos is None:
+            return transition
+
+        gripper_action = action[f"action.{GRIPPER_KEY}.pos"]
+        gripper_action_normalized = gripper_action / self.max_gripper_pos
+
+        # Normalize
gripper state and action + gripper_state_normalized = current_gripper_pos / self.max_gripper_pos + + # Calculate penalty boolean as in original + gripper_penalty_bool = (gripper_state_normalized < 0.5 and gripper_action_normalized > 0.5) or ( + gripper_state_normalized > 0.75 and gripper_action_normalized < 0.5 + ) + + gripper_penalty = self.penalty * int(gripper_penalty_bool) + + # Add penalty information to complementary data + complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA, {}) + + # Create new complementary data with penalty info + new_complementary_data = dict(complementary_data) + new_complementary_data["discrete_penalty"] = gripper_penalty + + # Create new transition with updated complementary data + new_transition = transition.copy() + existing_comp_data = new_transition.get(TransitionKey.COMPLEMENTARY_DATA, {}) + existing_comp_data.update(new_complementary_data) + new_transition[TransitionKey.COMPLEMENTARY_DATA] = existing_comp_data # type: ignore[misc] + return new_transition + + def get_config(self) -> dict[str, Any]: + return { + "penalty": self.penalty, + "max_gripper_pos": self.max_gripper_pos, + } + + def state_dict(self) -> dict[str, torch.Tensor]: + return {} + + def load_state_dict(self, state: dict[str, torch.Tensor]) -> None: + pass + + def reset(self) -> None: + """Reset the processor state.""" + self.last_gripper_state = None + + def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + return features + + +@dataclass +@ProcessorStepRegistry.register("intervention_action_processor") +class InterventionActionProcessor: + """Handle human intervention actions and episode termination.""" + + use_gripper: bool = False + terminate_on_success: bool = True + + def __call__(self, transition: EnvTransition) -> EnvTransition: + action = transition.get(TransitionKey.ACTION) + if action is None: + return transition + + # Get intervention signals from complementary data + info = transition.get(TransitionKey.INFO, {}) + teleop_action = info.get("teleop_action", {}) + is_intervention = info.get(TeleopEvents.IS_INTERVENTION, False) + terminate_episode = info.get(TeleopEvents.TERMINATE_EPISODE, False) + success = info.get(TeleopEvents.SUCCESS, False) + rerecord_episode = info.get(TeleopEvents.RERECORD_EPISODE, False) + + new_transition = transition.copy() + + # Override action if intervention is active + if is_intervention and teleop_action is not None: + if isinstance(teleop_action, dict): + # Convert teleop_action dict to tensor format + action_list = [ + teleop_action.get("action.delta_x", 0.0), + teleop_action.get("action.delta_y", 0.0), + teleop_action.get("action.delta_z", 0.0), + ] + if self.use_gripper: + action_list.append(teleop_action.get("gripper", 1.0)) + elif isinstance(teleop_action, np.ndarray): + action_list = teleop_action.tolist() + else: + action_list = teleop_action + + teleop_action_tensor = torch.tensor(action_list, dtype=action.dtype, device=action.device) + new_transition[TransitionKey.ACTION] = teleop_action_tensor + + # Handle episode termination + new_transition[TransitionKey.DONE] = bool(terminate_episode) or ( + self.terminate_on_success and success + ) + new_transition[TransitionKey.REWARD] = float(success) + + # Update info with intervention metadata + info = new_transition.get(TransitionKey.INFO, {}) + info[TeleopEvents.IS_INTERVENTION] = is_intervention + info[TeleopEvents.RERECORD_EPISODE] = rerecord_episode + info[TeleopEvents.SUCCESS] = success + new_transition[TransitionKey.INFO] = info + 
+ # Update complementary data with teleop action + complementary_data = new_transition.get(TransitionKey.COMPLEMENTARY_DATA, {}) + complementary_data["teleop_action"] = new_transition.get(TransitionKey.ACTION) + new_transition[TransitionKey.COMPLEMENTARY_DATA] = complementary_data + + return new_transition + + def get_config(self) -> dict[str, Any]: + return { + "use_gripper": self.use_gripper, + } + + def state_dict(self) -> dict[str, torch.Tensor]: + return {} + + def load_state_dict(self, state: dict[str, torch.Tensor]) -> None: + pass + + def reset(self) -> None: + pass + + def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + return features + + +@dataclass +@ProcessorStepRegistry.register("reward_classifier_processor") +class RewardClassifierProcessor: + """Apply reward classification to image observations.""" + + pretrained_path: str | None = None + device: str = "cpu" + success_threshold: float = 0.5 + success_reward: float = 1.0 + terminate_on_success: bool = True + + reward_classifier: Any = None + + def __post_init__(self): + """Initialize the reward classifier after dataclass initialization.""" + if self.pretrained_path is not None: + from lerobot.policies.sac.reward_model.modeling_classifier import Classifier + + self.reward_classifier = Classifier.from_pretrained(self.pretrained_path) + self.reward_classifier.to(self.device) + self.reward_classifier.eval() + + def __call__(self, transition: EnvTransition) -> EnvTransition: + observation = transition.get(TransitionKey.OBSERVATION) + if observation is None or self.reward_classifier is None: + return transition + + # Extract images from observation + images = {key: value for key, value in observation.items() if "image" in key} + + if not images: + return transition + + # Run reward classifier + start_time = time.perf_counter() + with torch.inference_mode(): + success = self.reward_classifier.predict_reward(images, threshold=self.success_threshold) + + classifier_frequency = 1 / (time.perf_counter() - start_time) + + # Calculate reward and termination + reward = transition.get(TransitionKey.REWARD, 0.0) + terminated = transition.get(TransitionKey.DONE, False) + + if success == 1.0: + reward = self.success_reward + if self.terminate_on_success: + terminated = True + + # Update transition + new_transition = transition.copy() + new_transition[TransitionKey.REWARD] = reward + new_transition[TransitionKey.DONE] = terminated + + # Update info with classifier frequency + info = new_transition.get(TransitionKey.INFO, {}) + info["reward_classifier_frequency"] = classifier_frequency + new_transition[TransitionKey.INFO] = info + + return new_transition + + def get_config(self) -> dict[str, Any]: + return { + "device": self.device, + "success_threshold": self.success_threshold, + "success_reward": self.success_reward, + "terminate_on_success": self.terminate_on_success, + } + + def state_dict(self) -> dict[str, torch.Tensor]: + return {} + + def load_state_dict(self, state: dict[str, torch.Tensor]) -> None: + pass + + def reset(self) -> None: + pass + + def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: + return features diff --git a/src/lerobot/processor/joint_observations_processor.py b/src/lerobot/processor/joint_observations_processor.py new file mode 100644 index 000000000..185db4048 --- /dev/null +++ b/src/lerobot/processor/joint_observations_processor.py @@ -0,0 +1,116 @@ +from dataclasses import dataclass +from typing import Any + +import torch + 
+from lerobot.configs.types import PolicyFeature
+from lerobot.processor.pipeline import (
+    ObservationProcessor,
+    ProcessorStepRegistry,
+)
+from lerobot.robots import Robot
+
+
+@dataclass
+@ProcessorStepRegistry.register("joint_velocity_processor")
+class JointVelocityProcessor(ObservationProcessor):
+    """Add joint velocity information to observations."""
+
+    joint_velocity_limits: float = 100.0
+    dt: float = 1.0 / 10
+    num_dof: int | None = None
+
+    last_joint_positions: torch.Tensor | None = None
+
+    def observation(self, observation: dict | None) -> dict | None:
+        if observation is None:
+            return None
+
+        # Get current joint positions (assuming they're in observation.state)
+        current_positions = observation.get("observation.state")
+        if current_positions is None:
+            return observation
+
+        # Initialize last joint positions if not already set
+        if self.last_joint_positions is None:
+            self.last_joint_positions = current_positions.clone()
+
+        # Compute velocities
+        joint_velocities = (current_positions - self.last_joint_positions) / self.dt
+        self.last_joint_positions = current_positions.clone()
+
+        # Extend observation with velocities
+        extended_state = torch.cat([current_positions, joint_velocities], dim=-1)
+
+        # Create new observation dict
+        new_observation = dict(observation)
+        new_observation["observation.state"] = extended_state
+
+        return new_observation
+
+    def get_config(self) -> dict[str, Any]:
+        return {
+            "joint_velocity_limits": self.joint_velocity_limits,
+            "dt": self.dt,
+        }
+
+    def reset(self) -> None:
+        self.last_joint_positions = None
+
+    def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
+        if "observation.state" in features and self.num_dof is not None:
+            from lerobot.configs.types import PolicyFeature
+
+            original_feature = features["observation.state"]
+            # Double the shape to account for positions + velocities
+            new_shape = (original_feature.shape[0] + self.num_dof,) + original_feature.shape[1:]
+            features["observation.state"] = PolicyFeature(type=original_feature.type, shape=new_shape)
+        return features
+
+
+@dataclass
+@ProcessorStepRegistry.register("current_processor")
+class MotorCurrentProcessor(ObservationProcessor):
+    """Add motor current information to observations."""
+
+    robot: Robot | None = None
+
+    def observation(self, observation: dict | None) -> dict | None:
+        if observation is None:
+            return None
+
+        # Get current values from robot state
+        if self.robot is None:
+            return observation
+        present_current_dict = self.robot.bus.sync_read("Present_Current")  # type: ignore[attr-defined]
+        motor_currents = torch.tensor(
+            [present_current_dict[name] for name in self.robot.bus.motors],  # type: ignore[attr-defined]
+            dtype=torch.float32,
+        ).unsqueeze(0)
+
+        current_state = observation.get("observation.state")
+        if current_state is None:
+            return observation
+
+        extended_state = torch.cat([current_state, motor_currents], dim=-1)
+
+        # Create new observation dict
+        new_observation = dict(observation)
+        new_observation["observation.state"] = extended_state
+
+        return new_observation
+
+    def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
+        if "observation.state" in features and self.robot is not None:
+            from lerobot.configs.types import PolicyFeature
+
+            original_feature = features["observation.state"]
+            # Add motor current dimensions to the original state shape
+            num_motors = 0
+            if hasattr(self.robot, "bus") and hasattr(self.robot.bus, "motors"):  # type: ignore[attr-defined]
+                num_motors =
len(self.robot.bus.motors) # type: ignore[attr-defined] + + if num_motors > 0: + new_shape = (original_feature.shape[0] + num_motors,) + original_feature.shape[1:] + features["observation.state"] = PolicyFeature(type=original_feature.type, shape=new_shape) + return features diff --git a/src/lerobot/robots/so100_follower/robot_kinematic_processor.py b/src/lerobot/robots/so100_follower/robot_kinematic_processor.py index ed498557f..6d85507bb 100644 --- a/src/lerobot/robots/so100_follower/robot_kinematic_processor.py +++ b/src/lerobot/robots/so100_follower/robot_kinematic_processor.py @@ -53,6 +53,9 @@ class EEReferenceAndDelta: kinematics: RobotKinematics end_effector_step_sizes: dict motor_names: list[str] + use_latched_reference: bool = ( + True # If True, latch reference on enable; if False, always use current pose + ) reference_ee_pose: np.ndarray | None = field(default=None, init=False, repr=False) _prev_enabled: bool = field(default=False, init=False, repr=False) @@ -69,7 +72,10 @@ class EEReferenceAndDelta: "raw_joint_positions is not in complementary data and is required for EEReferenceAndDelta" ) - q = np.array([float(raw[n]) for n in self.motor_names], dtype=float) + if "reference_joint_positions" in comp: + q = comp["reference_joint_positions"] + else: + q = np.array([float(raw[n]) for n in self.motor_names], dtype=float) # Current pose from FK on measured joints t_curr = self.kinematics.forward_kinematics(q) @@ -85,11 +91,12 @@ class EEReferenceAndDelta: desired = None if enabled: - # Latch a reference at the rising edge; also be defensive if None - if not self._prev_enabled or self.reference_ee_pose is None: - self.reference_ee_pose = t_curr.copy() - - ref = self.reference_ee_pose if self.reference_ee_pose is not None else t_curr + ref = t_curr + if self.use_latched_reference: + # Latched reference mode: latch reference at the rising edge + if not self._prev_enabled or self.reference_ee_pose is None: + self.reference_ee_pose = t_curr.copy() + ref = self.reference_ee_pose if self.reference_ee_pose is not None else t_curr delta_p = np.array( [ @@ -100,7 +107,6 @@ class EEReferenceAndDelta: dtype=float, ) r_abs = Rotation.from_rotvec([wx, wy, wz]).as_matrix() - desired = np.eye(4, dtype=float) desired[:3, :3] = ref[:3, :3] @ r_abs desired[:3, 3] = ref[:3, 3] + delta_p @@ -292,6 +298,8 @@ class InverseKinematicsEEToJoints: else: new_act[f"action.{name}.pos"] = float(q_target[i]) transition[TransitionKey.ACTION] = new_act + if not self.initial_guess_current_joints: + transition[TransitionKey.COMPLEMENTARY_DATA]["reference_joint_positions"] = q_target return transition def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]: @@ -332,6 +340,7 @@ class GripperVelocityToJoint: speed_factor: float = 20.0 clip_min: float = 0.0 clip_max: float = 100.0 + discrete_gripper: bool = False def __call__(self, transition: EnvTransition) -> EnvTransition: obs = transition.get(TransitionKey.OBSERVATION) or {} @@ -347,6 +356,15 @@ class GripperVelocityToJoint: transition[TransitionKey.ACTION] = new_act return transition + if self.discrete_gripper: + # Discrete gripper actions are in [0, 1, 2] + # 0: open, 1: close, 2: stay + # We need to shift them to [-1, 0, 1] and then scale them to clip_max + gripper_action = act.get("action.gripper", 1.0) + gripper_action = gripper_action - 1.0 + gripper_action *= self.clip_max + act["action.gripper"] = gripper_action + # Get current gripper position from complementary data raw = comp.get("raw_joint_positions") or {} curr_pos = 
float(raw.get("gripper")) diff --git a/src/lerobot/scripts/rl/actor.py b/src/lerobot/scripts/rl/actor.py index 1c8f9286b..5f44d3c5f 100644 --- a/src/lerobot/scripts/rl/actor.py +++ b/src/lerobot/scripts/rl/actor.py @@ -62,9 +62,16 @@ from lerobot.configs import parser from lerobot.configs.train import TrainRLServerPipelineConfig from lerobot.policies.factory import make_policy from lerobot.policies.sac.modeling_sac import SACPolicy +from lerobot.processor.pipeline import TransitionKey from lerobot.robots import so100_follower # noqa: F401 -from lerobot.scripts.rl.gym_manipulator import make_robot_env +from lerobot.scripts.rl.gym_manipulator import ( + create_transition, + make_processors, + make_robot_env, + step_env_and_process_transition, +) from lerobot.teleoperators import gamepad, so101_leader # noqa: F401 +from lerobot.teleoperators.utils import TeleopEvents from lerobot.transport import services_pb2, services_pb2_grpc from lerobot.transport.utils import ( bytes_to_state_dict, @@ -236,7 +243,8 @@ def act_with_policy( logging.info("make_env online") - online_env = make_robot_env(cfg=cfg.env) + online_env, teleop_device = make_robot_env(cfg=cfg.env) + env_processor, action_processor = make_processors(online_env, teleop_device, cfg.env, cfg.policy.device) set_seed(cfg.seed) device = get_safe_torch_device(cfg.policy.device, log=True) @@ -257,6 +265,12 @@ def act_with_policy( assert isinstance(policy, nn.Module) obs, info = online_env.reset() + env_processor.reset() + action_processor.reset() + + # Process initial observation + transition = create_transition(observation=obs, info=info) + transition = env_processor(transition) # NOTE: For the moment we will solely handle the case of a single environment sum_reward_episode = 0 @@ -274,45 +288,61 @@ def act_with_policy( logging.info("[ACTOR] Shutting down act_with_policy") return - if interaction_step >= cfg.policy.online_step_before_learning: - # Time policy inference and check if it meets FPS requirement - with policy_timer: - action = policy.select_action(batch=obs) - policy_fps = policy_timer.fps_last + observation = transition[TransitionKey.OBSERVATION] - log_policy_frequency_issue(policy_fps=policy_fps, cfg=cfg, interaction_step=interaction_step) + # Time policy inference and check if it meets FPS requirement + with policy_timer: + # Extract observation from transition for policy + action = policy.select_action(batch=observation) + policy_fps = policy_timer.fps_last - else: - action = online_env.action_space.sample() + log_policy_frequency_issue(policy_fps=policy_fps, cfg=cfg, interaction_step=interaction_step) - next_obs, reward, done, truncated, info = online_env.step(action) + # Use the new step function + new_transition = step_env_and_process_transition( + env=online_env, + transition=transition, + action=action, + env_processor=env_processor, + action_processor=action_processor, + ) + + # Extract values from processed transition + next_observation = new_transition[TransitionKey.OBSERVATION] + executed_action = new_transition[TransitionKey.ACTION] + reward = new_transition[TransitionKey.REWARD] + done = new_transition.get(TransitionKey.DONE, False) + truncated = new_transition.get(TransitionKey.TRUNCATED, False) sum_reward_episode += float(reward) - # Increment total steps counter for intervention rate episode_total_steps += 1 - # NOTE: We override the action if the intervention is True, because the action applied is the intervention action - if "is_intervention" in info and info["is_intervention"]: - # NOTE: The action space for 
demonstration before hand is with the full action space - # but sometimes for example we want to deactivate the gripper - action = info["action_intervention"] + # Check for intervention from transition info + intervention_info = new_transition[TransitionKey.INFO] + if intervention_info.get(TeleopEvents.IS_INTERVENTION, False): episode_intervention = True - # Increment intervention steps counter episode_intervention_steps += 1 + complementary_info = { + "discrete_penalty": torch.tensor( + [new_transition[TransitionKey.COMPLEMENTARY_DATA].get("discrete_penalty", 0.0)] + ), + } + # Create transition for learner (convert to old format) list_transition_to_send_to_learner.append( Transition( - state=obs, - action=action, + state=observation, + action=executed_action, reward=reward, - next_state=next_obs, + next_state=next_observation, done=done, - truncated=truncated, # TODO: (azouitine) Handle truncation properly - complementary_info=info, + truncated=truncated, + complementary_info=complementary_info, ) ) - # assign obs to the next obs and continue the rollout - obs = next_obs + + # Update transition for next iteration + transition = new_transition if done or truncated: logging.info(f"[ACTOR] Global step {interaction_step}: Episode reward: {sum_reward_episode}") @@ -347,12 +377,20 @@ def act_with_policy( ) ) - # Reset intervention counters + # Reset intervention counters and environment sum_reward_episode = 0.0 episode_intervention = False episode_intervention_steps = 0 episode_total_steps = 0 + + # Reset environment and processors obs, info = online_env.reset() + env_processor.reset() + action_processor.reset() + + # Process initial observation + transition = create_transition(observation=obs, info=info) + transition = env_processor(transition) if cfg.env.fps is not None: dt_time = time.perf_counter() - start_time diff --git a/src/lerobot/scripts/rl/gym_manipulator.py b/src/lerobot/scripts/rl/gym_manipulator.py index c8be6b7dd..37ff1cc7e 100644 --- a/src/lerobot/scripts/rl/gym_manipulator.py +++ b/src/lerobot/scripts/rl/gym_manipulator.py @@ -14,65 +14,106 @@ # See the License for the specific language governing permissions and # limitations under the License. - -""" -Robot Environment for LeRobot Manipulation Tasks - -This module provides a comprehensive gym-compatible environment for robot manipulation -with support for: -- Multiple robot types (SO100, SO101, Koch and Moss) -- Human intervention via leader-follower control or gamepad - -- End-effector and joint space control -- Image processing (cropping and resizing) - -The environment is built using a composable wrapper pattern where each wrapper -adds specific functionality to the base RobotEnv. 
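For readers skimming the patch, the design being retired here is ordinary gym wrapper stacking: each concern lives in its own wrapper layered onto a base env. A minimal, generic sketch of that pattern (names are illustrative, not the actual lerobot classes):

```python
import gymnasium as gym
import numpy as np


class ClipActionWrapper(gym.ActionWrapper):
    """Each wrapper adds one concern; wrappers are stacked onto a base env."""

    def action(self, action):
        # Clamp the agent's action to the declared action-space bounds.
        return np.clip(action, self.action_space.low, self.action_space.high)


env = ClipActionWrapper(gym.make("Pendulum-v1"))
obs, info = env.reset()
obs, reward, terminated, truncated, info = env.step(env.action_space.sample())
```

The hunks that follow delete each of the old wrappers in favor of the processor classes imported at the top of the rewritten module.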
- -Example: - env = make_robot_env(cfg) - obs, info = env.reset() - action = policy.select_action(obs) - obs, reward, terminated, truncated, info = env.step(action) -""" - import logging import time -from collections import deque -from collections.abc import Sequence -from threading import Lock -from typing import Annotated, Any +from dataclasses import dataclass +from typing import Any import gymnasium as gym import numpy as np import torch -import torchvision.transforms.functional as F # noqa: N812 from lerobot.cameras import opencv # noqa: F401 from lerobot.configs import parser -from lerobot.envs.configs import EnvConfig -from lerobot.envs.utils import preprocess_observation +from lerobot.datasets.lerobot_dataset import LeRobotDataset +from lerobot.envs.configs import HILSerlRobotEnvConfig from lerobot.model.kinematics import RobotKinematics +from lerobot.processor import ( + AddTeleopActionAsComplimentaryData, + AddTeleopEventsAsInfo, + DeviceProcessor, + GripperPenaltyProcessor, + ImageCropResizeProcessor, + InterventionActionProcessor, + JointVelocityProcessor, + MapDeltaActionToRobotAction, + MotorCurrentProcessor, + Numpy2TorchActionProcessor, + RewardClassifierProcessor, + RobotProcessor, + TimeLimitProcessor, + ToBatchProcessor, + Torch2NumpyActionProcessor, + VanillaObservationProcessor, +) +from lerobot.processor.pipeline import EnvTransition, TransitionKey from lerobot.robots import ( # noqa: F401 RobotConfig, make_robot_from_config, so100_follower, ) +from lerobot.robots.robot import Robot +from lerobot.robots.so100_follower.robot_kinematic_processor import ( + AddRobotObservationAsComplimentaryData, + EEBoundsAndSafety, + EEReferenceAndDelta, + ForwardKinematicsJointsToEE, + GripperVelocityToJoint, + InverseKinematicsEEToJoints, +) from lerobot.teleoperators import ( gamepad, # noqa: F401 keyboard, # noqa: F401 make_teleoperator_from_config, so101_leader, # noqa: F401 ) -from lerobot.teleoperators.gamepad.teleop_gamepad import GamepadTeleop -from lerobot.teleoperators.keyboard.teleop_keyboard import KeyboardEndEffectorTeleop +from lerobot.teleoperators.teleoperator import Teleoperator +from lerobot.teleoperators.utils import TeleopEvents from lerobot.utils.robot_utils import busy_wait from lerobot.utils.utils import log_say logging.basicConfig(level=logging.INFO) -def reset_follower_position(robot_arm, target_position): +@dataclass +class DatasetConfig: + """Configuration for dataset creation and management.""" + + repo_id: str + dataset_root: str + task: str + num_episodes: int + episode: int + push_to_hub: bool + + +@dataclass +class GymManipulatorConfig: + """Main configuration for gym manipulator environment.""" + + env: HILSerlRobotEnvConfig + dataset: DatasetConfig + mode: str | None = None # Either "record", "replay", None + device: str = "cpu" + + +def create_transition( + observation=None, action=None, reward=0.0, done=False, truncated=False, info=None, complementary_data=None +) -> dict[str, Any]: + """Create an EnvTransition dictionary with default values.""" + return { + TransitionKey.OBSERVATION: observation, + TransitionKey.ACTION: action, + TransitionKey.REWARD: reward, + TransitionKey.DONE: done, + TransitionKey.TRUNCATED: truncated, + TransitionKey.INFO: info if info is not None else {}, + TransitionKey.COMPLEMENTARY_DATA: complementary_data if complementary_data is not None else {}, + } + + +def reset_follower_position(robot_arm: Robot, target_position: np.ndarray) -> None: + """Reset robot arm to target position using smooth trajectory.""" 
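Since `create_transition` (defined just above) is the seam between the raw gym API and the processor pipeline, a short usage sketch may help. It mirrors how the actor loop in `actor.py` consumes it; the no-op `env_processor` below is a stand-in for a real pipeline built by `make_processors`:

```python
# Sketch only; these imports are confirmed by the actor.py hunk above.
from lerobot.processor.pipeline import TransitionKey
from lerobot.scripts.rl.gym_manipulator import create_transition


def env_processor(transition):
    # Stand-in for a RobotProcessor pipeline built by make_processors.
    return transition


obs, info = {"agent_pos": [0.0] * 6, "pixels": {}}, {}

transition = create_transition(observation=obs, info=info)
transition = env_processor(transition)

policy_obs = transition[TransitionKey.OBSERVATION]
assert transition[TransitionKey.REWARD] == 0.0  # default set by create_transition
assert transition[TransitionKey.INFO] == {}     # info passed through unchanged
```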
current_position_dict = robot_arm.bus.sync_read("Present_Position") current_position = np.array( [current_position_dict[name] for name in current_position_dict], dtype=np.float32 @@ -86,158 +127,25 @@ def reset_follower_position(robot_arm, target_position): busy_wait(0.015) -class TorchBox(gym.spaces.Box): - """ - A version of gym.spaces.Box that handles PyTorch tensors. - - This class extends gym.spaces.Box to work with PyTorch tensors, - providing compatibility between NumPy arrays and PyTorch tensors. - """ - - def __init__( - self, - low: float | Sequence[float] | np.ndarray, - high: float | Sequence[float] | np.ndarray, - shape: Sequence[int] | None = None, - np_dtype: np.dtype | type = np.float32, - torch_dtype: torch.dtype = torch.float32, - device: str = "cpu", - seed: int | np.random.Generator | None = None, - ) -> None: - """ - Initialize the PyTorch-compatible Box space. - - Args: - low: Lower bounds of the space. - high: Upper bounds of the space. - shape: Shape of the space. If None, inferred from low and high. - np_dtype: NumPy data type for internal storage. - torch_dtype: PyTorch data type for tensor conversion. - device: PyTorch device for returned tensors. - seed: Random seed for sampling. - """ - super().__init__(low, high, shape=shape, dtype=np_dtype, seed=seed) - self.torch_dtype = torch_dtype - self.device = device - - def sample(self) -> torch.Tensor: - """ - Sample a random point from the space. - - Returns: - A PyTorch tensor within the space bounds. - """ - arr = super().sample() - return torch.as_tensor(arr, dtype=self.torch_dtype, device=self.device) - - def contains(self, x: torch.Tensor) -> bool: - """ - Check if a tensor is within the space bounds. - - Args: - x: The PyTorch tensor to check. - - Returns: - Boolean indicating whether the tensor is within bounds. - """ - # Move to CPU/numpy and cast to the internal dtype - arr = x.detach().cpu().numpy().astype(self.dtype, copy=False) - return super().contains(arr) - - def seed(self, seed: int | np.random.Generator | None = None): - """ - Set the random seed for sampling. - - Args: - seed: The random seed to use. - - Returns: - List containing the seed. - """ - super().seed(seed) - return [seed] - - def __repr__(self) -> str: - """ - Return a string representation of the space. - - Returns: - Formatted string with space details. - """ - return ( - f"TorchBox({self.low_repr}, {self.high_repr}, {self.shape}, " - f"np={self.dtype.name}, torch={self.torch_dtype}, device={self.device})" - ) - - -class TorchActionWrapper(gym.Wrapper): - """ - Wrapper that changes the action space to use PyTorch tensors. - - This wrapper modifies the action space to return PyTorch tensors when sampled - and handles converting PyTorch actions to NumPy when stepping the environment. - """ - - def __init__(self, env: gym.Env, device: str): - """ - Initialize the PyTorch action space wrapper. - - Args: - env: The environment to wrap. - device: The PyTorch device to use for tensor operations. - """ - super().__init__(env) - self.action_space = TorchBox( - low=env.action_space.low, - high=env.action_space.high, - shape=env.action_space.shape, - torch_dtype=torch.float32, - device=torch.device("cpu"), - ) - - def step(self, action: torch.Tensor): - """ - Step the environment with a PyTorch tensor action. - - This method handles conversion from PyTorch tensors to NumPy arrays - for compatibility with the underlying environment. - - Args: - action: PyTorch tensor action to take. 
- - Returns: - Tuple of (observation, reward, terminated, truncated, info). - """ - if action.dim() == 2: - action = action.squeeze(0) - action = action.detach().cpu().numpy() - return self.env.step(action) - - class RobotEnv(gym.Env): - """ - Gym-compatible environment for evaluating robotic control policies with integrated human intervention. - - This environment wraps a robot interface to provide a consistent API for policy evaluation. It supports both relative (delta) - and absolute joint position commands and automatically configures its observation and action spaces based on the robot's - sensors and configuration. - """ + """Gym environment for robotic control with human intervention support.""" def __init__( self, robot, use_gripper: bool = False, display_cameras: bool = False, - ): - """ - Initialize the RobotEnv environment. - - The environment is set up with a robot interface, which is used to capture observations and send joint commands. The setup - supports both relative (delta) adjustments and absolute joint positions for controlling the robot. + reset_pose: list[float] | None = None, + reset_time_s: float = 5.0, + ) -> None: + """Initialize robot environment with configuration options. Args: - robot: The robot interface object used to connect and interact with the physical robot. - display_cameras: If True, the robot's camera feeds will be displayed during execution. + robot: Robot interface for hardware communication. + use_gripper: Whether to include gripper in action space. + display_cameras: Whether to show camera feeds during execution. + reset_pose: Joint positions for environment reset. + reset_time_s: Time to wait during reset. """ super().__init__() @@ -255,52 +163,45 @@ class RobotEnv(gym.Env): self._joint_names = [f"{key}.pos" for key in self.robot.bus.motors] self._image_keys = self.robot.cameras.keys() - self.current_observation = None + self.reset_pose = reset_pose + self.reset_time_s = reset_time_s self.use_gripper = use_gripper self._setup_spaces() - def _get_observation(self) -> dict[str, np.ndarray]: - """Helper to convert a dictionary from bus.sync_read to an ordered numpy array.""" + def _get_observation(self) -> dict[str, Any]: + """Get current robot observation including joint positions and camera images.""" obs_dict = self.robot.get_observation() joint_positions = np.array([obs_dict[name] for name in self._joint_names]) images = {key: obs_dict[key] for key in self._image_keys} - self.current_observation = {"agent_pos": joint_positions, "pixels": images} + return {"agent_pos": joint_positions, "pixels": images} - def _setup_spaces(self): - """ - Dynamically configure the observation and action spaces based on the robot's capabilities. - - Observation Space: - - For keys with "image": A Box space with pixel values ranging from 0 to 255. - - For non-image keys: A nested Dict space is created under 'observation.state' with a suitable range. - - Action Space: - - The action space is defined as a Box space representing joint position commands. It is defined as relative (delta) - or absolute, based on the configuration. - """ - self._get_observation() + def _setup_spaces(self) -> None: + """Configure observation and action spaces based on robot capabilities.""" + current_observation = self._get_observation() observation_spaces = {} # Define observation spaces for images and other states. 
- if "pixels" in self.current_observation: + if current_observation is not None and "pixels" in current_observation: prefix = "observation.images" observation_spaces = { f"{prefix}.{key}": gym.spaces.Box( - low=0, high=255, shape=self.current_observation["pixels"][key].shape, dtype=np.uint8 + low=0, high=255, shape=current_observation["pixels"][key].shape, dtype=np.uint8 ) - for key in self.current_observation["pixels"] + for key in current_observation["pixels"] } - observation_spaces["observation.state"] = gym.spaces.Box( - low=0, - high=10, - shape=self.current_observation["agent_pos"].shape, - dtype=np.float32, - ) + if current_observation is not None: + agent_pos = current_observation["agent_pos"] + observation_spaces["observation.state"] = gym.spaces.Box( + low=0, + high=10, + shape=agent_pos.shape, + dtype=np.float32, + ) self.observation_space = gym.spaces.Dict(observation_spaces) @@ -322,57 +223,48 @@ class RobotEnv(gym.Env): dtype=np.float32, ) - def reset(self, seed=None, options=None) -> tuple[dict[str, np.ndarray], dict[str, Any]]: - """ - Reset the environment to its initial state. - This method resets the step counter and clears any episodic data. + def reset( + self, *, seed: int | None = None, options: dict[str, Any] | None = None + ) -> tuple[dict[str, Any], dict[str, Any]]: + """Reset environment to initial state. Args: - seed: A seed for random number generation to ensure reproducibility. - options: Additional options to influence the reset behavior. + seed: Random seed for reproducibility. + options: Additional reset options. Returns: - A tuple containing: - - observation (dict): The initial sensor observation. - - info (dict): A dictionary with supplementary information, including the key "is_intervention". + Tuple of (observation, info) dictionaries. """ - super().reset(seed=seed, options=options) + # Reset the robot + # self.robot.reset() + start_time = time.perf_counter() + if self.reset_pose is not None: + log_say("Reset the environment.", play_sounds=True) + reset_follower_position(self.robot, np.array(self.reset_pose)) + log_say("Reset the environment done.", play_sounds=True) - self.robot.reset() + busy_wait(self.reset_time_s - (time.perf_counter() - start_time)) + + super().reset(seed=seed, options=options) # Reset episode tracking variables. self.current_step = 0 self.episode_data = None - self.current_observation = None - self._get_observation() - return self.current_observation, {"is_intervention": False} + obs = self._get_observation() + return obs, { + TeleopEvents.IS_INTERVENTION: False, + "raw_joint_positions": obs["agent_pos"], + } def step(self, action) -> tuple[dict[str, np.ndarray], float, bool, bool, dict[str, Any]]: - """ - Execute a single step within the environment using the specified action. + """Execute one environment step with given action.""" + joint_targets_dict = { + f"{key}.pos": action[f"action.{key}.pos"] for key in self.robot.bus.motors.keys() + } - The provided action is processed and sent to the robot as joint position commands - that may be either absolute values or deltas based on the environment configuration. + self.robot.send_action(joint_targets_dict) - Args: - action: The commanded joint positions as a numpy array or torch tensor. - - Returns: - A tuple containing: - - observation (dict): The new sensor observation after taking the step. - - reward (float): The step reward (default is 0.0 within this wrapper). - - terminated (bool): True if the episode has reached a terminal state. 
- - truncated (bool): True if the episode was truncated (e.g., time constraints). - - info (dict): Additional debugging information including intervention status. - """ - action_dict = {"delta_x": action[0], "delta_y": action[1], "delta_z": action[2]} - - # 1.0 action corresponds to no-op action - action_dict["gripper"] = action[3] if self.use_gripper else 1.0 - - self.robot.send_action(action_dict) - - self._get_observation() + obs = self._get_observation() if self.display_cameras: self.render() @@ -384,1879 +276,477 @@ class RobotEnv(gym.Env): truncated = False return ( - self.current_observation, + obs, reward, terminated, truncated, - {"is_intervention": False}, + {TeleopEvents.IS_INTERVENTION: False, "raw_joint_positions": obs["agent_pos"]}, ) - def render(self): - """ - Render the current state of the environment by displaying the robot's camera feeds. - """ + def render(self) -> None: + """Display robot camera feeds.""" import cv2 - image_keys = [key for key in self.current_observation if "image" in key] + current_observation = self._get_observation() + if current_observation is not None: + image_keys = [key for key in current_observation if "image" in key] - for key in image_keys: - cv2.imshow(key, cv2.cvtColor(self.current_observation[key].numpy(), cv2.COLOR_RGB2BGR)) - cv2.waitKey(1) + for key in image_keys: + cv2.imshow(key, cv2.cvtColor(current_observation[key].numpy(), cv2.COLOR_RGB2BGR)) + cv2.waitKey(1) - def close(self): - """ - Close the environment and clean up resources by disconnecting the robot. - - If the robot is currently connected, this method properly terminates the connection to ensure that all - associated resources are released. - """ + def close(self) -> None: + """Close environment and disconnect robot.""" if self.robot.is_connected: self.robot.disconnect() -class AddJointVelocityToObservation(gym.ObservationWrapper): - """ - Wrapper that adds joint velocity information to the observation. - - This wrapper computes joint velocities by tracking changes in joint positions over time, - and extends the observation space to include these velocities. - """ - - def __init__(self, env, joint_velocity_limits=100.0, fps=30, num_dof=6): - """ - Initialize the joint velocity wrapper. - - Args: - env: The environment to wrap. - joint_velocity_limits: Maximum expected joint velocity for space bounds. - fps: Frames per second used to calculate velocity (position delta / time). - num_dof: Number of degrees of freedom (joints) in the robot. - """ - super().__init__(env) - - # Extend observation space to include joint velocities - old_low = self.observation_space["observation.state"].low - old_high = self.observation_space["observation.state"].high - old_shape = self.observation_space["observation.state"].shape - - self.last_joint_positions = np.zeros(num_dof) - - new_low = np.concatenate([old_low, np.ones(num_dof) * -joint_velocity_limits]) - new_high = np.concatenate([old_high, np.ones(num_dof) * joint_velocity_limits]) - - new_shape = (old_shape[0] + num_dof,) - - self.observation_space["observation.state"] = gym.spaces.Box( - low=new_low, - high=new_high, - shape=new_shape, - dtype=np.float32, - ) - - self.dt = 1.0 / fps - - def observation(self, observation): - """ - Add joint velocity information to the observation. - - Args: - observation: The original observation from the environment. - - Returns: - The modified observation with joint velocities. 
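The computation this deleted wrapper performs is a one-line finite difference; `JointVelocityProcessor`, imported in the rewritten module, presumably takes over the same role in the pipeline. A standalone sketch with made-up values for a 6-DOF arm:

```python
import numpy as np

fps = 30                 # control frequency
dt = 1.0 / fps
q_prev = np.zeros(6)     # joint positions at the previous step
q_t = np.array([0.1, 0.0, -0.2, 0.0, 0.05, 0.0])  # joint positions now

# Finite-difference velocity estimate, appended to the state vector.
joint_velocities = (q_t - q_prev) / dt
state = np.concatenate([q_t, joint_velocities], axis=-1)  # 12-dim state
```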
- """ - joint_velocities = (observation["agent_pos"] - self.last_joint_positions) / self.dt - self.last_joint_positions = observation["agent_pos"] - observation["agent_pos"] = np.concatenate([observation["agent_pos"], joint_velocities], axis=-1) - return observation - - -class AddCurrentToObservation(gym.ObservationWrapper): - """ - Wrapper that adds motor current information to the observation. - - This wrapper extends the observation space to include the current values - from each motor, providing information about the forces being applied. - """ - - def __init__(self, env, max_current=500, num_dof=6): - """ - Initialize the current observation wrapper. - - Args: - env: The environment to wrap. - max_current: Maximum expected current for space bounds. - num_dof: Number of degrees of freedom (joints) in the robot. - """ - super().__init__(env) - - # Extend observation space to include joint velocities - old_low = self.observation_space["observation.state"].low - old_high = self.observation_space["observation.state"].high - old_shape = self.observation_space["observation.state"].shape - - new_low = np.concatenate([old_low, np.zeros(num_dof)]) - new_high = np.concatenate([old_high, np.ones(num_dof) * max_current]) - - new_shape = (old_shape[0] + num_dof,) - - self.observation_space["observation.state"] = gym.spaces.Box( - low=new_low, - high=new_high, - shape=new_shape, - dtype=np.float32, - ) - - def observation(self, observation): - """ - Add current information to the observation. - - Args: - observation: The original observation from the environment. - - Returns: - The modified observation with current values. - """ - present_current_dict = self.env.unwrapped.robot.bus.sync_read("Present_Current") - present_current_observation = np.array( - [present_current_dict[name] for name in self.env.unwrapped.robot.bus.motors] - ) - observation["agent_pos"] = np.concatenate( - [observation["agent_pos"], present_current_observation], axis=-1 - ) - return observation - - -class RewardWrapper(gym.Wrapper): - def __init__(self, env, reward_classifier, device="cuda"): - """ - Wrapper to add reward prediction to the environment using a trained classifier. - - Args: - env: The environment to wrap. - reward_classifier: The reward classifier model. - device: The device to run the model on. - """ - self.env = env - - self.device = device - - self.reward_classifier = torch.compile(reward_classifier) - self.reward_classifier.to(self.device) - - def step(self, action): - """ - Execute a step and compute the reward using the classifier. - - Args: - action: The action to take in the environment. - - Returns: - Tuple of (observation, reward, terminated, truncated, info). - """ - observation, _, terminated, truncated, info = self.env.step(action) - - images = {} - for key in observation: - if "image" in key: - images[key] = observation[key].to(self.device, non_blocking=(self.device == "cuda")) - if images[key].dim() == 3: - images[key] = images[key].unsqueeze(0) - - start_time = time.perf_counter() - with torch.inference_mode(): - success = ( - self.reward_classifier.predict_reward(images, threshold=0.7) - if self.reward_classifier is not None - else 0.0 - ) - info["Reward classifier frequency"] = 1 / (time.perf_counter() - start_time) - - reward = 0.0 - if success == 1.0: - terminated = True - reward = 1.0 - - return observation, reward, terminated, truncated, info - - def reset(self, seed=None, options=None): - """ - Reset the environment. - - Args: - seed: Random seed for reproducibility. 
- options: Additional reset options. - - Returns: - The initial observation and info from the wrapped environment. - """ - return self.env.reset(seed=seed, options=options) - - -class TimeLimitWrapper(gym.Wrapper): - """ - Wrapper that adds a time limit to episodes and tracks execution time. - - This wrapper terminates episodes after a specified time has elapsed, providing - better control over episode length. - """ - - def __init__(self, env, control_time_s, fps): - """ - Initialize the time limit wrapper. - - Args: - env: The environment to wrap. - control_time_s: Maximum episode duration in seconds. - fps: Frames per second for calculating the maximum number of steps. - """ - self.env = env - self.control_time_s = control_time_s - self.fps = fps - - self.last_timestamp = 0.0 - self.episode_time_in_s = 0.0 - - self.max_episode_steps = int(self.control_time_s * self.fps) - - self.current_step = 0 - - def step(self, action): - """ - Step the environment and track time elapsed. - - Args: - action: The action to take in the environment. - - Returns: - Tuple of (observation, reward, terminated, truncated, info). - """ - obs, reward, terminated, truncated, info = self.env.step(action) - time_since_last_step = time.perf_counter() - self.last_timestamp - self.episode_time_in_s += time_since_last_step - self.last_timestamp = time.perf_counter() - self.current_step += 1 - # check if last timestep took more time than the expected fps - if 1.0 / time_since_last_step < self.fps: - logging.debug(f"Current timestep exceeded expected fps {self.fps}") - - if self.current_step >= self.max_episode_steps: - terminated = True - return obs, reward, terminated, truncated, info - - def reset(self, seed=None, options=None): - """ - Reset the environment and time tracking. - - Args: - seed: Random seed for reproducibility. - options: Additional reset options. - - Returns: - The initial observation and info from the wrapped environment. - """ - self.episode_time_in_s = 0.0 - self.last_timestamp = time.perf_counter() - self.current_step = 0 - return self.env.reset(seed=seed, options=options) - - -class ImageCropResizeWrapper(gym.Wrapper): - """ - Wrapper that crops and resizes image observations. - - This wrapper processes image observations to focus on relevant regions by - cropping and then resizing to a standard size. - """ - - def __init__( - self, - env, - crop_params_dict: dict[str, Annotated[tuple[int], 4]], - resize_size=None, - ): - """ - Initialize the image crop and resize wrapper. - - Args: - env: The environment to wrap. - crop_params_dict: Dictionary mapping image observation keys to crop parameters - (top, left, height, width). - resize_size: Target size for resized images (height, width). Defaults to (128, 128). - """ - super().__init__(env) - self.env = env - self.crop_params_dict = crop_params_dict - print(f"obs_keys , {self.env.observation_space}") - print(f"crop params dict {crop_params_dict.keys()}") - for key_crop in crop_params_dict: - if key_crop not in self.env.observation_space.keys(): # noqa: SIM118 - raise ValueError(f"Key {key_crop} not in observation space") - for key in crop_params_dict: - new_shape = (3, resize_size[0], resize_size[1]) - self.observation_space[key] = gym.spaces.Box(low=0, high=255, shape=new_shape) - - self.resize_size = resize_size - if self.resize_size is None: - self.resize_size = (128, 128) - - def step(self, action): - """ - Step the environment and process image observations. - - Args: - action: The action to take in the environment. 
- - Returns: - Tuple of (observation, reward, terminated, truncated, info) with processed images. - """ - obs, reward, terminated, truncated, info = self.env.step(action) - for k in self.crop_params_dict: - device = obs[k].device - if obs[k].dim() >= 3: - # Reshape to combine height and width dimensions for easier calculation - batch_size = obs[k].size(0) - channels = obs[k].size(1) - flattened_spatial_dims = obs[k].view(batch_size, channels, -1) - - # Calculate standard deviation across spatial dimensions (H, W) - # If any channel has std=0, all pixels in that channel have the same value - # This is helpful if one camera mistakenly covered or the image is black - std_per_channel = torch.std(flattened_spatial_dims, dim=2) - if (std_per_channel <= 0.02).any(): - logging.warning( - f"Potential hardware issue detected: All pixels have the same value in observation {k}" - ) - - if device == torch.device("mps:0"): - obs[k] = obs[k].cpu() - - obs[k] = F.crop(obs[k], *self.crop_params_dict[k]) - obs[k] = F.resize(obs[k], self.resize_size) - # TODO (michel-aractingi): Bug in resize, it returns values outside [0, 1] - obs[k] = obs[k].clamp(0.0, 1.0) - obs[k] = obs[k].to(device) - - return obs, reward, terminated, truncated, info - - def reset(self, seed=None, options=None): - """ - Reset the environment and process image observations. - - Args: - seed: Random seed for reproducibility. - options: Additional reset options. - - Returns: - Tuple of (observation, info) with processed images. - """ - obs, info = self.env.reset(seed=seed, options=options) - for k in self.crop_params_dict: - device = obs[k].device - if device == torch.device("mps:0"): - obs[k] = obs[k].cpu() - obs[k] = F.crop(obs[k], *self.crop_params_dict[k]) - obs[k] = F.resize(obs[k], self.resize_size) - obs[k] = obs[k].clamp(0.0, 1.0) - obs[k] = obs[k].to(device) - return obs, info - - -class ConvertToLeRobotObservation(gym.ObservationWrapper): - """ - Wrapper that converts standard observations to LeRobot format. - - This wrapper processes observations to match the expected format for LeRobot, - including normalizing image values and moving tensors to the specified device. - """ - - def __init__(self, env, device: str = "cpu"): - """ - Initialize the LeRobot observation converter. - - Args: - env: The environment to wrap. - device: Target device for the observation tensors. - """ - super().__init__(env) - - self.device = torch.device(device) - - def observation(self, observation): - """ - Convert observations to LeRobot format. - - Args: - observation: The original observation from the environment. - - Returns: - The processed observation with normalized images and proper tensor formats. - """ - observation = preprocess_observation(observation) - observation = { - key: observation[key].to(self.device, non_blocking=self.device.type == "cuda") - for key in observation - } - return observation - - -class ResetWrapper(gym.Wrapper): - """ - Wrapper that handles environment reset procedures. - - This wrapper provides additional functionality during environment reset, - including the option to reset to a fixed pose or allow manual reset. - """ - - def __init__( - self, - env: RobotEnv, - reset_pose: np.ndarray | None = None, - reset_time_s: float = 5, - ): - """ - Initialize the reset wrapper. - - Args: - env: The environment to wrap. - reset_pose: Fixed joint positions to reset to. If None, manual reset is used. - reset_time_s: Time in seconds to wait after reset or allowed for manual reset. 
- """ - super().__init__(env) - self.reset_time_s = reset_time_s - self.reset_pose = reset_pose - self.robot = self.unwrapped.robot - - def reset(self, *, seed=None, options=None): - """ - Reset the environment with either fixed or manual reset procedure. - - If reset_pose is provided, the robot will move to that position. - Otherwise, manual teleoperation control is allowed for reset_time_s seconds. - - Args: - seed: Random seed for reproducibility. - options: Additional reset options. - - Returns: - The initial observation and info from the wrapped environment. - """ - start_time = time.perf_counter() - if self.reset_pose is not None: - log_say("Reset the environment.", play_sounds=True) - reset_follower_position(self.unwrapped.robot, self.reset_pose) - log_say("Reset the environment done.", play_sounds=True) - - if hasattr(self.env, "robot_leader"): - self.env.robot_leader.bus.sync_write("Torque_Enable", 1) - log_say("Reset the leader robot.", play_sounds=True) - reset_follower_position(self.env.robot_leader, self.reset_pose) - log_say("Reset the leader robot done.", play_sounds=True) - else: - log_say( - f"Manually reset the environment for {self.reset_time_s} seconds.", - play_sounds=True, - ) - start_time = time.perf_counter() - while time.perf_counter() - start_time < self.reset_time_s: - action = self.env.robot_leader.get_action() - self.unwrapped.robot.send_action(action) - - log_say("Manual reset of the environment done.", play_sounds=True) - - busy_wait(self.reset_time_s - (time.perf_counter() - start_time)) - - return super().reset(seed=seed, options=options) - - -class BatchCompatibleWrapper(gym.ObservationWrapper): - """ - Wrapper that ensures observations are compatible with batch processing. - - This wrapper adds a batch dimension to observations that don't already have one, - making them compatible with models that expect batched inputs. - """ - - def __init__(self, env): - """ - Initialize the batch compatibility wrapper. - - Args: - env: The environment to wrap. - """ - super().__init__(env) - - def observation(self, observation: dict[str, torch.Tensor]) -> dict[str, torch.Tensor]: - """ - Add batch dimensions to observations if needed. - - Args: - observation: Dictionary of observation tensors. - - Returns: - Dictionary of observation tensors with batch dimensions. - """ - for key in observation: - if "image" in key and observation[key].dim() == 3: - observation[key] = observation[key].unsqueeze(0) - if "state" in key and observation[key].dim() == 1: - observation[key] = observation[key].unsqueeze(0) - if "velocity" in key and observation[key].dim() == 1: - observation[key] = observation[key].unsqueeze(0) - return observation - - -class GripperPenaltyWrapper(gym.RewardWrapper): - """ - Wrapper that adds penalties for inefficient gripper commands. - - This wrapper modifies rewards to discourage excessive gripper movement - or commands that attempt to move the gripper beyond its physical limits. - """ - - def __init__(self, env, penalty: float = -0.1): - """ - Initialize the gripper penalty wrapper. - - Args: - env: The environment to wrap. - penalty: Negative reward value to apply for inefficient gripper actions. - """ - super().__init__(env) - self.penalty = penalty - self.last_gripper_state = None - - def reward(self, reward, action): - """ - Apply penalties to reward based on gripper actions. - - Args: - reward: The original reward from the environment. - action: The action that was taken. - - Returns: - Modified reward with penalty applied if necessary. 
- """ - gripper_state_normalized = self.last_gripper_state / self.unwrapped.robot.config.max_gripper_pos - - action_normalized = action - 1.0 # action / MAX_GRIPPER_COMMAND - - gripper_penalty_bool = (gripper_state_normalized < 0.5 and action_normalized > 0.5) or ( - gripper_state_normalized > 0.75 and action_normalized < -0.5 - ) - - return reward + self.penalty * int(gripper_penalty_bool) - - def step(self, action): - """ - Step the environment and apply gripper penalties. - - Args: - action: The action to take in the environment. - - Returns: - Tuple of (observation, reward, terminated, truncated, info) with penalty applied. - """ - self.last_gripper_state = self.unwrapped.robot.bus.sync_read("Present_Position")["gripper"] - - gripper_action = action[-1] - obs, reward, terminated, truncated, info = self.env.step(action) - gripper_penalty = self.reward(reward, gripper_action) - - info["discrete_penalty"] = gripper_penalty - - return obs, reward, terminated, truncated, info - - def reset(self, **kwargs): - """ - Reset the environment and penalty tracking. - - Args: - **kwargs: Keyword arguments passed to the wrapped environment's reset. - - Returns: - The initial observation and info with gripper penalty initialized. - """ - self.last_gripper_state = None - obs, info = super().reset(**kwargs) - info["gripper_penalty"] = 0.0 - return obs, info - - -class GripperActionWrapper(gym.ActionWrapper): - """ - Wrapper that processes gripper control commands. - - This wrapper quantizes and processes gripper commands, adding a sleep time between - consecutive gripper actions to prevent rapid toggling. - """ - - def __init__(self, env, quantization_threshold: float = 0.2, gripper_sleep: float = 0.0): - """ - Initialize the gripper action wrapper. - - Args: - env: The environment to wrap. - quantization_threshold: Threshold below which gripper commands are quantized to zero. - gripper_sleep: Minimum time in seconds between consecutive gripper commands. - """ - super().__init__(env) - self.quantization_threshold = quantization_threshold - self.gripper_sleep = gripper_sleep - self.last_gripper_action_time = 0.0 - self.last_gripper_action = None - - def action(self, action): - """ - Process gripper commands in the action. - - Args: - action: The original action from the agent. - - Returns: - Modified action with processed gripper command. - """ - if self.gripper_sleep > 0.0: - if ( - self.last_gripper_action is not None - and time.perf_counter() - self.last_gripper_action_time < self.gripper_sleep - ): - action[-1] = self.last_gripper_action - else: - self.last_gripper_action_time = time.perf_counter() - self.last_gripper_action = action[-1] - - gripper_command = action[-1] - # Gripper actions are between 0, 2 - # we want to quantize them to -1, 0 or 1 - gripper_command = gripper_command - 1.0 - - if self.quantization_threshold is not None: - # Quantize gripper command to -1, 0 or 1 - gripper_command = ( - np.sign(gripper_command) if abs(gripper_command) > self.quantization_threshold else 0.0 - ) - gripper_command = gripper_command * self.unwrapped.robot.config.max_gripper_pos - - gripper_state = self.unwrapped.robot.bus.sync_read("Present_Position")["gripper"] - - gripper_action_value = np.clip( - gripper_state + gripper_command, 0, self.unwrapped.robot.config.max_gripper_pos - ) - action[-1] = gripper_action_value.item() - return action - - def reset(self, **kwargs): - """ - Reset the gripper action tracking. - - Args: - **kwargs: Keyword arguments passed to the wrapped environment's reset. 
- - Returns: - The initial observation and info. - """ - obs, info = super().reset(**kwargs) - self.last_gripper_action_time = 0.0 - self.last_gripper_action = None - return obs, info - - -class EEObservationWrapper(gym.ObservationWrapper): - """ - Wrapper that adds end-effector pose information to observations. - - This wrapper computes the end-effector pose using forward kinematics - and adds it to the observation space. - """ - - def __init__(self, env, ee_pose_limits): - """ - Initialize the end-effector observation wrapper. - - Args: - env: The environment to wrap. - ee_pose_limits: Dictionary with 'min' and 'max' keys containing limits for EE pose. - """ - super().__init__(env) - - # Extend observation space to include end effector pose - prev_space = self.observation_space["observation.state"] - - self.observation_space["observation.state"] = gym.spaces.Box( - low=np.concatenate([prev_space.low, ee_pose_limits["min"]]), - high=np.concatenate([prev_space.high, ee_pose_limits["max"]]), - shape=(prev_space.shape[0] + 3,), - dtype=np.float32, - ) - - self.kinematics = RobotKinematics( - urdf_path=env.unwrapped.robot.config.urdf_path, - target_frame_name=env.unwrapped.robot.config.target_frame_name, - ) - - def observation(self, observation): - """ - Add end-effector pose to the observation. - - Args: - observation: Original observation from the environment. - - Returns: - Enhanced observation with end-effector pose information. - """ - current_joint_pos = self.unwrapped.current_observation["agent_pos"] - - current_ee_pos = self.kinematics.forward_kinematics(current_joint_pos)[:3, 3] - observation["agent_pos"] = np.concatenate([observation["agent_pos"], current_ee_pos], -1) - return observation - - -########################################################### -# Wrappers related to human intervention and input devices -########################################################### - - -class BaseLeaderControlWrapper(gym.Wrapper): - """ - Base class for leader-follower robot control wrappers. - - This wrapper enables human intervention through a leader-follower robot setup, - where the human can control a leader robot to guide the follower robot's movements. - """ - - def __init__( - self, - env, - teleop_device, - end_effector_step_sizes, - use_geared_leader_arm: bool = False, - use_gripper=False, - ): - """ - Initialize the base leader control wrapper. - - Args: - env: The environment to wrap. - teleop_device: The teleoperation device. - use_geared_leader_arm: Whether to use a geared leader arm setup. - use_gripper: Whether to include gripper control. 
- """ - super().__init__(env) - self.robot_leader = teleop_device - self.robot_follower = env.unwrapped.robot - self.use_geared_leader_arm = use_geared_leader_arm - self.use_gripper: bool = use_gripper - self.end_effector_step_sizes = np.array(list(end_effector_step_sizes.values())) - - # Set up keyboard event tracking - self._init_keyboard_events() - self.event_lock = Lock() # Thread-safe access to events - - # Initialize robot control - self.kinematics = RobotKinematics( - urdf_path=env.unwrapped.robot.config.urdf_path, - target_frame_name=env.unwrapped.robot.config.target_frame_name, - ) - self.leader_torque_enabled = True - self.prev_leader_gripper = None - - # Configure leader arm - # NOTE: Lower the gains of leader arm for automatic take-over - # With lower gains we can manually move the leader arm without risk of injury to ourselves or the robot - # With higher gains, it would be dangerous and difficult to modify the leader's pose while torque is enabled - # Default value for P_coeff is 32 - self.robot_leader.bus.sync_write("Torque_Enable", 1) - for motor in self.robot_leader.bus.motors: - self.robot_leader.bus.write("P_Coefficient", motor, 16) - self.robot_leader.bus.write("I_Coefficient", motor, 0) - self.robot_leader.bus.write("D_Coefficient", motor, 16) - - self.leader_tracking_error_queue = deque(maxlen=4) - self._init_keyboard_listener() - - def _init_keyboard_events(self): - """ - Initialize the keyboard events dictionary. - - This method sets up tracking for keyboard events used for intervention control. - It should be overridden in subclasses to add additional events. - """ - self.keyboard_events = { - "episode_success": False, - "episode_end": False, - "rerecord_episode": False, - } - - def _handle_key_press(self, key, keyboard_device): - """ - Handle key press events. - - Args: - key: The key that was pressed. - keyboard: The keyboard module with key definitions. - - This method should be overridden in subclasses for additional key handling. - """ - try: - if key == keyboard_device.Key.esc: - self.keyboard_events["episode_end"] = True - return - if key == keyboard_device.Key.left: - self.keyboard_events["rerecord_episode"] = True - return - if hasattr(key, "char") and key.char == "s": - logging.info("Key 's' pressed. Episode success triggered.") - self.keyboard_events["episode_success"] = True - return - except Exception as e: - logging.error(f"Error handling key press: {e}") - - def _init_keyboard_listener(self): - """ - Initialize the keyboard listener for intervention control. - - This method sets up keyboard event handling if not in headless mode. - """ - from pynput import keyboard as keyboard_device - - def on_press(key): - with self.event_lock: - self._handle_key_press(key, keyboard_device) - - self.listener = keyboard_device.Listener(on_press=on_press) - self.listener.start() - - def _check_intervention(self): - """ - Check if human intervention is needed. - - Returns: - Boolean indicating whether intervention is needed. - - This method should be overridden in subclasses with specific intervention logic. - """ - return False - - def _handle_intervention(self, action): - """ - Process actions during intervention mode. - - Args: - action: The original action from the agent. - - Returns: - Tuple of (modified_action, intervention_action). 
- """ - if self.leader_torque_enabled: - self.robot_leader.bus.sync_write("Torque_Enable", 0) - self.leader_torque_enabled = False - - leader_pos_dict = self.robot_leader.bus.sync_read("Present_Position") - follower_pos_dict = self.robot_follower.bus.sync_read("Present_Position") - - leader_pos = np.array([leader_pos_dict[name] for name in leader_pos_dict]) - follower_pos = np.array([follower_pos_dict[name] for name in follower_pos_dict]) - - self.leader_tracking_error_queue.append(np.linalg.norm(follower_pos[:-1] - leader_pos[:-1])) - - # [:3, 3] Last column of the transformation matrix corresponds to the xyz translation - leader_ee = self.kinematics.forward_kinematics(leader_pos)[:3, 3] - follower_ee = self.kinematics.forward_kinematics(follower_pos)[:3, 3] - - action = np.clip(leader_ee - follower_ee, -self.end_effector_step_sizes, self.end_effector_step_sizes) - # Normalize the action to the range [-1, 1] - action = action / self.end_effector_step_sizes - - if self.use_gripper: - if self.prev_leader_gripper is None: - self.prev_leader_gripper = np.clip( - leader_pos[-1], 0, self.robot_follower.config.max_gripper_pos - ) - - # Get gripper action delta based on leader pose - leader_gripper = leader_pos[-1] - gripper_delta = leader_gripper - self.prev_leader_gripper - - # Normalize by max angle and quantize to {0,1,2} - normalized_delta = gripper_delta / self.robot_follower.config.max_gripper_pos - if normalized_delta >= 0.3: - gripper_action = 2 - elif normalized_delta <= 0.1: - gripper_action = 0 - else: - gripper_action = 1 - - action = np.append(action, gripper_action) - - return action - - def _handle_leader_teleoperation(self): - """ - Handle leader teleoperation in non-intervention mode. - - This method synchronizes the leader robot position with the follower. - """ - - prev_leader_pos_dict = self.robot_leader.bus.sync_read("Present_Position") - prev_leader_pos = np.array( - [prev_leader_pos_dict[name] for name in prev_leader_pos_dict], dtype=np.float32 - ) - - if not self.leader_torque_enabled: - self.robot_leader.bus.sync_write("Torque_Enable", 1) - self.leader_torque_enabled = True - - follower_pos_dict = self.robot_follower.bus.sync_read("Present_Position") - follower_pos = np.array([follower_pos_dict[name] for name in follower_pos_dict], dtype=np.float32) - - goal_pos = {f"{motor}": follower_pos[i] for i, motor in enumerate(self.robot_leader.bus.motors)} - self.robot_leader.bus.sync_write("Goal_Position", goal_pos) - - self.leader_tracking_error_queue.append(np.linalg.norm(follower_pos[:-1] - prev_leader_pos[:-1])) - - def step(self, action): - """ - Execute a step with possible human intervention. - - Args: - action: The action to take in the environment. - - Returns: - Tuple of (observation, reward, terminated, truncated, info). 
- """ - is_intervention = self._check_intervention() - - # NOTE: - if is_intervention: - action = self._handle_intervention(action) - else: - self._handle_leader_teleoperation() - - # NOTE: - obs, reward, terminated, truncated, info = self.env.step(action) - - if isinstance(action, np.ndarray): - action = torch.from_numpy(action) - - # Add intervention info - info["is_intervention"] = is_intervention - info["action_intervention"] = action - - self.prev_leader_gripper = np.clip( - self.robot_leader.bus.sync_read("Present_Position")["gripper"], - 0, - self.robot_follower.config.max_gripper_pos, - ) - - # Check for success or manual termination - success = self.keyboard_events["episode_success"] - terminated = terminated or self.keyboard_events["episode_end"] or success - - if success: - reward = 1.0 - logging.info("Episode ended successfully with reward 1.0") - - return obs, reward, terminated, truncated, info - - def reset(self, **kwargs): - """ - Reset the environment and intervention state. - - Args: - **kwargs: Keyword arguments passed to the wrapped environment's reset. - - Returns: - The initial observation and info. - """ - self.keyboard_events = dict.fromkeys(self.keyboard_events, False) - self.leader_tracking_error_queue.clear() - return super().reset(**kwargs) - - def close(self): - """ - Clean up resources, including stopping keyboard listener. - - Returns: - Result of closing the wrapped environment. - """ - if hasattr(self, "listener") and self.listener is not None: - self.listener.stop() - return self.env.close() - - -class GearedLeaderControlWrapper(BaseLeaderControlWrapper): - """ - Wrapper that enables manual intervention via keyboard. - - This wrapper extends the BaseLeaderControlWrapper to allow explicit toggling - of human intervention mode with keyboard controls. - """ - - def _init_keyboard_events(self): - """ - Initialize keyboard events including human intervention flag. - - Extends the base class dictionary with an additional flag for tracking - intervention state toggled by keyboard. - """ - super()._init_keyboard_events() - self.keyboard_events["human_intervention_step"] = False - - def _handle_key_press(self, key, keyboard_device): - """ - Handle key presses including space for intervention toggle. - - Args: - key: The key that was pressed. - keyboard: The keyboard module with key definitions. - - Extends the base handler to respond to space key for toggling intervention. - """ - super()._handle_key_press(key, keyboard_device) - if key == keyboard_device.Key.space: - if not self.keyboard_events["human_intervention_step"]: - logging.info( - "Space key pressed. Human intervention required.\n" - "Place the leader in similar pose to the follower and press space again." - ) - self.keyboard_events["human_intervention_step"] = True - log_say("Human intervention step.", play_sounds=True) - else: - self.keyboard_events["human_intervention_step"] = False - logging.info("Space key pressed for a second time.\nContinuing with policy actions.") - log_say("Continuing with policy actions.", play_sounds=True) - - def _check_intervention(self): - """ - Check if human intervention is active based on keyboard toggle. - - Returns: - Boolean indicating whether intervention mode is active. - """ - return self.keyboard_events["human_intervention_step"] - - -class GearedLeaderAutomaticControlWrapper(BaseLeaderControlWrapper): - """ - Wrapper with automatic intervention based on error thresholds. 
- - This wrapper monitors the error between leader and follower positions - and automatically triggers intervention when error exceeds thresholds. - """ - - def __init__( - self, - env, - teleop_device, - end_effector_step_sizes, - use_gripper=False, - intervention_threshold=10.0, - release_threshold=1e-2, - ): - """ - Initialize the automatic intervention wrapper. - - Args: - env: The environment to wrap. - teleop_device: The teleoperation device. - use_gripper: Whether to include gripper control. - intervention_threshold: Error threshold to trigger intervention. - release_threshold: Error threshold to release intervention. - queue_size: Number of error measurements to track for smoothing. - """ - super().__init__(env, teleop_device, end_effector_step_sizes, use_gripper=use_gripper) - - # Error tracking parameters - self.intervention_threshold = intervention_threshold # Threshold to trigger intervention - self.release_threshold = release_threshold # Threshold to release intervention - self.is_intervention_active = False - self.start_time = time.perf_counter() - - def _check_intervention(self): - """ - Determine if intervention should occur based on the rate of change of leader-follower error in end_effector space. - - This method monitors the rate of change of leader-follower error in end_effector space - and automatically triggers intervention when the rate of change exceeds - the intervention threshold, releasing when it falls below the release threshold. - - Returns: - Boolean indicating whether intervention should be active. - """ - - # Condition for starting the intervention - # If the error in teleoperation is too high, that means the a user has grasped the leader robot and he wants to take over - if ( - not self.is_intervention_active - and len(self.leader_tracking_error_queue) == self.leader_tracking_error_queue.maxlen - and np.var(list(self.leader_tracking_error_queue)[-2:]) > self.intervention_threshold - ): - self.is_intervention_active = True - self.leader_tracking_error_queue.clear() - log_say("Intervention started", play_sounds=True) - return True - - # Track the error over time in leader_tracking_error_queue - # If the variance of the tracking error is too low, that means the user has let go of the leader robot and the intervention is over - if ( - self.is_intervention_active - and len(self.leader_tracking_error_queue) == self.leader_tracking_error_queue.maxlen - and np.var(self.leader_tracking_error_queue) < self.release_threshold - ): - self.is_intervention_active = False - self.leader_tracking_error_queue.clear() - log_say("Intervention ended", play_sounds=True) - return False - - # If not change has happened that merits a change in the intervention state, return the current state - return self.is_intervention_active - - def reset(self, **kwargs): - """ - Reset error tracking on environment reset. - - Args: - **kwargs: Keyword arguments passed to the wrapped environment's reset. - - Returns: - The initial observation and info. - """ - self.is_intervention_active = False - return super().reset(**kwargs) - - -class GamepadControlWrapper(gym.Wrapper): - """ - Wrapper that allows controlling a gym environment with a gamepad. - - This wrapper intercepts the step method and allows human input via gamepad - to override the agent's actions when desired. 
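The override rule this deleted wrapper applies in its `step()` further below fits in two lines; a sketch with illustrative values:

```python
import numpy as np

policy_action = np.array([0.2, 0.0, -0.1, 1.0], dtype=np.float32)  # from the agent
human_action = np.array([0.0, 0.5, 0.0, 2.0], dtype=np.float32)    # from the gamepad
is_intervention = True  # reported by the teleop device

# While the human intervenes, their action replaces the policy's; the executed
# action is recorded in info so the learner can treat it as a demonstration.
action = human_action if is_intervention else policy_action
info = {"is_intervention": is_intervention, "action_intervention": action}
```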
- """ - - def __init__( - self, - env, - teleop_device, # Accepts an instantiated teleoperator - use_gripper=False, # This should align with teleop_device's config - auto_reset=False, - ): - """ - Initialize the gamepad controller wrapper. - - Args: - env: The environment to wrap. - teleop_device: The instantiated teleoperation device (e.g., GamepadTeleop). - use_gripper: Whether to include gripper control (should match teleop_device.config.use_gripper). - auto_reset: Whether to auto reset the environment when episode ends. - """ - super().__init__(env) - - self.teleop_device = teleop_device - # Ensure the teleop_device is connected if it has a connect method - if hasattr(self.teleop_device, "connect") and not self.teleop_device.is_connected: - self.teleop_device.connect() - - # self.controller attribute is removed - - self.auto_reset = auto_reset - # use_gripper from args should ideally match teleop_device.config.use_gripper - # For now, we use the one passed, but it can lead to inconsistency if not set correctly from config - self.use_gripper = use_gripper - - logging.info("Gamepad control wrapper initialized with provided teleop_device.") - print( - "Gamepad controls (managed by the provided teleop_device - specific button mappings might vary):" - ) - print(" Left analog stick: Move in X-Y plane") - print(" Right analog stick: Move in Z axis (up/down)") - print(" X/Square button: End episode (FAILURE)") - print(" Y/Triangle button: End episode (SUCCESS)") - print(" B/Circle button: Exit program") - - def get_teleop_commands( - self, - ) -> tuple[bool, np.ndarray, bool, bool, bool]: - """ - Get the current action from the gamepad if any input is active. - - Returns: - Tuple containing: - - is_active: Whether gamepad input is active (from teleop_device.gamepad.should_intervene()) - - action: The action derived from gamepad input (from teleop_device.get_action()) - - terminate_episode: Whether episode termination was requested - - success: Whether episode success was signaled - - rerecord_episode: Whether episode rerecording was requested - """ - if not hasattr(self.teleop_device, "gamepad") or self.teleop_device.gamepad is None: - raise AttributeError( - "teleop_device does not have a 'gamepad' attribute or it is None. Expected for GamepadControlWrapper." - ) - - # Get status flags from the underlying gamepad controller within the teleop_device - self.teleop_device.gamepad.update() # Ensure gamepad state is fresh - intervention_is_active = self.teleop_device.gamepad.should_intervene() - episode_end_status = self.teleop_device.gamepad.get_episode_end_status() - - terminate_episode = episode_end_status is not None - success = episode_end_status == "success" - rerecord_episode = episode_end_status == "rerecord_episode" - - # Get the action dictionary from the teleop_device - action_dict = self.teleop_device.get_action() - - # Convert action_dict to numpy array based on expected structure - # Order: delta_x, delta_y, delta_z, gripper (if use_gripper) - action_list = [action_dict["delta_x"], action_dict["delta_y"], action_dict["delta_z"]] - if self.use_gripper: - # GamepadTeleop returns gripper action as 0 (close), 1 (stay), 2 (open) - # This needs to be consistent with what EEActionWrapper expects if it's used downstream - # EEActionWrapper for gripper typically expects 0.0 (closed) to 2.0 (open) - # For now, we pass the direct value from GamepadTeleop, ensure downstream compatibility. 
-            gripper_val = action_dict.get("gripper", 1.0)  # Default to 1.0 (stay) if not present
-            action_list.append(float(gripper_val))
-
-        gamepad_action_np = np.array(action_list, dtype=np.float32)
-
-        return (
-            intervention_is_active,
-            gamepad_action_np,
-            terminate_episode,
-            success,
-            rerecord_episode,
-        )
-
-    def step(self, action):
-        """
-        Step the environment, using gamepad input to override actions when active.
-
-        Args:
-            action: Original action from agent.
-
-        Returns:
-            Tuple of (observation, reward, terminated, truncated, info).
-        """
-        # Get gamepad state and action
-        (
-            is_intervention,
-            gamepad_action,
-            terminate_episode,
-            success,
-            rerecord_episode,
-        ) = self.get_teleop_commands()
-
-        # Update episode ending state if requested
-        if terminate_episode:
-            logging.info(f"Episode manually ended: {'SUCCESS' if success else 'FAILURE'}")
-
-        # Only override the action if gamepad is active
-        action = gamepad_action if is_intervention else action
-
-        # Step the environment
-        obs, reward, terminated, truncated, info = self.env.step(action)
-
-        # Add episode ending if requested via gamepad
-        terminated = terminated or truncated or terminate_episode
-
-        if success:
-            reward = 1.0
-            logging.info("Episode ended successfully with reward 1.0")
-
-        if isinstance(action, np.ndarray):
-            action = torch.from_numpy(action)
-
-        info["is_intervention"] = is_intervention
-        # The original `BaseLeaderControlWrapper` puts `action_intervention` in info.
-        # For Gamepad, if intervention, `gamepad_action` is the intervention.
-        # If not intervention, policy's action is `action`.
-        # For consistency, let's store the *human's* action if intervention occurred.
-        info["action_intervention"] = action
-
-        info["rerecord_episode"] = rerecord_episode
-
-        # If episode ended, reset the state
-        if terminated or truncated:
-            # Add success/failure information to info dict
-            info["next.success"] = success
-
-            # Auto reset if configured
-            if self.auto_reset:
-                obs, reset_info = self.reset()
-                info.update(reset_info)
-
-        return obs, reward, terminated, truncated, info
-
-    def close(self):
-        """
-        Clean up resources when environment closes.
-
-        Returns:
-            Result of closing the wrapped environment.
-        """
-        if hasattr(self.teleop_device, "disconnect"):
-            self.teleop_device.disconnect()
-
-        # Call the parent close method
-        return self.env.close()
-
-
-class KeyboardControlWrapper(GamepadControlWrapper):
-    """
-    Wrapper that allows controlling a gym environment with a keyboard.
-
-    This wrapper intercepts the step method and allows human input via keyboard
-    to override the agent's actions when desired.
-
-    Inherits from GamepadControlWrapper to avoid code duplication.
-    """
-
-    def __init__(
-        self,
-        env,
-        teleop_device,  # Accepts an instantiated teleoperator
-        use_gripper=False,  # This should align with teleop_device's config
-        auto_reset=False,
-    ):
-        """
-        Initialize the keyboard control wrapper.
-
-        Args:
-            env: The environment to wrap.
-            teleop_device: The instantiated teleoperation device (e.g., KeyboardEndEffectorTeleop).
-            use_gripper: Whether to include gripper control (should match teleop_device.config.use_gripper).
-            auto_reset: Whether to auto reset the environment when episode ends.
- """ - super().__init__(env, teleop_device, use_gripper, auto_reset) - - self.is_intervention_active = False - - logging.info("Keyboard control wrapper initialized with provided teleop_device.") - print("Keyboard controls:") - print(" Arrow keys: Move in X-Y plane") - print(" Shift and Shift_R: Move in Z axis") - print(" Right Ctrl and Left Ctrl: Open and close gripper") - print(" f: End episode with FAILURE") - print(" s: End episode with SUCCESS") - print(" r: End episode with RERECORD") - print(" i: Start/Stop Intervention") - - def get_teleop_commands( - self, - ) -> tuple[bool, np.ndarray, bool, bool, bool]: - action_dict = self.teleop_device.get_action() - episode_end_status = None - - # Unroll the misc_keys_queue to check for events related to intervention, episode success, etc. - while not self.teleop_device.misc_keys_queue.empty(): - key = self.teleop_device.misc_keys_queue.get() - if key == "i": - self.is_intervention_active = not self.is_intervention_active - elif key == "f": - episode_end_status = "failure" - elif key == "s": - episode_end_status = "success" - elif key == "r": - episode_end_status = "rerecord_episode" - - terminate_episode = episode_end_status is not None - success = episode_end_status == "success" - rerecord_episode = episode_end_status == "rerecord_episode" - - # Convert action_dict to numpy array based on expected structure - # Order: delta_x, delta_y, delta_z, gripper (if use_gripper) - action_list = [action_dict["delta_x"], action_dict["delta_y"], action_dict["delta_z"]] - if self.use_gripper: - # GamepadTeleop returns gripper action as 0 (close), 1 (stay), 2 (open) - # This needs to be consistent with what EEActionWrapper expects if it's used downstream - # EEActionWrapper for gripper typically expects 0.0 (closed) to 2.0 (open) - # For now, we pass the direct value from GamepadTeleop, ensure downstream compatibility. 
-            gripper_val = action_dict.get("gripper", 1.0)  # Default to 1.0 (stay) if not present
-            action_list.append(float(gripper_val))
-
-        gamepad_action_np = np.array(action_list, dtype=np.float32)
-
-        return (
-            self.is_intervention_active,
-            gamepad_action_np,
-            terminate_episode,
-            success,
-            rerecord_episode,
-        )
-
-
-class GymHilDeviceWrapper(gym.Wrapper):
-    def __init__(self, env, device="cpu"):
-        super().__init__(env)
-        self.device = device
-
-    def step(self, action):
-        obs, reward, terminated, truncated, info = self.env.step(action)
-        for k in obs:
-            obs[k] = obs[k].to(self.device)
-        if "action_intervention" in info:
-            # NOTE: This is a hack to ensure the action intervention is a float32 tensor and supported on MPS device
-            info["action_intervention"] = info["action_intervention"].astype(np.float32)
-            info["action_intervention"] = torch.from_numpy(info["action_intervention"]).to(self.device)
-        return obs, reward, terminated, truncated, info
-
-    def reset(self, *, seed: int | None = None, options: dict[str, Any] | None = None):
-        obs, info = self.env.reset(seed=seed, options=options)
-        for k in obs:
-            obs[k] = obs[k].to(self.device)
-        if "action_intervention" in info:
-            # NOTE: This is a hack to ensure the action intervention is a float32 tensor and supported on MPS device
-            info["action_intervention"] = info["action_intervention"].astype(np.float32)
-            info["action_intervention"] = torch.from_numpy(info["action_intervention"]).to(self.device)
-        return obs, info
-
-
-class GymHilObservationProcessorWrapper(gym.ObservationWrapper):
-    def __init__(self, env: gym.Env):
-        super().__init__(env)
-        prev_space = self.observation_space
-        new_space = {}
-
-        for key in prev_space:
-            if "pixels" in key:
-                for k in prev_space["pixels"]:
-                    new_space[f"observation.images.{k}"] = gym.spaces.Box(
-                        0.0, 255.0, shape=(3, 128, 128), dtype=np.uint8
-                    )
-
-            if key == "agent_pos":
-                new_space["observation.state"] = prev_space["agent_pos"]
-
-        self.observation_space = gym.spaces.Dict(new_space)
-
-    def observation(self, observation: dict[str, Any]) -> dict[str, Any]:
-        return preprocess_observation(observation)
-
-
-###########################################################
-# Factory functions
-###########################################################
-
-
-def make_robot_env(cfg: EnvConfig) -> gym.Env:
-    """
-    Factory function to create a robot environment.
-
-    This function builds a robot environment with all necessary wrappers
-    based on the provided configuration.
+def make_robot_env(cfg: HILSerlRobotEnvConfig) -> tuple[gym.Env, Any]:
+    """Create robot environment from configuration.
 
     Args:
-        cfg: Configuration object containing environment parameters.
+        cfg: Environment configuration.
 
     Returns:
-        A gym environment with all necessary wrappers applied.
+        Tuple of (gym environment, teleoperator device).
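+
+        Note:
+            For the `gym_hil` simulation path the returned teleoperator is `None`;
+            callers should guard against this before wiring up interventions.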
""" - if cfg.type == "hil": + # Check if this is a GymHIL simulation environment + if cfg.name == "gym_hil": + assert cfg.robot is None and cfg.teleop is None, "GymHIL environment does not support robot or teleop" import gym_hil # noqa: F401 - # TODO (azouitine) + # Extract gripper settings with defaults + use_gripper = cfg.processor.gripper.use_gripper if cfg.processor.gripper is not None else True + gripper_penalty = cfg.processor.gripper.gripper_penalty if cfg.processor.gripper is not None else 0.0 + env = gym.make( f"gym_hil/{cfg.task}", image_obs=True, render_mode="human", - use_gripper=cfg.wrapper.use_gripper, - gripper_penalty=cfg.wrapper.gripper_penalty, - ) - env = GymHilObservationProcessorWrapper(env=env) - env = GymHilDeviceWrapper(env=env, device=cfg.device) - env = BatchCompatibleWrapper(env=env) - env = TorchActionWrapper(env=env, device=cfg.device) - return env - - if not hasattr(cfg, "robot") or not hasattr(cfg, "teleop"): - raise ValueError( - "Configuration for 'gym_manipulator' must be HILSerlRobotEnvConfig with robot and teleop." + use_gripper=use_gripper, + gripper_penalty=gripper_penalty, ) - if cfg.robot is None: - raise ValueError("RobotConfig (cfg.robot) must be provided for gym_manipulator environment.") + return env, None + + # Real robot environment + assert cfg.robot is not None, "Robot config must be provided for real robot environment" + assert cfg.teleop is not None, "Teleop config must be provided for real robot environment" + robot = make_robot_from_config(cfg.robot) teleop_device = make_teleoperator_from_config(cfg.teleop) teleop_device.connect() - # Create base environment + # Create base environment with safe defaults + use_gripper = cfg.processor.gripper.use_gripper if cfg.processor.gripper is not None else True + display_cameras = ( + cfg.processor.observation.display_cameras if cfg.processor.observation is not None else False + ) + reset_pose = cfg.processor.reset.fixed_reset_joint_positions if cfg.processor.reset is not None else None + env = RobotEnv( robot=robot, - use_gripper=cfg.wrapper.use_gripper, - display_cameras=cfg.wrapper.display_cameras if cfg.wrapper else False, + use_gripper=use_gripper, + display_cameras=display_cameras, + reset_pose=reset_pose, ) - # Add observation and image processing - if cfg.wrapper: - if cfg.wrapper.add_joint_velocity_to_observation: - env = AddJointVelocityToObservation(env=env, fps=cfg.fps) - if cfg.wrapper.add_current_to_observation: - env = AddCurrentToObservation(env=env) - if cfg.wrapper.add_ee_pose_to_observation: - env = EEObservationWrapper(env=env, ee_pose_limits=robot.end_effector_bounds) - - env = ConvertToLeRobotObservation(env=env, device=cfg.device) - - if cfg.wrapper and cfg.wrapper.crop_params_dict is not None: - env = ImageCropResizeWrapper( - env=env, - crop_params_dict=cfg.wrapper.crop_params_dict, - resize_size=cfg.wrapper.resize_size, - ) - - # Add reward computation and control wrappers - reward_classifier = init_reward_classifier(cfg) - if reward_classifier is not None: - env = RewardWrapper(env=env, reward_classifier=reward_classifier, device=cfg.device) - - env = TimeLimitWrapper(env=env, control_time_s=cfg.wrapper.control_time_s, fps=cfg.fps) - if cfg.wrapper.use_gripper and cfg.wrapper.gripper_penalty is not None: - env = GripperPenaltyWrapper( - env=env, - penalty=cfg.wrapper.gripper_penalty, - ) - - # Control mode specific wrappers - control_mode = cfg.wrapper.control_mode - if control_mode == "gamepad": - assert isinstance(teleop_device, GamepadTeleop), ( - "teleop_device must be 
an instance of GamepadTeleop for gamepad control mode" - ) - env = GamepadControlWrapper( - env=env, - teleop_device=teleop_device, - use_gripper=cfg.wrapper.use_gripper, - ) - elif control_mode == "keyboard_ee": - assert isinstance(teleop_device, KeyboardEndEffectorTeleop), ( - "teleop_device must be an instance of KeyboardEndEffectorTeleop for keyboard control mode" - ) - env = KeyboardControlWrapper( - env=env, - teleop_device=teleop_device, - use_gripper=cfg.wrapper.use_gripper, - ) - elif control_mode == "leader": - env = GearedLeaderControlWrapper( - env=env, - teleop_device=teleop_device, - end_effector_step_sizes=cfg.robot.end_effector_step_sizes, - use_gripper=cfg.wrapper.use_gripper, - ) - elif control_mode == "leader_automatic": - env = GearedLeaderAutomaticControlWrapper( - env=env, - teleop_device=teleop_device, - end_effector_step_sizes=cfg.robot.end_effector_step_sizes, - use_gripper=cfg.wrapper.use_gripper, - ) - else: - raise ValueError(f"Invalid control mode: {control_mode}") - - env = ResetWrapper( - env=env, - reset_pose=cfg.wrapper.fixed_reset_joint_positions, - reset_time_s=cfg.wrapper.reset_time_s, - ) - - env = BatchCompatibleWrapper(env=env) - env = TorchActionWrapper(env=env, device=cfg.device) - - return env + return env, teleop_device -def init_reward_classifier(cfg): - """ - Load a reward classifier policy from a pretrained path if configured. +def make_processors( + env: gym.Env, teleop_device: Teleoperator | None, cfg: HILSerlRobotEnvConfig, device: str = "cpu" +) -> tuple[Any, Any]: + """Create environment and action processors. Args: - cfg: The environment configuration containing classifier paths. + env: Robot environment instance. + teleop_device: Teleoperator device for intervention. + cfg: Processor configuration. + device: Target device for computations. Returns: - The loaded classifier model or None if not configured. + Tuple of (environment processor, action processor). 
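+
+        Note:
+            The environment processor runs on observations after every `env.step()`,
+            while the action processor runs on actions before they reach the robot
+            (see `step_env_and_process_transition` below).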
""" - if cfg.reward_classifier_pretrained_path is None: - return None - - from lerobot.policies.sac.reward_model.modeling_classifier import Classifier - - # Get device from config or default to CUDA - device = getattr(cfg, "device", "cpu") - - # Load the classifier directly using from_pretrained - classifier = Classifier.from_pretrained( - pretrained_name_or_path=cfg.reward_classifier_pretrained_path, + terminate_on_success = ( + cfg.processor.reset.terminate_on_success if cfg.processor.reset is not None else True ) - # Ensure model is on the correct device - classifier.to(device) - classifier.eval() # Set to evaluation mode + if cfg.name == "gym_hil": + action_pipeline_steps = [ + InterventionActionProcessor(terminate_on_success=terminate_on_success), + Torch2NumpyActionProcessor(), + ] - return classifier + # Minimal processor pipeline for GymHIL simulation + env_pipeline_steps = [ + Numpy2TorchActionProcessor(), + VanillaObservationProcessor(), + ToBatchProcessor(), + DeviceProcessor(device=device), + ] + + return RobotProcessor(steps=env_pipeline_steps), RobotProcessor(steps=action_pipeline_steps) + + # Full processor pipeline for real robot environment + # Get robot and motor information for kinematics + motor_names = list(env.robot.bus.motors.keys()) + + # Set up kinematics solver if inverse kinematics is configured + kinematics_solver = None + if cfg.processor.inverse_kinematics is not None: + kinematics_solver = RobotKinematics( + urdf_path=cfg.processor.inverse_kinematics.urdf_path, + target_frame_name=cfg.processor.inverse_kinematics.target_frame_name, + joint_names=motor_names, + ) + + env_pipeline_steps = [ + VanillaObservationProcessor(), + ] + + if cfg.processor.observation is not None: + if cfg.processor.observation.add_joint_velocity_to_observation: + env_pipeline_steps.append(JointVelocityProcessor(dt=1.0 / cfg.fps, num_dof=len(motor_names))) + if cfg.processor.observation.add_current_to_observation: + env_pipeline_steps.append(MotorCurrentProcessor(robot=env.robot)) + + if kinematics_solver is not None: + env_pipeline_steps.append( + ForwardKinematicsJointsToEE( + kinematics=kinematics_solver, + motor_names=motor_names, + ) + ) + + if cfg.processor.image_preprocessing is not None: + env_pipeline_steps.append( + ImageCropResizeProcessor( + crop_params_dict=cfg.processor.image_preprocessing.crop_params_dict, + resize_size=cfg.processor.image_preprocessing.resize_size, + ) + ) + + # Add time limit processor if reset config exists + if cfg.processor.reset is not None: + env_pipeline_steps.append( + TimeLimitProcessor(max_episode_steps=int(cfg.processor.reset.control_time_s * cfg.fps)) + ) + + # Add gripper penalty processor if gripper config exists and enabled + if cfg.processor.gripper is not None and cfg.processor.gripper.use_gripper: + env_pipeline_steps.append( + GripperPenaltyProcessor( + penalty=cfg.processor.gripper.gripper_penalty, + max_gripper_pos=cfg.processor.max_gripper_pos, + ) + ) + + if ( + cfg.processor.reward_classifier is not None + and cfg.processor.reward_classifier.pretrained_path is not None + ): + env_pipeline_steps.append( + RewardClassifierProcessor( + pretrained_path=cfg.processor.reward_classifier.pretrained_path, + device=device, + success_threshold=cfg.processor.reward_classifier.success_threshold, + success_reward=cfg.processor.reward_classifier.success_reward, + terminate_on_success=terminate_on_success, + ) + ) + + env_pipeline_steps.append(ToBatchProcessor()) + env_pipeline_steps.append(DeviceProcessor(device=device)) + + 
action_pipeline_steps = [ + AddTeleopActionAsComplimentaryData(teleop_device=teleop_device), + AddTeleopEventsAsInfo(teleop_device=teleop_device), + AddRobotObservationAsComplimentaryData(robot=env.robot), + InterventionActionProcessor( + use_gripper=cfg.processor.gripper.use_gripper if cfg.processor.gripper is not None else False, + terminate_on_success=terminate_on_success, + ), + ] + + # Replace InverseKinematicsProcessor with new kinematic processors + if cfg.processor.inverse_kinematics is not None and kinematics_solver is not None: + # Add EE bounds and safety processor + inverse_kinematics_steps = [ + MapDeltaActionToRobotAction(), + EEReferenceAndDelta( + kinematics=kinematics_solver, + end_effector_step_sizes=cfg.processor.inverse_kinematics.end_effector_step_sizes, + motor_names=motor_names, + use_latched_reference=False, + ), + EEBoundsAndSafety( + end_effector_bounds=cfg.processor.inverse_kinematics.end_effector_bounds, + ), + InverseKinematicsEEToJoints( + kinematics=kinematics_solver, + motor_names=motor_names, + initial_guess_current_joints=False, + ), + GripperVelocityToJoint( + motor_names=motor_names, + clip_max=cfg.processor.max_gripper_pos, + speed_factor=1.0, + discrete_gripper=True, + ), + ] + action_pipeline_steps.extend(inverse_kinematics_steps) + + return RobotProcessor(steps=env_pipeline_steps), RobotProcessor(steps=action_pipeline_steps) -########################################################### -# Record and replay functions -########################################################### - - -def record_dataset(env, policy, cfg): +def step_env_and_process_transition( + env: gym.Env, + transition: EnvTransition, + action: torch.Tensor, + env_processor: RobotProcessor, + action_processor: RobotProcessor, +): """ - Record a dataset of robot interactions using either a policy or teleop. - - This function runs episodes in the environment and records the observations, - actions, and results for dataset creation. + Execute one step with processor pipeline. Args: - env: The environment to record from. - policy: Optional policy to generate actions (if None, uses teleop). - cfg: Configuration object containing recording parameters like: - - repo_id: Repository ID for dataset storage - - dataset_root: Local root directory for dataset - - num_episodes: Number of episodes to record - - fps: Frames per second for recording - - push_to_hub: Whether to push dataset to Hugging Face Hub - - task: Name/description of the task being recorded - - number_of_steps_after_success: Number of additional steps to continue recording after - a success (reward=1) is detected. This helps collect - more positive examples for reward classifier training. + env: The robot environment + transition: Current transition state + action: Action to execute + env_processor: Environment processor + action_processor: Action processor + + Returns: + Processed transition with updated state. 
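+
+    A typical call, as used by `control_loop` below:
+
+        transition = step_env_and_process_transition(
+            env=env,
+            transition=transition,
+            action=neutral_action,
+            env_processor=env_processor,
+            action_processor=action_processor,
+        )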
""" - from lerobot.datasets.lerobot_dataset import LeRobotDataset - # Setup initial action (zero action if using teleop) - action = env.action_space.sample() * 0.0 + # Create action transition + transition[TransitionKey.ACTION] = action + processed_action_transition = action_processor(transition) + processed_action = processed_action_transition[TransitionKey.ACTION] - action_names = ["delta_x_ee", "delta_y_ee", "delta_z_ee"] - if cfg.wrapper.use_gripper: - action_names.append("gripper_delta") + # Step environment with processed action + obs, reward, terminated, truncated, info = env.step(processed_action) - # Configure dataset features based on environment spaces - features = { - "observation.state": { - "dtype": "float32", - "shape": env.observation_space["observation.state"].shape, - "names": None, - }, - "action": { - "dtype": "float32", - "shape": (len(action_names),), - "names": action_names, - }, - "next.reward": {"dtype": "float32", "shape": (1,), "names": None}, - "next.done": {"dtype": "bool", "shape": (1,), "names": None}, - "complementary_info.discrete_penalty": { - "dtype": "float32", - "shape": (1,), - "names": ["discrete_penalty"], - }, - } + # Combine rewards from environment and action processor - # Add image features - for key in env.observation_space: - if "image" in key: - features[key] = { - "dtype": "video", - "shape": env.observation_space[key].shape, - "names": ["channels", "height", "width"], - } + reward = reward + processed_action_transition[TransitionKey.REWARD] + terminated = terminated or processed_action_transition[TransitionKey.DONE] + truncated = truncated or processed_action_transition[TransitionKey.TRUNCATED] + complementary_data = processed_action_transition[TransitionKey.COMPLEMENTARY_DATA].copy() + new_info = processed_action_transition[TransitionKey.INFO].copy() + new_info.update(info) - # Create dataset - dataset = LeRobotDataset.create( - cfg.repo_id, - cfg.fps, - root=cfg.dataset_root, - use_videos=True, - image_writer_threads=4, - image_writer_processes=0, - features=features, + new_transition = create_transition( + observation=obs, + action=processed_action, + reward=reward, + done=terminated, + truncated=truncated, + info=new_info, + complementary_data=complementary_data, ) + new_transition = env_processor(new_transition) - # Record episodes - episode_index = 0 - recorded_action = None - while episode_index < cfg.num_episodes: - obs, _ = env.reset() - start_episode_t = time.perf_counter() - log_say(f"Recording episode {episode_index}", play_sounds=True) + return new_transition - # Track success state collection - success_detected = False - success_steps_collected = 0 - # Run episode steps - while time.perf_counter() - start_episode_t < cfg.wrapper.control_time_s: - start_loop_t = time.perf_counter() +def control_loop( + env: gym.Env, + env_processor: RobotProcessor, + action_processor: RobotProcessor, + teleop_device: Teleoperator, + cfg: GymManipulatorConfig, +) -> None: + """Main control loop for robot environment interaction. 
+ if cfg.mode == "record": then a dataset will be created and recorded - # Get action from policy if available - if cfg.pretrained_policy_name_or_path is not None: - action = policy.select_action(obs) + Args: + env: The robot environment + env_processor: Environment processor + action_processor: Action processor + teleop_device: Teleoperator device + cfg: gym_manipulator configuration + """ + dt = 1.0 / cfg.env.fps - # Step environment - obs, reward, terminated, truncated, info = env.step(action) + print(f"Starting control loop at {cfg.env.fps} FPS") + print("Controls:") + print("- Use gamepad/teleop device for intervention") + print("- When not intervening, robot will stay still") + print("- Press Ctrl+C to exit") - # Check if episode needs to be rerecorded - if info.get("rerecord_episode", False): - break + # Reset environment and processors + obs, info = env.reset() + complementary_data = ( + {"raw_joint_positions": info.pop("raw_joint_positions")} if "raw_joint_positions" in info else {} + ) + env_processor.reset() + action_processor.reset() - # For teleop, get action from intervention - recorded_action = { - "action": info["action_intervention"].cpu().squeeze(0).float() if policy is None else action + # Process initial observation + transition = create_transition(observation=obs, info=info, complementary_data=complementary_data) + transition = env_processor(transition) + + # Determine if gripper is used + use_gripper = cfg.env.processor.gripper.use_gripper if cfg.env.processor.gripper is not None else True + + dataset = None + if cfg.mode == "record": + action_features = teleop_device.action_features + features = { + "action": action_features, + "next.reward": {"dtype": "float32", "shape": (1,), "names": None}, + "next.done": {"dtype": "bool", "shape": (1,), "names": None}, + } + if use_gripper: + features["complementary_info.discrete_penalty"] = { + "dtype": "float32", + "shape": (1,), + "names": ["discrete_penalty"], } - # Process observation for dataset - obs_processed = {k: v.cpu().squeeze(0).float() for k, v in obs.items()} + for key, value in transition[TransitionKey.OBSERVATION].items(): + if key == "observation.state": + features[key] = { + "dtype": "float32", + "shape": value.squeeze(0).shape, + "names": None, + } + if "image" in key: + features[key] = { + "dtype": "video", + "shape": value.squeeze(0).shape, + "names": ["channels", "height", "width"], + } - # Check if we've just detected success - if reward == 1.0 and not success_detected: - success_detected = True - logging.info("Success detected! 
Collecting additional success states.") + # Create dataset + dataset = LeRobotDataset.create( + cfg.dataset.repo_id, + cfg.env.fps, + root=cfg.dataset.dataset_root, + use_videos=True, + image_writer_threads=4, + image_writer_processes=0, + features=features, + ) - # Add frame to dataset - continue marking as success even during extra collection steps - frame = {**obs_processed, **recorded_action} + episode_idx = 0 + episode_step = 0 + episode_start_time = time.perf_counter() - # If we're in the success collection phase, keep marking rewards as 1.0 - if success_detected: - frame["next.reward"] = np.array([1.0], dtype=np.float32) - else: - frame["next.reward"] = np.array([reward], dtype=np.float32) + while episode_idx < cfg.dataset.num_episodes: + step_start_time = time.perf_counter() - # Only mark as done if we're truly done (reached end or collected enough success states) - really_done = terminated or truncated - if success_detected: - success_steps_collected += 1 - really_done = success_steps_collected >= cfg.number_of_steps_after_success + # Create a neutral action (no movement) + neutral_action = torch.tensor([0.0, 0.0, 0.0], dtype=torch.float32) + if use_gripper: + neutral_action = torch.cat([neutral_action, torch.tensor([1.0])]) # Gripper stay - frame["next.done"] = np.array([really_done], dtype=bool) - frame["complementary_info.discrete_penalty"] = torch.tensor( - [info.get("discrete_penalty", 0.0)], dtype=torch.float32 + # Use the new step function + transition = step_env_and_process_transition( + env=env, + transition=transition, + action=neutral_action, + env_processor=env_processor, + action_processor=action_processor, + ) + terminated = transition.get(TransitionKey.DONE, False) + truncated = transition.get(TransitionKey.TRUNCATED, False) + + if cfg.mode == "record": + observations = {k: v.squeeze(0).cpu() for k, v in transition[TransitionKey.OBSERVATION].items()} + # Use teleop_action if available, otherwise use the action from the transition + action_to_record = transition[TransitionKey.COMPLEMENTARY_DATA].get( + "teleop_action", transition[TransitionKey.ACTION] ) - dataset.add_frame(frame, task=cfg.task) + frame = { + **observations, + "action": action_to_record.cpu(), + "next.reward": np.array([transition[TransitionKey.REWARD]], dtype=np.float32), + "next.done": np.array([terminated or truncated], dtype=bool), + } + if use_gripper: + discrete_penalty = transition[TransitionKey.COMPLEMENTARY_DATA].get("discrete_penalty", 0.0) + frame["complementary_info.discrete_penalty"] = np.array([discrete_penalty], dtype=np.float32) + if dataset is not None: + dataset.add_frame(frame, task=cfg.dataset.task) - # Maintain consistent timing - if cfg.fps: - dt_s = time.perf_counter() - start_loop_t - busy_wait(1 / cfg.fps - dt_s) + episode_step += 1 - # Check if we should end the episode - if (terminated or truncated) and not success_detected: - # Regular termination without success - break - elif success_detected and success_steps_collected >= cfg.number_of_steps_after_success: - # We've collected enough success states - logging.info(f"Collected {success_steps_collected} additional success states") - break + # Handle episode termination + if terminated or truncated: + episode_time = time.perf_counter() - episode_start_time + logging.info( + f"Episode ended after {episode_step} steps in {episode_time:.1f}s with reward {transition[TransitionKey.REWARD]}" + ) + episode_step = 0 + episode_idx += 1 - # Handle episode recording - if info.get("rerecord_episode", False): - 
dataset.clear_episode_buffer() - logging.info(f"Re-recording episode {episode_index}") - continue + if dataset is not None: + if transition[TransitionKey.INFO].get("rerecord_episode", False): + logging.info(f"Re-recording episode {episode_idx}") + dataset.clear_episode_buffer() + episode_idx -= 1 + else: + logging.info(f"Saving episode {episode_idx}") + dataset.save_episode() - dataset.save_episode() - episode_index += 1 + # Reset for new episode + obs, info = env.reset() + env_processor.reset() + action_processor.reset() - # Finalize dataset - # dataset.consolidate(run_compute_stats=True) - if cfg.push_to_hub: + transition = create_transition(observation=obs, info=info) + transition = env_processor(transition) + + # Maintain fps timing + busy_wait(dt - (time.perf_counter() - step_start_time)) + + if dataset is not None and cfg.dataset.push_to_hub: + logging.info("Pushing dataset to hub") dataset.push_to_hub() -def replay_episode(env, cfg): - """ - Replay a recorded episode in the environment. +def replay_trajectory(env: gym.Env, action_processor: RobotProcessor, cfg: GymManipulatorConfig) -> None: + """Replay recorded trajectory on robot environment.""" + dataset = LeRobotDataset( + cfg.dataset.repo_id, + root=cfg.dataset.dataset_root, + episodes=[cfg.dataset.episode], + download_videos=False, + ) + dataset_actions = dataset.hf_dataset.select_columns(["action"]) + _, info = env.reset() - This function loads actions from a previously recorded episode - and executes them in the environment. - - Args: - env: The environment to replay in. - cfg: Configuration object containing replay parameters: - - repo_id: Repository ID for dataset - - dataset_root: Local root directory for dataset - - episode: Episode ID to replay - """ - from lerobot.datasets.lerobot_dataset import LeRobotDataset - - dataset = LeRobotDataset(cfg.repo_id, root=cfg.dataset_root, episodes=[cfg.episode]) - env.reset() - - actions = dataset.hf_dataset.select_columns("action") - - for idx in range(dataset.num_frames): - start_episode_t = time.perf_counter() - - action = actions[idx]["action"] - env.step(action) - - dt_s = time.perf_counter() - start_episode_t - busy_wait(1 / 10 - dt_s) + for action_data in dataset_actions: + start_time = time.perf_counter() + transition = create_transition( + action=action_data["action"], + complementary_data={"raw_joint_positions": info["raw_joint_positions"]}, + ) + transition = action_processor(transition) + _, _, _, _, info = env.step(transition[TransitionKey.ACTION]) + busy_wait(1 / cfg.env.fps - (time.perf_counter() - start_time)) @parser.wrap() -def main(cfg: EnvConfig): - """Main entry point for the robot environment script. +def main(cfg: GymManipulatorConfig) -> None: + """Main entry point for gym manipulator script.""" + env, teleop_device = make_robot_env(cfg.env) + env_processor, action_processor = make_processors(env, teleop_device, cfg.env, cfg.device) - This function runs the robot environment in one of several modes - based on the provided configuration. - - Args: - cfg: Configuration object defining the run parameters, - including mode (record, replay, random) and other settings. 
- """ - env = make_robot_env(cfg) - - if cfg.mode == "record": - policy = None - if cfg.pretrained_policy_name_or_path is not None: - from lerobot.policies.sac.modeling_sac import SACPolicy - - policy = SACPolicy.from_pretrained(cfg.pretrained_policy_name_or_path) - policy.to(cfg.device) - policy.eval() - - record_dataset( - env, - policy=policy, - cfg=cfg, - ) - exit() + print("Environment observation space:", env.observation_space) + print("Environment action space:", env.action_space) + print("Environment processor:", env_processor) + print("Action processor:", action_processor) if cfg.mode == "replay": - replay_episode( - env, - cfg=cfg, - ) + replay_trajectory(env, action_processor, cfg) exit() - env.reset() - - # Initialize the smoothed action as a random sample. - smoothed_action = env.action_space.sample() * 0.0 - - # Smoothing coefficient (alpha) defines how much of the new random sample to mix in. - # A value close to 0 makes the trajectory very smooth (slow to change), while a value close to 1 is less smooth. - alpha = 1.0 - - num_episode = 0 - successes = [] - while num_episode < 10: - start_loop_s = time.perf_counter() - # Sample a new random action from the robot's action space. - new_random_action = env.action_space.sample() - # Update the smoothed action using an exponential moving average. - smoothed_action = alpha * new_random_action + (1 - alpha) * smoothed_action - - # Execute the step: wrap the NumPy action in a torch tensor. - obs, reward, terminated, truncated, info = env.step(smoothed_action) - if terminated or truncated: - successes.append(reward) - env.reset() - num_episode += 1 - - dt_s = time.perf_counter() - start_loop_s - busy_wait(1 / cfg.fps - dt_s) - - logging.info(f"Success after 20 steps {successes}") - logging.info(f"success rate {sum(successes) / len(successes)}") + control_loop(env, env_processor, action_processor, teleop_device, cfg) if __name__ == "__main__": diff --git a/src/lerobot/scripts/rl/learner.py b/src/lerobot/scripts/rl/learner.py index 17479b0f8..840f0b96e 100644 --- a/src/lerobot/scripts/rl/learner.py +++ b/src/lerobot/scripts/rl/learner.py @@ -75,6 +75,7 @@ from lerobot.policies.sac.modeling_sac import SACPolicy from lerobot.robots import so100_follower # noqa: F401 from lerobot.scripts.rl import learner_service from lerobot.teleoperators import gamepad, so101_leader # noqa: F401 +from lerobot.teleoperators.utils import TeleopEvents from lerobot.transport import services_pb2_grpc from lerobot.transport.utils import ( MAX_MESSAGE_SIZE, @@ -1174,7 +1175,7 @@ def process_transitions( # Add to offline buffer if it's an intervention if dataset_repo_id is not None and transition.get("complementary_info", {}).get( - "is_intervention" + TeleopEvents.IS_INTERVENTION ): offline_replay_buffer.add(**transition) diff --git a/src/lerobot/teleoperators/__init__.py b/src/lerobot/teleoperators/__init__.py index 56f48af7e..ee508dddb 100644 --- a/src/lerobot/teleoperators/__init__.py +++ b/src/lerobot/teleoperators/__init__.py @@ -16,4 +16,4 @@ from .config import TeleoperatorConfig from .teleoperator import Teleoperator -from .utils import make_teleoperator_from_config +from .utils import TeleopEvents, make_teleoperator_from_config diff --git a/src/lerobot/teleoperators/gamepad/gamepad_utils.py b/src/lerobot/teleoperators/gamepad/gamepad_utils.py index 7ebed6b31..397c2104f 100644 --- a/src/lerobot/teleoperators/gamepad/gamepad_utils.py +++ b/src/lerobot/teleoperators/gamepad/gamepad_utils.py @@ -16,6 +16,8 @@ import logging +from ..utils import 
TeleopEvents + class InputController: """Base class for input controllers that generate motion deltas.""" @@ -134,10 +136,10 @@ class KeyboardController(InputController): return False elif key == keyboard.Key.enter: self.key_states["success"] = True - self.episode_end_status = "success" + self.episode_end_status = TeleopEvents.SUCCESS elif key == keyboard.Key.backspace: self.key_states["failure"] = True - self.episode_end_status = "failure" + self.episode_end_status = TeleopEvents.FAILURE except AttributeError: pass @@ -255,13 +257,13 @@ class GamepadController(InputController): for event in pygame.event.get(): if event.type == pygame.JOYBUTTONDOWN: if event.button == 3: - self.episode_end_status = "success" + self.episode_end_status = TeleopEvents.SUCCESS # A button (1) for failure elif event.button == 1: - self.episode_end_status = "failure" + self.episode_end_status = TeleopEvents.FAILURE # X button (0) for rerecord elif event.button == 0: - self.episode_end_status = "rerecord_episode" + self.episode_end_status = TeleopEvents.RERECORD_EPISODE # RB button (6) for closing gripper elif event.button == 6: @@ -451,11 +453,11 @@ class GamepadControllerHID(InputController): # Check if X/Square button (bit 5) is pressed for failure # Check if A/Cross button (bit 4) is pressed for rerecording if buttons & 1 << 7: - self.episode_end_status = "success" + self.episode_end_status = TeleopEvents.SUCCESS elif buttons & 1 << 5: - self.episode_end_status = "failure" + self.episode_end_status = TeleopEvents.FAILURE elif buttons & 1 << 4: - self.episode_end_status = "rerecord_episode" + self.episode_end_status = TeleopEvents.RERECORD_EPISODE else: self.episode_end_status = None diff --git a/src/lerobot/teleoperators/gamepad/teleop_gamepad.py b/src/lerobot/teleoperators/gamepad/teleop_gamepad.py index 98a0647e2..6902fd6fe 100644 --- a/src/lerobot/teleoperators/gamepad/teleop_gamepad.py +++ b/src/lerobot/teleoperators/gamepad/teleop_gamepad.py @@ -21,6 +21,7 @@ from typing import Any import numpy as np from ..teleoperator import Teleoperator +from ..utils import TeleopEvents from .configuration_gamepad import GamepadTeleopConfig @@ -93,9 +94,9 @@ class GamepadTeleop(Teleoperator): gamepad_action = np.array([delta_x, delta_y, delta_z], dtype=np.float32) action_dict = { - "delta_x": gamepad_action[0], - "delta_y": gamepad_action[1], - "delta_z": gamepad_action[2], + "action.delta_x": gamepad_action[0], + "action.delta_y": gamepad_action[1], + "action.delta_z": gamepad_action[2], } # Default gripper action is to stay @@ -107,6 +108,48 @@ class GamepadTeleop(Teleoperator): return action_dict + def get_teleop_events(self) -> dict[str, Any]: + """ + Get extra control events from the gamepad such as intervention status, + episode termination, success indicators, etc. 
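+
+        The returned dictionary is keyed by `TeleopEvents` enum members rather than
+        raw strings, so downstream consumers can match on shared constants.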
+
+        Returns:
+            Dictionary containing:
+            - is_intervention: bool - Whether human is currently intervening
+            - terminate_episode: bool - Whether to terminate the current episode
+            - success: bool - Whether the episode was successful
+            - rerecord_episode: bool - Whether to rerecord the episode
+        """
+        if self.gamepad is None:
+            return {
+                TeleopEvents.IS_INTERVENTION: False,
+                TeleopEvents.TERMINATE_EPISODE: False,
+                TeleopEvents.SUCCESS: False,
+                TeleopEvents.RERECORD_EPISODE: False,
+            }
+
+        # Update gamepad state to get fresh inputs
+        self.gamepad.update()
+
+        # Check if intervention is active
+        is_intervention = self.gamepad.should_intervene()
+
+        # Get episode end status
+        episode_end_status = self.gamepad.get_episode_end_status()
+        terminate_episode = episode_end_status in [
+            TeleopEvents.RERECORD_EPISODE,
+            TeleopEvents.FAILURE,
+        ]
+        success = episode_end_status == TeleopEvents.SUCCESS
+        rerecord_episode = episode_end_status == TeleopEvents.RERECORD_EPISODE
+
+        return {
+            TeleopEvents.IS_INTERVENTION: is_intervention,
+            TeleopEvents.TERMINATE_EPISODE: terminate_episode,
+            TeleopEvents.SUCCESS: success,
+            TeleopEvents.RERECORD_EPISODE: rerecord_episode,
+        }
+
     def disconnect(self) -> None:
         """Disconnect from the gamepad."""
         if self.gamepad is not None:
diff --git a/src/lerobot/teleoperators/keyboard/teleop_keyboard.py b/src/lerobot/teleoperators/keyboard/teleop_keyboard.py
index d034982f1..a8b003ede 100644
--- a/src/lerobot/teleoperators/keyboard/teleop_keyboard.py
+++ b/src/lerobot/teleoperators/keyboard/teleop_keyboard.py
@@ -24,6 +24,7 @@ from typing import Any
 from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
 
 from ..teleoperator import Teleoperator
+from ..utils import TeleopEvents
 from .configuration_keyboard import KeyboardEndEffectorTeleopConfig, KeyboardTeleopConfig
 
 PYNPUT_AVAILABLE = True
@@ -167,13 +168,13 @@
             return {
                 "dtype": "float32",
                 "shape": (4,),
-                "names": {"delta_x": 0, "delta_y": 1, "delta_z": 2, "gripper": 3},
+                "names": {"action.delta_x": 0, "action.delta_y": 1, "action.delta_z": 2, "action.gripper": 3},
             }
         else:
             return {
                 "dtype": "float32",
                 "shape": (3,),
-                "names": {"delta_x": 0, "delta_y": 1, "delta_z": 2},
+                "names": {"action.delta_x": 0, "action.delta_y": 1, "action.delta_z": 2},
             }
 
     def _on_press(self, key):
@@ -226,12 +227,75 @@
             self.current_pressed.clear()
 
         action_dict = {
-            "delta_x": delta_x,
-            "delta_y": delta_y,
-            "delta_z": delta_z,
+            "action.delta_x": delta_x,
+            "action.delta_y": delta_y,
+            "action.delta_z": delta_z,
         }
 
         if self.config.use_gripper:
             action_dict["gripper"] = gripper_action
 
         return action_dict
+
+    def get_teleop_events(self) -> dict[str, Any]:
+        """
+        Get extra control events from the keyboard such as intervention status,
+        episode termination, success indicators, etc.
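+
+        Unlike the gamepad, which exposes an explicit intervention control, the
+        keyboard infers intervention from whether any movement key is held down.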
+
+        Keyboard mappings:
+        - Any movement keys pressed = intervention active
+        - 's' key = success (terminate episode successfully)
+        - 'r' key = rerecord episode (terminate and rerecord)
+        - 'q' key = quit episode (terminate without success)
+
+        Returns:
+            Dictionary containing:
+            - is_intervention: bool - Whether human is currently intervening
+            - terminate_episode: bool - Whether to terminate the current episode
+            - success: bool - Whether the episode was successful
+            - rerecord_episode: bool - Whether to rerecord the episode
+        """
+        if not self.is_connected:
+            return {
+                TeleopEvents.IS_INTERVENTION: False,
+                TeleopEvents.TERMINATE_EPISODE: False,
+                TeleopEvents.SUCCESS: False,
+                TeleopEvents.RERECORD_EPISODE: False,
+            }
+
+        # Check if any movement keys are currently pressed (indicates intervention)
+        movement_keys = [
+            keyboard.Key.up,
+            keyboard.Key.down,
+            keyboard.Key.left,
+            keyboard.Key.right,
+            keyboard.Key.shift,
+            keyboard.Key.shift_r,
+            keyboard.Key.ctrl_r,
+            keyboard.Key.ctrl_l,
+        ]
+        is_intervention = any(self.current_pressed.get(key, False) for key in movement_keys)
+
+        # Check for episode control commands from misc_keys_queue
+        terminate_episode = False
+        success = False
+        rerecord_episode = False
+
+        # Process any pending misc keys
+        while not self.misc_keys_queue.empty():
+            key = self.misc_keys_queue.get_nowait()
+            if key == "s":
+                success = True
+            elif key == "r":
+                terminate_episode = True
+                rerecord_episode = True
+            elif key == "q":
+                terminate_episode = True
+                success = False
+
+        return {
+            TeleopEvents.IS_INTERVENTION: is_intervention,
+            TeleopEvents.TERMINATE_EPISODE: terminate_episode,
+            TeleopEvents.SUCCESS: success,
+            TeleopEvents.RERECORD_EPISODE: rerecord_episode,
+        }
diff --git a/src/lerobot/teleoperators/utils.py b/src/lerobot/teleoperators/utils.py
index 344a95d72..bdfd41f02 100644
--- a/src/lerobot/teleoperators/utils.py
+++ b/src/lerobot/teleoperators/utils.py
@@ -12,10 +12,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from enum import Enum
+
 from .config import TeleoperatorConfig
 from .teleoperator import Teleoperator
 
 
+class TeleopEvents(Enum):
+    """Shared constants for teleoperator events across teleoperators."""
+
+    SUCCESS = "success"
+    FAILURE = "failure"
+    RERECORD_EPISODE = "rerecord_episode"
+    IS_INTERVENTION = "is_intervention"
+    TERMINATE_EPISODE = "terminate_episode"
+
+
 def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator:
     if config.type == "keyboard":
         from .keyboard import KeyboardTeleop
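
Taken together, the `TeleopEvents` enum and the per-device `get_teleop_events()` methods give every teleoperator a uniform event interface. A minimal sketch of how a consumer might poll it (assuming a connected teleoperator built with `make_teleoperator_from_config`; the helper name `should_stop_episode` is illustrative, not part of this diff):

```python
from lerobot.teleoperators import TeleopEvents


def should_stop_episode(teleop) -> bool:
    """Poll teleop events once and decide whether the episode should end."""
    events = teleop.get_teleop_events()
    if events[TeleopEvents.IS_INTERVENTION]:
        # A human is currently overriding the policy's actions.
        pass
    # Success or an explicit terminate request both end the episode.
    return events[TeleopEvents.SUCCESS] or events[TeleopEvents.TERMINATE_EPISODE]
```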