LGTM
This commit is contained in:
parent
a8165249a4
commit
a4103b0896
64 changed files with 5376 additions and 3832 deletions
152
pipeline/validate_outputs.py
Normal file
152
pipeline/validate_outputs.py
Normal file
|
|
@ -0,0 +1,152 @@
|
|||
"""Validate pipeline outputs before Make stamps are touched."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
import zipfile
|
||||
from pathlib import Path
|
||||
|
||||
import polars as pl
|
||||
|
||||
|
||||
def _failures_for_file(path: Path) -> list[str]:
|
||||
if not path.exists():
|
||||
return [f"{path}: missing"]
|
||||
if not path.is_file():
|
||||
return [f"{path}: not a file"]
|
||||
try:
|
||||
size = path.stat().st_size
|
||||
except OSError as exc:
|
||||
return [f"{path}: unreadable metadata: {exc}"]
|
||||
if size <= 0:
|
||||
return [f"{path}: empty file"]
|
||||
return []
|
||||
|
||||
|
||||
def _failures_for_dir(path: Path) -> list[str]:
|
||||
if not path.exists():
|
||||
return [f"{path}: missing"]
|
||||
if not path.is_dir():
|
||||
return [f"{path}: not a directory"]
|
||||
try:
|
||||
if not any(not child.name.startswith(".") for child in path.iterdir()):
|
||||
return [f"{path}: empty directory"]
|
||||
except OSError as exc:
|
||||
return [f"{path}: unreadable directory: {exc}"]
|
||||
return []
|
||||
|
||||
|
||||
def _failures_for_parquet(path: Path) -> list[str]:
|
||||
failures = _failures_for_file(path)
|
||||
if failures:
|
||||
return failures
|
||||
try:
|
||||
row_count = pl.scan_parquet(path).select(pl.len()).collect().item()
|
||||
except Exception as exc:
|
||||
return [f"{path}: unreadable parquet: {exc}"]
|
||||
if row_count <= 0:
|
||||
return [f"{path}: parquet has no rows"]
|
||||
return []
|
||||
|
||||
|
||||
def _failures_for_zip(path: Path) -> list[str]:
|
||||
failures = _failures_for_file(path)
|
||||
if failures:
|
||||
return failures
|
||||
if not zipfile.is_zipfile(path):
|
||||
return [f"{path}: unreadable zip"]
|
||||
try:
|
||||
with zipfile.ZipFile(path) as archive:
|
||||
if not archive.namelist():
|
||||
return [f"{path}: zip has no members"]
|
||||
except Exception as exc:
|
||||
return [f"{path}: unreadable zip: {exc}"]
|
||||
return []
|
||||
|
||||
|
||||
def _split_glob(spec: str) -> tuple[Path, str]:
|
||||
if "::" not in spec:
|
||||
raise argparse.ArgumentTypeError(
|
||||
f"{spec!r} must use BASE::PATTERN, for example data::**/*.csv"
|
||||
)
|
||||
base, pattern = spec.split("::", 1)
|
||||
if not base or not pattern:
|
||||
raise argparse.ArgumentTypeError(f"{spec!r} must include BASE and PATTERN")
|
||||
return Path(base), pattern
|
||||
|
||||
|
||||
def _matched_files(spec: str) -> tuple[Path, str, list[Path]]:
|
||||
base, pattern = _split_glob(spec)
|
||||
if not base.exists():
|
||||
return base, pattern, []
|
||||
return base, pattern, sorted(path for path in base.glob(pattern) if path.is_file())
|
||||
|
||||
|
||||
def _failures_for_glob(spec: str) -> list[str]:
|
||||
base, pattern, paths = _matched_files(spec)
|
||||
if not paths:
|
||||
return [f"{base}: no files matched {pattern!r}"]
|
||||
|
||||
failures: list[str] = []
|
||||
for path in paths:
|
||||
failures.extend(_failures_for_file(path))
|
||||
return failures
|
||||
|
||||
|
||||
def _failures_for_zip_glob(spec: str) -> list[str]:
|
||||
base, pattern, paths = _matched_files(spec)
|
||||
if not paths:
|
||||
return [f"{base}: no zip files matched {pattern!r}"]
|
||||
|
||||
failures: list[str] = []
|
||||
for path in paths:
|
||||
failures.extend(_failures_for_zip(path))
|
||||
return failures
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--file", action="append", default=[], type=Path)
|
||||
parser.add_argument("--dir", action="append", default=[], type=Path)
|
||||
parser.add_argument("--parquet", action="append", default=[], type=Path)
|
||||
parser.add_argument("--zip", action="append", default=[], type=Path)
|
||||
parser.add_argument(
|
||||
"--glob",
|
||||
action="append",
|
||||
default=[],
|
||||
help="Require at least one non-empty file matching BASE::PATTERN",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--zip-glob",
|
||||
action="append",
|
||||
default=[],
|
||||
help="Require at least one readable zip matching BASE::PATTERN",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
failures: list[str] = []
|
||||
for path in args.file:
|
||||
failures.extend(_failures_for_file(path))
|
||||
for path in args.dir:
|
||||
failures.extend(_failures_for_dir(path))
|
||||
for path in args.parquet:
|
||||
failures.extend(_failures_for_parquet(path))
|
||||
for path in args.zip:
|
||||
failures.extend(_failures_for_zip(path))
|
||||
for spec in args.glob:
|
||||
failures.extend(_failures_for_glob(spec))
|
||||
for spec in args.zip_glob:
|
||||
failures.extend(_failures_for_zip_glob(spec))
|
||||
|
||||
if failures:
|
||||
print("Output validation failed:", file=sys.stderr)
|
||||
for failure in failures:
|
||||
print(f" - {failure}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Loading…
Add table
Add a link
Reference in a new issue