"""Generate postcode boundary polygons from OA boundaries, INSPIRE parcels, and UPRN data. Produces per-district GeoJSON files compatible with the Rust server's postcode loader. Each postcode gets a polygon (or MultiPolygon) guaranteed to be contained within its Output Area(s), with 100% OA coverage and no overlaps between postcodes within an OA. Algorithm per OA: 1. Single-postcode OA → entire OA polygon assigned to that postcode 2. Multi-postcode OA: a. Assign INSPIRE parcels to postcodes via UPRN point-in-polygon majority vote b. Union INSPIRE parcels per postcode, clip to OA → "claimed" area c. Distribute remaining (unclaimed) OA area via Voronoi of UPRN points d. Final polygon = claimed + Voronoi share Memory-efficient design (<12GB total): - INSPIRE polygons stored as raw coordinate bytes in parquet; Shapely objects built lazily per-OA via numpy bbox pre-filter (~100-500 candidates at a time) - UPRNs kept as sorted polars DataFrame with offset dict (Arrow storage, ~1.2GB) - OA processing runs sequentially (no multiprocess INSPIRE duplication) Output format: {output}/units/{DISTRICT}.geojson with properties.postcodes and properties.mapit_code fields matching server-rs/src/data/postcodes.rs expectations. """ import argparse import ctypes import gc import json import sqlite3 import zipfile from collections import Counter, defaultdict from pathlib import Path from xml.etree.ElementTree import iterparse import numpy as np import polars as pl from pyproj import Transformer from scipy.spatial import Voronoi from shapely import STRtree, make_valid, wkb from shapely.geometry import MultiPolygon, Polygon from shapely.ops import unary_union from tqdm import tqdm # --------------------------------------------------------------------------- # GeoPackage helpers # --------------------------------------------------------------------------- def _release_memory() -> None: """Force Python + glibc to release freed memory back to the OS.""" gc.collect() try: ctypes.CDLL("libc.so.6").malloc_trim(0) except OSError: pass _ENVELOPE_SIZES = {0: 0, 1: 32, 2: 48, 3: 48, 4: 64} def parse_gpkg_geometry(blob: bytes): """Extract a Shapely geometry from a GeoPackage binary blob.""" flags = blob[3] envelope_type = (flags >> 1) & 0x07 header_size = 8 + _ENVELOPE_SIZES[envelope_type] return wkb.loads(blob[header_size:]) def load_oa_boundaries(gpkg_path: Path) -> dict[str, Polygon | MultiPolygon]: """Load OA boundary polygons from a GeoPackage. Geometry is already in BNG.""" print("Loading OA boundaries...") conn = sqlite3.connect(str(gpkg_path)) cur = conn.cursor() cur.execute("SELECT OA21CD, SHAPE FROM OA_2021_EW_BGC_V2") oa_geoms: dict[str, Polygon | MultiPolygon] = {} for oa_code, blob in cur: geom = parse_gpkg_geometry(bytes(blob)) if geom.geom_type == "MultiPolygon" and len(geom.geoms) == 1: geom = geom.geoms[0] oa_geoms[oa_code] = geom conn.close() print(f" Loaded {len(oa_geoms)} OA boundaries") return oa_geoms # --------------------------------------------------------------------------- # UPRN loading (memory-efficient: sorted polars DataFrame + offset dict) # --------------------------------------------------------------------------- def load_uprns(uprn_path: Path) -> tuple[pl.DataFrame, dict[str, tuple[int, int]]]: """Load UPRNs as a sorted polars DataFrame with OA offset lookup. Returns (df, offsets) where offsets[oa_code] = (start_row, end_row). Peak ~5GB during sort, steady state ~1.5GB (Arrow columnar with compact strings). """ import tempfile print("Loading UPRN lookup...") # Sort via streaming sink to avoid polars doubling memory during in-memory sort with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as tmp: tmp_path = Path(tmp.name) ( pl.scan_parquet(uprn_path) .select("GRIDGB1E", "GRIDGB1N", "PCDS", "OA21CD") .filter(~pl.col("OA21CD").str.starts_with("S")) .filter(pl.col("GRIDGB1E").is_not_null() & pl.col("GRIDGB1N").is_not_null()) .filter(pl.col("PCDS").is_not_null() & (pl.col("PCDS") != "")) .with_columns(pl.col("PCDS").str.strip_chars()) .sort("OA21CD") .sink_parquet(tmp_path) ) _release_memory() # Read the sorted data — only one copy in memory (~2GB) df = pl.read_parquet(tmp_path) tmp_path.unlink() n = len(df) print(f" Loaded {n:,} UPRNs (England & Wales)") # Compute OA group offsets using polars (avoids 37M Python string creation) boundary_df = ( df.lazy() .with_row_index("_i") .filter(pl.col("OA21CD") != pl.col("OA21CD").shift(1)) .select("_i", "OA21CD") .collect() ) starts_list = boundary_df["_i"].to_list() oa_list = boundary_df["OA21CD"].to_list() del boundary_df offsets: dict[str, tuple[int, int]] = {} for j in range(len(starts_list)): end = starts_list[j + 1] if j + 1 < len(starts_list) else n offsets[oa_list[j]] = (starts_list[j], end) del starts_list, oa_list # Drop OA column (no longer needed) to save ~400MB df = df.select("GRIDGB1E", "GRIDGB1N", "PCDS") _release_memory() print(f" Grouped into {len(offsets)} OAs") return df, offsets def get_oa_uprns( df: pl.DataFrame, offsets: dict[str, tuple[int, int]], oa_code: str ) -> tuple[np.ndarray, list[str]]: """Get UPRN coordinates and postcodes for a single OA. Returns (points_nx2, postcodes_list). """ s, e = offsets[oa_code] sub = df[s:e] points = np.column_stack( [ sub["GRIDGB1E"].to_numpy(), sub["GRIDGB1N"].to_numpy(), ] ) postcodes = sub["PCDS"].to_list() return points, postcodes # --------------------------------------------------------------------------- # INSPIRE GML parsing and caching # --------------------------------------------------------------------------- _GML_NS = "{http://www.opengis.net/gml/3.2}" _LR_NS = "{www.landregistry.gov.uk}" def parse_inspire_zip(zip_path: Path) -> list[np.ndarray]: """Parse a single INSPIRE ZIP → list of Nx2 coordinate arrays (easting, northing).""" results = [] with zipfile.ZipFile(zip_path) as zf: gml_names = [n for n in zf.namelist() if n.endswith(".gml")] if not gml_names: return results with zf.open(gml_names[0]) as f: for event, elem in iterparse(f, events=("end",)): if elem.tag != f"{_LR_NS}PREDEFINED": continue pos_list = elem.find(f".//{_GML_NS}posList") if pos_list is not None and pos_list.text: vals = pos_list.text.split() n = len(vals) // 2 if n >= 3: coords = np.array(vals, dtype=np.float64).reshape(n, 2) results.append(coords) elem.clear() return results def cache_inspire(inspire_dir: Path, cache_dir: Path) -> None: """Parse all INSPIRE ZIPs and cache as memory-mappable binary files. Processes ZIPs sequentially to keep memory under control (~2GB peak). Each ZIP's polygons are streamed to disk immediately after parsing. Writes three files: - inspire_bboxes.npy: float64 array (N, 4) of [min_e, min_n, max_e, max_n] - inspire_offsets.npy: int64 array (N, 2) of [byte_offset, n_points] - inspire_coords.bin: flat binary of all float64 coordinate pairs """ zip_files = sorted(inspire_dir.glob("*.zip")) print( f"Parsing {len(zip_files)} INSPIRE ZIP files (sequential, streaming to disk)..." ) cache_dir.mkdir(parents=True, exist_ok=True) # Pre-allocate arrays for bboxes and offsets (grow if needed) capacity = 25_000_000 bboxes = np.empty((capacity, 4), dtype=np.float64) offsets = np.empty((capacity, 2), dtype=np.int64) count = 0 byte_offset = 0 coords_path = cache_dir / "inspire_coords.bin" with open(coords_path, "wb") as cf: for zip_path in tqdm(zip_files, desc="INSPIRE ZIPs", unit="file"): for coords in parse_inspire_zip(zip_path): if count >= capacity: capacity = int(capacity * 1.5) bboxes.resize((capacity, 4), refcheck=False) offsets.resize((capacity, 2), refcheck=False) bboxes[count, 0] = coords[:, 0].min() bboxes[count, 1] = coords[:, 1].min() bboxes[count, 2] = coords[:, 0].max() bboxes[count, 3] = coords[:, 1].max() offsets[count, 0] = byte_offset offsets[count, 1] = len(coords) raw = coords.astype(np.float64).tobytes() cf.write(raw) byte_offset += len(raw) count += 1 # Trim to actual size and save bboxes = bboxes[:count] offsets = offsets[:count] np.save(cache_dir / "inspire_bboxes.npy", bboxes) np.save(cache_dir / "inspire_offsets.npy", offsets) size_mb = byte_offset / (1024 * 1024) print(f" Cached {count:,} INSPIRE polygons (coords: {size_mb:.0f} MB)") def _inspire_cache_exists(cache_dir: Path) -> bool: return all( (cache_dir / f).exists() for f in ("inspire_bboxes.npy", "inspire_offsets.npy", "inspire_coords.bin") ) def load_inspire( cache_dir: Path, ) -> tuple[np.ndarray, np.ndarray, np.memmap]: """Load INSPIRE cache → (bboxes, offsets, coords_mmap). Memory usage: ~1.1GB (bboxes ~777MB + offsets ~290MB, coords memory-mapped). """ print(f"Loading INSPIRE cache from {cache_dir}...") bboxes = np.load(cache_dir / "inspire_bboxes.npy") offsets = np.load(cache_dir / "inspire_offsets.npy") coords_mmap = np.memmap( cache_dir / "inspire_coords.bin", dtype=np.float64, mode="r" ) print( f" Loaded {len(bboxes):,} INSPIRE polygon bboxes (~{bboxes.nbytes // (1024 * 1024)} MB)" ) print(f" Offsets: ~{offsets.nbytes // (1024 * 1024)} MB, coords: memory-mapped") return bboxes, offsets, coords_mmap def get_inspire_candidates( oa_bounds: tuple[float, float, float, float], bboxes: np.ndarray, offsets: np.ndarray, coords_mmap: np.memmap, ) -> list[Polygon]: """Get INSPIRE polygons overlapping an OA via bbox pre-filter. Builds Shapely objects only for matches (typically 10-500 per OA). Reads coordinate data on-demand from memory-mapped file. """ min_e, min_n, max_e, max_n = oa_bounds # Vectorized bbox overlap test mask = ( (bboxes[:, 2] >= min_e) & (bboxes[:, 0] <= max_e) & (bboxes[:, 3] >= min_n) & (bboxes[:, 1] <= max_n) ) idxs = np.where(mask)[0] if len(idxs) == 0: return [] # Build Shapely polygons only for candidates (coords from mmap) candidates = [] for i in idxs: byte_offset = offsets[i, 0] n_pts = offsets[i, 1] float_offset = byte_offset // 8 # float64 = 8 bytes coords = coords_mmap[float_offset : float_offset + n_pts * 2].reshape(-1, 2) poly = Polygon(coords) if not poly.is_valid: poly = make_valid(poly) if poly.geom_type == "MultiPolygon": poly = max(poly.geoms, key=lambda g: g.area) elif poly.geom_type != "Polygon": continue if not poly.is_empty: candidates.append(poly) return candidates # --------------------------------------------------------------------------- # Voronoi computation # --------------------------------------------------------------------------- def compute_voronoi_regions( points: np.ndarray, postcodes: list[str], boundary: Polygon | MultiPolygon ) -> dict[str, Polygon | MultiPolygon]: """Compute Voronoi regions for points, clipped to boundary, grouped by postcode.""" if len(points) == 0: return {} if len(points) == 1: return {postcodes[0]: boundary} # Deduplicate points per postcode (flats at same coords) seen: dict[tuple[float, float], str] = {} unique_pts = [] unique_pcs = [] for i in range(len(points)): key = (points[i, 0], points[i, 1]) if key not in seen: seen[key] = postcodes[i] unique_pts.append(points[i]) unique_pcs.append(postcodes[i]) if len(unique_pts) == 1: return {unique_pcs[0]: boundary} pts = np.array(unique_pts) min_e, min_n = pts.min(axis=0) max_e, max_n = pts.max(axis=0) span = max(max_e - min_e, max_n - min_n, 100) dummy = np.array( [ [min_e - span * 10, min_n - span * 10], [max_e + span * 10, min_n - span * 10], [min_e - span * 10, max_n + span * 10], [max_e + span * 10, max_n + span * 10], ] ) all_points = np.vstack([pts, dummy]) try: vor = Voronoi(all_points) except Exception: return {unique_pcs[0]: boundary} n_real = len(pts) pc_polys: dict[str, list[Polygon]] = defaultdict(list) for i in range(n_real): region_idx = vor.point_region[i] region = vor.regions[region_idx] if -1 in region or len(region) < 3: continue vertices = vor.vertices[region] poly = Polygon(vertices) if not poly.is_valid: poly = make_valid(poly) clipped = poly.intersection(boundary) if not clipped.is_empty: pc_polys[unique_pcs[i]].append(clipped) result = {} for pc, parts in pc_polys.items(): merged = unary_union(parts) if not merged.is_empty: result[pc] = merged return result # --------------------------------------------------------------------------- # Per-OA processing # --------------------------------------------------------------------------- def process_oa( oa_geom: Polygon | MultiPolygon, points: np.ndarray, postcodes: list[str], inspire_candidates: list[Polygon], ) -> list[tuple[str, Polygon | MultiPolygon]]: """Process a single OA → list of (postcode, geometry) fragments.""" unique_pcs = set(postcodes) if len(unique_pcs) == 1: return [(next(iter(unique_pcs)), oa_geom)] # Try INSPIRE-based assignment claimed: dict[str, Polygon | MultiPolygon] = {} if inspire_candidates: cand_tree = STRtree(inspire_candidates) from shapely import points as shp_points uprn_pts = shp_points(points) pt_idx, cand_idx = cand_tree.query(uprn_pts, predicate="intersects") # Majority vote per candidate polygon cand_postcodes: dict[int, list[str]] = defaultdict(list) for pi, ci in zip(pt_idx, cand_idx): cand_postcodes[ci].append(postcodes[pi]) pc_inspire_polys: dict[str, list[Polygon]] = defaultdict(list) for ci, pc_list in cand_postcodes.items(): winner = Counter(pc_list).most_common(1)[0][0] pc_inspire_polys[winner].append(inspire_candidates[ci]) for pc, polys in pc_inspire_polys.items(): merged = unary_union(polys) clipped = merged.intersection(oa_geom) if not clipped.is_empty: if not clipped.is_valid: clipped = make_valid(clipped) claimed[pc] = clipped # Compute remaining area if claimed: all_claimed = unary_union(list(claimed.values())) if not all_claimed.is_valid: all_claimed = make_valid(all_claimed) remaining = oa_geom.difference(all_claimed) if not remaining.is_valid: remaining = make_valid(remaining) else: remaining = oa_geom # Distribute remaining area via Voronoi if not remaining.is_empty and remaining.area > 0.01: voronoi_result = compute_voronoi_regions(points, postcodes, remaining) else: voronoi_result = {} # Combine claimed + voronoi result: dict[str, list] = defaultdict(list) for pc, geom in claimed.items(): result[pc].append(geom) for pc, geom in voronoi_result.items(): result[pc].append(geom) fragments = [] for pc, parts in result.items(): merged = unary_union(parts) if not merged.is_empty: if not merged.is_valid: merged = make_valid(merged) fragments.append((pc, merged)) return fragments # --------------------------------------------------------------------------- # Output: merge fragments and write GeoJSON # --------------------------------------------------------------------------- _to_wgs84 = None def _get_to_wgs84(): global _to_wgs84 if _to_wgs84 is None: _to_wgs84 = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True) return _to_wgs84 def to_wgs84_geojson( geom: Polygon | MultiPolygon, tolerance: float = 1.0 ) -> dict | None: """Simplify geometry in BNG, convert to WGS84, return GeoJSON dict.""" if geom.is_empty: return None simplified = geom.simplify(tolerance, preserve_topology=True) if simplified.is_empty: return None transformer = _get_to_wgs84() def transform_ring(coords): xs, ys = zip(*coords) lons, lats = transformer.transform(list(xs), list(ys)) return list(zip(lons, lats)) def transform_polygon(poly): exterior = transform_ring(poly.exterior.coords) holes = [transform_ring(h.coords) for h in poly.interiors] return [exterior] + holes if simplified.geom_type == "Polygon": return { "type": "Polygon", "coordinates": transform_polygon(simplified), } elif simplified.geom_type == "MultiPolygon": return { "type": "MultiPolygon", "coordinates": [transform_polygon(p) for p in simplified.geoms], } elif simplified.geom_type == "GeometryCollection": polys = [ g for g in simplified.geoms if g.geom_type in ("Polygon", "MultiPolygon") ] if not polys: return None return to_wgs84_geojson(unary_union(polys), tolerance=0) return None def merge_fragments( all_fragments: list[tuple[str, Polygon | MultiPolygon]], ) -> dict[str, Polygon | MultiPolygon]: """Merge cross-OA fragments for postcodes spanning multiple OAs.""" by_postcode: dict[str, list] = defaultdict(list) for pc, geom in all_fragments: by_postcode[pc].append(geom) merged = {} for pc, parts in by_postcode.items(): combined = unary_union(parts) if combined.is_empty: continue if not combined.is_valid: combined = make_valid(combined) # Close tiny gaps between adjacent OA boundary edges (float mismatches) if combined.geom_type == "MultiPolygon": combined = combined.buffer(1.0).buffer(-1.0) if not combined.is_valid: combined = make_valid(combined) # Postcodes are contiguous delivery routes — keep only the largest # polygon; small detached fragments are algorithm artifacts if combined.geom_type == "MultiPolygon": combined = max(combined.geoms, key=lambda g: g.area) merged[pc] = combined return merged def write_district_geojson( postcodes: dict[str, Polygon | MultiPolygon], output_dir: Path ) -> int: """Group postcodes by district, write GeoJSON files. Returns file count.""" units_dir = output_dir / "units" units_dir.mkdir(parents=True, exist_ok=True) by_district: dict[str, list[tuple[str, Polygon | MultiPolygon]]] = defaultdict(list) for pc, geom in postcodes.items(): parts = pc.split() district = parts[0] if parts else pc[:4] by_district[district].append((pc, geom)) file_count = 0 for district, entries in tqdm( sorted(by_district.items()), desc="Writing GeoJSON", unit="file" ): features = [] for pc, geom in sorted(entries, key=lambda x: x[0]): geojson_geom = to_wgs84_geojson(geom) if geojson_geom is None: continue mapit_code = pc.replace(" ", "") features.append( { "type": "Feature", "geometry": geojson_geom, "properties": { "postcodes": pc, "mapit_code": mapit_code, }, } ) if not features: continue collection = {"type": "FeatureCollection", "features": features} out_path = units_dir / f"{district}.geojson" with open(out_path, "w") as f: json.dump(collection, f, separators=(",", ":")) file_count += 1 return file_count # --------------------------------------------------------------------------- # Main orchestration # --------------------------------------------------------------------------- def main() -> None: parser = argparse.ArgumentParser( description="Generate postcode boundary polygons from OA + INSPIRE + UPRN data" ) parser.add_argument("--uprn", type=Path, required=True, help="UPRN lookup parquet") parser.add_argument( "--oa-boundaries", type=Path, required=True, help="OA boundaries GeoPackage" ) parser.add_argument( "--inspire", type=Path, required=True, help="INSPIRE ZIP directory" ) parser.add_argument("--output", type=Path, required=True, help="Output directory") parser.add_argument( "--limit", type=int, default=0, help="Process only first N OAs (0=all)" ) args = parser.parse_args() # Phase 1: Load all data print("=" * 60) print("Phase 1: Loading data") print("=" * 60) oa_geoms = load_oa_boundaries(args.oa_boundaries) uprn_df, uprn_offsets = load_uprns(args.uprn) # Phase 2: Parse/load INSPIRE print() print("=" * 60) print("Phase 2: INSPIRE data") print("=" * 60) inspire_cache_dir = args.output / "inspire_cache" if not _inspire_cache_exists(inspire_cache_dir): cache_inspire(args.inspire, inspire_cache_dir) inspire_bboxes, inspire_offsets, inspire_coords = load_inspire(inspire_cache_dir) # Phase 3: Process OAs print() print("=" * 60) print("Phase 3: Processing OAs") print("=" * 60) # Build work list — precompute which OAs are single vs multi-postcode oa_codes_with_data = sorted(set(oa_geoms.keys()) & set(uprn_offsets.keys())) skipped_no_uprn = len(oa_geoms) - len(oa_codes_with_data) skipped_no_boundary = len(uprn_offsets) - len(oa_codes_with_data) if args.limit > 0: oa_codes_with_data = oa_codes_with_data[: args.limit] print(f" OAs with UPRNs + boundaries: {len(oa_codes_with_data)}") print(f" Skipped (no UPRNs): {skipped_no_uprn}") print(f" Skipped (no boundary): {skipped_no_boundary}") all_fragments: list[tuple[str, Polygon | MultiPolygon]] = [] single_count = 0 multi_count = 0 for oa_code in tqdm( oa_codes_with_data, desc="Processing OAs", unit="OA", smoothing=0.01, miniters=100, ): oa_geom = oa_geoms[oa_code] points, postcodes = get_oa_uprns(uprn_df, uprn_offsets, oa_code) if len(set(postcodes)) == 1: # Fast path: entire OA = one postcode all_fragments.append((postcodes[0], oa_geom)) single_count += 1 continue # Get INSPIRE candidates via bbox pre-filter candidates = get_inspire_candidates( oa_geom.bounds, inspire_bboxes, inspire_offsets, inspire_coords ) fragments = process_oa(oa_geom, points, postcodes, candidates) all_fragments.extend(fragments) multi_count += 1 print(f"\n Single-postcode OAs (fast path): {single_count}") print(f" Multi-postcode OAs (INSPIRE+Voronoi): {multi_count}") print(f" Total fragments: {len(all_fragments)}") # Free data no longer needed del oa_geoms, uprn_df, uprn_offsets del inspire_bboxes, inspire_offsets, inspire_coords # Phase 4: Merge and write print() print("=" * 60) print("Phase 4: Merging fragments and writing GeoJSON") print("=" * 60) merged = merge_fragments(all_fragments) print(f" Merged into {len(merged)} unique postcodes") file_count = write_district_geojson(merged, args.output) print(f"\n Wrote {file_count} district GeoJSON files to {args.output / 'units'}") print("Done!") if __name__ == "__main__": main()