Skip to content

Episode Augmentor

dataphy.dataset.episode_augmentor

Episode-specific augmentation with camera stream selection and repackaging.

Classes

EpisodeAugmentor(dataset_loader: BaseDatasetLoader)

Augment specific episodes with camera stream selection and synchronized views.

This class provides functionality to apply VisionPack augmentations to individual episodes in LeRobot datasets, with support for:

  • Episode-specific targeting (by name or index)
  • Camera stream selection for multi-camera setups
  • Synchronized augmentations across cameras (sync_views)
  • Automatic backup and restore functionality
  • In-place video repackaging in original format

The augmentation process extracts frames from videos, applies VisionPack transforms, and repackages them back into MP4 files while preserving the original dataset structure.

Parameters:

Name Type Description Default
dataset_loader BaseDatasetLoader

The dataset loader instance to use for accessing episodes

required
Example
from dataphy.dataset.registry import create_dataset_loader
from dataphy.dataset.episode_augmentor import EpisodeAugmentor

# Load dataset
loader = create_dataset_loader("./dataset")
augmentor = EpisodeAugmentor(loader)

# Augment first episode, all cameras
augmentor.augment_episode(
    episode_id=0,
    config_file="config.yaml"
)

# Augment specific camera
augmentor.augment_episode(
    episode_id="episode_000000",
    config_file="config.yaml",
    camera_streams=["observation.images.webcam"]
)
Source code in src/dataphy/dataset/episode_augmentor.py
def __init__(self, dataset_loader: BaseDatasetLoader):
    """Bind the augmentor to a dataset loader.

    Args:
        dataset_loader: Loader used to access episodes. Its ``dataset_path``
            attribute is also stored on the instance as ``self.dataset_path``.
    """
    # Keep the loader itself for episode queries.
    self.loader = dataset_loader
    # Keep the dataset root separately for direct filesystem access.
    self.dataset_path = dataset_loader.dataset_path
Attributes
loader = dataset_loader instance-attribute
dataset_path = dataset_loader.dataset_path instance-attribute
Functions
get_available_cameras(episode_id: str) -> List[str]

Get list of available camera streams for an episode.

Parameters:

Name Type Description Default
episode_id str

Episode ID to check for available cameras

required

Returns:

Type Description
List[str]

List of camera stream names (e.g., ['observation.images.webcam', 'observation.images.laptop'])

Example
cameras = augmentor.get_available_cameras("episode_000000")
print(f"Available cameras: {cameras}")
Source code in src/dataphy/dataset/episode_augmentor.py
def get_available_cameras(self, episode_id: str) -> List[str]:
    """Get list of available camera streams for an episode.

    Scans ``<dataset_path>/videos/<chunk>/<camera>/`` and collects the name of
    every camera directory that contains a file named ``<episode_id>.mp4``.

    Args:
        episode_id: Episode ID to check for available cameras

    Returns:
        Sorted list of unique camera stream names
        (e.g., ['observation.images.webcam', 'observation.images.laptop']).
        Empty if the dataset has no ``videos`` directory or no camera
        directory holds a video for the episode.

    Example:
        ```python
        cameras = augmentor.get_available_cameras("episode_000000")
        print(f"Available cameras: {cameras}")
        ```
    """
    videos_dir = self.dataset_path / "videos"
    # A dataset without a videos directory simply has no cameras; returning an
    # empty list lets callers report that instead of failing inside iterdir().
    if not videos_dir.is_dir():
        return []

    video_name = f"{episode_id}.mp4"
    # A set avoids reporting the same camera twice when its directory exists
    # under more than one chunk.
    cameras = set()

    for chunk_dir in videos_dir.iterdir():
        if not chunk_dir.is_dir():
            continue
        for camera_dir in chunk_dir.iterdir():
            if camera_dir.is_dir() and (camera_dir / video_name).exists():
                cameras.add(camera_dir.name)

    return sorted(cameras)
augment_episode(episode_id: Union[str, int], config_file: Union[str, Path], camera_streams: Optional[List[str]] = None, preserve_original: bool = True) -> str

Augment a specific episode with selected camera streams.

This method applies VisionPack augmentations to video frames from specified camera streams in an episode. The process includes:

  1. Episode validation and camera stream selection
  2. Automatic backup creation (if preserve_original=True)
  3. Frame extraction from video files
  4. VisionPack augmentation pipeline application
  5. Video repackaging with augmented frames
  6. Synchronized augmentation across cameras (if sync_views=True in config)

Parameters:

Name Type Description Default
episode_id Union[str, int]

Episode to augment. Can be either an episode name (e.g., "episode_000000") or an episode index (e.g., 0, 1, 2...).

