137 lines
4.8 KiB
Python
137 lines
4.8 KiB
Python
from collections import Counter, defaultdict
|
|
|
|
import numpy as np
|
|
from shapely import STRtree, make_valid
|
|
from shapely.geometry import MultiPolygon, Polygon
|
|
from shapely.ops import unary_union
|
|
|
|
from .voronoi import compute_voronoi_regions
|
|
|
|
|
|
def process_oa(
|
|
oa_geom: Polygon | MultiPolygon,
|
|
points: np.ndarray,
|
|
postcodes: list[str],
|
|
inspire_candidates: list[Polygon],
|
|
) -> list[tuple[str, Polygon | MultiPolygon]]:
|
|
"""Process a single OA → list of (postcode, geometry) fragments."""
|
|
unique_pcs = set(postcodes)
|
|
if len(unique_pcs) == 1:
|
|
return [(next(iter(unique_pcs)), oa_geom)]
|
|
|
|
# Try INSPIRE-based assignment
|
|
claimed: dict[str, Polygon | MultiPolygon] = {}
|
|
|
|
if inspire_candidates:
|
|
cand_tree = STRtree(inspire_candidates)
|
|
|
|
from shapely import points as shp_points
|
|
|
|
uprn_pts = shp_points(points)
|
|
pt_idx, cand_idx = cand_tree.query(uprn_pts, predicate="intersects")
|
|
|
|
# Majority vote per candidate polygon
|
|
cand_postcodes: dict[int, list[str]] = defaultdict(list)
|
|
for pi, ci in zip(pt_idx, cand_idx):
|
|
cand_postcodes[ci].append(postcodes[pi])
|
|
|
|
pc_inspire_polys: dict[str, list[Polygon]] = defaultdict(list)
|
|
for ci, pc_list in cand_postcodes.items():
|
|
winner = Counter(pc_list).most_common(1)[0][0]
|
|
pc_inspire_polys[winner].append(inspire_candidates[ci])
|
|
|
|
for pc, polys in pc_inspire_polys.items():
|
|
merged = unary_union(polys)
|
|
if not merged.is_valid:
|
|
merged = make_valid(merged)
|
|
valid_oa = oa_geom if oa_geom.is_valid else make_valid(oa_geom)
|
|
clipped = merged.intersection(valid_oa)
|
|
if not clipped.is_empty:
|
|
if not clipped.is_valid:
|
|
clipped = make_valid(clipped)
|
|
clipped = _extract_polygonal(clipped)
|
|
if clipped is not None:
|
|
claimed[pc] = clipped
|
|
|
|
# Resolve overlaps: INSPIRE parcels can overlap geographically, so two
|
|
# postcodes may claim the same area. Give contested area to whichever
|
|
# postcode claimed it first (most UPRNs → first in insertion order).
|
|
if len(claimed) > 1:
|
|
resolved: dict[str, Polygon | MultiPolygon] = {}
|
|
used = None
|
|
for pc, geom in claimed.items():
|
|
if used is not None:
|
|
if not geom.is_valid:
|
|
geom = make_valid(geom)
|
|
if not used.is_valid:
|
|
used = make_valid(used)
|
|
geom = geom.difference(used)
|
|
if geom.is_empty:
|
|
continue
|
|
geom = _extract_polygonal(geom)
|
|
if geom is None:
|
|
continue
|
|
resolved[pc] = geom
|
|
used = geom if used is None else unary_union([used, geom])
|
|
claimed = resolved
|
|
|
|
# Compute remaining area
|
|
if claimed:
|
|
all_claimed = unary_union(list(claimed.values()))
|
|
if not all_claimed.is_valid:
|
|
all_claimed = make_valid(all_claimed)
|
|
valid_oa = oa_geom if oa_geom.is_valid else make_valid(oa_geom)
|
|
remaining = valid_oa.difference(all_claimed)
|
|
if not remaining.is_valid:
|
|
remaining = make_valid(remaining)
|
|
else:
|
|
remaining = oa_geom if oa_geom.is_valid else make_valid(oa_geom)
|
|
|
|
# Distribute remaining area via Voronoi
|
|
if not remaining.is_empty and remaining.area > 0.01:
|
|
voronoi_result = compute_voronoi_regions(points, postcodes, remaining)
|
|
else:
|
|
voronoi_result = {}
|
|
|
|
# Combine claimed + voronoi
|
|
result: dict[str, list] = defaultdict(list)
|
|
for pc, geom in claimed.items():
|
|
result[pc].append(geom)
|
|
for pc, geom in voronoi_result.items():
|
|
result[pc].append(geom)
|
|
|
|
fragments = []
|
|
for pc, parts in result.items():
|
|
merged = unary_union(parts)
|
|
if not merged.is_empty:
|
|
if not merged.is_valid:
|
|
merged = make_valid(merged)
|
|
merged = _extract_polygonal(merged)
|
|
if merged is not None:
|
|
fragments.append((pc, merged))
|
|
|
|
return fragments
|
|
|
|
|
|
def _extract_polygonal(geom) -> Polygon | MultiPolygon | None:
|
|
"""Extract only Polygon/MultiPolygon parts from a geometry.
|
|
|
|
make_valid can produce GeometryCollections containing lines and points;
|
|
this strips those away and returns only the polygonal component.
|
|
"""
|
|
if geom.geom_type in ("Polygon", "MultiPolygon"):
|
|
return geom
|
|
if geom.geom_type == "GeometryCollection":
|
|
polys = [g for g in geom.geoms if g.geom_type in ("Polygon", "MultiPolygon")]
|
|
if not polys:
|
|
return None
|
|
if len(polys) == 1:
|
|
return polys[0]
|
|
return MultiPolygon(
|
|
[
|
|
p
|
|
for g in polys
|
|
for p in (g.geoms if g.geom_type == "MultiPolygon" else [g])
|
|
]
|
|
)
|
|
return None
|