Skip to content

GCS

dataphy.sources.gcs

Functions

fetch(source: Dict, output_dir: str, include: str, runner: str = 'DirectRunner')

Source code in src/dataphy/sources/gcs.py
def fetch(source: Dict, output_dir: str, include: str, runner: str = "DirectRunner"):
    """Download selected objects from a GCS bucket into a local directory.

    Args:
        source: Mapping with a ``"bucket"`` name and a ``"keys"`` list of
            object names inside that bucket.
        output_dir: Local directory to download into; created if missing.
        include: Comma-separated glob patterns (``fnmatch`` syntax), matched
            case-insensitively against the final path component of each
            key. An empty string selects nothing.
        runner: Unused by this function.

    Returns:
        ``output_dir`` as a string.

    Raises:
        ValueError: If a selected key is empty or contains a ``..`` segment
            (it would be written outside ``output_dir``), or if there are
            keys to download but ``source`` has no bucket.
    """
    bucket = source.get("bucket")
    # ``or []`` also covers an explicit ``"keys": None`` entry.
    keys: List[str] = source.get("keys") or []
    # Normalise the patterns once instead of once per key.
    patterns = [p.strip().lower() for p in include.split(",") if p.strip()]
    outdir = pathlib.Path(output_dir)

    # Plan every transfer before touching the network or the disk, so an
    # invalid key fails fast instead of leaving a partial download behind.
    transfers = []
    for key in keys:
        name = pathlib.PurePosixPath(key).name.lower()
        if not any(fnmatch.fnmatch(name, pat) for pat in patterns):
            continue
        # Joining an absolute path onto ``outdir`` would discard ``outdir``
        # entirely, so leading slashes are dropped for the local path only;
        # the remote URL keeps the key exactly as given.
        parts = pathlib.PurePosixPath(key.lstrip("/")).parts
        if not parts or ".." in parts:
            raise ValueError(f"unsafe or empty object key: {key!r}")
        transfers.append((f"gs://{bucket}/{key}", outdir.joinpath(*parts)))

    if transfers and not bucket:
        raise ValueError("source['bucket'] is required to download keys")

    # Imported lazily so the module can be imported without gcsfs installed.
    import gcsfs

    fs = gcsfs.GCSFileSystem()
    outdir.mkdir(parents=True, exist_ok=True)
    for src, dst in transfers:
        dst.parent.mkdir(parents=True, exist_ok=True)
        fs.get(src, str(dst))
    return str(outdir)