import json import shutil from collections import defaultdict from pathlib import Path from pyproj import Transformer from shapely import make_valid, set_precision from shapely.errors import GEOSException from shapely.geometry import MultiPolygon, Polygon, mapping, shape from shapely.ops import transform as transform_geometry from shapely.ops import unary_union from tqdm import tqdm _to_wgs84 = None def _get_to_wgs84(): global _to_wgs84 if _to_wgs84 is None: _to_wgs84 = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True) return _to_wgs84 def _largest_polygonal(geom) -> Polygon | None: if geom is None or geom.is_empty: return None if not geom.is_valid: geom = make_valid(geom) if geom.geom_type == "Polygon": return geom if geom.geom_type == "MultiPolygon": return max(geom.geoms, key=lambda g: g.area) if geom.geom_type == "GeometryCollection": polygons = [ polygon for part in geom.geoms if (polygon := _largest_polygonal(part)) is not None ] if polygons: return max(polygons, key=lambda g: g.area) return None def to_wgs84_geojson( geom: Polygon | MultiPolygon, tolerance: float = 1.0 ) -> dict | None: """Simplify geometry in BNG, convert to WGS84, return a valid GeoJSON dict. Validates the *serialized* GeoJSON dict (via a ``shape()`` round-trip), not just the intermediate Shapely object: coordinate snapping during serialization can otherwise leave a self-intersecting ring that only shows up once the feature is read back from disk. Any such geometry is repaired with ``make_valid`` before returning so written features are always valid. """ geom = _largest_polygonal(geom) if geom is None: return None simplified = geom.simplify(tolerance, preserve_topology=True) simplified = _largest_polygonal(simplified) if simplified is None: return None transformer = _get_to_wgs84() wgs84 = transform_geometry(transformer.transform, simplified) try: wgs84 = set_precision(wgs84, 0.000001, mode="valid_output") except GEOSException: # Precision snapping can fail on pathological geometries; fall back to a # plain validity repair without coordinate snapping. wgs84 = make_valid(wgs84) wgs84 = _largest_polygonal(wgs84) if wgs84 is None: return None geojson_dict = mapping(wgs84) # The geometry that actually reaches disk is the GeoJSON dict, so validate # *that* (not the pre-serialization object) and repair if needed. round_trip = shape(geojson_dict) if round_trip.is_empty or not round_trip.is_valid: round_trip = _largest_polygonal(make_valid(round_trip)) if round_trip is None or round_trip.is_empty: return None geojson_dict = mapping(round_trip) return geojson_dict def _fill_holes(geom): """Remove all interior rings (holes) from a polygon or multipolygon.""" if geom.geom_type == "Polygon": return Polygon(geom.exterior) elif geom.geom_type == "MultiPolygon": return MultiPolygon([Polygon(p.exterior) for p in geom.geoms]) return geom def _largest_polygon(geom): """Extract the largest polygon from a MultiPolygon.""" if geom.geom_type == "MultiPolygon": return max(geom.geoms, key=lambda g: g.area) return geom def merge_fragments( all_fragments: list[tuple[str, Polygon | MultiPolygon]], greenspace_tree=None, greenspace_geoms=None, ) -> dict[str, Polygon | MultiPolygon]: """Merge cross-OA fragments for postcodes spanning multiple OAs. Args: all_fragments: List of (postcode, geometry) pairs. greenspace_tree: Optional STRtree of park/water polygons. greenspace_geoms: Optional list of park/water geometries (indexed by tree). """ by_postcode: dict[str, list] = defaultdict(list) for pc, geom in all_fragments: by_postcode[pc].append(geom) merged = {} for pc, parts in by_postcode.items(): combined = unary_union(parts) if combined.is_empty: continue if not combined.is_valid: combined = make_valid(combined) # Close tiny gaps between adjacent OA boundary edges (float mismatches) if combined.geom_type == "MultiPolygon": combined = combined.buffer(5.0).buffer(-5.0) if not combined.is_valid: combined = make_valid(combined) # Postcodes are contiguous delivery routes — keep only the largest # polygon; small detached fragments are algorithm artifacts combined = _largest_polygon(combined) # Remove artifact interior holes from INSPIRE+Voronoi+make_valid chain combined = _fill_holes(combined) # Subtract parks/water if provided if greenspace_tree is not None and greenspace_geoms is not None: from .greenspace import subtract_greenspace pre_green = combined combined = subtract_greenspace(combined, greenspace_tree, greenspace_geoms) combined = _largest_polygon(combined) # Do NOT _fill_holes here: interior holes carved by the greenspace # subtraction (lakes, enclosed parks) are intentional, not artifacts. # Filling them would re-add the removed area and negate the # subtraction. Artifact holes from the INSPIRE+Voronoi+make_valid # chain were already removed by the _fill_holes above (pre-subtraction). # Revert if subtraction + fragment selection lost >90% of area if pre_green.area > 0 and combined.area / pre_green.area < 0.1: combined = pre_green merged[pc] = combined return merged def write_district_geojson( postcodes: dict[str, Polygon | MultiPolygon], output_dir: Path ) -> int: """Group postcodes by district, write GeoJSON files. Returns file count.""" units_dir = output_dir / "units" tmp_units_dir = output_dir / "units.tmp" output_dir.mkdir(parents=True, exist_ok=True) if tmp_units_dir.exists(): shutil.rmtree(tmp_units_dir) tmp_units_dir.mkdir(parents=True) by_district: dict[str, list[tuple[str, Polygon | MultiPolygon]]] = defaultdict(list) for pc, geom in postcodes.items(): parts = pc.split() district = parts[0] if parts else pc[:4] by_district[district].append((pc, geom)) file_count = 0 seen_postcodes: set[str] = set() for district, entries in tqdm( sorted(by_district.items()), desc="Writing GeoJSON", unit="file" ): features = [] for pc, geom in sorted(entries, key=lambda x: x[0]): if pc in seen_postcodes: raise ValueError(f"Duplicate postcode boundary feature: {pc}") seen_postcodes.add(pc) geojson_geom = to_wgs84_geojson(geom) if geojson_geom is None: raise ValueError(f"Postcode boundary collapsed to empty geometry: {pc}") written_geom = shape(geojson_geom) if written_geom.is_empty or not written_geom.is_valid: raise ValueError( f"Invalid postcode boundary geometry after output: {pc}" ) mapit_code = pc.replace(" ", "") features.append( { "type": "Feature", "geometry": geojson_geom, "properties": { "postcodes": pc, "mapit_code": mapit_code, }, } ) if not features: continue collection = {"type": "FeatureCollection", "features": features} out_path = tmp_units_dir / f"{district}.geojson" with open(out_path, "w") as f: json.dump(collection, f, separators=(",", ":")) file_count += 1 if units_dir.exists(): shutil.rmtree(units_dir) tmp_units_dir.replace(units_dir) return file_count