"""Validate pipeline outputs before Make stamps are touched.""" from __future__ import annotations import argparse import json import sys import zipfile from pathlib import Path import polars as pl def _failures_for_file(path: Path) -> list[str]: if not path.exists(): return [f"{path}: missing"] if not path.is_file(): return [f"{path}: not a file"] try: size = path.stat().st_size except OSError as exc: return [f"{path}: unreadable metadata: {exc}"] if size <= 0: return [f"{path}: empty file"] return [] def _failures_for_dir(path: Path) -> list[str]: if not path.exists(): return [f"{path}: missing"] if not path.is_dir(): return [f"{path}: not a directory"] try: if not any(not child.name.startswith(".") for child in path.iterdir()): return [f"{path}: empty directory"] except OSError as exc: return [f"{path}: unreadable directory: {exc}"] return [] def _failures_for_parquet(path: Path) -> list[str]: failures = _failures_for_file(path) if failures: return failures try: row_count = pl.scan_parquet(path).select(pl.len()).collect().item() except Exception as exc: return [f"{path}: unreadable parquet: {exc}"] if row_count <= 0: return [f"{path}: parquet has no rows"] return [] def _failures_for_zip(path: Path) -> list[str]: failures = _failures_for_file(path) if failures: return failures if not zipfile.is_zipfile(path): return [f"{path}: unreadable zip"] try: with zipfile.ZipFile(path) as archive: if not archive.namelist(): return [f"{path}: zip has no members"] except Exception as exc: return [f"{path}: unreadable zip: {exc}"] return [] def _split_glob(spec: str) -> tuple[Path, str]: if "::" not in spec: raise argparse.ArgumentTypeError( f"{spec!r} must use BASE::PATTERN, for example data::**/*.csv" ) base, pattern = spec.split("::", 1) if not base or not pattern: raise argparse.ArgumentTypeError(f"{spec!r} must include BASE and PATTERN") return Path(base), pattern def _split_pair(spec: str, label: str) -> tuple[Path, Path]: if "::" not in spec: raise argparse.ArgumentTypeError( f"{spec!r} must use LEFT::RIGHT for {label}" ) left, right = spec.split("::", 1) if not left or not right: raise argparse.ArgumentTypeError(f"{spec!r} must include both paths") return Path(left), Path(right) def _canonical_postcode(value: object) -> str: compact = "".join(str(value).split()).upper() if len(compact) >= 5: return f"{compact[:-3]} {compact[-3:]}" return compact def _matched_files(spec: str) -> tuple[Path, str, list[Path]]: base, pattern = _split_glob(spec) if not base.exists(): return base, pattern, [] return base, pattern, sorted(path for path in base.glob(pattern) if path.is_file()) def _failures_for_glob(spec: str) -> list[str]: base, pattern, paths = _matched_files(spec) if not paths: return [f"{base}: no files matched {pattern!r}"] failures: list[str] = [] for path in paths: failures.extend(_failures_for_file(path)) return failures def _failures_for_zip_glob(spec: str) -> list[str]: base, pattern, paths = _matched_files(spec) if not paths: return [f"{base}: no zip files matched {pattern!r}"] failures: list[str] = [] for path in paths: failures.extend(_failures_for_zip(path)) return failures def _postcode_column(columns: list[str]) -> str | None: for name in ("postcode", "Postcode", "pcds", "PCDS"): if name in columns: return name return None def _parquet_postcodes(path: Path) -> set[str]: schema = pl.scan_parquet(path).collect_schema() column = _postcode_column(schema.names()) if column is None: raise ValueError(f"{path}: missing postcode column") values = ( pl.scan_parquet(path) .select(pl.col(column).drop_nulls().unique()) .collect() .get_column(column) .to_list() ) return {_canonical_postcode(value) for value in values if _canonical_postcode(value)} def _boundary_postcodes(path: Path) -> set[str]: units_dir = path / "units" if (path / "units").is_dir() else path postcodes: set[str] = set() for geojson_path in sorted(units_dir.glob("*.geojson")): with geojson_path.open("r", encoding="utf-8") as handle: data = json.load(handle) for feature in data.get("features", []): properties = feature.get("properties") or {} value = properties.get("postcodes") if value is not None: postcode = _canonical_postcode(value) if postcode: postcodes.add(postcode) return postcodes def _sample(values: set[str]) -> str: return ", ".join(sorted(values)[:10]) def _failures_for_postcode_boundary_match(spec: str) -> list[str]: parquet_path, boundaries_path = _split_pair(spec, "postcode boundary matching") failures = _failures_for_parquet(parquet_path) + _failures_for_dir(boundaries_path) if failures: return failures try: parquet_postcodes = _parquet_postcodes(parquet_path) boundary_postcodes = _boundary_postcodes(boundaries_path) except Exception as exc: return [f"{parquet_path} / {boundaries_path}: postcode match check failed: {exc}"] failures = [] if not boundary_postcodes: failures.append(f"{boundaries_path}: no boundary postcodes found") missing_boundaries = parquet_postcodes - boundary_postcodes orphan_boundaries = boundary_postcodes - parquet_postcodes if missing_boundaries: failures.append( f"{boundaries_path}: {len(missing_boundaries):,} postcodes from {parquet_path} " f"are missing boundaries; sample: {_sample(missing_boundaries)}" ) if orphan_boundaries: failures.append( f"{boundaries_path}: {len(orphan_boundaries):,} boundary postcodes are absent from " f"{parquet_path}; sample: {_sample(orphan_boundaries)}" ) return failures def main() -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--file", action="append", default=[], type=Path) parser.add_argument("--dir", action="append", default=[], type=Path) parser.add_argument("--parquet", action="append", default=[], type=Path) parser.add_argument("--zip", action="append", default=[], type=Path) parser.add_argument( "--glob", action="append", default=[], help="Require at least one non-empty file matching BASE::PATTERN", ) parser.add_argument( "--zip-glob", action="append", default=[], help="Require at least one readable zip matching BASE::PATTERN", ) parser.add_argument( "--postcode-boundary-match", action="append", default=[], help="Require postcode parquet keys to exactly match boundary GeoJSON postcodes: PARQUET::DIR", ) args = parser.parse_args() failures: list[str] = [] for path in args.file: failures.extend(_failures_for_file(path)) for path in args.dir: failures.extend(_failures_for_dir(path)) for path in args.parquet: failures.extend(_failures_for_parquet(path)) for path in args.zip: failures.extend(_failures_for_zip(path)) for spec in args.glob: failures.extend(_failures_for_glob(spec)) for spec in args.zip_glob: failures.extend(_failures_for_zip_glob(spec)) for spec in args.postcode_boundary_match: failures.extend(_failures_for_postcode_boundary_match(spec)) if failures: print("Output validation failed:", file=sys.stderr) for failure in failures: print(f" - {failure}", file=sys.stderr) return 1 return 0 if __name__ == "__main__": raise SystemExit(main())