diff --git a/utils/dataset_merging/README.md b/utils/dataset_merging/README.md index 5316546..29f96c3 100644 --- a/utils/dataset_merging/README.md +++ b/utils/dataset_merging/README.md @@ -33,13 +33,14 @@ python dataset_merger.py --sources /path/to/dataset1 /path/to/dataset2 /path/to/ ### (二)命令行参数 - **--sources**:源数据集文件夹路径列表,至少需要指定一个源数据集路径。 - **--output**:输出数据集文件夹路径,用于指定合并后数据集的存储位置。 -- **--max_dim**:向量的最大维度,默认值为32。 +- **--state_max_dim**:状态向量的最大维度,默认值为32。 +- **--action_max_dim**:动作向量的最大维度,默认值为32。 - **--fps**:数据集的帧率,默认值为20。 - **--copy_images**: 是否将图像从源文件夹复制且合并到输出文件夹。(default: `False`) ### (三)示例 ```bash -python dataset_merger.py --sources ./robot_dataset_1 ./robot_dataset_2 --output ./merged_dataset --max_dim 18 --fps 30 +python dataset_merger.py --sources ./robot_dataset_1 ./robot_dataset_2 --output ./merged_dataset --state_max_dim 32 --action_max_dim 18 --fps 30 ``` ## 五、数据集格式 @@ -85,4 +86,4 @@ dataset/ 2. **Q: 如何处理不同FPS的数据集?** **A**: 暂时只支持相同FPS的数据集合并。 3. **Q: 能否只合并某些特定episode?** -**A**: 当前版本会合并所有数据。如需更精细的控制,您可以先筛选数据集,然后再进行合并。 +**A**: 当前版本会合并所有数据。如需更精细的控制,您可以先筛选数据集,然后再进行合并。也可以全部合并,然后使用lerobot加载特定的episode。 diff --git a/utils/dataset_merging/merge_lerobot_dataset.py b/utils/dataset_merging/merge_lerobot_dataset.py index f543167..4bcb137 100644 --- a/utils/dataset_merging/merge_lerobot_dataset.py +++ b/utils/dataset_merging/merge_lerobot_dataset.py @@ -532,7 +532,8 @@ def copy_data_files( source_folders, output_folder, episode_mapping, - max_dim=18, + state_max_dim=32, # 默认状态向量维度为32 + action_max_dim=32, # 默认动作向量维度为32 fps=None, episode_to_frame_index=None, folder_task_mapping=None, @@ -540,25 +541,22 @@ def copy_data_files( default_fps=20, ): """ - 复制并处理parquet数据文件,包括维度填充和索引更新 - (Copy and process parquet data files, including dimension padding and index updates) + 从源文件夹复制数据文件到输出文件夹,同时处理索引映射和维度填充 + (Copy data files from source folders to output folder, handling index mapping and dimension padding) Args: source_folders (list): 源数据集文件夹路径列表 (List of source dataset folder paths) output_folder (str): 输出文件夹路径 (Output folder path) episode_mapping (list): 包含(旧文件夹,旧索引,新索引)元组的列表 (List of tuples containing (old_folder, old_index, new_index)) - max_dim (int): 向量的最大维度 (Maximum dimension for vectors) - fps (float, optional): 帧率,如果未提供则从第一个数据集获取 (Frame rate, if not provided will be obtained from the first dataset) - episode_to_frame_index (dict, optional): 每个新episode索引对应的起始帧索引映射 - (Mapping of each new episode index to its starting frame index) - folder_task_mapping (dict, optional): 每个文件夹中task_index的映射关系 - (Mapping of task_index for each folder) - chunks_size (int): 每个chunk包含的episode数量 (Number of episodes per chunk) - default_fps (float): 默认帧率,当无法从数据集获取时使用 (Default frame rate when unable to obtain from dataset) - - Returns: - bool: 是否成功复制了至少一个文件 (Whether at least one file was successfully copied) + state_max_dim (int): 状态向量的最大维度 (Maximum dimension for state vectors) + action_max_dim (int): 动作向量的最大维度 (Maximum dimension for action vectors) + fps (float): 帧率 (frames per second) + episode_to_frame_index (dict): 每个episode对应的起始帧索引 + (Start frame index for each episode) + folder_task_mapping (dict): 文件夹任务映射 (Folder task mapping) + chunks_size (int): 数据块大小 (Chunk size) + default_fps (float): 默认帧率 (Default frame rate) """ # 获取第一个数据集的FPS(如果未提供)(Get FPS from first dataset if not provided) if fps is None: @@ -572,7 +570,7 @@ def copy_data_files( else: fps = default_fps # 使用变量替代硬编码的20 (Use variable instead of hardcoded 20) - print(f"使用FPS={fps} (Using FPS={fps})") + print(f"使用FPS={fps}") # 为每个episode复制和处理数据文件 (Copy and process data files for each episode) total_copied = 0 @@ -601,26 +599,48 @@ def copy_data_files( # 读取parquet文件 (Read parquet file) df = pd.read_parquet(source_path) - # 检查是否需要填充维度 (Check if dimensions need padding) - for feature in ["observation.state", "action"]: - if feature in df.columns: - # 检查第一个非空值 (Check first non-null value) - for _idx, value in enumerate(df[feature]): - if value is not None and isinstance(value, (list, np.ndarray)): - current_dim = len(value) - if current_dim < max_dim: - print( - f"填充 {feature} 从 {current_dim} 维到 {max_dim} 维 (Padding {feature} from {current_dim} to {max_dim} dimensions)" - ) - # 使用零填充到目标维度 (Pad with zeros to target dimension) - df[feature] = df[feature].apply( - lambda x: np.pad(x, (0, max_dim - len(x)), "constant").tolist() - if x is not None - and isinstance(x, (list, np.ndarray)) - and len(x) < max_dim - else x - ) - break + # 检查是否需要填充维度 - 为不同特征类型使用不同的最大维度 + # 为状态向量填充 + if "observation.state" in df.columns: + # 检查第一个非空值 (Check first non-null value) + for _idx, value in enumerate(df["observation.state"]): + if value is not None and isinstance(value, (list, np.ndarray)): + current_dim = len(value) + if current_dim < state_max_dim: + print( + f"填充状态向量从 {current_dim} 维到 {state_max_dim} 维" + f" (Padding state vector from {current_dim} to {state_max_dim} dimensions)" + ) + # 使用零填充到目标维度 (Pad with zeros to target dimension) + df["observation.state"] = df["observation.state"].apply( + lambda x: np.pad(x, (0, state_max_dim - len(x)), "constant").tolist() + if x is not None + and isinstance(x, (list, np.ndarray)) + and len(x) < state_max_dim + else x + ) + break + + # 为动作向量填充 + if "action" in df.columns: + # 检查第一个非空值 (Check first non-null value) + for _idx, value in enumerate(df["action"]): + if value is not None and isinstance(value, (list, np.ndarray)): + current_dim = len(value) + if current_dim < action_max_dim: + print( + f"填充动作向量从 {current_dim} 维到 {action_max_dim} 维" + f" (Padding action vector from {current_dim} to {action_max_dim} dimensions)" + ) + # 使用零填充到目标维度 (Pad with zeros to target dimension) + df["action"] = df["action"].apply( + lambda x: np.pad(x, (0, action_max_dim - len(x)), "constant").tolist() + if x is not None + and isinstance(x, (list, np.ndarray)) + and len(x) < action_max_dim + else x + ) + break # 更新episode_index列 (Update episode_index column) if "episode_index" in df.columns: @@ -699,28 +719,48 @@ def copy_data_files( # 读取parquet文件 (Read parquet file) df = pd.read_parquet(source_path) - # 检查是否需要填充维度 (Check if dimensions need padding) - for feature in ["observation.state", "action"]: - if feature in df.columns: - # 检查第一个非空值 (Check first non-null value) - for _idx, value in enumerate(df[feature]): - if value is not None and isinstance(value, (list, np.ndarray)): - current_dim = len(value) - if current_dim < max_dim: - print( - f"填充 {feature} 从 {current_dim} 维到 {max_dim} 维 (Padding {feature} from {current_dim} to {max_dim} dimensions)" - ) - # 使用零填充到目标维度 (Pad with zeros to target dimension) - df[feature] = df[feature].apply( - lambda x: np.pad( - x, (0, max_dim - len(x)), "constant" - ).tolist() - if x is not None - and isinstance(x, (list, np.ndarray)) - and len(x) < max_dim - else x - ) - break + # 检查是否需要填充维度 - 为不同特征类型使用不同的最大维度 + # 为状态向量填充 + if "observation.state" in df.columns: + # 检查第一个非空值 (Check first non-null value) + for _idx, value in enumerate(df["observation.state"]): + if value is not None and isinstance(value, (list, np.ndarray)): + current_dim = len(value) + if current_dim < state_max_dim: + print( + f"填充状态向量从 {current_dim} 维到 {state_max_dim} 维" + f" (Padding state vector from {current_dim} to {state_max_dim} dimensions)" + ) + # 使用零填充到目标维度 (Pad with zeros to target dimension) + df["observation.state"] = df["observation.state"].apply( + lambda x: np.pad(x, (0, state_max_dim - len(x)), "constant").tolist() + if x is not None + and isinstance(x, (list, np.ndarray)) + and len(x) < state_max_dim + else x + ) + break + + # 为动作向量填充 + if "action" in df.columns: + # 检查第一个非空值 (Check first non-null value) + for _idx, value in enumerate(df["action"]): + if value is not None and isinstance(value, (list, np.ndarray)): + current_dim = len(value) + if current_dim < action_max_dim: + print( + f"填充动作向量从 {current_dim} 维到 {action_max_dim} 维" + f" (Padding action vector from {current_dim} to {action_max_dim} dimensions)" + ) + # 使用零填充到目标维度 (Pad with zeros to target dimension) + df["action"] = df["action"].apply( + lambda x: np.pad(x, (0, action_max_dim - len(x)), "constant").tolist() + if x is not None + and isinstance(x, (list, np.ndarray)) + and len(x) < action_max_dim + else x + ) + break # 更新episode_index列 (Update episode_index column) if "episode_index" in df.columns: @@ -1205,7 +1245,8 @@ def copy_images(source_folders, output_folder, episode_mapping, default_fps=20, def merge_datasets( - source_folders, output_folder, validate_ts=False, tolerance_s=1e-4, max_dim=18, default_fps=20 + source_folders, output_folder, validate_ts=False, tolerance_s=1e-4, + state_max_dim=32, action_max_dim=32, default_fps=20 ): """ 将多个数据集文件夹合并为一个,处理索引、维度和元数据 @@ -1216,38 +1257,14 @@ def merge_datasets( output_folder (str): 输出文件夹路径 (Output folder path) validate_ts (bool): 是否验证时间戳 (Whether to validate timestamps) tolerance_s (float): 时间戳不连续性的容差值,以秒为单位 (Tolerance for timestamp discontinuities in seconds) - max_dim (int): 向量的最大维度 (Maximum dimension for vectors) + state_max_dim (int): 状态向量的最大维度 (Maximum dimension for state vectors) + action_max_dim (int): 动作向量的最大维度 (Maximum dimension for action vectors) default_fps (float): 默认帧率 (Default frame rate) - - 这个函数执行以下操作: - (This function performs the following operations:) - 1. 合并所有的episodes、tasks和stats (Merges all episodes, tasks and stats) - 2. 重新编号所有的索引以保持连续性 (Renumbers all indices to maintain continuity) - 3. 填充向量维度使其一致 (Pads vector dimensions for consistency) - 4. 更新元数据文件 (Updates metadata files) - 5. 复制并处理数据和视频文件 (Copies and processes data and video files) - 6. 复制并验证图像文件 (Copies and validates image files) """ # Create output folder if it doesn't exist os.makedirs(output_folder, exist_ok=True) os.makedirs(os.path.join(output_folder, "meta"), exist_ok=True) - # 注释掉时间戳验证 - # if validate_ts: - # issues, fps_values = validate_timestamps(source_folders, tolerance_s) - # if issues: - # print("时间戳验证发现以下问题:") - # for issue in issues: - # print(f" - {issue}") - # - # # 获取共同的FPS值 - # if fps_values: - # fps = max(set(fps_values), key=fps_values.count) - # print(f"使用共同FPS值: {fps}") - # else: - # fps = default_fps - # print(f"未找到FPS值,使用默认值: {default_fps}") - # else: fps = default_fps print(f"使用默认FPS值: {fps}") @@ -1266,7 +1283,8 @@ def merge_datasets( all_stats_data = [] # Track dimensions for each folder - folder_dimensions = {} + folder_state_dimensions = {} # 存储每个文件夹的状态向量维度 + folder_action_dimensions = {} # 存储每个文件夹的动作向量维度 # 添加一个变量来跟踪累积的帧数 cumulative_frame_count = 0 @@ -1310,8 +1328,9 @@ def merge_datasets( f"从{folder}的info.json中读取到视频数量: {folder_videos} (Read video count from {folder}'s info.json: {folder_videos})" ) - # Check dimensions of state vectors in this folder - folder_dim = max_dim # 使用变量替代硬编码的18 + # 分别检查状态和动作向量的维度 + folder_state_dim = state_max_dim # 默认使用传入的状态最大维度 + folder_action_dim = action_max_dim # 默认使用传入的动作最大维度 # Try to find a parquet file to determine dimensions for root, _dirs, files in os.walk(folder): @@ -1319,19 +1338,34 @@ def merge_datasets( if file.endswith(".parquet"): try: df = pd.read_parquet(os.path.join(root, file)) + # 检查状态向量维度 if "observation.state" in df.columns: - first_state = df["observation.state"].iloc[0] - if isinstance(first_state, (list, np.ndarray)): - folder_dim = len(first_state) - print(f"Detected {folder_dim} dimensions in {folder}") - break + for state_val in df["observation.state"]: + if state_val is not None and isinstance(state_val, (list, np.ndarray)): + folder_state_dim = len(state_val) + print(f"Detected {folder_state_dim} dimensions for state in {folder}") + break + + # 检查动作向量维度 + if "action" in df.columns: + for action_val in df["action"]: + if action_val is not None and isinstance(action_val, (list, np.ndarray)): + folder_action_dim = len(action_val) + print(f"Detected {folder_action_dim} dimensions for action in {folder}") + break + + # 如果两个维度都已检测到,可以停止搜索 + if folder_state_dim != state_max_dim and folder_action_dim != action_max_dim: + break except Exception as e: print(f"Error checking dimensions in {folder}: {e}") break - if folder_dim != max_dim: # 使用变量替代硬编码的18 + # 如果两个维度都已检测到,可以停止搜索 + if folder_state_dim != state_max_dim and folder_action_dim != action_max_dim: break - folder_dimensions[folder] = folder_dim + folder_state_dimensions[folder] = folder_state_dim + folder_action_dimensions[folder] = folder_action_dim # Load episodes episodes_path = os.path.join(folder, "meta", "episodes.jsonl") @@ -1391,25 +1425,25 @@ def merge_datasets( stats = stats_map[old_index] stats["episode_index"] = new_index - # Pad stats data if needed - if "stats" in stats and folder_dimensions[folder] < max_dim: # 使用变量替代硬编码的18 - # Pad observation.state and action stats - for feature in ["observation.state", "action"]: - if feature in stats["stats"]: - for stat_type in ["mean", "std", "max", "min"]: - if stat_type in stats["stats"][feature]: - # Get current values - values = stats["stats"][feature][stat_type] - - # Check if it's a list/array that needs padding - if ( - isinstance(values, list) and len(values) < max_dim - ): # 使用变量替代硬编码的18 - # Pad with zeros - padded = values + [0.0] * ( - max_dim - len(values) - ) # 使用变量替代硬编码的18 - stats["stats"][feature][stat_type] = padded + # 填充统计数据 + if "stats" in stats: + # 分别填充observation.state的统计数据 + if "observation.state" in stats["stats"] and folder_state_dimensions[folder] < state_max_dim: + for stat_type in ["mean", "std", "max", "min"]: + if stat_type in stats["stats"]["observation.state"]: + values = stats["stats"]["observation.state"][stat_type] + if isinstance(values, list) and len(values) < state_max_dim: + padded = values + [0.0] * (state_max_dim - len(values)) + stats["stats"]["observation.state"][stat_type] = padded + + # 分别处理action的统计数据 + if "action" in stats["stats"] and folder_action_dimensions[folder] < action_max_dim: + for stat_type in ["mean", "std", "max", "min"]: + if stat_type in stats["stats"]["action"]: + values = stats["stats"]["action"][stat_type] + if isinstance(values, list) and len(values) < action_max_dim: + padded = values + [0.0] * (action_max_dim - len(values)) + stats["stats"]["action"][stat_type] = padded all_episodes_stats.append(stats) @@ -1577,16 +1611,25 @@ def merge_datasets( # Update feature dimensions to the maximum dimension if "features" in info: - # Find the maximum dimension across all folders - actual_max_dim = max_dim # 使用变量替代硬编码的18 - for _folder, dim in folder_dimensions.items(): - actual_max_dim = max(actual_max_dim, dim) - - # Update observation.state and action dimensions - for feature_name in ["observation.state", "action"]: - if feature_name in info["features"] and "shape" in info["features"][feature_name]: - info["features"][feature_name]["shape"] = [actual_max_dim] - print(f"Updated {feature_name} shape to {actual_max_dim}") + # 使用检测到的最大状态和动作维度 + actual_state_max_dim = state_max_dim + actual_action_max_dim = action_max_dim + + for _folder, dim in folder_state_dimensions.items(): + actual_state_max_dim = max(actual_state_max_dim, dim) + + for _folder, dim in folder_action_dimensions.items(): + actual_action_max_dim = max(actual_action_max_dim, dim) + + # 更新状态向量维度 + if "observation.state" in info["features"] and "shape" in info["features"]["observation.state"]: + info["features"]["observation.state"]["shape"] = [actual_state_max_dim] + print(f"Updated observation.state shape to {actual_state_max_dim}") + + # 更新动作向量维度 + if "action" in info["features"] and "shape" in info["features"]["action"]: + info["features"]["action"]["shape"] = [actual_action_max_dim] + print(f"Updated action shape to {actual_action_max_dim}") # 更新视频总数 (Update total videos) info["total_videos"] = total_videos @@ -1608,7 +1651,8 @@ def merge_datasets( source_folders, output_folder, episode_mapping, - max_dim=max_dim, + state_max_dim=state_max_dim, + action_max_dim=action_max_dim, fps=fps, episode_to_frame_index=episode_to_frame_index, folder_task_mapping=folder_task_mapping, @@ -1631,12 +1675,19 @@ if __name__ == "__main__": # Add arguments parser.add_argument("--sources", nargs="+", required=True, help="List of source folder paths") parser.add_argument("--output", required=True, help="Output folder path") - parser.add_argument("--max_dim", type=int, default=32, help="Maximum dimension (default: 32)") + parser.add_argument("--state_max_dim", type=int, default=32, help="Maximum state vector dimension (default: 32)") + parser.add_argument("--action_max_dim", type=int, default=32, help="Maximum action vector dimension (default: 32)") parser.add_argument("--fps", type=int, default=20, help="Your datasets FPS (default: 20)") - parser.add_argument("--copy_images", action="store_true", help="Whether copy images from source folders to output folder with validation. (default: False)",) + parser.add_argument("--copy_images", action="store_true", help="Whether to copy images (default: False)") # Parse arguments args = parser.parse_args() # Use parsed arguments - merge_datasets(args.sources, args.output, max_dim=args.max_dim, default_fps=args.fps) + merge_datasets( + args.sources, + args.output, + state_max_dim=args.state_max_dim, + action_max_dim=args.action_max_dim, + default_fps=args.fps + )