173 lines
5.9 KiB
Python
173 lines
5.9 KiB
Python
import json
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
|
|
from pyproj import Transformer
|
|
from shapely import make_valid
|
|
from shapely.geometry import MultiPolygon, Polygon
|
|
from shapely.ops import unary_union
|
|
from tqdm import tqdm
|
|
|
|
_to_wgs84 = None
|
|
|
|
|
|
def _get_to_wgs84():
|
|
global _to_wgs84
|
|
if _to_wgs84 is None:
|
|
_to_wgs84 = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True)
|
|
return _to_wgs84
|
|
|
|
|
|
def to_wgs84_geojson(
|
|
geom: Polygon | MultiPolygon, tolerance: float = 1.0
|
|
) -> dict | None:
|
|
"""Simplify geometry in BNG, convert to WGS84, return GeoJSON dict."""
|
|
if geom.is_empty:
|
|
return None
|
|
|
|
simplified = geom.simplify(tolerance, preserve_topology=True)
|
|
if simplified.is_empty:
|
|
return None
|
|
|
|
transformer = _get_to_wgs84()
|
|
|
|
def transform_ring(coords):
|
|
xs, ys = zip(*coords)
|
|
lons, lats = transformer.transform(list(xs), list(ys))
|
|
return [(round(lon, 6), round(lat, 6)) for lon, lat in zip(lons, lats)]
|
|
|
|
def transform_polygon(poly):
|
|
exterior = transform_ring(poly.exterior.coords)
|
|
holes = [transform_ring(h.coords) for h in poly.interiors]
|
|
return [exterior] + holes
|
|
|
|
# Force single Polygon — postcodes are contiguous delivery routes
|
|
if simplified.geom_type == "MultiPolygon":
|
|
simplified = max(simplified.geoms, key=lambda g: g.area)
|
|
elif simplified.geom_type == "GeometryCollection":
|
|
polys = [
|
|
g for g in simplified.geoms if g.geom_type in ("Polygon", "MultiPolygon")
|
|
]
|
|
if not polys:
|
|
return None
|
|
simplified = max(polys, key=lambda g: g.area)
|
|
if simplified.geom_type == "MultiPolygon":
|
|
simplified = max(simplified.geoms, key=lambda g: g.area)
|
|
|
|
if simplified.geom_type != "Polygon" or simplified.is_empty:
|
|
return None
|
|
|
|
return {
|
|
"type": "Polygon",
|
|
"coordinates": transform_polygon(simplified),
|
|
}
|
|
|
|
|
|
def _fill_holes(geom):
|
|
"""Remove all interior rings (holes) from a polygon or multipolygon."""
|
|
if geom.geom_type == "Polygon":
|
|
return Polygon(geom.exterior)
|
|
elif geom.geom_type == "MultiPolygon":
|
|
return MultiPolygon([Polygon(p.exterior) for p in geom.geoms])
|
|
return geom
|
|
|
|
|
|
def _largest_polygon(geom):
|
|
"""Extract the largest polygon from a MultiPolygon."""
|
|
if geom.geom_type == "MultiPolygon":
|
|
return max(geom.geoms, key=lambda g: g.area)
|
|
return geom
|
|
|
|
|
|
def merge_fragments(
|
|
all_fragments: list[tuple[str, Polygon | MultiPolygon]],
|
|
greenspace_tree=None,
|
|
greenspace_geoms=None,
|
|
) -> dict[str, Polygon | MultiPolygon]:
|
|
"""Merge cross-OA fragments for postcodes spanning multiple OAs.
|
|
|
|
Args:
|
|
all_fragments: List of (postcode, geometry) pairs.
|
|
greenspace_tree: Optional STRtree of park/water polygons.
|
|
greenspace_geoms: Optional list of park/water geometries (indexed by tree).
|
|
"""
|
|
by_postcode: dict[str, list] = defaultdict(list)
|
|
for pc, geom in all_fragments:
|
|
by_postcode[pc].append(geom)
|
|
|
|
merged = {}
|
|
for pc, parts in by_postcode.items():
|
|
combined = unary_union(parts)
|
|
if combined.is_empty:
|
|
continue
|
|
if not combined.is_valid:
|
|
combined = make_valid(combined)
|
|
# Close tiny gaps between adjacent OA boundary edges (float mismatches)
|
|
if combined.geom_type == "MultiPolygon":
|
|
combined = combined.buffer(5.0).buffer(-5.0)
|
|
if not combined.is_valid:
|
|
combined = make_valid(combined)
|
|
# Postcodes are contiguous delivery routes — keep only the largest
|
|
# polygon; small detached fragments are algorithm artifacts
|
|
combined = _largest_polygon(combined)
|
|
# Remove artifact interior holes from INSPIRE+Voronoi+make_valid chain
|
|
combined = _fill_holes(combined)
|
|
# Subtract parks/water if provided
|
|
if greenspace_tree is not None and greenspace_geoms is not None:
|
|
from .greenspace import subtract_greenspace
|
|
|
|
pre_green = combined
|
|
combined = subtract_greenspace(combined, greenspace_tree, greenspace_geoms)
|
|
combined = _largest_polygon(combined)
|
|
combined = _fill_holes(combined)
|
|
# Revert if subtraction + fragment selection lost >90% of area
|
|
if pre_green.area > 0 and combined.area / pre_green.area < 0.1:
|
|
combined = pre_green
|
|
merged[pc] = combined
|
|
return merged
|
|
|
|
|
|
def write_district_geojson(
|
|
postcodes: dict[str, Polygon | MultiPolygon], output_dir: Path
|
|
) -> int:
|
|
"""Group postcodes by district, write GeoJSON files. Returns file count."""
|
|
units_dir = output_dir / "units"
|
|
units_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
by_district: dict[str, list[tuple[str, Polygon | MultiPolygon]]] = defaultdict(list)
|
|
for pc, geom in postcodes.items():
|
|
parts = pc.split()
|
|
district = parts[0] if parts else pc[:4]
|
|
by_district[district].append((pc, geom))
|
|
|
|
file_count = 0
|
|
for district, entries in tqdm(
|
|
sorted(by_district.items()), desc="Writing GeoJSON", unit="file"
|
|
):
|
|
features = []
|
|
for pc, geom in sorted(entries, key=lambda x: x[0]):
|
|
geojson_geom = to_wgs84_geojson(geom)
|
|
if geojson_geom is None:
|
|
continue
|
|
mapit_code = pc.replace(" ", "")
|
|
features.append(
|
|
{
|
|
"type": "Feature",
|
|
"geometry": geojson_geom,
|
|
"properties": {
|
|
"postcodes": pc,
|
|
"mapit_code": mapit_code,
|
|
},
|
|
}
|
|
)
|
|
|
|
if not features:
|
|
continue
|
|
|
|
collection = {"type": "FeatureCollection", "features": features}
|
|
out_path = units_dir / f"{district}.geojson"
|
|
with open(out_path, "w") as f:
|
|
json.dump(collection, f, separators=(",", ":"))
|
|
file_count += 1
|
|
|
|
return file_count
|