"""Find corrupted and duplicate travel-time parquet files. A travel-time parquet file is considered corrupted when the R5 routing computation failed or was interrupted, leaving either zero rows or only the origin postcode. We detect this by comparing each file's row count against a per-mode threshold derived from the 5th-percentile of all files in that mode. Files at or below 1 row are always flagged. Duplicates arise when places.parquet is rebuilt between R5 runs — each place gets a new numeric index prefix, so the skip-completed logic doesn't recognize previous results. --dedup keeps only the largest file per slug and removes the rest. Usage: uv run python pipeline/check_travel_times.py [--travel-times property-data/travel-times] [--threshold-pct 5] [--delete] [--dedup] """ from __future__ import annotations import argparse import os import sys from dataclasses import dataclass from pathlib import Path import polars as pl @dataclass class BadFile: mode: str filename: str slug: str rows: int def scan_mode(mode_dir: Path, mode: str) -> tuple[list[tuple[str, str, int]], int]: """Return (filename, slug, row_count) for every parquet in a mode dir.""" entries: list[tuple[str, str, int]] = [] errors = 0 for f in sorted(os.listdir(mode_dir)): if not f.endswith(".parquet"): continue path = mode_dir / f slug = f.removesuffix(".parquet") # Strip numeric prefix (e.g. "000699-london-bridge" → "london-bridge") if "-" in slug: prefix, rest = slug.split("-", 1) if prefix.isdigit(): slug = rest try: rows = pl.scan_parquet(path).select(pl.len()).collect().item() except Exception as exc: print(f" ERROR reading {mode}/{f}: {exc}", file=sys.stderr) errors += 1 entries.append((f, slug, -1)) continue entries.append((f, slug, rows)) return entries, errors def percentile(values: list[int], pct: float) -> float: """Linear-interpolation percentile on a sorted list.""" if not values: return 0.0 s = sorted(values) idx = (pct / 100) * (len(s) - 1) lo = int(idx) hi = min(lo + 1, len(s) - 1) frac = idx - lo return s[lo] + frac * (s[hi] - s[lo]) def find_bad_files( base_dir: Path, threshold_pct: float ) -> tuple[list[BadFile], dict[str, dict]]: """Scan all modes and return bad files + per-mode stats.""" bad: list[BadFile] = [] stats: dict[str, dict] = {} modes = sorted( d for d in os.listdir(base_dir) if (base_dir / d).is_dir() ) for mode in modes: mode_dir = base_dir / mode entries, errors = scan_mode(mode_dir, mode) if not entries: continue row_counts = [r for _, _, r in entries if r >= 0] if not row_counts: continue p5 = percentile(row_counts, threshold_pct) median = percentile(row_counts, 50) # Threshold: max of 1 and the chosen percentile — ensures we always # catch files with 0-1 rows even if p5 is 0 (e.g. walking mode). threshold = max(1, int(p5)) mode_bad = [] for filename, slug, rows in entries: if rows <= threshold: bf = BadFile(mode=mode, filename=filename, slug=slug, rows=rows) mode_bad.append(bf) bad.append(bf) stats[mode] = { "total": len(entries), "errors": errors, "bad": len(mode_bad), "threshold": threshold, "p5": p5, "median": median, "min": min(row_counts), "max": max(row_counts), } return bad, stats def find_duplicates(base_dir: Path) -> tuple[list[BadFile], dict[str, dict]]: """Find duplicate files (same slug, different numeric prefix). Keep the largest.""" dupes: list[BadFile] = [] stats: dict[str, dict] = {} modes = sorted(d for d in os.listdir(base_dir) if (base_dir / d).is_dir()) for mode in modes: mode_dir = base_dir / mode entries, _ = scan_mode(mode_dir, mode) if not entries: continue # Group by slug, keep largest slug_files: dict[str, list[tuple[str, int]]] = {} for filename, slug, rows in entries: slug_files.setdefault(slug, []).append((filename, rows)) mode_dupes = 0 for slug, files in slug_files.items(): if len(files) <= 1: continue # Keep the file with the most rows files.sort(key=lambda x: x[1], reverse=True) for filename, rows in files[1:]: dupes.append(BadFile(mode=mode, filename=filename, slug=slug, rows=rows)) mode_dupes += 1 duped_slugs = sum(1 for fs in slug_files.values() if len(fs) > 1) stats[mode] = { "total": len(entries), "unique_slugs": len(slug_files), "duped_slugs": duped_slugs, "removable": mode_dupes, } return dupes, stats def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "--travel-times", type=Path, default=Path("property-data/travel-times"), help="Path to travel-times directory", ) parser.add_argument( "--threshold-pct", type=float, default=5, help="Percentile below which files are flagged (default: 5th)", ) parser.add_argument( "--delete", action="store_true", help="Delete corrupted files (so R5 will recompute them)", ) parser.add_argument( "--dedup", action="store_true", help="Remove duplicate files (keep largest per slug)", ) args = parser.parse_args() if not args.travel_times.is_dir(): print(f"Error: {args.travel_times} is not a directory", file=sys.stderr) sys.exit(1) # --- Corruption check --- bad_files, stats = find_bad_files(args.travel_times, args.threshold_pct) print("=== Per-mode summary ===\n") print(f"{'Mode':<10} {'Total':>6} {'Bad':>5} {'Threshold':>10} {'Median':>8} {'Range':>20}") print("-" * 65) for mode, s in sorted(stats.items()): rng = f"{s['min']:,}–{s['max']:,}" print( f"{mode:<10} {s['total']:>6} {s['bad']:>5} {s['threshold']:>10,} " f"{s['median']:>8,.0f} {rng:>20}" ) if bad_files: print(f"\n=== Corrupted files ({len(bad_files)} total) ===\n") current_mode = "" for bf in sorted(bad_files, key=lambda b: (b.mode, b.rows, b.slug)): if bf.mode != current_mode: current_mode = bf.mode print(f"\n {current_mode}/") status = "UNREADABLE" if bf.rows < 0 else f"{bf.rows} rows" print(f" {bf.filename} ({status})") if args.delete: print(f"\nDeleting {len(bad_files)} corrupted files...") deleted = _delete_files(args.travel_times, bad_files) print(f"Deleted {deleted}/{len(bad_files)} files.") else: print("\nRun with --delete to remove these files so R5 can recompute them.") else: print("\nNo corrupted files found.") # --- Dedup check --- dupe_files, dupe_stats = find_duplicates(args.travel_times) total_removable = sum(s["removable"] for s in dupe_stats.values()) if total_removable > 0: print(f"\n=== Duplicates ({total_removable} removable files) ===\n") print(f"{'Mode':<10} {'Total':>6} {'Unique':>7} {'Duped slugs':>12} {'Removable':>10}") print("-" * 50) for mode, s in sorted(dupe_stats.items()): if s["removable"] > 0: print( f"{mode:<10} {s['total']:>6} {s['unique_slugs']:>7} " f"{s['duped_slugs']:>12} {s['removable']:>10}" ) if args.dedup: # Exclude files already deleted by --delete deleted_set = {(bf.mode, bf.filename) for bf in bad_files} if args.delete else set() to_delete = [df for df in dupe_files if (df.mode, df.filename) not in deleted_set] print(f"\nRemoving {len(to_delete)} duplicate files (keeping largest per slug)...") deleted = _delete_files(args.travel_times, to_delete) print(f"Deleted {deleted}/{len(to_delete)} files.") else: print("\nRun with --dedup to remove duplicates (keeps largest per slug).") else: print("\nNo duplicates found.") def _delete_files(base_dir: Path, files: list[BadFile]) -> int: deleted = 0 for bf in files: path = base_dir / bf.mode / bf.filename try: path.unlink() deleted += 1 except OSError as exc: print(f" Failed to delete {path}: {exc}", file=sys.stderr) return deleted if __name__ == "__main__": main()