perfect-postcode/pipeline/check_travel_times.py
2026-06-02 13:46:18 +01:00

287 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Find corrupted and duplicate travel-time parquet files.
A travel-time parquet file is considered corrupted when the R5 routing
computation failed or was interrupted, leaving either zero rows or only
the origin postcode. We detect this by an absolute, structural criterion:
a file is corrupt only when it is unreadable or has a row count at or below
CORRUPT_ROW_FLOOR. Per-mode percentile/median/range figures are reported
for context only — they never drive the deletable set, so repeated runs
(including with --delete) are idempotent and never erode legitimate
small-catchment (rural/island) origins.
Duplicates arise when places.parquet is rebuilt between R5 runs — each
place gets a new numeric index prefix, so the skip-completed logic
doesn't recognize previous results. --dedup keeps only the largest
file per slug and removes the rest.
Usage:
uv run python pipeline/check_travel_times.py [--travel-times property-data/travel-times]
[--delete]
[--dedup]
"""
from __future__ import annotations
import argparse
import os
import sys
from dataclasses import dataclass
from pathlib import Path
import polars as pl
# Absolute row-count floor for corruption: a readable file with this many
# rows or fewer holds at most the origin postcode (R5 failed/interrupted).
# This is a structural threshold, NOT a population percentile, so repeated
# runs are idempotent and never delete a fresh fraction of valid files.
CORRUPT_ROW_FLOOR = 1
@dataclass
class BadFile:
mode: str
filename: str
slug: str
rows: int
def scan_mode(mode_dir: Path, mode: str) -> tuple[list[tuple[str, str, int]], int]:
"""Return (filename, slug, row_count) for every parquet in a mode dir."""
entries: list[tuple[str, str, int]] = []
errors = 0
for f in sorted(os.listdir(mode_dir)):
if not f.endswith(".parquet"):
continue
path = mode_dir / f
slug = f.removesuffix(".parquet")
# Strip numeric prefix (e.g. "000699-london-bridge" → "london-bridge")
if "-" in slug:
prefix, rest = slug.split("-", 1)
if prefix.isdigit():
slug = rest
try:
rows = pl.scan_parquet(path).select(pl.len()).collect().item()
except Exception as exc:
print(f" ERROR reading {mode}/{f}: {exc}", file=sys.stderr)
errors += 1
entries.append((f, slug, -1))
continue
entries.append((f, slug, rows))
return entries, errors
def percentile(values: list[int], pct: float) -> float:
"""Linear-interpolation percentile on a sorted list."""
if not values:
return 0.0
s = sorted(values)
idx = (pct / 100) * (len(s) - 1)
lo = int(idx)
hi = min(lo + 1, len(s) - 1)
frac = idx - lo
return s[lo] + frac * (s[hi] - s[lo])
def find_bad_files(base_dir: Path) -> tuple[list[BadFile], dict[str, dict]]:
"""Scan all modes and return bad files + per-mode stats.
A file is "bad" (deletable) only by an absolute structural criterion:
it is unreadable (rows < 0) or holds at most the origin postcode
(rows <= CORRUPT_ROW_FLOOR). The p5/median/min/max figures are computed
purely for reporting and do NOT influence the deletable set.
"""
bad: list[BadFile] = []
stats: dict[str, dict] = {}
modes = sorted(d for d in os.listdir(base_dir) if (base_dir / d).is_dir())
for mode in modes:
mode_dir = base_dir / mode
entries, errors = scan_mode(mode_dir, mode)
if not entries:
continue
row_counts = [r for _, _, r in entries if r >= 0]
if not row_counts:
continue
# Reporting statistics only — these never decide what gets deleted.
p5 = percentile(row_counts, 5)
median = percentile(row_counts, 50)
mode_bad = []
for filename, slug, rows in entries:
# Corrupt = unreadable, or at/below the absolute origin-only floor.
if rows < 0 or rows <= CORRUPT_ROW_FLOOR:
bf = BadFile(mode=mode, filename=filename, slug=slug, rows=rows)
mode_bad.append(bf)
bad.append(bf)
stats[mode] = {
"total": len(entries),
"errors": errors,
"bad": len(mode_bad),
"floor": CORRUPT_ROW_FLOOR,
"p5": p5,
"median": median,
"min": min(row_counts),
"max": max(row_counts),
}
return bad, stats
def find_duplicates(base_dir: Path) -> tuple[list[BadFile], dict[str, dict]]:
"""Find duplicate files (same slug, different numeric prefix). Keep the largest."""
dupes: list[BadFile] = []
stats: dict[str, dict] = {}
modes = sorted(d for d in os.listdir(base_dir) if (base_dir / d).is_dir())
for mode in modes:
mode_dir = base_dir / mode
entries, _ = scan_mode(mode_dir, mode)
if not entries:
continue
# Group by slug, keep largest
slug_files: dict[str, list[tuple[str, int]]] = {}
for filename, slug, rows in entries:
slug_files.setdefault(slug, []).append((filename, rows))
mode_dupes = 0
for slug, files in slug_files.items():
if len(files) <= 1:
continue
# Keep the file with the most rows
files.sort(key=lambda x: x[1], reverse=True)
for filename, rows in files[1:]:
dupes.append(
BadFile(mode=mode, filename=filename, slug=slug, rows=rows)
)
mode_dupes += 1
duped_slugs = sum(1 for fs in slug_files.values() if len(fs) > 1)
stats[mode] = {
"total": len(entries),
"unique_slugs": len(slug_files),
"duped_slugs": duped_slugs,
"removable": mode_dupes,
}
return dupes, stats
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--travel-times",
type=Path,
default=Path("property-data/travel-times"),
help="Path to travel-times directory",
)
parser.add_argument(
"--delete",
action="store_true",
help=(
"Delete corrupted files (unreadable or "
f"<= {CORRUPT_ROW_FLOOR} row) so R5 will recompute them"
),
)
parser.add_argument(
"--dedup",
action="store_true",
help="Remove duplicate files (keep largest per slug)",
)
args = parser.parse_args()
if not args.travel_times.is_dir():
print(f"Error: {args.travel_times} is not a directory", file=sys.stderr)
sys.exit(1)
# --- Corruption check ---
bad_files, stats = find_bad_files(args.travel_times)
print("=== Per-mode summary ===\n")
# Floor is the absolute deletion threshold; p5/median/range are reporting
# context only and do not affect which files are flagged as corrupt.
print(
f"{'Mode':<10} {'Total':>6} {'Bad':>5} {'Floor':>6} {'P5':>8} {'Median':>8} {'Range':>20}"
)
print("-" * 71)
for mode, s in sorted(stats.items()):
rng = f"{s['min']:,}{s['max']:,}"
print(
f"{mode:<10} {s['total']:>6} {s['bad']:>5} {s['floor']:>6,} "
f"{s['p5']:>8,.0f} {s['median']:>8,.0f} {rng:>20}"
)
if bad_files:
print(f"\n=== Corrupted files ({len(bad_files)} total) ===\n")
current_mode = ""
for bf in sorted(bad_files, key=lambda b: (b.mode, b.rows, b.slug)):
if bf.mode != current_mode:
current_mode = bf.mode
print(f"\n {current_mode}/")
status = "UNREADABLE" if bf.rows < 0 else f"{bf.rows} rows"
print(f" {bf.filename} ({status})")
if args.delete:
print(f"\nDeleting {len(bad_files)} corrupted files...")
deleted = _delete_files(args.travel_times, bad_files)
print(f"Deleted {deleted}/{len(bad_files)} files.")
else:
print("\nRun with --delete to remove these files so R5 can recompute them.")
else:
print("\nNo corrupted files found.")
# --- Dedup check ---
dupe_files, dupe_stats = find_duplicates(args.travel_times)
total_removable = sum(s["removable"] for s in dupe_stats.values())
if total_removable > 0:
print(f"\n=== Duplicates ({total_removable} removable files) ===\n")
print(
f"{'Mode':<10} {'Total':>6} {'Unique':>7} {'Duped slugs':>12} {'Removable':>10}"
)
print("-" * 50)
for mode, s in sorted(dupe_stats.items()):
if s["removable"] > 0:
print(
f"{mode:<10} {s['total']:>6} {s['unique_slugs']:>7} "
f"{s['duped_slugs']:>12} {s['removable']:>10}"
)
if args.dedup:
# Exclude files already deleted by --delete
deleted_set = (
{(bf.mode, bf.filename) for bf in bad_files} if args.delete else set()
)
to_delete = [
df for df in dupe_files if (df.mode, df.filename) not in deleted_set
]
print(
f"\nRemoving {len(to_delete)} duplicate files (keeping largest per slug)..."
)
deleted = _delete_files(args.travel_times, to_delete)
print(f"Deleted {deleted}/{len(to_delete)} files.")
else:
print("\nRun with --dedup to remove duplicates (keeps largest per slug).")
else:
print("\nNo duplicates found.")
def _delete_files(base_dir: Path, files: list[BadFile]) -> int:
deleted = 0
for bf in files:
path = base_dir / bf.mode / bf.filename
try:
path.unlink()
deleted += 1
except OSError as exc:
print(f" Failed to delete {path}: {exc}", file=sys.stderr)
return deleted
if __name__ == "__main__":
main()