required
config_file Union[str, Path]

Path to VisionPack YAML configuration file

required
camera_streams Optional[List[str]]

Camera streams to augment. Options: None (the default) augments all available cameras; a list of camera names (e.g., ["observation.images.webcam"]) augments only those cameras.

None
preserve_original bool

Whether to backup original files before augmentation

True

Returns:

Type Description
str

Path to the dataset directory containing augmented episodes

Raises:

Type Description
ValueError

If the episode index is out of range, no cameras are available for the episode, or none of the specified cameras are found

FileNotFoundError

If config file not found

Example
# Augment by episode index with specific camera
augmentor.augment_episode(
    episode_id=0,
    config_file="aug.yaml",
    camera_streams=["observation.images.webcam"]
)

# Augment by episode name, all cameras
augmentor.augment_episode(
    episode_id="episode_000000", 
    config_file="config.yaml"
)

# Synchronized multi-camera augmentation
augmentor.augment_episode(
    episode_id=5,
    config_file="sync_config.yaml",  # sync_views: true
    camera_streams=["observation.images.webcam", "observation.images.laptop"]
)
Source code in src/dataphy/dataset/episode_augmentor.py
def augment_episode(
    self,
    episode_id: Union[str, int],
    config_file: Union[str, Path],
    camera_streams: Optional[List[str]] = None,
    preserve_original: bool = True
) -> str:
    """Augment a specific episode with selected camera streams.

    This method applies VisionPack augmentations to video frames from specified camera
    streams in an episode. The process includes:

    1. Episode validation and camera stream selection
    2. Config file existence check (before anything is written)
    3. Automatic backup creation (if preserve_original=True)
    4. Per-camera augmentation via ``_augment_camera_stream``
    5. Synchronized augmentation across cameras (if ``pipeline.sync_views`` is
       true in the config): every camera receives the same seed, derived
       deterministically from the episode name

    Args:
        episode_id: Episode to augment. Can be:
            - Episode name (e.g., "episode_000000")
            - Episode index (e.g., 0, 1, 2...)
        config_file: Path to VisionPack YAML configuration file
        camera_streams: Camera streams to augment. Options:
            - None: Augment all available cameras (default)
            - List of camera names (e.g., ["observation.images.webcam"]).
              Names that are not available for the episode are skipped with
              a printed warning.
        preserve_original: Whether to backup original files before augmentation

    Returns:
        Path to the dataset directory containing augmented episodes

    Raises:
        ValueError: If the episode index is out of range, no cameras are
            available, or none of the specified cameras are found
        FileNotFoundError: If config file not found

    Example:
        ```python
        # Augment by episode index with specific camera
        augmentor.augment_episode(
            episode_id=0,
            config_file="aug.yaml",
            camera_streams=["observation.images.webcam"]
        )

        # Augment by episode name, all cameras
        augmentor.augment_episode(
            episode_id="episode_000000",
            config_file="config.yaml"
        )

        # Synchronized multi-camera augmentation
        augmentor.augment_episode(
            episode_id=5,
            config_file="sync_config.yaml",  # sync_views: true
            camera_streams=["observation.images.webcam", "observation.images.laptop"]
        )
        ```
    """
    # Function-scope imports, in the same style as the lazy yaml import below.
    import os
    import zlib

    # Convert episode index to episode name if needed
    if isinstance(episode_id, int):
        episodes = self.list_episodes()
        if episode_id < 0 or episode_id >= len(episodes):
            raise ValueError(f"Episode index {episode_id} out of range. Available: 0-{len(episodes)-1}")
        episode_name = episodes[episode_id]
        print(f"Using episode index {episode_id} -> {episode_name}")
    else:
        episode_name = episode_id

    print(f"Augmenting episode: {episode_name}")

    # Get available cameras
    available_cameras = self.get_available_cameras(episode_name)
    if not available_cameras:
        raise ValueError(f"No cameras found for episode {episode_name}")

    # Determine which cameras to augment
    if camera_streams is None:
        target_cameras = available_cameras
    else:
        target_cameras = [cam for cam in camera_streams if cam in available_cameras]
        if not target_cameras:
            raise ValueError(f"None of the specified cameras {camera_streams} found. Available: {available_cameras}")
        # Surface requested-but-missing cameras instead of dropping them silently.
        skipped = [cam for cam in camera_streams if cam not in available_cameras]
        if skipped:
            print(f"Warning: skipping unavailable cameras: {skipped}")

    print(f"Available cameras: {available_cameras}")
    print(f"Augmenting cameras: {target_cameras}")

    # Fail fast on a missing config, before any backup is written, so the
    # documented FileNotFoundError is raised without touching the dataset.
    if not os.path.isfile(config_file):
        raise FileNotFoundError(f"Config file not found: {config_file}")

    # Create backup if requested
    if preserve_original:
        self._backup_episode(episode_name)

    # Check if sync_views is enabled for synchronized augmentation across cameras
    sync_seed = None
    try:
        import yaml
        with open(config_file, "r") as f:
            config = yaml.safe_load(f)
    except Exception as exc:
        # Best effort: reading sync_views is optional, so a missing yaml
        # package or an unparsable file falls back to individual seeds, but
        # the reason is reported rather than swallowed.
        print(f"Warning: could not read sync_views from {config_file} ({exc}); using individual seeds")
    else:
        # An empty YAML file loads as None and "pipeline" may not be a mapping.
        pipeline = config.get("pipeline") if isinstance(config, dict) else None
        if isinstance(pipeline, dict) and pipeline.get("sync_views", False):
            # Built-in hash() of a str is salted per process, so it would give
            # a different seed on every run; CRC32 of the name is stable.
            sync_seed = zlib.crc32(str(episode_name).encode("utf-8")) % (2**31)
            print(f"Sync views enabled - using seed {sync_seed} for all cameras")

    # Augment each camera stream
    for camera_name in target_cameras:
        print(f"Processing camera: {camera_name}")
        self._augment_camera_stream(episode_name, camera_name, config_file, sync_seed=sync_seed)

    print(f"Episode {episode_name} augmentation completed")
    return str(self.dataset_path)
