269 lines
9 KiB
Python
269 lines
9 KiB
Python
"""Find corrupted and duplicate travel-time parquet files.

A travel-time parquet file is considered corrupted when the R5 routing
computation failed or was interrupted, leaving either zero rows or only
the origin postcode. We detect this by comparing each file's row count
against a per-mode threshold derived from a low percentile (the 5th by
default, see --threshold-pct) of the row counts of all readable files in
that mode. Files at or below the threshold are flagged; because the
threshold is never less than 1, files with 0 or 1 rows are always flagged.

Duplicates arise when places.parquet is rebuilt between R5 runs — each
place gets a new numeric index prefix, so the skip-completed logic
doesn't recognize previous results. --dedup keeps only the largest
file (by row count) per slug and removes the rest.

Usage:
    uv run python pipeline/check_travel_times.py [--travel-times property-data/travel-times]
        [--threshold-pct 5]
        [--delete]
        [--dedup]
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import os
|
||
import sys
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
|
||
import polars as pl
|
||
|
||
|
||
@dataclass()
class BadFile:
    """One travel-time parquet file flagged for reporting and optional deletion.

    The same record type is used for corrupted files and for redundant
    duplicates that lost the "largest per slug" comparison.
    """

    mode: str  # name of the mode subdirectory under the travel-times dir
    filename: str  # file name inside that mode directory, including ".parquet"
    slug: str  # filename without ".parquet" and without a numeric index prefix
    rows: int  # row count read from the file; -1 means the file could not be read
|
||
|
||
|
||
def _slug_from_filename(filename: str) -> str:
    """Return the place slug for *filename*, dropping a numeric index prefix.

    ``"000699-london-bridge.parquet"`` -> ``"london-bridge"``. Names whose part
    before the first ``-`` is not all ASCII digits are returned unchanged
    apart from the ``.parquet`` suffix.
    """
    stem = filename.removesuffix(".parquet")
    prefix, sep, rest = stem.partition("-")
    # Require ASCII digits: str.isdigit() alone also accepts characters such
    # as superscripts ("²"), which are not index prefixes.
    if sep and prefix.isascii() and prefix.isdigit():
        return rest
    return stem


def scan_mode(mode_dir: Path, mode: str) -> tuple[list[tuple[str, str, int]], int]:
    """Count the rows of every ``*.parquet`` file directly inside *mode_dir*.

    Args:
        mode_dir: Directory holding one mode's travel-time files.
        mode: Mode name; used only to label error messages.

    Returns:
        ``(entries, errors)``. *entries* holds one ``(filename, slug, row_count)``
        tuple per parquet file, in sorted filename order. Files that cannot be
        read are still listed, with ``row_count == -1``, and are counted in
        *errors* (an error line is printed to stderr for each).
    """
    entries: list[tuple[str, str, int]] = []
    errors = 0
    for name in sorted(os.listdir(mode_dir)):
        if not name.endswith(".parquet"):
            continue
        slug = _slug_from_filename(name)
        try:
            rows = pl.scan_parquet(mode_dir / name).select(pl.len()).collect().item()
        except Exception as exc:  # deliberate best-effort: any failure = unreadable
            print(f" ERROR reading {mode}/{name}: {exc}", file=sys.stderr)
            errors += 1
            rows = -1
        entries.append((name, slug, rows))
    return entries, errors
|
||
|
||
|
||
def percentile(values: list[int], pct: float) -> float:
    """Return the *pct*-th percentile of *values* using linear interpolation.

    The input need not be sorted; it is sorted internally. The interpolation
    matches numpy.percentile's default ("linear") method. *pct* is clamped to
    [0, 100], so out-of-range requests return the minimum or maximum instead
    of raising IndexError or extrapolating. An empty input returns 0.0.
    """
    s = sorted(values)
    if not s:
        return 0.0
    pct = min(max(pct, 0.0), 100.0)
    idx = (pct / 100) * (len(s) - 1)
    lo = int(idx)
    hi = min(lo + 1, len(s) - 1)
    frac = idx - lo
    return s[lo] + frac * (s[hi] - s[lo])
|
||
|
||
|
||
def find_bad_files(
    base_dir: Path, threshold_pct: float
) -> tuple[list[BadFile], dict[str, dict]]:
    """Scan every mode directory and flag suspiciously small parquet files.

    A mode is any immediate subdirectory of *base_dir*. Within a mode, a file
    is flagged when its row count is at or below ``max(1, int(p))``, where
    ``p`` is the *threshold_pct* percentile of the readable files' row counts.
    Unreadable files (row count -1 from ``scan_mode``) are always flagged,
    including when no file in the mode can be read at all.

    Args:
        base_dir: Travel-times directory with one subdirectory per mode.
        threshold_pct: Percentile (0-100) of row counts used as the cut-off.

    Returns:
        ``(bad, stats)``: *bad* lists every flagged file across all modes;
        *stats* maps each mode with at least one readable file to a dict with
        keys ``total``, ``errors``, ``bad``, ``threshold``, ``p5``, ``median``,
        ``min`` and ``max``.
    """
    bad: list[BadFile] = []
    stats: dict[str, dict] = {}

    modes = sorted(d for d in os.listdir(base_dir) if (base_dir / d).is_dir())

    for mode in modes:
        entries, errors = scan_mode(base_dir / mode, mode)
        if not entries:
            continue

        row_counts = [r for _, _, r in entries if r >= 0]
        if not row_counts:
            # Every file in this mode is unreadable. There is no distribution to
            # derive a threshold or summary stats from, but the files are still
            # broken and must be reported so --delete can remove them.
            bad.extend(
                BadFile(mode=mode, filename=filename, slug=slug, rows=rows)
                for filename, slug, rows in entries
            )
            continue

        pct_value = percentile(row_counts, threshold_pct)
        median = percentile(row_counts, 50)
        # Threshold: max of 1 and the chosen percentile — ensures we always
        # catch files with 0-1 rows even if the percentile is 0 (e.g. walking mode).
        threshold = max(1, int(pct_value))

        # Unreadable files carry rows == -1, so they always fall under the threshold.
        mode_bad = [
            BadFile(mode=mode, filename=filename, slug=slug, rows=rows)
            for filename, slug, rows in entries
            if rows <= threshold
        ]
        bad.extend(mode_bad)

        stats[mode] = {
            "total": len(entries),
            "errors": errors,
            "bad": len(mode_bad),
            "threshold": threshold,
            # Key name kept for callers; holds the threshold_pct percentile.
            "p5": pct_value,
            "median": median,
            "min": min(row_counts),
            "max": max(row_counts),
        }

    return bad, stats
|
||
|
||
|
||
def find_duplicates(base_dir: Path) -> tuple[list[BadFile], dict[str, dict]]:
    """Locate files sharing a slug within a mode and pick the redundant ones.

    For every slug that appears more than once in a mode, the file with the
    highest row count is kept. Ties go to the file that sorts first by name,
    because the sort is stable. All other copies are returned as removable.

    Returns:
        ``(removable, stats)``: the files that could be deleted, and a per-mode
        dict with keys ``total``, ``unique_slugs``, ``duped_slugs`` and
        ``removable``.
    """
    removable_files: list[BadFile] = []
    per_mode: dict[str, dict] = {}

    mode_names = sorted(
        name for name in os.listdir(base_dir) if (base_dir / name).is_dir()
    )

    for mode in mode_names:
        entries, _errors = scan_mode(base_dir / mode, mode)
        if not entries:
            continue

        by_slug: dict[str, list[tuple[str, int]]] = {}
        for filename, slug, rows in entries:
            if slug not in by_slug:
                by_slug[slug] = []
            by_slug[slug].append((filename, rows))

        removable_here = 0
        duplicated_slug_count = 0
        for slug, candidates in by_slug.items():
            if len(candidates) < 2:
                continue
            duplicated_slug_count += 1
            # reverse=True keeps sort stability, so equal row counts stay in
            # filename order and the first one listed is the keeper.
            ranked = sorted(candidates, key=lambda item: item[1], reverse=True)
            for filename, rows in ranked[1:]:
                removable_files.append(
                    BadFile(mode=mode, filename=filename, slug=slug, rows=rows)
                )
                removable_here += 1

        per_mode[mode] = {
            "total": len(entries),
            "unique_slugs": len(by_slug),
            "duped_slugs": duplicated_slug_count,
            "removable": removable_here,
        }

    return removable_files, per_mode
|
||
|
||
|
||
def _report_corruption(
    base_dir: Path,
    bad_files: list[BadFile],
    stats: dict[str, dict],
    *,
    delete: bool,
) -> None:
    """Print the per-mode summary and corrupted-file list; delete them if asked."""
    print("=== Per-mode summary ===\n")
    print(f"{'Mode':<10} {'Total':>6} {'Bad':>5} {'Threshold':>10} {'Median':>8} {'Range':>20}")
    print("-" * 65)
    for mode, s in sorted(stats.items()):
        rng = f"{s['min']:,}–{s['max']:,}"
        print(
            f"{mode:<10} {s['total']:>6} {s['bad']:>5} {s['threshold']:>10,} "
            f"{s['median']:>8,.0f} {rng:>20}"
        )

    if not bad_files:
        print("\nNo corrupted files found.")
        return

    print(f"\n=== Corrupted files ({len(bad_files)} total) ===\n")
    current_mode = ""
    for bf in sorted(bad_files, key=lambda b: (b.mode, b.rows, b.slug)):
        if bf.mode != current_mode:
            current_mode = bf.mode
            print(f"\n {current_mode}/")
        status = "UNREADABLE" if bf.rows < 0 else f"{bf.rows} rows"
        print(f" {bf.filename} ({status})")

    if delete:
        print(f"\nDeleting {len(bad_files)} corrupted files...")
        deleted = _delete_files(base_dir, bad_files)
        print(f"Deleted {deleted}/{len(bad_files)} files.")
    else:
        print("\nRun with --delete to remove these files so R5 can recompute them.")


def _report_duplicates(
    base_dir: Path,
    bad_files: list[BadFile],
    *,
    delete: bool,
    dedup: bool,
) -> None:
    """Rescan for duplicate slugs, print a summary, and remove them if --dedup."""
    dupe_files, dupe_stats = find_duplicates(base_dir)

    total_removable = sum(s["removable"] for s in dupe_stats.values())
    if total_removable <= 0:
        print("\nNo duplicates found.")
        return

    print(f"\n=== Duplicates ({total_removable} removable files) ===\n")
    print(f"{'Mode':<10} {'Total':>6} {'Unique':>7} {'Duped slugs':>12} {'Removable':>10}")
    print("-" * 50)
    for mode, s in sorted(dupe_stats.items()):
        if s["removable"] > 0:
            print(
                f"{mode:<10} {s['total']:>6} {s['unique_slugs']:>7} "
                f"{s['duped_slugs']:>12} {s['removable']:>10}"
            )

    if dedup:
        # Exclude files already deleted by --delete
        deleted_set = {(bf.mode, bf.filename) for bf in bad_files} if delete else set()
        to_delete = [df for df in dupe_files if (df.mode, df.filename) not in deleted_set]
        print(f"\nRemoving {len(to_delete)} duplicate files (keeping largest per slug)...")
        deleted = _delete_files(base_dir, to_delete)
        print(f"Deleted {deleted}/{len(to_delete)} files.")
    else:
        print("\nRun with --dedup to remove duplicates (keeps largest per slug).")


def main() -> None:
    """Command-line entry point: report, and optionally remove, bad travel-time files.

    Exits with status 1 if --travel-times is not a directory. argparse exits
    with status 2 if --threshold-pct is outside 0-100.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Preserve the module docstring's line breaks (notably the Usage block);
        # the default formatter re-wraps the description into one paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--travel-times",
        type=Path,
        default=Path("property-data/travel-times"),
        help="Path to travel-times directory",
    )
    parser.add_argument(
        "--threshold-pct",
        type=float,
        default=5,
        help="Percentile below which files are flagged (default: 5th)",
    )
    parser.add_argument(
        "--delete",
        action="store_true",
        help="Delete corrupted files (so R5 will recompute them)",
    )
    parser.add_argument(
        "--dedup",
        action="store_true",
        help="Remove duplicate files (keep largest per slug)",
    )
    args = parser.parse_args()

    # A percentile outside 0-100 is meaningless (and used to crash with IndexError).
    if not 0 <= args.threshold_pct <= 100:
        parser.error(f"--threshold-pct must be between 0 and 100, got {args.threshold_pct:g}")

    if not args.travel_times.is_dir():
        print(f"Error: {args.travel_times} is not a directory", file=sys.stderr)
        sys.exit(1)

    # --- Corruption check ---
    bad_files, stats = find_bad_files(args.travel_times, args.threshold_pct)
    _report_corruption(args.travel_times, bad_files, stats, delete=args.delete)

    # --- Dedup check ---
    # Runs after any --delete above, so its rescan no longer sees removed files.
    _report_duplicates(args.travel_times, bad_files, delete=args.delete, dedup=args.dedup)
|
||
|
||
|
||
def _delete_files(base_dir: Path, files: list[BadFile]) -> int:
|
||
deleted = 0
|
||
for bf in files:
|
||
path = base_dir / bf.mode / bf.filename
|
||
try:
|
||
path.unlink()
|
||
deleted += 1
|
||
except OSError as exc:
|
||
print(f" Failed to delete {path}: {exc}", file=sys.stderr)
|
||
return deleted
|
||
|
||
|
||
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|