From 4506263e5b5f3071841ba7c004cf970bfb07f9e9 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sat, 7 Feb 2026 19:28:57 +0000 Subject: [PATCH] Add postcode mapping --- Taskfile.data.yml | 41 ++ pipeline/download/inspire.py | 97 +++ pipeline/download/oa_boundaries.py | 36 ++ pipeline/download/uprn_lookup.py | 77 +++ pipeline/transform/postcode_boundaries.py | 715 ++++++++++++++++++++++ 5 files changed, 966 insertions(+) create mode 100644 pipeline/download/inspire.py create mode 100644 pipeline/download/oa_boundaries.py create mode 100644 pipeline/download/uprn_lookup.py create mode 100644 pipeline/transform/postcode_boundaries.py diff --git a/Taskfile.data.yml b/Taskfile.data.yml index adcceef..5118d7f 100644 --- a/Taskfile.data.yml +++ b/Taskfile.data.yml @@ -25,6 +25,10 @@ vars: POSTCODES_OUTPUT: "{{.DATA_DIR}}/postcodes" GEOSURE_OUTPUT: "{{.DATA_DIR}}/geosure" GEOSURE_PARQUET: "{{.DATA_DIR}}/geosure.parquet" + INSPIRE_OUTPUT: "{{.DATA_DIR}}/inspire" + OA_BOUNDARIES_OUTPUT: "{{.DATA_DIR}}/oa_boundaries.gpkg" + UPRN_LOOKUP_OUTPUT: "{{.DATA_DIR}}/uprn_lookup.parquet" + POSTCODE_BOUNDARIES_OUTPUT: "{{.DATA_DIR}}/postcode_boundaries" tasks: download:tiles: @@ -216,6 +220,43 @@ tasks: cmds: - uv run python -m pipeline.transform.transform_geosure --geosure {{.GEOSURE_OUTPUT}} --arcgis {{.ARCGIS_OUTPUT}} --output {{.GEOSURE_PARQUET}} + download:inspire: + desc: Download INSPIRE Index Polygon GML files from HM Land Registry + status: + - test -d {{.INSPIRE_OUTPUT}} + cmds: + - uv run python -m pipeline.download.inspire --output {{.INSPIRE_OUTPUT}} + + download:oa-boundaries: + desc: Download Output Areas (2021) boundary polygons (England & Wales) + status: + - test -f {{.OA_BOUNDARIES_OUTPUT}} + cmds: + - uv run python -m pipeline.download.oa_boundaries --output {{.OA_BOUNDARIES_OUTPUT}} + + download:uprn-lookup: + desc: Download National Statistics UPRN Lookup and convert to parquet + status: + - test -f {{.UPRN_LOOKUP_OUTPUT}} + cmds: + - uv run python -m pipeline.download.uprn_lookup --output {{.UPRN_LOOKUP_OUTPUT}} + + transform:postcode-boundaries: + desc: Generate postcode boundary polygons from OA boundaries + INSPIRE + UPRNs + deps: + - download:oa-boundaries + - download:inspire + - download:uprn-lookup + status: + - test -d {{.POSTCODE_BOUNDARIES_OUTPUT}}/units + cmds: + - >- + uv run python -m pipeline.transform.postcode_boundaries + --uprn {{.UPRN_LOOKUP_OUTPUT}} + --oa-boundaries {{.OA_BOUNDARIES_OUTPUT}} + --inspire {{.INSPIRE_OUTPUT}} + --output {{.POSTCODE_BOUNDARIES_OUTPUT}} + download:journey-times: desc: "Fetch TfL journey times: task download:journey-times" deps: diff --git a/pipeline/download/inspire.py b/pipeline/download/inspire.py new file mode 100644 index 0000000..72b2e8b --- /dev/null +++ b/pipeline/download/inspire.py @@ -0,0 +1,97 @@ +"""Download INSPIRE Index Polygons from HM Land Registry. + +Downloads GML files for all local authorities from the INSPIRE download page. +Each ZIP contains a GML file with title extent polygons for that authority. + +Source: https://use-land-property-data.service.gov.uk/datasets/inspire/download +License: INSPIRE End User Licence +""" + +import argparse +import re +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path + +import httpx +from tqdm import tqdm + +BASE = "https://use-land-property-data.service.gov.uk" +INDEX_URL = f"{BASE}/datasets/inspire/download" + + +def get_zip_urls() -> list[str]: + """Scrape the INSPIRE download page for all .zip hrefs.""" + # The site requires a cookie jar to avoid redirect loops. + with httpx.Client( + follow_redirects=True, + timeout=httpx.Timeout(30.0, read=60), + headers={"User-Agent": "Mozilla/5.0", "Accept": "text/html"}, + ) as client: + resp = client.get(INDEX_URL) + resp.raise_for_status() + html = resp.text + + pattern = r'href="(/datasets/inspire/download/[^"]+\.zip)"' + paths = sorted(set(re.findall(pattern, html))) + return [f"{BASE}{p}" for p in paths] + + +def download_one(url: str, output_dir: Path, client: httpx.Client) -> str: + """Download a single ZIP file. Returns the filename.""" + name = url.rsplit("/", 1)[-1] + dest = output_dir / name + if dest.exists(): + return f"{name} (skipped, exists)" + + resp = client.get(url) + resp.raise_for_status() + dest.write_bytes(resp.content) + return name + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Download INSPIRE Index Polygon GML files" + ) + parser.add_argument( + "--output", + type=Path, + required=True, + help="Output directory for downloaded ZIPs", + ) + parser.add_argument( + "--workers", + type=int, + default=8, + help="Number of parallel downloads (default: 8)", + ) + args = parser.parse_args() + + args.output.mkdir(parents=True, exist_ok=True) + + print("Fetching download index...") + urls = get_zip_urls() + print(f"Found {len(urls)} files to download") + + with ( + httpx.Client( + follow_redirects=True, + timeout=httpx.Timeout(30.0, read=120), + headers={"User-Agent": "Mozilla/5.0"}, + ) as client, + tqdm(total=len(urls), unit="file") as pbar, + ): + with ThreadPoolExecutor(max_workers=args.workers) as pool: + futures = { + pool.submit(download_one, url, args.output, client): url for url in urls + } + for future in as_completed(futures): + result = future.result() + pbar.set_postfix_str(result[:40]) + pbar.update(1) + + print(f"Done. {len(urls)} files in {args.output}") + + +if __name__ == "__main__": + main() diff --git a/pipeline/download/oa_boundaries.py b/pipeline/download/oa_boundaries.py new file mode 100644 index 0000000..8639c66 --- /dev/null +++ b/pipeline/download/oa_boundaries.py @@ -0,0 +1,36 @@ +"""Download Output Areas (December 2021) Boundaries EW BGC (V2). + +Generalised clipped (20m) boundary polygons for 2021 Census Output Areas +covering England and Wales. + +Source: https://open-geography-portalx-ons.hub.arcgis.com/datasets/ons::output-areas-december-2021-boundaries-ew-bgc-v2 +License: Open Government Licence v3.0 +""" + +import argparse +from pathlib import Path + +from pipeline.utils import download + +URL = "https://open-geography-portalx-ons.hub.arcgis.com/api/download/v1/items/6beafcfd9b9c4c9993a06b6b199d7e6d/geoPackage?layers=0" + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Download OA 2021 boundary polygons (England & Wales)" + ) + parser.add_argument( + "--output", + type=Path, + required=True, + help="Output GeoPackage file path", + ) + args = parser.parse_args() + + args.output.parent.mkdir(parents=True, exist_ok=True) + download(URL, args.output, timeout=600) + print(f"Saved to {args.output}") + + +if __name__ == "__main__": + main() diff --git a/pipeline/download/uprn_lookup.py b/pipeline/download/uprn_lookup.py new file mode 100644 index 0000000..ce644fd --- /dev/null +++ b/pipeline/download/uprn_lookup.py @@ -0,0 +1,77 @@ +"""Download National Statistics UPRN Lookup (December 2025, Epoch 123). + +Maps Unique Property Reference Numbers (UPRNs) to administrative and +statistical geographies (wards, output areas, LSOAs, etc.) across GB. + +Source: https://geoportal.statistics.gov.uk/datasets/ons::national-statistics-uprn-lookup-december-2025-epoch-123 +License: Contains Royal Mail, Ordnance Survey, and ONS data (OGL v3.0) +""" + +import argparse +import tempfile +from pathlib import Path + +import polars as pl + +from pipeline.utils import download, extract_zip + +URL = "https://www.arcgis.com/sharing/rest/content/items/4e0b4b3fbc2540caae27e7be532e61be/data" + + +def find_csvs(extract_path: Path) -> list[Path]: + """Find all NSUL regional CSVs inside the extracted archive.""" + csvs = sorted(extract_path.rglob("NSUL_*.csv")) + if not csvs: + csvs = sorted(extract_path.rglob("*.csv")) + if not csvs: + raise FileNotFoundError(f"No CSV files found in {extract_path}") + print(f"Found {len(csvs)} CSV(s):") + for f in csvs: + print(f" {f.name}") + return csvs + + +def convert_to_parquet(csv_paths: list[Path], parquet_path: Path) -> None: + # Some regional files infer different types for the same column (e.g. + # ruc21ind is String in most but Int64 in YH). Read all code columns as + # String to avoid schema mismatches. + CODE_COLS = { + "ruc21ind": pl.String, + "oac21ind": pl.String, + "imd19ind": pl.String, + } + df = pl.concat( + [ + pl.scan_csv(p, try_parse_dates=True, schema_overrides=CODE_COLS) + for p in csv_paths + ] + ) + print(f"Columns: {df.collect_schema().names()}") + parquet_path.parent.mkdir(parents=True, exist_ok=True) + df.sink_parquet(parquet_path, compression="zstd") + n = pl.scan_parquet(parquet_path).select(pl.len()).collect().item() + print(f"Saved {n:,} rows to {parquet_path}") + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Download National Statistics UPRN Lookup" + ) + parser.add_argument( + "--output", type=Path, required=True, help="Output parquet file path" + ) + args = parser.parse_args() + + with tempfile.TemporaryDirectory() as cache_dir: + zip_path = Path(cache_dir) / "uprn_lookup.zip" + extract_path = Path(cache_dir) / "uprn_extracted" + + download(URL, zip_path, timeout=600) + extract_zip(zip_path, extract_path) + + csv_paths = find_csvs(extract_path) + convert_to_parquet(csv_paths, args.output) + + +if __name__ == "__main__": + main() diff --git a/pipeline/transform/postcode_boundaries.py b/pipeline/transform/postcode_boundaries.py new file mode 100644 index 0000000..8bfb853 --- /dev/null +++ b/pipeline/transform/postcode_boundaries.py @@ -0,0 +1,715 @@ +"""Generate postcode boundary polygons from OA boundaries, INSPIRE parcels, and UPRN data. + +Produces per-district GeoJSON files compatible with the Rust server's postcode loader. +Each postcode gets a polygon (or MultiPolygon) guaranteed to be contained within its +Output Area(s), with 100% OA coverage and no overlaps between postcodes within an OA. + +Algorithm per OA: + 1. Single-postcode OA → entire OA polygon assigned to that postcode + 2. Multi-postcode OA: + a. Assign INSPIRE parcels to postcodes via UPRN point-in-polygon majority vote + b. Union INSPIRE parcels per postcode, clip to OA → "claimed" area + c. Distribute remaining (unclaimed) OA area via Voronoi of UPRN points + d. Final polygon = claimed + Voronoi share + +Memory-efficient design (<12GB total): + - INSPIRE polygons stored as raw coordinate bytes in parquet; Shapely objects built + lazily per-OA via numpy bbox pre-filter (~100-500 candidates at a time) + - UPRNs kept as sorted polars DataFrame with offset dict (Arrow storage, ~1.2GB) + - OA processing runs sequentially (no multiprocess INSPIRE duplication) + +Output format: {output}/units/{DISTRICT}.geojson with properties.postcodes and +properties.mapit_code fields matching server-rs/src/data/postcodes.rs expectations. +""" + +import argparse +import ctypes +import gc +import json +import sqlite3 +import zipfile +from collections import Counter, defaultdict +from pathlib import Path +from xml.etree.ElementTree import iterparse + +import numpy as np +import polars as pl +from pyproj import Transformer +from scipy.spatial import Voronoi +from shapely import STRtree, make_valid, wkb +from shapely.geometry import MultiPolygon, Polygon +from shapely.ops import unary_union +from tqdm import tqdm + +# --------------------------------------------------------------------------- +# GeoPackage helpers +# --------------------------------------------------------------------------- + + +def _release_memory() -> None: + """Force Python + glibc to release freed memory back to the OS.""" + gc.collect() + try: + ctypes.CDLL("libc.so.6").malloc_trim(0) + except OSError: + pass + + +_ENVELOPE_SIZES = {0: 0, 1: 32, 2: 48, 3: 48, 4: 64} + + +def parse_gpkg_geometry(blob: bytes): + """Extract a Shapely geometry from a GeoPackage binary blob.""" + flags = blob[3] + envelope_type = (flags >> 1) & 0x07 + header_size = 8 + _ENVELOPE_SIZES[envelope_type] + return wkb.loads(blob[header_size:]) + + +def load_oa_boundaries(gpkg_path: Path) -> dict[str, Polygon | MultiPolygon]: + """Load OA boundary polygons from a GeoPackage. Geometry is already in BNG.""" + print("Loading OA boundaries...") + + conn = sqlite3.connect(str(gpkg_path)) + cur = conn.cursor() + cur.execute("SELECT OA21CD, SHAPE FROM OA_2021_EW_BGC_V2") + + oa_geoms: dict[str, Polygon | MultiPolygon] = {} + for oa_code, blob in cur: + geom = parse_gpkg_geometry(bytes(blob)) + if geom.geom_type == "MultiPolygon" and len(geom.geoms) == 1: + geom = geom.geoms[0] + oa_geoms[oa_code] = geom + + conn.close() + print(f" Loaded {len(oa_geoms)} OA boundaries") + return oa_geoms + + +# --------------------------------------------------------------------------- +# UPRN loading (memory-efficient: sorted polars DataFrame + offset dict) +# --------------------------------------------------------------------------- + + +def load_uprns(uprn_path: Path) -> tuple[pl.DataFrame, dict[str, tuple[int, int]]]: + """Load UPRNs as a sorted polars DataFrame with OA offset lookup. + + Returns (df, offsets) where offsets[oa_code] = (start_row, end_row). + Peak ~5GB during sort, steady state ~1.5GB (Arrow columnar with compact strings). + """ + import tempfile + + print("Loading UPRN lookup...") + + # Sort via streaming sink to avoid polars doubling memory during in-memory sort + with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as tmp: + tmp_path = Path(tmp.name) + ( + pl.scan_parquet(uprn_path) + .select("GRIDGB1E", "GRIDGB1N", "PCDS", "OA21CD") + .filter(~pl.col("OA21CD").str.starts_with("S")) + .filter(pl.col("GRIDGB1E").is_not_null() & pl.col("GRIDGB1N").is_not_null()) + .filter(pl.col("PCDS").is_not_null() & (pl.col("PCDS") != "")) + .with_columns(pl.col("PCDS").str.strip_chars()) + .sort("OA21CD") + .sink_parquet(tmp_path) + ) + _release_memory() + + # Read the sorted data — only one copy in memory (~2GB) + df = pl.read_parquet(tmp_path) + tmp_path.unlink() + n = len(df) + print(f" Loaded {n:,} UPRNs (England & Wales)") + + # Compute OA group offsets using polars (avoids 37M Python string creation) + boundary_df = ( + df.lazy() + .with_row_index("_i") + .filter(pl.col("OA21CD") != pl.col("OA21CD").shift(1)) + .select("_i", "OA21CD") + .collect() + ) + starts_list = boundary_df["_i"].to_list() + oa_list = boundary_df["OA21CD"].to_list() + del boundary_df + offsets: dict[str, tuple[int, int]] = {} + for j in range(len(starts_list)): + end = starts_list[j + 1] if j + 1 < len(starts_list) else n + offsets[oa_list[j]] = (starts_list[j], end) + del starts_list, oa_list + + # Drop OA column (no longer needed) to save ~400MB + df = df.select("GRIDGB1E", "GRIDGB1N", "PCDS") + _release_memory() + + print(f" Grouped into {len(offsets)} OAs") + return df, offsets + + +def get_oa_uprns( + df: pl.DataFrame, offsets: dict[str, tuple[int, int]], oa_code: str +) -> tuple[np.ndarray, list[str]]: + """Get UPRN coordinates and postcodes for a single OA. + + Returns (points_nx2, postcodes_list). + """ + s, e = offsets[oa_code] + sub = df[s:e] + points = np.column_stack( + [ + sub["GRIDGB1E"].to_numpy(), + sub["GRIDGB1N"].to_numpy(), + ] + ) + postcodes = sub["PCDS"].to_list() + return points, postcodes + + +# --------------------------------------------------------------------------- +# INSPIRE GML parsing and caching +# --------------------------------------------------------------------------- + +_GML_NS = "{http://www.opengis.net/gml/3.2}" +_LR_NS = "{www.landregistry.gov.uk}" + + +def parse_inspire_zip(zip_path: Path) -> list[np.ndarray]: + """Parse a single INSPIRE ZIP → list of Nx2 coordinate arrays (easting, northing).""" + results = [] + with zipfile.ZipFile(zip_path) as zf: + gml_names = [n for n in zf.namelist() if n.endswith(".gml")] + if not gml_names: + return results + with zf.open(gml_names[0]) as f: + for event, elem in iterparse(f, events=("end",)): + if elem.tag != f"{_LR_NS}PREDEFINED": + continue + pos_list = elem.find(f".//{_GML_NS}posList") + if pos_list is not None and pos_list.text: + vals = pos_list.text.split() + n = len(vals) // 2 + if n >= 3: + coords = np.array(vals, dtype=np.float64).reshape(n, 2) + results.append(coords) + elem.clear() + return results + + +def cache_inspire(inspire_dir: Path, cache_dir: Path) -> None: + """Parse all INSPIRE ZIPs and cache as memory-mappable binary files. + + Processes ZIPs sequentially to keep memory under control (~2GB peak). + Each ZIP's polygons are streamed to disk immediately after parsing. + + Writes three files: + - inspire_bboxes.npy: float64 array (N, 4) of [min_e, min_n, max_e, max_n] + - inspire_offsets.npy: int64 array (N, 2) of [byte_offset, n_points] + - inspire_coords.bin: flat binary of all float64 coordinate pairs + """ + zip_files = sorted(inspire_dir.glob("*.zip")) + print( + f"Parsing {len(zip_files)} INSPIRE ZIP files (sequential, streaming to disk)..." + ) + cache_dir.mkdir(parents=True, exist_ok=True) + + # Pre-allocate arrays for bboxes and offsets (grow if needed) + capacity = 25_000_000 + bboxes = np.empty((capacity, 4), dtype=np.float64) + offsets = np.empty((capacity, 2), dtype=np.int64) + count = 0 + byte_offset = 0 + + coords_path = cache_dir / "inspire_coords.bin" + with open(coords_path, "wb") as cf: + for zip_path in tqdm(zip_files, desc="INSPIRE ZIPs", unit="file"): + for coords in parse_inspire_zip(zip_path): + if count >= capacity: + capacity = int(capacity * 1.5) + bboxes.resize((capacity, 4), refcheck=False) + offsets.resize((capacity, 2), refcheck=False) + bboxes[count, 0] = coords[:, 0].min() + bboxes[count, 1] = coords[:, 1].min() + bboxes[count, 2] = coords[:, 0].max() + bboxes[count, 3] = coords[:, 1].max() + offsets[count, 0] = byte_offset + offsets[count, 1] = len(coords) + raw = coords.astype(np.float64).tobytes() + cf.write(raw) + byte_offset += len(raw) + count += 1 + + # Trim to actual size and save + bboxes = bboxes[:count] + offsets = offsets[:count] + np.save(cache_dir / "inspire_bboxes.npy", bboxes) + np.save(cache_dir / "inspire_offsets.npy", offsets) + size_mb = byte_offset / (1024 * 1024) + print(f" Cached {count:,} INSPIRE polygons (coords: {size_mb:.0f} MB)") + + +def _inspire_cache_exists(cache_dir: Path) -> bool: + return all( + (cache_dir / f).exists() + for f in ("inspire_bboxes.npy", "inspire_offsets.npy", "inspire_coords.bin") + ) + + +def load_inspire( + cache_dir: Path, +) -> tuple[np.ndarray, np.ndarray, np.memmap]: + """Load INSPIRE cache → (bboxes, offsets, coords_mmap). + + Memory usage: ~1.1GB (bboxes ~777MB + offsets ~290MB, coords memory-mapped). + """ + print(f"Loading INSPIRE cache from {cache_dir}...") + bboxes = np.load(cache_dir / "inspire_bboxes.npy") + offsets = np.load(cache_dir / "inspire_offsets.npy") + coords_mmap = np.memmap( + cache_dir / "inspire_coords.bin", dtype=np.float64, mode="r" + ) + print( + f" Loaded {len(bboxes):,} INSPIRE polygon bboxes (~{bboxes.nbytes // (1024 * 1024)} MB)" + ) + print(f" Offsets: ~{offsets.nbytes // (1024 * 1024)} MB, coords: memory-mapped") + return bboxes, offsets, coords_mmap + + +def get_inspire_candidates( + oa_bounds: tuple[float, float, float, float], + bboxes: np.ndarray, + offsets: np.ndarray, + coords_mmap: np.memmap, +) -> list[Polygon]: + """Get INSPIRE polygons overlapping an OA via bbox pre-filter. + + Builds Shapely objects only for matches (typically 10-500 per OA). + Reads coordinate data on-demand from memory-mapped file. + """ + min_e, min_n, max_e, max_n = oa_bounds + + # Vectorized bbox overlap test + mask = ( + (bboxes[:, 2] >= min_e) + & (bboxes[:, 0] <= max_e) + & (bboxes[:, 3] >= min_n) + & (bboxes[:, 1] <= max_n) + ) + idxs = np.where(mask)[0] + if len(idxs) == 0: + return [] + + # Build Shapely polygons only for candidates (coords from mmap) + candidates = [] + for i in idxs: + byte_offset = offsets[i, 0] + n_pts = offsets[i, 1] + float_offset = byte_offset // 8 # float64 = 8 bytes + coords = coords_mmap[float_offset : float_offset + n_pts * 2].reshape(-1, 2) + poly = Polygon(coords) + if not poly.is_valid: + poly = make_valid(poly) + if poly.geom_type == "MultiPolygon": + poly = max(poly.geoms, key=lambda g: g.area) + elif poly.geom_type != "Polygon": + continue + if not poly.is_empty: + candidates.append(poly) + return candidates + + +# --------------------------------------------------------------------------- +# Voronoi computation +# --------------------------------------------------------------------------- + + +def compute_voronoi_regions( + points: np.ndarray, postcodes: list[str], boundary: Polygon | MultiPolygon +) -> dict[str, Polygon | MultiPolygon]: + """Compute Voronoi regions for points, clipped to boundary, grouped by postcode.""" + if len(points) == 0: + return {} + if len(points) == 1: + return {postcodes[0]: boundary} + + # Deduplicate points per postcode (flats at same coords) + seen: dict[tuple[float, float], str] = {} + unique_pts = [] + unique_pcs = [] + for i in range(len(points)): + key = (points[i, 0], points[i, 1]) + if key not in seen: + seen[key] = postcodes[i] + unique_pts.append(points[i]) + unique_pcs.append(postcodes[i]) + + if len(unique_pts) == 1: + return {unique_pcs[0]: boundary} + + pts = np.array(unique_pts) + min_e, min_n = pts.min(axis=0) + max_e, max_n = pts.max(axis=0) + span = max(max_e - min_e, max_n - min_n, 100) + + dummy = np.array( + [ + [min_e - span * 10, min_n - span * 10], + [max_e + span * 10, min_n - span * 10], + [min_e - span * 10, max_n + span * 10], + [max_e + span * 10, max_n + span * 10], + ] + ) + all_points = np.vstack([pts, dummy]) + + try: + vor = Voronoi(all_points) + except Exception: + return {unique_pcs[0]: boundary} + + n_real = len(pts) + pc_polys: dict[str, list[Polygon]] = defaultdict(list) + + for i in range(n_real): + region_idx = vor.point_region[i] + region = vor.regions[region_idx] + if -1 in region or len(region) < 3: + continue + vertices = vor.vertices[region] + poly = Polygon(vertices) + if not poly.is_valid: + poly = make_valid(poly) + clipped = poly.intersection(boundary) + if not clipped.is_empty: + pc_polys[unique_pcs[i]].append(clipped) + + result = {} + for pc, parts in pc_polys.items(): + merged = unary_union(parts) + if not merged.is_empty: + result[pc] = merged + return result + + +# --------------------------------------------------------------------------- +# Per-OA processing +# --------------------------------------------------------------------------- + + +def process_oa( + oa_geom: Polygon | MultiPolygon, + points: np.ndarray, + postcodes: list[str], + inspire_candidates: list[Polygon], +) -> list[tuple[str, Polygon | MultiPolygon]]: + """Process a single OA → list of (postcode, geometry) fragments.""" + unique_pcs = set(postcodes) + if len(unique_pcs) == 1: + return [(next(iter(unique_pcs)), oa_geom)] + + # Try INSPIRE-based assignment + claimed: dict[str, Polygon | MultiPolygon] = {} + + if inspire_candidates: + cand_tree = STRtree(inspire_candidates) + + from shapely import points as shp_points + + uprn_pts = shp_points(points) + pt_idx, cand_idx = cand_tree.query(uprn_pts, predicate="intersects") + + # Majority vote per candidate polygon + cand_postcodes: dict[int, list[str]] = defaultdict(list) + for pi, ci in zip(pt_idx, cand_idx): + cand_postcodes[ci].append(postcodes[pi]) + + pc_inspire_polys: dict[str, list[Polygon]] = defaultdict(list) + for ci, pc_list in cand_postcodes.items(): + winner = Counter(pc_list).most_common(1)[0][0] + pc_inspire_polys[winner].append(inspire_candidates[ci]) + + for pc, polys in pc_inspire_polys.items(): + merged = unary_union(polys) + clipped = merged.intersection(oa_geom) + if not clipped.is_empty: + if not clipped.is_valid: + clipped = make_valid(clipped) + claimed[pc] = clipped + + # Compute remaining area + if claimed: + all_claimed = unary_union(list(claimed.values())) + if not all_claimed.is_valid: + all_claimed = make_valid(all_claimed) + remaining = oa_geom.difference(all_claimed) + if not remaining.is_valid: + remaining = make_valid(remaining) + else: + remaining = oa_geom + + # Distribute remaining area via Voronoi + if not remaining.is_empty and remaining.area > 0.01: + voronoi_result = compute_voronoi_regions(points, postcodes, remaining) + else: + voronoi_result = {} + + # Combine claimed + voronoi + result: dict[str, list] = defaultdict(list) + for pc, geom in claimed.items(): + result[pc].append(geom) + for pc, geom in voronoi_result.items(): + result[pc].append(geom) + + fragments = [] + for pc, parts in result.items(): + merged = unary_union(parts) + if not merged.is_empty: + if not merged.is_valid: + merged = make_valid(merged) + fragments.append((pc, merged)) + + return fragments + + +# --------------------------------------------------------------------------- +# Output: merge fragments and write GeoJSON +# --------------------------------------------------------------------------- + +_to_wgs84 = None + + +def _get_to_wgs84(): + global _to_wgs84 + if _to_wgs84 is None: + _to_wgs84 = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True) + return _to_wgs84 + + +def to_wgs84_geojson( + geom: Polygon | MultiPolygon, tolerance: float = 1.0 +) -> dict | None: + """Simplify geometry in BNG, convert to WGS84, return GeoJSON dict.""" + if geom.is_empty: + return None + + simplified = geom.simplify(tolerance, preserve_topology=True) + if simplified.is_empty: + return None + + transformer = _get_to_wgs84() + + def transform_ring(coords): + xs, ys = zip(*coords) + lons, lats = transformer.transform(list(xs), list(ys)) + return list(zip(lons, lats)) + + def transform_polygon(poly): + exterior = transform_ring(poly.exterior.coords) + holes = [transform_ring(h.coords) for h in poly.interiors] + return [exterior] + holes + + if simplified.geom_type == "Polygon": + return { + "type": "Polygon", + "coordinates": transform_polygon(simplified), + } + elif simplified.geom_type == "MultiPolygon": + return { + "type": "MultiPolygon", + "coordinates": [transform_polygon(p) for p in simplified.geoms], + } + elif simplified.geom_type == "GeometryCollection": + polys = [ + g for g in simplified.geoms if g.geom_type in ("Polygon", "MultiPolygon") + ] + if not polys: + return None + return to_wgs84_geojson(unary_union(polys), tolerance=0) + return None + + +def merge_fragments( + all_fragments: list[tuple[str, Polygon | MultiPolygon]], +) -> dict[str, Polygon | MultiPolygon]: + """Merge cross-OA fragments for postcodes spanning multiple OAs.""" + by_postcode: dict[str, list] = defaultdict(list) + for pc, geom in all_fragments: + by_postcode[pc].append(geom) + + merged = {} + for pc, parts in by_postcode.items(): + combined = unary_union(parts) + if combined.is_empty: + continue + if not combined.is_valid: + combined = make_valid(combined) + # Close tiny gaps between adjacent OA boundary edges (float mismatches) + if combined.geom_type == "MultiPolygon": + combined = combined.buffer(1.0).buffer(-1.0) + if not combined.is_valid: + combined = make_valid(combined) + # Postcodes are contiguous delivery routes — keep only the largest + # polygon; small detached fragments are algorithm artifacts + if combined.geom_type == "MultiPolygon": + combined = max(combined.geoms, key=lambda g: g.area) + merged[pc] = combined + return merged + + +def write_district_geojson( + postcodes: dict[str, Polygon | MultiPolygon], output_dir: Path +) -> int: + """Group postcodes by district, write GeoJSON files. Returns file count.""" + units_dir = output_dir / "units" + units_dir.mkdir(parents=True, exist_ok=True) + + by_district: dict[str, list[tuple[str, Polygon | MultiPolygon]]] = defaultdict(list) + for pc, geom in postcodes.items(): + parts = pc.split() + district = parts[0] if parts else pc[:4] + by_district[district].append((pc, geom)) + + file_count = 0 + for district, entries in tqdm( + sorted(by_district.items()), desc="Writing GeoJSON", unit="file" + ): + features = [] + for pc, geom in sorted(entries, key=lambda x: x[0]): + geojson_geom = to_wgs84_geojson(geom) + if geojson_geom is None: + continue + mapit_code = pc.replace(" ", "") + features.append( + { + "type": "Feature", + "geometry": geojson_geom, + "properties": { + "postcodes": pc, + "mapit_code": mapit_code, + }, + } + ) + + if not features: + continue + + collection = {"type": "FeatureCollection", "features": features} + out_path = units_dir / f"{district}.geojson" + with open(out_path, "w") as f: + json.dump(collection, f, separators=(",", ":")) + file_count += 1 + + return file_count + + +# --------------------------------------------------------------------------- +# Main orchestration +# --------------------------------------------------------------------------- + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Generate postcode boundary polygons from OA + INSPIRE + UPRN data" + ) + parser.add_argument("--uprn", type=Path, required=True, help="UPRN lookup parquet") + parser.add_argument( + "--oa-boundaries", type=Path, required=True, help="OA boundaries GeoPackage" + ) + parser.add_argument( + "--inspire", type=Path, required=True, help="INSPIRE ZIP directory" + ) + parser.add_argument("--output", type=Path, required=True, help="Output directory") + parser.add_argument( + "--limit", type=int, default=0, help="Process only first N OAs (0=all)" + ) + args = parser.parse_args() + + # Phase 1: Load all data + print("=" * 60) + print("Phase 1: Loading data") + print("=" * 60) + + oa_geoms = load_oa_boundaries(args.oa_boundaries) + uprn_df, uprn_offsets = load_uprns(args.uprn) + + # Phase 2: Parse/load INSPIRE + print() + print("=" * 60) + print("Phase 2: INSPIRE data") + print("=" * 60) + + inspire_cache_dir = args.output / "inspire_cache" + if not _inspire_cache_exists(inspire_cache_dir): + cache_inspire(args.inspire, inspire_cache_dir) + inspire_bboxes, inspire_offsets, inspire_coords = load_inspire(inspire_cache_dir) + + # Phase 3: Process OAs + print() + print("=" * 60) + print("Phase 3: Processing OAs") + print("=" * 60) + + # Build work list — precompute which OAs are single vs multi-postcode + oa_codes_with_data = sorted(set(oa_geoms.keys()) & set(uprn_offsets.keys())) + skipped_no_uprn = len(oa_geoms) - len(oa_codes_with_data) + skipped_no_boundary = len(uprn_offsets) - len(oa_codes_with_data) + + if args.limit > 0: + oa_codes_with_data = oa_codes_with_data[: args.limit] + + print(f" OAs with UPRNs + boundaries: {len(oa_codes_with_data)}") + print(f" Skipped (no UPRNs): {skipped_no_uprn}") + print(f" Skipped (no boundary): {skipped_no_boundary}") + + all_fragments: list[tuple[str, Polygon | MultiPolygon]] = [] + single_count = 0 + multi_count = 0 + + for oa_code in tqdm( + oa_codes_with_data, + desc="Processing OAs", + unit="OA", + smoothing=0.01, + miniters=100, + ): + oa_geom = oa_geoms[oa_code] + points, postcodes = get_oa_uprns(uprn_df, uprn_offsets, oa_code) + + if len(set(postcodes)) == 1: + # Fast path: entire OA = one postcode + all_fragments.append((postcodes[0], oa_geom)) + single_count += 1 + continue + + # Get INSPIRE candidates via bbox pre-filter + candidates = get_inspire_candidates( + oa_geom.bounds, inspire_bboxes, inspire_offsets, inspire_coords + ) + + fragments = process_oa(oa_geom, points, postcodes, candidates) + all_fragments.extend(fragments) + multi_count += 1 + + print(f"\n Single-postcode OAs (fast path): {single_count}") + print(f" Multi-postcode OAs (INSPIRE+Voronoi): {multi_count}") + print(f" Total fragments: {len(all_fragments)}") + + # Free data no longer needed + del oa_geoms, uprn_df, uprn_offsets + del inspire_bboxes, inspire_offsets, inspire_coords + + # Phase 4: Merge and write + print() + print("=" * 60) + print("Phase 4: Merging fragments and writing GeoJSON") + print("=" * 60) + + merged = merge_fragments(all_fragments) + print(f" Merged into {len(merged)} unique postcodes") + + file_count = write_district_geojson(merged, args.output) + print(f"\n Wrote {file_count} district GeoJSON files to {args.output / 'units'}") + print("Done!") + + +if __name__ == "__main__": + main()