Skip to content

Lerobot

dataphy.dataset.lerobot

LeRobot dataset loader for Dataphy SDK.

Classes

LeRobotDatasetLoader(dataset_path: Union[str, Path], **kwargs: Any)

Bases: BaseDatasetLoader

Loader for LeRobot datasets, stored locally or obtained from the Hugging Face Hub.

This loader supports LeRobot dataset formats including both local files and Hugging Face Hub datasets. It handles various dataset structures including chunked parquet files, video directories, and metadata files.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset_path` | `Union[str, Path]` | Path to the LeRobot dataset directory | required |
| `**kwargs` | `Any` | Additional arguments passed to base class | `{}` |
Source code in src/dataphy/dataset/lerobot.py
def __init__(self, dataset_path: Union[str, Path], **kwargs: Any):
    """Create a loader for the LeRobot dataset at ``dataset_path``.

    The base class is initialized first, then the episode containers are
    created empty, and finally ``_load_episode_structure()`` is run.

    Args:
        dataset_path: Path to the LeRobot dataset directory
        **kwargs: Additional arguments passed to base class
    """
    super().__init__(dataset_path, **kwargs)

    # Both containers must exist before the structure scan is started.
    self.episode_files, self.episode_info = [], {}
    self._load_episode_structure()
Attributes
episode_files = [] instance-attribute
episode_info = {} instance-attribute
dataset_path = Path(dataset_path) instance-attribute
kwargs = kwargs instance-attribute
Functions
load_info() -> DatasetInfo

Load dataset information.

Returns:

| Type | Description |
| --- | --- |
| `DatasetInfo` | DatasetInfo object containing metadata about the dataset |

Source code in src/dataphy/dataset/lerobot.py
def load_info(self) -> DatasetInfo:
    """Load dataset information.

    Episode lengths are looked up in ``self.episode_info`` using the stem of
    each path in ``self.episode_files``. An optional ``metadata.json`` in the
    dataset directory is attached on a best-effort basis: when the file is
    absent, unreadable, or not valid JSON, the metadata stays empty.

    Returns:
        DatasetInfo object containing metadata about the dataset
    """
    episode_lengths = [self.episode_info[ep.stem] for ep in self.episode_files]
    total_timesteps = sum(episode_lengths)

    # Try to load metadata
    metadata = {}
    metadata_file = self.dataset_path / "metadata.json"
    if metadata_file.exists():
        try:
            # JSON text is UTF-8; do not depend on the platform's locale.
            with open(metadata_file, 'r', encoding='utf-8') as f:
                metadata = json.load(f)
        except (OSError, ValueError):
            # Metadata is optional. OSError covers I/O failures; ValueError
            # covers json.JSONDecodeError and UnicodeDecodeError. Anything
            # else (e.g. KeyboardInterrupt) is deliberately not swallowed.
            metadata = {}

    return DatasetInfo(
        name=self.dataset_path.name,
        format=DatasetFormat.LEROBOT,
        num_episodes=len(self.episode_files),
        total_timesteps=total_timesteps,
        episode_lengths=episode_lengths,
        metadata=metadata
    )
get_episode(episode_id: str) -> Episode

Get a specific episode by ID.

Source code in src/dataphy/dataset/lerobot.py
def get_episode(self, episode_id: str) -> Episode:
    """Return the episode registered under ``episode_id``.

    Raises:
        ValueError: If the ID is not a key of ``episode_info``, or if no
            entry of ``episode_files`` has a stem equal to the ID.
    """
    if episode_id not in self.episode_info:
        raise ValueError(f"Episode {episode_id} not found")

    # The first file whose stem matches the ID wins.
    candidates = (path for path in self.episode_files if path.stem == episode_id)
    episode_path = next(candidates, None)
    if episode_path is None:
        raise ValueError(f"Episode file for {episode_id} not found")

    # Data is loaded before metadata, both from the same episode path.
    payload = self._load_episode_data(episode_path)
    extra = self._load_episode_metadata(episode_path)
    return Episode(
        id=episode_id,
        data=payload,
        metadata=extra,
        length=self.episode_info[episode_id],
    )
list_episodes() -> List[str]

List all available episode IDs.

Source code in src/dataphy/dataset/lerobot.py
def list_episodes(self) -> List[str]:
    """Return the IDs of all known episodes as a new list."""
    # Unpacking a dict yields its keys in insertion order.
    return [*self.episode_info]
get_dataset_info() -> Dict[str, Any]

Get dataset information as a dictionary (for CLI compatibility).

Source code in src/dataphy/dataset/lerobot.py
def get_dataset_info(self) -> Dict[str, Any]:
    """Summarize the dataset as a plain dictionary (for CLI compatibility).

    All values come from ``load_info()``. ``'features'`` holds the keys of
    the metadata mapping, or an empty list when the metadata is empty.
    """
    info = self.load_info()

    summary = {
        'format': info.format.value,
        'total_episodes': info.num_episodes,
        'total_timesteps': info.total_timesteps,
        'episode_lengths': info.episode_lengths,
    }
    if info.metadata:
        summary['features'] = list(info.metadata.keys())
    else:
        summary['features'] = []
    return summary
get_episodes(episode_ids: Optional[List[str]] = None) -> List[Episode]

Get multiple episodes.

Source code in src/dataphy/dataset/lerobot.py
def get_episodes(self, episode_ids: Optional[List[str]] = None) -> List[Episode]:
    """Load several episodes via ``get_episode``.

    When ``episode_ids`` is None, every ID in ``episode_info`` is loaded;
    otherwise exactly the given IDs are loaded, in the given order.
    """
    wanted = list(self.episode_info.keys()) if episode_ids is None else episode_ids
    return list(map(self.get_episode, wanted))
get_timestep(episode_id: str, timestep: int) -> Dict[str, Any]

Get a specific timestep from an episode.

Source code in src/dataphy/dataset/lerobot.py
def get_timestep(self, episode_id: str, timestep: int) -> Dict[str, Any]:
    """Get a specific timestep from an episode.

    Args:
        episode_id: ID of the episode, passed on to ``get_episode``.
        timestep: Zero-based index of the timestep within the episode.

    Returns:
        Dictionary with the values found at ``timestep``:

        * If the episode data has ``'parquet_data'``, one entry per column
          of that table (or an ``'error'`` entry if reading the row fails).
        * Else if it has ``'parquet_file'``, a placeholder carrying the
          file reference, the timestep and an ``'error'`` message.
        * Otherwise ``'image'`` (from ``'images'``) plus one entry for
          every other key of the episode data.

    Raises:
        ValueError: If ``timestep`` is negative or not smaller than the
            episode length.
    """
    episode = self.get_episode(episode_id)

    # Negative indices are rejected as well: they would otherwise wrap
    # around and silently return data from the end of the episode.
    if timestep < 0 or timestep >= episode.length:
        raise ValueError(f"Timestep {timestep} out of range for episode {episode_id}")

    # Extract timestep data
    timestep_data = {}

    # Handle Parquet data
    if 'parquet_data' in episode.data:
        try:
            df = episode.data['parquet_data']
            # The table may be shorter than the recorded episode length.
            if timestep < len(df):
                row = df.iloc[timestep]
                for col in df.columns:
                    timestep_data[col] = row[col]
        except Exception as e:
            # Best effort: report the failure in the result instead of raising.
            timestep_data['error'] = str(e)
        return timestep_data
    elif 'parquet_file' in episode.data:
        timestep_data['parquet_file'] = episode.data['parquet_file']
        timestep_data['timestep'] = timestep
        timestep_data['error'] = "pandas not installed - using placeholder data"
        return timestep_data

    # Handle images
    if 'images' in episode.data:
        if timestep < len(episode.data['images']):
            timestep_data['image'] = episode.data['images'][timestep]

    # Handle other data types
    for key, value in episode.data.items():
        if key in ['images', 'parquet_file', 'parquet_data']:
            continue

        if isinstance(value, list) and timestep < len(value):
            timestep_data[key] = value[timestep]
        elif isinstance(value, np.ndarray):
            # Arrays that are 0-d or too short are left out of the result.
            if len(value.shape) > 0 and timestep < value.shape[0]:
                timestep_data[key] = value[timestep]
        else:
            # For non-sequential data, include as is.
            # NOTE(review): a list shorter than ``timestep`` also lands here
            # and is included whole - confirm this is intended.
            timestep_data[key] = value

    return timestep_data
get_timesteps(episode_id: str, start: int, end: int) -> List[Dict[str, Any]]

Get a range of timesteps from an episode.

Source code in src/dataphy/dataset/lerobot.py
def get_timesteps(self, episode_id: str, start: int, end: int) -> List[Dict[str, Any]]:
    """Collect timesteps ``start`` up to (not including) ``end`` of an episode.

    Each index is fetched with its own ``get_timestep`` call, in ascending
    order; an empty range yields an empty list.
    """
    collected = []
    for index in range(start, end):
        collected.append(self.get_timestep(episode_id, index))
    return collected
get_episode_ids() -> List[str]

Get all episode IDs.

Source code in src/dataphy/dataset/lerobot.py
def get_episode_ids(self) -> List[str]:
    """Return a fresh list holding every key of ``episode_info``."""
    # Iterating a dict yields its keys, so no explicit ``.keys()`` is needed.
    return list(self.episode_info)
get_random_episode() -> Episode

Get a random episode.

Source code in src/dataphy/dataset/base.py
def get_random_episode(self) -> Episode:
    """Choose one of the IDs from ``get_episode_ids()`` at random and load it."""
    from random import choice

    return self.get_episode(choice(self.get_episode_ids()))
get_random_timestep() -> Tuple[str, int, Dict[str, Any]]

Get a random timestep from a random episode.

Source code in src/dataphy/dataset/base.py
def get_random_timestep(self) -> Tuple[str, int, Dict[str, Any]]:
    """Draw a random episode, then a random timestep inside that episode.

    Returns:
        A ``(episode_id, timestep, timestep_data)`` tuple, where the data is
        obtained from ``get_timestep`` for the drawn ID and index.
    """
    from random import randint

    picked = self.get_random_episode()
    # randint is inclusive on both ends, hence ``length - 1``.
    step = randint(0, picked.length - 1)
    return picked.id, step, self.get_timestep(picked.id, step)