Skip to content

S3

dataphy.sources.s3

Functions

fetch(source: Dict, output_dir: str, include: str, runner: str = 'DirectRunner')

Source code in src/dataphy/sources/s3.py
def fetch(source: Dict, output_dir: str, include: str, runner: str = "DirectRunner") -> str:
    """Download selected S3 objects into a local directory.

    Args:
        source: Mapping read for ``"bucket"`` (bucket name), ``"keys"``
            (object keys to consider) and, optionally, ``"endpoint"`` (a
            custom S3 endpoint URL). When no endpoint is given, the
            ``AWS_ENDPOINT_URL_S3`` environment variable is used if set.
        output_dir: Local directory, created if missing. Each downloaded
            object is written beneath it at a path mirroring its key.
        include: Comma-separated ``fnmatch`` patterns, matched
            case-insensitively against the final path component of each
            key. Blank entries are ignored; if no pattern remains, no key
            matches and nothing is downloaded.
        runner: Not used by this function; kept so the signature stays
            compatible with existing callers.

    Returns:
        ``output_dir`` as a string.

    Raises:
        ValueError: If a matching key is empty or contains a ``..``
            component (it would be written outside ``output_dir``), or if
            there are keys to download but ``source`` has no bucket.
    """
    patterns = [p.strip().lower() for p in include.split(",") if p.strip()]
    outdir = pathlib.Path(output_dir)

    # Validate every matching key up front so that a bad key aborts the
    # whole call before anything has been downloaded.
    planned = []
    for key in source.get("keys") or []:
        posix_key = pathlib.PurePosixPath(key)
        name = posix_key.name.lower()
        if not any(fnmatch.fnmatch(name, pat) for pat in patterns):
            continue
        # Drop the root component ("/" or "//") so that a key with a leading
        # slash still lands under outdir instead of replacing it.
        parts = [part for part in posix_key.parts if part.strip("/")]
        if not parts or ".." in parts:
            raise ValueError(
                f"refusing to download key {key!r}: it does not map to a path inside {str(outdir)!r}"
            )
        planned.append((key, outdir.joinpath(*parts)))

    outdir.mkdir(parents=True, exist_ok=True)
    if not planned:
        return str(outdir)

    bucket = source.get("bucket")
    if not bucket:
        raise ValueError("source must define a non-empty 'bucket' when keys are to be downloaded")

    # Imported lazily: fsspec (and its S3 backend) is only needed once there
    # is actually something to download.
    import fsspec

    endpoint = source.get("endpoint") or os.getenv("AWS_ENDPOINT_URL_S3")
    fs = fsspec.filesystem("s3", client_kwargs={"endpoint_url": endpoint} if endpoint else None)

    for key, dst in planned:
        dst.parent.mkdir(parents=True, exist_ok=True)
        fs.get(f"s3://{bucket}/{key}", str(dst))
    return str(outdir)