diff --git a/examples/phone_to_so100/evaluate.py b/examples/phone_to_so100/evaluate.py index a9a872f52..30f098101 100644 --- a/examples/phone_to_so100/evaluate.py +++ b/examples/phone_to_so100/evaluate.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig from lerobot.configs.types import FeatureType, PolicyFeature @@ -25,14 +24,15 @@ from lerobot.model.kinematics import RobotKinematics from lerobot.policies.act.modeling_act import ACTPolicy from lerobot.policies.factory import make_pre_post_processors from lerobot.processor import ( - EnvTransition, RobotAction, + RobotObservation, RobotProcessorPipeline, make_default_teleop_action_processor, ) from lerobot.processor.converters import ( - identity_transition, observation_to_transition, + robot_action_to_transition, + transition_to_observation, transition_to_robot_action, ) from lerobot.record import record_loop @@ -74,7 +74,7 @@ kinematics_solver = RobotKinematics( ) # Build pipeline to convert ee pose action to joint action -robot_ee_to_joints_processor = RobotProcessorPipeline[EnvTransition, RobotAction]( +robot_ee_to_joints_processor = RobotProcessorPipeline[RobotAction, RobotAction]( steps=[ AddRobotObservationAsComplimentaryData(robot=robot), InverseKinematicsEEToJoints( @@ -83,17 +83,17 @@ robot_ee_to_joints_processor = RobotProcessorPipeline[EnvTransition, RobotAction initial_guess_current_joints=True, ), ], - to_transition=identity_transition, + to_transition=robot_action_to_transition, to_output=transition_to_robot_action, ) # Build pipeline to convert joint observation to ee pose observation -robot_joints_to_ee_pose_processor = RobotProcessorPipeline[dict[str, Any], EnvTransition]( +robot_joints_to_ee_pose_processor = RobotProcessorPipeline[RobotObservation, RobotObservation]( steps=[ ForwardKinematicsJointsToEE(kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())) ], to_transition=observation_to_transition, - to_output=identity_transition, + to_output=transition_to_observation, ) diff --git a/examples/phone_to_so100/record.py b/examples/phone_to_so100/record.py index 1e0e27b34..08f190e70 100644 --- a/examples/phone_to_so100/record.py +++ b/examples/phone_to_so100/record.py @@ -14,18 +14,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features from lerobot.datasets.utils import combine_feature_dicts from lerobot.model.kinematics import RobotKinematics -from lerobot.processor import EnvTransition, RobotAction, RobotProcessorPipeline +from lerobot.processor import RobotAction, RobotObservation, RobotProcessorPipeline from lerobot.processor.converters import ( - identity_transition, observation_to_transition, robot_action_to_transition, + transition_to_observation, transition_to_robot_action, ) from lerobot.record import record_loop @@ -75,7 +74,7 @@ kinematics_solver = RobotKinematics( ) # Build pipeline to convert phone action to ee pose action -phone_to_robot_ee_pose_processor = RobotProcessorPipeline[RobotAction, EnvTransition]( +phone_to_robot_ee_pose_processor = RobotProcessorPipeline[RobotAction, RobotAction]( steps=[ MapPhoneActionToRobotAction(platform=teleop_config.phone_os), AddRobotObservationAsComplimentaryData(robot=robot), @@ -92,11 +91,11 @@ phone_to_robot_ee_pose_processor = RobotProcessorPipeline[RobotAction, EnvTransi GripperVelocityToJoint(), ], to_transition=robot_action_to_transition, - to_output=identity_transition, + to_output=transition_to_robot_action, ) # Build pipeline to convert ee pose action to joint action -robot_ee_to_joints_processor = RobotProcessorPipeline[EnvTransition, RobotAction]( +robot_ee_to_joints_processor = RobotProcessorPipeline[RobotAction, RobotAction]( steps=[ InverseKinematicsEEToJoints( kinematics=kinematics_solver, @@ -104,17 +103,17 @@ robot_ee_to_joints_processor = RobotProcessorPipeline[EnvTransition, RobotAction initial_guess_current_joints=True, ), ], - to_transition=identity_transition, + to_transition=robot_action_to_transition, to_output=transition_to_robot_action, ) # Build pipeline to convert joint observation to ee pose observation -robot_joints_to_ee_pose = RobotProcessorPipeline[dict[str, Any], EnvTransition]( +robot_joints_to_ee_pose = RobotProcessorPipeline[RobotObservation, RobotObservation]( steps=[ ForwardKinematicsJointsToEE(kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())) ], to_transition=observation_to_transition, - to_output=identity_transition, + to_output=transition_to_observation, ) # Create the dataset diff --git a/src/lerobot/processor/__init__.py b/src/lerobot/processor/__init__.py index 7e6b609b9..c14120538 100644 --- a/src/lerobot/processor/__init__.py +++ b/src/lerobot/processor/__init__.py @@ -20,7 +20,14 @@ from .converters import ( create_transition, transition_to_batch, ) -from .core import EnvAction, EnvTransition, PolicyAction, RobotAction, TransitionKey +from .core import ( + EnvAction, + EnvTransition, + PolicyAction, + RobotAction, + RobotObservation, + TransitionKey, +) from .delta_action_processor import MapDeltaActionToRobotActionStep, MapTensorToDeltaActionDictStep from .device_processor import DeviceProcessorStep from .factory import ( @@ -99,6 +106,7 @@ __all__ = [ "ProcessorStepRegistry", "RobotAction", "RobotActionProcessorStep", + "RobotObservation", "RenameObservationsProcessorStep", "RewardClassifierProcessorStep", "RewardProcessorStep", diff --git a/src/lerobot/processor/converters.py b/src/lerobot/processor/converters.py index cc8ec1edb..89256fe94 100644 --- a/src/lerobot/processor/converters.py +++ b/src/lerobot/processor/converters.py @@ -25,7 +25,7 @@ import torch from lerobot.constants import OBS_IMAGES -from .core import EnvTransition, PolicyAction, RobotAction, TransitionKey +from .core import EnvTransition, PolicyAction, RobotAction, RobotObservation, TransitionKey @singledispatch @@ -265,7 +265,7 @@ def robot_action_to_transition(action: RobotAction) -> EnvTransition: return create_transition(observation={}, action=action) -def observation_to_transition(observation: dict[str, Any]) -> EnvTransition: +def observation_to_transition(observation: RobotObservation) -> EnvTransition: """ Convert a raw robot observation dictionary into a standardized `EnvTransition`. @@ -299,6 +299,9 @@ def transition_to_robot_action(transition: EnvTransition) -> RobotAction: Returns: A dictionary representing the raw robot action. """ + action = transition.get(TransitionKey.ACTION) + if not isinstance(action, dict): + raise ValueError(f"Action should be a RobotAction type (dict) got {type(action)}") return transition.get(TransitionKey.ACTION) @@ -312,6 +315,16 @@ def transition_to_policy_action(transition: EnvTransition) -> PolicyAction: return action +def transition_to_observation(transition: EnvTransition) -> RobotObservation: + """ + Convert an `EnvTransition` to a `RobotObservation`. + """ + observation = transition.get(TransitionKey.OBSERVATION) + if not isinstance(observation, dict): + raise ValueError(f"Observation should be a RobotObservation (dict) type got {type(observation)}") + return observation + + def policy_action_to_transition(action: PolicyAction) -> EnvTransition: """ Convert a `PolicyAction` to an `EnvTransition`. diff --git a/src/lerobot/processor/core.py b/src/lerobot/processor/core.py index 9a16cbaff..679ba8c54 100644 --- a/src/lerobot/processor/core.py +++ b/src/lerobot/processor/core.py @@ -39,6 +39,7 @@ class TransitionKey(str, Enum): PolicyAction: TypeAlias = torch.Tensor RobotAction: TypeAlias = dict[str, Any] EnvAction: TypeAlias = np.ndarray +RobotObservation: TypeAlias = dict[str, Any] EnvTransition = TypedDict( diff --git a/src/lerobot/processor/factory.py b/src/lerobot/processor/factory.py index 9ed705755..4d611dccb 100644 --- a/src/lerobot/processor/factory.py +++ b/src/lerobot/processor/factory.py @@ -13,41 +13,40 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any from .converters import ( - identity_transition, observation_to_transition, robot_action_to_transition, + transition_to_observation, transition_to_robot_action, ) -from .core import EnvTransition, RobotAction +from .core import RobotAction, RobotObservation from .pipeline import IdentityProcessorStep, RobotProcessorPipeline -def make_default_teleop_action_processor() -> RobotProcessorPipeline[RobotAction, EnvTransition]: - teleop_action_processor = RobotProcessorPipeline[RobotAction, EnvTransition]( +def make_default_teleop_action_processor() -> RobotProcessorPipeline[RobotAction, RobotAction]: + teleop_action_processor = RobotProcessorPipeline[RobotAction, RobotAction]( steps=[IdentityProcessorStep()], to_transition=robot_action_to_transition, - to_output=identity_transition, + to_output=transition_to_robot_action, ) return teleop_action_processor -def make_default_robot_action_processor() -> RobotProcessorPipeline[EnvTransition, RobotAction]: - robot_action_processor = RobotProcessorPipeline[EnvTransition, RobotAction]( +def make_default_robot_action_processor() -> RobotProcessorPipeline[RobotAction, RobotAction]: + robot_action_processor = RobotProcessorPipeline[RobotAction, RobotAction]( steps=[IdentityProcessorStep()], - to_transition=identity_transition, + to_transition=robot_action_to_transition, to_output=transition_to_robot_action, ) return robot_action_processor -def make_default_robot_observation_processor() -> RobotProcessorPipeline[dict[str, Any], EnvTransition]: - robot_observation_processor = RobotProcessorPipeline[dict[str, Any], EnvTransition]( +def make_default_robot_observation_processor() -> RobotProcessorPipeline[RobotObservation, RobotObservation]: + robot_observation_processor = RobotProcessorPipeline[RobotObservation, RobotObservation]( steps=[IdentityProcessorStep()], to_transition=observation_to_transition, - to_output=identity_transition, + to_output=transition_to_observation, ) return robot_observation_processor diff --git a/src/lerobot/record.py b/src/lerobot/record.py index b186567ae..a7cbc3dfb 100644 --- a/src/lerobot/record.py +++ b/src/lerobot/record.py @@ -79,12 +79,11 @@ from lerobot.datasets.video_utils import VideoEncodingManager from lerobot.policies.factory import make_policy, make_pre_post_processors from lerobot.policies.pretrained import PreTrainedPolicy from lerobot.processor import ( - EnvTransition, PolicyAction, PolicyProcessorPipeline, RobotAction, + RobotObservation, RobotProcessorPipeline, - TransitionKey, make_default_processors, ) from lerobot.processor.rename_processor import rename_stats @@ -236,9 +235,11 @@ def record_loop( robot: Robot, events: dict, fps: int, - teleop_action_processor: RobotProcessorPipeline[RobotAction, EnvTransition], # runs after teleop - robot_action_processor: RobotProcessorPipeline[EnvTransition, RobotAction], # runs before robot - robot_observation_processor: RobotProcessorPipeline[dict[str, Any], EnvTransition], # runs after robot + teleop_action_processor: RobotProcessorPipeline[RobotAction, RobotAction], # runs after teleop + robot_action_processor: RobotProcessorPipeline[RobotAction, RobotAction], # runs before robot + robot_observation_processor: RobotProcessorPipeline[ + RobotObservation, RobotObservation + ], # runs after robot dataset: LeRobotDataset | None = None, teleop: Teleoperator | list[Teleoperator] | None = None, policy: PreTrainedPolicy | None = None, @@ -281,10 +282,6 @@ def record_loop( preprocessor.reset() postprocessor.reset() - policy_transition = None - teleop_transition = None - obs_transition = None - timestamp = 0 start_episode_t = time.perf_counter() while timestamp < control_time_s: @@ -298,12 +295,10 @@ def record_loop( obs = robot.get_observation() # Applies a pipeline to the raw robot observation, default is IdentityProcessor - obs_transition = robot_observation_processor(obs) + obs_processed = robot_observation_processor(obs) if policy is not None or dataset is not None: - observation_frame = build_dataset_frame( - dataset.features, obs_transition[TransitionKey.OBSERVATION], prefix="observation" - ) + observation_frame = build_dataset_frame(dataset.features, obs_processed, prefix="observation") # Get action from either policy or teleop if policy is not None and preprocessor is not None and postprocessor is not None: @@ -319,10 +314,8 @@ def record_loop( ) action_names = dataset.features["action"]["names"] - policy_action = {f"{name}": float(action_values[i]) for i, name in enumerate(action_names)} - policy_transition = { - TransitionKey.ACTION: policy_action, - TransitionKey.COMPLEMENTARY_DATA: {}, + act_processed_policy: RobotAction = { + f"{name}": float(action_values[i]) for i, name in enumerate(action_names) } elif isinstance(teleop, Teleoperator): @@ -330,7 +323,7 @@ def record_loop( # Applies a pipeline to the raw teleop action, default is IdentityProcessor # TODO(Steven): This assumes that the processor passed by the user should have identity_transition as to_output. - teleop_transition = teleop_action_processor(act) + act_processed_teleop = teleop_action_processor(act) elif isinstance(teleop, list): arm_action = teleop_arm.get_action() @@ -338,7 +331,7 @@ def record_loop( keyboard_action = teleop_keyboard.get_action() base_action = robot._from_keyboard_to_base_action(keyboard_action) act = {**arm_action, **base_action} if len(base_action) > 0 else arm_action - teleop_transition = teleop_action_processor(act) + act_processed_teleop = teleop_action_processor(act) else: logging.info( "No policy or teleoperator provided, skipping action generation. " @@ -349,12 +342,12 @@ def record_loop( # Applies a pipeline to the action, default is IdentityProcessor # IMPORTANT: action_pipeline.to_output must return a dict suitable for robot.send_action() - if policy is not None and policy_transition is not None: - action_values = policy_transition[TransitionKey.ACTION] - robot_action_to_send = robot_action_processor(policy_transition) + if policy is not None and act_processed_policy is not None: + action_values = act_processed_policy + robot_action_to_send = robot_action_processor(act_processed_policy) else: - action_values = teleop_transition[TransitionKey.ACTION] - robot_action_to_send = robot_action_processor(teleop_transition) + action_values = act_processed_teleop + robot_action_to_send = robot_action_processor(act_processed_teleop) # Send action to robot # Action can eventually be clipped using `max_relative_target`, @@ -369,7 +362,7 @@ def record_loop( dataset.add_frame(frame) if display_data: - log_rerun_data(observation=obs_transition[TransitionKey.OBSERVATION], action=action_values) + log_rerun_data(observation=obs_processed, action=action_values) dt_s = time.perf_counter() - start_loop_t busy_wait(1 / fps - dt_s)