good changes

This commit is contained in:
Andras Schmelczer 2026-03-25 08:04:48 +00:00
parent 160283f1a1
commit c997ea46a5
26 changed files with 991 additions and 288 deletions

View file

@ -0,0 +1,269 @@
"""Find corrupted and duplicate travel-time parquet files.
A travel-time parquet file is considered corrupted when the R5 routing
computation failed or was interrupted, leaving either zero rows or only
the origin postcode. We detect this by comparing each file's row count
against a per-mode threshold derived from the 5th-percentile of all files
in that mode. Files at or below 1 row are always flagged.
Duplicates arise when places.parquet is rebuilt between R5 runs: each
place gets a new numeric index prefix, so the skip-completed logic
doesn't recognize previous results. --dedup keeps only the largest
file per slug and removes the rest.
Usage:
uv run python pipeline/check_travel_times.py [--travel-times property-data/travel-times]
[--threshold-pct 5]
[--delete]
[--dedup]
"""
from __future__ import annotations
import argparse
import os
import sys
from dataclasses import dataclass
from pathlib import Path
import polars as pl
@dataclass
class BadFile:
    """A travel-time parquet file selected for reporting or removal.

    Instances are created both for files flagged as corrupted and for
    redundant duplicates of the same slug.
    """

    # Name of the mode sub-directory (under the travel-times dir) holding the file.
    mode: str
    # File name as found on disk, including any numeric prefix and ".parquet".
    filename: str
    # File name without ".parquet" and without a leading all-digit prefix.
    slug: str
    # Row count of the file; -1 marks a file that could not be read.
    rows: int
def scan_mode(mode_dir: Path, mode: str) -> tuple[list[tuple[str, str, int]], int]:
    """Count the rows of every parquet file in one mode directory.

    Returns a pair ``(entries, errors)``. ``entries`` holds one
    ``(filename, slug, row_count)`` tuple per ``*.parquet`` file, in sorted
    filename order; ``slug`` is the file name without its suffix and without
    a leading all-digit prefix. A file that cannot be read is reported on
    stderr and recorded with a row count of ``-1``; ``errors`` is the number
    of such files. ``mode`` is only used to label error messages.
    """
    results: list[tuple[str, str, int]] = []
    failures = 0
    parquet_names = [
        name for name in sorted(os.listdir(mode_dir)) if name.endswith(".parquet")
    ]
    for name in parquet_names:
        stem = name.removesuffix(".parquet")
        # Drop a numeric prefix (e.g. "000699-london-bridge" → "london-bridge")
        head, dash, tail = stem.partition("-")
        slug = tail if dash and head.isdigit() else stem
        try:
            n_rows = (
                pl.scan_parquet(mode_dir / name).select(pl.len()).collect().item()
            )
        except Exception as exc:
            print(f" ERROR reading {mode}/{name}: {exc}", file=sys.stderr)
            failures += 1
            n_rows = -1  # sentinel for "unreadable"
        results.append((name, slug, n_rows))
    return results, failures
def percentile(values: list[int], pct: float) -> float:
    """Return the ``pct``-th percentile of ``values`` by linear interpolation.

    ``values`` may be in any order; a sorted copy is made internally.
    An empty list yields ``0.0``.

    Raises:
        ValueError: if ``pct`` is not within the range 0..100. Without this
            check an out-of-range value either indexes past the end of the
            list or silently extrapolates below the minimum.
    """
    if not values:
        return 0.0
    if not 0 <= pct <= 100:
        raise ValueError(f"pct must be between 0 and 100, got {pct}")
    ordered = sorted(values)
    # Fractional position of the requested percentile within the sorted list.
    position = (pct / 100) * (len(ordered) - 1)
    lower = int(position)
    upper = min(lower + 1, len(ordered) - 1)
    weight = position - lower
    return ordered[lower] + weight * (ordered[upper] - ordered[lower])
