Skip to content

Python Scripts Collection

Ready-to-run Python scripts demonstrating Dataphy SDK capabilities with comprehensive examples and use cases.

Basic Dataset Operations

Dataset Explorer Script

#!/usr/bin/env python3
"""
Dataset Explorer - Comprehensive dataset analysis and visualization.

Usage:
    python dataset_explorer.py --dataset-path ./my_dataset [--format lerobot]
"""

import argparse
from pathlib import Path
from typing import Optional

from dataphy.dataset.registry import create_dataset_loader, DatasetFormat, list_supported_formats
from dataphy.dataset.episode_augmentor import EpisodeAugmentor


def explore_dataset(dataset_path: str, format_type: Optional[str] = None):
    """Explore dataset structure, episodes, and available data."""
    print(f"Exploring dataset: {dataset_path}")
    print("-" * 50)

    # Create loader
    if format_type:
        format_enum = DatasetFormat(format_type)
        loader = create_dataset_loader(dataset_path, format_type=format_enum)
    else:
        loader = create_dataset_loader(dataset_path)

    # Get dataset info
    try:
        info = loader.get_dataset_info()
        print(f"Dataset Information:")
        print(f"   Format: {info.format}")
        print(f"   Episodes: {info.num_episodes}")
        print(f"   Total Steps: {info.total_steps}")
        print()
    except Exception as e:
        print(f"Could not load dataset info: {e}")

    # List episodes
    episodes = loader.get_episode_ids()
    print(f"Episodes ({len(episodes)} total):")
    for i, episode_id in enumerate(episodes[:10]):  # Show first 10
        print(f"   [{i}] {episode_id}")

    if len(episodes) > 10:
        print(f"   ... and {len(episodes) - 10} more episodes")
    print()

    # Analyze first episode
    if episodes:
        first_episode = episodes[0]
        print(f"First Episode Analysis: {first_episode}")

        try:
            episode_data = loader.get_episode(first_episode)
            print(f"   Keys: {list(episode_data.keys())}")

            # Check for observations
            if 'observation' in episode_data:
                obs = episode_data['observation']
                print(f"   Observations: {list(obs.keys())}")

                # Check for images
                if 'images' in obs:
                    images = obs['images']
                    print(f"   Image streams: {list(images.keys())}")

                    # Analyze first image
                    first_cam = list(images.keys())[0]
                    img_tensor = images[first_cam]
                    print(f"   {first_cam} shape: {img_tensor.shape}")

            # Check for actions
            if 'action' in episode_data:
                action = episode_data['action']
                print(f"   Action shape: {action.shape}")

        except Exception as e:
            print(f"   Could not load episode data: {e}")
        print()

    # Camera analysis for augmentation
    if episodes:
        augmentor = EpisodeAugmentor(loader)
        cameras = augmentor.get_available_cameras(episodes[0])
        print(f"🎥 Available Cameras for Augmentation:")
        for camera in cameras:
            print(f"   • {camera}")
        print()

    print("Dataset exploration complete!")


