from collections import Counter, defaultdict import numpy as np from scipy.spatial import cKDTree from shapely import STRtree, make_valid from shapely.geometry import MultiPoint, MultiPolygon, Point, Polygon from .geometry import safe_difference, safe_intersection, safe_union from .voronoi import compute_voronoi_regions MIN_GEOM_AREA = 0.01 # Minimal footprint (BNG metres) for a postcode whose UPRN seed wins no area in a # crowded multi-postcode OA — its Voronoi cell ∩ remaining collapses below # MIN_GEOM_AREA, or its seed sits inside an INSPIRE parcel wholly claimed by a # co-located postcode. Every *active* postcode must keep a boundary # (validate_outputs is zero-tolerance), so it gets a small disc at its true seed # location. ~28 m² clears MIN_GEOM_AREA and output snapping; the overlap it # creates with the area's winner is resolved at the output stage, where the # smaller postcode wins the contested ground (see output._resolve_overlaps). _MIN_SEED_FOOTPRINT_M = 3.0 def process_oa( oa_geom: Polygon | MultiPolygon, points: np.ndarray, postcodes: list[str], inspire_candidates: list[Polygon], ) -> list[tuple[str, Polygon | MultiPolygon]]: """Process a single OA → list of (postcode, geometry) fragments.""" unique_pcs = set(postcodes) if len(unique_pcs) == 1: return [(next(iter(unique_pcs)), oa_geom)] if len(points) == 0: return [] valid_oa = _clean_polygonal(oa_geom) if valid_oa is None: return [] if inspire_candidates: claimed = _claim_inspire_parcels(valid_oa, points, postcodes, inspire_candidates) else: claimed = {} # Compute remaining area if claimed: all_claimed = safe_union(list(claimed.values())) all_claimed = _clean_polygonal(all_claimed) remaining = ( safe_difference(valid_oa, all_claimed) if all_claimed is not None else valid_oa ) remaining = _clean_polygonal(remaining) else: remaining = valid_oa # Distribute non-parcel land via Voronoi if remaining is not None and not remaining.is_empty and remaining.area > MIN_GEOM_AREA: voronoi_result = compute_voronoi_regions(points, postcodes, remaining) else: voronoi_result = {} # Combine claimed + voronoi result: dict[str, list] = defaultdict(list) for pc, geom in claimed.items(): result[pc].append(geom) for pc, geom in voronoi_result.items(): result[pc].append(geom) fragments = [] for pc, parts in result.items(): merged = _clean_polygonal(safe_union(parts)) if merged is not None: fragments.append((pc, merged)) # Every postcode with a UPRN seed in this OA must keep at least a minimal # footprint — in a dense OA (a block of flats with hundreds of distinct # postcodes) a single-seed postcode's cell can collapse below MIN_GEOM_AREA or # be fully absorbed by a co-located postcode's INSPIRE parcel, producing no # fragment, and an active postcode must never be dropped. orphans = unique_pcs - {pc for pc, _ in fragments} if orphans: fragments.extend(_seed_footprints(orphans, points, postcodes, valid_oa)) return fragments def _seed_footprints( orphans: set[str], points: np.ndarray, postcodes: list[str], valid_oa: Polygon | MultiPolygon, ) -> list[tuple[str, Polygon | MultiPolygon]]: """Give each orphan postcode a minimal disc footprint at its UPRN seed(s). Orphans are postcodes with a UPRN in this OA that nonetheless won no area in the INSPIRE/Voronoi partition. Each keeps a small disc at its true location, clipped to the OA; the overlap with the area's winner is resolved at output. """ by_pc: dict[str, list] = defaultdict(list) for i, pc in enumerate(postcodes): if pc in orphans: by_pc[pc].append(points[i]) out: list[tuple[str, Polygon | MultiPolygon]] = [] for pc, pts in by_pc.items(): arr = np.asarray(pts, dtype=np.float64) seed = Point(arr[0]) if len(arr) == 1 else MultiPoint(arr) disc = seed.buffer(_MIN_SEED_FOOTPRINT_M) clipped = _clean_polygonal(safe_intersection(disc, valid_oa)) if clipped is None: # Seed on/near the OA edge: keep the unclipped disc so the postcode # still gets a footprint at its location rather than no boundary. clipped = _clean_polygonal(disc) if clipped is not None: out.append((pc, clipped)) return out def _claim_inspire_parcels( valid_oa: Polygon | MultiPolygon, points: np.ndarray, postcodes: list[str], inspire_candidates: list[Polygon], ) -> dict[str, Polygon | MultiPolygon]: """Assign INSPIRE parcels to postcodes before Voronoi fills non-parcel land.""" parcels = _prepare_inspire_parcels(valid_oa, inspire_candidates) if not parcels: return {} cand_tree = STRtree(parcels) from shapely import points as shp_points uprn_pts = shp_points(points) pt_idx, cand_idx = cand_tree.query(uprn_pts, predicate="within") # First priority: parcels that physically contain UPRNs. A parcel holding # UPRNs from a single postcode goes wholly to that postcode. A parcel shared # by several postcodes (a block of flats spanning postcodes, or overlapping # parcel data) is split between them via a sub-Voronoi over their own UPRNs # clipped to the parcel — so EVERY contained postcode keeps part of the # parcel. A bare majority vote would hand the whole parcel to one winner and # leave the losers' UPRNs trapped inside claimed land, dropping them from # both this claim and the `remaining` polygon handed to Voronoi downstream. cand_postcodes: dict[int, list[str]] = defaultdict(list) cand_point_idx: dict[int, list[int]] = defaultdict(list) for pi, ci in zip(pt_idx, cand_idx): cand_postcodes[ci].append(postcodes[pi]) cand_point_idx[ci].append(pi) points_f64 = points.astype(np.float64, copy=False) contained_parts: dict[str, list] = defaultdict(list) contained_scores: Counter[str] = Counter() for ci, pc_list in cand_postcodes.items(): pc_counts = Counter(pc_list) if len(pc_counts) == 1: winner = next(iter(pc_counts)) contained_parts[winner].append(parcels[ci]) contained_scores[winner] += pc_counts[winner] continue # Shared parcel: sub-Voronoi over the contained UPRNs so each postcode # present keeps a fragment instead of being absorbed by the winner. sub_idx = cand_point_idx[ci] sub_points = points_f64[sub_idx] sub_postcodes = [postcodes[pi] for pi in sub_idx] for pc, geom in compute_voronoi_regions( sub_points, sub_postcodes, parcels[ci] ).items(): cleaned = _clean_polygonal(geom) if cleaned is not None: contained_parts[pc].append(cleaned) contained_scores[pc] += pc_counts[pc] contained_claimed = _merge_parts_by_postcode(contained_parts) contained_claims = sorted( contained_claimed.items(), key=lambda item: (-contained_scores[item[0]], -item[1].area, item[0]), ) # Second priority: remaining INSPIRE parcels with no contained UPRN. Assign # each to the nearest UPRN/postcode so parcel boundaries carry more of the # visible postcode shape; Voronoi is then limited to roads, parks, water, and # any other non-parcel gaps. contained_union = _union_claims(contained_claims) nearest_tree = cKDTree(points_f64) nearest_parts: dict[str, list] = defaultdict(list) for i, parcel in enumerate(parcels): if i in cand_postcodes: continue assignable = parcel if contained_union is not None: assignable = safe_difference(assignable, contained_union) for part in _polygon_parts(assignable): part = _clean_polygonal(part) if part is None: continue pc = _nearest_postcode(part, nearest_tree, postcodes) nearest_parts[pc].append(part) nearest_claimed = _merge_parts_by_postcode(nearest_parts) nearest_claims = sorted( nearest_claimed.items(), key=lambda item: (-item[1].area, item[0]), ) return _resolve_ordered_claims(contained_claims + nearest_claims) def _prepare_inspire_parcels( valid_oa: Polygon | MultiPolygon, inspire_candidates: list[Polygon], ) -> list[Polygon | MultiPolygon]: parcels: list[Polygon | MultiPolygon] = [] for candidate in inspire_candidates: geom = _clean_polygonal(candidate) if geom is None: continue if not geom.intersects(valid_oa): continue clipped = _clean_polygonal(safe_intersection(geom, valid_oa)) if clipped is not None: parcels.append(clipped) return parcels def _nearest_postcode( geom: Polygon | MultiPolygon, tree: cKDTree, postcodes: list[str], ) -> str: point = geom.representative_point() _, idx = tree.query([point.x, point.y]) return postcodes[idx] def _polygon_parts(geom) -> list[Polygon]: geom = _clean_polygonal(geom) if geom is None: return [] if geom.geom_type == "Polygon": return [geom] return list(geom.geoms) def _merge_parts_by_postcode( parts_by_postcode: dict[str, list], ) -> dict[str, Polygon | MultiPolygon]: merged: dict[str, Polygon | MultiPolygon] = {} for pc, parts in parts_by_postcode.items(): geom = _clean_polygonal(safe_union(parts)) if geom is not None: merged[pc] = geom return merged def _union_claims( claims: list[tuple[str, Polygon | MultiPolygon]], ) -> Polygon | MultiPolygon | None: if not claims: return None return _clean_polygonal(safe_union([geom for _, geom in claims])) def _resolve_ordered_claims( claims: list[tuple[str, Polygon | MultiPolygon]], ) -> dict[str, Polygon | MultiPolygon]: """Resolve overlapping parcel claims in priority order.""" resolved_parts: dict[str, list] = defaultdict(list) used = None for pc, geom in claims: geom = _clean_polygonal(geom) if geom is None: continue if used is not None: geom = _clean_polygonal(safe_difference(geom, used)) if geom is None: continue resolved_parts[pc].append(geom) used = _clean_polygonal(geom if used is None else safe_union([used, geom])) return _merge_parts_by_postcode(resolved_parts) def _clean_polygonal(geom) -> Polygon | MultiPolygon | None: if geom is None or geom.is_empty: return None if not geom.is_valid: geom = make_valid(geom) geom = _extract_polygonal(geom) if geom is None or geom.is_empty or geom.area <= MIN_GEOM_AREA: return None return geom def _extract_polygonal(geom) -> Polygon | MultiPolygon | None: """Extract only Polygon/MultiPolygon parts from a geometry. make_valid can produce GeometryCollections containing lines and points; this strips those away and returns only the polygonal component. """ if geom.geom_type in ("Polygon", "MultiPolygon"): return geom if geom.geom_type == "GeometryCollection": polys = [g for g in geom.geoms if g.geom_type in ("Polygon", "MultiPolygon")] if not polys: return None if len(polys) == 1: return polys[0] # Union (not bare MultiPolygon construction): make_valid can emit # overlapping polygonal parts, and a MultiPolygon of overlapping parts is # invalid — it double-counts area and makes the next `.difference()` raise # a TopologyException that aborts the OA (and, in parallel mode, the # worker). safe_union merges them into a valid geometry. merged = safe_union(polys) return merged if not merged.is_empty else None return None