def find_bad_files(
    base_dir: Path, threshold_pct: float
) -> tuple[list[BadFile], dict[str, dict]]:
    """Scan every mode directory and flag files with suspiciously few rows.

    Each sub-directory of ``base_dir`` is treated as one mode. Within a mode
    a file is flagged when its row count is at or below
    ``max(1, int(percentile(readable_row_counts, threshold_pct)))``.
    Unreadable files carry a row count of ``-1`` and are therefore always
    flagged, including when *every* file in the mode is unreadable.

    Returns:
        ``(bad, stats)`` where ``bad`` lists the flagged files and ``stats``
        maps each non-empty mode to a dict with the keys ``total``,
        ``errors``, ``bad``, ``threshold``, ``p5``, ``median``, ``min`` and
        ``max``. The row-count statistics are computed over readable files
        only and are 0 when a mode has none.
    """
    bad: list[BadFile] = []
    stats: dict[str, dict] = {}
    modes = sorted(d for d in os.listdir(base_dir) if (base_dir / d).is_dir())
    for mode in modes:
        entries, errors = scan_mode(base_dir / mode, mode)
        if not entries:
            continue
        # Unreadable files (-1) are excluded from the statistics. The list may
        # be empty when nothing in the mode could be read; the mode must still
        # be processed so those files get flagged rather than silently skipped.
        row_counts = [r for _, _, r in entries if r >= 0]
        p5 = percentile(row_counts, threshold_pct)
        median = percentile(row_counts, 50)
        # Threshold: max of 1 and the chosen percentile — ensures we always
        # catch files with 0-1 rows even if p5 is 0 (e.g. walking mode).
        threshold = max(1, int(p5))
        mode_bad = [
            BadFile(mode=mode, filename=filename, slug=slug, rows=rows)
            for filename, slug, rows in entries
            if rows <= threshold
        ]
        bad.extend(mode_bad)
        stats[mode] = {
            "total": len(entries),
            "errors": errors,
            "bad": len(mode_bad),
            "threshold": threshold,
            "p5": p5,
            "median": median,
            "min": min(row_counts, default=0),
            "max": max(row_counts, default=0),
        }
    return bad, stats
def find_duplicates(base_dir: Path) -> tuple[list[BadFile], dict[str, dict]]:
    """List redundant files that share a slug with a larger file in the same mode.

    For every slug that occurs more than once within a mode directory, the
    file with the most rows is kept and all others are returned as
    removable. The second return value maps each non-empty mode to its
    ``total``, ``unique_slugs``, ``duped_slugs`` and ``removable`` counts.
    """
    redundant: list[BadFile] = []
    summary: dict[str, dict] = {}
    mode_names = sorted(
        name for name in os.listdir(base_dir) if (base_dir / name).is_dir()
    )
    for mode in mode_names:
        entries, _ = scan_mode(base_dir / mode, mode)
        if not entries:
            continue
        # Bucket the files of this mode by slug.
        by_slug: dict[str, list[tuple[str, int]]] = {}
        for filename, slug, rows in entries:
            by_slug.setdefault(slug, []).append((filename, rows))
        removable = 0
        duped_slugs = 0
        for slug, group in by_slug.items():
            if len(group) < 2:
                continue
            duped_slugs += 1
            # Largest first; everything after the first entry is redundant.
            ranked = sorted(group, key=lambda item: item[1], reverse=True)
            for filename, rows in ranked[1:]:
                redundant.append(
                    BadFile(mode=mode, filename=filename, slug=slug, rows=rows)
                )
                removable += 1
        summary[mode] = {
            "total": len(entries),
            "unique_slugs": len(by_slug),
            "duped_slugs": duped_slugs,
            "removable": removable,
        }
    return redundant, summary