def main():
    parser = argparse.ArgumentParser(description="Explore dataset structure and contents")
    parser.add_argument("--dataset-path", required=True, help="Path to dataset directory")
    parser.add_argument("--format", choices=[f.value for f in DatasetFormat],
                       help="Dataset format (auto-detected if not specified)")

    args = parser.parse_args()

    if not Path(args.dataset_path).exists():
        print(f"Dataset path does not exist: {args.dataset_path}")
        return

    try:
        explore_dataset(args.dataset_path, args.format)
    except Exception as e:
        print(f"Error exploring dataset: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()

Episode Data Sampler

#!/usr/bin/env python3
"""
Episode Data Sampler - Extract and analyze specific episodes and timesteps.

Usage:
    python episode_sampler.py --dataset-path ./dataset --episode 0 --timesteps 10 20 50 100
"""

import argparse
import json
from pathlib import Path
from typing import List, Optional

import torch
import numpy as np
from PIL import Image

from dataphy.dataset.registry import create_dataset_loader


def sample_episode_data(dataset_path: str, episode_idx: int, timesteps: List[int],
                       output_dir: Optional[str] = None):
    """Sample specific timesteps from an episode and save data."""

    loader = create_dataset_loader(dataset_path)
    episodes = loader.get_episode_ids()

    if episode_idx >= len(episodes):
        raise ValueError(f"Episode index {episode_idx} out of range (0-{len(episodes)-1})")

    episode_id = episodes[episode_idx]
    print(f"Sampling episode: {episode_id} (index {episode_idx})")

    # Create output directory if specified
    if output_dir:
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        print(f"Output directory: {output_path}")

    episode_data = loader.get_episode(episode_id)
    max_timesteps = len(episode_data.get('action', []))

    results = {
        'episode_id': episode_id,
        'episode_index': episode_idx,
        'total_timesteps': max_timesteps,
        'sampled_timesteps': []
    }

    for timestep in timesteps:
        if timestep >= max_timesteps:
            print(f"Timestep {timestep} out of range (max: {max_timesteps-1})")
            continue

        print(f"Timestep {timestep}:")

        # Get timestep data
        try:
            timestep_data = loader.get_timestep(episode_id, timestep)
        except AttributeError:
            # Fallback: extract from episode data
            timestep_data = {
                'action': episode_data['action'][timestep] if 'action' in episode_data else None,
                'observation': {}
            }

            if 'observation' in episode_data:
                obs = episode_data['observation']
                for key, value in obs.items():
                    if isinstance(value, torch.Tensor) and len(value) > timestep:
                        timestep_data['observation'][key] = value[timestep]
                    elif isinstance(value, dict):
                        timestep_data['observation'][key] = {}
                        for sub_key, sub_value in value.items():
                            if isinstance(sub_value, torch.Tensor) and len(sub_value) > timestep:
                                timestep_data['observation'][key][sub_key] = sub_value[timestep]

        # Process timestep data
        timestep_info = {
            'timestep': timestep,
            'action': None,
            'state': None,
            'images': {}
        }

        # Extract action
        if 'action' in timestep_data and timestep_data['action'] is not None:
            action = timestep_data['action']
            if isinstance(action, torch.Tensor):
                action = action.numpy()
            timestep_info['action'] = action.tolist()
            print(f"   Action: {action}")

        # Extract observations
        if 'observation' in timestep_data:
            obs = timestep_data['observation']

            # Handle state data
            if 'state' in obs:
                state = obs['state']
                if isinstance(state, torch.Tensor):
                    state = state.numpy()
                timestep_info['state'] = state.tolist()
                print(f"   State shape: {state.shape}")

            # Handle images
            if 'images' in obs:
                images = obs['images']
                for cam_name, img_tensor in images.items():
                    if isinstance(img_tensor, torch.Tensor):
                        # Convert to numpy and save
                        img_np = img_tensor.permute(1, 2, 0).numpy()  # CHW -> HWP
                        img_np = (img_np * 255).astype(np.uint8)

                        print(f"   {cam_name}: {img_tensor.shape} -> {img_np.shape}")

                        # Save image if output directory specified
                        if output_dir:
                            img_filename = f"{episode_id}_t{timestep:04d}_{cam_name}.jpg"
                            img_path = output_path / img_filename
                            Image.fromarray(img_np).save(img_path)
                            timestep_info['images'][cam_name] = str(img_path)
                            print(f"   Saved: {img_path}")
                        else:
                            timestep_info['images'][cam_name] = f"shape_{img_np.shape}"

        results['sampled_timesteps'].append(timestep_info)
        print()

    # Save metadata
    if output_dir:
        metadata_path = output_path / f"{episode_id}_metadata.json"
        with open(metadata_path, 'w') as f:
            json.dump(results, f, indent=2)
        print(f"Metadata saved: {metadata_path}")

    return results


def main():
    parser = argparse.ArgumentParser(description="Sample episode data at specific timesteps")
    parser.add_argument("--dataset-path", required=True, help="Path to dataset directory")
    parser.add_argument("--episode", type=int, default=0, help="Episode index to sample")
    parser.add_argument("--timesteps", type=int, nargs="+", default=[0, 10, 50, 100],
                       help="Timesteps to sample")
    parser.add_argument("--output-dir", help="Directory to save extracted data")

    args = parser.parse_args()

    try:
        sample_episode_data(args.dataset_path, args.episode, args.timesteps, args.output_dir)
        print("Episode sampling complete!")
    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()

Augmentation Scripts

Batch Augmentation Processor

#!/usr/bin/env python3
"""
Batch Augmentation Processor - Apply augmentations to multiple episodes with progress tracking.

Usage:
    python batch_augment.py --dataset-path ./dataset --config config.yaml --episodes 0 1 2 5
"""

import argparse
import time
from pathlib import Path
from typing import List, Optional

from dataphy.dataset.registry import create_dataset_loader
from dataphy.dataset.episode_augmentor import EpisodeAugmentor


def batch_augment_episodes(dataset_path: str, config_file: str,
                          episode_indices: Optional[List[int]] = None,
                          camera_streams: Optional[List[str]] = None,
                          preserve_original: bool = True):
    """Apply augmentations to multiple episodes with progress tracking."""

    print(f"Starting batch augmentation")
    print(f"   Dataset: {dataset_path}")
    print(f"   Config: {config_file}")
    print("-" * 60)

    # Load dataset
    loader = create_dataset_loader(dataset_path)
    episodes = loader.get_episode_ids()
    augmentor = EpisodeAugmentor(loader)

    # Determine episodes to process
    if episode_indices is None:
        target_episodes = list(range(len(episodes)))
        print(f"Processing all {len(episodes)} episodes")
    else:
        target_episodes = episode_indices
        print(f"Processing {len(target_episodes)} specified episodes")

    # Determine cameras to process
    if camera_streams is None:
        first_episode = episodes[0] if episodes else None
        if first_episode:
            available_cameras = augmentor.get_available_cameras(first_episode)
            print(f"🎥 Using all available cameras: {available_cameras}")
        else:
            available_cameras = []
    else:
        available_cameras = camera_streams
        print(f"Using specified cameras: {available_cameras}")

    print()

    # Process episodes
    start_time = time.time()
    successful = 0
    failed = 0

    for i, episode_idx in enumerate(target_episodes):
        if episode_idx >= len(episodes):
            print(f"Episode index {episode_idx} out of range, skipping")
            failed += 1
            continue

        episode_id = episodes[episode_idx]
        print(f"[{i+1}/{len(target_episodes)}] Processing {episode_id} (index {episode_idx})")

        try:
            episode_start = time.time()

            # Apply augmentation
            augmentor.augment_episode(
                episode_id=episode_idx,
                config_file=config_file,
                camera_streams=camera_streams,
                preserve_original=preserve_original
            )

            episode_time = time.time() - episode_start
            print(f"   Complete in {episode_time:.1f}s")
            successful += 1

        except Exception as e:
            print(f"   Failed: {e}")
            failed += 1

        print()

    # Summary
    total_time = time.time() - start_time
    print("Batch Augmentation Summary")
    print("-" * 40)
    print(f"   Successful: {successful}")
    print(f"   Failed: {failed}")
    print(f"   Total time: {total_time:.1f}s")
    print(f"   Avg time per episode: {total_time/len(target_episodes):.1f}s")

    if preserve_original:
        backups = augmentor.list_backups()
        print(f"   Backups created: {len(backups)}")

    return successful, failed


def main():
    parser = argparse.ArgumentParser(description="Apply augmentations to multiple episodes")
    parser.add_argument("--dataset-path", required=True, help="Path to dataset directory")
    parser.add_argument("--config", required=True, help="Augmentation config file")
    parser.add_argument("--episodes", type=int, nargs="+",
                       help="Episode indices to process (default: all)")
    parser.add_argument("--cameras", type=str, nargs="+",
                       help="Camera streams to augment (default: all)")
    parser.add_argument("--no-backup", action="store_true",
                       help="Don't create backups of original files")

    args = parser.parse_args()

    # Validate inputs
    if not Path(args.dataset_path).exists():
        print(f"Dataset path does not exist: {args.dataset_path}")
        return

    if not Path(args.config).exists():
        print(f"Config file does not exist: {args.config}")
        return

    try:
        successful, failed = batch_augment_episodes(
            dataset_path=args.dataset_path,
            config_file=args.config,
            episode_indices=args.episodes,
            camera_streams=args.cameras,
            preserve_original=not args.no_backup
        )

        if failed == 0:
            print("All episodes processed successfully!")
        else:
            print(f"{failed} episodes failed to process")

    except Exception as e:
        print(f"Batch processing failed: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()

Augmentation Parameter Tester

#!/usr/bin/env python3
"""
Augmentation Parameter Tester - Test different augmentation parameters and visualize results.

Usage:
    python test_augmentation.py --input sample.jpg --output-dir ./test_results
"""

import argparse
from pathlib import Path
import tempfile
import yaml

from dataphy.visionpack.pipeline import build_pipeline
import torch
import torchvision.transforms as T
from PIL import Image


def create_test_configs(output_dir: Path):
    """Create test configuration files with different parameters."""

    configs = {
        "minimal": {
            "version": 1,
            "pipeline": {
                "sync_views": True,
                "steps": [
                    {"name": "color_jitter", "magnitude": 0.05}
                ]
            },
            "seed": 42
        },

        "gentle": {
            "version": 1,
            "pipeline": {
                "sync_views": True,
                "steps": [
                    {"name": "random_crop_pad", "keep_ratio_min": 0.95},
                    {"name": "random_translate", "px": 3},
                    {"name": "color_jitter", "magnitude": 0.08}
                ]
            },
            "seed": 42
        },

        "balanced": {
            "version": 1,
            "pipeline": {
                "sync_views": True,
                "steps": [
                    {"name": "random_crop_pad", "keep_ratio_min": 0.88},
                    {"name": "random_translate", "px": 8},
                    {"name": "color_jitter", "magnitude": 0.15},
                    {"name": "random_conv", "kernel_variance": 0.035}
                ]
            },
            "seed": 42
        },

        "aggressive": {
            "version": 1,
            "pipeline": {
                "sync_views": True,
                "steps": [
                    {"name": "random_crop_pad", "keep_ratio_min": 0.75},
                    {"name": "random_translate", "px": 15},
                    {"name": "color_jitter", "magnitude": 0.25},
                    {"name": "random_conv", "kernel_variance": 0.06},
                    {"name": "cutout", "holes": 2, "size_range": [16, 32]}
                ]
            },
            "seed": 42
        },

        "robotics_optimized": {
            "version": 1,
            "pipeline": {
                "sync_views": True,
                "steps": [
                    {"name": "random_crop_pad", "keep_ratio_min": 0.92},
                    {"name": "random_translate", "px": 5},
                    {"name": "color_jitter", "magnitude": 0.12},
                    {"name": "random_conv", "kernel_variance": 0.025},
                    {"name": "cutout", "holes": 1, "size_range": [6, 12]}
                ]
            },
            "seed": 42
        }
    }

    config_files = {}
    for name, config in configs.items():
        config_path = output_dir / f"config_{name}.yaml"
        with open(config_path, 'w') as f:
            yaml.dump(config, f, default_flow_style=False)
        config_files[name] = config_path
        print(f"Created config: {config_path}")

    return config_files


def test_augmentation_configs(input_image: str, output_dir: Path, num_samples: int = 5):
    """Test different augmentation configurations on input image."""

    print(f"Testing augmentation configurations")
    print(f"   Input: {input_image}")
    print(f"   Output: {output_dir}")
    print(f"   Samples per config: {num_samples}")
    print("-" * 50)

    # Create test configs
    config_files = create_test_configs(output_dir)

    # Load input image
    input_pil = Image.open(input_image).convert('RGB')
    input_tensor = T.ToTensor()(input_pil).unsqueeze(0)  # Add batch dimension

    # Save original
    original_path = output_dir / "00_original.jpg"
    input_pil.save(original_path)
    print(f"Original saved: {original_path}")

    # Test each configuration
    tensor_to_pil = T.ToPILImage()

    for config_name, config_path in config_files.items():
        print(f"\nTesting config: {config_name}")

        try:
            # Build pipeline
            pipeline = build_pipeline(str(config_path), device="cpu")

            # Generate samples
            for sample_idx in range(num_samples):
                # Apply augmentation
                batch = {"images": input_tensor}
                with torch.no_grad():
                    augmented_batch = pipeline(batch)

                # Convert back to PIL and save
                augmented_tensor = augmented_batch["images"][0]
                augmented_tensor = torch.clamp(augmented_tensor, 0.0, 1.0)
                augmented_pil = tensor_to_pil(augmented_tensor)

                # Save sample
                sample_path = output_dir / f"{config_name}_sample_{sample_idx+1:02d}.jpg"
                augmented_pil.save(sample_path)
                print(f"   Sample {sample_idx+1}: {sample_path}")

            print(f"   {config_name}: {num_samples} samples generated")

        except Exception as e:
            print(f"   {config_name} failed: {e}")

    print(f"\nTesting complete! Results saved to: {output_dir}")
    print(f"Generated files:")
    for file_path in sorted(output_dir.glob("*.jpg")):
        print(f"   • {file_path.name}")


def create_comparison_html(output_dir: Path):
    """Create HTML comparison page for visual inspection."""

    html_content = f"""
<!DOCTYPE html>
<html>
<head>
    <title>Augmentation Comparison</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 20px; }}
        .config-section {{ margin-bottom: 40px; border: 1px solid #ccc; padding: 20px; }}
        .config-title {{ font-size: 18px; font-weight: bold; margin-bottom: 10px; }}
        .samples {{ display: flex; flex-wrap: wrap; gap: 10px; }}
        .sample {{ text-align: center; }}
        .sample img {{ max-width: 200px; max-height: 200px; border: 1px solid #ddd; }}
        .sample-label {{ font-size: 12px; margin-top: 5px; }}
    </style>
</head>
<body>
    <h1>Augmentation Configuration Comparison</h1>
    <p>Generated on: {Path.cwd()}</p>

    <div class="config-section">
        <div class="config-title">Original Image</div>
        <div class="samples">
            <div class="sample">
                <img src="00_original.jpg" alt="Original">
                <div class="sample-label">Original</div>
            </div>
        </div>
    </div>
"""

    # Get all config types
    config_types = set()
    for jpg_file in output_dir.glob("*.jpg"):
        if jpg_file.name != "00_original.jpg":
            config_name = jpg_file.name.split("_sample_")[0]
            config_types.add(config_name)

    # Add sections for each config
    for config_name in sorted(config_types):
        html_content += f"""
    <div class="config-section">
        <div class="config-title">{config_name.replace('_', ' ').title()}</div>
        <div class="samples">
"""

        # Find all samples for this config
        for jpg_file in sorted(output_dir.glob(f"{config_name}_sample_*.jpg")):
            sample_num = jpg_file.name.split("_sample_")[1].split(".")[0]
            html_content += f"""
            <div class="sample">
                <img src="{jpg_file.name}" alt="{config_name} Sample {sample_num}">
                <div class="sample-label">Sample {sample_num}</div>
            </div>
"""

        html_content += """
        </div>
    </div>
"""

    html_content += """
</body>
</html>
"""

    html_path = output_dir / "comparison.html"
    with open(html_path, 'w') as f:
        f.write(html_content)

    print(f"Comparison page created: {html_path}")
    return html_path


def main():
    parser = argparse.ArgumentParser(description="Test augmentation parameters with visual comparison")
    parser.add_argument("--input", required=True, help="Input image file")
    parser.add_argument("--output-dir", default="./augmentation_test", help="Output directory")
    parser.add_argument("--samples", type=int, default=5, help="Samples per configuration")
    parser.add_argument("--create-html", action="store_true", help="Create HTML comparison page")

    args = parser.parse_args()

    # Validate input
    if not Path(args.input).exists():
        print(f"Input image does not exist: {args.input}")
        return

    # Create output directory
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    try:
        # Run tests
        test_augmentation_configs(args.input, output_dir, args.samples)

        # Create HTML comparison if requested
        if args.create_html:
            html_path = create_comparison_html(output_dir)
            print(f"\nOpen in browser: file://{html_path.absolute()}")

    except Exception as e:
        print(f"Testing failed: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()

Visualization Scripts

2D Dataset Visualizer

#!/usr/bin/env python3
"""
2D Dataset Visualizer - Advanced visualization with custom layouts and data filtering.

Usage:
    python visualize_2d.py --dataset-path ./dataset --episode 0 --timestep-range 0 100
"""

import argparse
from pathlib import Path
from typing import Optional, Tuple

from dataphy.dataset.registry import create_dataset_loader


def visualize_episode_range(dataset_path: str, episode_idx: int = 0,
                           timestep_range: Optional[Tuple[int, int]] = None,
                           cameras: Optional[list] = None):
    """Visualize specific episode with timestep range filtering."""

    print(f"Starting 2D visualization")
    print(f"   Dataset: {dataset_path}")
    print(f"   Episode: {episode_idx}")

    if timestep_range:
        print(f"   Timestep range: {timestep_range[0]}-{timestep_range[1]}")
    if cameras:
        print(f"   Cameras: {cameras}")

    print("-" * 50)

    # Load dataset
    loader = create_dataset_loader(dataset_path)
    episodes = loader.get_episode_ids()

    if episode_idx >= len(episodes):
        raise ValueError(f"Episode index {episode_idx} out of range (0-{len(episodes)-1})")

    episode_id = episodes[episode_idx]
    print(f"Loading episode: {episode_id}")

    # Get episode data
    episode_data = loader.get_episode(episode_id)

    # Filter timestep range if specified
    if timestep_range:
        start, end = timestep_range
        print(f"Filtering timesteps {start}-{end}")

        # Filter all temporal data
        filtered_data = {}
        for key, value in episode_data.items():
            if hasattr(value, '__len__') and len(value) > end:
                filtered_data[key] = value[start:end+1]
            else:
                filtered_data[key] = value
        episode_data = filtered_data

    # Camera filtering
    if cameras and 'observation' in episode_data and 'images' in episode_data['observation']:
        images = episode_data['observation']['images']
        filtered_images = {cam: images[cam] for cam in cameras if cam in images}
        episode_data['observation']['images'] = filtered_images
        print(f"Using cameras: {list(filtered_images.keys())}")

    # Launch visualization
    try:
        from dataphy.visualization.lerobot import visualize_episode

        print(f"Launching Rerun visualization...")
        visualize_episode(episode_data, episode_id=episode_id)

        print("Visualization launched successfully!")
        print("   Use the Rerun viewer to explore the data:")
        print("   - Timeline scrubber to navigate timesteps")
        print("   - Camera panels for multi-view exploration")
        print("   - 2D view for spatial understanding")

    except ImportError:
        print("Rerun visualization not available. Install with:")
        print("   poetry install --extras rerun")
        print("   poetry run dataphy-upgrade-rerun")
    except Exception as e:
        print(f"Visualization failed: {e}")


def main():
    parser = argparse.ArgumentParser(description="Advanced 2D dataset visualization")
    parser.add_argument("--dataset-path", required=True, help="Path to dataset directory")
    parser.add_argument("--episode", type=int, default=0, help="Episode index to visualize")
    parser.add_argument("--timestep-range", type=int, nargs=2, metavar=('START', 'END'),
                       help="Timestep range to visualize (e.g., --timestep-range 0 100)")
    parser.add_argument("--cameras", nargs="+", help="Specific cameras to visualize")

    args = parser.parse_args()

    # Validate inputs
    if not Path(args.dataset_path).exists():
        print(f"Dataset path does not exist: {args.dataset_path}")
        return

    try:
        visualize_episode_range(
            dataset_path=args.dataset_path,
            episode_idx=args.episode,
            timestep_range=tuple(args.timestep_range) if args.timestep_range else None,
            cameras=args.cameras
        )

    except Exception as e:
        print(f"Visualization failed: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()

Utility Scripts

Dataset Converter

#!/usr/bin/env python3
"""
Dataset Converter - Convert between different dataset formats.

Usage:
    python convert_dataset.py --input ./lerobot_dataset --output ./converted_dataset --target-format lerobot
"""

import argparse
from pathlib import Path
import shutil

from dataphy.dataset.registry import create_dataset_loader, DatasetFormat


def convert_dataset(input_path: str, output_path: str, target_format: str):
    """Convert dataset from one format to another."""

    print(f"Converting dataset")
    print(f"   Input: {input_path}")
    print(f"   Output: {output_path}")
    print(f"   Target format: {target_format}")
    print("-" * 50)

    # Load source dataset
    source_loader = create_dataset_loader(input_path)
    source_info = source_loader.get_dataset_info()

    print(f"Source dataset:")
    print(f"   Format: {source_info.format}")
    print(f"   Episodes: {source_info.num_episodes}")
    print(f"   Total steps: {source_info.total_steps}")
    print()

    # Create output directory
    output_dir = Path(output_path)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Convert based on target format
    target_format_enum = DatasetFormat(target_format)

    if target_format_enum == DatasetFormat.LEROBOT:
        convert_to_lerobot(source_loader, output_dir)
    else:
        raise ValueError(f"Conversion to {target_format} not supported")

    print(f"Conversion complete: {output_path}")





def convert_to_lerobot(source_loader, output_dir: Path):
    """Convert dataset to LeRobot format."""
    print("Converting to LeRobot format...")

    episodes = source_loader.get_episode_ids()

    # Create LeRobot directory structure
    (output_dir / "data").mkdir(exist_ok=True)
    (output_dir / "videos").mkdir(exist_ok=True)

    print(f"   Creating LeRobot structure with {len(episodes)} episodes")
    print("   Structure:")
    print("   ├── data/")
    print("   │   └── chunk-001/")
    print("   │       ├── episode_000000.parquet")
    print("   │       └── ...")
    print("   └── videos/")
    print("       └── chunk-001/")
    print("           └── observation.images.webcam/")
    print("               ├── episode_000000.mp4")
    print("               └── ...")

    # Placeholder implementation
    print("LeRobot conversion not fully implemented yet")


def main():
    parser = argparse.ArgumentParser(description="Convert datasets between formats")
    parser.add_argument("--input", required=True, help="Input dataset path")
    parser.add_argument("--output", required=True, help="Output dataset path")
    parser.add_argument("--target-format", required=True,
                       choices=["lerobot"],
                       help="Target dataset format")

    args = parser.parse_args()

    # Validate inputs
    if not Path(args.input).exists():
        print(f"Input path does not exist: {args.input}")
        return

    if Path(args.output).exists():
        response = input(f"Output directory {args.output} exists. Overwrite? (y/N): ")
        if response.lower() != 'y':
            print("Conversion cancelled")
            return
        shutil.rmtree(args.output)

    try:
        convert_dataset(args.input, args.output, args.target_format)
    except Exception as e:
        print(f"Conversion failed: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()

Performance Profiler

#!/usr/bin/env python3
"""
Performance Profiler - Profile dataset loading and augmentation performance.

Usage:
    python profile_performance.py --dataset-path ./dataset --config config.yaml
"""

import argparse
import time
import psutil
from pathlib import Path
from typing import Dict, Any
import matplotlib.pyplot as plt

from dataphy.dataset.registry import create_dataset_loader
from dataphy.dataset.episode_augmentor import EpisodeAugmentor
from dataphy.visionpack.pipeline import build_pipeline
import torch


def profile_dataset_loading(dataset_path: str, num_episodes: int = 5) -> Dict[str, Any]:
    """Profile dataset loading performance."""

    print("Profiling dataset loading...")

    results = {
        'loader_creation_time': 0,
        'episode_load_times': [],
        'memory_usage': [],
        'total_episodes': 0
    }

    # Profile loader creation
    start_time = time.time()
    loader = create_dataset_loader(dataset_path)
    results['loader_creation_time'] = time.time() - start_time

    episodes = loader.get_episode_ids()
    results['total_episodes'] = len(episodes)

    # Profile episode loading
    process = psutil.Process()

    for i in range(min(num_episodes, len(episodes))):
        episode_id = episodes[i]

        # Memory before
        mem_before = process.memory_info().rss / 1024 / 1024  # MB

        # Load episode
        start_time = time.time()
        episode_data = loader.get_episode(episode_id)
        load_time = time.time() - start_time

        # Memory after
        mem_after = process.memory_info().rss / 1024 / 1024  # MB

        results['episode_load_times'].append(load_time)
        results['memory_usage'].append(mem_after - mem_before)

        print(f"   Episode {i}: {load_time:.2f}s, Memory: +{mem_after - mem_before:.1f}MB")

    return results


def profile_augmentation(dataset_path: str, config_file: str, num_episodes: int = 3) -> Dict[str, Any]:
    """Profile augmentation performance."""

    print("Profiling augmentation...")

    results = {
        'pipeline_creation_time': 0,
        'augmentation_times': [],
        'gpu_available': torch.cuda.is_available(),
        'device_used': 'cuda' if torch.cuda.is_available() else 'cpu'
    }

    # Profile pipeline creation
    start_time = time.time()
    pipeline = build_pipeline(config_file, device=results['device_used'])
    results['pipeline_creation_time'] = time.time() - start_time

    # Profile episode augmentation
    loader = create_dataset_loader(dataset_path)
    augmentor = EpisodeAugmentor(loader)
    episodes = loader.get_episode_ids()

    for i in range(min(num_episodes, len(episodes))):
        episode_idx = i

        start_time = time.time()
        try:
            # Create temporary backup directory for testing
            backup_dir = Path(dataset_path) / "temp_test_backup"
            backup_dir.mkdir(exist_ok=True)

            # Note: This would actually modify the dataset, so we'll simulate
            # augmentation_time = simulate_augmentation(pipeline, episode_data)
            augmentation_time = time.time() - start_time

            results['augmentation_times'].append(augmentation_time)
            print(f"   Episode {i}: {augmentation_time:.2f}s")

            # Clean up test backup
            if backup_dir.exists():
                import shutil
                shutil.rmtree(backup_dir)

        except Exception as e:
            print(f"   Episode {i}: Failed - {e}")

    return results


def generate_performance_report(dataset_results: Dict[str, Any],
                              augmentation_results: Dict[str, Any],
                              output_dir: Path):
    """Generate performance report with charts."""

    print("Generating performance report...")

    # Create report directory
    output_dir.mkdir(parents=True, exist_ok=True)

    # Text report
    report_path = output_dir / "performance_report.txt"
    with open(report_path, 'w') as f:
        f.write("Dataphy SDK Performance Report\n")
        f.write("=" * 40 + "\n\n")

        # Dataset loading results
        f.write("Dataset Loading Performance:\n")
        f.write(f"  Loader creation: {dataset_results['loader_creation_time']:.3f}s\n")
        f.write(f"  Total episodes: {dataset_results['total_episodes']}\n")

        if dataset_results['episode_load_times']:
            avg_load = sum(dataset_results['episode_load_times']) / len(dataset_results['episode_load_times'])
            f.write(f"  Average episode load time: {avg_load:.3f}s\n")
            f.write(f"  Min load time: {min(dataset_results['episode_load_times']):.3f}s\n")
            f.write(f"  Max load time: {max(dataset_results['episode_load_times']):.3f}s\n")

        if dataset_results['memory_usage']:
            avg_memory = sum(dataset_results['memory_usage']) / len(dataset_results['memory_usage'])
            f.write(f"  Average memory usage per episode: {avg_memory:.1f}MB\n")

        f.write("\n")

        # Augmentation results
        f.write("Augmentation Performance:\n")
        f.write(f"  GPU available: {augmentation_results['gpu_available']}\n")
        f.write(f"  Device used: {augmentation_results['device_used']}\n")
        f.write(f"  Pipeline creation: {augmentation_results['pipeline_creation_time']:.3f}s\n")

        if augmentation_results['augmentation_times']:
            avg_aug = sum(augmentation_results['augmentation_times']) / len(augmentation_results['augmentation_times'])
            f.write(f"  Average augmentation time: {avg_aug:.3f}s\n")

        f.write("\n")

    print(f"Report saved: {report_path}")

    # Create charts if matplotlib available
    try:
        # Episode loading times chart
        if dataset_results['episode_load_times']:
            plt.figure(figsize=(10, 6))

            plt.subplot(2, 2, 1)
            plt.plot(dataset_results['episode_load_times'], 'b-o')
            plt.title('Episode Load Times')
            plt.xlabel('Episode Index')
            plt.ylabel('Time (seconds)')
            plt.grid(True)

            plt.subplot(2, 2, 2)
            plt.bar(range(len(dataset_results['memory_usage'])), dataset_results['memory_usage'])
            plt.title('Memory Usage per Episode')
            plt.xlabel('Episode Index')
            plt.ylabel('Memory (MB)')
            plt.grid(True)

            if augmentation_results['augmentation_times']:
                plt.subplot(2, 2, 3)
                plt.plot(augmentation_results['augmentation_times'], 'r-o')
                plt.title('Augmentation Times')
                plt.xlabel('Episode Index')
                plt.ylabel('Time (seconds)')
                plt.grid(True)

            plt.tight_layout()
            chart_path = output_dir / "performance_charts.png"
            plt.savefig(chart_path, dpi=150, bbox_inches='tight')
            print(f"Charts saved: {chart_path}")

    except ImportError:
        print(" Matplotlib not available, skipping charts")

    return report_path


def main():
    parser = argparse.ArgumentParser(description="Profile Dataphy SDK performance")
    parser.add_argument("--dataset-path", required=True, help="Path to dataset directory")
    parser.add_argument("--config", help="Augmentation config file (optional)")
    parser.add_argument("--episodes", type=int, default=5, help="Number of episodes to profile")
    parser.add_argument("--output-dir", default="./performance_profile", help="Output directory")

    args = parser.parse_args()

    # Validate inputs
    if not Path(args.dataset_path).exists():
        print(f"Dataset path does not exist: {args.dataset_path}")
        return

    if args.config and not Path(args.config).exists():
        print(f"Config file does not exist: {args.config}")
        return

    try:
        print(f"Starting performance profiling")
        print(f"   Dataset: {args.dataset_path}")
        print(f"   Episodes to profile: {args.episodes}")
        print("-" * 50)

        # Profile dataset loading
        dataset_results = profile_dataset_loading(args.dataset_path, args.episodes)

        # Profile augmentation if config provided
        augmentation_results = {}
        if args.config:
            augmentation_results = profile_augmentation(args.dataset_path, args.config, args.episodes)

        # Generate report
        output_dir = Path(args.output_dir)
        report_path = generate_performance_report(dataset_results, augmentation_results, output_dir)

        print(f"Profiling complete!")
        print(f"Results: {output_dir}")

    except Exception as e:
        print(f"Profiling failed: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()

Running the Scripts

Prerequisites

# Install Dataphy SDK with all features
poetry install --extras "torch aws hf parquet rerun"

# Activate environment
poetry shell

Basic Usage

# Explore a dataset
python dataset_explorer.py --dataset-path ./my_dataset

# Sample specific episodes
python episode_sampler.py --dataset-path ./my_dataset --episode 0 --timesteps 0 50 100 --output-dir ./samples

# Test augmentation parameters
python test_augmentation.py --input sample.jpg --output-dir ./test_results --create-html

# Batch augment episodes
python batch_augment.py --dataset-path ./my_dataset --config config.yaml --episodes 0 1 2

# Profile performance
python profile_performance.py --dataset-path ./my_dataset --config config.yaml --episodes 3

All scripts include comprehensive help:

python script_name.py --help