import json import shutil from collections import defaultdict from pathlib import Path from pyproj import Transformer from shapely import make_valid, set_precision from shapely.geometry import MultiPolygon, Polygon, mapping, shape from shapely.ops import transform as transform_geometry from shapely.ops import unary_union from tqdm import tqdm _to_wgs84 = None def _get_to_wgs84(): global _to_wgs84 if _to_wgs84 is None: _to_wgs84 = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True) return _to_wgs84 def _largest_polygonal(geom) -> Polygon | None: if geom is None or geom.is_empty: return None if not geom.is_valid: geom = make_valid(geom) if geom.geom_type == "Polygon": return geom if geom.geom_type == "MultiPolygon": return max(geom.geoms, key=lambda g: g.area) if geom.geom_type == "GeometryCollection": polygons = [ polygon for part in geom.geoms if (polygon := _largest_polygonal(part)) is not None ] if polygons: return max(polygons, key=lambda g: g.area) return None def to_wgs84_geojson( geom: Polygon | MultiPolygon, tolerance: float = 1.0 ) -> dict | None: """Simplify geometry in BNG, convert to WGS84, return GeoJSON dict.""" geom = _largest_polygonal(geom) if geom is None: return None simplified = geom.simplify(tolerance, preserve_topology=True) simplified = _largest_polygonal(simplified) if simplified is None: return None transformer = _get_to_wgs84() wgs84 = transform_geometry(transformer.transform, simplified) wgs84 = set_precision(wgs84, 0.000001, mode="valid_output") wgs84 = _largest_polygonal(wgs84) if wgs84 is None: return None return mapping(wgs84) def _fill_holes(geom): """Remove all interior rings (holes) from a polygon or multipolygon.""" if geom.geom_type == "Polygon": return Polygon(geom.exterior) elif geom.geom_type == "MultiPolygon": return MultiPolygon([Polygon(p.exterior) for p in geom.geoms]) return geom def _largest_polygon(geom): """Extract the largest polygon from a MultiPolygon.""" if geom.geom_type == "MultiPolygon": return max(geom.geoms, key=lambda g: g.area) return geom def merge_fragments( all_fragments: list[tuple[str, Polygon | MultiPolygon]], greenspace_tree=None, greenspace_geoms=None, ) -> dict[str, Polygon | MultiPolygon]: """Merge cross-OA fragments for postcodes spanning multiple OAs. Args: all_fragments: List of (postcode, geometry) pairs. greenspace_tree: Optional STRtree of park/water polygons. greenspace_geoms: Optional list of park/water geometries (indexed by tree). """ by_postcode: dict[str, list] = defaultdict(list) for pc, geom in all_fragments: by_postcode[pc].append(geom) merged = {} for pc, parts in by_postcode.items(): combined = unary_union(parts) if combined.is_empty: continue if not combined.is_valid: combined = make_valid(combined) # Close tiny gaps between adjacent OA boundary edges (float mismatches) if combined.geom_type == "MultiPolygon": combined = combined.buffer(5.0).buffer(-5.0) if not combined.is_valid: combined = make_valid(combined) # Postcodes are contiguous delivery routes — keep only the largest # polygon; small detached fragments are algorithm artifacts combined = _largest_polygon(combined) # Remove artifact interior holes from INSPIRE+Voronoi+make_valid chain combined = _fill_holes(combined) # Subtract parks/water if provided if greenspace_tree is not None and greenspace_geoms is not None: from .greenspace import subtract_greenspace pre_green = combined combined = subtract_greenspace(combined, greenspace_tree, greenspace_geoms) combined = _largest_polygon(combined) combined = _fill_holes(combined) # Revert if subtraction + fragment selection lost >90% of area if pre_green.area > 0 and combined.area / pre_green.area < 0.1: combined = pre_green merged[pc] = combined return merged def write_district_geojson( postcodes: dict[str, Polygon | MultiPolygon], output_dir: Path ) -> int: """Group postcodes by district, write GeoJSON files. Returns file count.""" units_dir = output_dir / "units" tmp_units_dir = output_dir / "units.tmp" output_dir.mkdir(parents=True, exist_ok=True) if tmp_units_dir.exists(): shutil.rmtree(tmp_units_dir) tmp_units_dir.mkdir(parents=True) by_district: dict[str, list[tuple[str, Polygon | MultiPolygon]]] = defaultdict(list) for pc, geom in postcodes.items(): parts = pc.split() district = parts[0] if parts else pc[:4] by_district[district].append((pc, geom)) file_count = 0 seen_postcodes: set[str] = set() for district, entries in tqdm( sorted(by_district.items()), desc="Writing GeoJSON", unit="file" ): features = [] for pc, geom in sorted(entries, key=lambda x: x[0]): if pc in seen_postcodes: raise ValueError(f"Duplicate postcode boundary feature: {pc}") seen_postcodes.add(pc) geojson_geom = to_wgs84_geojson(geom) if geojson_geom is None: raise ValueError(f"Postcode boundary collapsed to empty geometry: {pc}") written_geom = shape(geojson_geom) if written_geom.is_empty or not written_geom.is_valid: raise ValueError( f"Invalid postcode boundary geometry after output: {pc}" ) mapit_code = pc.replace(" ", "") features.append( { "type": "Feature", "geometry": geojson_geom, "properties": { "postcodes": pc, "mapit_code": mapit_code, }, } ) if not features: continue collection = {"type": "FeatureCollection", "features": features} out_path = tmp_units_dir / f"{district}.geojson" with open(out_path, "w") as f: json.dump(collection, f, separators=(",", ":")) file_count += 1 if units_dir.exists(): shutil.rmtree(units_dir) tmp_units_dir.replace(units_dir) return file_count