restore_episode(episode_id: str)

Restore episode from backup.

Source code in src/dataphy/dataset/episode_augmentor.py
def restore_episode(self, episode_id: str):
    """Copy an episode's backed-up files back into the dataset.

    Looks under ``<dataset_path>/backups/<episode_id>`` and copies
    ``videos/<chunk>/<camera>/<episode_id>.mp4`` and
    ``data/<chunk>/<episode_id>.parquet`` files to the matching locations
    under ``<dataset_path>``, creating target directories as needed.

    Args:
        episode_id: Name of the episode whose backup should be restored.

    Raises:
        ValueError: If no backup directory exists for the episode.
    """
    dataset_root = self.dataset_path
    backup_root = dataset_root / "backups" / episode_id
    if not backup_root.exists():
        raise ValueError(f"No backup found for episode {episode_id}")

    # --- videos: backups/<episode>/videos/<chunk>/<camera>/<episode>.mp4 ---
    videos_backup = backup_root / "videos"
    if videos_backup.exists():
        for chunk in videos_backup.iterdir():
            if not chunk.is_dir():
                continue
            for camera in chunk.iterdir():
                if not camera.is_dir():
                    continue
                source = camera / f"{episode_id}.mp4"
                if not source.exists():
                    continue
                destination = dataset_root / "videos" / chunk.name / camera.name
                destination.mkdir(parents=True, exist_ok=True)
                shutil.copy2(source, destination / f"{episode_id}.mp4")

    # --- tabular data: backups/<episode>/data/<chunk>/<episode>.parquet ---
    data_backup = backup_root / "data"
    if data_backup.exists():
        for chunk in data_backup.iterdir():
            if not chunk.is_dir():
                continue
            source = chunk / f"{episode_id}.parquet"
            if not source.exists():
                continue
            destination = dataset_root / "data" / chunk.name
            destination.mkdir(parents=True, exist_ok=True)
            shutil.copy2(source, destination / f"{episode_id}.parquet")

    print(f"Episode {episode_id} restored from backup")
list_episodes() -> List[str]

List all available episodes.

Source code in src/dataphy/dataset/episode_augmentor.py
def list_episodes(self) -> List[str]:
    """Return the episode IDs reported by the dataset loader.

    Returns:
        Whatever ``self.loader.get_episode_ids()`` returns, unchanged.
    """
    episode_ids = self.loader.get_episode_ids()
    return episode_ids
list_backups() -> List[str]

List episodes that have backups.

Source code in src/dataphy/dataset/episode_augmentor.py
def list_backups(self) -> List[str]:
    """List episodes that have backups.

    Every sub-directory of ``<dataset_path>/backups`` is treated as one
    backed-up episode, named after the directory.

    Returns:
        Sorted list of episode names with a backup directory; empty if the
        ``backups`` directory does not exist.
    """
    backup_dir = self.dataset_path / "backups"
    if not backup_dir.exists():
        return []

    # iterdir() yields entries in arbitrary filesystem order; sorting makes
    # the result deterministic.
    return sorted(entry.name for entry in backup_dir.iterdir() if entry.is_dir())

Functions