Augmentor

dataphy.dataset.augmentor

Dataset augmentation functionality for the Dataphy SDK.

Classes

AugmentationConfig(pipeline_config: Union[str, Path, Dict[str, Any]], target: str = 'dataset', episode_ids: Optional[List[str]] = None, timestep_range: Optional[tuple[int, int]] = None, random_seed: Optional[int] = None, output_path: Optional[Union[str, Path]] = None, num_augmented_episodes: int = 1, camera_streams: Optional[List[str]] = None, preserve_original: bool = True, progress_callback: Optional[Callable[[str, int, int], None]] = None, sync_views: bool = False) dataclass

Configuration for dataset augmentation.

This dataclass defines all parameters needed to configure dataset augmentation, including pipeline settings, target scope, episode selection, and output options.

Attributes
pipeline_config: Union[str, Path, Dict[str, Any]]
target: str = 'dataset'
episode_ids: Optional[List[str]] = None
timestep_range: Optional[tuple[int, int]] = None
random_seed: Optional[int] = None
output_path: Optional[Union[str, Path]] = None
num_augmented_episodes: int = 1
camera_streams: Optional[List[str]] = None
preserve_original: bool = True
progress_callback: Optional[Callable[[str, int, int], None]] = None
sync_views: bool = False
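
A minimal construction sketch follows. The pipeline file path and output directory are placeholders; target accepts "dataset", "episode", or "timestep" (see augment_dataset below).

from dataphy.dataset.augmentor import AugmentationConfig

# Minimal sketch; the pipeline path and output directory are placeholders.
config = AugmentationConfig(
    pipeline_config="configs/augmentation.yaml",  # or a dict with your pipeline definition
    target="dataset",                             # "dataset", "episode", or "timestep"
    num_augmented_episodes=2,                     # augmented copies per original episode
    random_seed=42,                               # for reproducible augmentation
    output_path="data/augmented",                 # placeholder output location
    preserve_original=True,                       # keep original episodes in the new dataset
)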

DatasetAugmentor(dataset_loader: BaseDatasetLoader)

Augmentor for applying transformations to datasets.

This class provides dataset augmentation capabilities, supporting episode-level, timestep-level, and full-dataset augmentation with memory-efficient processing and progress tracking.

Parameters:

dataset_loader (BaseDatasetLoader, required): The dataset loader to use for accessing episodes.
Source code in src/dataphy/dataset/augmentor.py
def __init__(self, dataset_loader: BaseDatasetLoader):
    """Initialize the DatasetAugmentor.

    Args:
        dataset_loader: The dataset loader to use for accessing episodes
    """
    self.dataset_loader = dataset_loader
    self.pipeline = None
Attributes
dataset_loader: the dataset loader passed at construction time
pipeline: the loaded augmentation pipeline; None until load_pipeline() is called
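
A minimal construction sketch, assuming loader is an existing BaseDatasetLoader instance (how the loader itself is created is outside the scope of this page):

from dataphy.dataset.augmentor import DatasetAugmentor

# `loader` is assumed to be any BaseDatasetLoader implementation.
augmentor = DatasetAugmentor(dataset_loader=loader)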
Functions
load_pipeline(config: Union[str, Path, Dict[str, Any]])

Load augmentation pipeline from configuration.

Parameters:

config (Union[str, Path, Dict[str, Any]], required): Pipeline configuration (file path, dict, or Config object).
Source code in src/dataphy/dataset/augmentor.py
def load_pipeline(self, config: Union[str, Path, Dict[str, Any]]):
    """Load augmentation pipeline from configuration.

    Args:
        config: Pipeline configuration (file path, dict, or Config object)
    """
    self.pipeline = build_pipeline(config)
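
Two equivalent ways of loading a pipeline are sketched below. The file name is a placeholder, and the dictionary schema is hypothetical; consult your pipeline configuration format for the exact keys expected by build_pipeline().

# From a configuration file (placeholder path):
augmentor.load_pipeline("configs/augmentation.yaml")

# Or from an in-memory dict (hypothetical schema):
augmentor.load_pipeline({"transforms": [{"name": "color_jitter", "brightness": 0.2}]})
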
augment_episode(episode_id: str, pipeline: Optional[Any] = None) -> Episode

Augment a single episode.

Parameters:

episode_id (str, required): ID of the episode to augment.
pipeline (Optional[Any], default None): Pipeline to use; the loaded pipeline is used if None.

Returns:

Episode: Augmented episode with transformed data.

Raises:

ValueError: If no pipeline is available.

Source code in src/dataphy/dataset/augmentor.py
def augment_episode(self, episode_id: str, pipeline: Optional[Any] = None) -> Episode:
    """Augment a single episode.

    Args:
        episode_id: ID of the episode to augment
        pipeline: Optional pipeline to use (uses loaded pipeline if None)

    Returns:
        Augmented episode with transformed data

    Raises:
        ValueError: If no pipeline is available
    """
    if pipeline is None:
        pipeline = self.pipeline

    if pipeline is None:
        raise ValueError("No pipeline loaded. Call load_pipeline() first.")

    episode = self.dataset_loader.get_episode(episode_id)
    augmented_data = {}

    # Apply augmentation to episode data
    for key, value in episode.data.items():
        if key == "images" and isinstance(value, list):
            # Augment images
            augmented_images = []
            for img_path in value:
                if isinstance(img_path, str):
                    img = Image.open(img_path)
                else:
                    img = img_path

                # Convert to tensor for augmentation
                img_tensor = torch.from_numpy(np.array(img)).float() / 255.0
                if len(img_tensor.shape) == 3:
                    img_tensor = img_tensor.permute(2, 0, 1).unsqueeze(0)

                # Apply augmentation
                augmented_tensor = pipeline(img_tensor)

                # Convert back to PIL
                if augmented_tensor.dim() == 4:
                    augmented_tensor = augmented_tensor.squeeze(0)
                if augmented_tensor.shape[0] == 3:
                    augmented_tensor = augmented_tensor.permute(1, 2, 0)

                augmented_img = Image.fromarray(
                    (augmented_tensor.numpy() * 255).astype(np.uint8)
                )
                augmented_images.append(augmented_img)

            augmented_data[key] = augmented_images
        else:
            # Keep non-image data as is
            augmented_data[key] = value

    return Episode(
        id=f"{episode_id}_augmented",
        data=augmented_data,
        metadata=episode.metadata,
        length=episode.length
    )
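
Usage sketch (the episode ID is a placeholder for an ID returned by the loader). As the source shows, only the "images" entry is transformed, other keys pass through unchanged, the returned episode keeps the original length, and its ID gains an "_augmented" suffix.

augmented = augmentor.augment_episode("episode_0001")  # placeholder episode ID
print(augmented.id)      # "episode_0001_augmented"
print(augmented.length)  # same length as the original episode
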
augment_timestep(episode_id: str, timestep: int, pipeline=None) -> Dict[str, Any]

Augment a single timestep.

Source code in src/dataphy/dataset/augmentor.py
def augment_timestep(self, episode_id: str, timestep: int, pipeline=None) -> Dict[str, Any]:
    """Augment a single timestep."""
    if pipeline is None:
        pipeline = self.pipeline

    if pipeline is None:
        raise ValueError("No pipeline loaded. Call load_pipeline() first.")

    timestep_data = self.dataset_loader.get_timestep(episode_id, timestep)
    augmented_data = {}

    for key, value in timestep_data.items():
        if key == "image":
            if isinstance(value, str):
                img = Image.open(value)
            else:
                img = value

            # Convert to tensor for augmentation
            img_tensor = torch.from_numpy(np.array(img)).float() / 255.0
            if len(img_tensor.shape) == 3:
                img_tensor = img_tensor.permute(2, 0, 1).unsqueeze(0)

            # Apply augmentation
            augmented_tensor = pipeline(img_tensor)

            # Convert back to PIL
            if augmented_tensor.dim() == 4:
                augmented_tensor = augmented_tensor.squeeze(0)
            if augmented_tensor.shape[0] == 3:
                augmented_tensor = augmented_tensor.permute(1, 2, 0)

            augmented_img = Image.fromarray(
                (augmented_tensor.numpy() * 255).astype(np.uint8)
            )
            augmented_data[key] = augmented_img
        else:
            # Keep non-image data as is
            augmented_data[key] = value

    return augmented_data
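
Usage sketch (placeholder episode ID). If the timestep contains an "image" entry, it is returned as an augmented PIL image; all other entries are passed through unchanged.

frame = augmentor.augment_timestep("episode_0001", timestep=10)  # placeholder ID
augmented_image = frame["image"]  # PIL.Image when the timestep had image data
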
augment_timesteps(episode_id: str, start: int, end: int, pipeline=None) -> List[Dict[str, Any]]

Augment a range of timesteps.

Source code in src/dataphy/dataset/augmentor.py
def augment_timesteps(self, episode_id: str, start: int, end: int, pipeline=None) -> List[Dict[str, Any]]:
    """Augment a range of timesteps."""
    return [self.augment_timestep(episode_id, i, pipeline) for i in range(start, end)]
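
Usage sketch; end is exclusive, matching Python's range() semantics.

frames = augmentor.augment_timesteps("episode_0001", start=0, end=100)  # timesteps 0-99
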
augment_dataset(config: AugmentationConfig) -> List[Episode]

Augment dataset according to configuration.

Source code in src/dataphy/dataset/augmentor.py
def augment_dataset(self, config: AugmentationConfig) -> List[Episode]:
    """Augment dataset according to configuration."""
    if config.random_seed is not None:
        random.seed(config.random_seed)
        torch.manual_seed(config.random_seed)
        np.random.seed(config.random_seed)

    # Load pipeline if not already loaded
    if self.pipeline is None:
        self.load_pipeline(config.pipeline_config)

    if config.target == "episode":
        episode_ids = config.episode_ids or self.dataset_loader.get_episode_ids()
        return [self.augment_episode(ep_id) for ep_id in episode_ids]

    elif config.target == "timestep":
        # For timestep augmentation, we need to specify episode and range
        if not config.episode_ids or len(config.episode_ids) != 1:
            raise ValueError("For timestep augmentation, exactly one episode_id must be specified")

        episode_id = config.episode_ids[0]
        if config.timestep_range:
            start, end = config.timestep_range
            return self.augment_timesteps(episode_id, start, end)
        else:
            # Augment all timesteps in episode
            episode = self.dataset_loader.get_episode(episode_id)
            return self.augment_timesteps(episode_id, 0, episode.length)

    elif config.target == "dataset":
        # Augment entire dataset
        episode_ids = self.dataset_loader.get_episode_ids()
        return [self.augment_episode(ep_id) for ep_id in episode_ids]

    else:
        raise ValueError(f"Unknown augmentation target: {config.target}")
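
A config-driven sketch for timestep-level augmentation (placeholder episode ID and pipeline path). Note that for target="timestep" the method returns a list of timestep dictionaries rather than Episode objects, as the source above shows.

config = AugmentationConfig(
    pipeline_config="configs/augmentation.yaml",  # placeholder
    target="timestep",
    episode_ids=["episode_0001"],   # exactly one episode is required for this target
    timestep_range=(0, 50),
    random_seed=7,
)
frames = augmentor.augment_dataset(config)
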
augment_full_dataset(config: AugmentationConfig, output_path: Union[str, Path]) -> Dict[str, Any]

Augment the entire dataset, creating new episodes with the specified parameters.

This method creates a completely new dataset with:

- All original episodes (if preserve_original=True)
- The specified number of augmented versions per episode
- Updated metadata reflecting the expanded dataset

Parameters:

config (AugmentationConfig, required): Augmentation configuration with dataset-level parameters.
output_path (Union[str, Path], required): Path where the new augmented dataset will be saved.

Returns:

Type Description
Dict[str, Any]

Dictionary with augmentation results and metadata

Source code in src/dataphy/dataset/augmentor.py
def augment_full_dataset(
    self,
    config: AugmentationConfig,
    output_path: Union[str, Path]
) -> Dict[str, Any]:
    """Augment entire dataset creating new episodes with specified parameters.

    This method creates a completely new dataset with:
    - All original episodes (if preserve_original=True)
    - Specified number of augmented versions per episode
    - Updated metadata reflecting the expanded dataset

    Args:
        config: Augmentation configuration with dataset-level parameters
        output_path: Path where the new augmented dataset will be saved

    Returns:
        Dictionary with augmentation results and metadata
    """
    output_path = Path(output_path)
    output_path.mkdir(parents=True, exist_ok=True)

    # Get dataset info and episodes
    dataset_info = self.dataset_loader.get_dataset_info()
    original_episodes = config.episode_ids or self.dataset_loader.get_episode_ids()

    # Initialize results tracking
    results = {
        "original_episode_count": len(original_episodes),
        "augmented_episodes_per_original": config.num_augmented_episodes,
        "total_new_episodes": 0,
        "augmentation_stats": defaultdict(int),
        "failed_episodes": [],
        "dataset_metadata": {}
    }

    # Set random seed for reproducible augmentation
    if config.random_seed is not None:
        random.seed(config.random_seed)
        torch.manual_seed(config.random_seed)
        np.random.seed(config.random_seed)

    # Load augmentation pipeline
    if self.pipeline is None:
        self.load_pipeline(config.pipeline_config)

    print(f"Starting dataset augmentation:")
    print(f"  Original episodes: {len(original_episodes)}")
    print(f"  Augmented versions per episode: {config.num_augmented_episodes}")
    print(f"  Output path: {output_path}")

    # Copy original dataset structure if preserving originals
    if config.preserve_original:
        self._copy_original_dataset(output_path)
        results["total_new_episodes"] += len(original_episodes)

    # Process each episode
    for episode_idx, episode_id in enumerate(original_episodes):
        if config.progress_callback:
            config.progress_callback(f"Processing episode {episode_id}", episode_idx, len(original_episodes))

        try:
            # Create multiple augmented versions
            for aug_idx in range(config.num_augmented_episodes):
                augmented_episode_id = f"{episode_id}_aug_{aug_idx + 1:03d}"
                print(f"  Creating {augmented_episode_id} ({episode_idx + 1}/{len(original_episodes)}, aug {aug_idx + 1}/{config.num_augmented_episodes})")

                # Generate episode-specific seed for consistency
                episode_seed = None
                if config.sync_views or config.random_seed is not None:
                    base_seed = config.random_seed or 42
                    episode_seed = hash(f"{episode_id}_aug_{aug_idx}") % (2**31) + base_seed

                # Augment and save episode
                success = self._augment_and_save_episode(
                    episode_id=episode_id,
                    augmented_episode_id=augmented_episode_id,
                    output_path=output_path,
                    config=config,
                    episode_seed=episode_seed
                )

                if success:
                    results["total_new_episodes"] += 1
                    results["augmentation_stats"]["successful"] += 1
                else:
                    results["augmentation_stats"]["failed"] += 1
                    results["failed_episodes"].append(augmented_episode_id)

        except Exception as e:
            print(f"  Error processing episode {episode_id}: {e}")
            results["failed_episodes"].append(episode_id)
            results["augmentation_stats"]["failed"] += 1
            continue

    # Update dataset metadata
    # Handle both DatasetInfo objects and dictionaries
    if hasattr(dataset_info, 'metadata'):
        original_metadata = dataset_info.metadata
    elif isinstance(dataset_info, dict):
        original_metadata = dataset_info
    else:
        original_metadata = {}

    updated_metadata = self._update_dataset_metadata(
        original_metadata=original_metadata,
        results=results,
        config=config
    )

    # Save updated metadata
    metadata_file = output_path / "dataset_metadata.json"
    with open(metadata_file, 'w') as f:
        json.dump(updated_metadata, f, indent=2)

    results["dataset_metadata"] = updated_metadata

    print(f"\nDataset augmentation completed:")
    print(f"  Total episodes in new dataset: {results['total_new_episodes']}")
    print(f"  Successfully augmented: {results['augmentation_stats']['successful']}")
    print(f"  Failed augmentations: {results['augmentation_stats']['failed']}")
    print(f"  Metadata saved: {metadata_file}")

    return results
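
Usage sketch (placeholder output directory). The returned dictionary exposes the counters written by the method, such as total_new_episodes and augmentation_stats.

config = AugmentationConfig(
    pipeline_config="configs/augmentation.yaml",  # placeholder
    target="dataset",
    num_augmented_episodes=3,
    preserve_original=True,
    random_seed=42,
)
results = augmentor.augment_full_dataset(config, output_path="data/dataset_augmented")
print(results["total_new_episodes"])
print(results["augmentation_stats"]["successful"], results["augmentation_stats"]["failed"])
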
save_augmented_dataset(episodes: List[Episode], output_path: Union[str, Path])

Save augmented episodes to disk.

Source code in src/dataphy/dataset/augmentor.py
def save_augmented_dataset(self, episodes: List[Episode], output_path: Union[str, Path]):
    """Save augmented episodes to disk."""
    output_path = Path(output_path)
    output_path.mkdir(parents=True, exist_ok=True)

    for episode in episodes:
        episode_dir = output_path / episode.id
        episode_dir.mkdir(exist_ok=True)

        # Save images
        if "images" in episode.data:
            for i, img in enumerate(episode.data["images"]):
                if isinstance(img, str):
                    # Copy existing image
                    shutil.copy2(img, episode_dir / f"image_{i:04d}.jpg")
                else:
                    # Save PIL image
                    img.save(episode_dir / f"image_{i:04d}.jpg")

        # Save metadata
        if episode.metadata:
            with open(episode_dir / "metadata.json", 'w') as f:
                json.dump(episode.metadata, f, indent=2)

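Usage sketch (placeholder output directory), assuming a config whose target is "episode" or "dataset" so the result is a list of Episode objects. Each episode is written to a directory named after its ID, with images saved as image_0000.jpg, image_0001.jpg, ... and a metadata.json when metadata is present.

episodes = augmentor.augment_dataset(config)  # episode- or dataset-level result
augmentor.save_augmented_dataset(episodes, output_path="data/augmented_episodes")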