Python Scripts Collection¶
Ready-to-run Python scripts demonstrating the main Dataphy SDK workflows: dataset exploration, episode sampling, augmentation, visualization, and performance profiling.
Basic Dataset Operations¶
Dataset Explorer Script¶
#!/usr/bin/env python3
"""
Dataset Explorer - Comprehensive dataset analysis and visualization.
Usage:
python dataset_explorer.py --dataset-path ./my_dataset [--format lerobot]
"""
import argparse
from pathlib import Path
from typing import Optional
from dataphy.dataset.registry import create_dataset_loader, DatasetFormat, list_supported_formats
from dataphy.dataset.episode_augmentor import EpisodeAugmentor
def explore_dataset(dataset_path: str, format_type: Optional[str] = None):
"""Explore dataset structure, episodes, and available data."""
print(f"Exploring dataset: {dataset_path}")
print("-" * 50)
# Create loader
if format_type:
format_enum = DatasetFormat(format_type)
loader = create_dataset_loader(dataset_path, format_type=format_enum)
else:
loader = create_dataset_loader(dataset_path)
# Get dataset info
try:
info = loader.get_dataset_info()
print(f"Dataset Information:")
print(f" Format: {info.format}")
print(f" Episodes: {info.num_episodes}")
print(f" Total Steps: {info.total_steps}")
print()
except Exception as e:
print(f"Could not load dataset info: {e}")
# List episodes
episodes = loader.get_episode_ids()
print(f"Episodes ({len(episodes)} total):")
for i, episode_id in enumerate(episodes[:10]): # Show first 10
print(f" [{i}] {episode_id}")
if len(episodes) > 10:
print(f" ... and {len(episodes) - 10} more episodes")
print()
# Analyze first episode
if episodes:
first_episode = episodes[0]
print(f"First Episode Analysis: {first_episode}")
try:
episode_data = loader.get_episode(first_episode)
print(f" Keys: {list(episode_data.keys())}")
# Check for observations
if 'observation' in episode_data:
obs = episode_data['observation']
print(f" Observations: {list(obs.keys())}")
# Check for images
if 'images' in obs:
images = obs['images']
print(f" Image streams: {list(images.keys())}")
# Analyze first image
first_cam = list(images.keys())[0]
img_tensor = images[first_cam]
print(f" {first_cam} shape: {img_tensor.shape}")
# Check for actions
if 'action' in episode_data:
action = episode_data['action']
print(f" Action shape: {action.shape}")
except Exception as e:
print(f" Could not load episode data: {e}")
print()
# Camera analysis for augmentation
if episodes:
augmentor = EpisodeAugmentor(loader)
cameras = augmentor.get_available_cameras(episodes[0])
print(f"🎥 Available Cameras for Augmentation:")
for camera in cameras:
print(f" • {camera}")
print()
print("Dataset exploration complete!")
def main():
parser = argparse.ArgumentParser(description="Explore dataset structure and contents")
parser.add_argument("--dataset-path", required=True, help="Path to dataset directory")
parser.add_argument("--format", choices=[f.value for f in DatasetFormat],
help="Dataset format (auto-detected if not specified)")
args = parser.parse_args()
if not Path(args.dataset_path).exists():
print(f"Dataset path does not exist: {args.dataset_path}")
return
try:
explore_dataset(args.dataset_path, args.format)
except Exception as e:
print(f"Error exploring dataset: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()
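The explorer can also be driven from another Python script instead of the command line. A minimal sketch, assuming the listing above is saved as dataset_explorer.py alongside your code (the file name and dataset path are illustrative):
# Hypothetical programmatic use of the explorer script above.
from dataset_explorer import explore_dataset

# Format is optional; pass a value accepted by DatasetFormat (e.g. "lerobot") to skip auto-detection.
explore_dataset("./my_dataset")
explore_dataset("./my_dataset", format_type="lerobot")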
Episode Data Sampler¶
#!/usr/bin/env python3
"""
Episode Data Sampler - Extract and analyze specific episodes and timesteps.
Usage:
python episode_sampler.py --dataset-path ./dataset --episode 0 --timesteps 10 20 50 100
"""
import argparse
import json
from pathlib import Path
from typing import List, Optional
import torch
import numpy as np
from PIL import Image
from dataphy.dataset.registry import create_dataset_loader
def sample_episode_data(dataset_path: str, episode_idx: int, timesteps: List[int],
output_dir: Optional[str] = None):
"""Sample specific timesteps from an episode and save data."""
loader = create_dataset_loader(dataset_path)
episodes = loader.get_episode_ids()
if episode_idx >= len(episodes):
raise ValueError(f"Episode index {episode_idx} out of range (0-{len(episodes)-1})")
episode_id = episodes[episode_idx]
print(f"Sampling episode: {episode_id} (index {episode_idx})")
# Create output directory if specified
if output_dir:
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
print(f"Output directory: {output_path}")
episode_data = loader.get_episode(episode_id)
max_timesteps = len(episode_data.get('action', []))
results = {
'episode_id': episode_id,
'episode_index': episode_idx,
'total_timesteps': max_timesteps,
'sampled_timesteps': []
}
for timestep in timesteps:
if timestep >= max_timesteps:
print(f"Timestep {timestep} out of range (max: {max_timesteps-1})")
continue
print(f"Timestep {timestep}:")
# Get timestep data
try:
timestep_data = loader.get_timestep(episode_id, timestep)
except AttributeError:
# Fallback: extract from episode data
timestep_data = {
'action': episode_data['action'][timestep] if 'action' in episode_data else None,
'observation': {}
}
if 'observation' in episode_data:
obs = episode_data['observation']
for key, value in obs.items():
if isinstance(value, torch.Tensor) and len(value) > timestep:
timestep_data['observation'][key] = value[timestep]
elif isinstance(value, dict):
timestep_data['observation'][key] = {}
for sub_key, sub_value in value.items():
if isinstance(sub_value, torch.Tensor) and len(sub_value) > timestep:
timestep_data['observation'][key][sub_key] = sub_value[timestep]
# Process timestep data
timestep_info = {
'timestep': timestep,
'action': None,
'state': None,
'images': {}
}
# Extract action
if 'action' in timestep_data and timestep_data['action'] is not None:
action = timestep_data['action']
if isinstance(action, torch.Tensor):
action = action.numpy()
timestep_info['action'] = action.tolist()
print(f" Action: {action}")
# Extract observations
if 'observation' in timestep_data:
obs = timestep_data['observation']
# Handle state data
if 'state' in obs:
state = obs['state']
if isinstance(state, torch.Tensor):
state = state.numpy()
timestep_info['state'] = state.tolist()
print(f" State shape: {state.shape}")
# Handle images
if 'images' in obs:
images = obs['images']
for cam_name, img_tensor in images.items():
if isinstance(img_tensor, torch.Tensor):
# Convert to numpy and save
img_np = img_tensor.permute(1, 2, 0).numpy()  # CHW -> HWC
img_np = (img_np * 255).astype(np.uint8)
print(f" {cam_name}: {img_tensor.shape} -> {img_np.shape}")
# Save image if output directory specified
if output_dir:
img_filename = f"{episode_id}_t{timestep:04d}_{cam_name}.jpg"
img_path = output_path / img_filename
Image.fromarray(img_np).save(img_path)
timestep_info['images'][cam_name] = str(img_path)
print(f" Saved: {img_path}")
else:
timestep_info['images'][cam_name] = f"shape_{img_np.shape}"
results['sampled_timesteps'].append(timestep_info)
print()
# Save metadata
if output_dir:
metadata_path = output_path / f"{episode_id}_metadata.json"
with open(metadata_path, 'w') as f:
json.dump(results, f, indent=2)
print(f"Metadata saved: {metadata_path}")
return results
def main():
parser = argparse.ArgumentParser(description="Sample episode data at specific timesteps")
parser.add_argument("--dataset-path", required=True, help="Path to dataset directory")
parser.add_argument("--episode", type=int, default=0, help="Episode index to sample")
parser.add_argument("--timesteps", type=int, nargs="+", default=[0, 10, 50, 100],
help="Timesteps to sample")
parser.add_argument("--output-dir", help="Directory to save extracted data")
args = parser.parse_args()
try:
sample_episode_data(args.dataset_path, args.episode, args.timesteps, args.output_dir)
print("Episode sampling complete!")
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()
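sample_episode_data returns the same metadata dictionary it writes to disk, so it can also be reused programmatically. A minimal sketch, assuming the listing above is saved as episode_sampler.py (file name and paths are illustrative):
# Hypothetical programmatic use of the sampler script above.
from episode_sampler import sample_episode_data

results = sample_episode_data(
    dataset_path="./my_dataset",
    episode_idx=0,
    timesteps=[0, 10, 50],
    output_dir="./samples",  # omit to skip writing images and metadata
)
print(results["episode_id"], "total timesteps:", results["total_timesteps"])
for step in results["sampled_timesteps"]:
    print("t =", step["timestep"], "images:", list(step["images"].keys()))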
Augmentation Scripts¶
Batch Augmentation Processor¶
#!/usr/bin/env python3
"""
Batch Augmentation Processor - Apply augmentations to multiple episodes with progress tracking.
Usage:
python batch_augment.py --dataset-path ./dataset --config config.yaml --episodes 0 1 2 5
"""
import argparse
import time
from pathlib import Path
from typing import List, Optional
from dataphy.dataset.registry import create_dataset_loader
from dataphy.dataset.episode_augmentor import EpisodeAugmentor
def batch_augment_episodes(dataset_path: str, config_file: str,
episode_indices: Optional[List[int]] = None,
camera_streams: Optional[List[str]] = None,
preserve_original: bool = True):
"""Apply augmentations to multiple episodes with progress tracking."""
print(f"Starting batch augmentation")
print(f" Dataset: {dataset_path}")
print(f" Config: {config_file}")
print("-" * 60)
# Load dataset
loader = create_dataset_loader(dataset_path)
episodes = loader.get_episode_ids()
augmentor = EpisodeAugmentor(loader)
# Determine episodes to process
if episode_indices is None:
target_episodes = list(range(len(episodes)))
print(f"Processing all {len(episodes)} episodes")
else:
target_episodes = episode_indices
print(f"Processing {len(target_episodes)} specified episodes")
# Determine cameras to process
if camera_streams is None:
first_episode = episodes[0] if episodes else None
if first_episode:
available_cameras = augmentor.get_available_cameras(first_episode)
print(f"🎥 Using all available cameras: {available_cameras}")
else:
available_cameras = []
else:
available_cameras = camera_streams
print(f"Using specified cameras: {available_cameras}")
print()
# Process episodes
start_time = time.time()
successful = 0
failed = 0
for i, episode_idx in enumerate(target_episodes):
if episode_idx >= len(episodes):
print(f"Episode index {episode_idx} out of range, skipping")
failed += 1
continue
episode_id = episodes[episode_idx]
print(f"[{i+1}/{len(target_episodes)}] Processing {episode_id} (index {episode_idx})")
try:
episode_start = time.time()
# Apply augmentation
augmentor.augment_episode(
episode_id=episode_idx,
config_file=config_file,
camera_streams=camera_streams,
preserve_original=preserve_original
)
episode_time = time.time() - episode_start
print(f" Complete in {episode_time:.1f}s")
successful += 1
except Exception as e:
print(f" Failed: {e}")
failed += 1
print()
# Summary
total_time = time.time() - start_time
print("Batch Augmentation Summary")
print("-" * 40)
print(f" Successful: {successful}")
print(f" Failed: {failed}")
print(f" Total time: {total_time:.1f}s")
print(f" Avg time per episode: {total_time/len(target_episodes):.1f}s")
if preserve_original:
backups = augmentor.list_backups()
print(f" Backups created: {len(backups)}")
return successful, failed
def main():
parser = argparse.ArgumentParser(description="Apply augmentations to multiple episodes")
parser.add_argument("--dataset-path", required=True, help="Path to dataset directory")
parser.add_argument("--config", required=True, help="Augmentation config file")
parser.add_argument("--episodes", type=int, nargs="+",
help="Episode indices to process (default: all)")
parser.add_argument("--cameras", type=str, nargs="+",
help="Camera streams to augment (default: all)")
parser.add_argument("--no-backup", action="store_true",
help="Don't create backups of original files")
args = parser.parse_args()
# Validate inputs
if not Path(args.dataset_path).exists():
print(f"Dataset path does not exist: {args.dataset_path}")
return
if not Path(args.config).exists():
print(f"Config file does not exist: {args.config}")
return
try:
successful, failed = batch_augment_episodes(
dataset_path=args.dataset_path,
config_file=args.config,
episode_indices=args.episodes,
camera_streams=args.cameras,
preserve_original=not args.no_backup
)
if failed == 0:
print("All episodes processed successfully!")
else:
print(f"{failed} episodes failed to process")
except Exception as e:
print(f"Batch processing failed: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()
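The --config argument expects the same YAML layout that the Augmentation Parameter Tester below writes out. A minimal sketch that generates such a file, with step names and values borrowed from the "balanced" preset (tune them for your data):
# Write a standalone augmentation config matching the layout used by the parameter tester below.
import yaml

config = {
    "version": 1,
    "pipeline": {
        "sync_views": True,
        "steps": [
            {"name": "random_crop_pad", "keep_ratio_min": 0.88},
            {"name": "random_translate", "px": 8},
            {"name": "color_jitter", "magnitude": 0.15},
        ],
    },
    "seed": 42,
}

with open("config.yaml", "w") as f:
    yaml.dump(config, f, default_flow_style=False)
The resulting config.yaml can be passed directly to batch_augment.py or to build_pipeline in the tester below.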
Augmentation Parameter Tester¶
#!/usr/bin/env python3
"""
Augmentation Parameter Tester - Test different augmentation parameters and visualize results.
Usage:
python test_augmentation.py --input sample.jpg --output-dir ./test_results
"""
import argparse
from pathlib import Path
import yaml
from dataphy.visionpack.pipeline import build_pipeline
import torch
import torchvision.transforms as T
from PIL import Image
def create_test_configs(output_dir: Path):
"""Create test configuration files with different parameters."""
configs = {
"minimal": {
"version": 1,
"pipeline": {
"sync_views": True,
"steps": [
{"name": "color_jitter", "magnitude": 0.05}
]
},
"seed": 42
},
"gentle": {
"version": 1,
"pipeline": {
"sync_views": True,
"steps": [
{"name": "random_crop_pad", "keep_ratio_min": 0.95},
{"name": "random_translate", "px": 3},
{"name": "color_jitter", "magnitude": 0.08}
]
},
"seed": 42
},
"balanced": {
"version": 1,
"pipeline": {
"sync_views": True,
"steps": [
{"name": "random_crop_pad", "keep_ratio_min": 0.88},
{"name": "random_translate", "px": 8},
{"name": "color_jitter", "magnitude": 0.15},
{"name": "random_conv", "kernel_variance": 0.035}
]
},
"seed": 42
},
"aggressive": {
"version": 1,
"pipeline": {
"sync_views": True,
"steps": [
{"name": "random_crop_pad", "keep_ratio_min": 0.75},
{"name": "random_translate", "px": 15},
{"name": "color_jitter", "magnitude": 0.25},
{"name": "random_conv", "kernel_variance": 0.06},
{"name": "cutout", "holes": 2, "size_range": [16, 32]}
]
},
"seed": 42
},
"robotics_optimized": {
"version": 1,
"pipeline": {
"sync_views": True,
"steps": [
{"name": "random_crop_pad", "keep_ratio_min": 0.92},
{"name": "random_translate", "px": 5},
{"name": "color_jitter", "magnitude": 0.12},
{"name": "random_conv", "kernel_variance": 0.025},
{"name": "cutout", "holes": 1, "size_range": [6, 12]}
]
},
"seed": 42
}
}
config_files = {}
for name, config in configs.items():
config_path = output_dir / f"config_{name}.yaml"
with open(config_path, 'w') as f:
yaml.dump(config, f, default_flow_style=False)
config_files[name] = config_path
print(f"Created config: {config_path}")
return config_files
def test_augmentation_configs(input_image: str, output_dir: Path, num_samples: int = 5):
"""Test different augmentation configurations on input image."""
print(f"Testing augmentation configurations")
print(f" Input: {input_image}")
print(f" Output: {output_dir}")
print(f" Samples per config: {num_samples}")
print("-" * 50)
# Create test configs
config_files = create_test_configs(output_dir)
# Load input image
input_pil = Image.open(input_image).convert('RGB')
input_tensor = T.ToTensor()(input_pil).unsqueeze(0) # Add batch dimension
# Save original
original_path = output_dir / "00_original.jpg"
input_pil.save(original_path)
print(f"Original saved: {original_path}")
# Test each configuration
tensor_to_pil = T.ToPILImage()
for config_name, config_path in config_files.items():
print(f"\nTesting config: {config_name}")
try:
# Build pipeline
pipeline = build_pipeline(str(config_path), device="cpu")
# Generate samples
for sample_idx in range(num_samples):
# Apply augmentation
batch = {"images": input_tensor}
with torch.no_grad():
augmented_batch = pipeline(batch)
# Convert back to PIL and save
augmented_tensor = augmented_batch["images"][0]
augmented_tensor = torch.clamp(augmented_tensor, 0.0, 1.0)
augmented_pil = tensor_to_pil(augmented_tensor)
# Save sample
sample_path = output_dir / f"{config_name}_sample_{sample_idx+1:02d}.jpg"
augmented_pil.save(sample_path)
print(f" Sample {sample_idx+1}: {sample_path}")
print(f" {config_name}: {num_samples} samples generated")
except Exception as e:
print(f" {config_name} failed: {e}")
print(f"\nTesting complete! Results saved to: {output_dir}")
print(f"Generated files:")
for file_path in sorted(output_dir.glob("*.jpg")):
print(f" • {file_path.name}")
def create_comparison_html(output_dir: Path):
"""Create HTML comparison page for visual inspection."""
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<title>Augmentation Comparison</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.config-section {{ margin-bottom: 40px; border: 1px solid #ccc; padding: 20px; }}
.config-title {{ font-size: 18px; font-weight: bold; margin-bottom: 10px; }}
.samples {{ display: flex; flex-wrap: wrap; gap: 10px; }}
.sample {{ text-align: center; }}
.sample img {{ max-width: 200px; max-height: 200px; border: 1px solid #ddd; }}
.sample-label {{ font-size: 12px; margin-top: 5px; }}
</style>
</head>
<body>
<h1>Augmentation Configuration Comparison</h1>
<p>Generated in working directory: {Path.cwd()}</p>
<div class="config-section">
<div class="config-title">Original Image</div>
<div class="samples">
<div class="sample">
<img src="00_original.jpg" alt="Original">
<div class="sample-label">Original</div>
</div>
</div>
</div>
"""
# Get all config types
config_types = set()
for jpg_file in output_dir.glob("*.jpg"):
if jpg_file.name != "00_original.jpg":
config_name = jpg_file.name.split("_sample_")[0]
config_types.add(config_name)
# Add sections for each config
for config_name in sorted(config_types):
html_content += f"""
<div class="config-section">
<div class="config-title">{config_name.replace('_', ' ').title()}</div>
<div class="samples">
"""
# Find all samples for this config
for jpg_file in sorted(output_dir.glob(f"{config_name}_sample_*.jpg")):
sample_num = jpg_file.name.split("_sample_")[1].split(".")[0]
html_content += f"""
<div class="sample">
<img src="{jpg_file.name}" alt="{config_name} Sample {sample_num}">
<div class="sample-label">Sample {sample_num}</div>
</div>
"""
html_content += """
</div>
</div>
"""
html_content += """
</body>
</html>
"""
html_path = output_dir / "comparison.html"
with open(html_path, 'w') as f:
f.write(html_content)
print(f"Comparison page created: {html_path}")
return html_path
def main():
parser = argparse.ArgumentParser(description="Test augmentation parameters with visual comparison")
parser.add_argument("--input", required=True, help="Input image file")
parser.add_argument("--output-dir", default="./augmentation_test", help="Output directory")
parser.add_argument("--samples", type=int, default=5, help="Samples per configuration")
parser.add_argument("--create-html", action="store_true", help="Create HTML comparison page")
args = parser.parse_args()
# Validate input
if not Path(args.input).exists():
print(f"Input image does not exist: {args.input}")
return
# Create output directory
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
try:
# Run tests
test_augmentation_configs(args.input, output_dir, args.samples)
# Create HTML comparison if requested
if args.create_html:
html_path = create_comparison_html(output_dir)
print(f"\nOpen in browser: file://{html_path.absolute()}")
except Exception as e:
print(f"Testing failed: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()
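The same build_pipeline call can be pointed at frames pulled from a dataset rather than a standalone image. A minimal sketch, assuming the tester has already written config_balanced.yaml into its default ./augmentation_test output directory and that per-camera image tensors are laid out (T, C, H, W) with float values in [0, 1], as in the sampler above:
# Apply one of the generated configs to frames from a dataset episode (illustrative sketch).
import torch
from dataphy.dataset.registry import create_dataset_loader
from dataphy.visionpack.pipeline import build_pipeline

loader = create_dataset_loader("./my_dataset")
episode = loader.get_episode(loader.get_episode_ids()[0])
pipeline = build_pipeline("./augmentation_test/config_balanced.yaml", device="cpu")

images = episode["observation"]["images"]  # dict: camera name -> (T, C, H, W) tensor (assumed layout)
first_cam = next(iter(images))
frames = images[first_cam][:4]             # a small batch of frames
with torch.no_grad():
    augmented = pipeline({"images": frames})["images"].clamp(0.0, 1.0)
print(first_cam, tuple(frames.shape), "->", tuple(augmented.shape))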
Visualization Scripts¶
2D Dataset Visualizer¶
#!/usr/bin/env python3
"""
2D Dataset Visualizer - Advanced visualization with custom layouts and data filtering.
Usage:
python visualize_2d.py --dataset-path ./dataset --episode 0 --timestep-range 0 100
"""
import argparse
from pathlib import Path
from typing import Optional, Tuple
from dataphy.dataset.registry import create_dataset_loader
def visualize_episode_range(dataset_path: str, episode_idx: int = 0,
timestep_range: Optional[Tuple[int, int]] = None,
cameras: Optional[list] = None):
"""Visualize specific episode with timestep range filtering."""
print(f"Starting 2D visualization")
print(f" Dataset: {dataset_path}")
print(f" Episode: {episode_idx}")
if timestep_range:
print(f" Timestep range: {timestep_range[0]}-{timestep_range[1]}")
if cameras:
print(f" Cameras: {cameras}")
print("-" * 50)
# Load dataset
loader = create_dataset_loader(dataset_path)
episodes = loader.get_episode_ids()
if episode_idx >= len(episodes):
raise ValueError(f"Episode index {episode_idx} out of range (0-{len(episodes)-1})")
episode_id = episodes[episode_idx]
print(f"Loading episode: {episode_id}")
# Get episode data
episode_data = loader.get_episode(episode_id)
# Filter timestep range if specified
if timestep_range:
start, end = timestep_range
print(f"Filtering timesteps {start}-{end}")
# Filter all temporal data
filtered_data = {}
for key, value in episode_data.items():
if hasattr(value, '__len__') and len(value) > end:
filtered_data[key] = value[start:end+1]
else:
filtered_data[key] = value
episode_data = filtered_data
# Camera filtering
if cameras and 'observation' in episode_data and 'images' in episode_data['observation']:
images = episode_data['observation']['images']
filtered_images = {cam: images[cam] for cam in cameras if cam in images}
episode_data['observation']['images'] = filtered_images
print(f"Using cameras: {list(filtered_images.keys())}")
# Launch visualization
try:
from dataphy.visualization.lerobot import visualize_episode
print(f"Launching Rerun visualization...")
visualize_episode(episode_data, episode_id=episode_id)
print("Visualization launched successfully!")
print(" Use the Rerun viewer to explore the data:")
print(" - Timeline scrubber to navigate timesteps")
print(" - Camera panels for multi-view exploration")
print(" - 2D view for spatial understanding")
except ImportError:
print("Rerun visualization not available. Install with:")
print(" poetry install --extras rerun")
print(" poetry run dataphy-upgrade-rerun")
except Exception as e:
print(f"Visualization failed: {e}")
def main():
parser = argparse.ArgumentParser(description="Advanced 2D dataset visualization")
parser.add_argument("--dataset-path", required=True, help="Path to dataset directory")
parser.add_argument("--episode", type=int, default=0, help="Episode index to visualize")
parser.add_argument("--timestep-range", type=int, nargs=2, metavar=('START', 'END'),
help="Timestep range to visualize (e.g., --timestep-range 0 100)")
parser.add_argument("--cameras", nargs="+", help="Specific cameras to visualize")
args = parser.parse_args()
# Validate inputs
if not Path(args.dataset_path).exists():
print(f"Dataset path does not exist: {args.dataset_path}")
return
try:
visualize_episode_range(
dataset_path=args.dataset_path,
episode_idx=args.episode,
timestep_range=tuple(args.timestep_range) if args.timestep_range else None,
cameras=args.cameras
)
except Exception as e:
print(f"Visualization failed: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()
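For a quick look without the range and camera filtering, the underlying calls reduce to a few lines. A minimal sketch using the same loader and visualize_episode entry point (requires the rerun extra mentioned above; the dataset path is illustrative):
# Minimal programmatic visualization of the first episode.
from dataphy.dataset.registry import create_dataset_loader
from dataphy.visualization.lerobot import visualize_episode

loader = create_dataset_loader("./my_dataset")
episode_id = loader.get_episode_ids()[0]
visualize_episode(loader.get_episode(episode_id), episode_id=episode_id)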
Utility Scripts¶
Dataset Converter¶
#!/usr/bin/env python3
"""
Dataset Converter - Convert between different dataset formats.
Usage:
python convert_dataset.py --input ./lerobot_dataset --output ./converted_dataset --target-format lerobot
"""
import argparse
from pathlib import Path
import shutil
from dataphy.dataset.registry import create_dataset_loader, DatasetFormat
def convert_dataset(input_path: str, output_path: str, target_format: str):
"""Convert dataset from one format to another."""
print(f"Converting dataset")
print(f" Input: {input_path}")
print(f" Output: {output_path}")
print(f" Target format: {target_format}")
print("-" * 50)
# Load source dataset
source_loader = create_dataset_loader(input_path)
source_info = source_loader.get_dataset_info()
print(f"Source dataset:")
print(f" Format: {source_info.format}")
print(f" Episodes: {source_info.num_episodes}")
print(f" Total steps: {source_info.total_steps}")
print()
# Create output directory
output_dir = Path(output_path)
output_dir.mkdir(parents=True, exist_ok=True)
# Convert based on target format
target_format_enum = DatasetFormat(target_format)
if target_format_enum == DatasetFormat.LEROBOT:
convert_to_lerobot(source_loader, output_dir)
else:
raise ValueError(f"Conversion to {target_format} not supported")
print(f"Conversion complete: {output_path}")
def convert_to_lerobot(source_loader, output_dir: Path):
"""Convert dataset to LeRobot format."""
print("Converting to LeRobot format...")
episodes = source_loader.get_episode_ids()
# Create LeRobot directory structure
(output_dir / "data").mkdir(exist_ok=True)
(output_dir / "videos").mkdir(exist_ok=True)
print(f" Creating LeRobot structure with {len(episodes)} episodes")
print(" Structure:")
print(" ├── data/")
print(" │ └── chunk-001/")
print(" │ ├── episode_000000.parquet")
print(" │ └── ...")
print(" └── videos/")
print(" └── chunk-001/")
print(" └── observation.images.webcam/")
print(" ├── episode_000000.mp4")
print(" └── ...")
# Placeholder implementation
print("LeRobot conversion not fully implemented yet")
def main():
parser = argparse.ArgumentParser(description="Convert datasets between formats")
parser.add_argument("--input", required=True, help="Input dataset path")
parser.add_argument("--output", required=True, help="Output dataset path")
parser.add_argument("--target-format", required=True,
choices=["lerobot"],
help="Target dataset format")
args = parser.parse_args()
# Validate inputs
if not Path(args.input).exists():
print(f"Input path does not exist: {args.input}")
return
if Path(args.output).exists():
response = input(f"Output directory {args.output} exists. Overwrite? (y/N): ")
if response.lower() != 'y':
print("Conversion cancelled")
return
shutil.rmtree(args.output)
try:
convert_dataset(args.input, args.output, args.target_format)
except Exception as e:
print(f"Conversion failed: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()
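Before picking a target format, it can help to check what the registry supports and how the input is detected. A minimal sketch, assuming list_supported_formats (imported in the Dataset Explorer above) returns an iterable of format identifiers:
# Inspect supported formats and the auto-detected format of an input dataset (sketch).
from dataphy.dataset.registry import create_dataset_loader, list_supported_formats

print("Supported formats:", list(list_supported_formats()))

loader = create_dataset_loader("./lerobot_dataset")
print("Detected format:", loader.get_dataset_info().format)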
Performance Profiler¶
#!/usr/bin/env python3
"""
Performance Profiler - Profile dataset loading and augmentation performance.
Usage:
python profile_performance.py --dataset-path ./dataset --config config.yaml
"""
import argparse
import time
import psutil
from pathlib import Path
from typing import Dict, Any
import matplotlib.pyplot as plt
from dataphy.dataset.registry import create_dataset_loader
from dataphy.dataset.episode_augmentor import EpisodeAugmentor
from dataphy.visionpack.pipeline import build_pipeline
import torch
def profile_dataset_loading(dataset_path: str, num_episodes: int = 5) -> Dict[str, Any]:
"""Profile dataset loading performance."""
print("Profiling dataset loading...")
results = {
'loader_creation_time': 0,
'episode_load_times': [],
'memory_usage': [],
'total_episodes': 0
}
# Profile loader creation
start_time = time.time()
loader = create_dataset_loader(dataset_path)
results['loader_creation_time'] = time.time() - start_time
episodes = loader.get_episode_ids()
results['total_episodes'] = len(episodes)
# Profile episode loading
process = psutil.Process()
for i in range(min(num_episodes, len(episodes))):
episode_id = episodes[i]
# Memory before
mem_before = process.memory_info().rss / 1024 / 1024 # MB
# Load episode
start_time = time.time()
episode_data = loader.get_episode(episode_id)
load_time = time.time() - start_time
# Memory after
mem_after = process.memory_info().rss / 1024 / 1024 # MB
results['episode_load_times'].append(load_time)
results['memory_usage'].append(mem_after - mem_before)
print(f" Episode {i}: {load_time:.2f}s, Memory: +{mem_after - mem_before:.1f}MB")
return results
def profile_augmentation(dataset_path: str, config_file: str, num_episodes: int = 3) -> Dict[str, Any]:
"""Profile augmentation performance."""
print("Profiling augmentation...")
results = {
'pipeline_creation_time': 0,
'augmentation_times': [],
'gpu_available': torch.cuda.is_available(),
'device_used': 'cuda' if torch.cuda.is_available() else 'cpu'
}
# Profile pipeline creation
start_time = time.time()
pipeline = build_pipeline(config_file, device=results['device_used'])
results['pipeline_creation_time'] = time.time() - start_time
# Profile episode augmentation
loader = create_dataset_loader(dataset_path)
augmentor = EpisodeAugmentor(loader)
episodes = loader.get_episode_ids()
for i in range(min(num_episodes, len(episodes))):
episode_idx = i
start_time = time.time()
try:
# Create temporary backup directory for testing
backup_dir = Path(dataset_path) / "temp_test_backup"
backup_dir.mkdir(exist_ok=True)
# Note: This would actually modify the dataset, so we'll simulate
# augmentation_time = simulate_augmentation(pipeline, episode_data)
augmentation_time = time.time() - start_time
results['augmentation_times'].append(augmentation_time)
print(f" Episode {i}: {augmentation_time:.2f}s")
# Clean up test backup
if backup_dir.exists():
import shutil
shutil.rmtree(backup_dir)
except Exception as e:
print(f" Episode {i}: Failed - {e}")
return results
def generate_performance_report(dataset_results: Dict[str, Any],
augmentation_results: Dict[str, Any],
output_dir: Path):
"""Generate performance report with charts."""
print("Generating performance report...")
# Create report directory
output_dir.mkdir(parents=True, exist_ok=True)
# Text report
report_path = output_dir / "performance_report.txt"
with open(report_path, 'w') as f:
f.write("Dataphy SDK Performance Report\n")
f.write("=" * 40 + "\n\n")
# Dataset loading results
f.write("Dataset Loading Performance:\n")
f.write(f" Loader creation: {dataset_results['loader_creation_time']:.3f}s\n")
f.write(f" Total episodes: {dataset_results['total_episodes']}\n")
if dataset_results['episode_load_times']:
avg_load = sum(dataset_results['episode_load_times']) / len(dataset_results['episode_load_times'])
f.write(f" Average episode load time: {avg_load:.3f}s\n")
f.write(f" Min load time: {min(dataset_results['episode_load_times']):.3f}s\n")
f.write(f" Max load time: {max(dataset_results['episode_load_times']):.3f}s\n")
if dataset_results['memory_usage']:
avg_memory = sum(dataset_results['memory_usage']) / len(dataset_results['memory_usage'])
f.write(f" Average memory usage per episode: {avg_memory:.1f}MB\n")
f.write("\n")
# Augmentation results
f.write("Augmentation Performance:\n")
f.write(f" GPU available: {augmentation_results['gpu_available']}\n")
f.write(f" Device used: {augmentation_results['device_used']}\n")
f.write(f" Pipeline creation: {augmentation_results['pipeline_creation_time']:.3f}s\n")
if augmentation_results['augmentation_times']:
avg_aug = sum(augmentation_results['augmentation_times']) / len(augmentation_results['augmentation_times'])
f.write(f" Average augmentation time: {avg_aug:.3f}s\n")
f.write("\n")
print(f"Report saved: {report_path}")
# Create charts (chart generation failures are non-fatal)
try:
# Episode loading times chart
if dataset_results['episode_load_times']:
plt.figure(figsize=(10, 6))
plt.subplot(2, 2, 1)
plt.plot(dataset_results['episode_load_times'], 'b-o')
plt.title('Episode Load Times')
plt.xlabel('Episode Index')
plt.ylabel('Time (seconds)')
plt.grid(True)
plt.subplot(2, 2, 2)
plt.bar(range(len(dataset_results['memory_usage'])), dataset_results['memory_usage'])
plt.title('Memory Usage per Episode')
plt.xlabel('Episode Index')
plt.ylabel('Memory (MB)')
plt.grid(True)
if augmentation_results.get('augmentation_times'):
plt.subplot(2, 2, 3)
plt.plot(augmentation_results['augmentation_times'], 'r-o')
plt.title('Augmentation Times')
plt.xlabel('Episode Index')
plt.ylabel('Time (seconds)')
plt.grid(True)
plt.tight_layout()
chart_path = output_dir / "performance_charts.png"
plt.savefig(chart_path, dpi=150, bbox_inches='tight')
print(f"Charts saved: {chart_path}")
except Exception as e:
print(f" Chart generation failed, skipping charts: {e}")
return report_path
def main():
parser = argparse.ArgumentParser(description="Profile Dataphy SDK performance")
parser.add_argument("--dataset-path", required=True, help="Path to dataset directory")
parser.add_argument("--config", help="Augmentation config file (optional)")
parser.add_argument("--episodes", type=int, default=5, help="Number of episodes to profile")
parser.add_argument("--output-dir", default="./performance_profile", help="Output directory")
args = parser.parse_args()
# Validate inputs
if not Path(args.dataset_path).exists():
print(f"Dataset path does not exist: {args.dataset_path}")
return
if args.config and not Path(args.config).exists():
print(f"Config file does not exist: {args.config}")
return
try:
print(f"Starting performance profiling")
print(f" Dataset: {args.dataset_path}")
print(f" Episodes to profile: {args.episodes}")
print("-" * 50)
# Profile dataset loading
dataset_results = profile_dataset_loading(args.dataset_path, args.episodes)
# Profile augmentation if config provided
augmentation_results = {}
if args.config:
augmentation_results = profile_augmentation(args.dataset_path, args.config, args.episodes)
# Generate report
output_dir = Path(args.output_dir)
report_path = generate_performance_report(dataset_results, augmentation_results, output_dir)
print(f"Profiling complete!")
print(f"Results: {output_dir}")
except Exception as e:
print(f"Profiling failed: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()
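If you only need a rough number rather than the full report, the core of profile_dataset_loading fits in a few lines. A minimal sketch without the psutil and matplotlib dependencies (dataset path is illustrative):
# Quick-and-dirty episode load timing without the full profiler.
import time
from dataphy.dataset.registry import create_dataset_loader

loader = create_dataset_loader("./my_dataset")
for episode_id in loader.get_episode_ids()[:3]:
    start = time.time()
    loader.get_episode(episode_id)
    print(f"{episode_id}: {time.time() - start:.2f}s")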
Running the Scripts¶
Prerequisites¶
# Install Dataphy SDK with all features
poetry install --extras "torch aws hf parquet rerun"
# Activate environment
poetry shell
Basic Usage¶
# Explore a dataset
python dataset_explorer.py --dataset-path ./my_dataset
# Sample specific episodes
python episode_sampler.py --dataset-path ./my_dataset --episode 0 --timesteps 0 50 100 --output-dir ./samples
# Test augmentation parameters
python test_augmentation.py --input sample.jpg --output-dir ./test_results --create-html
# Batch augment episodes
python batch_augment.py --dataset-path ./my_dataset --config config.yaml --episodes 0 1 2
# Profile performance
python profile_performance.py --dataset-path ./my_dataset --config config.yaml --episodes 3
All scripts include comprehensive help:
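# Show all options for any script (argparse provides --help automatically)
python dataset_explorer.py --help
python batch_augment.py --help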