diff --git a/Makefile.data b/Makefile.data index bfcc286..660b9e1 100644 --- a/Makefile.data +++ b/Makefile.data @@ -29,10 +29,11 @@ PROPERTIES_PQ := $(DATA_DIR)/properties.parquet MERGE_STAMP := $(DATA_DIR)/.merge_done PRICE_INDEX := $(DATA_DIR)/price_index.parquet PRICES_STAMP := $(DATA_DIR)/.prices_done -EPC := $(MANUAL_DATA)/certificates.csv +EPC := $(MANUAL_DATA)/domestic-csv.zip ETHNICITY := $(DATA_DIR)/ethnicity_by_la.parquet CRIME_DIR := $(MANUAL_DATA)/crime CRIME := $(DATA_DIR)/crime_by_lsoa.parquet +CRIME_STAMP := $(CRIME_DIR)/.downloaded NOISE := $(DATA_DIR)/road_noise.parquet OFSTED := $(DATA_DIR)/ofsted.parquet NAPTAN := $(DATA_DIR)/naptan.parquet @@ -65,7 +66,7 @@ PMTILES_VERSION := 1.22.3 .PHONY: prepare merge tiles \ download-arcgis download-price-paid download-deprivation download-ethnicity \ download-naptan download-pois download-grocery-retail-points download-ofsted download-broadband download-rental-prices \ - download-postcodes download-noise download-inspire \ + download-postcodes download-noise download-inspire download-crime \ download-oa-boundaries download-uprn-lookup download-transit-network download-greenspace download-os-greenspace download-pbf download-places download-lsoa-population download-median-age download-england-boundary download-rightmove-outcodes \ transform-pois transform-epc-pp transform-crime transform-poi-proximity \ transform-school-proximity transform-postcode-boundaries \ @@ -78,6 +79,7 @@ download-arcgis: $(ARCGIS) download-price-paid: $(PRICE_PAID) download-deprivation: $(IOD) download-ethnicity: $(ETHNICITY) +download-crime: $(CRIME_STAMP) download-naptan: $(NAPTAN) download-pois: $(POIS_RAW) download-grocery-retail-points: $(GROCERY_RETAIL_POINTS) @@ -121,10 +123,10 @@ $(TILES): $(EPC): @echo "" @echo "=== EPC dataset not found ===" - @echo "The EPC certificates file is required: $@" + @echo "The EPC certificates archive is required: $@" @echo "" - @echo "To obtain it, register at https://epc.opendatacommunities.org/login" - @echo "and place certificates.csv in manual-data/" + @echo "To obtain it, register at https://get-energy-performance-data.communities.gov.uk/filter-properties?property_type=domestic" + @echo "and place domestic-csv.zip in manual-data/" @echo "" @exit 1 @@ -140,6 +142,10 @@ $(IOD): $(ETHNICITY): uv run python -m pipeline.download.ethnicity --output $@ +$(CRIME_STAMP): + uv run python -m pipeline.download.crime --output $(CRIME_DIR) + @touch $@ + $(NAPTAN): uv run python -m pipeline.download.naptan --output $@ @@ -216,15 +222,7 @@ $(POIS_FILTERED): $(POIS_RAW) $(NAPTAN) $(GROCERY_RETAIL_POINTS) $(ENGLAND_BOUND $(EPC_PP): $(PRICE_PAID) $(EPC) uv run python -m pipeline.transform.join_epc_pp --epc $(EPC) --price-paid $(PRICE_PAID) --output $@ -$(CRIME): - @if [ ! -d "$(CRIME_DIR)" ]; then \ - echo ""; \ - echo "=== Crime dataset not found ==="; \ - echo "Place police.uk crime CSVs in $(CRIME_DIR)/"; \ - echo "Download from https://data.police.uk/data/"; \ - echo ""; \ - exit 1; \ - fi +$(CRIME): $(CRIME_STAMP) uv run python -m pipeline.transform.crime --input $(CRIME_DIR) --output $@ $(POI_PROXIMITY): $(ARCGIS) $(POIS_FILTERED) $(OS_GREENSPACE) diff --git a/manual-data/fixed_broadband_coverage.zip b/manual-data/fixed_broadband_coverage.zip deleted file mode 100644 index b42cf58..0000000 Binary files a/manual-data/fixed_broadband_coverage.zip and /dev/null differ diff --git a/manual-data/journey_times_bank.parquet b/manual-data/journey_times_bank.parquet deleted file mode 100644 index fcdf3b6..0000000 Binary files a/manual-data/journey_times_bank.parquet and /dev/null differ diff --git a/manual-data/journey_times_fitzrovia.parquet b/manual-data/journey_times_fitzrovia.parquet deleted file mode 100644 index 825614f..0000000 Binary files a/manual-data/journey_times_fitzrovia.parquet and /dev/null differ diff --git a/pipeline/download/crime.py b/pipeline/download/crime.py new file mode 100644 index 0000000..adab8e5 --- /dev/null +++ b/pipeline/download/crime.py @@ -0,0 +1,393 @@ +"""Download police.uk crime archive ZIPs. + +The archive page lists rolling monthly snapshots. Newer snapshots overlap older +ones, so extraction keeps files already written by newer archives. +""" + +from __future__ import annotations + +import argparse +import hashlib +import html +import json +import re +import shutil +import sys +import zipfile +from dataclasses import asdict, dataclass +from datetime import UTC, datetime +from pathlib import Path, PurePosixPath +from urllib.parse import urljoin + +import httpx +from tqdm import tqdm + +ARCHIVE_URL = "https://data.police.uk/data/archive/" +ARCHIVE_LINK_RE = re.compile( + r'
\s*.*?' + r'' + r"(?P\s*\((?P[^)]+)\)\s*" + r'

\s*(?P.*?)\s*

\s*' + r'

(?P.*?)

', + re.DOTALL, +) +VALID_MD5_RE = re.compile(r"^[0-9a-fA-F]{32}$") +MONTH_RE = re.compile(r"^\d{4}-\d{2}$") + + +@dataclass(frozen=True) +class CrimeArchive: + month: str + label: str + url: str + filename: str + size: str + contained_range: str + md5: str | None + raw_md5: str + + +def _clean_text(value: str) -> str: + text = re.sub(r"<[^>]+>", " ", value) + return re.sub(r"\s+", " ", html.unescape(text)).strip() + + +def parse_archives(page_html: str, base_url: str = ARCHIVE_URL) -> list[CrimeArchive]: + """Parse monthly crime archive links from the police.uk archive page.""" + archives: list[CrimeArchive] = [] + for match in ARCHIVE_LINK_RE.finditer(page_html): + raw_md5 = _clean_text(match.group("md5")).lower() + md5 = raw_md5 if VALID_MD5_RE.fullmatch(raw_md5) else None + href = html.unescape(match.group("href")) + archives.append( + CrimeArchive( + month=match.group("month"), + label=_clean_text(match.group("label")), + url=urljoin(base_url, href), + filename=Path(href).name, + size=_clean_text(match.group("size")), + contained_range=_clean_text(match.group("contained_range")), + md5=md5, + raw_md5=raw_md5, + ) + ) + + return archives + + +def fetch_archives(archive_url: str = ARCHIVE_URL) -> list[CrimeArchive]: + """Fetch and parse the archive index.""" + with httpx.Client( + follow_redirects=True, + timeout=httpx.Timeout(30.0, read=60.0), + headers={"User-Agent": "perfect-postcode-data-pipeline/1.0"}, + ) as client: + response = client.get(archive_url) + response.raise_for_status() + + archives = parse_archives(response.text, archive_url) + if not archives: + raise RuntimeError(f"No monthly archive ZIPs found at {archive_url}") + return archives + + +def filter_archives( + archives: list[CrimeArchive], + *, + from_month: str | None = None, + to_month: str | None = None, + limit: int | None = None, +) -> list[CrimeArchive]: + """Filter archives by inclusive YYYY-MM bounds while preserving page order.""" + filtered = [ + archive + for archive in archives + if (from_month is None or archive.month >= from_month) + and (to_month is None or archive.month <= to_month) + ] + if limit is not None: + filtered = filtered[:limit] + return filtered + + +def file_md5(path: Path) -> str: + digest = hashlib.md5() + with path.open("rb") as file: + for chunk in iter(lambda: file.read(1024 * 1024), b""): + digest.update(chunk) + return digest.hexdigest() + + +def download_archive( + archive: CrimeArchive, + archive_dir: Path, + *, + verify: bool, + force: bool, + timeout: float, +) -> Path: + """Download one archive ZIP, resuming an existing .part file when possible.""" + dest = archive_dir / archive.filename + partial = dest.with_suffix(dest.suffix + ".part") + + if force: + dest.unlink(missing_ok=True) + partial.unlink(missing_ok=True) + + if dest.exists(): + if verify and archive.md5 is not None: + actual_md5 = file_md5(dest) + if actual_md5 == archive.md5: + print(f"{archive.filename}: already downloaded") + return dest + print( + f"{archive.filename}: checksum mismatch, downloading again", + file=sys.stderr, + ) + dest.unlink() + partial.unlink(missing_ok=True) + else: + print(f"{archive.filename}: already downloaded") + return dest + + resume_from = partial.stat().st_size if partial.exists() else 0 + headers = {"Range": f"bytes={resume_from}-"} if resume_from else {} + + with httpx.stream( + "GET", + archive.url, + headers=headers, + follow_redirects=True, + timeout=httpx.Timeout(30.0, read=timeout), + ) as response: + if response.status_code == 206 and resume_from: + mode = "ab" + initial = resume_from + else: + response.raise_for_status() + mode = "wb" + initial = 0 + + total_header = int(response.headers.get("content-length", 0)) + total = initial + total_header if total_header else None + with ( + partial.open(mode) as output, + tqdm( + total=total, + initial=initial, + unit="B", + unit_scale=True, + unit_divisor=1024, + desc=archive.filename, + ) as progress, + ): + for chunk in response.iter_bytes(chunk_size=1024 * 1024): + output.write(chunk) + progress.update(len(chunk)) + + partial.replace(dest) + + if verify and archive.md5 is not None: + actual_md5 = file_md5(dest) + if actual_md5 != archive.md5: + dest.unlink(missing_ok=True) + raise RuntimeError( + f"{archive.filename}: MD5 mismatch: expected {archive.md5}, got {actual_md5}" + ) + + return dest + + +def _safe_csv_members( + archive: zipfile.ZipFile, +) -> list[tuple[zipfile.ZipInfo, PurePosixPath]]: + members: list[tuple[zipfile.ZipInfo, PurePosixPath]] = [] + for info in archive.infolist(): + rel_path = PurePosixPath(info.filename) + if ( + info.is_dir() + or rel_path.is_absolute() + or ".." in rel_path.parts + or rel_path.suffix.lower() != ".csv" + ): + continue + members.append((info, rel_path)) + return members + + +def extract_csvs( + zip_path: Path, + output_dir: Path, + *, + overwrite: bool = False, +) -> tuple[int, int]: + """Extract CSVs from one ZIP. Returns (extracted, skipped).""" + extracted = 0 + skipped = 0 + + with zipfile.ZipFile(zip_path) as archive: + for info, rel_path in _safe_csv_members(archive): + dest = output_dir.joinpath(*rel_path.parts) + if dest.exists() and not overwrite: + skipped += 1 + continue + + dest.parent.mkdir(parents=True, exist_ok=True) + with archive.open(info) as source, dest.open("wb") as target: + shutil.copyfileobj(source, target) + extracted += 1 + + return extracted, skipped + + +def write_manifest( + output_dir: Path, archive_url: str, archives: list[CrimeArchive] +) -> None: + manifest = { + "source": archive_url, + "fetched_at": datetime.now(UTC).isoformat(), + "archives": [asdict(archive) for archive in archives], + } + path = output_dir / "archive_manifest.json" + path.write_text(json.dumps(manifest, indent=2) + "\n") + + +def _month_arg(value: str) -> str: + if not MONTH_RE.fullmatch(value): + raise argparse.ArgumentTypeError("month must be in YYYY-MM format") + return value + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Download all monthly police.uk crime archive ZIPs" + ) + parser.add_argument( + "--output", + type=Path, + required=True, + help="Directory for extracted CSVs; ZIPs are kept under _archives/", + ) + parser.add_argument( + "--archive-url", + default=ARCHIVE_URL, + help=f"Archive index URL (default: {ARCHIVE_URL})", + ) + parser.add_argument( + "--from-month", + type=_month_arg, + help="Only download archives from this YYYY-MM onwards", + ) + parser.add_argument( + "--to-month", + type=_month_arg, + help="Only download archives up to this YYYY-MM", + ) + parser.add_argument( + "--limit", + type=int, + help="Download at most this many archives after filtering", + ) + parser.add_argument( + "--list", + action="store_true", + help="Print the archive URLs that would be downloaded and exit", + ) + parser.add_argument( + "--no-extract", + dest="extract", + action="store_false", + help="Download ZIPs only; do not extract CSVs", + ) + parser.add_argument( + "--overwrite-extracted", + action="store_true", + help="Overwrite CSVs when extracting overlapping archive snapshots", + ) + parser.add_argument( + "--no-verify", + dest="verify", + action="store_false", + help="Skip MD5 verification", + ) + parser.add_argument( + "--force", + action="store_true", + help="Redownload archives even if ZIP files already exist", + ) + parser.add_argument( + "--timeout", + type=float, + default=600.0, + help="Per-read timeout in seconds for large ZIP downloads", + ) + args = parser.parse_args() + + print("Fetching police.uk archive index...") + archives = filter_archives( + fetch_archives(args.archive_url), + from_month=args.from_month, + to_month=args.to_month, + limit=args.limit, + ) + if not archives: + raise SystemExit("No archives matched the requested filters") + + bad_md5 = [ + archive.filename for archive in archives if archive.raw_md5 and not archive.md5 + ] + if bad_md5: + print( + "Warning: ignoring malformed MD5 values for " + + ", ".join(bad_md5[:5]) + + ("..." if len(bad_md5) > 5 else ""), + file=sys.stderr, + ) + + print(f"Found {len(archives)} monthly archive ZIPs") + if args.list: + for archive in archives: + print(f"{archive.month}\t{archive.url}\t{archive.raw_md5}") + return + + args.output.mkdir(parents=True, exist_ok=True) + archive_dir = args.output / "_archives" + archive_dir.mkdir(parents=True, exist_ok=True) + write_manifest(args.output, args.archive_url, archives) + + total_extracted = 0 + total_skipped = 0 + for index, archive in enumerate(archives, start=1): + print(f"[{index}/{len(archives)}] {archive.label} ({archive.size})") + zip_path = download_archive( + archive, + archive_dir, + verify=args.verify, + force=args.force, + timeout=args.timeout, + ) + if args.extract: + extracted, skipped = extract_csvs( + zip_path, + args.output, + overwrite=args.overwrite_extracted, + ) + total_extracted += extracted + total_skipped += skipped + print( + f"{archive.filename}: extracted {extracted} CSVs" + + (f", skipped {skipped} existing CSVs" if skipped else "") + ) + + if args.extract: + print( + f"Done. ZIPs saved in {archive_dir}; extracted {total_extracted} CSVs" + + (f" and skipped {total_skipped} existing CSVs" if total_skipped else "") + + "." + ) + else: + print(f"Done. ZIPs saved in {archive_dir}.") + + +if __name__ == "__main__": + main() diff --git a/pipeline/download/map_assets.py b/pipeline/download/map_assets.py index 1de7d84..9456177 100644 --- a/pipeline/download/map_assets.py +++ b/pipeline/download/map_assets.py @@ -1,9 +1,15 @@ import argparse +import base64 +import json +import re import sys import urllib.request from concurrent.futures import ThreadPoolExecutor, as_completed +from io import BytesIO from pathlib import Path +from PIL import Image, ImageDraw + from pipeline.transform.transform_poi import NAPTAN_EMOJIS, _CATEGORIES GLYPHS_BASE = "https://protomaps.github.io/basemaps-assets/fonts" @@ -14,53 +20,80 @@ POI_ICON_BASE = "https://geolytix.github.io/MapIcons" # Font stacks used by @protomaps/basemaps with lang='en' FONT_STACKS = ["Noto Sans Regular", "Noto Sans Italic", "Noto Sans Medium"] -# Fallback emoji not in any category -_FALLBACK_EMOJIS = ["📍"] - POI_ICON_PATHS = [ - "asda/asda_express_24px.svg", - "asda/asda_green_basket_24px.svg", - "asda/asda_green_trolley_24px.svg", - "asda/asda_living_24px.svg", - "asda/asda_pfs_24px.svg", - "asda/asda_primary.svg", - "asda/asda_superstore_green_trolley_24px.svg", - "brands/aldi_24px.svg", - "brands/amazon_fresh_alt_24px.svg", - "brands/booths_24px.svg", - "brands/budgens_24px.svg", - "brands/centra_24px.svg", - "brands/cook.svg", - "brands/coop_24px.svg", - "brands/costco_24px.svg", - "brands/dunnes_stores_24px.svg", - "brands/farmfoods_updated_24px.svg", - "brands/heron_24px.svg", - "brands/iceland_24px.svg", - "brands/iceland_food_warehouse_24px.svg", - "brands/lidl_24px.svg", - "brands/little_waitrose_24px.svg", - "brands/makro_24px.svg", - "brands/mns_24px.svg", - "brands/mns_food_24px.svg", - "brands/mns_high_street_24px.svg", - "brands/mns_hospital_24px.svg", - "brands/mns_moto_24px.svg", - "brands/mns_outlet_24px.svg", - "brands/morrisons_24px.svg", - "brands/morrisons_daily_24px.svg", - "brands/sainsburys_24px.svg", - "brands/sainsburys_local_24px.svg", - "brands/spar_24px.svg", - "brands/tesco_24px.svg", - "brands/tesco_express_24px.svg", - "brands/tesco_extra_24px.svg", - "brands/waitrose_24px.svg", - "brands/wholefoods_24px.svg", - "logos/planet_organic_24px.svg", + "brands_2023/supermarkets/farmfoods.svg", + "brands_2023/supermarkets/heron_foods.svg", + "brands_2023/supermarkets/little_waitrose.svg", + "brands_2024/amazon_fresh.svg", + "brands_2024/booths.svg", + "brands_2024/budgens.svg", + "brands_2024/cook.svg", + "brands_2024/dunnes_stores.svg", + "brands_2024/iceland.svg", + "brands_2024/makro.svg", + "brands_2024/mns.svg", + "brands_2024/morrisons_daily.svg", + "brands_2024/sainsburys_local.svg", + "brands_2024/wholefoods.svg", + "logos/aldi.svg", + "logos/asda.svg", + "logos/centra.svg", + "logos/coop.svg", + "logos/lidl.svg", + "logos/morrisons.svg", + "logos/planet_organic.svg", + "logos/sainsburys.svg", + "logos/spar.svg", + "logos/tesco.svg", + "logos/tesco_express.svg", + "logos/tesco_extra.svg", + "logos/waitrose.svg", "public_transport/london_tube.svg", + "visuals/mns.svg", ] +DERIVED_POI_ICON_PATHS = [ + ("costco_logo", "brands/costco.svg", "logos/costco.svg"), + ( + "embedded_png", + "brands/iceland_food_warehouse_24px.svg", + "logos/the_food_warehouse.png", + ), +] + +POI_ICON_SVG_CROPS = { + "brands_2023/supermarkets/farmfoods.svg": (1.293, 7.314, 15.48, 3.293), + "brands_2023/supermarkets/heron_foods.svg": (0.062, 6.68, 17.995, 5.325), + "brands_2023/supermarkets/little_waitrose.svg": (0.916, 5.645, 16.365, 6.719), + "brands_2024/amazon_fresh.svg": (3.817, 1.646, 16.367, 16.358), + "brands_2024/booths.svg": (1.456, 7.143, 15.313, 3.512), + "brands_2024/budgens.svg": (2.251, 2.278, 13.6, 13.612), + "brands_2024/cook.svg": (5.028, 5.493, 13.945, 9.648), + "brands_2024/dunnes_stores.svg": (4.375, 7.732, 15.249, 5.055), + "brands_2024/iceland.svg": (1.136, 6.823, 16.067, 4.302), + "brands_2024/makro.svg": (4.411, 6.098, 16.397, 5.428), + "brands_2024/mns.svg": (4.042, 6.986, 16.171, 6.724), + "brands_2024/morrisons_daily.svg": (3.341, 4.414, 17.317, 8.248), + "brands_2024/sainsburys_local.svg": (4.58, 1.61, 14.84, 14.849), + "brands_2024/wholefoods.svg": (4.17, 2.193, 15.659, 15.668), + "logos/aldi.svg": (4.813, 2.563, 14.374, 14.383), + "logos/asda.svg": (3.91, 7.135, 16.181, 5.442), + "logos/centra.svg": (3.36, 7.35, 17.28, 4.651), + "logos/coop.svg": (6.407, 4.658, 11.187, 11.793), + "logos/costco.svg": (70.61, 144.908, 256.67, 85.825), + "logos/lidl.svg": (4.938, 2.973, 13.985, 13.985), + "logos/morrisons.svg": (5.231, 2.985, 13.538, 13.398), + "logos/planet_organic.svg": (5.528, 3.564, 12.943, 12.943), + "logos/sainsburys.svg": (7.502, 3.572, 8.996, 12.646), + "logos/spar.svg": (4.933, 2.968, 14.133, 13.853), + "logos/tesco.svg": (4.338, 6.865, 15.324, 5.359), + "logos/tesco_express.svg": (5.231, 5.933, 13.538, 8.345), + "logos/tesco_extra.svg": (4.933, 5.775, 14.133, 8.519), + "logos/waitrose.svg": (5.528, 6.09, 12.943, 9.855), +} + +POI_ICON_SVG_INTRINSIC_MAX = 512 + def collect_twemoji_codes() -> list[str]: """Derive twemoji hex codes from transform_poi categories. @@ -76,9 +109,6 @@ def collect_twemoji_codes() -> list[str]: for emoji in NAPTAN_EMOJIS.values(): emojis.add(emoji) - for emoji in _FALLBACK_EMOJIS: - emojis.add(emoji) - # First codepoint hex, matching frontend logic return sorted({f"{ord(e[0]):x}" for e in emojis}) @@ -97,6 +127,214 @@ def download_file(url: str, dest: Path) -> tuple[bool, str]: return False, url +def download_text(url: str) -> str: + with urllib.request.urlopen(url) as response: + return response.read().decode("utf-8") + + +def build_costco_logo(marker_svg: str) -> str: + start = marker_svg.find('") + if start < 0 or end < 0: + raise ValueError("Costco marker SVG layout changed") + + logo_group = marker_svg[start : end + 4] + return ( + '\n' + '\n' + f"{logo_group}\n" + "\n" + ) + + +def trim_white_png(png_bytes: bytes) -> bytes: + image = Image.open(BytesIO(png_bytes)).convert("RGBA") + pixels = image.load() + + for y in range(image.height): + for x in range(image.width): + red, green, blue, alpha = pixels[x, y] + if red > 245 and green > 245 and blue > 245: + pixels[x, y] = (red, green, blue, 0) + + alpha_box = image.getchannel("A").getbbox() + if alpha_box: + image = image.crop(alpha_box) + + out = BytesIO() + image.save(out, format="PNG") + return out.getvalue() + + +def extract_embedded_png(marker_svg: str) -> bytes: + match = re.search(r"base64,([^\"']+)", marker_svg) + if not match: + raise ValueError("POI marker SVG did not contain an embedded PNG") + return trim_white_png(base64.b64decode(match.group(1))) + + +def svg_intrinsic_size(width: float, height: float) -> tuple[int, int]: + if width <= 0 or height <= 0: + return (POI_ICON_SVG_INTRINSIC_MAX, POI_ICON_SVG_INTRINSIC_MAX) + if width >= height: + return ( + POI_ICON_SVG_INTRINSIC_MAX, + max(1, round(POI_ICON_SVG_INTRINSIC_MAX * height / width)), + ) + return ( + max(1, round(POI_ICON_SVG_INTRINSIC_MAX * width / height)), + POI_ICON_SVG_INTRINSIC_MAX, + ) + + +def set_svg_geometry(svg_text: str, crop: tuple[float, float, float, float]) -> str: + x, y, width, height = crop + view_box = f"{x:g} {y:g} {width:g} {height:g}" + intrinsic_width, intrinsic_height = svg_intrinsic_size(width, height) + + svg_text = re.sub(r'viewBox="[^"]+"', f'viewBox="{view_box}"', svg_text, count=1) + if 'viewBox="' not in svg_text: + svg_text = re.sub(r" tuple[float, float, float, float] | None: + match = re.search(r'viewBox="([^"]+)"', svg_text) + if not match: + return None + parts = [ + float(part) for part in re.split(r"[\s,]+", match.group(1).strip()) if part + ] + if len(parts) != 4: + return None + return (parts[0], parts[1], parts[2], parts[3]) + + +def crop_poi_svg_icons(poi_icons_dir: Path) -> None: + for icon_path, crop in POI_ICON_SVG_CROPS.items(): + dest = poi_icons_dir / icon_path + if not dest.exists(): + continue + svg_text = dest.read_text(encoding="utf-8") + if icon_path == "brands_2024/dunnes_stores.svg": + svg_text = svg_text.replace('fill="#fffcfc"', 'fill="#111111"') + svg_text = svg_text.replace('fill="#fcfcfc"', 'fill="#111111"') + dest.write_text(set_svg_geometry(svg_text, crop), encoding="utf-8") + + for dest in poi_icons_dir.rglob("*.svg"): + svg_text = dest.read_text(encoding="utf-8") + view_box = get_svg_view_box(svg_text) + if view_box: + dest.write_text(set_svg_geometry(svg_text, view_box), encoding="utf-8") + + +def download_derived_poi_icon( + kind: str, source_path: str, dest: Path +) -> tuple[bool, str]: + url = f"{POI_ICON_BASE}/{source_path}" + dest.parent.mkdir(parents=True, exist_ok=True) + + try: + source = download_text(url) + if kind == "costco_logo": + dest.write_text(build_costco_logo(source), encoding="utf-8") + elif kind == "embedded_png": + dest.write_bytes(extract_embedded_png(source)) + else: + raise ValueError(f"Unknown derived POI icon kind: {kind}") + return True, url + except urllib.error.HTTPError as e: + print(f" {e.code} {url}", file=sys.stderr) + return False, url + except Exception as e: + print(f" ERROR {url}: {e}", file=sys.stderr) + return False, url + + +# Slategray accent used by civic POI icons (school, library, building, …) in +# protomaps' v4 sprite. We match it so the townhall blends in with its peers. +_TOWNHALL_COLOR = { + "light": (135, 128, 171), + "dark": (118, 118, 127), +} +_TOWNHALL_LOGICAL_SIZE = 17 + + +def _render_townhall_glyph(size_px: int, color: tuple[int, int, int]) -> Image.Image: + # Draw at 8× resolution and downsample with Lanczos so the pediment's + # diagonals come out anti-aliased; PIL's polygon fill is otherwise aliased. + super_factor = 8 + canvas = size_px * super_factor + img = Image.new("RGBA", (canvas, canvas), (0, 0, 0, 0)) + draw = ImageDraw.Draw(img) + fill = (*color, 255) + + def s(v: float) -> float: + return v * canvas / _TOWNHALL_LOGICAL_SIZE + + draw.polygon([(s(8.5), s(1)), (s(15), s(6.5)), (s(2), s(6.5))], fill=fill) + draw.rectangle([(s(1), s(6.5)), (s(16), s(8.5))], fill=fill) + for column_x in (3, 8, 13): + draw.rectangle([(s(column_x), s(8.5)), (s(column_x + 1.5), s(14))], fill=fill) + draw.rectangle([(s(0), s(14)), (s(17), s(15.5))], fill=fill) + + return img.resize((size_px, size_px), Image.LANCZOS) + + +def inject_townhall_sprite(sprites_dir: Path) -> None: + """Append a townhall glyph to each downloaded sprite sheet. + + Protomaps' v4 sprite omits `townhall` even though the basemap style + references it; we add the icon here so MapLibre can resolve the name + natively at runtime. + """ + for theme in ("light", "dark"): + color = _TOWNHALL_COLOR[theme] + for suffix, scale in (("", 1), ("@2x", 2)): + json_path = sprites_dir / f"{theme}{suffix}.json" + png_path = sprites_dir / f"{theme}{suffix}.png" + if not json_path.exists() or not png_path.exists(): + continue + + manifest = json.loads(json_path.read_text()) + sheet = Image.open(png_path).convert("RGBA") + + glyph_size = _TOWNHALL_LOGICAL_SIZE * scale + glyph = _render_townhall_glyph(glyph_size, color) + + new_width = max(sheet.width, glyph_size) + new_height = sheet.height + glyph_size + extended = Image.new("RGBA", (new_width, new_height), (0, 0, 0, 0)) + extended.paste(sheet, (0, 0)) + extended.paste(glyph, (0, sheet.height)) + extended.save(png_path, optimize=True) + + manifest["townhall"] = { + "x": 0, + "y": sheet.height, + "width": glyph_size, + "height": glyph_size, + "pixelRatio": scale, + } + json_path.write_text(json.dumps(manifest)) + + def main(): parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( @@ -147,7 +385,7 @@ def main(): # Skip already-downloaded files remaining = [(url, dest) for url, dest in tasks] - print(f"Downloading {len(remaining)} assets") + print(f"Downloading {len(remaining) + len(DERIVED_POI_ICON_PATHS)} assets") ok = 0 fail = 0 @@ -162,6 +400,18 @@ def main(): else: fail += 1 + for kind, source_path, dest_path in DERIVED_POI_ICON_PATHS: + success, _url = download_derived_poi_icon( + kind, source_path, poi_icons_dir / dest_path + ) + if success: + ok += 1 + else: + fail += 1 + + crop_poi_svg_icons(poi_icons_dir) + inject_townhall_sprite(sprites_dir) + print(f"Done: {ok} downloaded, {fail} failed") diff --git a/pipeline/download/noise.py b/pipeline/download/noise.py index 0433f14..9a947ec 100644 --- a/pipeline/download/noise.py +++ b/pipeline/download/noise.py @@ -18,6 +18,7 @@ endpoint is broken for that coverage). import argparse import tempfile +import time from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path @@ -29,8 +30,10 @@ from pyproj import Transformer from rasterio.merge import merge from rasterio.transform import rowcol -# Noise sources: (label, column_name, WCS base URL, coverage ID, WCS version) -# Road/rail work with WCS 1.0.0; airport requires WCS 2.0.1. +# Noise sources: +# (label, column_name, WCS base URL, coverage ID, WCS version, allow_missing_tiles) +# Road/rail work with WCS 1.0.0; airport requires WCS 2.0.1 and returns 500 +# for many sparse/no-coverage tiles, which should become nulls. NOISE_SOURCES = [ ( "Road", @@ -38,6 +41,7 @@ NOISE_SOURCES = [ "https://environment.data.gov.uk/spatialdata/road-noise-all-metrics-england-round-4/wcs", "Road_Noise_Lden_England_Round_4_All", "1.0.0", + False, ), ( "Rail", @@ -45,6 +49,7 @@ NOISE_SOURCES = [ "https://environment.data.gov.uk/spatialdata/noise-data/wcs", "Rail_Noise_Lden_England_Round_4_All", "1.0.0", + False, ), ( "Airport", @@ -52,6 +57,7 @@ NOISE_SOURCES = [ "https://environment.data.gov.uk/spatialdata/airport-noise-all-metrics-england-round-4/wcs", "dac9cba4-abe7-43bd-b8e9-8a83da52edd8__Airport_Noise_ALL_Lden", "2.0.1", + True, ), ] @@ -74,6 +80,14 @@ NATIVE_RESOLUTION = 10 # and keeps download size ~100x smaller than native 10m) RESOLUTION = 100 +# Retry/split behaviour for slow Defra WCS requests. Some 100km eastern tiles +# intermittently return 504s; smaller fallback requests usually succeed. +MAX_RETRIES = 3 +RETRY_BACKOFF_SECONDS = 5 +MIN_TILE_SIZE = 25_000 + +type Tile = tuple[int, int, int, int] + def _wcs_get_coverage_url( wcs_base: str, @@ -117,6 +131,53 @@ def _bng_from_latlon(lat: np.ndarray, lon: np.ndarray) -> tuple[np.ndarray, np.n return _TO_BNG.transform(lon, lat) # pyproj takes (x=lon, y=lat) +def _looks_like_tiff(response: httpx.Response) -> bool: + content_type = response.headers.get("content-type", "") + return "tiff" in content_type or response.content[:4] in (b"II*\x00", b"MM\x00*") + + +def _fetch_tile_bytes( + wcs_base: str, + coverage_id: str, + min_e: int, + min_n: int, + max_e: int, + max_n: int, + wcs_version: str = "1.0.0", +) -> bytes | None: + """Fetch one WCS tile. Returns None when the server reports no GeoTIFF.""" + url = _wcs_get_coverage_url( + wcs_base, coverage_id, min_e, min_n, max_e, max_n, wcs_version + ) + with httpx.Client(timeout=300, follow_redirects=True) as client: + resp = client.get(url) + resp.raise_for_status() + + if not _looks_like_tiff(resp): + return None + return resp.content + + +def _split_tile(min_e: int, min_n: int, max_e: int, max_n: int) -> list[Tile]: + e_edges = [min_e, max_e] + n_edges = [min_n, max_n] + if max_e - min_e > MIN_TILE_SIZE: + e_edges.insert(1, (min_e + max_e) // 2) + if max_n - min_n > MIN_TILE_SIZE: + n_edges.insert(1, (min_n + max_n) // 2) + + subtiles: list[Tile] = [] + for e0, e1 in zip(e_edges, e_edges[1:]): + for n0, n1 in zip(n_edges, n_edges[1:]): + if e1 > e0 and n1 > n0: + subtiles.append((e0, n0, e1, n1)) + return subtiles + + +def _tile_path(tile_dir: Path, min_e: int, min_n: int, max_e: int, max_n: int) -> Path: + return tile_dir / f"tile_{min_e}_{min_n}_{max_e}_{max_n}.tif" + + def _download_tile( wcs_base: str, coverage_id: str, @@ -124,30 +185,63 @@ def _download_tile( min_n: int, max_e: int, max_n: int, - tile_path: Path, + tile_dir: Path, wcs_version: str = "1.0.0", -) -> Path | None: - """Download a single WCS tile. Returns path if successful, None otherwise.""" - url = _wcs_get_coverage_url( - wcs_base, coverage_id, min_e, min_n, max_e, max_n, wcs_version + split_failures: bool = True, +) -> tuple[list[Path], list[Tile]]: + """Download a WCS tile, splitting on repeated server failures.""" + tile_path = _tile_path(tile_dir, min_e, min_n, max_e, max_n) + if tile_path.exists() and tile_path.stat().st_size > 0: + return [tile_path], [] + + last_error: Exception | None = None + for attempt in range(1, MAX_RETRIES + 1): + try: + content = _fetch_tile_bytes( + wcs_base, coverage_id, min_e, min_n, max_e, max_n, wcs_version + ) + if content is None: + return [], [] + tile_path.write_bytes(content) + return [tile_path], [] + except (httpx.HTTPStatusError, httpx.TimeoutException, httpx.ConnectError) as e: + last_error = e + if attempt < MAX_RETRIES: + sleep_for = RETRY_BACKOFF_SECONDS * attempt + print( + f" Retrying tile ({min_e},{min_n})-({max_e},{max_n}) " + f"after {type(e).__name__} ({attempt}/{MAX_RETRIES})" + ) + time.sleep(sleep_for) + + subtiles = _split_tile(min_e, min_n, max_e, max_n) if split_failures else [] + if len(subtiles) > 1: + print( + f" Splitting tile ({min_e},{min_n})-({max_e},{max_n}) " + f"into {len(subtiles)} smaller requests after: {last_error}" + ) + paths: list[Path] = [] + failures: list[Tile] = [] + for e0, n0, e1, n1 in subtiles: + child_paths, child_failures = _download_tile( + wcs_base, + coverage_id, + e0, + n0, + e1, + n1, + tile_dir, + wcs_version, + split_failures, + ) + paths.extend(child_paths) + failures.extend(child_failures) + return paths, failures + + print( + f" Failed to download tile ({min_e},{min_n})-({max_e},{max_n}): {last_error}" ) - try: - with httpx.Client(timeout=300, follow_redirects=True) as client: - resp = client.get(url) - resp.raise_for_status() - - content_type = resp.headers.get("content-type", "") - if "tiff" not in content_type and resp.content[:4] not in ( - b"II*\x00", - b"MM\x00*", - ): - return None - - tile_path.write_bytes(resp.content) - return tile_path - except (httpx.HTTPStatusError, httpx.TimeoutException, httpx.ConnectError) as e: - print(f" Failed to download tile ({min_e},{min_n})-({max_e},{max_n}): {e}") - return None + return [], [(min_e, min_n, max_e, max_n)] def download_raster( @@ -156,6 +250,7 @@ def download_raster( coverage_id: str, label: str, wcs_version: str = "1.0.0", + allow_missing_tiles: bool = False, ) -> list[Path]: """Download noise GeoTIFF raster covering England, returning paths to saved files.""" tiles = [] @@ -168,13 +263,13 @@ def download_raster( print( f"[{label}] Downloading {len(tiles)} tiles at {RESOLUTION}m resolution ({MAX_WORKERS} workers)..." ) - paths = [] + paths: list[Path] = [] + failures: list[Tile] = [] completed = 0 with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: futures = {} for min_e, min_n, max_e, max_n in tiles: - tile_path = tile_dir / f"tile_{min_e}_{min_n}.tif" fut = executor.submit( _download_tile, wcs_base, @@ -183,23 +278,41 @@ def download_raster( min_n, max_e, max_n, - tile_path, + tile_dir, wcs_version, + not allow_missing_tiles, ) futures[fut] = (min_e, min_n) for fut in as_completed(futures): completed += 1 - result = fut.result() - if result is not None: - paths.append(result) + result_paths, result_failures = fut.result() + paths.extend(result_paths) + failures.extend(result_failures) print( - f"\r [{completed}/{len(tiles)}] Downloaded {len(paths)} valid tiles", + f"\r [{completed}/{len(tiles)}] Downloaded {len(paths)} GeoTIFFs", end="", flush=True, ) - print(f"\n[{label}] Downloaded {len(paths)}/{len(tiles)} tiles") + if failures: + preview = ", ".join( + f"({e0},{n0})-({e1},{n1})" for e0, n0, e1, n1 in failures[:5] + ) + suffix = "..." if len(failures) > 5 else "" + if allow_missing_tiles: + print( + f"\n[{label}] WARNING: skipped {len(failures)} missing/no-data " + f"tile requests: {preview}{suffix}" + ) + print(f"[{label}] Downloaded {len(paths)} GeoTIFFs from {len(tiles)} tiles") + return paths + raise RuntimeError( + f"[{label}] Failed to download {len(failures)} tile requests after " + f"retries and splitting: {preview}{suffix}" + ) + + print(f"\n[{label}] Downloaded {len(paths)} GeoTIFFs from {len(tiles)} tiles") return paths @@ -281,11 +394,23 @@ def main() -> None: result = postcodes.select("postcode") with tempfile.TemporaryDirectory() as tmp: - for label, col_name, wcs_base, coverage_id, wcs_version in NOISE_SOURCES: + for ( + label, + col_name, + wcs_base, + coverage_id, + wcs_version, + allow_missing_tiles, + ) in NOISE_SOURCES: tile_dir = Path(tmp) / label.lower() tile_dir.mkdir() tile_paths = download_raster( - tile_dir, wcs_base, coverage_id, label, wcs_version + tile_dir, + wcs_base, + coverage_id, + label, + wcs_version, + allow_missing_tiles, ) if not tile_paths: diff --git a/pipeline/download/places.py b/pipeline/download/places.py index e2a24b8..369a3b9 100644 --- a/pipeline/download/places.py +++ b/pipeline/download/places.py @@ -6,6 +6,7 @@ Reuses the same england-latest.osm.pbf as pois.py. """ import argparse +import re from pathlib import Path import osmium @@ -44,11 +45,37 @@ _STATION_STRIP = ( " underground station", " railway station", " dlr station", + " station dlr", + " dlr", " overground station", " tram stop", " station", ) +_DLR_CODE_RE = re.compile(r"ZZDL([A-Z0-9]{3})") + + +def _is_dlr_station(tags: dict[str, str]) -> bool: + name = tags.get("name", "").lower() + network = tags.get("network", "").lower() + operator = tags.get("operator", "").lower() + return ( + "docklands" in network + or "dlr" in network + or "docklands" in operator + or "dlr" in operator + or name.endswith(" dlr") + or " dlr " in name + ) + + +def _is_tram_station(tags: dict[str, str]) -> bool: + if _is_dlr_station(tags): + return False + station_tag = tags.get("station", "") + network = tags.get("network", "").lower() + return station_tag == "light_rail" or "tramlink" in network or "tram" in network + def _station_display_name(name: str, tags: dict[str, str]) -> str: """Build a descriptive station name like 'Bank tube station'.""" @@ -78,6 +105,96 @@ def _station_display_name(name: str, tags: dict[str, str]) -> str: return f"{name} {suffix}" +def _station_name_score(name: str) -> tuple[int, int]: + lower = name.lower() + suffix_penalty = int( + lower.endswith( + ( + " underground station", + " tube station", + " dlr station", + " railway station", + " rail station", + " station dlr", + " station", + ) + ) + or lower.endswith(" dlr") + ) + return (suffix_penalty, len(name)) + + +def _naptan_dlr_stations(naptan_path: Path) -> list[dict]: + """Extract station-level DLR destinations from NaPTAN access nodes.""" + df = pl.read_parquet(naptan_path) + required = {"id", "name", "category", "lat", "lng"} + missing = required - set(df.columns) + if missing: + raise ValueError(f"NaPTAN file is missing columns: {sorted(missing)}") + + rows: dict[str, dict] = {} + for row in df.iter_rows(named=True): + atco_id = str(row["id"] or "") + match = _DLR_CODE_RE.search(atco_id) + if not match: + continue + if row["category"] not in {"Tube station", "Rail station"}: + continue + + code = match.group(1) + raw_name = str(row["name"] or "") + if not raw_name: + continue + + lat = float(row["lat"]) + lon = float(row["lng"]) + current = rows.get(code) + if current is None: + rows[code] = { + "raw_name": raw_name, + "lat_sum": lat, + "lon_sum": lon, + "count": 1, + } + continue + + current["lat_sum"] += lat + current["lon_sum"] += lon + current["count"] += 1 + if _station_name_score(raw_name) < _station_name_score(current["raw_name"]): + current["raw_name"] = raw_name + + stations = [] + for station in rows.values(): + count = station["count"] + display_name = _station_display_name(station["raw_name"], {"network": "DLR"}) + stations.append( + { + "name": display_name, + "place_type": "station", + "lat": station["lat_sum"] / count, + "lon": station["lon_sum"] / count, + "population": 0, + "travel_destination": True, + } + ) + + return sorted(stations, key=lambda station: station["name"]) + + +def _append_naptan_dlr_stations(places: list[dict], naptan_path: Path) -> int: + existing_names = {str(place["name"]).casefold() for place in places} + added = 0 + for station in _naptan_dlr_stations(naptan_path): + key = station["name"].casefold() + if key in existing_names: + continue + places.append(station) + existing_names.add(key) + added += 1 + return added + + class PlaceHandler(osmium.SimpleHandler): def __init__(self, progress: tqdm, england_polygon) -> None: super().__init__() @@ -145,14 +262,7 @@ class PlaceHandler(osmium.SimpleHandler): # Railway stations (tube, national rail, DLR, overground, Elizabeth line) if n.tags.get("railway") == "station": tags = dict(n.tags) - station_tag = tags.get("station", "") - network = tags.get("network", "").lower() - # Skip tram stops - if ( - station_tag == "light_rail" - or "tramlink" in network - or "tram" in network - ): + if _is_tram_station(tags): return display_name = _station_display_name(name, tags) self._add( @@ -178,6 +288,11 @@ def main() -> None: required=True, help="England boundary GeoJSON file", ) + parser.add_argument( + "--naptan", + type=Path, + help="Optional NaPTAN parquet file used to add DLR station destinations", + ) args = parser.parse_args() pbf_file = args.pbf @@ -195,6 +310,9 @@ def main() -> None: handler.apply_file(str(pbf_file), locations=True) print(f"Extracted {len(handler.places):,} place nodes") + if args.naptan: + added = _append_naptan_dlr_stations(handler.places, args.naptan) + print(f"Added {added:,} DLR station destinations from NaPTAN") if handler.places: df = pl.DataFrame(handler.places) diff --git a/pipeline/download/test_crime.py b/pipeline/download/test_crime.py new file mode 100644 index 0000000..b986004 --- /dev/null +++ b/pipeline/download/test_crime.py @@ -0,0 +1,60 @@ +from zipfile import ZipFile + +from pipeline.download.crime import extract_csvs, parse_archives + + +def test_parse_archives_reads_monthly_zip_links_only(): + html = """ +

latest.zip

+
+
+ March 2026 (1.6 GB) +

Contains data from Apr 2023 to Mar 2026

+

6dde462489389445877f3988ef3f4f4b

+
+
+ June 2019 (1.6 GB) +

Contains data from Jul 2016 to Jun 2019

+

d6494297b24c1434bdb2504e95261bf8-100

+
+
+
+
+ Neighbourhood crime (2.2 MB) + 6b80e2b97d87f6668b7a45953924d191 +
+
+ """ + + archives = parse_archives(html, "https://data.police.uk/data/archive/") + + assert [archive.filename for archive in archives] == [ + "2026-03.zip", + "2019-06.zip", + ] + assert archives[0].url == "https://data.police.uk/data/archive/2026-03.zip" + assert archives[0].md5 == "6dde462489389445877f3988ef3f4f4b" + assert archives[1].md5 is None + assert archives[1].raw_md5 == "d6494297b24c1434bdb2504e95261bf8-100" + + +def test_extract_csvs_preserves_existing_newer_files(tmp_path): + zip_path = tmp_path / "older.zip" + output = tmp_path / "crime" + existing = output / "2023-01" / "2023-01-city-street.csv" + existing.parent.mkdir(parents=True) + existing.write_text("newer\n") + + with ZipFile(zip_path, "w") as archive: + archive.writestr("2023-01/2023-01-city-street.csv", "older\n") + archive.writestr("2022-12/2022-12-city-street.csv", "old\n") + archive.writestr("../escape.csv", "bad\n") + archive.writestr("notes.txt", "ignored\n") + + extracted, skipped = extract_csvs(zip_path, output) + + assert extracted == 1 + assert skipped == 1 + assert existing.read_text() == "newer\n" + assert (output / "2022-12" / "2022-12-city-street.csv").read_text() == "old\n" + assert not (tmp_path / "escape.csv").exists() diff --git a/pipeline/download/test_noise.py b/pipeline/download/test_noise.py new file mode 100644 index 0000000..59b2695 --- /dev/null +++ b/pipeline/download/test_noise.py @@ -0,0 +1,89 @@ +import httpx +import pytest + +from pipeline.download import noise + + +def test_download_tile_splits_after_retries(monkeypatch, tmp_path): + monkeypatch.setattr(noise, "MAX_RETRIES", 1) + monkeypatch.setattr(noise, "MIN_TILE_SIZE", 50) + + def fake_fetch_tile_bytes( + wcs_base, + coverage_id, + min_e, + min_n, + max_e, + max_n, + wcs_version="1.0.0", + ): + if max_e - min_e > 50 or max_n - min_n > 50: + raise httpx.TimeoutException("too large") + return b"II*\x00fake-tiff" + + monkeypatch.setattr(noise, "_fetch_tile_bytes", fake_fetch_tile_bytes) + + paths, failures = noise._download_tile("base", "coverage", 0, 0, 100, 100, tmp_path) + + assert failures == [] + assert len(paths) == 4 + assert sorted(path.name for path in paths) == [ + "tile_0_0_50_50.tif", + "tile_0_50_50_100.tif", + "tile_50_0_100_50.tif", + "tile_50_50_100_100.tif", + ] + + +def test_download_tile_reports_unsplittable_failure(monkeypatch, tmp_path): + monkeypatch.setattr(noise, "MAX_RETRIES", 1) + monkeypatch.setattr(noise, "MIN_TILE_SIZE", 100) + + def fake_fetch_tile_bytes(*args, **kwargs): + raise httpx.ConnectError("offline") + + monkeypatch.setattr(noise, "_fetch_tile_bytes", fake_fetch_tile_bytes) + + paths, failures = noise._download_tile("base", "coverage", 0, 0, 100, 100, tmp_path) + + assert paths == [] + assert failures == [(0, 0, 100, 100)] + + +def test_download_raster_tolerates_missing_tiles_when_allowed(monkeypatch, tmp_path): + monkeypatch.setattr(noise, "BNG_MIN_E", 0) + monkeypatch.setattr(noise, "BNG_MAX_E", 100) + monkeypatch.setattr(noise, "BNG_MIN_N", 0) + monkeypatch.setattr(noise, "BNG_MAX_N", 100) + monkeypatch.setattr(noise, "TILE_SIZE", 100) + + def fake_download_tile(*args, **kwargs): + return [], [(0, 0, 100, 100)] + + monkeypatch.setattr(noise, "_download_tile", fake_download_tile) + + paths = noise.download_raster( + tmp_path, + "base", + "coverage", + "Airport", + allow_missing_tiles=True, + ) + + assert paths == [] + + +def test_download_raster_raises_on_missing_strict_tiles(monkeypatch, tmp_path): + monkeypatch.setattr(noise, "BNG_MIN_E", 0) + monkeypatch.setattr(noise, "BNG_MAX_E", 100) + monkeypatch.setattr(noise, "BNG_MIN_N", 0) + monkeypatch.setattr(noise, "BNG_MAX_N", 100) + monkeypatch.setattr(noise, "TILE_SIZE", 100) + + def fake_download_tile(*args, **kwargs): + return [], [(0, 0, 100, 100)] + + monkeypatch.setattr(noise, "_download_tile", fake_download_tile) + + with pytest.raises(RuntimeError, match=r"\[Road\] Failed to download"): + noise.download_raster(tmp_path, "base", "coverage", "Road") diff --git a/pipeline/download/test_places.py b/pipeline/download/test_places.py new file mode 100644 index 0000000..978fa30 --- /dev/null +++ b/pipeline/download/test_places.py @@ -0,0 +1,81 @@ +import polars as pl + +from pipeline.download.places import ( + _is_dlr_station, + _is_tram_station, + _naptan_dlr_stations, + _station_display_name, +) + + +def test_dlr_light_rail_is_not_treated_as_tram(): + dlr_tags = { + "name": "Lewisham DLR", + "railway": "station", + "station": "light_rail", + "network": "Docklands Light Railway", + } + + assert _is_dlr_station(dlr_tags) + assert not _is_tram_station(dlr_tags) + assert _station_display_name("Lewisham DLR", dlr_tags) == "Lewisham DLR station" + assert ( + _station_display_name("Tower Gateway Station DLR", dlr_tags) + == "Tower Gateway DLR station" + ) + + +def test_tram_light_rail_is_still_excluded(): + tram_tags = { + "name": "East Croydon", + "railway": "station", + "station": "light_rail", + "network": "London Trams", + } + + assert not _is_dlr_station(tram_tags) + assert _is_tram_station(tram_tags) + + +def test_naptan_dlr_stations_are_deduplicated_by_atco_code(tmp_path): + naptan = tmp_path / "naptan.parquet" + pl.DataFrame( + { + "id": [ + "4900ZZDLSHA3", + "9400ZZDLSHA", + "4900ZZDLGRE1", + "490002076RV", + "4900ZZLUBNK", + ], + "name": [ + "Shadwell DLR", + "Shadwell DLR Station", + "Greenwich Station", + "Tower Gateway Station DLR", + "Bank", + ], + "category": [ + "Tube station", + "Tube station", + "Rail station", + "Bus stop", + "Tube station", + ], + "lat": [51.51156, 51.511693, 51.47794, 51.510575, 51.5131], + "lng": [-0.055595, -0.056643, -0.01442, -0.07514, -0.0894], + } + ).write_parquet(naptan) + + stations = _naptan_dlr_stations(naptan) + + assert [station["name"] for station in stations] == [ + "Greenwich DLR station", + "Shadwell DLR station", + ] + shadwell = next( + station for station in stations if station["name"].startswith("Shadwell") + ) + assert shadwell["lat"] == (51.51156 + 51.511693) / 2 + assert shadwell["place_type"] == "station" + assert shadwell["travel_destination"] is True diff --git a/pipeline/download/transit_network.py b/pipeline/download/transit_network.py index 94dd5f5..4cbf5f8 100644 --- a/pipeline/download/transit_network.py +++ b/pipeline/download/transit_network.py @@ -56,6 +56,7 @@ NR_AUTH_URL = "https://opendata.nationalrail.co.uk/authenticate" NR_TIMETABLE_URL = "https://opendata.nationalrail.co.uk/api/staticfeeds/3.0/timetable" USER_AGENT = "property-map-pipeline/1.0 (https://github.com)" +TRANSXCHANGE2GTFS_PACKAGE = "transxchange2gtfs@1.12.0" def _download_http( @@ -473,10 +474,50 @@ def convert_tfl_to_gtfs(raw_dir: Path, output_dir: Path) -> Path: download_naptan() print("Converting TfL TransXChange → GTFS...") + # The shim patches known packaging/runtime issues in the pinned npm package + # before loading its CLI from npx's temporary install. + shim_path = Path(__file__).with_name("transxchange2gtfs_shim.js") subprocess.run( - ["npx", "--yes", "transxchange2gtfs", str(txc_path), str(dest)], + [ + "npx", + "--yes", + "--package", + TRANSXCHANGE2GTFS_PACKAGE, + "sh", + "-c", + "\n".join( + [ + 'bin="$(command -v transxchange2gtfs)"', + 'script="$(readlink -f "$bin")"', + 'pkg_dir="$(dirname "$(dirname "$script")")"', + 'shim="$1"', + "shift", + 'exec node "$shim" "$pkg_dir" "$@"', + ] + ), + "transxchange2gtfs", + str(shim_path.resolve()), + str(txc_path.resolve()), + str(dest.resolve()), + ], check=True, ) + required_files = { + "agency.txt", + "calendar.txt", + "calendar_dates.txt", + "routes.txt", + "stop_times.txt", + "stops.txt", + "trips.txt", + } + if not dest.exists() or not zipfile.is_zipfile(dest): + raise RuntimeError(f"transxchange2gtfs did not create a valid GTFS zip: {dest}") + with zipfile.ZipFile(dest) as z: + missing = required_files - set(z.namelist()) + if missing: + missing_str = ", ".join(sorted(missing)) + raise RuntimeError(f"TfL GTFS zip is missing required files: {missing_str}") size_mb = dest.stat().st_size / (1024 * 1024) print(f" Saved to {dest} ({size_mb:.1f} MB)") return dest diff --git a/pipeline/download/transxchange2gtfs_shim.js b/pipeline/download/transxchange2gtfs_shim.js new file mode 100644 index 0000000..e1952e8 --- /dev/null +++ b/pipeline/download/transxchange2gtfs_shim.js @@ -0,0 +1,76 @@ +#!/usr/bin/env node +"use strict"; + +const fs = require("fs"); +const path = require("path"); +const { createRequire } = require("module"); + +const [pkgDirArg, ...converterArgs] = process.argv.slice(2); + +if (!pkgDirArg || converterArgs.length < 2) { + console.error( + "Usage: transxchange2gtfs_shim.js ", + ); + process.exit(2); +} + +const pkgDir = path.resolve(pkgDirArg); + +function replaceOnce(relativePath, before, after) { + const file = path.join(pkgDir, relativePath); + const original = fs.readFileSync(file, "utf8"); + if (original.includes(before)) { + fs.writeFileSync(file, original.replace(before, after)); + } else if (original.includes(after)) { + return; + } else { + throw new Error(`Could not patch ${relativePath}: expected text not found`); + } +} + +// The published 1.12.0 package has a few compatibility issues with current +// TfL TransXChange exports: +// - the bin script points at dist/src/cli.js, but the package ships dist/cli.js +// - the compiled date-holidays import expects a synthetic default export +// - some TfL journeys reference timing links without matching route-link geometry +// +// GTFS shapes are optional for R5 routing. Clear shape references and omit +// shapes.txt so missing route geometry does not drop otherwise usable trips. +function patchPackage() { + replaceOnce( + "dist/transxchange/TransXChangeJourneyStream.js", + "distanceSoFarM += routeLink.Distance;", + "distanceSoFarM += routeLink ? routeLink.Distance : 0;", + ); + replaceOnce( + "dist/gtfs/TripsStream.js", + "(0, crypto_1.createHash)('md5').update(JSON.stringify({ routeId: journey.route, routeLinkSeq: journey.routeLinkIds })).digest(\"hex\"));", + "\"\");", + ); + replaceOnce( + "dist/gtfs/StopTimesStream.js", + "stop.shapeDistTraveled, stop.exactTime ? \"1\" : \"0\");", + "\"\", stop.exactTime ? \"1\" : \"0\");", + ); + replaceOnce( + "dist/Container.js", + "\"stops.txt\": transxchange.pipe(new StopsStream_1.StopsStream(naptanIndex)),\n \"shapes.txt\": journeyStream.pipe(new ShapesStream_1.ShapesStream())", + "\"stops.txt\": transxchange.pipe(new StopsStream_1.StopsStream(naptanIndex))", + ); + replaceOnce( + "dist/Container.js", + "\"routes.txt\": transxchange.pipe(new RoutesStream_1.RoutesStream()),\n \"transfers.txt\": transxchange.pipe(new TransfersStream_1.TransfersStream(naptanIndex, locationIndex)),\n \"stops.txt\": transxchange.pipe(new StopsStream_1.StopsStream(naptanIndex))", + "\"routes.txt\": transxchange.pipe(new RoutesStream_1.RoutesStream()),\n \"stops.txt\": transxchange.pipe(new StopsStream_1.StopsStream(naptanIndex))", + ); +} + +patchPackage(); + +const pkgRequire = createRequire(path.join(pkgDir, "package.json")); +const Holidays = pkgRequire("date-holidays"); +if (!Holidays.default) { + Holidays.default = Holidays; +} + +process.argv = [process.argv[0], "transxchange2gtfs", ...converterArgs]; +require(path.join(pkgDir, "dist", "cli.js")); diff --git a/pipeline/transform/crime.py b/pipeline/transform/crime.py index acd4010..a9cf808 100644 --- a/pipeline/transform/crime.py +++ b/pipeline/transform/crime.py @@ -1,12 +1,32 @@ import argparse +import re from pathlib import Path import polars as pl +STREET_CRIME_CSV_RE = re.compile(r"^\d{4}-\d{2}-.+-street\.csv$") + + +def find_street_crime_csvs(crime_dir: Path) -> tuple[list[Path], int]: + csvs = sorted(crime_dir.rglob("*.csv")) + street_csvs = [path for path in csvs if STREET_CRIME_CSV_RE.fullmatch(path.name)] + return street_csvs, len(csvs) - len(street_csvs) + def transform_crime(crime_dir: Path, output_path: Path) -> None: - csvs = sorted(crime_dir.rglob("*.csv")) - print(f"Found {len(csvs)} CSV files across {len(list(crime_dir.iterdir()))} months") + csvs, ignored_csv_count = find_street_crime_csvs(crime_dir) + if not csvs: + raise FileNotFoundError(f"No street crime CSV files found in {crime_dir}") + + month_count = len({path.parent.name for path in csvs}) + print( + f"Found {len(csvs)} street crime CSV files across {month_count} months" + + ( + f" (ignored {ignored_csv_count} non-street CSVs)" + if ignored_csv_count + else "" + ) + ) df = pl.scan_csv( csvs, diff --git a/pipeline/transform/join_epc_pp.py b/pipeline/transform/join_epc_pp.py index 2f90389..95c9769 100644 --- a/pipeline/transform/join_epc_pp.py +++ b/pipeline/transform/join_epc_pp.py @@ -1,6 +1,15 @@ import argparse -import polars as pl +import csv +import io +import tempfile +import zipfile from pathlib import Path + +import polars as pl +import pyarrow as pa +import pyarrow.csv as pa_csv +import pyarrow.parquet as pq + from ..utils import fuzzy_join_on_postcode @@ -8,12 +17,168 @@ pl.Config.set_tbl_cols(-1) RATING_RANK = {"A": 1, "B": 2, "C": 3, "D": 4, "E": 5, "F": 6, "G": 7} MIN_PRICE = 50_000 +EPC_SOURCE_COLUMNS = [ + "address", + "postcode", + "current_energy_rating", + "potential_energy_rating", + "property_type", + "built_form", + "inspection_date", + "total_floor_area", + "number_habitable_rooms", + "floor_height", + "construction_age_band", + "tenure", +] + + +def _normalise_csv_columns(columns: list[str]) -> list[str]: + return [column.strip().lower() for column in columns] + + +def _clean_string(column: str) -> pl.Expr: + stripped = pl.col(column).cast(pl.String).str.strip_chars() + return pl.when(stripped == "").then(None).otherwise(stripped) + + +def _clean_number(column: str, dtype: pl.DataType) -> pl.Expr: + return _clean_string(column).cast(dtype, strict=False) + + +def _select_epc_columns(raw: pl.LazyFrame) -> pl.LazyFrame: + return ( + raw.select( + _clean_string("address").alias("epc_address"), + _clean_string("postcode").str.to_uppercase().alias("epc_postcode"), + _clean_string("current_energy_rating") + .str.to_uppercase() + .alias("current_energy_rating"), + _clean_string("potential_energy_rating") + .str.to_uppercase() + .alias("potential_energy_rating"), + _clean_string("property_type").alias("epc_property_type"), + _clean_string("built_form").alias("built_form"), + _clean_string("inspection_date").alias("inspection_date"), + _clean_number("total_floor_area", pl.Float64).alias("total_floor_area"), + _clean_number("number_habitable_rooms", pl.Int16).alias( + "number_habitable_rooms" + ), + _clean_number("floor_height", pl.Float64).alias("floor_height"), + _clean_string("construction_age_band").alias("construction_age_band"), + _clean_string("tenure").alias("tenure"), + ) + .filter(pl.col("epc_address").is_not_null()) + .with_columns( + pl.when(pl.col("number_habitable_rooms") == 0) + .then(None) + .otherwise(pl.col("number_habitable_rooms")) + .alias("number_habitable_rooms"), + ) + ) + + +def _certificate_member_names(zip_file: zipfile.ZipFile) -> list[str]: + return sorted( + name + for name in zip_file.namelist() + if not name.endswith("/") + and Path(name).name.lower().startswith("certificates") + and name.lower().endswith(".csv") + ) + + +def _read_zip_csv_header(zip_file: zipfile.ZipFile, member_name: str) -> list[str]: + with zip_file.open(member_name) as member: + text = io.TextIOWrapper(member, encoding="utf-8-sig", newline="") + try: + return next(csv.reader(text)) + except StopIteration as exc: + raise ValueError(f"EPC CSV member is empty: {member_name}") from exc + + +def _source_columns_for_header(header: list[str]) -> list[str]: + columns_by_normalised_name = { + normalised: source + for source, normalised in zip(header, _normalise_csv_columns(header)) + } + return [ + columns_by_normalised_name.get(column, column) for column in EPC_SOURCE_COLUMNS + ] + + +def _zip_certificates_to_parquet(zip_path: Path, output_path: Path) -> None: + schema = pa.schema((column, pa.string()) for column in EPC_SOURCE_COLUMNS) + writer = pq.ParquetWriter(output_path, schema=schema, compression="zstd") + + try: + try: + zip_file = zipfile.ZipFile(zip_path) + except zipfile.BadZipFile as exc: + raise ValueError( + f"{zip_path} is not a readable EPC zip archive; re-download " + "domestic-csv.zip and try again" + ) from exc + + with zip_file: + member_names = _certificate_member_names(zip_file) + if not member_names: + raise ValueError(f"No certificate CSV files found in {zip_path}") + + for member_name in member_names: + print(f"Reading EPC certificates from {member_name}") + source_columns = _source_columns_for_header( + _read_zip_csv_header(zip_file, member_name) + ) + convert_options = pa_csv.ConvertOptions( + include_columns=source_columns, + include_missing_columns=True, + column_types={ + source_column: pa.string() for source_column in source_columns + }, + strings_can_be_null=True, + ) + read_options = pa_csv.ReadOptions(block_size=64 * 1024 * 1024) + + with zip_file.open(member_name) as member: + reader = pa_csv.open_csv( + member, + read_options=read_options, + convert_options=convert_options, + ) + while True: + try: + batch = reader.read_next_batch() + except StopIteration: + break + + if batch.num_rows == 0: + continue + + writer.write_batch(batch.rename_columns(EPC_SOURCE_COLUMNS)) + finally: + writer.close() + + +def _scan_epc_certificates(epc_path: Path, temp_dir: Path) -> pl.LazyFrame: + if epc_path.suffix.lower() == ".zip": + parquet_path = temp_dir / "epc-certificates.parquet" + _zip_certificates_to_parquet(epc_path, parquet_path) + raw = pl.scan_parquet(parquet_path) + else: + raw = pl.scan_csv( + epc_path, + infer_schema=False, + with_column_names=_normalise_csv_columns, + ) + + return _select_epc_columns(raw) def main(): parser = argparse.ArgumentParser(description="Fuzzy join EPC and Price Paid data") parser.add_argument( - "--epc", type=Path, required=True, help="EPC certificates CSV file" + "--epc", type=Path, required=True, help="EPC certificates CSV file or zip" ) parser.add_argument( "--price-paid", type=Path, required=True, help="Price paid parquet file" @@ -23,74 +188,56 @@ def main(): ) args = parser.parse_args() - epc_base = ( - pl.scan_csv(args.epc) - .select( - pl.col("ADDRESS").alias("epc_address"), - "POSTCODE", - "CURRENT_ENERGY_RATING", - "POTENTIAL_ENERGY_RATING", - pl.col("PROPERTY_TYPE").alias("epc_property_type"), - "BUILT_FORM", - "INSPECTION_DATE", - "TOTAL_FLOOR_AREA", - "NUMBER_HABITABLE_ROOMS", - "FLOOR_HEIGHT", - "CONSTRUCTION_AGE_BAND", - "TENURE", - ) - .filter(pl.col("epc_address").is_not_null()) - .with_columns( - pl.when(pl.col("NUMBER_HABITABLE_ROOMS") == 0) - .then(None) - .otherwise(pl.col("NUMBER_HABITABLE_ROOMS")) - .alias("NUMBER_HABITABLE_ROOMS"), - ) - ) + with tempfile.TemporaryDirectory(prefix="epc_certificates_") as tmpdir: + _run(args.epc, args.price_paid, args.output, Path(tmpdir)) + + +def _run(epc_path: Path, price_paid_path: Path, output_path: Path, temp_dir: Path): + epc_base = _scan_epc_certificates(epc_path, temp_dir) # Dedup fork: keep latest certificate per property (existing logic) epc = ( - epc_base.sort("INSPECTION_DATE", descending=True) - .group_by("epc_address", "POSTCODE") + epc_base.sort("inspection_date", descending=True) + .group_by("epc_address", "epc_postcode") .first() - .drop("TENURE") + .drop("tenure") ) # Events fork: detect renovation events between consecutive certificates # Collect eagerly because .over() window functions don't work in streaming # engine (fuzzy_join.py:50 uses sink_parquet which requires streaming). events = ( - epc_base.sort("INSPECTION_DATE") + epc_base.sort("inspection_date") .with_columns( - pl.col("CURRENT_ENERGY_RATING") + pl.col("current_energy_rating") .replace_strict(RATING_RANK, default=None, return_dtype=pl.Int32) .alias("_rating_rank"), ) .with_columns( - pl.col("NUMBER_HABITABLE_ROOMS") + pl.col("number_habitable_rooms") .shift(1) - .over("epc_address", "POSTCODE") + .over("epc_address", "epc_postcode") .alias("_prev_rooms"), - pl.col("TOTAL_FLOOR_AREA") + pl.col("total_floor_area") .shift(1) - .over("epc_address", "POSTCODE") + .over("epc_address", "epc_postcode") .alias("_prev_area"), pl.col("_rating_rank") .shift(1) - .over("epc_address", "POSTCODE") + .over("epc_address", "epc_postcode") .alias("_prev_rating_rank"), ) .with_columns( pl.when( - pl.col("NUMBER_HABITABLE_ROOMS").is_not_null() + pl.col("number_habitable_rooms").is_not_null() & pl.col("_prev_rooms").is_not_null() - & (pl.col("NUMBER_HABITABLE_ROOMS") != pl.col("_prev_rooms")) + & (pl.col("number_habitable_rooms") != pl.col("_prev_rooms")) ) .then(pl.lit("Remodelling")) .when( - pl.col("TOTAL_FLOOR_AREA").is_not_null() + pl.col("total_floor_area").is_not_null() & pl.col("_prev_area").is_not_null() - & (pl.col("TOTAL_FLOOR_AREA") > pl.col("_prev_area")) + & (pl.col("total_floor_area") > pl.col("_prev_area")) ) .then(pl.lit("Extension")) .when( @@ -104,13 +251,13 @@ def main(): ) .filter(pl.col("_event").is_not_null()) .with_columns( - pl.col("INSPECTION_DATE") + pl.col("inspection_date") .cast(pl.String) .str.slice(0, 4) .cast(pl.Int32) .alias("_event_year"), ) - .group_by("epc_address", "POSTCODE") + .group_by("epc_address", "epc_postcode") .agg( pl.struct( pl.col("_event_year").alias("year"), @@ -128,8 +275,8 @@ def main(): # Social tenure fork: flag properties that were ever social housing social_tenure = ( - epc_base.filter(pl.col("TENURE").str.to_lowercase().str.contains("social")) - .select("epc_address", "POSTCODE") + epc_base.filter(pl.col("tenure").str.to_lowercase().str.contains("social")) + .select("epc_address", "epc_postcode") .unique() .with_columns(pl.lit("Yes").alias("was_council_house")) .collect() @@ -140,12 +287,12 @@ def main(): epc = ( epc.join( events.lazy(), - on=["epc_address", "POSTCODE"], + on=["epc_address", "epc_postcode"], how="left", ) .join( social_tenure.lazy(), - on=["epc_address", "POSTCODE"], + on=["epc_address", "epc_postcode"], how="left", ) .with_columns( @@ -167,7 +314,7 @@ def main(): duration_map = {"F": "Freehold", "L": "Leasehold"} price_paid = ( - pl.scan_parquet(args.price_paid) + pl.scan_parquet(price_paid_path) .select( "price", "date_of_transfer", @@ -219,9 +366,9 @@ def main(): left_address_col="pp_address", right_address_col="epc_address", left_postcode_col="postcode", - right_postcode_col="POSTCODE", + right_postcode_col="epc_postcode", ) - .drop("POSTCODE") + .drop("epc_postcode") .collect(engine="streaming") ) @@ -236,7 +383,7 @@ def main(): # For new-builds (old_new == "Y"), use the first transaction date year as # the exact construction date; otherwise fall back to the EPC age band. epc_band_year = ( - pl.col("CONSTRUCTION_AGE_BAND") + pl.col("construction_age_band") .str.replace("England and Wales: ", "") .str.replace(" onwards", "") .str.extract(r"(\d{4})", 1) @@ -251,7 +398,7 @@ def main(): pl.when(is_new_build & transfer_year.is_not_null()) .then(transfer_year) .otherwise(epc_band_year) - .alias("CONSTRUCTION_AGE_BAND"), + .alias("construction_age_band"), pl.when(is_new_build & transfer_year.is_not_null()) .then(pl.lit(0, dtype=pl.UInt8)) .when(epc_band_year.is_not_null()) @@ -263,8 +410,8 @@ def main(): joined = joined.rename({col: col.lower() for col in joined.columns}) print(joined.head()) - joined.write_parquet(args.output) - print(f"Wrote {args.output}") + joined.write_parquet(output_path) + print(f"Wrote {output_path}") if __name__ == "__main__": diff --git a/pipeline/transform/merge.py b/pipeline/transform/merge.py index 8b485a2..56b6b90 100644 --- a/pipeline/transform/merge.py +++ b/pipeline/transform/merge.py @@ -7,6 +7,15 @@ from pipeline.utils.postcode_mapping import build_postcode_mapping MIN_FLOOR_AREA_M2 = 10 +_IOD_PERCENTILE_COLUMNS = [ + "Education, Skills and Training Score", + "Income Score (rate)", + "Employment Score (rate)", + "Health Deprivation and Disability Score", + "Indoors Sub-domain Score", + "Outdoors Sub-domain Score", +] + _AREA_COLUMNS = [ "Postcode", @@ -76,6 +85,24 @@ _AREA_COLUMNS = [ ] +def _less_deprived_percentile_expr(column: str) -> pl.Expr: + """Convert an IoD deprivation score to a 0-100 less-deprived percentile.""" + non_null_count = pl.col(column).count() + descending_rank = pl.col(column).rank("average", descending=True) + return ( + pl.when(pl.col(column).is_null()) + .then(None) + .when(pl.col(column) == pl.col(column).min()) + .then(100.0) + .when(pl.col(column) == pl.col(column).max()) + .then(0.0) + .when(non_null_count > 1) + .then(((descending_rank - 1) / (non_null_count - 1) * 100).round(1)) + .otherwise(100.0) + .alias(column) + ) + + def _build( epc_pp_path: Path, arcgis_path: Path, @@ -134,20 +161,11 @@ def _build( ) wide = wide.join(arcgis, on="postcode", how="left") - iod = pl.scan_parquet(iod_path) + iod = pl.scan_parquet(iod_path).with_columns( + *(_less_deprived_percentile_expr(c) for c in _IOD_PERCENTILE_COLUMNS) + ) wide = wide.join(iod, left_on="lsoa21", right_on="LSOA code (2021)", how="left") - # Invert deprivation scores so that higher values = less deprived (better) - iod_score_cols = [ - "Education, Skills and Training Score", - "Income Score (rate)", - "Employment Score (rate)", - "Health Deprivation and Disability Score", - "Indoors Sub-domain Score", - "Outdoors Sub-domain Score", - ] - wide = wide.with_columns(*(pl.col(c).max() - pl.col(c) for c in iod_score_cols)) - ethnicity = pl.scan_parquet(ethnicity_path) wide = wide.join( ethnicity, diff --git a/pipeline/transform/poi_proximity.py b/pipeline/transform/poi_proximity.py index f3a1204..4634d7f 100644 --- a/pipeline/transform/poi_proximity.py +++ b/pipeline/transform/poi_proximity.py @@ -1,6 +1,8 @@ """Compute POI proximity counts and distances per postcode from ArcGIS + filtered POIs.""" import argparse +import re +import unicodedata from pathlib import Path import polars as pl @@ -15,9 +17,25 @@ POI_GROUPS_2KM = { "groceries": ["Greengrocer", "Supermarket", "Convenience Store"], } -# Groups for which to compute distance to nearest POI (from filtered POIs) +# Groups for which to compute distance to nearest POI (from filtered POIs). +# Keep `train_tube` for the existing backend feature; the individual POI +# distance filters below power the frontend dropdown. DISTANCE_GROUPS = { "train_tube": ["Tube station", "Rail station"], + "grocery_store": [ + "Greengrocer", + "Supermarket", + "Convenience Store", + "Waitrose", + "Tesco", + ], + "tube_station": ["Tube station"], + "rail_station": ["Rail station"], + "waitrose": ["Waitrose"], + "tesco": ["Tesco"], + "cafe": ["Café"], + "pub": ["Pub"], + "restaurant": ["Restaurant"], } # OS Open Greenspace function types used for park counts and distance calculation. @@ -27,6 +45,69 @@ GREENSPACE_PARK_FUNCTIONS = { "parks": ["Public Park Or Garden", "Playing Field", "Play Space"], } +GROCERY_DYNAMIC_FILTER_MIN_POIS = 100 +DYNAMIC_FILTER_ALL_GROUPS = {"Public Transport", "Leisure"} +DYNAMIC_FILTER_COUNT_THRESHOLD_GROUPS = {"Groceries"} + + +def _poi_category_slug(category: str) -> str: + ascii_text = ( + unicodedata.normalize("NFKD", category) + .encode("ascii", "ignore") + .decode("ascii") + .lower() + ) + slug = re.sub(r"[^a-z0-9]+", "_", ascii_text).strip("_") + return slug or "poi" + + +def _build_poi_category_groups( + pois: pl.DataFrame, +) -> tuple[dict[str, list[str]], dict[str, str]]: + """Build one proximity group for each POI category selected for filters.""" + if "group" not in pois.columns: + raise ValueError("POI dataframe must include a 'group' column") + + categories = ( + pois.group_by("group", "category") + .len() + .filter( + pl.col("group").is_in(list(DYNAMIC_FILTER_ALL_GROUPS)) + | ( + pl.col("group").is_in(list(DYNAMIC_FILTER_COUNT_THRESHOLD_GROUPS)) + & (pl.col("len") > GROCERY_DYNAMIC_FILTER_MIN_POIS) + ) + ) + .select("category") + .sort("category") + .to_series() + .to_list() + ) + used_slugs: dict[str, int] = {} + groups: dict[str, list[str]] = {} + display_names: dict[str, str] = {} + + for category in categories: + if not isinstance(category, str) or not category: + continue + base_slug = f"poi_{_poi_category_slug(category)}" + slug_count = used_slugs.get(base_slug, 0) + used_slugs[base_slug] = slug_count + 1 + group_key = base_slug if slug_count == 0 else f"{base_slug}_{slug_count + 1}" + groups[group_key] = [category] + display_names[group_key] = category + + return groups, display_names + + +def _dynamic_poi_metric_renames(display_names: dict[str, str]) -> dict[str, str]: + renames: dict[str, str] = {} + for group_key, category in display_names.items(): + renames[f"{group_key}_nearest_km"] = f"Distance to nearest {category} POI (km)" + renames[f"{group_key}_2km"] = f"Number of {category} POIs within 2km" + renames[f"{group_key}_5km"] = f"Number of {category} POIs within 5km" + return renames + def main(): parser = argparse.ArgumentParser( @@ -56,12 +137,35 @@ def main(): ) pois = pl.read_parquet(args.pois) + poi_category_groups, poi_display_names = _build_poi_category_groups(pois) # Count amenity POIs within 2km counts_2km = count_pois_per_postcode( postcodes, pois, groups=POI_GROUPS_2KM, radius_km=2 ) + # Dynamic POI filters: nearest distance plus counts within 2km and 5km for + # the selected public transport, grocery, and leisure categories. + dynamic_counts_2km = count_pois_per_postcode( + postcodes, pois, groups=poi_category_groups, radius_km=2 + ) + dynamic_counts_5km = count_pois_per_postcode( + postcodes, pois, groups=poi_category_groups, radius_km=5 + ) + dynamic_distances = min_distance_per_postcode( + postcodes, pois, groups=poi_category_groups + ) + dynamic_renames = _dynamic_poi_metric_renames(poi_display_names) + dynamic_counts_2km = dynamic_counts_2km.rename( + {k: v for k, v in dynamic_renames.items() if k in dynamic_counts_2km.columns} + ) + dynamic_counts_5km = dynamic_counts_5km.rename( + {k: v for k, v in dynamic_renames.items() if k in dynamic_counts_5km.columns} + ) + dynamic_distances = dynamic_distances.rename( + {k: v for k, v in dynamic_renames.items() if k in dynamic_distances.columns} + ) + # Distance to nearest train/tube station (from filtered POIs) distances = min_distance_per_postcode(postcodes, pois, groups=DISTANCE_GROUPS) @@ -77,6 +181,9 @@ def main(): # Join all results on postcode result = ( counts_2km.join(distances, on="postcode") + .join(dynamic_counts_2km, on="postcode") + .join(dynamic_counts_5km, on="postcode") + .join(dynamic_distances, on="postcode") .join(park_counts_1km, on="postcode") .join(park_distances, on="postcode") ) diff --git a/pipeline/transform/test_crime.py b/pipeline/transform/test_crime.py new file mode 100644 index 0000000..fa5d40c --- /dev/null +++ b/pipeline/transform/test_crime.py @@ -0,0 +1,47 @@ +import polars as pl + +from pipeline.transform.crime import find_street_crime_csvs, transform_crime + + +def test_find_street_crime_csvs_ignores_archive_sidecars(tmp_path): + crime_dir = tmp_path / "crime" + month_dir = crime_dir / "2024-01" + month_dir.mkdir(parents=True) + street = month_dir / "2024-01-test-force-street.csv" + street.touch() + (month_dir / "2024-01-test-force-outcomes.csv").touch() + (month_dir / "2024-01-test-force-stop-and-search.csv").touch() + (crime_dir / "notes.csv").touch() + + csvs, ignored_count = find_street_crime_csvs(crime_dir) + + assert csvs == [street] + assert ignored_count == 3 + + +def test_transform_crime_reads_only_street_crime_csvs(tmp_path): + crime_dir = tmp_path / "crime" + month_dir = crime_dir / "2024-01" + month_dir.mkdir(parents=True) + + (month_dir / "2024-01-test-force-street.csv").write_text( + "\n".join( + [ + "Crime ID,Month,Reported by,Falls within,Longitude,Latitude,Location,LSOA code,LSOA name,Crime type,Last outcome category,Context", + "1,2024-01,Test Force,Test Force,-0.1,51.5,On or near Test Street,E01000001,Test LSOA,Burglary,Under investigation,", + "2,2024-01,Test Force,Test Force,-0.1,51.5,On or near Test Street,E01000001,Test LSOA,Burglary,Under investigation,", + "3,2024-01,Test Force,Test Force,-0.1,51.5,On or near Test Street,,No LSOA,Robbery,Under investigation,", + ] + ) + + "\n" + ) + (month_dir / "2024-01-test-force-outcomes.csv").write_text( + "Crime ID,Month,Reported by,Outcome type\n1,2024-01,Test Force,Charged\n" + ) + + output = tmp_path / "crime.parquet" + transform_crime(crime_dir, output) + + result = pl.read_parquet(output).to_dicts() + + assert result == [{"LSOA code": "E01000001", "Burglary (avg/yr)": 2.0}] diff --git a/pipeline/transform/test_join_epc_pp.py b/pipeline/transform/test_join_epc_pp.py new file mode 100644 index 0000000..7980ea9 --- /dev/null +++ b/pipeline/transform/test_join_epc_pp.py @@ -0,0 +1,174 @@ +import csv +import io +import zipfile +from datetime import date +from pathlib import Path + +import polars as pl + +from pipeline.transform.join_epc_pp import ( + EPC_SOURCE_COLUMNS, + _run, + _scan_epc_certificates, +) + + +def _write_csv(path: Path, fieldnames: list[str], rows: list[dict[str, str]]) -> None: + with path.open("w", newline="") as file: + writer = csv.DictWriter(file, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(rows) + + +def _row(**overrides: str) -> dict[str, str]: + row = { + "address": "1 Example Street", + "postcode": " aa1 1aa ", + "current_energy_rating": "c", + "potential_energy_rating": "b", + "property_type": "House", + "built_form": "Mid-Terrace", + "inspection_date": "2024-01-02", + "total_floor_area": "84.5", + "number_habitable_rooms": "5", + "floor_height": "2.4", + "construction_age_band": "England and Wales: 1950-1966", + "tenure": "owner-occupied", + } + row.update(overrides) + return row + + +def test_scan_epc_certificates_supports_legacy_uppercase_csv(tmp_path: Path): + csv_path = tmp_path / "certificates.csv" + fieldnames = [column.upper() for column in EPC_SOURCE_COLUMNS] + row = {column.upper(): value for column, value in _row().items()} + row["NUMBER_HABITABLE_ROOMS"] = "0" + _write_csv(csv_path, fieldnames, [row]) + + df = _scan_epc_certificates(csv_path, tmp_path).collect() + + assert df.to_dicts() == [ + { + "epc_address": "1 Example Street", + "epc_postcode": "AA1 1AA", + "current_energy_rating": "C", + "potential_energy_rating": "B", + "epc_property_type": "House", + "built_form": "Mid-Terrace", + "inspection_date": "2024-01-02", + "total_floor_area": 84.5, + "number_habitable_rooms": None, + "floor_height": 2.4, + "construction_age_band": "England and Wales: 1950-1966", + "tenure": "owner-occupied", + } + ] + + +def test_scan_epc_certificates_supports_domestic_zip(tmp_path: Path): + zip_path = tmp_path / "domestic-csv.zip" + rows_2023 = [_row(address="2 Example Street", inspection_date="2023-03-04")] + rows_2024 = [ + _row( + address="3 Example Street", + postcode="BB2 2BB", + inspection_date="2024-05-06", + total_floor_area="", + tenure="Rented (social)", + ) + ] + + with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive: + for member_name, rows in [ + ("certificates-2023.csv", rows_2023), + ("nested/certificates-2024.csv", rows_2024), + ]: + csv_text = [",".join(EPC_SOURCE_COLUMNS)] + csv_text.extend( + ",".join(row[column] for column in EPC_SOURCE_COLUMNS) for row in rows + ) + archive.writestr(member_name, "\n".join(csv_text) + "\n") + archive.writestr("recommendations-2024.csv", "address,postcode\nignored,X\n") + + df = _scan_epc_certificates(zip_path, tmp_path).sort("inspection_date").collect() + + assert df.select("epc_address", "epc_postcode", "total_floor_area").to_dicts() == [ + { + "epc_address": "2 Example Street", + "epc_postcode": "AA1 1AA", + "total_floor_area": 84.5, + }, + { + "epc_address": "3 Example Street", + "epc_postcode": "BB2 2BB", + "total_floor_area": None, + }, + ] + assert df.get_column("tenure").to_list() == ["owner-occupied", "Rented (social)"] + assert df.schema["number_habitable_rooms"] == pl.Int16 + + +def test_run_joins_domestic_zip_with_price_paid(tmp_path: Path): + zip_path = tmp_path / "domestic-csv.zip" + with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive: + csv_buffer = io.StringIO() + writer = csv.DictWriter(csv_buffer, fieldnames=EPC_SOURCE_COLUMNS) + writer.writeheader() + writer.writerows( + [ + _row( + current_energy_rating="d", + inspection_date="2023-01-01", + total_floor_area="80", + tenure="Rented (social)", + ), + _row( + current_energy_rating="c", + inspection_date="2024-01-01", + total_floor_area="85", + tenure="owner-occupied", + ), + ] + ) + archive.writestr("certificates-2024.csv", csv_buffer.getvalue()) + + price_paid_path = tmp_path / "price-paid.parquet" + pl.DataFrame( + { + "price": [250_000], + "date_of_transfer": [date(2024, 2, 3)], + "property_type": ["T"], + "postcode": ["AA1 1AA"], + "paon": ["1"], + "saon": [None], + "street": ["Example Street"], + "locality": [None], + "town_city": ["Exampletown"], + "duration": ["F"], + "old_new": ["N"], + } + ).write_parquet(price_paid_path) + + output_path = tmp_path / "epc-pp.parquet" + _run(zip_path, price_paid_path, output_path, tmp_path) + + df = pl.read_parquet(output_path) + + assert df.height == 1 + assert df.select( + "epc_address", + "current_energy_rating", + "total_floor_area", + "construction_age_band", + "was_council_house", + ).to_dicts() == [ + { + "epc_address": "1 Example Street", + "current_energy_rating": "C", + "total_floor_area": 85.0, + "construction_age_band": 1950, + "was_council_house": "Yes", + } + ] + assert df.get_column("renovation_history").list.len().to_list() == [1] diff --git a/pipeline/transform/test_merge.py b/pipeline/transform/test_merge.py new file mode 100644 index 0000000..3964eca --- /dev/null +++ b/pipeline/transform/test_merge.py @@ -0,0 +1,33 @@ +import polars as pl + +from pipeline.transform.merge import ( + _is_dynamic_poi_metric_column, + _less_deprived_percentile_expr, +) + + +def test_less_deprived_percentile_expr_preserves_direction_and_nulls() -> None: + df = pl.DataFrame({"Income Score (rate)": [1.0, 2.0, 3.0, None]}) + + result = df.lazy().with_columns( + _less_deprived_percentile_expr("Income Score (rate)") + ).collect() + + assert result["Income Score (rate)"].to_list() == [100.0, 50.0, 0.0, None] + + +def test_less_deprived_percentile_expr_uses_exact_scale_endpoints() -> None: + df = pl.DataFrame({"Income Score (rate)": [1.0, 1.0, 2.0, 3.0, 3.0]}) + + result = df.lazy().with_columns( + _less_deprived_percentile_expr("Income Score (rate)") + ).collect() + + assert result["Income Score (rate)"].to_list() == [100.0, 100.0, 50.0, 0.0, 0.0] + + +def test_dynamic_poi_metric_columns_are_area_level() -> None: + assert _is_dynamic_poi_metric_column("Distance to nearest Cafe POI (km)") + assert _is_dynamic_poi_metric_column("Number of Cafe POIs within 2km") + assert _is_dynamic_poi_metric_column("Number of Cafe POIs within 5km") + assert not _is_dynamic_poi_metric_column("Number of restaurants within 2km") diff --git a/pipeline/transform/test_poi_proximity.py b/pipeline/transform/test_poi_proximity.py new file mode 100644 index 0000000..68fb9c7 --- /dev/null +++ b/pipeline/transform/test_poi_proximity.py @@ -0,0 +1,41 @@ +import polars as pl + +from pipeline.transform.poi_proximity import _build_poi_category_groups + + +def test_dynamic_poi_groups_include_requested_categories_only() -> None: + pois = pl.DataFrame( + { + "group": ( + ["Public Transport"] * 2 + + ["Leisure"] * 2 + + ["Groceries"] * 101 + + ["Groceries"] * 100 + + ["Education"] * 200 + + ["Health"] * 200 + ), + "category": ( + ["Rail station", "Bus stop"] + + ["Café", "Restaurant"] + + ["Tesco"] * 101 + + ["Waitrose"] * 100 + + ["School"] * 200 + + ["Pharmacy"] * 200 + ), + "lat": [51.5] * 605, + "lng": [-0.1] * 605, + } + ) + + groups, display_names = _build_poi_category_groups(pois) + + assert set(display_names.values()) == { + "Bus stop", + "Café", + "Rail station", + "Restaurant", + "Tesco", + } + assert "poi_waitrose" not in groups + assert "poi_school" not in groups + assert "poi_pharmacy" not in groups diff --git a/pipeline/transform/test_transform_poi.py b/pipeline/transform/test_transform_poi.py index 2d14716..a9e0a0a 100644 --- a/pipeline/transform/test_transform_poi.py +++ b/pipeline/transform/test_transform_poi.py @@ -79,6 +79,33 @@ def test_transform_grocery_retail_points_keeps_fascia_icon_category(): ] +def test_transform_grocery_retail_points_accepts_base_fascias(): + raw = pl.DataFrame( + { + "id": [101, 102, 103, 104], + "retailer": ["Aldi", "Asda", "Booths", "Whole Foods Market"], + "fascia": ["Aldi", "Asda Superstore", "Booths", "Whole Foods Market"], + "store_name": [ + "Aldi Test", + "Asda Test Superstore", + "Booths Test", + "Whole Foods Test", + ], + "long_wgs": [-0.141, -0.142, -0.143, -0.144], + "lat_wgs": [51.515, 51.516, 51.517, 51.518], + } + ) + + pois = transform_grocery_retail_points(raw) + + assert pois.select("category", "icon_category").to_dicts() == [ + {"category": "Aldi", "icon_category": "Aldi"}, + {"category": "Asda", "icon_category": "Asda Superstore"}, + {"category": "Booths", "icon_category": "Booths"}, + {"category": "Whole Foods Market", "icon_category": "Whole Foods Market"}, + ] + + def test_transform_grocery_retail_points_drops_invalid_rows(): raw = pl.DataFrame( { diff --git a/pipeline/transform/transform_poi.py b/pipeline/transform/transform_poi.py index 831d6d2..5c9b237 100644 --- a/pipeline/transform/transform_poi.py +++ b/pipeline/transform/transform_poi.py @@ -1078,19 +1078,40 @@ COOP_RETAILERS = { } GROCERY_RETAILER_DISPLAY_NAMES: dict[str, str] = { + "Aldi": "Aldi", + "Asda": "Asda", + "Booths": "Booths", + "Budgens": "Budgens", + "Centra": "Centra", "Cook": "COOK", + "Costco": "Costco", + "Dunnes Stores": "Dunnes Stores", + "Farmfoods": "Farmfoods", "Heron": "Heron Foods", + "Iceland": "Iceland", + "Lidl": "Lidl", + "Makro": "Makro", "Marks and Spencer": "M&S", + "Morrisons": "Morrisons", + "Planet Organic": "Planet Organic", "Sainsburys": "Sainsbury's", + "Spar": "Spar", + "Tesco": "Tesco", + "Waitrose": "Waitrose", + "Whole Foods Market": "Whole Foods Market", **{retailer: "Co-op" for retailer in COOP_RETAILERS}, } GROCERY_FASCIA_ICON_NAMES: dict[str, str] = { + **GROCERY_RETAILER_DISPLAY_NAMES, "Aldi Local": "Aldi", "Asda Express": "Asda Express", "Asda Living": "Asda Living", "Asda PFS": "Asda PFS", + "Asda Supercentre": "Asda Supercentre", + "Asda Supermarket": "Asda Supermarket", + "Asda Superstore": "Asda Superstore", "Cooltrader": "Heron Foods", "Co-op Food": "Co-op", "Cook": "COOK", @@ -1112,6 +1133,7 @@ GROCERY_FASCIA_ICON_NAMES: dict[str, str] = { "Marks and Spencer Travel SF": "M&S Food", "Morrisons Daily": "Morrisons Daily", "Morrisons Select": "Morrisons", + "Sainsbury's Local": "Sainsbury's Local", "Sainsburys": "Sainsbury's", "Sainsburys Local": "Sainsbury's Local", "Spar PFS": "Spar", @@ -1128,12 +1150,18 @@ GROCERY_FASCIA_ICON_NAMES: dict[str, str] = { def normalize_grocery_retailer(retailer: str | None) -> str: if retailer is None: return "" - return GROCERY_RETAILER_DISPLAY_NAMES.get(retailer, retailer) + display_name = GROCERY_RETAILER_DISPLAY_NAMES.get(retailer) + if display_name is None: + raise ValueError(f"Missing grocery retailer display name for {retailer!r}") + return display_name def normalize_grocery_icon_category(fascia: str | None, retailer: str | None) -> str: if fascia: - return GROCERY_FASCIA_ICON_NAMES.get(fascia, normalize_grocery_retailer(fascia)) + icon_name = GROCERY_FASCIA_ICON_NAMES.get(fascia) + if icon_name is None: + raise ValueError(f"Missing grocery fascia icon name for {fascia!r}") + return icon_name return normalize_grocery_retailer(retailer) diff --git a/pipeline/utils/poi_counts.py b/pipeline/utils/poi_counts.py index 8685d66..cdf9bfa 100644 --- a/pipeline/utils/poi_counts.py +++ b/pipeline/utils/poi_counts.py @@ -2,9 +2,12 @@ import numpy as np import polars as pl +from scipy.spatial import cKDTree from .haversine import haversine_km +EARTH_RADIUS_KM = 6371.0088 + def _build_poi_grid( pois: pl.DataFrame, grid_size: float = 0.05 @@ -49,6 +52,21 @@ def _get_nearby_indices( return np.concatenate(nearby_indices) +def _project_lat_lng_km( + lats: np.ndarray, lngs: np.ndarray, origin_lat: float +) -> np.ndarray: + """Project WGS84 coordinates to local km coordinates for nearest-neighbour lookup.""" + lat_rad = np.radians(lats) + lng_rad = np.radians(lngs) + origin_lat_rad = np.radians(origin_lat) + return np.column_stack( + ( + EARTH_RADIUS_KM * lng_rad * np.cos(origin_lat_rad), + EARTH_RADIUS_KM * lat_rad, + ) + ) + + def count_pois_per_postcode( postcodes_df: pl.DataFrame, pois: pl.DataFrame, @@ -136,7 +154,7 @@ def min_distance_per_postcode( ) -> pl.DataFrame: """ For each postcode, compute the distance (km) to the closest POI per group. - Returns NaN where no POI of that group exists within the grid search range (~5.5km). + Returns NaN where no POI of that group exists. """ print("Computing minimum POI distances per postcode...") @@ -144,51 +162,84 @@ def min_distance_per_postcode( n_pois = len(pois) print(f" {n_postcodes:,} postcodes, {n_pois:,} POIs") - grid_size = 0.05 - print(" Building POI spatial grid...") - poi_lats, poi_lngs, poi_cats, poi_grid = _build_poi_grid(pois, grid_size) - print(f" POI grid has {len(poi_grid):,} occupied cells") - - category_masks = {} - for group, categories in groups.items(): - mask = np.isin(poi_cats, categories) - category_masks[group] = mask - print(f" {group}: {mask.sum():,} POIs") - pc_lats = postcodes_df["lat"].to_numpy() pc_lons = postcodes_df["lon"].to_numpy() pc_codes = postcodes_df["postcode"].to_list() + valid_pc_mask = np.isfinite(pc_lats) & np.isfinite(pc_lons) + valid_pc_indices = np.flatnonzero(valid_pc_mask) result_min_dist = { group: np.full(n_postcodes, np.nan, dtype=np.float32) for group in groups } - batch_size = 50000 - n_batches = (n_postcodes + batch_size - 1) // batch_size - print(f" Processing {n_postcodes:,} postcodes in {n_batches} batches...") + if n_pois == 0 or len(valid_pc_indices) == 0: + print(" No valid postcode/POI coordinates; returning NaN distances") + return pl.DataFrame( + { + "postcode": pc_codes, + **{ + f"{group}_nearest_km": values + for group, values in result_min_dist.items() + }, + } + ) - for batch_idx in range(n_batches): - start_idx = batch_idx * batch_size - end_idx = min(start_idx + batch_size, n_postcodes) + poi_lats = pois["lat"].to_numpy() + poi_lngs = pois["lng"].to_numpy() + poi_cats = pois["category"].to_numpy() + valid_poi_mask = np.isfinite(poi_lats) & np.isfinite(poi_lngs) + origin_lat = float(np.nanmean(pc_lats[valid_pc_mask])) + query_xy = _project_lat_lng_km( + pc_lats[valid_pc_indices], pc_lons[valid_pc_indices], origin_lat + ) - if batch_idx % 5 == 0: - print( - f" Batch {batch_idx + 1}/{n_batches}: postcodes {start_idx:,} - {end_idx:,}" - ) + batch_size = 200_000 + n_batches = (len(valid_pc_indices) + batch_size - 1) // batch_size - for i in range(start_idx, end_idx): - nearby = _get_nearby_indices(pc_lats[i], pc_lons[i], poi_grid, grid_size) - if nearby is None: - continue + for group, categories in groups.items(): + group_indices = np.flatnonzero(valid_poi_mask & np.isin(poi_cats, categories)) + print(f" {group}: {len(group_indices):,} POIs") + if len(group_indices) == 0: + continue - distances = haversine_km( - poi_lats[nearby], poi_lngs[nearby], pc_lats[i], pc_lons[i] - ) + poi_xy = _project_lat_lng_km( + poi_lats[group_indices], poi_lngs[group_indices], origin_lat + ) + tree = cKDTree(poi_xy) + k = min(8, len(group_indices)) - for group, cat_mask in category_masks.items(): - group_mask = cat_mask[nearby] - if group_mask.any(): - result_min_dist[group][i] = distances[group_mask].min() + for batch_idx in range(n_batches): + start_idx = batch_idx * batch_size + end_idx = min(start_idx + batch_size, len(valid_pc_indices)) + batch_pc_indices = valid_pc_indices[start_idx:end_idx] + batch_xy = query_xy[start_idx:end_idx] + + if batch_idx == 0 or (batch_idx + 1) % 5 == 0: + print( + f" Batch {batch_idx + 1}/{n_batches}: postcodes {start_idx:,} - {end_idx:,}" + ) + + _, nearest = tree.query(batch_xy, k=k) + nearest = np.asarray(nearest) + + if k == 1: + candidate_indices = group_indices[nearest] + distances = haversine_km( + poi_lats[candidate_indices], + poi_lngs[candidate_indices], + pc_lats[batch_pc_indices], + pc_lons[batch_pc_indices], + ) + else: + candidate_indices = group_indices[nearest] + distances = haversine_km( + poi_lats[candidate_indices], + poi_lngs[candidate_indices], + pc_lats[batch_pc_indices, None], + pc_lons[batch_pc_indices, None], + ).min(axis=1) + + result_min_dist[group][batch_pc_indices] = distances.astype(np.float32) result_data = {"postcode": pc_codes} for group in groups: diff --git a/pipeline/utils/test_poi_counts.py b/pipeline/utils/test_poi_counts.py index 7e7ad96..6d61ccb 100644 --- a/pipeline/utils/test_poi_counts.py +++ b/pipeline/utils/test_poi_counts.py @@ -113,9 +113,9 @@ def test_min_distance_finds_nearest(postcodes, pois): # Restaurant is co-located — distance ~0 assert ec1a["restaurants_nearest_km"][0] < 0.01 - # Far-away postcode should have NaN (no POIs within grid range) + # Far-away postcode should still get the global nearest distance. zz99 = result.filter(pl.col("postcode") == "ZZ99 9ZZ") - assert np.isnan(zz99["train_tube_nearest_km"][0]) + assert zz99["train_tube_nearest_km"][0] > 300 def test_min_distance_no_pois_returns_nan(postcodes): diff --git a/r5-java/run.sh b/r5-java/run.sh index 10adce7..88209be 100755 --- a/r5-java/run.sh +++ b/r5-java/run.sh @@ -111,20 +111,24 @@ fi # R5 writes .mapdb temp files next to OSM/GTFS files during network construction. # Copy source data to a writable build dir to avoid polluting the originals. mkdir -p "$NETWORK_DIR" +OSM_PBF="property-data/england-latest.osm.pbf" TRANSIT_SRC="property-data/transit" -NETWORK_DATA_DIR="$TRANSIT_SRC" +NETWORK_DATA_DIR="$NETWORK_DIR/build" if [ ! -f "$NETWORK_DIR/network.dat" ]; then BUILD_DIR="$NETWORK_DIR/build" echo "--- No cached network — copying transit data to build dir ---" mkdir -p "$BUILD_DIR" - if ! cp "$TRANSIT_SRC"/raw/*.osm.pbf "$BUILD_DIR/" 2>/dev/null; then - echo "Warning: no .osm.pbf files found in $TRANSIT_SRC/raw/" + if [ ! -f "$OSM_PBF" ]; then + echo "Error: OSM PBF not found at $OSM_PBF" + echo "Download it from https://download.geofabrik.de/europe/united-kingdom/england-latest.osm.pbf" + exit 1 fi + cp "$OSM_PBF" "$BUILD_DIR/" if ! cp "$TRANSIT_SRC"/*.zip "$BUILD_DIR/" 2>/dev/null; then - echo "Warning: no .zip files found in $TRANSIT_SRC/" + echo "Warning: no GTFS .zip files found in $TRANSIT_SRC/ — transit routing would be unavailable" + exit 1 fi - NETWORK_DATA_DIR="$BUILD_DIR" fi # --- Step 5: Run batch --- diff --git a/r5-java/src/main/java/propertymap/Parquet.java b/r5-java/src/main/java/propertymap/Parquet.java index cb16f6c..f784508 100644 --- a/r5-java/src/main/java/propertymap/Parquet.java +++ b/r5-java/src/main/java/propertymap/Parquet.java @@ -32,7 +32,7 @@ public class Parquet { static Postcodes loadEnglandPostcodes(String parquetPath, Path refOut) throws Exception { try (DuckDBConnection conn = connect(); Statement stmt = conn.createStatement()) { stmt.execute("CREATE TABLE postcodes AS SELECT pcds, lat, \"long\" FROM read_parquet('" - + escapePath(parquetPath) + "') WHERE ctry = 'E92000001' AND doterm IS NULL"); + + escapePath(parquetPath) + "') WHERE ctry25cd = 'E92000001' AND doterm IS NULL"); copyToParquet(stmt, "SELECT * FROM postcodes", refOut); try (ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM postcodes")) {