"""Validate pipeline outputs before Make stamps are touched."""

from __future__ import annotations

import argparse
import sys
import zipfile
from pathlib import Path


def _failures_for_file(path: Path) -> list[str]:
    """Return failure messages unless *path* is an existing, non-empty file.

    An empty list means the check passed. At most one message is returned
    because each failed precondition makes the later ones meaningless.
    """
    if not path.exists():
        return [f"{path}: missing"]
    if not path.is_file():
        return [f"{path}: not a file"]
    try:
        size = path.stat().st_size
    except OSError as exc:
        return [f"{path}: unreadable metadata: {exc}"]
    if size <= 0:
        return [f"{path}: empty file"]
    return []


def _failures_for_dir(path: Path) -> list[str]:
    """Return failure messages unless *path* is a directory with visible entries.

    Entries whose names start with "." are ignored, so a directory that holds
    only dot-files is reported as empty.
    """
    if not path.exists():
        return [f"{path}: missing"]
    if not path.is_dir():
        return [f"{path}: not a directory"]
    try:
        if not any(not child.name.startswith(".") for child in path.iterdir()):
            return [f"{path}: empty directory"]
    except OSError as exc:
        return [f"{path}: unreadable directory: {exc}"]
    return []


def _failures_for_parquet(path: Path) -> list[str]:
    """Return failure messages unless *path* is a parquet file with rows.

    The plain-file checks run first; the row count is only read when they pass.
    """
    failures = _failures_for_file(path)
    if failures:
        return failures
    try:
        # Imported here rather than at module level so that runs which do not
        # use --parquet neither pay for the import nor require polars.
        import polars as pl
    except ImportError as exc:
        return [f"{path}: cannot check parquet, polars is unavailable: {exc}"]
    try:
        row_count = pl.scan_parquet(path).select(pl.len()).collect().item()
    except Exception as exc:
        # Any reader error means the output is unusable; report it instead of
        # crashing so the remaining checks still run.
        return [f"{path}: unreadable parquet: {exc}"]
    if row_count <= 0:
        return [f"{path}: parquet has no rows"]
    return []


def _failures_for_zip(path: Path) -> list[str]:
    """Return failure messages unless *path* is a zip archive with members."""
    failures = _failures_for_file(path)
    if failures:
        return failures
    if not zipfile.is_zipfile(path):
        return [f"{path}: unreadable zip"]
    try:
        with zipfile.ZipFile(path) as archive:
            if not archive.namelist():
                return [f"{path}: zip has no members"]
    except Exception as exc:
        # Opening can still fail after is_zipfile() succeeded; treat that as a
        # validation failure rather than an unhandled error.
        return [f"{path}: unreadable zip: {exc}"]
    return []


def _split_glob(spec: str) -> tuple[Path, str]:
    """Split a ``BASE::PATTERN`` spec into its base path and glob pattern.

    Only the first ``::`` separates the two parts.

    Raises:
        argparse.ArgumentTypeError: if the separator is missing, either part
            is empty, or PATTERN is anchored (absolute or drive-qualified).
    """
    if "::" not in spec:
        raise argparse.ArgumentTypeError(
            f"{spec!r} must use BASE::PATTERN, for example data::**/*.csv"
        )
    base, pattern = spec.split("::", 1)
    if not base or not pattern:
        raise argparse.ArgumentTypeError(f"{spec!r} must include BASE and PATTERN")
    if Path(pattern).anchor:
        # Path.glob() raises NotImplementedError for non-relative patterns;
        # reject them here so the user gets a usage error, not a traceback.
        raise argparse.ArgumentTypeError(
            f"{spec!r} must use a PATTERN relative to BASE"
        )
    return Path(base), pattern


def _glob_spec(spec: str) -> str:
    """Validate a ``BASE::PATTERN`` spec for argparse and return it unchanged.

    Used as an argparse ``type=`` callable so that the ArgumentTypeError from
    ``_split_glob`` is raised during parsing, where argparse turns it into a
    usage error. The original string is returned because the glob checkers
    accept the raw spec.
    """
    _split_glob(spec)
    return spec


def _matched_files(spec: str) -> tuple[Path, str, list[Path]]:
    """Return the base, the pattern and the sorted regular files matching *spec*.

    A base that does not exist yields an empty match list.
    """
    base, pattern = _split_glob(spec)
    if not base.exists():
        return base, pattern, []
    return base, pattern, sorted(path for path in base.glob(pattern) if path.is_file())


def _failures_for_glob(spec: str) -> list[str]:
    """Return failures unless *spec* matches files and every match is non-empty."""
    base, pattern, paths = _matched_files(spec)
    if not paths:
        return [f"{base}: no files matched {pattern!r}"]
    failures: list[str] = []
    for path in paths:
        failures.extend(_failures_for_file(path))
    return failures


def _failures_for_zip_glob(spec: str) -> list[str]:
    """Return failures unless *spec* matches files and every match is a valid zip."""
    base, pattern, paths = _matched_files(spec)
    if not paths:
        return [f"{base}: no zip files matched {pattern!r}"]
    failures: list[str] = []
    for path in paths:
        failures.extend(_failures_for_zip(path))
    return failures


def main(argv: list[str] | None = None) -> int:
    """Run every requested check and report the failures on stderr.

    Args:
        argv: Command-line arguments without the program name. ``None`` makes
            argparse read ``sys.argv[1:]``.

    Returns:
        0 when every check passed, 1 when at least one failure was reported.

    Raises:
        SystemExit: raised by argparse (exit status 2) for unknown options or
            a malformed ``BASE::PATTERN`` spec.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--file", action="append", default=[], type=Path)
    parser.add_argument("--dir", action="append", default=[], type=Path)
    parser.add_argument("--parquet", action="append", default=[], type=Path)
    parser.add_argument("--zip", action="append", default=[], type=Path)
    parser.add_argument(
        "--glob",
        action="append",
        default=[],
        type=_glob_spec,
        help="Require at least one non-empty file matching BASE::PATTERN",
    )
    parser.add_argument(
        "--zip-glob",
        action="append",
        default=[],
        type=_glob_spec,
        help="Require at least one readable zip matching BASE::PATTERN",
    )
    args = parser.parse_args(argv)

    failures: list[str] = []
    for path in args.file:
        failures.extend(_failures_for_file(path))
    for path in args.dir:
        failures.extend(_failures_for_dir(path))
    for path in args.parquet:
        failures.extend(_failures_for_parquet(path))
    for path in args.zip:
        failures.extend(_failures_for_zip(path))
    for spec in args.glob:
        failures.extend(_failures_for_glob(spec))
    for spec in args.zip_glob:
        failures.extend(_failures_for_zip_glob(spec))

    if failures:
        print("Output validation failed:", file=sys.stderr)
        for failure in failures:
            print(f"  - {failure}", file=sys.stderr)
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())