128 lines
4.3 KiB
Python
128 lines
4.3 KiB
Python
from collections import defaultdict
|
|
|
|
import numpy as np
|
|
from scipy.spatial import Voronoi
|
|
from scipy.spatial.qhull import QhullError
|
|
from shapely import make_valid
|
|
from shapely.geometry import MultiPolygon, Polygon
|
|
from shapely.ops import unary_union
|
|
|
|
|
|
def compute_voronoi_regions(
|
|
points: np.ndarray, postcodes: list[str], boundary: Polygon | MultiPolygon
|
|
) -> dict[str, Polygon | MultiPolygon]:
|
|
"""Compute Voronoi regions for points, clipped to boundary, grouped by postcode."""
|
|
if len(points) == 0:
|
|
return {}
|
|
if len(points) == 1:
|
|
return {postcodes[0]: boundary}
|
|
|
|
# UPRN coordinates are int64 (BNG grid refs in whole metres).
|
|
# Convert to float64 so sub-metre jitter isn't truncated.
|
|
points = points.astype(np.float64)
|
|
|
|
# Deduplicate points, keeping one per (location, postcode) pair.
|
|
# Multiple postcodes at the same coordinate each get their own point,
|
|
# jittered by a tiny offset (0.01m) so Voronoi can distinguish them.
|
|
seen: dict[tuple[float, float, str], bool] = {}
|
|
unique_pts = []
|
|
unique_pcs = []
|
|
coord_counts: dict[tuple[float, float], int] = defaultdict(int)
|
|
for i in range(len(points)):
|
|
coord = (points[i, 0], points[i, 1])
|
|
key = (coord[0], coord[1], postcodes[i])
|
|
if key not in seen:
|
|
seen[key] = True
|
|
jitter_idx = coord_counts[coord]
|
|
coord_counts[coord] += 1
|
|
if jitter_idx == 0:
|
|
unique_pts.append(points[i].copy())
|
|
else:
|
|
# Tiny jitter so Voronoi sees distinct points (0.01m per step)
|
|
jittered = points[i].copy()
|
|
angle = (
|
|
2 * np.pi * jitter_idx / max(coord_counts[coord], jitter_idx + 1)
|
|
)
|
|
jittered[0] += 0.01 * np.cos(angle)
|
|
jittered[1] += 0.01 * np.sin(angle)
|
|
unique_pts.append(jittered)
|
|
unique_pcs.append(postcodes[i])
|
|
|
|
if len(unique_pts) == 1:
|
|
return {unique_pcs[0]: boundary}
|
|
|
|
pts = np.array(unique_pts)
|
|
min_e, min_n = pts.min(axis=0)
|
|
max_e, max_n = pts.max(axis=0)
|
|
span = max(max_e - min_e, max_n - min_n, 100)
|
|
|
|
dummy = np.array(
|
|
[
|
|
[min_e - span * 10, min_n - span * 10],
|
|
[max_e + span * 10, min_n - span * 10],
|
|
[min_e - span * 10, max_n + span * 10],
|
|
[max_e + span * 10, max_n + span * 10],
|
|
]
|
|
)
|
|
all_points = np.vstack([pts, dummy])
|
|
|
|
try:
|
|
vor = Voronoi(all_points)
|
|
except (ValueError, QhullError):
|
|
# Fallback: split boundary equally among all postcodes present
|
|
all_pcs = list(dict.fromkeys(unique_pcs))
|
|
if len(all_pcs) == 1:
|
|
return {all_pcs[0]: boundary}
|
|
return _equal_split_fallback(all_pcs, boundary)
|
|
|
|
n_real = len(pts)
|
|
pc_polys: dict[str, list[Polygon]] = defaultdict(list)
|
|
|
|
if not boundary.is_valid:
|
|
boundary = make_valid(boundary)
|
|
|
|
for i in range(n_real):
|
|
region_idx = vor.point_region[i]
|
|
region = vor.regions[region_idx]
|
|
if -1 in region or len(region) < 3:
|
|
continue
|
|
vertices = vor.vertices[region]
|
|
poly = Polygon(vertices)
|
|
if not poly.is_valid:
|
|
poly = make_valid(poly)
|
|
clipped = poly.intersection(boundary)
|
|
if not clipped.is_empty:
|
|
pc_polys[unique_pcs[i]].append(clipped)
|
|
|
|
result = {}
|
|
for pc, parts in pc_polys.items():
|
|
merged = unary_union(parts)
|
|
if not merged.is_empty:
|
|
result[pc] = merged
|
|
return result
|
|
|
|
|
|
def _equal_split_fallback(
|
|
postcodes: list[str], boundary: Polygon | MultiPolygon
|
|
) -> dict[str, Polygon | MultiPolygon]:
|
|
"""Split boundary into roughly equal horizontal strips, one per postcode."""
|
|
if not boundary.is_valid:
|
|
boundary = make_valid(boundary)
|
|
min_x, min_y, max_x, max_y = boundary.bounds
|
|
n = len(postcodes)
|
|
result = {}
|
|
for i, pc in enumerate(postcodes):
|
|
strip_min_y = min_y + (max_y - min_y) * i / n
|
|
strip_max_y = min_y + (max_y - min_y) * (i + 1) / n
|
|
strip = Polygon(
|
|
[
|
|
(min_x, strip_min_y),
|
|
(max_x, strip_min_y),
|
|
(max_x, strip_max_y),
|
|
(min_x, strip_max_y),
|
|
]
|
|
)
|
|
clipped = boundary.intersection(strip)
|
|
if not clipped.is_empty:
|
|
result[pc] = clipped
|
|
return result
|