Lerobot

dataphy.sources.lerobot

Classes

RepositoryNotFoundError

Bases: Exception

Raised when a repository is not found or not accessible.

RevisionNotFoundError

Bases: Exception

Raised when a specific revision is not found in a repository.

NetworkError

Bases: Exception

Raised when there's a network connectivity issue.
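All three classes derive directly from Exception, so callers that want to distinguish failure modes should catch them individually; note the implementation can also raise the built-in PermissionError for private repositories. A minimal sketch (the repository ID is taken from the examples in fetch() below and is purely illustrative):

from dataphy.sources.lerobot import (
    NetworkError,
    RepositoryNotFoundError,
    RevisionNotFoundError,
    fetch,
)

try:
    path = fetch({"repo_id": "lerobot/lerobot-100k"}, "./data/lerobot-100k")
except RepositoryNotFoundError as err:
    print(f"Bad repo_id: {err}")           # repository missing or private
except RevisionNotFoundError as err:
    print(f"Bad revision: {err}")          # requested git revision does not exist
except NetworkError as err:
    print(f"Connectivity problem: {err}")  # retry once the network is back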

Functions

fetch(source: Dict, output_dir: str, runner: str = 'DirectRunner') -> str

Fetch LeRobot datasets from Hugging Face Hub.

Parameters:

source (Dict, required):
    Dict containing:
    - repo_id: Hugging Face repository ID (e.g., "lerobot/lerobot-100k" or "datasets/lerobot/libero_goal_image")
    - split: Dataset split (e.g., "train", "val", "test")
    - revision: Git revision (default: "main")
    - filter_pattern: Optional pattern to filter files
    - repo_type: Repository type ("model" or "dataset", auto-detected if not provided)

output_dir (str, required):
    Local directory to save files.

runner (str, default 'DirectRunner'):
    Apache Beam runner (not used for HF).

Returns:

str: Path to the downloaded dataset directory.

Raises:

ValueError: If repo_id is not provided.
RepositoryNotFoundError: If the repository doesn't exist or is not accessible.
RevisionNotFoundError: If the specified revision doesn't exist.
NetworkError: If there's a network connectivity issue.
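Example usage (a minimal sketch; the repository ID and output path are illustrative, with values mirroring the parameter description above):

from dataphy.sources.lerobot import fetch

# All keys besides repo_id are optional.
source = {
    "repo_id": "datasets/lerobot/libero_goal_image",
    "split": "train",
    "revision": "main",
    "repo_type": "dataset",  # auto-detected from the "datasets/" prefix if omitted
}

dataset_dir = fetch(source, output_dir="./data/libero_goal_image")
print(f"Snapshot available at {dataset_dir}")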

Source code in src/dataphy/sources/lerobot.py
def fetch(source: Dict, output_dir: str, runner: str = "DirectRunner") -> str:
    """
    Fetch LeRobot datasets from Hugging Face Hub.

    Args:
        source: Dict containing:
            - repo_id: Hugging Face repository ID (e.g., "lerobot/lerobot-100k" or "datasets/lerobot/libero_goal_image")
            - split: Dataset split (e.g., "train", "val", "test")
            - revision: Git revision (default: "main")
            - filter_pattern: Optional pattern to filter files
            - repo_type: Repository type ("model" or "dataset", auto-detected if not provided)
        output_dir: Local directory to save files
        runner: Apache Beam runner (not used for HF)

    Returns:
        Path to the downloaded dataset directory

    Raises:
        ValueError: If repo_id is not provided
        RepositoryNotFoundError: If the repository doesn't exist or is not accessible
        RevisionNotFoundError: If the specified revision doesn't exist
        NetworkError: If there's a network connectivity issue
    """
    repo_id = source.get("repo_id")
    split = source.get("split", "train")
    revision = source.get("revision", "main")
    filter_pattern = source.get("filter_pattern")
    repo_type = source.get("repo_type")

    if not repo_id:
        raise ValueError("repo_id is required for LeRobot dataset fetching")

    # Auto-detect repository type if not provided
    if not repo_type:
        if repo_id.startswith("datasets/"):
            repo_type = "dataset"
        else:
            repo_type = "model"  # Default to model type

    local_dir = pathlib.Path(output_dir)
    local_dir.mkdir(parents=True, exist_ok=True)

    # Verify repository access before attempting download
    print(f"Verifying access to repository {repo_id} (type: {repo_type})...")
    try:
        # Try LeRobot approach first (which works for visualization)
        try:
            from lerobot.datasets.lerobot_dataset import LeRobotDataset
            test_dataset = LeRobotDataset(repo_id, tolerance_s=30.0)  # Use relaxed tolerance
            print(f"Repository accessible via LeRobot library. Found {len(test_dataset.episode_data_index['from'])} episodes.")
        except ImportError:
            # Fall back to Hugging Face Hub API
            files = list_repo_files(repo_id, repo_type=repo_type)
            print(f"Repository accessible via Hugging Face Hub. Found {len(files)} files.")
        except Exception as lerobot_error:
            # If LeRobot fails due to sync issues, that's okay - the repository exists
            if "timestamps unexpectedly violate the tolerance" in str(lerobot_error):
                print(f"Repository accessible via LeRobot library (sync issues detected, but repository exists).")
                print(f"Note: This repository has timestamp synchronization issues, but can be used with relaxed tolerance.")
            else:
                # For other errors, try Hugging Face Hub
                print(f"LeRobot verification failed: {lerobot_error}")
                files = list_repo_files(repo_id, repo_type=repo_type)
                print(f"Repository accessible via Hugging Face Hub. Found {len(files)} files.")
    except Exception as e:
        error_msg = str(e).lower()
        if "not found" in error_msg or "doesn't exist" in error_msg:
            raise RepositoryNotFoundError(f"Repository '{repo_id}' not found or not accessible. Please check the repository ID and ensure it exists on Hugging Face Hub.")
        elif "permission" in error_msg or "access" in error_msg:
            raise PermissionError(f"Permission denied accessing repository '{repo_id}'. The repository might be private or you don't have access to it.")
        else:
            raise Exception(f"Failed to verify repository access for '{repo_id}': {str(e)}")

    # Download the entire repository snapshot
    print(f"Downloading LeRobot dataset from {repo_id}...")

    try:
        # Try using LeRobot's approach first (which works for visualization)
        print(f"Attempting to access repository using LeRobot library...")
        try:
            from lerobot.datasets.lerobot_dataset import LeRobotDataset
            # Test if we can access the repository through LeRobot with adaptive tolerance
            tolerance_s = 30.0
            max_tolerance = 200.0  # Maximum tolerance to try
            test_dataset = None

            while tolerance_s <= max_tolerance:
                try:
                    test_dataset = LeRobotDataset(repo_id, tolerance_s=tolerance_s)
                    print(f"Repository accessible via LeRobot library with tolerance_s={tolerance_s}. Found {len(test_dataset.episode_data_index['from'])} episodes.")
                    break
                except Exception as e:
                    if "timestamps unexpectedly violate the tolerance" in str(e):
                        print(f"Tolerance {tolerance_s}s failed, trying {tolerance_s + 20}s...")
                        tolerance_s += 20.0
                    else:
                        # If it's not a tolerance issue, re-raise the exception
                        raise e

            if test_dataset is None:
                raise Exception(f"Could not access repository with tolerance up to {max_tolerance}s")

            # Since LeRobot can access it but Hugging Face Hub API can't, 
            # we'll use LeRobot's internal download mechanism
            print(f"Using LeRobot's internal download mechanism...")

            # The LeRobotDataset created above already triggered the download,
            # so the data is now in the local Hugging Face cache; a second
            # instantiation is unnecessary.

            # The dataset should now be cached locally
            # We need to find where LeRobot stores its cache
            import os
            cache_dir = os.path.expanduser("~/.cache/huggingface")

            # Look for the downloaded files in the cache
            import glob
            # Try multiple cache patterns
            cache_patterns = [
                os.path.join(cache_dir, "lerobot", repo_id),  # Primary LeRobot cache location
                os.path.join(cache_dir, "**", repo_id.replace("/", "--")),
                os.path.join(cache_dir, "datasets--" + repo_id.replace("/", "--"))
            ]

            cache_dirs = []
            for pattern in cache_patterns:
                found_dirs = glob.glob(pattern, recursive=True)
                cache_dirs.extend(found_dirs)
                if found_dirs:
                    print(f"Found cache directories: {found_dirs}")

            if cache_dirs:
                # Copy from cache to our output directory
                import shutil

                # Prioritize the LeRobot cache directory if available
                source_dir = None
                for cache_dir_path in cache_dirs:
                    if "lerobot/lerobot" in cache_dir_path:
                        source_dir = cache_dir_path
                        break

                if not source_dir:
                    source_dir = cache_dirs[0]  # Fallback to first found directory

                print(f"Found cached files at: {source_dir}")

                # Copy all files from cache to output directory
                for item in os.listdir(source_dir):
                    source_item = os.path.join(source_dir, item)
                    dest_item = os.path.join(local_dir, item)
                    if os.path.isdir(source_item):
                        shutil.copytree(source_item, dest_item, dirs_exist_ok=True)
                    else:
                        shutil.copy2(source_item, dest_item)

                print(f"Copied files from LeRobot cache to: {local_dir}")
            else:
                raise Exception("Could not find cached files from LeRobot download")

        except ImportError:
            print("LeRobot library not available, using Hugging Face Hub directly...")
            snapshot_download(
                repo_id=repo_id,
                repo_type=repo_type,  # pass the detected type so dataset repos resolve correctly
                revision=revision,
                local_dir=str(local_dir),
                local_dir_use_symlinks=False
            )
        except Exception as lerobot_error:
            print(f"LeRobot access failed: {lerobot_error}")
            # Fall back to direct Hugging Face Hub access
            snapshot_download(
                repo_id=repo_id,
                repo_type=repo_type,  # pass the detected type so dataset repos resolve correctly
                revision=revision,
                local_dir=str(local_dir),
                local_dir_use_symlinks=False
            )
    except Exception as e:
        error_msg = str(e).lower()
        original_error = str(e)

        # Add debugging information
        print(f"Debug: Original error: {original_error}")
        print(f"Debug: Error type: {type(e).__name__}")

        # Handle specific error cases
        if "not found" in error_msg or "doesn't exist" in error_msg:
            raise RepositoryNotFoundError(f"Repository '{repo_id}' not found or not accessible. Please check the repository ID and ensure it exists on Hugging Face Hub.")
        elif "revision" in error_msg and ("not found" in error_msg or "doesn't exist" in error_msg):
            raise RevisionNotFoundError(f"Revision '{revision}' not found in repository '{repo_id}'. Please check the revision name.")
        elif "network" in error_msg or "connection" in error_msg or "timeout" in error_msg:
            raise NetworkError(f"Network error while accessing repository '{repo_id}'. Please check your internet connection and try again.")
        elif "permission" in error_msg or "access" in error_msg:
            raise PermissionError(f"Permission denied accessing repository '{repo_id}'. The repository might be private or you don't have access to it.")
        elif "rate limit" in error_msg or "too many requests" in error_msg:
            raise Exception(f"Rate limit exceeded while accessing repository '{repo_id}'. Please wait a moment and try again.")
        elif "ssl" in error_msg or "certificate" in error_msg:
            raise NetworkError(f"SSL/TLS error while accessing repository '{repo_id}'. This might be a network configuration issue.")
        else:
            # Re-raise the original exception with a more user-friendly message
            raise Exception(f"Failed to download dataset from '{repo_id}': {original_error}")



    print(f"LeRobot dataset downloaded to: {local_dir}")
    return str(local_dir)

get_dataset_info(repo_id: str, dataset_name: str) -> Dict

Get information about a LeRobot dataset repository.

Parameters:

repo_id (str, required):
    Hugging Face repository ID.

dataset_name (str, required):
    Name of the dataset (used for backward compatibility).

Returns:

Dict: Dictionary with dataset information, or an empty dict if the repository is not accessible.

Raises:

RepositoryNotFoundError: If the repository doesn't exist or is not accessible.
NetworkError: If there's a network connectivity issue.
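Example usage (a sketch; the repository ID is reused from the fetch() examples above and is illustrative):

from dataphy.sources.lerobot import get_dataset_info

info = get_dataset_info("lerobot/lerobot-100k", dataset_name="lerobot-100k")
if info:
    print(info["available_splits"])  # e.g. ["chunked"] for chunked LeRobot repos
    print(info["file_types"])        # e.g. ["annotations", "parquet", "videos"]
    print(info["total_files"])
else:
    # An empty dict is returned for errors other than the ones raised above
    print("No information could be extracted for this repository")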

Source code in src/dataphy/sources/lerobot.py
def get_dataset_info(repo_id: str, dataset_name: str) -> Dict:
    """
    Get information about a LeRobot dataset repository.

    Args:
        repo_id: Hugging Face repository ID
        dataset_name: Name of the dataset (used for backward compatibility)

    Returns:
        Dictionary with dataset information or empty dict if repository not accessible

    Raises:
        RepositoryNotFoundError: If the repository doesn't exist or is not accessible
        NetworkError: If there's a network connectivity issue
    """
    try:
        # Try LeRobot approach first (which works for visualization)
        try:
            from lerobot.datasets.lerobot_dataset import LeRobotDataset
            # Test if we can access the repository through LeRobot with adaptive tolerance
            tolerance_s = 30.0
            max_tolerance = 200.0  # Maximum tolerance to try
            test_dataset = None

            while tolerance_s <= max_tolerance:
                try:
                    test_dataset = LeRobotDataset(repo_id, tolerance_s=tolerance_s)
                    print(f"Repository accessible via LeRobot library with tolerance_s={tolerance_s}. Found {len(test_dataset.episode_data_index['from'])} episodes.")
                    break
                except Exception as e:
                    if "timestamps unexpectedly violate the tolerance" in str(e):
                        print(f"Tolerance {tolerance_s}s failed, trying {tolerance_s + 20}s...")
                        tolerance_s += 20.0
                    else:
                        # If it's not a tolerance issue, re-raise the exception
                        raise e

            if test_dataset is None:
                raise Exception(f"Could not access repository with tolerance up to {max_tolerance}s")

            # Get detailed information from the LeRobot dataset
            detailed_info = {}
            try:
                # Extract information from the dataset
                detailed_info = {
                    "robot_type": getattr(test_dataset, 'robot_type', 'unknown'),
                    "total_episodes": len(test_dataset.episode_data_index['from']),
                    "total_frames": sum(len(test_dataset.episode_data_index['from'][i]) for i in range(len(test_dataset.episode_data_index['from']))),
                    "total_videos": len(test_dataset.episode_data_index['from']) * 2,  # Assuming 2 cameras per episode
                    "fps": getattr(test_dataset, 'fps', 30),
                    "codebase_version": getattr(test_dataset, 'codebase_version', 'unknown'),
                    "available_features": list(test_dataset.episode_data_index.keys()) if hasattr(test_dataset, 'episode_data_index') else []
                }
            except Exception as e:
                # If we can't get detailed info, continue with basic info
                print(f"Could not extract detailed info: {e}")
                pass

            return {
                "dataset_name": dataset_name,
                "available_splits": ["chunked"],  # LeRobot datasets are typically chunked
                "file_types": ["annotations", "parquet", "videos"],
                "total_files": len(test_dataset.episode_data_index['from']) * 3,  # Rough estimate
                "detailed_info": detailed_info
            }

        except ImportError:
            # Fall back to Hugging Face Hub API
            files = list_repo_files(repo_id)

            splits = set()
            file_types = set()

            for file_path in files:
                parts = file_path.split("/")

                # Look for splits in common patterns
                if len(parts) >= 2:
                    # Check if second part looks like a split (train, val, test)
                    if parts[1] in ["train", "val", "test", "validation"]:
                        splits.add(parts[1])
                    # Also check for data/chunk-XXX pattern which might indicate splits
                    elif parts[0] == "data" and len(parts) >= 2 and "chunk" in parts[1]:
                        splits.add("chunked")

                # Categorize file types
                if file_path.endswith((".jpg", ".jpeg", ".png")):
                    file_types.add("images")
                elif file_path.endswith((".json", ".jsonl")):
                    file_types.add("annotations")
                elif file_path.endswith((".mp4", ".avi")):
                    file_types.add("videos")
                elif file_path.endswith((".parquet")):
                    file_types.add("parquet")

            # Try to download and read the info.json file for detailed information
            detailed_info = {}
            try:
                import tempfile
                import json
                from huggingface_hub import hf_hub_download

                # Download info.json
                info_path = hf_hub_download(
                    repo_id=repo_id,
                    filename="meta/info.json",
                    cache_dir=tempfile.gettempdir()
                )

                with open(info_path, 'r') as f:
                    detailed_info = json.load(f)

            except Exception as e:
                # If we can't get detailed info, continue with basic info
                pass

            return {
                "dataset_name": dataset_name,
                "available_splits": sorted(list(splits)) if splits else ["single"],
                "file_types": sorted(list(file_types)),
                "total_files": len(files),
                "detailed_info": detailed_info
            }

    except Exception as e:
        error_msg = str(e).lower()

        # Handle specific error cases
        if "not found" in error_msg or "doesn't exist" in error_msg:
            raise RepositoryNotFoundError(f"Repository '{repo_id}' not found or not accessible. Please check the repository ID and ensure it exists on Hugging Face Hub.")
        elif "network" in error_msg or "connection" in error_msg or "timeout" in error_msg:
            raise NetworkError(f"Network error while accessing repository '{repo_id}'. Please check your internet connection and try again.")
        elif "permission" in error_msg or "access" in error_msg:
            raise PermissionError(f"Permission denied accessing repository '{repo_id}'. The repository might be private or you don't have access to it.")
        else:
            # For other errors, return empty dict to maintain backward compatibility
            print(f"Error getting dataset info for {repo_id}: {e}")
            return {}