def main() -> None:
    """Command-line entry point: report corrupted and duplicate files.

    Prints a per-mode summary and the list of files flagged as corrupted,
    deleting them when ``--delete`` is given. Then reports duplicate files
    per mode, removing them when ``--dedup`` is given. Exits with status 1
    if the travel-times path is not a directory.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--travel-times",
        type=Path,
        default=Path("property-data/travel-times"),
        help="Path to travel-times directory",
    )
    parser.add_argument(
        "--threshold-pct",
        type=float,
        default=5,
        help="Percentile below which files are flagged (default: 5th)",
    )
    parser.add_argument(
        "--delete",
        action="store_true",
        help="Delete corrupted files (so R5 will recompute them)",
    )
    parser.add_argument(
        "--dedup",
        action="store_true",
        help="Remove duplicate files (keep largest per slug)",
    )
    args = parser.parse_args()
    if not args.travel_times.is_dir():
        print(f"Error: {args.travel_times} is not a directory", file=sys.stderr)
        sys.exit(1)

    # --- Corruption check ---
    bad_files, stats = find_bad_files(args.travel_times, args.threshold_pct)
    print("=== Per-mode summary ===\n")
    print(f"{'Mode':<10} {'Total':>6} {'Bad':>5} {'Threshold':>10} {'Median':>8} {'Range':>20}")
    print("-" * 65)
    for mode, s in sorted(stats.items()):
        # Separate min and max explicitly; without a separator the two
        # comma-grouped numbers run together and cannot be told apart.
        rng = f"{s['min']:,} - {s['max']:,}"
        print(
            f"{mode:<10} {s['total']:>6} {s['bad']:>5} {s['threshold']:>10,} "
            f"{s['median']:>8,.0f} {rng:>20}"
        )
    if bad_files:
        print(f"\n=== Corrupted files ({len(bad_files)} total) ===\n")
        current_mode = ""
        for bf in sorted(bad_files, key=lambda b: (b.mode, b.rows, b.slug)):
            # Print a heading each time the listing moves on to a new mode.
            if bf.mode != current_mode:
                current_mode = bf.mode
                print(f"\n {current_mode}/")
            status = "UNREADABLE" if bf.rows < 0 else f"{bf.rows} rows"
            print(f" {bf.filename} ({status})")
        if args.delete:
            print(f"\nDeleting {len(bad_files)} corrupted files...")
            deleted = _delete_files(args.travel_times, bad_files)
            print(f"Deleted {deleted}/{len(bad_files)} files.")
        else:
            print("\nRun with --delete to remove these files so R5 can recompute them.")
    else:
        print("\nNo corrupted files found.")

    # --- Dedup check ---
    dupe_files, dupe_stats = find_duplicates(args.travel_times)
    total_removable = sum(s["removable"] for s in dupe_stats.values())
    if total_removable > 0:
        print(f"\n=== Duplicates ({total_removable} removable files) ===\n")
        print(f"{'Mode':<10} {'Total':>6} {'Unique':>7} {'Duped slugs':>12} {'Removable':>10}")
        print("-" * 50)
        for mode, s in sorted(dupe_stats.items()):
            if s["removable"] > 0:
                print(
                    f"{mode:<10} {s['total']:>6} {s['unique_slugs']:>7} "
                    f"{s['duped_slugs']:>12} {s['removable']:>10}"
                )
        if args.dedup:
            # Exclude files already deleted by --delete
            deleted_set = {(bf.mode, bf.filename) for bf in bad_files} if args.delete else set()
            to_delete = [df for df in dupe_files if (df.mode, df.filename) not in deleted_set]
            print(f"\nRemoving {len(to_delete)} duplicate files (keeping largest per slug)...")
            deleted = _delete_files(args.travel_times, to_delete)
            print(f"Deleted {deleted}/{len(to_delete)} files.")
        else:
            print("\nRun with --dedup to remove duplicates (keeps largest per slug).")
    else:
        print("\nNo duplicates found.")
def _delete_files(base_dir: Path, files: list[BadFile]) -> int:
    """Delete ``base_dir/<mode>/<filename>`` for each entry; return how many succeeded.

    A file that cannot be removed is reported on stderr and does not stop
    the remaining deletions.
    """
    removed = 0
    for entry in files:
        target = base_dir / entry.mode / entry.filename
        try:
            target.unlink()
        except OSError as exc:
            print(f" Failed to delete {target}: {exc}", file=sys.stderr)
        else:
            removed += 1
    return removed
if __name__ == "__main__":
main()

View file

@ -1,44 +0,0 @@
"""Download OS GeoSure ground stability data (5km hex grid).
Downloads the GB-Hex-5km-GeoSure dataset from Ordnance Survey as an ESRI
Shapefile and extracts it.
Source: https://osdatahub.os.uk/downloads/open/GeoSure
License: Open Government Licence v3.0
"""
import argparse
import tempfile
from pathlib import Path
from pipeline.utils import download, extract_zip
URL = "https://api.os.uk/downloads/v1/products/GB-Hex-5km-GeoSure/downloads?area=GB&format=ESRI%C2%AE+Shapefile&redirect"
def main() -> None:
    """Download the GeoSure zip into a temporary directory and extract it.

    The archive is extracted into the directory given by ``--output``;
    afterwards every ``*.shp`` file found under that directory is listed.
    """
    cli = argparse.ArgumentParser(
        description="Download OS GeoSure ground stability data"
    )
    cli.add_argument(
        "--output",
        type=Path,
        required=True,
        help="Output directory for extracted shapefile",
    )
    output_dir = cli.parse_args().output
    with tempfile.TemporaryDirectory() as scratch:
        archive = Path(scratch) / "geosure.zip"
        download(URL, archive, timeout=300)
        extract_zip(archive, output_dir)
    extracted = list(output_dir.rglob("*.shp"))
    print(f"Extracted {len(extracted)} shapefiles to {output_dir}")
    for shapefile_path in extracted:
        print(f" {shapefile_path.relative_to(output_dir)}")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,140 @@
"""Download Census 2021 median age by LSOA.
Downloads five-year age band counts (TS007A) from the NOMIS API, then computes
the median age per LSOA using linear interpolation within the median class.
Source: NOMIS (ONS Census 2021 TS007A dataset, NM_2020_1)
License: Open Government Licence v3.0
"""
import argparse
from io import BytesIO
from pathlib import Path
import httpx
import polars as pl
# NOMIS API: Census 2021 TS007A (age by five-year bands) by LSOA 2021 (TYPE151)
# c2021_age_19=1..18 selects 18 five-year bands (excluding 0 = Total)
# measures=20100 selects absolute count
BASE_URL = "https://www.nomisweb.co.uk/api/v01/dataset/NM_2020_1.data.csv?date=latest&geography=TYPE151&c2021_age_19=1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18&measures=20100&select=GEOGRAPHY_CODE,C2021_AGE_19_NAME,OBS_VALUE"
PAGE_SIZE = 25000
# Five-year age bands in order, with lower bounds for interpolation.
# The last band (85+) is open-ended — we treat it as 85-89 for median purposes.
AGE_BANDS = [
(0, 5), # Aged 0 to 4 years
(5, 5), # Aged 5 to 9 years
(10, 5), # Aged 10 to 14 years
(15, 5), # Aged 15 to 19 years
(20, 5), # Aged 20 to 24 years
(25, 5), # Aged 25 to 29 years
(30, 5), # Aged 30 to 34 years
(35, 5), # Aged 35 to 39 years
(40, 5), # Aged 40 to 44 years
(45, 5), # Aged 45 to 49 years
(50, 5), # Aged 50 to 54 years
(55, 5), # Aged 55 to 59 years
(60, 5), # Aged 60 to 64 years
(65, 5), # Aged 65 to 69 years
(70, 5), # Aged 70 to 74 years
(75, 5), # Aged 75 to 79 years
(80, 5), # Aged 80 to 84 years
(85, 5), # Aged 85 years and over
]
def compute_median_age(
    counts: list[int], bands: "list[tuple[int, int]] | None" = None
) -> float:
    """Compute the median age from per-band counts using linear interpolation.

    Args:
        counts: Number of people in each age band, youngest band first.
        bands: ``(lower_bound, width)`` for each band, aligned with
            ``counts``. Defaults to the module-level ``AGE_BANDS``.

    Returns:
        The interpolated median age, or NaN when the counts sum to zero.
    """
    total = sum(counts)
    if total == 0:
        return float("nan")
    if bands is None:
        bands = AGE_BANDS
    half = total / 2
    cumulative = 0
    for i, count in enumerate(counts):
        # The median band is the first one whose cumulative count reaches half.
        if cumulative + count >= half:
            lower_bound, width = bands[i]
            # Linear interpolation within the median band
            return lower_bound + ((half - cumulative) / count) * width
        cumulative += count
    return float("nan")
def download_and_convert(output_path: Path) -> None:
    """Download age-band counts from NOMIS and write median age per LSOA.

    Pages through the NOMIS CSV API in steps of ``PAGE_SIZE`` rows, keeps
    rows whose geography code starts with "E", pivots to one row per LSOA,
    computes the median age with ``compute_median_age`` and writes a parquet
    file with the columns ``lsoa21`` and ``median_age`` to ``output_path``.

    Raises:
        ValueError: if NOMIS returns no rows, or if the number of age-band
            columns does not match ``AGE_BANDS`` (the medians would be
            interpolated against the wrong bands).
        httpx.HTTPStatusError: if a page request returns an error status.
    """
    print("Downloading Census 2021 age by five-year bands from NOMIS...")
    frames = []
    offset = 0
    while True:
        url = f"{BASE_URL}&recordoffset={offset}"
        response = httpx.get(url, follow_redirects=True, timeout=120)
        response.raise_for_status()
        if len(response.content) == 0:
            break
        chunk = pl.read_csv(BytesIO(response.content))
        if chunk.height == 0:
            break
        frames.append(chunk)
        print(f" Fetched {chunk.height} rows (offset={offset})")
        # A short page means this was the last one.
        if chunk.height < PAGE_SIZE:
            break
        offset += PAGE_SIZE
    if not frames:
        raise ValueError("NOMIS returned no rows for the age-band query")
    df = pl.concat(frames)
    print(f"Total rows: {df.height}")
    # Filter to England only
    df = df.filter(pl.col("GEOGRAPHY_CODE").str.starts_with("E"))
    # Pivot: one row per LSOA, columns = age band names, values = counts
    pivoted = df.pivot(
        on="C2021_AGE_19_NAME",
        index="GEOGRAPHY_CODE",
        values="OBS_VALUE",
    )
    # Order the age band columns by the first number in the band name
    # (the token after "Aged").
    band_cols = [c for c in pivoted.columns if c != "GEOGRAPHY_CODE"]
    band_cols.sort(key=lambda c: int(c.split()[1]))
    if len(band_cols) != len(AGE_BANDS):
        raise ValueError(
            f"Expected {len(AGE_BANDS)} age bands, found {len(band_cols)}: {band_cols}"
        )
    print(f"Age bands found: {len(band_cols)}")
    print(f" First: {band_cols[0]}")
    print(f" Last: {band_cols[-1]}")
    # Compute median age per LSOA
    medians = []
    for row in pivoted.select("GEOGRAPHY_CODE", *band_cols).to_dicts():
        # A band missing for an LSOA shows up as null after the pivot; count it as 0.
        counts = [row[col] or 0 for col in band_cols]
        median = compute_median_age(counts)
        medians.append({"lsoa21": row["GEOGRAPHY_CODE"], "median_age": round(median, 1)})
    result = pl.DataFrame(medians).with_columns(
        pl.col("median_age").cast(pl.Float32),
    )
    print(f"England LSOAs: {result.height}")
    print(f"Median age range: {result['median_age'].min()} - {result['median_age'].max()}")
    print(f"Mean of medians: {result['median_age'].mean():.1f}")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    result.write_parquet(output_path, compression="zstd")
    print(f"Saved to {output_path}")
def main() -> None:
    """Parse the required ``--output`` path and run the download."""
    cli = argparse.ArgumentParser(
        description="Download Census 2021 median age by LSOA"
    )
    cli.add_argument(
        "--output", type=Path, required=True, help="Output parquet file path"
    )
    options = cli.parse_args()
    download_and_convert(options.output)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,127 @@
"""Download OS Open Greenspace and extract site centroids.
Downloads the OS Open Greenspace dataset as ESRI Shapefile, computes
WGS84 centroids for each greenspace site polygon, and outputs a parquet
with lat/lng/category columns compatible with the POI proximity pipeline.
Source: https://osdatahub.os.uk/downloads/open/OpenGreenspace
License: Open Government Licence v3.0
"""
import argparse
import tempfile
from pathlib import Path
import numpy as np
import polars as pl
import shapefile as shp
from pyproj import Transformer
from shapely.geometry import shape as to_shapely
from pipeline.utils.download import download, extract_zip
URL = "https://api.os.uk/downloads/v1/products/OpenGreenspace/downloads?area=GB&format=ESRI%C2%AE+Shapefile&redirect"
_to_wgs84 = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True)
def download_greenspace(output: Path) -> None:
    """Download OS Open Greenspace and write site centroids to ``output``.

    The zip is downloaded and extracted into a temporary directory. The
    first shapefile whose name contains "GreenspaceSite" is read (falling
    back to any shapefile whose name does not contain "AccessPoint"). For
    each site the polygon centroid is transformed with ``_to_wgs84`` and
    stored with the site's function and name. The result is a parquet file
    with the columns ``lat``, ``lng``, ``category`` and ``name``.

    Sites whose geometry is empty, invalid or cannot be converted are
    skipped; the number skipped is printed.

    Raises:
        FileNotFoundError: if the download contains no suitable shapefile.
        ValueError: if the shapefile has no field whose name contains "funct".
    """
    output.parent.mkdir(parents=True, exist_ok=True)
    lats = []
    lngs = []
    categories = []
    names = []
    skipped = 0
    with tempfile.TemporaryDirectory() as cache_dir:
        zip_path = Path(cache_dir) / "greenspace.zip"
        extract_dir = Path(cache_dir) / "extracted"
        download(URL, zip_path, timeout=300)
        extract_zip(zip_path, extract_dir)
        # Find the GreenspaceSite shapefile (not the AccessPoint one)
        shp_files = list(extract_dir.rglob("*GreenspaceSite*.shp"))
        if not shp_files:
            shp_files = [
                f for f in extract_dir.rglob("*.shp") if "AccessPoint" not in f.name
            ]
        if not shp_files:
            raise FileNotFoundError("No GreenspaceSite shapefile found in download")
        print(f"Reading {shp_files[0].name}...")
        # Close the reader before the temporary directory is removed.
        with shp.Reader(str(shp_files[0]), encoding="latin-1") as reader:
            field_names = [f[0] for f in reader.fields[1:]]  # skip deletion flag
            # Find the "function" field (greenspace type)
            func_idx = next(
                (i for i, name in enumerate(field_names) if "funct" in name.lower()),
                None,
            )
            if func_idx is None:
                raise ValueError(
                    f"No 'function' field found. Available: {field_names}"
                )
            # Find a name field if available
            name_idx = next(
                (i for i, name in enumerate(field_names) if "distname" in name.lower()),
                None,
            )
            # Stream the records instead of loading the whole dataset at once.
            for sr in reader.iterShapeRecords():
                func = sr.record[func_idx]
                site_name = sr.record[name_idx] if name_idx is not None else ""
                try:
                    geom = to_shapely(sr.shape.__geo_interface__)
                    if geom.is_empty or not geom.is_valid:
                        skipped += 1
                        continue
                    centroid = geom.centroid
                    lng, lat = _to_wgs84.transform(centroid.x, centroid.y)
                except Exception:
                    # Best effort: one unconvertible geometry must not abort
                    # the whole import, but it is counted so it stays visible.
                    skipped += 1
                    continue
                lats.append(lat)
                lngs.append(lng)
                categories.append(func)
                names.append(site_name or "")
    if skipped:
        print(f"Skipped {skipped:,} sites with empty, invalid or unreadable geometry")
    df = pl.DataFrame(
        {
            "lat": np.array(lats, dtype=np.float64),
            "lng": np.array(lngs, dtype=np.float64),
            "category": categories,
            "name": names,
        }
    )
    df.write_parquet(output)
    size_mb = output.stat().st_size / (1024 * 1024)
    print(f"Wrote {output} ({size_mb:.1f} MB, {len(df):,} greenspace sites)")
    counts = df.group_by("category").len().sort("len", descending=True)
    for row in counts.iter_rows(named=True):
        print(f" {row['category']}: {row['len']:,}")
def main() -> None:
    """Parse the required ``--output`` path and build the greenspace parquet."""
    cli = argparse.ArgumentParser(
        description="Download OS Open Greenspace site centroids"
    )
    cli.add_argument(
        "--output", type=Path, required=True, help="Output parquet file path"
    )
    options = cli.parse_args()
    download_greenspace(options.output)
if __name__ == "__main__":
main()

View file

@ -13,7 +13,6 @@ from pipeline.utils.poi_counts import count_pois_per_postcode, min_distance_per_
POI_GROUPS_2KM = {
"restaurants": ["Restaurant", "Fast Food"],
"groceries": ["Greengrocer", "Supermarket", "Convenience Store"],
"parks": ["Park"],
}
# Train/tube stations counted at 1km radius
@ -21,11 +20,18 @@ TRAIN_TUBE_GROUP = {
"train_tube": ["Metro or Tram stop", "Rail station"],
}
# Groups for which to compute distance to nearest POI
# Groups for which to compute distance to nearest POI (from filtered POIs)
DISTANCE_GROUPS = {
"train_tube": ["Metro or Tram stop", "Rail station"],
}
# OS Open Greenspace function types used for park counts and distance calculation.
# Uses the authoritative OS dataset instead of OSM point POIs for better coverage
# of green spaces that are only mapped as polygons in OSM.
GREENSPACE_PARK_FUNCTIONS = {
"parks": ["Public Park Or Garden", "Playing Field", "Play Space"],
}
def main():
parser = argparse.ArgumentParser(
@ -37,6 +43,12 @@ def main():
parser.add_argument(
"--pois", type=Path, required=True, help="Filtered POIs parquet"
)
parser.add_argument(
"--greenspace",
type=Path,
required=True,
help="OS Open Greenspace centroids parquet",
)
parser.add_argument(
"--output", type=Path, required=True, help="Output parquet path"
)
@ -60,11 +72,25 @@ def main():
postcodes, pois, groups=TRAIN_TUBE_GROUP, radius_km=1
)
# Distance to nearest train/tube station
# Distance to nearest train/tube station (from filtered POIs)
distances = min_distance_per_postcode(postcodes, pois, groups=DISTANCE_GROUPS)
# Park counts and distances from OS Open Greenspace
greenspace = pl.read_parquet(args.greenspace)
park_counts_2km = count_pois_per_postcode(
postcodes, greenspace, groups=GREENSPACE_PARK_FUNCTIONS, radius_km=2
)
park_distances = min_distance_per_postcode(
postcodes, greenspace, groups=GREENSPACE_PARK_FUNCTIONS
)
# Join all results on postcode
result = counts_2km.join(counts_1km, on="postcode").join(distances, on="postcode")
result = (
counts_2km.join(counts_1km, on="postcode")
.join(distances, on="postcode")
.join(park_counts_2km, on="postcode")
.join(park_distances, on="postcode")
)
result.write_parquet(args.output)
size_mb = args.output.stat().st_size / (1024 * 1024)