From f5e6894c0f8347137165321b76e54450aff371b3 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sat, 7 Feb 2026 21:22:53 +0000 Subject: [PATCH] Add postcode boundary calculation --- pipeline/transform/__init__.py | 0 pipeline/transform/postcode_boundaries.py | 715 ------------------ .../transform/postcode_boundaries/README.md | 133 ++++ .../transform/postcode_boundaries/__init__.py | 23 + .../transform/postcode_boundaries/__main__.py | 122 +++ .../transform/postcode_boundaries/inspire.py | 155 ++++ .../transform/postcode_boundaries/memory.py | 11 + .../postcode_boundaries/oa_boundaries.py | 40 + .../transform/postcode_boundaries/output.py | 137 ++++ .../postcode_boundaries/process_oa.py | 127 ++++ .../test_postcode_boundaries.py | 430 +++++++++++ .../transform/postcode_boundaries/uprn.py | 84 ++ .../transform/postcode_boundaries/voronoi.py | 120 +++ pipeline/utils/__init__.py | 4 +- 14 files changed, 1384 insertions(+), 717 deletions(-) create mode 100644 pipeline/transform/__init__.py delete mode 100644 pipeline/transform/postcode_boundaries.py create mode 100644 pipeline/transform/postcode_boundaries/README.md create mode 100644 pipeline/transform/postcode_boundaries/__init__.py create mode 100644 pipeline/transform/postcode_boundaries/__main__.py create mode 100644 pipeline/transform/postcode_boundaries/inspire.py create mode 100644 pipeline/transform/postcode_boundaries/memory.py create mode 100644 pipeline/transform/postcode_boundaries/oa_boundaries.py create mode 100644 pipeline/transform/postcode_boundaries/output.py create mode 100644 pipeline/transform/postcode_boundaries/process_oa.py create mode 100644 pipeline/transform/postcode_boundaries/test_postcode_boundaries.py create mode 100644 pipeline/transform/postcode_boundaries/uprn.py create mode 100644 pipeline/transform/postcode_boundaries/voronoi.py diff --git a/pipeline/transform/__init__.py b/pipeline/transform/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pipeline/transform/postcode_boundaries.py b/pipeline/transform/postcode_boundaries.py deleted file mode 100644 index 8bfb853..0000000 --- a/pipeline/transform/postcode_boundaries.py +++ /dev/null @@ -1,715 +0,0 @@ -"""Generate postcode boundary polygons from OA boundaries, INSPIRE parcels, and UPRN data. - -Produces per-district GeoJSON files compatible with the Rust server's postcode loader. -Each postcode gets a polygon (or MultiPolygon) guaranteed to be contained within its -Output Area(s), with 100% OA coverage and no overlaps between postcodes within an OA. - -Algorithm per OA: - 1. Single-postcode OA → entire OA polygon assigned to that postcode - 2. Multi-postcode OA: - a. Assign INSPIRE parcels to postcodes via UPRN point-in-polygon majority vote - b. Union INSPIRE parcels per postcode, clip to OA → "claimed" area - c. Distribute remaining (unclaimed) OA area via Voronoi of UPRN points - d. Final polygon = claimed + Voronoi share - -Memory-efficient design (<12GB total): - - INSPIRE polygons stored as raw coordinate bytes in parquet; Shapely objects built - lazily per-OA via numpy bbox pre-filter (~100-500 candidates at a time) - - UPRNs kept as sorted polars DataFrame with offset dict (Arrow storage, ~1.2GB) - - OA processing runs sequentially (no multiprocess INSPIRE duplication) - -Output format: {output}/units/{DISTRICT}.geojson with properties.postcodes and -properties.mapit_code fields matching server-rs/src/data/postcodes.rs expectations. -""" - -import argparse -import ctypes -import gc -import json -import sqlite3 -import zipfile -from collections import Counter, defaultdict -from pathlib import Path -from xml.etree.ElementTree import iterparse - -import numpy as np -import polars as pl -from pyproj import Transformer -from scipy.spatial import Voronoi -from shapely import STRtree, make_valid, wkb -from shapely.geometry import MultiPolygon, Polygon -from shapely.ops import unary_union -from tqdm import tqdm - -# --------------------------------------------------------------------------- -# GeoPackage helpers -# --------------------------------------------------------------------------- - - -def _release_memory() -> None: - """Force Python + glibc to release freed memory back to the OS.""" - gc.collect() - try: - ctypes.CDLL("libc.so.6").malloc_trim(0) - except OSError: - pass - - -_ENVELOPE_SIZES = {0: 0, 1: 32, 2: 48, 3: 48, 4: 64} - - -def parse_gpkg_geometry(blob: bytes): - """Extract a Shapely geometry from a GeoPackage binary blob.""" - flags = blob[3] - envelope_type = (flags >> 1) & 0x07 - header_size = 8 + _ENVELOPE_SIZES[envelope_type] - return wkb.loads(blob[header_size:]) - - -def load_oa_boundaries(gpkg_path: Path) -> dict[str, Polygon | MultiPolygon]: - """Load OA boundary polygons from a GeoPackage. Geometry is already in BNG.""" - print("Loading OA boundaries...") - - conn = sqlite3.connect(str(gpkg_path)) - cur = conn.cursor() - cur.execute("SELECT OA21CD, SHAPE FROM OA_2021_EW_BGC_V2") - - oa_geoms: dict[str, Polygon | MultiPolygon] = {} - for oa_code, blob in cur: - geom = parse_gpkg_geometry(bytes(blob)) - if geom.geom_type == "MultiPolygon" and len(geom.geoms) == 1: - geom = geom.geoms[0] - oa_geoms[oa_code] = geom - - conn.close() - print(f" Loaded {len(oa_geoms)} OA boundaries") - return oa_geoms - - -# --------------------------------------------------------------------------- -# UPRN loading (memory-efficient: sorted polars DataFrame + offset dict) -# --------------------------------------------------------------------------- - - -def load_uprns(uprn_path: Path) -> tuple[pl.DataFrame, dict[str, tuple[int, int]]]: - """Load UPRNs as a sorted polars DataFrame with OA offset lookup. - - Returns (df, offsets) where offsets[oa_code] = (start_row, end_row). - Peak ~5GB during sort, steady state ~1.5GB (Arrow columnar with compact strings). - """ - import tempfile - - print("Loading UPRN lookup...") - - # Sort via streaming sink to avoid polars doubling memory during in-memory sort - with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as tmp: - tmp_path = Path(tmp.name) - ( - pl.scan_parquet(uprn_path) - .select("GRIDGB1E", "GRIDGB1N", "PCDS", "OA21CD") - .filter(~pl.col("OA21CD").str.starts_with("S")) - .filter(pl.col("GRIDGB1E").is_not_null() & pl.col("GRIDGB1N").is_not_null()) - .filter(pl.col("PCDS").is_not_null() & (pl.col("PCDS") != "")) - .with_columns(pl.col("PCDS").str.strip_chars()) - .sort("OA21CD") - .sink_parquet(tmp_path) - ) - _release_memory() - - # Read the sorted data — only one copy in memory (~2GB) - df = pl.read_parquet(tmp_path) - tmp_path.unlink() - n = len(df) - print(f" Loaded {n:,} UPRNs (England & Wales)") - - # Compute OA group offsets using polars (avoids 37M Python string creation) - boundary_df = ( - df.lazy() - .with_row_index("_i") - .filter(pl.col("OA21CD") != pl.col("OA21CD").shift(1)) - .select("_i", "OA21CD") - .collect() - ) - starts_list = boundary_df["_i"].to_list() - oa_list = boundary_df["OA21CD"].to_list() - del boundary_df - offsets: dict[str, tuple[int, int]] = {} - for j in range(len(starts_list)): - end = starts_list[j + 1] if j + 1 < len(starts_list) else n - offsets[oa_list[j]] = (starts_list[j], end) - del starts_list, oa_list - - # Drop OA column (no longer needed) to save ~400MB - df = df.select("GRIDGB1E", "GRIDGB1N", "PCDS") - _release_memory() - - print(f" Grouped into {len(offsets)} OAs") - return df, offsets - - -def get_oa_uprns( - df: pl.DataFrame, offsets: dict[str, tuple[int, int]], oa_code: str -) -> tuple[np.ndarray, list[str]]: - """Get UPRN coordinates and postcodes for a single OA. - - Returns (points_nx2, postcodes_list). - """ - s, e = offsets[oa_code] - sub = df[s:e] - points = np.column_stack( - [ - sub["GRIDGB1E"].to_numpy(), - sub["GRIDGB1N"].to_numpy(), - ] - ) - postcodes = sub["PCDS"].to_list() - return points, postcodes - - -# --------------------------------------------------------------------------- -# INSPIRE GML parsing and caching -# --------------------------------------------------------------------------- - -_GML_NS = "{http://www.opengis.net/gml/3.2}" -_LR_NS = "{www.landregistry.gov.uk}" - - -def parse_inspire_zip(zip_path: Path) -> list[np.ndarray]: - """Parse a single INSPIRE ZIP → list of Nx2 coordinate arrays (easting, northing).""" - results = [] - with zipfile.ZipFile(zip_path) as zf: - gml_names = [n for n in zf.namelist() if n.endswith(".gml")] - if not gml_names: - return results - with zf.open(gml_names[0]) as f: - for event, elem in iterparse(f, events=("end",)): - if elem.tag != f"{_LR_NS}PREDEFINED": - continue - pos_list = elem.find(f".//{_GML_NS}posList") - if pos_list is not None and pos_list.text: - vals = pos_list.text.split() - n = len(vals) // 2 - if n >= 3: - coords = np.array(vals, dtype=np.float64).reshape(n, 2) - results.append(coords) - elem.clear() - return results - - -def cache_inspire(inspire_dir: Path, cache_dir: Path) -> None: - """Parse all INSPIRE ZIPs and cache as memory-mappable binary files. - - Processes ZIPs sequentially to keep memory under control (~2GB peak). - Each ZIP's polygons are streamed to disk immediately after parsing. - - Writes three files: - - inspire_bboxes.npy: float64 array (N, 4) of [min_e, min_n, max_e, max_n] - - inspire_offsets.npy: int64 array (N, 2) of [byte_offset, n_points] - - inspire_coords.bin: flat binary of all float64 coordinate pairs - """ - zip_files = sorted(inspire_dir.glob("*.zip")) - print( - f"Parsing {len(zip_files)} INSPIRE ZIP files (sequential, streaming to disk)..." - ) - cache_dir.mkdir(parents=True, exist_ok=True) - - # Pre-allocate arrays for bboxes and offsets (grow if needed) - capacity = 25_000_000 - bboxes = np.empty((capacity, 4), dtype=np.float64) - offsets = np.empty((capacity, 2), dtype=np.int64) - count = 0 - byte_offset = 0 - - coords_path = cache_dir / "inspire_coords.bin" - with open(coords_path, "wb") as cf: - for zip_path in tqdm(zip_files, desc="INSPIRE ZIPs", unit="file"): - for coords in parse_inspire_zip(zip_path): - if count >= capacity: - capacity = int(capacity * 1.5) - bboxes.resize((capacity, 4), refcheck=False) - offsets.resize((capacity, 2), refcheck=False) - bboxes[count, 0] = coords[:, 0].min() - bboxes[count, 1] = coords[:, 1].min() - bboxes[count, 2] = coords[:, 0].max() - bboxes[count, 3] = coords[:, 1].max() - offsets[count, 0] = byte_offset - offsets[count, 1] = len(coords) - raw = coords.astype(np.float64).tobytes() - cf.write(raw) - byte_offset += len(raw) - count += 1 - - # Trim to actual size and save - bboxes = bboxes[:count] - offsets = offsets[:count] - np.save(cache_dir / "inspire_bboxes.npy", bboxes) - np.save(cache_dir / "inspire_offsets.npy", offsets) - size_mb = byte_offset / (1024 * 1024) - print(f" Cached {count:,} INSPIRE polygons (coords: {size_mb:.0f} MB)") - - -def _inspire_cache_exists(cache_dir: Path) -> bool: - return all( - (cache_dir / f).exists() - for f in ("inspire_bboxes.npy", "inspire_offsets.npy", "inspire_coords.bin") - ) - - -def load_inspire( - cache_dir: Path, -) -> tuple[np.ndarray, np.ndarray, np.memmap]: - """Load INSPIRE cache → (bboxes, offsets, coords_mmap). - - Memory usage: ~1.1GB (bboxes ~777MB + offsets ~290MB, coords memory-mapped). - """ - print(f"Loading INSPIRE cache from {cache_dir}...") - bboxes = np.load(cache_dir / "inspire_bboxes.npy") - offsets = np.load(cache_dir / "inspire_offsets.npy") - coords_mmap = np.memmap( - cache_dir / "inspire_coords.bin", dtype=np.float64, mode="r" - ) - print( - f" Loaded {len(bboxes):,} INSPIRE polygon bboxes (~{bboxes.nbytes // (1024 * 1024)} MB)" - ) - print(f" Offsets: ~{offsets.nbytes // (1024 * 1024)} MB, coords: memory-mapped") - return bboxes, offsets, coords_mmap - - -def get_inspire_candidates( - oa_bounds: tuple[float, float, float, float], - bboxes: np.ndarray, - offsets: np.ndarray, - coords_mmap: np.memmap, -) -> list[Polygon]: - """Get INSPIRE polygons overlapping an OA via bbox pre-filter. - - Builds Shapely objects only for matches (typically 10-500 per OA). - Reads coordinate data on-demand from memory-mapped file. - """ - min_e, min_n, max_e, max_n = oa_bounds - - # Vectorized bbox overlap test - mask = ( - (bboxes[:, 2] >= min_e) - & (bboxes[:, 0] <= max_e) - & (bboxes[:, 3] >= min_n) - & (bboxes[:, 1] <= max_n) - ) - idxs = np.where(mask)[0] - if len(idxs) == 0: - return [] - - # Build Shapely polygons only for candidates (coords from mmap) - candidates = [] - for i in idxs: - byte_offset = offsets[i, 0] - n_pts = offsets[i, 1] - float_offset = byte_offset // 8 # float64 = 8 bytes - coords = coords_mmap[float_offset : float_offset + n_pts * 2].reshape(-1, 2) - poly = Polygon(coords) - if not poly.is_valid: - poly = make_valid(poly) - if poly.geom_type == "MultiPolygon": - poly = max(poly.geoms, key=lambda g: g.area) - elif poly.geom_type != "Polygon": - continue - if not poly.is_empty: - candidates.append(poly) - return candidates - - -# --------------------------------------------------------------------------- -# Voronoi computation -# --------------------------------------------------------------------------- - - -def compute_voronoi_regions( - points: np.ndarray, postcodes: list[str], boundary: Polygon | MultiPolygon -) -> dict[str, Polygon | MultiPolygon]: - """Compute Voronoi regions for points, clipped to boundary, grouped by postcode.""" - if len(points) == 0: - return {} - if len(points) == 1: - return {postcodes[0]: boundary} - - # Deduplicate points per postcode (flats at same coords) - seen: dict[tuple[float, float], str] = {} - unique_pts = [] - unique_pcs = [] - for i in range(len(points)): - key = (points[i, 0], points[i, 1]) - if key not in seen: - seen[key] = postcodes[i] - unique_pts.append(points[i]) - unique_pcs.append(postcodes[i]) - - if len(unique_pts) == 1: - return {unique_pcs[0]: boundary} - - pts = np.array(unique_pts) - min_e, min_n = pts.min(axis=0) - max_e, max_n = pts.max(axis=0) - span = max(max_e - min_e, max_n - min_n, 100) - - dummy = np.array( - [ - [min_e - span * 10, min_n - span * 10], - [max_e + span * 10, min_n - span * 10], - [min_e - span * 10, max_n + span * 10], - [max_e + span * 10, max_n + span * 10], - ] - ) - all_points = np.vstack([pts, dummy]) - - try: - vor = Voronoi(all_points) - except Exception: - return {unique_pcs[0]: boundary} - - n_real = len(pts) - pc_polys: dict[str, list[Polygon]] = defaultdict(list) - - for i in range(n_real): - region_idx = vor.point_region[i] - region = vor.regions[region_idx] - if -1 in region or len(region) < 3: - continue - vertices = vor.vertices[region] - poly = Polygon(vertices) - if not poly.is_valid: - poly = make_valid(poly) - clipped = poly.intersection(boundary) - if not clipped.is_empty: - pc_polys[unique_pcs[i]].append(clipped) - - result = {} - for pc, parts in pc_polys.items(): - merged = unary_union(parts) - if not merged.is_empty: - result[pc] = merged - return result - - -# --------------------------------------------------------------------------- -# Per-OA processing -# --------------------------------------------------------------------------- - - -def process_oa( - oa_geom: Polygon | MultiPolygon, - points: np.ndarray, - postcodes: list[str], - inspire_candidates: list[Polygon], -) -> list[tuple[str, Polygon | MultiPolygon]]: - """Process a single OA → list of (postcode, geometry) fragments.""" - unique_pcs = set(postcodes) - if len(unique_pcs) == 1: - return [(next(iter(unique_pcs)), oa_geom)] - - # Try INSPIRE-based assignment - claimed: dict[str, Polygon | MultiPolygon] = {} - - if inspire_candidates: - cand_tree = STRtree(inspire_candidates) - - from shapely import points as shp_points - - uprn_pts = shp_points(points) - pt_idx, cand_idx = cand_tree.query(uprn_pts, predicate="intersects") - - # Majority vote per candidate polygon - cand_postcodes: dict[int, list[str]] = defaultdict(list) - for pi, ci in zip(pt_idx, cand_idx): - cand_postcodes[ci].append(postcodes[pi]) - - pc_inspire_polys: dict[str, list[Polygon]] = defaultdict(list) - for ci, pc_list in cand_postcodes.items(): - winner = Counter(pc_list).most_common(1)[0][0] - pc_inspire_polys[winner].append(inspire_candidates[ci]) - - for pc, polys in pc_inspire_polys.items(): - merged = unary_union(polys) - clipped = merged.intersection(oa_geom) - if not clipped.is_empty: - if not clipped.is_valid: - clipped = make_valid(clipped) - claimed[pc] = clipped - - # Compute remaining area - if claimed: - all_claimed = unary_union(list(claimed.values())) - if not all_claimed.is_valid: - all_claimed = make_valid(all_claimed) - remaining = oa_geom.difference(all_claimed) - if not remaining.is_valid: - remaining = make_valid(remaining) - else: - remaining = oa_geom - - # Distribute remaining area via Voronoi - if not remaining.is_empty and remaining.area > 0.01: - voronoi_result = compute_voronoi_regions(points, postcodes, remaining) - else: - voronoi_result = {} - - # Combine claimed + voronoi - result: dict[str, list] = defaultdict(list) - for pc, geom in claimed.items(): - result[pc].append(geom) - for pc, geom in voronoi_result.items(): - result[pc].append(geom) - - fragments = [] - for pc, parts in result.items(): - merged = unary_union(parts) - if not merged.is_empty: - if not merged.is_valid: - merged = make_valid(merged) - fragments.append((pc, merged)) - - return fragments - - -# --------------------------------------------------------------------------- -# Output: merge fragments and write GeoJSON -# --------------------------------------------------------------------------- - -_to_wgs84 = None - - -def _get_to_wgs84(): - global _to_wgs84 - if _to_wgs84 is None: - _to_wgs84 = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True) - return _to_wgs84 - - -def to_wgs84_geojson( - geom: Polygon | MultiPolygon, tolerance: float = 1.0 -) -> dict | None: - """Simplify geometry in BNG, convert to WGS84, return GeoJSON dict.""" - if geom.is_empty: - return None - - simplified = geom.simplify(tolerance, preserve_topology=True) - if simplified.is_empty: - return None - - transformer = _get_to_wgs84() - - def transform_ring(coords): - xs, ys = zip(*coords) - lons, lats = transformer.transform(list(xs), list(ys)) - return list(zip(lons, lats)) - - def transform_polygon(poly): - exterior = transform_ring(poly.exterior.coords) - holes = [transform_ring(h.coords) for h in poly.interiors] - return [exterior] + holes - - if simplified.geom_type == "Polygon": - return { - "type": "Polygon", - "coordinates": transform_polygon(simplified), - } - elif simplified.geom_type == "MultiPolygon": - return { - "type": "MultiPolygon", - "coordinates": [transform_polygon(p) for p in simplified.geoms], - } - elif simplified.geom_type == "GeometryCollection": - polys = [ - g for g in simplified.geoms if g.geom_type in ("Polygon", "MultiPolygon") - ] - if not polys: - return None - return to_wgs84_geojson(unary_union(polys), tolerance=0) - return None - - -def merge_fragments( - all_fragments: list[tuple[str, Polygon | MultiPolygon]], -) -> dict[str, Polygon | MultiPolygon]: - """Merge cross-OA fragments for postcodes spanning multiple OAs.""" - by_postcode: dict[str, list] = defaultdict(list) - for pc, geom in all_fragments: - by_postcode[pc].append(geom) - - merged = {} - for pc, parts in by_postcode.items(): - combined = unary_union(parts) - if combined.is_empty: - continue - if not combined.is_valid: - combined = make_valid(combined) - # Close tiny gaps between adjacent OA boundary edges (float mismatches) - if combined.geom_type == "MultiPolygon": - combined = combined.buffer(1.0).buffer(-1.0) - if not combined.is_valid: - combined = make_valid(combined) - # Postcodes are contiguous delivery routes — keep only the largest - # polygon; small detached fragments are algorithm artifacts - if combined.geom_type == "MultiPolygon": - combined = max(combined.geoms, key=lambda g: g.area) - merged[pc] = combined - return merged - - -def write_district_geojson( - postcodes: dict[str, Polygon | MultiPolygon], output_dir: Path -) -> int: - """Group postcodes by district, write GeoJSON files. Returns file count.""" - units_dir = output_dir / "units" - units_dir.mkdir(parents=True, exist_ok=True) - - by_district: dict[str, list[tuple[str, Polygon | MultiPolygon]]] = defaultdict(list) - for pc, geom in postcodes.items(): - parts = pc.split() - district = parts[0] if parts else pc[:4] - by_district[district].append((pc, geom)) - - file_count = 0 - for district, entries in tqdm( - sorted(by_district.items()), desc="Writing GeoJSON", unit="file" - ): - features = [] - for pc, geom in sorted(entries, key=lambda x: x[0]): - geojson_geom = to_wgs84_geojson(geom) - if geojson_geom is None: - continue - mapit_code = pc.replace(" ", "") - features.append( - { - "type": "Feature", - "geometry": geojson_geom, - "properties": { - "postcodes": pc, - "mapit_code": mapit_code, - }, - } - ) - - if not features: - continue - - collection = {"type": "FeatureCollection", "features": features} - out_path = units_dir / f"{district}.geojson" - with open(out_path, "w") as f: - json.dump(collection, f, separators=(",", ":")) - file_count += 1 - - return file_count - - -# --------------------------------------------------------------------------- -# Main orchestration -# --------------------------------------------------------------------------- - - -def main() -> None: - parser = argparse.ArgumentParser( - description="Generate postcode boundary polygons from OA + INSPIRE + UPRN data" - ) - parser.add_argument("--uprn", type=Path, required=True, help="UPRN lookup parquet") - parser.add_argument( - "--oa-boundaries", type=Path, required=True, help="OA boundaries GeoPackage" - ) - parser.add_argument( - "--inspire", type=Path, required=True, help="INSPIRE ZIP directory" - ) - parser.add_argument("--output", type=Path, required=True, help="Output directory") - parser.add_argument( - "--limit", type=int, default=0, help="Process only first N OAs (0=all)" - ) - args = parser.parse_args() - - # Phase 1: Load all data - print("=" * 60) - print("Phase 1: Loading data") - print("=" * 60) - - oa_geoms = load_oa_boundaries(args.oa_boundaries) - uprn_df, uprn_offsets = load_uprns(args.uprn) - - # Phase 2: Parse/load INSPIRE - print() - print("=" * 60) - print("Phase 2: INSPIRE data") - print("=" * 60) - - inspire_cache_dir = args.output / "inspire_cache" - if not _inspire_cache_exists(inspire_cache_dir): - cache_inspire(args.inspire, inspire_cache_dir) - inspire_bboxes, inspire_offsets, inspire_coords = load_inspire(inspire_cache_dir) - - # Phase 3: Process OAs - print() - print("=" * 60) - print("Phase 3: Processing OAs") - print("=" * 60) - - # Build work list — precompute which OAs are single vs multi-postcode - oa_codes_with_data = sorted(set(oa_geoms.keys()) & set(uprn_offsets.keys())) - skipped_no_uprn = len(oa_geoms) - len(oa_codes_with_data) - skipped_no_boundary = len(uprn_offsets) - len(oa_codes_with_data) - - if args.limit > 0: - oa_codes_with_data = oa_codes_with_data[: args.limit] - - print(f" OAs with UPRNs + boundaries: {len(oa_codes_with_data)}") - print(f" Skipped (no UPRNs): {skipped_no_uprn}") - print(f" Skipped (no boundary): {skipped_no_boundary}") - - all_fragments: list[tuple[str, Polygon | MultiPolygon]] = [] - single_count = 0 - multi_count = 0 - - for oa_code in tqdm( - oa_codes_with_data, - desc="Processing OAs", - unit="OA", - smoothing=0.01, - miniters=100, - ): - oa_geom = oa_geoms[oa_code] - points, postcodes = get_oa_uprns(uprn_df, uprn_offsets, oa_code) - - if len(set(postcodes)) == 1: - # Fast path: entire OA = one postcode - all_fragments.append((postcodes[0], oa_geom)) - single_count += 1 - continue - - # Get INSPIRE candidates via bbox pre-filter - candidates = get_inspire_candidates( - oa_geom.bounds, inspire_bboxes, inspire_offsets, inspire_coords - ) - - fragments = process_oa(oa_geom, points, postcodes, candidates) - all_fragments.extend(fragments) - multi_count += 1 - - print(f"\n Single-postcode OAs (fast path): {single_count}") - print(f" Multi-postcode OAs (INSPIRE+Voronoi): {multi_count}") - print(f" Total fragments: {len(all_fragments)}") - - # Free data no longer needed - del oa_geoms, uprn_df, uprn_offsets - del inspire_bboxes, inspire_offsets, inspire_coords - - # Phase 4: Merge and write - print() - print("=" * 60) - print("Phase 4: Merging fragments and writing GeoJSON") - print("=" * 60) - - merged = merge_fragments(all_fragments) - print(f" Merged into {len(merged)} unique postcodes") - - file_count = write_district_geojson(merged, args.output) - print(f"\n Wrote {file_count} district GeoJSON files to {args.output / 'units'}") - print("Done!") - - -if __name__ == "__main__": - main() diff --git a/pipeline/transform/postcode_boundaries/README.md b/pipeline/transform/postcode_boundaries/README.md new file mode 100644 index 0000000..d10a9cd --- /dev/null +++ b/pipeline/transform/postcode_boundaries/README.md @@ -0,0 +1,133 @@ +# postcode_boundaries + +Synthesizes postcode boundary polygons for England and Wales from three datasets. UK postcodes don't have official boundary polygons — Royal Mail defines postcodes as sets of delivery addresses, not geographic areas. This pipeline constructs a plausible polygon for every postcode by combining Output Area boundaries, UPRN point locations, and INSPIRE cadastral parcels. + +## The three input datasets + +**1. Output Area (OA) boundaries** — ONS Census Output Areas are the smallest geographic unit in the UK census (~125 households each). They tile all of England and Wales with no gaps or overlaps. Stored in a GeoPackage in British National Grid (EPSG:27700, meters). ~190,000 OAs. + +**2. UPRN lookup** — Every Unique Property Reference Number in England and Wales, with its grid coordinates (easting/northing in BNG), its postcode (`PCDS`), and its OA code (`OA21CD`). ~37 million rows. This is the critical bridge: it tells you which postcodes exist inside each OA, and where each address physically sits. + +**3. INSPIRE Index Polygons** — Land Registry cadastral parcels covering most of England and Wales. Each ZIP contains a GML file with polygon coordinate lists representing individual land parcels (buildings, plots of land). ~24 million polygons. These give fine-grained building/plot outlines that are much more precise than anything you could derive from point locations alone. + +## The four phases + +### Phase 1: Loading data + +**OA boundaries** (`oa_boundaries.py`): Opens the GeoPackage via SQLite, reads every row from `OA_2021_EW_BGC_V2`. Each row's `SHAPE` column is a GeoPackage binary blob — a standard 8-byte header, then a variable-size envelope (bounding box), then WKB geometry. `parse_gpkg_geometry` reads byte 3 to extract the envelope type (0-4), looks up the envelope size, skips past the header, and hands the remaining WKB bytes to Shapely. Single-polygon MultiPolygons are unwrapped. Result: `dict[oa_code, Polygon]`, all in BNG. + +**UPRNs** (`uprn.py`): The raw parquet has far more columns than needed. The lazy scan selects only four columns, filters out Scotland (OA codes starting with `S`), drops nulls and blank postcodes (stripping whitespace first), then sorts by OA code. The sort uses `sink_parquet` to write to a temp file — this avoids polars doubling memory from an in-memory sort on ~37M rows. + +After reading the sorted file back, it builds an offset dictionary. Rather than grouping into Python lists (which would create 37M Python string objects), it detects group boundaries by comparing each row's OA code to the previous row's. The result is `offsets[oa_code] = (start_row, end_row)` — a simple slice into the DataFrame. The OA column is then dropped since it's no longer needed, saving ~400MB. + +`get_oa_uprns` later retrieves a single OA's data by slicing `df[start:end]` and extracting the coordinates and postcodes. + +### Phase 2: INSPIRE data + +INSPIRE comes as ~350 ZIP files, each containing a GML file with thousands of `PREDEFINED` elements. Each element has a `posList` — a flat string of coordinate pairs. + +**Parsing** (`inspire.py:parse_inspire_zip`): Uses `iterparse` for streaming XML parsing (constant memory per ZIP). For each `PREDEFINED` element, extracts the `posList` text, splits into floats, reshapes to Nx2. Calls `elem.clear()` after each element to free XML nodes immediately. + +**Caching** (`inspire.py:cache_inspire`): Parsing 350 ZIPs takes a while, so results are cached as three files: +- `inspire_coords.bin` — flat binary dump of all float64 coordinate pairs, streamed to disk as each ZIP is parsed +- `inspire_bboxes.npy` — (N, 4) array of `[min_e, min_n, max_e, max_n]` per polygon +- `inspire_offsets.npy` — (N, 2) array of `[byte_offset_into_coords_bin, n_points]` + +Pre-allocates numpy arrays at 25M capacity and grows by 1.5x if needed (using in-place `resize` with `refcheck=False`). This avoids Python list overhead for 24M polygons. The coords file is written sequentially — each polygon's raw bytes are appended, and its byte offset is recorded. + +**Loading** (`inspire.py:load_inspire`): Bboxes and offsets are loaded into RAM (~1.1GB). Coords are memory-mapped — the OS pages them in on demand from the ~3GB file, never loading the whole thing. + +**Candidate retrieval** (`inspire.py:get_inspire_candidates`): Given an OA's bounding box, performs a vectorized numpy overlap test against all 24M INSPIRE bboxes — four comparisons broadcast across the entire array. Typically matches 10-500 parcels per OA. Only those matches are materialized as Shapely Polygon objects by reading their coordinate slice from the memory-mapped file. Invalid polygons are repaired with `make_valid`. + +### Phase 3: Processing OAs + +The main loop in `__main__.py` iterates through every OA that has both a boundary polygon and UPRNs. For each OA, it retrieves the OA's UPRN points and postcodes. + +**Fast path**: If every UPRN in the OA shares the same postcode, the entire OA polygon is assigned to that postcode. No geometry computation needed. This covers the majority of OAs (~70-80%). + +**Slow path** (`process_oa.py`): For multi-postcode OAs, the algorithm has three stages: + +#### Stage A: INSPIRE-based claiming + +Build an STRtree spatial index over the INSPIRE candidate polygons. Convert all UPRN points to Shapely Point objects and batch-query the tree with `predicate="intersects"`. This returns pairs of (point_index, candidate_index) — which UPRNs fall inside which parcels. + +For each INSPIRE parcel that contains at least one UPRN, run a majority vote: whichever postcode has the most UPRNs inside that parcel wins the parcel. Accumulate winning parcels per postcode, union them, and clip to the OA boundary. The result is `claimed[postcode] = polygon_within_oa`. + +Then resolve overlaps: INSPIRE parcels can overlap geographically (digitization overlaps), so two postcodes might claim the same square meters. Walk through the claimed dict in insertion order (the postcode with the most parcel wins gets priority by virtue of appearing first), subtracting the running union from each subsequent postcode's geometry. + +#### Stage B: Voronoi distribution of remaining area + +Subtract all claimed area from the OA polygon to get `remaining`. If remaining area > 0.01 sqm, pass ALL UPRN points (not just unclaimed ones) and the remaining polygon to `compute_voronoi_regions`. + +The Voronoi computation (`voronoi.py`): +1. Converts coordinates to float64 (since BNG grid refs are integers) +2. Deduplicates points, keeping one per (coordinate, postcode) pair. When multiple postcodes share the same coordinate (e.g. a block of flats straddling a postcode boundary), each postcode gets its own point with a tiny 0.01m jitter so Voronoi can distinguish them +3. Adds 4 dummy points far outside the real points (10x the spatial extent). This guarantees every real point gets a bounded Voronoi region (otherwise edge points get infinite regions) and also prevents collinearity from crashing scipy +4. Runs `scipy.spatial.Voronoi` on all points +5. For each real point's Voronoi cell, constructs the polygon from the Voronoi vertices, clips to the boundary, groups by postcode +6. Unions per-postcode fragments + +The effect: every unclaimed patch of OA gets assigned to the nearest postcode by straight-line distance (Voronoi tessellation is exactly the set of all points nearest to each generator). + +#### Stage C: Combine + +Each postcode gets its INSPIRE-claimed polygon (if any) plus its Voronoi share (if any). These are unioned together, validated, and stripped of any non-polygonal geometry debris from `make_valid`. + +The output of `process_oa` is `list[(postcode, polygon)]` — the per-OA fragments. A single postcode that spans two OAs produces two separate fragments (one from each OA's processing). + +### Phase 4: Merging and writing + +**Fragment merging** (`output.py:merge_fragments`): Groups all fragments by postcode, unions them. If the result is a MultiPolygon (meaning the postcode has disconnected pieces — either from spanning OAs with a gap, or algorithm artifacts), applies a 1m buffer-then-unbuffer to close tiny gaps from floating-point mismatches at OA boundary edges. If still a MultiPolygon after that, keeps only the largest polygon — postcodes are contiguous delivery routes, so detached fragments are artifacts. + +**GeoJSON output** (`output.py:write_district_geojson`): Groups postcodes by district (the outward code, e.g. `SW1A` from `SW1A 1AA`). For each district, converts every postcode polygon from BNG to WGS84 using pyproj, simplifies with 1m tolerance (Douglas-Peucker), rounds coordinates to 6 decimal places (~0.1m precision), and writes a single `{district}.geojson` FeatureCollection. Each Feature has `postcodes` (formatted like `"SW1A 1AA"`) and `mapit_code` (no space: `"SW1A1AA"`) in its properties. + +## Memory architecture + +The pipeline is designed to run in <12GB: + +| Dataset | Representation | Memory | +|---------|---------------|--------| +| OA boundaries | Python dict of Shapely objects | ~2GB | +| UPRNs | Polars DataFrame (Arrow columnar) + offset dict | ~1.5GB | +| INSPIRE bboxes | numpy float64 (N,4) | ~777MB | +| INSPIRE offsets | numpy int64 (N,2) | ~290MB | +| INSPIRE coords | memory-mapped file | ~0MB resident | +| Fragments | Python list of (str, Shapely) | grows during processing | + +Key design choices: +- INSPIRE coords are memory-mapped, not loaded — the OS pages in only the ~100-500 polygons needed per OA +- UPRNs sorted + offset dict avoids per-OA groupby allocation +- `sink_parquet` for the sort avoids doubling memory +- `release_memory()` calls `gc.collect()` + glibc `malloc_trim(0)` to return freed pages to the OS between phases +- All three large datasets are explicitly deleted before Phase 4 + +## Key invariants + +1. **Every square meter of every OA is assigned to exactly one postcode** — the combination of INSPIRE claiming + Voronoi fills the entire OA, and overlap resolution ensures no double-counting +2. **Every postcode that exists in the UPRN data gets a polygon** — unless all its UPRNs share coordinates with another postcode's UPRNs (handled by jitter) or it has zero UPRNs +3. **Postcode polygons never extend outside their OA(s)** — all geometry is clipped to OA boundaries +4. **Output is always single Polygon, never MultiPolygon** — the largest-polygon extraction in both `merge_fragments` and `to_wgs84_geojson` ensures this + +## Module structure + +``` +postcode_boundaries/ + __init__.py — Package docstring + __main__.py — CLI entry point, four-phase orchestration + memory.py — release_memory() glibc malloc_trim helper + oa_boundaries.py — GeoPackage parsing, OA boundary loading + uprn.py — UPRN loading (sorted DataFrame + offset dict), per-OA access + inspire.py — INSPIRE GML parsing, caching, loading, bbox candidate retrieval + voronoi.py — Voronoi region computation clipped to boundary + process_oa.py — Per-OA processing (INSPIRE assignment + Voronoi fallback) + output.py — BNG to WGS84 transform, fragment merging, GeoJSON writing +``` + +Invoked as: +```bash +uv run python -m pipeline.transform.postcode_boundaries \ + --uprn data/uprn_lookup.parquet \ + --oa-boundaries data/oa_boundaries.gpkg \ + --inspire data/inspire/ \ + --output data/postcode_boundaries/ +``` diff --git a/pipeline/transform/postcode_boundaries/__init__.py b/pipeline/transform/postcode_boundaries/__init__.py new file mode 100644 index 0000000..439af3d --- /dev/null +++ b/pipeline/transform/postcode_boundaries/__init__.py @@ -0,0 +1,23 @@ +"""Generate postcode boundary polygons from OA boundaries, INSPIRE parcels, and UPRN data. + +Produces per-district GeoJSON files compatible with the Rust server's postcode loader. +Each postcode gets a polygon (or MultiPolygon) guaranteed to be contained within its +Output Area(s), with 100% OA coverage and no overlaps between postcodes within an OA. + +Algorithm per OA: + 1. Single-postcode OA → entire OA polygon assigned to that postcode + 2. Multi-postcode OA: + a. Assign INSPIRE parcels to postcodes via UPRN point-in-polygon majority vote + b. Union INSPIRE parcels per postcode, clip to OA → "claimed" area + c. Distribute remaining (unclaimed) OA area via Voronoi of UPRN points + d. Final polygon = claimed + Voronoi share + +Memory-efficient design (<12GB total): + - INSPIRE polygons stored as raw coordinate bytes in parquet; Shapely objects built + lazily per-OA via numpy bbox pre-filter (~100-500 candidates at a time) + - UPRNs kept as sorted polars DataFrame with offset dict (Arrow storage, ~1.2GB) + - OA processing runs sequentially (no multiprocess INSPIRE duplication) + +Output format: {output}/units/{DISTRICT}.geojson with properties.postcodes and +properties.mapit_code fields matching server-rs/src/data/postcodes.rs expectations. +""" diff --git a/pipeline/transform/postcode_boundaries/__main__.py b/pipeline/transform/postcode_boundaries/__main__.py new file mode 100644 index 0000000..2bb26e4 --- /dev/null +++ b/pipeline/transform/postcode_boundaries/__main__.py @@ -0,0 +1,122 @@ +import argparse +from pathlib import Path + +from shapely.geometry import MultiPolygon, Polygon +from tqdm import tqdm + +from .inspire import cache_inspire, get_inspire_candidates, inspire_cache_exists, load_inspire +from .memory import release_memory +from .oa_boundaries import load_oa_boundaries +from .output import merge_fragments, write_district_geojson +from .process_oa import process_oa +from .uprn import get_oa_uprns, load_uprns + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Generate postcode boundary polygons from OA + INSPIRE + UPRN data" + ) + parser.add_argument("--uprn", type=Path, required=True, help="UPRN lookup parquet") + parser.add_argument( + "--oa-boundaries", type=Path, required=True, help="OA boundaries GeoPackage" + ) + parser.add_argument( + "--inspire", type=Path, required=True, help="INSPIRE ZIP directory" + ) + parser.add_argument("--output", type=Path, required=True, help="Output directory") + parser.add_argument( + "--limit", type=int, default=0, help="Process only first N OAs (0=all)" + ) + args = parser.parse_args() + + # Phase 1: Load all data + print("=" * 60) + print("Phase 1: Loading data") + print("=" * 60) + + oa_geoms = load_oa_boundaries(args.oa_boundaries) + uprn_df, uprn_offsets = load_uprns(args.uprn) + + # Phase 2: Parse/load INSPIRE + print() + print("=" * 60) + print("Phase 2: INSPIRE data") + print("=" * 60) + + inspire_cache_dir = args.output / "inspire_cache" + if not inspire_cache_exists(inspire_cache_dir): + cache_inspire(args.inspire, inspire_cache_dir) + inspire_bboxes, inspire_offsets, inspire_coords = load_inspire(inspire_cache_dir) + + # Phase 3: Process OAs + print() + print("=" * 60) + print("Phase 3: Processing OAs") + print("=" * 60) + + # Build work list — precompute which OAs are single vs multi-postcode + oa_codes_with_data = sorted(set(oa_geoms.keys()) & set(uprn_offsets.keys())) + skipped_no_uprn = len(oa_geoms) - len(oa_codes_with_data) + skipped_no_boundary = len(uprn_offsets) - len(oa_codes_with_data) + + if args.limit > 0: + oa_codes_with_data = oa_codes_with_data[: args.limit] + + print(f" OAs with UPRNs + boundaries: {len(oa_codes_with_data)}") + print(f" Skipped (no UPRNs): {skipped_no_uprn}") + print(f" Skipped (no boundary): {skipped_no_boundary}") + + all_fragments: list[tuple[str, Polygon | MultiPolygon]] = [] + single_count = 0 + multi_count = 0 + + for oa_code in tqdm( + oa_codes_with_data, + desc="Processing OAs", + unit="OA", + smoothing=0.01, + miniters=100, + ): + oa_geom = oa_geoms[oa_code] + points, postcodes = get_oa_uprns(uprn_df, uprn_offsets, oa_code) + + if len(set(postcodes)) == 1: + # Fast path: entire OA = one postcode + all_fragments.append((postcodes[0], oa_geom)) + single_count += 1 + continue + + # Get INSPIRE candidates via bbox pre-filter + candidates = get_inspire_candidates( + oa_geom.bounds, inspire_bboxes, inspire_offsets, inspire_coords + ) + + fragments = process_oa(oa_geom, points, postcodes, candidates) + all_fragments.extend(fragments) + multi_count += 1 + + print(f"\n Single-postcode OAs (fast path): {single_count}") + print(f" Multi-postcode OAs (INSPIRE+Voronoi): {multi_count}") + print(f" Total fragments: {len(all_fragments)}") + + # Free data no longer needed + del oa_geoms, uprn_df, uprn_offsets + del inspire_bboxes, inspire_offsets, inspire_coords + release_memory() + + # Phase 4: Merge and write + print() + print("=" * 60) + print("Phase 4: Merging fragments and writing GeoJSON") + print("=" * 60) + + merged = merge_fragments(all_fragments) + print(f" Merged into {len(merged)} unique postcodes") + + file_count = write_district_geojson(merged, args.output) + print(f"\n Wrote {file_count} district GeoJSON files to {args.output / 'units'}") + print("Done!") + + +if __name__ == "__main__": + main() diff --git a/pipeline/transform/postcode_boundaries/inspire.py b/pipeline/transform/postcode_boundaries/inspire.py new file mode 100644 index 0000000..515fa8c --- /dev/null +++ b/pipeline/transform/postcode_boundaries/inspire.py @@ -0,0 +1,155 @@ +import zipfile +from pathlib import Path +from xml.etree.ElementTree import iterparse + +import numpy as np +from shapely import make_valid +from shapely.geometry import Polygon +from tqdm import tqdm + +_GML_NS = "{http://www.opengis.net/gml/3.2}" +_LR_NS = "{www.landregistry.gov.uk}" + + +def parse_inspire_zip(zip_path: Path) -> list[np.ndarray]: + """Parse a single INSPIRE ZIP → list of Nx2 coordinate arrays (easting, northing).""" + results = [] + with zipfile.ZipFile(zip_path) as zf: + gml_names = [n for n in zf.namelist() if n.endswith(".gml")] + if not gml_names: + return results + with zf.open(gml_names[0]) as f: + for event, elem in iterparse(f, events=("end",)): + if elem.tag != f"{_LR_NS}PREDEFINED": + continue + pos_list = elem.find(f".//{_GML_NS}posList") + if pos_list is not None and pos_list.text: + vals = pos_list.text.split() + n = len(vals) // 2 + if n >= 3: + coords = np.array(vals, dtype=np.float64).reshape(n, 2) + results.append(coords) + elem.clear() + return results + + +def cache_inspire(inspire_dir: Path, cache_dir: Path) -> None: + """Parse all INSPIRE ZIPs and cache as memory-mappable binary files. + + Processes ZIPs sequentially to keep memory under control (~2GB peak). + Each ZIP's polygons are streamed to disk immediately after parsing. + + Writes three files: + - inspire_bboxes.npy: float64 array (N, 4) of [min_e, min_n, max_e, max_n] + - inspire_offsets.npy: int64 array (N, 2) of [byte_offset, n_points] + - inspire_coords.bin: flat binary of all float64 coordinate pairs + """ + zip_files = sorted(inspire_dir.glob("*.zip")) + print( + f"Parsing {len(zip_files)} INSPIRE ZIP files (sequential, streaming to disk)..." + ) + cache_dir.mkdir(parents=True, exist_ok=True) + + # Pre-allocate arrays for bboxes and offsets (grow if needed) + capacity = 25_000_000 + bboxes = np.empty((capacity, 4), dtype=np.float64) + offsets = np.empty((capacity, 2), dtype=np.int64) + count = 0 + byte_offset = 0 + + coords_path = cache_dir / "inspire_coords.bin" + with open(coords_path, "wb") as cf: + for zip_path in tqdm(zip_files, desc="INSPIRE ZIPs", unit="file"): + for coords in parse_inspire_zip(zip_path): + if count >= capacity: + capacity = int(capacity * 1.5) + bboxes.resize((capacity, 4), refcheck=False) + offsets.resize((capacity, 2), refcheck=False) + bboxes[count, 0] = coords[:, 0].min() + bboxes[count, 1] = coords[:, 1].min() + bboxes[count, 2] = coords[:, 0].max() + bboxes[count, 3] = coords[:, 1].max() + offsets[count, 0] = byte_offset + offsets[count, 1] = len(coords) + raw = coords.astype(np.float64).tobytes() + cf.write(raw) + byte_offset += len(raw) + count += 1 + + # Trim to actual size and save + bboxes = bboxes[:count] + offsets = offsets[:count] + np.save(cache_dir / "inspire_bboxes.npy", bboxes) + np.save(cache_dir / "inspire_offsets.npy", offsets) + size_mb = byte_offset / (1024 * 1024) + print(f" Cached {count:,} INSPIRE polygons (coords: {size_mb:.0f} MB)") + + +def inspire_cache_exists(cache_dir: Path) -> bool: + return all( + (cache_dir / f).exists() + for f in ("inspire_bboxes.npy", "inspire_offsets.npy", "inspire_coords.bin") + ) + + +def load_inspire( + cache_dir: Path, +) -> tuple[np.ndarray, np.ndarray, np.memmap]: + """Load INSPIRE cache → (bboxes, offsets, coords_mmap). + + Memory usage: ~1.1GB (bboxes ~777MB + offsets ~290MB, coords memory-mapped). + """ + print(f"Loading INSPIRE cache from {cache_dir}...") + bboxes = np.load(cache_dir / "inspire_bboxes.npy") + offsets = np.load(cache_dir / "inspire_offsets.npy") + coords_mmap = np.memmap( + cache_dir / "inspire_coords.bin", dtype=np.float64, mode="r" + ) + print( + f" Loaded {len(bboxes):,} INSPIRE polygon bboxes (~{bboxes.nbytes // (1024 * 1024)} MB)" + ) + print(f" Offsets: ~{offsets.nbytes // (1024 * 1024)} MB, coords: memory-mapped") + return bboxes, offsets, coords_mmap + + +def get_inspire_candidates( + oa_bounds: tuple[float, float, float, float], + bboxes: np.ndarray, + offsets: np.ndarray, + coords_mmap: np.memmap, +) -> list[Polygon]: + """Get INSPIRE polygons overlapping an OA via bbox pre-filter. + + Builds Shapely objects only for matches (typically 10-500 per OA). + Reads coordinate data on-demand from memory-mapped file. + """ + min_e, min_n, max_e, max_n = oa_bounds + + # Vectorized bbox overlap test + mask = ( + (bboxes[:, 2] >= min_e) + & (bboxes[:, 0] <= max_e) + & (bboxes[:, 3] >= min_n) + & (bboxes[:, 1] <= max_n) + ) + idxs = np.where(mask)[0] + if len(idxs) == 0: + return [] + + # Build Shapely polygons only for candidates (coords from mmap) + candidates = [] + for i in idxs: + byte_offset = offsets[i, 0] + n_pts = offsets[i, 1] + float_offset = byte_offset // 8 # float64 = 8 bytes + coords = coords_mmap[float_offset : float_offset + n_pts * 2].reshape(-1, 2) + poly = Polygon(coords) + if not poly.is_valid: + poly = make_valid(poly) + if poly.geom_type == "MultiPolygon": + poly = max(poly.geoms, key=lambda g: g.area) + elif poly.geom_type != "Polygon": + continue + if not poly.is_empty: + candidates.append(poly) + return candidates diff --git a/pipeline/transform/postcode_boundaries/memory.py b/pipeline/transform/postcode_boundaries/memory.py new file mode 100644 index 0000000..3caf0cf --- /dev/null +++ b/pipeline/transform/postcode_boundaries/memory.py @@ -0,0 +1,11 @@ +import ctypes +import gc + + +def release_memory() -> None: + """Force Python + glibc to release freed memory back to the OS.""" + gc.collect() + try: + ctypes.CDLL("libc.so.6").malloc_trim(0) + except OSError: + pass diff --git a/pipeline/transform/postcode_boundaries/oa_boundaries.py b/pipeline/transform/postcode_boundaries/oa_boundaries.py new file mode 100644 index 0000000..56aaf72 --- /dev/null +++ b/pipeline/transform/postcode_boundaries/oa_boundaries.py @@ -0,0 +1,40 @@ +import sqlite3 +from pathlib import Path + +from shapely import wkb +from shapely.geometry import MultiPolygon, Polygon + +_ENVELOPE_SIZES = {0: 0, 1: 32, 2: 48, 3: 48, 4: 64} + + +def parse_gpkg_geometry(blob: bytes): + """Extract a Shapely geometry from a GeoPackage binary blob.""" + flags = blob[3] + envelope_type = (flags >> 1) & 0x07 + envelope_size = _ENVELOPE_SIZES.get(envelope_type) + if envelope_size is None: + raise ValueError( + f"Unknown GeoPackage envelope type {envelope_type} (expected 0-4)" + ) + header_size = 8 + envelope_size + return wkb.loads(blob[header_size:]) + + +def load_oa_boundaries(gpkg_path: Path) -> dict[str, Polygon | MultiPolygon]: + """Load OA boundary polygons from a GeoPackage. Geometry is already in BNG.""" + print("Loading OA boundaries...") + + conn = sqlite3.connect(str(gpkg_path)) + cur = conn.cursor() + cur.execute("SELECT OA21CD, SHAPE FROM OA_2021_EW_BGC_V2") + + oa_geoms: dict[str, Polygon | MultiPolygon] = {} + for oa_code, blob in cur: + geom = parse_gpkg_geometry(bytes(blob)) + if geom.geom_type == "MultiPolygon" and len(geom.geoms) == 1: + geom = geom.geoms[0] + oa_geoms[oa_code] = geom + + conn.close() + print(f" Loaded {len(oa_geoms)} OA boundaries") + return oa_geoms diff --git a/pipeline/transform/postcode_boundaries/output.py b/pipeline/transform/postcode_boundaries/output.py new file mode 100644 index 0000000..fdacfb6 --- /dev/null +++ b/pipeline/transform/postcode_boundaries/output.py @@ -0,0 +1,137 @@ +import json +from collections import defaultdict +from pathlib import Path + +from pyproj import Transformer +from shapely import make_valid +from shapely.geometry import MultiPolygon, Polygon +from shapely.ops import unary_union +from tqdm import tqdm + +_to_wgs84 = None + + +def _get_to_wgs84(): + global _to_wgs84 + if _to_wgs84 is None: + _to_wgs84 = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True) + return _to_wgs84 + + +def to_wgs84_geojson( + geom: Polygon | MultiPolygon, tolerance: float = 1.0 +) -> dict | None: + """Simplify geometry in BNG, convert to WGS84, return GeoJSON dict.""" + if geom.is_empty: + return None + + simplified = geom.simplify(tolerance, preserve_topology=True) + if simplified.is_empty: + return None + + transformer = _get_to_wgs84() + + def transform_ring(coords): + xs, ys = zip(*coords) + lons, lats = transformer.transform(list(xs), list(ys)) + return [(round(lon, 6), round(lat, 6)) for lon, lat in zip(lons, lats)] + + def transform_polygon(poly): + exterior = transform_ring(poly.exterior.coords) + holes = [transform_ring(h.coords) for h in poly.interiors] + return [exterior] + holes + + # Force single Polygon — postcodes are contiguous delivery routes + if simplified.geom_type == "MultiPolygon": + simplified = max(simplified.geoms, key=lambda g: g.area) + elif simplified.geom_type == "GeometryCollection": + polys = [ + g for g in simplified.geoms if g.geom_type in ("Polygon", "MultiPolygon") + ] + if not polys: + return None + simplified = max(polys, key=lambda g: g.area) + if simplified.geom_type == "MultiPolygon": + simplified = max(simplified.geoms, key=lambda g: g.area) + + if simplified.geom_type != "Polygon" or simplified.is_empty: + return None + + return { + "type": "Polygon", + "coordinates": transform_polygon(simplified), + } + + +def merge_fragments( + all_fragments: list[tuple[str, Polygon | MultiPolygon]], +) -> dict[str, Polygon | MultiPolygon]: + """Merge cross-OA fragments for postcodes spanning multiple OAs.""" + by_postcode: dict[str, list] = defaultdict(list) + for pc, geom in all_fragments: + by_postcode[pc].append(geom) + + merged = {} + for pc, parts in by_postcode.items(): + combined = unary_union(parts) + if combined.is_empty: + continue + if not combined.is_valid: + combined = make_valid(combined) + # Close tiny gaps between adjacent OA boundary edges (float mismatches) + if combined.geom_type == "MultiPolygon": + combined = combined.buffer(1.0).buffer(-1.0) + if not combined.is_valid: + combined = make_valid(combined) + # Postcodes are contiguous delivery routes — keep only the largest + # polygon; small detached fragments are algorithm artifacts + if combined.geom_type == "MultiPolygon": + combined = max(combined.geoms, key=lambda g: g.area) + merged[pc] = combined + return merged + + +def write_district_geojson( + postcodes: dict[str, Polygon | MultiPolygon], output_dir: Path +) -> int: + """Group postcodes by district, write GeoJSON files. Returns file count.""" + units_dir = output_dir / "units" + units_dir.mkdir(parents=True, exist_ok=True) + + by_district: dict[str, list[tuple[str, Polygon | MultiPolygon]]] = defaultdict(list) + for pc, geom in postcodes.items(): + parts = pc.split() + district = parts[0] if parts else pc[:4] + by_district[district].append((pc, geom)) + + file_count = 0 + for district, entries in tqdm( + sorted(by_district.items()), desc="Writing GeoJSON", unit="file" + ): + features = [] + for pc, geom in sorted(entries, key=lambda x: x[0]): + geojson_geom = to_wgs84_geojson(geom) + if geojson_geom is None: + continue + mapit_code = pc.replace(" ", "") + features.append( + { + "type": "Feature", + "geometry": geojson_geom, + "properties": { + "postcodes": pc, + "mapit_code": mapit_code, + }, + } + ) + + if not features: + continue + + collection = {"type": "FeatureCollection", "features": features} + out_path = units_dir / f"{district}.geojson" + with open(out_path, "w") as f: + json.dump(collection, f, separators=(",", ":")) + file_count += 1 + + return file_count diff --git a/pipeline/transform/postcode_boundaries/process_oa.py b/pipeline/transform/postcode_boundaries/process_oa.py new file mode 100644 index 0000000..425822d --- /dev/null +++ b/pipeline/transform/postcode_boundaries/process_oa.py @@ -0,0 +1,127 @@ +from collections import Counter, defaultdict + +import numpy as np +from shapely import STRtree, make_valid +from shapely.geometry import MultiPolygon, Polygon +from shapely.ops import unary_union + +from .voronoi import compute_voronoi_regions + + +def process_oa( + oa_geom: Polygon | MultiPolygon, + points: np.ndarray, + postcodes: list[str], + inspire_candidates: list[Polygon], +) -> list[tuple[str, Polygon | MultiPolygon]]: + """Process a single OA → list of (postcode, geometry) fragments.""" + unique_pcs = set(postcodes) + if len(unique_pcs) == 1: + return [(next(iter(unique_pcs)), oa_geom)] + + # Try INSPIRE-based assignment + claimed: dict[str, Polygon | MultiPolygon] = {} + + if inspire_candidates: + cand_tree = STRtree(inspire_candidates) + + from shapely import points as shp_points + + uprn_pts = shp_points(points) + pt_idx, cand_idx = cand_tree.query(uprn_pts, predicate="intersects") + + # Majority vote per candidate polygon + cand_postcodes: dict[int, list[str]] = defaultdict(list) + for pi, ci in zip(pt_idx, cand_idx): + cand_postcodes[ci].append(postcodes[pi]) + + pc_inspire_polys: dict[str, list[Polygon]] = defaultdict(list) + for ci, pc_list in cand_postcodes.items(): + winner = Counter(pc_list).most_common(1)[0][0] + pc_inspire_polys[winner].append(inspire_candidates[ci]) + + for pc, polys in pc_inspire_polys.items(): + merged = unary_union(polys) + clipped = merged.intersection(oa_geom) + if not clipped.is_empty: + if not clipped.is_valid: + clipped = make_valid(clipped) + clipped = _extract_polygonal(clipped) + if clipped is not None: + claimed[pc] = clipped + + # Resolve overlaps: INSPIRE parcels can overlap geographically, so two + # postcodes may claim the same area. Give contested area to whichever + # postcode claimed it first (most UPRNs → first in insertion order). + if len(claimed) > 1: + resolved: dict[str, Polygon | MultiPolygon] = {} + used = None + for pc, geom in claimed.items(): + if used is not None: + geom = geom.difference(used) + if geom.is_empty: + continue + if not geom.is_valid: + geom = make_valid(geom) + geom = _extract_polygonal(geom) + if geom is None: + continue + resolved[pc] = geom + used = geom if used is None else unary_union([used, geom]) + claimed = resolved + + # Compute remaining area + if claimed: + all_claimed = unary_union(list(claimed.values())) + if not all_claimed.is_valid: + all_claimed = make_valid(all_claimed) + remaining = oa_geom.difference(all_claimed) + if not remaining.is_valid: + remaining = make_valid(remaining) + else: + remaining = oa_geom + + # Distribute remaining area via Voronoi + if not remaining.is_empty and remaining.area > 0.01: + voronoi_result = compute_voronoi_regions(points, postcodes, remaining) + else: + voronoi_result = {} + + # Combine claimed + voronoi + result: dict[str, list] = defaultdict(list) + for pc, geom in claimed.items(): + result[pc].append(geom) + for pc, geom in voronoi_result.items(): + result[pc].append(geom) + + fragments = [] + for pc, parts in result.items(): + merged = unary_union(parts) + if not merged.is_empty: + if not merged.is_valid: + merged = make_valid(merged) + merged = _extract_polygonal(merged) + if merged is not None: + fragments.append((pc, merged)) + + return fragments + + +def _extract_polygonal(geom) -> Polygon | MultiPolygon | None: + """Extract only Polygon/MultiPolygon parts from a geometry. + + make_valid can produce GeometryCollections containing lines and points; + this strips those away and returns only the polygonal component. + """ + if geom.geom_type in ("Polygon", "MultiPolygon"): + return geom + if geom.geom_type == "GeometryCollection": + polys = [g for g in geom.geoms if g.geom_type in ("Polygon", "MultiPolygon")] + if not polys: + return None + if len(polys) == 1: + return polys[0] + return MultiPolygon( + [p for g in polys for p in (g.geoms if g.geom_type == "MultiPolygon" else [g])] + ) + return None diff --git a/pipeline/transform/postcode_boundaries/test_postcode_boundaries.py b/pipeline/transform/postcode_boundaries/test_postcode_boundaries.py new file mode 100644 index 0000000..97217f2 --- /dev/null +++ b/pipeline/transform/postcode_boundaries/test_postcode_boundaries.py @@ -0,0 +1,430 @@ +"""Tests for the postcode_boundaries module. + +Each test targets a specific bug or edge case identified during code review. +""" + +import numpy as np +import polars as pl +import pytest +from shapely.geometry import MultiPolygon, Polygon, box + +from .oa_boundaries import parse_gpkg_geometry +from .output import merge_fragments, to_wgs84_geojson +from .process_oa import _extract_polygonal, process_oa +from .uprn import get_oa_uprns, load_uprns +from .voronoi import _equal_split_fallback, compute_voronoi_regions + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def square_boundary(): + """A 100x100m square OA boundary in BNG coords.""" + return box(500000, 180000, 500100, 180100) + + +@pytest.fixture +def uprn_parquet(tmp_path): + """Write a minimal UPRN parquet file with 3 OAs, return its path.""" + df = pl.DataFrame( + { + "GRIDGB1E": [500010, 500020, 500030, 500040, 500050, 500060], + "GRIDGB1N": [180010, 180020, 180030, 180040, 180050, 180060], + "PCDS": ["AA1 1AA", "AA1 1AB", "BB2 2BB", "BB2 2BC", "CC3 3CC", "CC3 3CD"], + "OA21CD": [ + "E00000001", + "E00000001", + "E00000002", + "E00000002", + "E00000003", + "E00000003", + ], + } + ) + path = tmp_path / "uprn.parquet" + df.write_parquet(path) + return path + + +# --------------------------------------------------------------------------- +# Bug 1: First OA silently dropped +# --------------------------------------------------------------------------- + + +class TestFirstOADropped: + """The UPRN offset computation drops the first OA (alphabetically).""" + + def test_first_oa_present_in_offsets(self, uprn_parquet): + """E00000001 is the first OA after sorting. It must appear in offsets.""" + df, offsets = load_uprns(uprn_parquet) + assert "E00000001" in offsets, ( + "First OA (E00000001) missing from offsets — shift(1) null comparison bug" + ) + + def test_all_oas_present(self, uprn_parquet): + """Every OA in the data must have an offset entry.""" + df, offsets = load_uprns(uprn_parquet) + assert set(offsets.keys()) == {"E00000001", "E00000002", "E00000003"} + + def test_first_oa_data_accessible(self, uprn_parquet): + """UPRNs for the first OA must be retrievable via get_oa_uprns.""" + df, offsets = load_uprns(uprn_parquet) + points, postcodes = get_oa_uprns(df, offsets, "E00000001") + assert len(postcodes) == 2 + assert set(postcodes) == {"AA1 1AA", "AA1 1AB"} + + +# --------------------------------------------------------------------------- +# Bug 2: Whitespace-only postcodes slip through +# --------------------------------------------------------------------------- + + +class TestWhitespacePostcodes: + """Postcodes that are only whitespace must be filtered out.""" + + def test_whitespace_postcodes_excluded(self, tmp_path): + """A PCDS value of ' ' should not survive loading.""" + df = pl.DataFrame( + { + "GRIDGB1E": [500010, 500020, 500030], + "GRIDGB1N": [180010, 180020, 180030], + "PCDS": ["AA1 1AA", " ", "BB2 2BB"], + "OA21CD": ["E00000001", "E00000001", "E00000002"], + } + ) + path = tmp_path / "uprn.parquet" + df.write_parquet(path) + + loaded_df, offsets = load_uprns(path) + all_postcodes = loaded_df["PCDS"].to_list() + assert "" not in all_postcodes, "Empty string postcode survived strip+filter" + assert " " not in all_postcodes, "Whitespace postcode survived filter" + + def test_whitespace_only_oa_excluded(self, tmp_path): + """An OA where all UPRNs have whitespace-only postcodes should not appear.""" + df = pl.DataFrame( + { + "GRIDGB1E": [500010, 500020], + "GRIDGB1N": [180010, 180020], + "PCDS": [" ", " "], + "OA21CD": ["E00000099", "E00000099"], + } + ) + path = tmp_path / "uprn.parquet" + df.write_parquet(path) + + loaded_df, _ = load_uprns(path) + assert len(loaded_df) == 0 + + +# --------------------------------------------------------------------------- +# Bug 3: Voronoi deduplication is first-seen-wins +# --------------------------------------------------------------------------- + + +class TestVoronoiDeduplication: + """Multiple postcodes sharing a coordinate must all receive area.""" + + def test_shared_coords_both_postcodes_get_area(self, square_boundary): + """Two postcodes with UPRNs at the same coords: both must get area.""" + # Two postcodes each have one UPRN at (500050, 180050) + # Plus postcode A has one at a different location + points = np.array( + [ + [500020, 180050], # postcode A — unique location + [500050, 180050], # postcode A — shared location + [500050, 180050], # postcode B — shared location (same coords) + [500080, 180050], # postcode B — unique location + ] + ) + postcodes = ["A", "A", "B", "B"] + result = compute_voronoi_regions(points, postcodes, square_boundary) + assert "A" in result, "Postcode A should have Voronoi area" + assert "B" in result, "Postcode B should have Voronoi area" + + def test_all_shared_coords_no_postcode_lost(self, square_boundary): + """When all UPRNs for a postcode share coords with another, it must still get area.""" + # Postcode B's only UPRN is at the same coords as postcode A's + points = np.array( + [ + [500050, 180050], # postcode A + [500050, 180050], # postcode B — identical coords + ] + ) + postcodes = ["A", "B"] + result = compute_voronoi_regions(points, postcodes, square_boundary) + assert "A" in result, "Postcode A should have area" + assert "B" in result, "Postcode B should have area" + # Both should get roughly equal area since they're at the same location + area_a = result["A"].area + area_b = result["B"].area + total = area_a + area_b + assert area_a / total > 0.2, "Postcode A should have meaningful area" + assert area_b / total > 0.2, "Postcode B should have meaningful area" + + def test_int64_coords_jitter_works(self, square_boundary): + """Int64 coords (production dtype) must still jitter correctly.""" + points = np.array( + [[500050, 180050], [500050, 180050]], dtype=np.int64 + ) + postcodes = ["A", "B"] + result = compute_voronoi_regions(points, postcodes, square_boundary) + assert "A" in result, "Postcode A missing with int64 coords" + assert "B" in result, "Postcode B missing with int64 coords" + + +# --------------------------------------------------------------------------- +# Bug 4: Voronoi collinear fallback gives everything to first postcode +# --------------------------------------------------------------------------- + + +class TestVoronoiCollinear: + """Collinear points (handled by dummy corners) must distribute area fairly.""" + + def test_collinear_points_all_postcodes_get_area(self, square_boundary): + """Points along a line — every postcode must get area.""" + points = np.array( + [ + [500020, 180050], + [500040, 180050], + [500060, 180050], + [500080, 180050], + ] + ) + postcodes = ["A", "A", "B", "B"] + result = compute_voronoi_regions(points, postcodes, square_boundary) + assert "A" in result, "Postcode A should have area" + assert "B" in result, "Postcode B should have area" + + def test_collinear_points_area_roughly_fair(self, square_boundary): + """With equal numbers of collinear points, area split should be roughly fair.""" + points = np.array( + [ + [500030, 180050], + [500070, 180050], + ] + ) + postcodes = ["A", "B"] + result = compute_voronoi_regions(points, postcodes, square_boundary) + assert "A" in result and "B" in result + area_a = result["A"].area + area_b = result["B"].area + ratio = min(area_a, area_b) / max(area_a, area_b) + assert ratio > 0.3, f"Area split too unfair: {area_a:.0f} vs {area_b:.0f}" + + +class TestEqualSplitFallback: + """_equal_split_fallback must give every postcode some area.""" + + def test_all_postcodes_get_area(self, square_boundary): + result = _equal_split_fallback(["A", "B", "C"], square_boundary) + assert set(result.keys()) == {"A", "B", "C"} + for pc, geom in result.items(): + assert geom.area > 0, f"Postcode {pc} got zero area" + + def test_total_area_approximately_matches(self, square_boundary): + result = _equal_split_fallback(["A", "B"], square_boundary) + total = sum(g.area for g in result.values()) + assert total == pytest.approx(square_boundary.area, rel=0.01) + + +# --------------------------------------------------------------------------- +# Bug 5: process_oa can produce non-polygon geometries from make_valid +# --------------------------------------------------------------------------- + + +class TestProcessOAGeometryTypes: + """process_oa must return only Polygon/MultiPolygon fragments.""" + + def test_overlapping_inspire_no_postcode_overlap(self): + """Overlapping INSPIRE parcels assigned to different postcodes must not overlap.""" + oa_geom = box(500000, 180000, 500100, 180100) + # Two overlapping parcels — left half and a wider middle section + parcel_left = box(500000, 180000, 500060, 180100) + parcel_right = box(500040, 180000, 500100, 180100) # overlaps left by 20m + # UPRN in left parcel → postcode A, UPRN in right parcel → postcode B + points = np.array( + [ + [500020, 180050], # postcode A — inside left parcel + [500080, 180050], # postcode B — inside right parcel + ] + ) + postcodes = ["A", "B"] + fragments = process_oa( + oa_geom, points, postcodes, inspire_candidates=[parcel_left, parcel_right] + ) + frag_dict = dict(fragments) + assert "A" in frag_dict and "B" in frag_dict + # The critical check: no overlap between the two fragments + overlap = frag_dict["A"].intersection(frag_dict["B"]) + assert overlap.area < 0.01, ( + f"Postcodes A and B overlap by {overlap.area:.1f} sqm" + ) + + def test_fragments_are_polygonal(self): + """All fragments from process_oa must be Polygon or MultiPolygon.""" + oa_geom = box(500000, 180000, 500100, 180100) + points = np.array( + [ + [500020, 180020], + [500080, 180080], + ] + ) + postcodes = ["A", "B"] + fragments = process_oa(oa_geom, points, postcodes, inspire_candidates=[]) + for pc, geom in fragments: + assert geom.geom_type in ("Polygon", "MultiPolygon"), ( + f"Fragment for {pc} has unexpected type: {geom.geom_type}" + ) + assert not geom.is_empty, f"Fragment for {pc} is empty" + + def test_no_geometry_collection_in_output(self): + """Even with tricky INSPIRE parcels, output should never be GeometryCollection.""" + oa_geom = box(500000, 180000, 500100, 180100) + # Create a thin sliver that make_valid might convert to a line + sliver = Polygon( + [ + (500000, 180000), + (500100, 180000), + (500100, 180000.001), + (500000, 180000), + ] + ) + points = np.array( + [ + [500020, 180020], + [500080, 180080], + ] + ) + postcodes = ["A", "B"] + fragments = process_oa(oa_geom, points, postcodes, inspire_candidates=[sliver]) + for pc, geom in fragments: + assert geom.geom_type in ("Polygon", "MultiPolygon"), ( + f"Fragment for {pc} has type {geom.geom_type}" + ) + + +# --------------------------------------------------------------------------- +# _extract_polygonal helper +# --------------------------------------------------------------------------- + + +class TestExtractPolygonal: + """_extract_polygonal must strip non-polygon parts from geometry collections.""" + + def test_polygon_passthrough(self): + poly = box(0, 0, 10, 10) + assert _extract_polygonal(poly) is poly + + def test_multipolygon_passthrough(self): + mp = MultiPolygon([box(0, 0, 10, 10), box(20, 20, 30, 30)]) + assert _extract_polygonal(mp) is mp + + def test_geometry_collection_extracts_polygon(self): + from shapely.geometry import GeometryCollection, LineString + + poly = box(0, 0, 10, 10) + line = LineString([(0, 0), (10, 10)]) + gc = GeometryCollection([poly, line]) + result = _extract_polygonal(gc) + assert result is not None + assert result.geom_type == "Polygon" + assert result.area == pytest.approx(poly.area) + + def test_geometry_collection_no_polygons_returns_none(self): + from shapely.geometry import GeometryCollection, LineString, Point + + gc = GeometryCollection([LineString([(0, 0), (1, 1)]), Point(5, 5)]) + assert _extract_polygonal(gc) is None + + def test_line_returns_none(self): + from shapely.geometry import LineString + + assert _extract_polygonal(LineString([(0, 0), (1, 1)])) is None + + +# --------------------------------------------------------------------------- +# Edge case: merge_fragments handles single-OA postcodes +# --------------------------------------------------------------------------- + + +class TestMergeFragments: + """merge_fragments must handle edge cases cleanly.""" + + def test_single_fragment_passthrough(self): + """A postcode with one fragment should pass through unchanged.""" + poly = box(0, 0, 100, 100) + result = merge_fragments([("AA1 1AA", poly)]) + assert "AA1 1AA" in result + assert result["AA1 1AA"].equals(poly) + + def test_empty_fragments_excluded(self): + """Empty geometries should not appear in output.""" + empty = Polygon() + result = merge_fragments([("AA1 1AA", empty)]) + assert "AA1 1AA" not in result + + +# --------------------------------------------------------------------------- +# Edge case: to_wgs84_geojson handles degenerate geometries +# --------------------------------------------------------------------------- + + +class TestToWgs84Geojson: + """to_wgs84_geojson must handle edge cases.""" + + def test_empty_geometry_returns_none(self): + assert to_wgs84_geojson(Polygon()) is None + + def test_valid_polygon_returns_geojson(self): + # Small square in BNG + poly = box(530000, 180000, 530100, 180100) + result = to_wgs84_geojson(poly) + assert result is not None + assert result["type"] == "Polygon" + assert len(result["coordinates"]) >= 1 + assert len(result["coordinates"][0]) >= 4 # closed ring + + def test_multipolygon_returns_largest(self): + """MultiPolygon input should return only the largest polygon.""" + big = box(530000, 180000, 530100, 180100) + small = box(530200, 180200, 530210, 180210) + mp = MultiPolygon([big, small]) + result = to_wgs84_geojson(mp) + assert result is not None + assert result["type"] == "Polygon" + + def test_coordinates_have_limited_precision(self): + """GeoJSON coordinates should be rounded to 6 decimal places.""" + import json + + poly = box(530000, 180000, 530100, 180100) + result = to_wgs84_geojson(poly) + assert result is not None + # Check precision via JSON serialization (what actually hits disk) + for lon, lat in result["coordinates"][0]: + lon_s = json.dumps(lon) + lat_s = json.dumps(lat) + lon_dp = len(lon_s.split(".")[1]) if "." in lon_s else 0 + lat_dp = len(lat_s.split(".")[1]) if "." in lat_s else 0 + assert lon_dp <= 6, f"Longitude {lon_s} has {lon_dp} decimal places" + assert lat_dp <= 6, f"Latitude {lat_s} has {lat_dp} decimal places" + + +# --------------------------------------------------------------------------- +# Edge case: parse_gpkg_geometry rejects unknown envelope types +# --------------------------------------------------------------------------- + + +class TestParseGpkgGeometry: + """parse_gpkg_geometry must raise on unknown envelope types.""" + + def test_unknown_envelope_type_raises(self): + # Build a minimal GeoPackage blob with envelope_type=5 + # Byte 3 = flags: envelope_type in bits 3-1, so type 5 = 0b00001010 + blob = bytes([0x47, 0x50, 0x00, 0b00001010]) + b"\x00" * 100 + with pytest.raises(ValueError, match="Unknown GeoPackage envelope type 5"): + parse_gpkg_geometry(blob) diff --git a/pipeline/transform/postcode_boundaries/uprn.py b/pipeline/transform/postcode_boundaries/uprn.py new file mode 100644 index 0000000..ae78b36 --- /dev/null +++ b/pipeline/transform/postcode_boundaries/uprn.py @@ -0,0 +1,84 @@ +from pathlib import Path + +import numpy as np +import polars as pl + +from .memory import release_memory + + +def load_uprns(uprn_path: Path) -> tuple[pl.DataFrame, dict[str, tuple[int, int]]]: + """Load UPRNs as a sorted polars DataFrame with OA offset lookup. + + Returns (df, offsets) where offsets[oa_code] = (start_row, end_row). + Peak ~5GB during sort, steady state ~1.5GB (Arrow columnar with compact strings). + """ + import tempfile + + print("Loading UPRN lookup...") + + # Sort via streaming sink to avoid polars doubling memory during in-memory sort + with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as tmp: + tmp_path = Path(tmp.name) + ( + pl.scan_parquet(uprn_path) + .select("GRIDGB1E", "GRIDGB1N", "PCDS", "OA21CD") + .filter(~pl.col("OA21CD").str.starts_with("S")) + .filter(pl.col("GRIDGB1E").is_not_null() & pl.col("GRIDGB1N").is_not_null()) + .with_columns(pl.col("PCDS").str.strip_chars()) + .filter(pl.col("PCDS").is_not_null() & (pl.col("PCDS") != "")) + .sort("OA21CD") + .sink_parquet(tmp_path) + ) + release_memory() + + # Read the sorted data — only one copy in memory (~2GB) + df = pl.read_parquet(tmp_path) + tmp_path.unlink() + n = len(df) + print(f" Loaded {n:,} UPRNs (England & Wales)") + + # Compute OA group offsets using polars (avoids 37M Python string creation) + boundary_df = ( + df.lazy() + .with_row_index("_i") + .filter( + pl.col("OA21CD").shift(1).is_null() + | (pl.col("OA21CD") != pl.col("OA21CD").shift(1)) + ) + .select("_i", "OA21CD") + .collect() + ) + starts_list = boundary_df["_i"].to_list() + oa_list = boundary_df["OA21CD"].to_list() + del boundary_df + offsets: dict[str, tuple[int, int]] = {} + for j in range(len(starts_list)): + end = starts_list[j + 1] if j + 1 < len(starts_list) else n + offsets[oa_list[j]] = (starts_list[j], end) + del starts_list, oa_list + + # Drop OA column (no longer needed) to save ~400MB + df = df.select("GRIDGB1E", "GRIDGB1N", "PCDS") + release_memory() + + print(f" Grouped into {len(offsets)} OAs") + return df, offsets + + +def get_oa_uprns( + df: pl.DataFrame, offsets: dict[str, tuple[int, int]], oa_code: str +) -> tuple[np.ndarray, list[str]]: + """Get UPRN coordinates and postcodes for a single OA. + + Returns (points_nx2, postcodes_list). + """ + s, e = offsets[oa_code] + sub = df[s:e] + points = np.column_stack( + [ + sub["GRIDGB1E"].to_numpy(), + sub["GRIDGB1N"].to_numpy(), + ] + ) + postcodes = sub["PCDS"].to_list() + return points, postcodes diff --git a/pipeline/transform/postcode_boundaries/voronoi.py b/pipeline/transform/postcode_boundaries/voronoi.py new file mode 100644 index 0000000..899582a --- /dev/null +++ b/pipeline/transform/postcode_boundaries/voronoi.py @@ -0,0 +1,120 @@ +from collections import defaultdict + +import numpy as np +from scipy.spatial import Voronoi +from shapely import make_valid +from shapely.geometry import MultiPolygon, Polygon +from shapely.ops import unary_union + + +def compute_voronoi_regions( + points: np.ndarray, postcodes: list[str], boundary: Polygon | MultiPolygon +) -> dict[str, Polygon | MultiPolygon]: + """Compute Voronoi regions for points, clipped to boundary, grouped by postcode.""" + if len(points) == 0: + return {} + if len(points) == 1: + return {postcodes[0]: boundary} + + # UPRN coordinates are int64 (BNG grid refs in whole meters). + # Convert to float64 so sub-meter jitter isn't truncated. + points = points.astype(np.float64) + + # Deduplicate points, keeping one per (location, postcode) pair. + # Multiple postcodes at the same coordinate each get their own point, + # jittered by a tiny offset (0.01m) so Voronoi can distinguish them. + seen: dict[tuple[float, float, str], bool] = {} + unique_pts = [] + unique_pcs = [] + coord_counts: dict[tuple[float, float], int] = defaultdict(int) + for i in range(len(points)): + coord = (points[i, 0], points[i, 1]) + key = (coord[0], coord[1], postcodes[i]) + if key not in seen: + seen[key] = True + jitter_idx = coord_counts[coord] + coord_counts[coord] += 1 + if jitter_idx == 0: + unique_pts.append(points[i].copy()) + else: + # Tiny jitter so Voronoi sees distinct points (0.01m per step) + jittered = points[i].copy() + angle = 2 * np.pi * jitter_idx / max(coord_counts[coord], jitter_idx + 1) + jittered[0] += 0.01 * np.cos(angle) + jittered[1] += 0.01 * np.sin(angle) + unique_pts.append(jittered) + unique_pcs.append(postcodes[i]) + + if len(unique_pts) == 1: + return {unique_pcs[0]: boundary} + + pts = np.array(unique_pts) + min_e, min_n = pts.min(axis=0) + max_e, max_n = pts.max(axis=0) + span = max(max_e - min_e, max_n - min_n, 100) + + dummy = np.array( + [ + [min_e - span * 10, min_n - span * 10], + [max_e + span * 10, min_n - span * 10], + [min_e - span * 10, max_n + span * 10], + [max_e + span * 10, max_n + span * 10], + ] + ) + all_points = np.vstack([pts, dummy]) + + try: + vor = Voronoi(all_points) + except Exception: + # Fallback: split boundary equally among all postcodes present + all_pcs = list(dict.fromkeys(unique_pcs)) + if len(all_pcs) == 1: + return {all_pcs[0]: boundary} + return _equal_split_fallback(all_pcs, boundary) + + n_real = len(pts) + pc_polys: dict[str, list[Polygon]] = defaultdict(list) + + for i in range(n_real): + region_idx = vor.point_region[i] + region = vor.regions[region_idx] + if -1 in region or len(region) < 3: + continue + vertices = vor.vertices[region] + poly = Polygon(vertices) + if not poly.is_valid: + poly = make_valid(poly) + clipped = poly.intersection(boundary) + if not clipped.is_empty: + pc_polys[unique_pcs[i]].append(clipped) + + result = {} + for pc, parts in pc_polys.items(): + merged = unary_union(parts) + if not merged.is_empty: + result[pc] = merged + return result + + +def _equal_split_fallback( + postcodes: list[str], boundary: Polygon | MultiPolygon +) -> dict[str, Polygon | MultiPolygon]: + """Split boundary into roughly equal horizontal strips, one per postcode.""" + min_x, min_y, max_x, max_y = boundary.bounds + n = len(postcodes) + result = {} + for i, pc in enumerate(postcodes): + strip_min_y = min_y + (max_y - min_y) * i / n + strip_max_y = min_y + (max_y - min_y) * (i + 1) / n + strip = Polygon( + [ + (min_x, strip_min_y), + (max_x, strip_min_y), + (max_x, strip_max_y), + (min_x, strip_max_y), + ] + ) + clipped = boundary.intersection(strip) + if not clipped.is_empty: + result[pc] = clipped + return result diff --git a/pipeline/utils/__init__.py b/pipeline/utils/__init__.py index 9dc42cc..d3d033e 100644 --- a/pipeline/utils/__init__.py +++ b/pipeline/utils/__init__.py @@ -1,7 +1,7 @@ from .download import download, extract_zip from .fuzzy_join import fuzzy_join_on_postcode from .haversine import haversine_km, haversine_km_expr -from .poi_counts import count_pois_within_radius +from .poi_counts import count_pois_per_postcode __all__ = [ "download", @@ -9,5 +9,5 @@ __all__ = [ "fuzzy_join_on_postcode", "haversine_km", "haversine_km_expr", - "count_pois_within_radius", + "count_pois_per_postcode", ]