from collections import Counter, defaultdict import numpy as np from shapely import STRtree, make_valid from shapely.geometry import MultiPolygon, Polygon from shapely.ops import unary_union from .voronoi import compute_voronoi_regions def process_oa( oa_geom: Polygon | MultiPolygon, points: np.ndarray, postcodes: list[str], inspire_candidates: list[Polygon], ) -> list[tuple[str, Polygon | MultiPolygon]]: """Process a single OA → list of (postcode, geometry) fragments.""" unique_pcs = set(postcodes) if len(unique_pcs) == 1: return [(next(iter(unique_pcs)), oa_geom)] # Try INSPIRE-based assignment claimed: dict[str, Polygon | MultiPolygon] = {} if inspire_candidates: cand_tree = STRtree(inspire_candidates) from shapely import points as shp_points uprn_pts = shp_points(points) pt_idx, cand_idx = cand_tree.query(uprn_pts, predicate="intersects") # Majority vote per candidate polygon cand_postcodes: dict[int, list[str]] = defaultdict(list) for pi, ci in zip(pt_idx, cand_idx): cand_postcodes[ci].append(postcodes[pi]) pc_inspire_polys: dict[str, list[Polygon]] = defaultdict(list) for ci, pc_list in cand_postcodes.items(): winner = Counter(pc_list).most_common(1)[0][0] pc_inspire_polys[winner].append(inspire_candidates[ci]) for pc, polys in pc_inspire_polys.items(): merged = unary_union(polys) if not merged.is_valid: merged = make_valid(merged) valid_oa = oa_geom if oa_geom.is_valid else make_valid(oa_geom) clipped = merged.intersection(valid_oa) if not clipped.is_empty: if not clipped.is_valid: clipped = make_valid(clipped) clipped = _extract_polygonal(clipped) if clipped is not None: claimed[pc] = clipped # Resolve overlaps: INSPIRE parcels can overlap geographically, so two # postcodes may claim the same area. Give contested area to whichever # postcode claimed it first (most UPRNs → first in insertion order). if len(claimed) > 1: resolved: dict[str, Polygon | MultiPolygon] = {} used = None for pc, geom in claimed.items(): if used is not None: if not geom.is_valid: geom = make_valid(geom) if not used.is_valid: used = make_valid(used) geom = geom.difference(used) if geom.is_empty: continue geom = _extract_polygonal(geom) if geom is None: continue resolved[pc] = geom used = geom if used is None else unary_union([used, geom]) claimed = resolved # Compute remaining area if claimed: all_claimed = unary_union(list(claimed.values())) if not all_claimed.is_valid: all_claimed = make_valid(all_claimed) valid_oa = oa_geom if oa_geom.is_valid else make_valid(oa_geom) remaining = valid_oa.difference(all_claimed) if not remaining.is_valid: remaining = make_valid(remaining) else: remaining = oa_geom if oa_geom.is_valid else make_valid(oa_geom) # Distribute remaining area via Voronoi if not remaining.is_empty and remaining.area > 0.01: voronoi_result = compute_voronoi_regions(points, postcodes, remaining) else: voronoi_result = {} # Combine claimed + voronoi result: dict[str, list] = defaultdict(list) for pc, geom in claimed.items(): result[pc].append(geom) for pc, geom in voronoi_result.items(): result[pc].append(geom) fragments = [] for pc, parts in result.items(): merged = unary_union(parts) if not merged.is_empty: if not merged.is_valid: merged = make_valid(merged) merged = _extract_polygonal(merged) if merged is not None: fragments.append((pc, merged)) return fragments def _extract_polygonal(geom) -> Polygon | MultiPolygon | None: """Extract only Polygon/MultiPolygon parts from a geometry. make_valid can produce GeometryCollections containing lines and points; this strips those away and returns only the polygonal component. """ if geom.geom_type in ("Polygon", "MultiPolygon"): return geom if geom.geom_type == "GeometryCollection": polys = [g for g in geom.geoms if g.geom_type in ("Polygon", "MultiPolygon")] if not polys: return None if len(polys) == 1: return polys[0] return MultiPolygon( [p for g in polys for p in (g.geoms if g.geom_type == "MultiPolygon" else [g])] ) return None